better-queue
Advanced tools
Comparing version 2.2.3 to 3.0.1
123
lib/queue.js
@@ -31,3 +31,3 @@ var uuid = require('node-uuid'); | ||
self.merge = opts.merge || function (oldTask, newTask, cb) { cb(null, newTask) }; | ||
self.id = opts.id || 'id'; | ||
self.id = opts.id || false; | ||
self.priority = opts.priority || null; | ||
@@ -39,9 +39,12 @@ | ||
self.batchSize = opts.batchSize || 1; | ||
self.batchDelay = opts.batchDelay || 0; | ||
self.afterProcessDelay = opts.afterProcessDelay || 0; | ||
self.concurrent = opts.concurrent || 1; | ||
self.processDelay = opts.processDelay || 0; | ||
self.maxTimeout = opts.maxTimeout || Infinity; | ||
self.idleTimeout = opts.idleTimeout || 0; | ||
self.maxRetries = opts.maxRetries || 0; | ||
self.storeMaxRetries = opts.storeMaxRetries || Infinity; | ||
self.storeRetryTimeout = opts.storeRetryTimeout || 1000; | ||
// Statuses | ||
self.length = 0; | ||
self._stopped = false; | ||
@@ -52,2 +55,3 @@ self._saturated = false; | ||
self._connected = false; | ||
self._storeRetries = 0; | ||
@@ -70,3 +74,3 @@ // Locks | ||
// Initialize Storage | ||
self.use(opts.store || 'memory'); | ||
self.use(opts.store || 'sqlite'); | ||
if (!self._store) { | ||
@@ -124,5 +128,19 @@ throw new Error('Queue cannot continue without a valid store.') | ||
self._tasksWaitingForConnect = []; | ||
self._store.connect(function (err) { | ||
if (err) return; | ||
self._connectToStore(); | ||
} | ||
Queue.prototype._connectToStore = function () { | ||
var self = this; | ||
if (self._connected) return; | ||
if (self._storeRetries >= self.storeMaxRetries) { | ||
return self.emit('error', new Error('failed_connect_to_store')); | ||
} | ||
self._storeRetries++; | ||
self._store.connect(function (err, len) { | ||
if (err) return setTimeout(function () { | ||
self._connectToStore(); | ||
}, self.storeRetryTimeout); | ||
self._connected = true; | ||
self.length = len; | ||
self._storeRetries = 0; | ||
if (!self._stopped && self.autoResume) { | ||
@@ -135,2 +153,3 @@ self.resume(); | ||
}) | ||
} | ||
@@ -147,3 +166,3 @@ | ||
setTimeout(function () { | ||
self._processNext(); | ||
self._processIfFull(); | ||
}, 0) | ||
@@ -241,3 +260,2 @@ } | ||
} | ||
// If something else has written to taskId, then wait. | ||
@@ -251,2 +269,3 @@ if (self._writing[taskId].id !== writeId) { | ||
// Task is in the queue | ||
self.length++; | ||
self.emit('task_queued', taskId, task); | ||
@@ -264,15 +283,7 @@ self._tickets[taskId].queued(); | ||
self._hasMore = true; | ||
return finishedWrite(); | ||
} | ||
// If we're not already waiting, wait processDelay before starting. | ||
if (!self._timeout) { | ||
self._timeout = setTimeout(function () { | ||
self._timeout = null; | ||
self._processNext(); | ||
}, self.processDelay) | ||
} | ||
// Write more | ||
finishedWrite() | ||
// Finish writing | ||
finishedWrite(); | ||
self._processIfFull(); | ||
}) | ||
@@ -375,3 +386,2 @@ } | ||
Queue.prototype._drained = function () { | ||
this._emptied(); | ||
if (this._calledDrain) return; | ||
@@ -386,3 +396,3 @@ this._calledDrain = true; | ||
Queue.prototype._processNext = function () { | ||
Queue.prototype._processIfFull = function () { | ||
var self = this; | ||
@@ -394,4 +404,33 @@ self._saturated = (self._running >= self.concurrent); | ||
if (self._fetchingNext) return; | ||
if (!self.length) { | ||
if (!self._hasMore) { | ||
self._emptied(); | ||
if (!self._running) { | ||
self._drained(); | ||
} | ||
} | ||
return; | ||
} | ||
if (self.length >= self.batchSize) { | ||
if (self._timeout) { | ||
clearTimeout(self._timeout); | ||
self._timeout = null; | ||
} | ||
setImmediate(function () { | ||
self._processNext(); | ||
}) | ||
} else if (!self._timeout) { | ||
self._timeout = setTimeout(function () { | ||
self._timeout = null; | ||
self._processNext(); | ||
}, self.batchDelay) | ||
} | ||
} | ||
Queue.prototype._processNext = function () { | ||
var self = this; | ||
// Fetch next batch | ||
// FIXME: There may still be things writing | ||
self._hasMore = false; | ||
@@ -402,17 +441,25 @@ self._fetchingNext = true; | ||
if (err || !batch) return; | ||
var batchSize = Object.keys(batch).length; | ||
var isEmpty = (batchSize === 0); | ||
if (!self._hasMore) { | ||
var isEmpty = !Object.keys(batch).length; | ||
if (isEmpty && !self._running) { | ||
return self._drained(); | ||
if (self.length < batchSize) { | ||
self.length = batchSize; | ||
} | ||
self.length -= batchSize; | ||
if (!self._hasMore && isEmpty) { | ||
self._emptied(); | ||
if (!self._running) { | ||
self._drained(); | ||
} | ||
if (isEmpty) { | ||
return self._emptied(); | ||
} | ||
} else if (!Object.keys(batch).length) { | ||
return setImmediate(function () { | ||
self._processNext(); | ||
}) | ||
return; | ||
} | ||
// TODO: Collect stats | ||
// The write queue wasn't empty on fetch, so we should fetch more. | ||
if (self._hasMore && isEmpty) { | ||
return self._processIfFull() | ||
} | ||
var tickets = {}; | ||
@@ -430,5 +477,3 @@ Object.keys(batch).forEach(function (taskId) { | ||
// Continue processing until saturated | ||
setImmediate(function () { | ||
self._processNext(); | ||
}) | ||
self._processIfFull(); | ||
} | ||
@@ -504,9 +549,9 @@ | ||
}); | ||
if (self.idleTimeout) { | ||
if (self.afterProcessDelay) { | ||
setTimeout(function () { | ||
self._processNext(); | ||
}, self.idleTimeout); | ||
self._processIfFull(); | ||
}, self.afterProcessDelay); | ||
} else { | ||
setImmediate(function () { | ||
self._processNext(); | ||
self._processIfFull(); | ||
}) | ||
@@ -513,0 +558,0 @@ } |
@@ -21,3 +21,3 @@ | ||
MemoryStore.prototype.connect = function (cb) { | ||
cb() | ||
cb(null, this._queue.length); | ||
} | ||
@@ -24,0 +24,0 @@ |
@@ -35,4 +35,10 @@ var uuid = require('node-uuid'); | ||
self.db = new sqlite3.Database(self.path, function (err) { | ||
if (err) return cb(err); | ||
self.db.run("CREATE TABLE IF NOT EXISTS tasks (id TEXT UNIQUE, lock TEXT, task TEXT, priority NUMERIC, added INTEGER PRIMARY KEY AUTOINCREMENT)", cb); | ||
if (err) return cb({ message: 'failed_to_open_sqlite_db' }); | ||
self.db.run("CREATE TABLE IF NOT EXISTS tasks (id TEXT UNIQUE, lock TEXT, task TEXT, priority NUMERIC, added INTEGER PRIMARY KEY AUTOINCREMENT)", function (err) { | ||
if (err) return cb({ message: 'failed_to_create_table' }); | ||
self.db.get("SELECT count(*) as n FROM tasks WHERE lock = ''", function (err, row) { | ||
if (err) return cb({ message: 'failed_to_fetch_count' }); | ||
cb(null, row.n); | ||
}); | ||
}); | ||
}); | ||
@@ -39,0 +45,0 @@ } |
{ | ||
"name": "better-queue", | ||
"version": "2.2.3", | ||
"version": "3.0.1", | ||
"description": "Better Queue for NodeJS", | ||
@@ -5,0 +5,0 @@ "main": "lib/queue.js", |
@@ -357,3 +357,3 @@ # Better Queue - Powerful flow control | ||
cb(); | ||
}, { batchSize: 5, processDelay: 2000 }) | ||
}, { batchSize: 5, batchDelay: 2000 }) | ||
q.push(1); | ||
@@ -365,3 +365,3 @@ setTimeout(function () { | ||
You can also set `idleTimeout`, which will delay processing between tasks. | ||
You can also set `afterProcessDelay`, which will delay processing between tasks. | ||
@@ -371,3 +371,3 @@ ```js | ||
cb(); // Will wait 1 second before taking the next task | ||
}, { idleTimeout: 1000 }) | ||
}, { afterProcessDelay: 1000 }) | ||
q.push(1); | ||
@@ -590,3 +590,2 @@ q.push(2); | ||
- `merge` - function to merge tasks with the same task ID. Will be run with `oldTask`, `newTask` and a callback `cb(error, mergedTask)`. If you define this function then the callback is expected to be called. | ||
- `id` - By default, this will be the "id" property of the task (if it's an object.) This can be a string representing which property of the task to be used as the ID. It can also be a function that takes in a task and returns a callback `cb(error, taskId)`. | ||
- `priority` - function to determine the priority of a task. Takes in a task and returns callback `cb(error, priority)`. | ||
@@ -596,2 +595,3 @@ | ||
- `id` - The property to use as the task ID. This can be a string or a function (for more complicated IDs). The function `(task, cb)` and must call the callback with `cb(error, taskId)`. | ||
- `cancelIfRunning` - If true, when a task with the same ID is running, its worker will be cancelled. Defaults to `false`. | ||
@@ -601,7 +601,9 @@ - `autoResume` - If true, tasks in the store will automatically start processing once it connects to the store. Defaults to `true`. | ||
- `batchSize` - The number of tasks (at most) that can be processed at once. Defaults to `1`. | ||
- `batchDelay` - Number of milliseconds to delay before starting to popping items off the queue. Defaults to `0`. | ||
- `concurrent` - Number of workers that can be running at any given time. Defaults to `1`. | ||
- `processDelay` - Number of milliseconds to delay before starting to popping items off the queue. Defaults to `0`. | ||
- `maxTimeout` - Number of milliseconds before a task is considered timed out. Defaults to `Infinity`. | ||
- `idleTimeout` - Number of milliseconds to delay before processing the next batch of items. Defaults to `1`. | ||
- `afterProcessDelay` - Number of milliseconds to delay before processing the next batch of items. Defaults to `1`. | ||
- `maxRetries` - Maximum number of attempts to retry on a failed task. Defaults to `0`. | ||
- `storeMaxRetries` - Maximum number of attempts before giving up on the store. Defaults to `Infinity`. | ||
- `storeRetryTimeout` - Number of milliseconds to delay before trying to connect to the store again. Defaults to `1000`. | ||
- `store` - Represents the options for the initial store. Can be an object containing `{ type: storeType, ... options ... }`, or the store instance itself. | ||
@@ -608,0 +610,0 @@ |
34
test.js
@@ -96,11 +96,29 @@ var Queue = require('./lib/queue') | ||
var counter = new Queue(function (task, cb) { cb() }, { id: 'id', store: { type: 'sqlite', path: './abc.db' } }) | ||
counter.on('task_finish', function (taskId, result) { | ||
// taskId will be 'jim' or 'bob' | ||
console.log(taskId) | ||
}) | ||
counter.push({ id: 'jim', count: 2 }); | ||
counter.push({ id: 'jim', count: 2 }); | ||
counter.push({ id: 'bob', count: 1 }); | ||
// var counter = new Queue(function (task, cb) { cb() }, { id: 'id', store: { type: 'sqlite', path: './abc.db' } }) | ||
// counter.on('task_finish', function (taskId, result) { | ||
// // taskId will be 'jim' or 'bob' | ||
// console.log(taskId) | ||
// }) | ||
// counter.push({ id: 'jim', count: 2 }); | ||
// counter.push({ id: 'jim', count: 2 }); | ||
// counter.push({ id: 'bob', count: 1 }); | ||
// var q = new Queue(function (b, cb) { | ||
// console.log("Pushed %s.", b.join(', ')); | ||
// cb(); | ||
// }, { batchSize: 3, batchDelay: 950 }) | ||
// q.push(1); | ||
// q.push(2); | ||
// q.push(3); | ||
// q.push(4); | ||
// setTimeout(function () { | ||
// q.push(5); | ||
// q.push(6); | ||
// setTimeout(function () { | ||
// q.push(7); | ||
// q.push(8); | ||
// q.push(9); | ||
// q.push(10); | ||
// }, 1000) | ||
// }, 400) | ||
@@ -152,2 +152,39 @@ var assert = require('assert'); | ||
it('should batch delay', function (done) { | ||
var batches = 0; | ||
var q = new Queue(function (batch, cb) { | ||
batches++; | ||
if (batches === 1) { | ||
assert.equal(batch.length, 2); | ||
return cb(); | ||
} | ||
if (batches === 2) { | ||
assert.equal(batch.length, 1); | ||
cb(); | ||
return done(); | ||
} | ||
}, { batchSize: 2, batchDelay: 2 }); | ||
q.push(1); | ||
q.push(2); | ||
q.push(3); | ||
}) | ||
it('should batch 2', function (done) { | ||
var finished = 0; | ||
var q = new Queue(function (batch, cb) { | ||
finished++; | ||
assert.equal(batch.length, 1); | ||
if (finished >= 2) { | ||
done(); | ||
} | ||
cb(); | ||
}, { batchSize: 2, batchDelay: 1 }); | ||
q.push(1) | ||
.on('queued', function () { | ||
setTimeout(function () { | ||
q.push(2); | ||
}, 2) | ||
}) | ||
}) | ||
it('should drain and empty', function (done) { | ||
@@ -154,0 +191,0 @@ var emptied = false; |
@@ -130,2 +130,3 @@ var assert = require('assert'); | ||
}, { | ||
id: 'id', | ||
merge: function (a, b, cb) { | ||
@@ -254,3 +255,3 @@ a.x += b.x; | ||
} | ||
}, { cancelIfRunning: true }) | ||
}, { id: 'id', cancelIfRunning: true }) | ||
q.push({ id: 1 }) | ||
@@ -265,5 +266,3 @@ .on('started', function () { | ||
// TODO: Test progress | ||
// TODO: Test stores | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
67946
19
1770
634