Better Queue - Powerful flow control
Super simple to use
Better Queue is designed to be simple to set up but still let you do complex things.
var Queue = require('better-queue');
var q = new Queue(function (n, cb) {
cb(null, n+1);
})
q.push(1)
q.push(2)
q.push(3)
Table of contents
You will be able to combine any (and all) of these options
for your queue!
Setting up the queue
You can control how many tasks happen at the same time.
var q = new Queue(fn, { concurrent: 3 })
Now the queue will allow 3 tasks running at the same time. (By
default, we handle tasks one at a time.)
You can also turn the queue into a stack by turning on filo
.
var q = new Queue(fn, { filo: true })
Now items you push on will be handled first.
Queuing
It's very easy to push tasks into the queue.
var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");
You can also include a callback as a second parameter to the push
function, which would be called when that task is done. For example:
var q = new Queue(fn);
q.push(1, function (err, result) {
});
You can also listen to events on the results of the push
call.
var q = new Queue(fn);
q.push(1)
.on('done', function (result) {
})
.on('fail', function (err) {
})
Alternatively, you can subscribe to the queue's events.
var q = new Queue(fn);
q.on('task_finish', function (taskId, result) {
})
q.on('task_failed', function (taskId, err) {
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });
empty
event fires when all of the tasks have been pulled off of
the queue (there may still be tasks running!)
drain
event fires when there are no more tasks on the queue and
when no more tasks are running.
back to top
Task Management
Batch Processing
Tasks can be identified by task.id
. If it isn't defined,
a unique ID is automatically assigned. One thing you can do
with Task ID is merge tasks with the same ID.
var counter = new Queue(function (task, cb) {
console.log("I have %d %ss.", task.count, task.id);
cb();
}, {
merge: function (oldTask, newTask, cb) {
oldTask.count += newTask.count;
cb(null, oldTask);
}
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
Your processing function can also be modified to handle multiple
tasks at the same time. For example:
var ages = new Queue(function (batch, cb) {
cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });
Note how the queue will only handle at most 3 items at a time.
Below is another example of a batched call with numbers.
var ages = new Queue(function (batch, cb) {
cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);
Filtering, Validation and Priority
You can also format (and filter) the input that arrives from a push
before it gets processed by the queue by passing in a filter
function.
var greeter = new Queue(function (name, cb) {
console.log("Hello, %s!", name)
cb();
}, {
filter: function (input, cb) {
if (input === 'Bob') {
return cb('not_allowed');
}
return cb(null, input.toUpperCase())
}
});
greeter.push('anna');
This can be particularly useful if your queue needs to do some pre-processing,
input validation, database lookup, etc. before you load it onto the queue.
You can also define a priority function to control which tasks get
processed first.
var greeter = new Queue(function (name, cb) {
console.log("Greetings, %s.", name);
cb();
}, {
priority: function (name, cb) {
if (name === "Steve") return cb(null, 10);
if (name === "Mary") return cb(null, 5);
if (name === "Joe") return cb(null, 5);
cb(null, 1);
}
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");
If filo
is set to true
in the example above, then Joe and Mary
would swap order.
Retry
You can set tasks to retry maxRetries
times if they fail. By default,
tasks will fail (and will not retry.)
var q = new Queue(fn, { maxRetries: 10 })
Progress/Finish/Fail Specific Tasks
The process function will be run in a context of a Worker
object, which
gives you access to functions to help report on the status of specific tasks.
The example below illustrates how you can use these functions:
var uploader = new Queue(function (file, cb) {
this.progress()
});
uploader.on('task_progress', function (taskId, progress) {})
uploader.push('/some/file.jpg')
.on('progress', function (err, progress) {
})
back to top
Timing
You can configure the queue to have a maxTimeout
.
var q = new Queue(function (name, cb) {
someLongTask(function () {
cb();
})
}, { maxTimeout: 2000 })
After 2 seconds, the process will throw an error instead of waiting for the
callback to finish.
You can also delay the queue before it starts its processing. This is the
behaviour of a timed cargo.
var q = new Queue(function (batch, cb) {
cb();
}, { batchSize: 5, processDelay: 2000 })
q.push(1);
setTimeout(function () {
q.push(2);
}, 1000)
You can also set idleTimeout
, which will delay processing between tasks.
var q = new Queue(function (task, cb) {
cb();
}, { idleTimeout: 1000 })
q.push(1);
q.push(2);
back to top
Control Flow
There are even more options to control
- cancel, pause, resume
- cancelIfRunning
back to top
Storage
back to top
Full Documentation