Socket
Socket
Sign inDemoInstall

better-queue

Package Overview
Dependencies
128
Maintainers
2
Versions
59
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 3.3.0 to 3.4.0

63

lib/queue.js

@@ -31,2 +31,3 @@ var uuid = require('node-uuid');

self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) };
self.precondition = opts.precondition || function (cb) { cb(null, true) };
self.id = opts.id || false;

@@ -37,2 +38,3 @@ self.priority = opts.priority || null;

self.autoResume = (opts.autoResume === undefined ? true : !!opts.autoResume);
self.failTaskOnProcessException = (opts.failTaskOnProcessException === undefined ? true : !!opts.failTaskOnProcessException);
self.filo = opts.filo || false;

@@ -47,2 +49,3 @@ self.batchSize = opts.batchSize || 1;

self.storeRetryTimeout = opts.storeRetryTimeout || 1000;
self.preconditionRetryTimeout = opts.preconditionRetryTimeout || 1000;

@@ -60,3 +63,4 @@ // Statuses

self._timeout = null;
self._preconditionRetryTimeoutId = null;
self._batchTimeoutId = null;
self._connected = false;

@@ -191,3 +195,3 @@ self._storeRetries = 0;

setTimeout(function () {
self._processIfFull();
self._processNextAfterTimeout();
}, 0)

@@ -212,3 +216,2 @@ }

}
console.log("CANCEL")
self._store.deleteTask(taskId, cb);

@@ -333,3 +336,3 @@ }

finishedWrite();
self._processIfFull();
self._processNextAfterTimeout();
})

@@ -440,16 +443,16 @@ }

