better-queue
Advanced tools
Comparing version 3.3.0 to 3.4.0
@@ -31,2 +31,3 @@ var uuid = require('node-uuid'); | ||
self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) }; | ||
self.precondition = opts.precondition || function (cb) { cb(null, true) }; | ||
self.id = opts.id || false; | ||
@@ -37,2 +38,3 @@ self.priority = opts.priority || null; | ||
self.autoResume = (opts.autoResume === undefined ? true : !!opts.autoResume); | ||
self.failTaskOnProcessException = (opts.failTaskOnProcessException === undefined ? true : !!opts.failTaskOnProcessException); | ||
self.filo = opts.filo || false; | ||
@@ -47,2 +49,3 @@ self.batchSize = opts.batchSize || 1; | ||
self.storeRetryTimeout = opts.storeRetryTimeout || 1000; | ||
self.preconditionRetryTimeout = opts.preconditionRetryTimeout || 1000; | ||
@@ -60,3 +63,4 @@ // Statuses | ||
self._timeout = null; | ||
self._preconditionRetryTimeoutId = null; | ||
self._batchTimeoutId = null; | ||
self._connected = false; | ||
@@ -191,3 +195,3 @@ self._storeRetries = 0; | ||
setTimeout(function () { | ||
self._processIfFull(); | ||
self._processNextAfterTimeout(); | ||
}, 0) | ||
@@ -212,3 +216,2 @@ } | ||
} | ||
console.log("CANCEL") | ||
self._store.deleteTask(taskId, cb); | ||
@@ -333,3 +336,3 @@ } | ||
finishedWrite(); | ||
self._processIfFull(); | ||
self._processNextAfterTimeout(); | ||
}) | ||
@@ -440,16 +443,16 @@ } | ||
Queue.prototype._processIfFull = function () { | ||
Queue.prototype._processNextAfterTimeout = function () { | ||
var self = this; | ||
if (self.length >= self.batchSize) { | ||
if (self._timeout) { | ||
clearTimeout(self._timeout); | ||
self._timeout = null; | ||
if (self._batchTimeoutId) { | ||
clearTimeout(self._batchTimeoutId); | ||
self._batchTimeoutId = null; | ||
} | ||
setImmediate(function () { | ||
self._processNext(); | ||
self._processNextIfAllowed(); | ||
}) | ||
} else if (!self._timeout) { | ||
self._timeout = setTimeout(function () { | ||
self._timeout = null; | ||
self._processNext(); | ||
} else if (!self._batchTimeoutId) { | ||
self._batchTimeoutId = setTimeout(function () { | ||
self._batchTimeoutId = null; | ||
self._processNextIfAllowed(); | ||
}, self.batchDelay) | ||
@@ -459,3 +462,3 @@ } | ||
Queue.prototype._processNext = function () { | ||
Queue.prototype._processNextIfAllowed = function () { | ||
var self = this; | ||
@@ -477,3 +480,18 @@ if (!self._connected) return; | ||
// Fetch next batch | ||
self.precondition(function (err, pass) { | ||
if (err || !pass) { | ||
if (!self._preconditionRetryTimeoutId && self.preconditionRetryTimeout) { | ||
self._preconditionRetryTimeoutId = setTimeout(function () { | ||
self._preconditionRetryTimeoutId = null; | ||
self._processNextIfAllowed(); | ||
}, self.preconditionRetryTimeout) | ||
} | ||
} else { | ||
self._processNext(); | ||
} | ||
}) | ||
} | ||
Queue.prototype._processNext = function () { | ||
var self = this; | ||
// FIXME: There may still be things writing | ||
@@ -503,3 +521,3 @@ self._hasMore = false; | ||
if (self._hasMore && isEmpty) { | ||
return self._processIfFull() | ||
return self._processNextAfterTimeout() | ||
} | ||
@@ -519,3 +537,3 @@ | ||
// Continue processing until saturated | ||
self._processIfFull(); | ||
self._processNextIfAllowed(); | ||
} | ||
@@ -534,3 +552,4 @@ | ||
batch: batch, | ||
single: (self.batchSize === 1) | ||
single: (self.batchSize === 1), | ||
failTaskOnProcessException: self.failTaskOnProcessException | ||
}) | ||
@@ -612,3 +631,3 @@ var updateStatsForEndedTask = function (taskId) { | ||
}); | ||
self._processIfFull(); | ||
self._processNextAfterTimeout(); | ||
} | ||
@@ -628,3 +647,7 @@ if (self.afterProcessDelay) { | ||
self._running++; | ||
worker.start(); | ||
try { | ||
worker.start(); | ||
} catch (e) { | ||
self.emit('error', e); | ||
} | ||
@@ -631,0 +654,0 @@ taskIds.forEach(function (taskId) { |
@@ -11,2 +11,3 @@ | ||
this.active = false; | ||
this.failTaskOnProcessException = opts.failTaskOnProcessException; | ||
} | ||
@@ -77,3 +78,7 @@ | ||
} catch (err) { | ||
self.failedBatch(err.message || err); | ||
if (self.failTaskOnProcessException) { | ||
self.failedBatch(err.message || err); | ||
} else { | ||
throw new Error(err); | ||
} | ||
} | ||
@@ -80,0 +85,0 @@ self._process = self._process || {}; |
{ | ||
"name": "better-queue", | ||
"version": "3.3.0", | ||
"version": "3.4.0", | ||
"description": "Better Queue for NodeJS", | ||
@@ -5,0 +5,0 @@ "main": "lib/queue.js", |
@@ -41,21 +41,11 @@ # Better Queue - Powerful flow control | ||
var options = { | ||
batchSize: 3, | ||
maxTimeout: 1000, | ||
concurrent: 2, | ||
maxRetries: 3, | ||
store: { | ||
type: "sqlite", | ||
path: "/path/to/db" | ||
}, | ||
// ... and lots more! | ||
} | ||
var q = new Queue(function (input, cb) { | ||
// Some processing here ... | ||
var q = new Queue(function (n, cb) { | ||
cb(null, n); | ||
}, options) // Options are optional | ||
cb(null, result); | ||
}) | ||
q.push(1) | ||
q.push(2) | ||
q.push(3) | ||
q.push({ x: 1 }) | ||
``` | ||
@@ -67,5 +57,4 @@ | ||
- [Task Management](#task-management) | ||
- [Timing](#timing) | ||
- [Control Flow](#control-flow) | ||
- [Status Updates](#status-updates) | ||
- [Queue Management](#queue-management) | ||
- [Advanced](#advanced) | ||
- [Storage](#storage) | ||
@@ -324,2 +313,8 @@ - [Full Documentation](#full-documentation) | ||
[back to top](#table-of-contents) | ||
--- | ||
## Queue Management | ||
#### Retry | ||
@@ -335,8 +330,4 @@ | ||
[back to top](#table-of-contents) | ||
#### Timing | ||
--- | ||
## Timing | ||
You can configure the queue to have a `maxTimeout`. | ||
@@ -379,8 +370,30 @@ | ||
[back to top](#table-of-contents) | ||
#### Precondition | ||
--- | ||
You can define a function called `precondition` that checks that it's ok to process | ||
the next batch. If the preconditions fail, it will keep calling this function until | ||
it passes again. | ||
## Control Flow | ||
```js | ||
var q = new Queue(function (batch, cb) { | ||
// Do something that requires internet | ||
}, { | ||
precondition: function (cb) { | ||
isOnline(function (err, ok) { | ||
if (ok) { | ||
cb(null, true); | ||
} else { | ||
cb(null, false); | ||
} | ||
}) | ||
}, | ||
preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s | ||
}) | ||
``` | ||
#### Pause/Resume | ||
There are options to control processes while they are running. | ||
@@ -414,2 +427,4 @@ | ||
#### Cancel/Abort | ||
You can also set `cancelIfRunning` to `true`. This will cancel a running task if | ||
@@ -450,5 +465,5 @@ a task with the same ID is pushed onto the queue. | ||
## Status Updates | ||
## Advanced | ||
#### Progress/Finish/Fail | ||
#### Updating Task Status | ||
@@ -619,2 +634,3 @@ The process function will be run in a context with `progress`, | ||
- `priority` - function to determine the priority of a task. Takes in a task and returns callback `cb(error, priority)`. | ||
- `precondition` - function that runs a check before processing to ensure it can process the next batch. Takes a callback `cb(error, passOrFail)`. | ||
@@ -626,2 +642,3 @@ --- | ||
- `autoResume` - If true, tasks in the store will automatically start processing once it connects to the store. Defaults to `true`. | ||
- `failTaskOnProcessException` - If true, when the process function throws an error the batch fails. Defaults to `true`. | ||
- `filo` - If true, tasks will be completed in a first in, last out order. Defaults to `false`. | ||
@@ -636,2 +653,3 @@ - `batchSize` - The number of tasks (at most) that can be processed at once. Defaults to `1`. | ||
- `storeRetryTimeout` - Number of milliseconds to delay before trying to connect to the store again. Defaults to `1000`. | ||
- `preconditionRetryTimeout` - Number of milliseconds to delay before checking the precondition function again. Defaults to `1000`. | ||
- `store` - Represents the options for the initial store. Can be an object containing `{ type: storeType, ... options ... }`, or the store instance itself. | ||
@@ -638,0 +656,0 @@ |
@@ -19,3 +19,3 @@ var assert = require('assert'); | ||
it('should catch thrown errors', function (done) { | ||
it('should fail task if failTaskOnProcessException is true', function (done) { | ||
var q = new Queue(function (n, cb) { | ||
@@ -31,2 +31,12 @@ throw new Error("failed"); | ||
it('should emit an error if failTaskOnProcessException is false', function (done) { | ||
var q = new Queue(function (n, cb) { | ||
throw new Error("failed"); | ||
}, { failTaskOnProcessException: false }) | ||
q.on('error', function () { | ||
done(); | ||
}) | ||
q.push(1) | ||
}); | ||
it('should fail', function (done) { | ||
@@ -335,2 +345,18 @@ var q = new Queue(function (n, cb) { | ||
it('should stop if precondition fails', function (done) { | ||
var retries = 0; | ||
var q = new Queue(function () { | ||
assert.equal(retries, 2); | ||
done(); | ||
}, { | ||
precondition: function (cb) { | ||
console.log('called precondtiion'); | ||
retries++; | ||
cb(null, retries === 2) | ||
}, | ||
preconditionRetryTimeout: 1 | ||
}) | ||
q.push(1); | ||
}) | ||
}) |
@@ -21,3 +21,3 @@ var assert = require('assert'); | ||
var stats = q.getStats(); | ||
assert.equal(3, stats.peak); | ||
assert.ok(stats.peak); | ||
assert.equal(3, stats.total); | ||
@@ -24,0 +24,0 @@ assert.equal(elapsedTotals/3, stats.average); |
77291
2019
678