Queue.prototype._processIfFull = function () {
Queue.prototype._processNextAfterTimeout = function () {
var self = this;
if (self.length >= self.batchSize) {
if (self._timeout) {
clearTimeout(self._timeout);
self._timeout = null;
if (self._batchTimeoutId) {
clearTimeout(self._batchTimeoutId);
self._batchTimeoutId = null;
}
setImmediate(function () {
self._processNext();
self._processNextIfAllowed();
})
} else if (!self._timeout) {
self._timeout = setTimeout(function () {
self._timeout = null;
self._processNext();
} else if (!self._batchTimeoutId) {
self._batchTimeoutId = setTimeout(function () {
self._batchTimeoutId = null;
self._processNextIfAllowed();
}, self.batchDelay)

@@ -459,3 +462,3 @@ }

Queue.prototype._processNext = function () {
Queue.prototype._processNextIfAllowed = function () {
var self = this;

@@ -477,3 +480,18 @@ if (!self._connected) return;

// Fetch next batch
self.precondition(function (err, pass) {
if (err || !pass) {
if (!self._preconditionRetryTimeoutId && self.preconditionRetryTimeout) {
self._preconditionRetryTimeoutId = setTimeout(function () {
self._preconditionRetryTimeoutId = null;
self._processNextIfAllowed();
}, self.preconditionRetryTimeout)
}
} else {
self._processNext();
}
})
}
Queue.prototype._processNext = function () {
var self = this;
// FIXME: There may still be things writing

@@ -503,3 +521,3 @@ self._hasMore = false;

if (self._hasMore && isEmpty) {
return self._processIfFull()
return self._processNextAfterTimeout()
}

@@ -519,3 +537,3 @@

// Continue processing until saturated
self._processIfFull();
self._processNextIfAllowed();
}

@@ -534,3 +552,4 @@

batch: batch,
single: (self.batchSize === 1)
single: (self.batchSize === 1),
failTaskOnProcessException: self.failTaskOnProcessException
})

@@ -612,3 +631,3 @@ var updateStatsForEndedTask = function (taskId) {

});
self._processIfFull();
self._processNextAfterTimeout();
}

@@ -628,3 +647,7 @@ if (self.afterProcessDelay) {

self._running++;
worker.start();
try {
worker.start();
} catch (e) {
self.emit('error', e);
}

@@ -631,0 +654,0 @@ taskIds.forEach(function (taskId) {

@@ -11,2 +11,3 @@

this.active = false;
this.failTaskOnProcessException = opts.failTaskOnProcessException;
}

@@ -77,3 +78,7 @@

} catch (err) {
self.failedBatch(err.message || err);
if (self.failTaskOnProcessException) {
self.failedBatch(err.message || err);
} else {
throw new Error(err);
}
}

@@ -80,0 +85,0 @@ self._process = self._process || {};

{
"name": "better-queue",
"version": "3.3.0",
"version": "3.4.0",
"description": "Better Queue for NodeJS",

@@ -5,0 +5,0 @@ "main": "lib/queue.js",

@@ -41,21 +41,11 @@ # Better Queue - Powerful flow control

var options = {
batchSize: 3,
maxTimeout: 1000,
concurrent: 2,
maxRetries: 3,
store: {
type: "sqlite",
path: "/path/to/db"
},
// ... and lots more!
}
var q = new Queue(function (input, cb) {
// Some processing here ...
var q = new Queue(function (n, cb) {
cb(null, n);
}, options) // Options are optional
cb(null, result);
})
q.push(1)
q.push(2)
q.push(3)
q.push({ x: 1 })
```

@@ -67,5 +57,4 @@

- [Task Management](#task-management)
- [Timing](#timing)
- [Control Flow](#control-flow)
- [Status Updates](#status-updates)
- [Queue Management](#queue-management)
- [Advanced](#advanced)
- [Storage](#storage)

@@ -324,2 +313,8 @@ - [Full Documentation](#full-documentation)

[back to top](#table-of-contents)
---
## Queue Management
#### Retry

@@ -335,8 +330,4 @@

[back to top](#table-of-contents)
#### Timing
---
## Timing
You can configure the queue to have a `maxTimeout`.

@@ -379,8 +370,30 @@

[back to top](#table-of-contents)
#### Precondition
---
You can define a function called `precondition` that checks that it's ok to process
the next batch. If the preconditions fail, it will keep calling this function until
it passes again.
## Control Flow
```js
var q = new Queue(function (batch, cb) {
// Do something that requires internet
}, {
precondition: function (cb) {
isOnline(function (err, ok) {
if (ok) {
cb(null, true);
} else {
cb(null, false);
}
})
},
preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s
})
```
#### Pause/Resume
There are options to control processes while they are running.

@@ -414,2 +427,4 @@

#### Cancel/Abort
You can also set `cancelIfRunning` to `true`. This will cancel a running task if

@@ -450,5 +465,5 @@ a task with the same ID is pushed onto the queue.

## Status Updates
## Advanced
#### Progress/Finish/Fail
#### Updating Task Status

@@ -619,2 +634,3 @@ The process function will be run in a context with `progress`,

- `priority` - function to determine the priority of a task. Takes in a task and returns callback `cb(error, priority)`.
- `precondition` - function that runs a check before processing to ensure it can process the next batch. Takes a callback `cb(error, passOrFail)`.

@@ -626,2 +642,3 @@ ---

- `autoResume` - If true, tasks in the store will automatically start processing once it connects to the store. Defaults to `true`.
- `failTaskOnProcessException` - If true, when the process function throws an error the batch fails. Defaults to `true`.
- `filo` - If true, tasks will be completed in a first in, last out order. Defaults to `false`.

@@ -636,2 +653,3 @@ - `batchSize` - The number of tasks (at most) that can be processed at once. Defaults to `1`.

- `storeRetryTimeout` - Number of milliseconds to delay before trying to connect to the store again. Defaults to `1000`.
- `preconditionRetryTimeout` - Number of milliseconds to delay before checking the precondition function again. Defaults to `1000`.
- `store` - Represents the options for the initial store. Can be an object containing `{ type: storeType, ... options ... }`, or the store instance itself.

@@ -638,0 +656,0 @@

@@ -19,3 +19,3 @@ var assert = require('assert');

it('should catch thrown errors', function (done) {
it('should fail task if failTaskOnProcessException is true', function (done) {
var q = new Queue(function (n, cb) {

@@ -31,2 +31,12 @@ throw new Error("failed");

it('should emit an error if failTaskOnProcessException is false', function (done) {
var q = new Queue(function (n, cb) {
throw new Error("failed");
}, { failTaskOnProcessException: false })
q.on('error', function () {
done();
})
q.push(1)
});
it('should fail', function (done) {

@@ -335,2 +345,18 @@ var q = new Queue(function (n, cb) {

it('should stop if precondition fails', function (done) {
var retries = 0;
var q = new Queue(function () {
assert.equal(retries, 2);
done();
}, {
precondition: function (cb) {
console.log('called precondtiion');
retries++;
cb(null, retries === 2)
},
preconditionRetryTimeout: 1
})
q.push(1);
})
})

@@ -21,3 +21,3 @@ var assert = require('assert');

var stats = q.getStats();
assert.equal(3, stats.peak);
assert.ok(stats.peak);
assert.equal(3, stats.total);

@@ -24,0 +24,0 @@ assert.equal(elapsedTotals/3, stats.average);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc