tress
An easy-to-use asynchronous job queue. It stores jobs in memory and runs them asynchronously in parallel with a given concurrency. A successor of caolan/async.queue.
Install
npm install tress
Synopsis
var tress = require('tress');

// The worker receives each job and must call done() when it finishes.
var q = tress(function(job, done){
    console.log('hello ' + job.name);
    someAsyncFunction(job, function(err, data){
        if (err) {
            done(err);
        } else {
            done(null, data);
        }
    });
}, 2); // concurrency: up to 2 jobs in parallel

q.drain = function(){
    console.log('Finished');
};
q.error = function(err) {
    console.log('Job ' + this + ' failed with error ' + err);
};
q.success = function(data) {
    console.log('Job ' + this + ' successfully finished. Result is ' + data);
};

q.push({name: 'Bob'});
q.push({name: 'Alice'});
// An array of jobs can be pushed at once.
q.push([{name: 'Good'}, {name: 'Bad'}, {name: 'Ugly'}], function (err) {
    console.log('finished processing item');
});
// Add a job to the front of the queue.
q.unshift({name: 'Cristobal Jose Junta'});
Quick Guide
Basically, tress is a clone of queue from the well-known caolan/async, but without the rest of that Swiss Army knife of asynchronous code. Although tress was intended to be an extended and safer alternative to caolan/async.queue, it is first and foremost backward compatible. This means that everywhere you use async.queue (except for undocumented features) you can write tress instead and it should work.
For example, code like this:
var async = require('async');
var q = async.queue(function(job, done){});
can be rewritten as:
var tress = require('tress');
var q = tress(function(job, done){});
All code that uses caolan/async.queue should work with tress. If it does not behave exactly the same way, please open an issue.
All of the caolan/async.queue documentation applies to tress, although it does not describe tress completely. Still, you can use tress with that documentation alone and never think about the extra features.
The only exception: tress requires Node.js 4+ and doesn't work in browsers.
The main difference between tress and caolan/async.queue is that in tress a job does not disappear after the worker finishes. It is moved to failed or finished (depending on the first argument of done) and can be used later.
The second difference is that in tress the fields of the queue object are safer: they can be read and written only in the intended way.
tress also has some new fields on the queue object.
Reference
tress(worker, [concurrency])
Creates a queue object that stores jobs and processes them with the worker function in parallel (up to the concurrency limit).
Arguments:
worker(job, done) - An asynchronous function for processing a queued job. It must call its done argument when finished. The done callback may take various arguments, but the first argument must be an error (if the job failed), null/undefined (if the job finished successfully), or a boolean (if the job should be returned to the queue: true returns it to the head, false to the tail).
concurrency - An integer that determines how many worker functions run in parallel. If omitted, the concurrency defaults to 1. If negative, jobs are not run in parallel and a delay is inserted between worker calls (a concurrency of -1000 sets a 1-second delay).
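The rules for the first argument of done and for a negative concurrency can be summarized in a small model. This is only a sketch of the behavior described above, not the tress source: classifyDone and interpretConcurrency are hypothetical helper names, and a concurrency of 0 is not covered because the text does not describe it.

```javascript
// Illustrative model only: these helpers are hypothetical and not part of tress.

// Decide what happens to a job based on the first argument passed to done().
function classifyDone(firstArg) {
  if (firstArg === true) return 'retry-head';  // job returns to the head of the queue
  if (firstArg === false) return 'retry-tail'; // job returns to the tail of the queue
  if (!firstArg) return 'finished';            // null, undefined, or another falsy value
  return 'failed';                             // anything else is treated as an error
}

// Translate the concurrency argument into "how many in parallel" and "delay between jobs".
// Assumes a non-zero integer.
function interpretConcurrency(concurrency = 1) {
  if (concurrency < 0) {
    return { parallel: 1, delayMs: -concurrency }; // e.g. -1000 means a 1000 ms delay
  }
  return { parallel: concurrency, delayMs: 0 };
}

console.log(classifyDone(new Error('boom')), classifyDone(null), classifyDone(true));
console.log(interpretConcurrency(-1000));
```

In a real worker the same decision is made simply by what you pass to done: done(err), done(null, data), or done(true) / done(false).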
Queue object properties
started - false until items have been pushed and processed by the queue; then it becomes true and never changes for the rest of the queue lifecycle. Not writable.
concurrency - This property can be changed to alter the concurrency/delay on the fly.
buffer - A minimum threshold buffer used to decide that the queue is unsaturated.
paused - A boolean indicating whether the queue is in a paused state. Not writable (use pause() and resume() instead).
waiting (new) - An array of queued jobs.
active (new) - An array of jobs currently being processed.
failed (new) - An array of failed jobs (the worker called the done callback with an error as the first argument).
finished (new) - An array of successfully finished jobs (the worker called the done callback with null, undefined, or another falsy non-boolean value as the first argument).
Note that the waiting, active, failed, and finished properties are not writable, but they point to arrays that you can change manually. Do so carefully.
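The distinction between a non-writable property and a mutable array behind it can be shown in plain JavaScript. The sketch below uses Object.defineProperty on a stand-in object; whether tress protects its properties this exact way is an assumption, and modelQueue is a hypothetical name.

```javascript
// Illustrative model only: a plain object standing in for a queue, not tress itself.
const modelQueue = {};
Object.defineProperty(modelQueue, 'waiting', {
  value: [],
  writable: false,   // the property cannot be pointed at a different array
  enumerable: true,
});

// Reassignment is rejected; Reflect.set reports the failure without throwing.
const reassigned = Reflect.set(modelQueue, 'waiting', ['replaced']);

// The array itself is still mutable, which is why manual changes need care.
modelQueue.waiting.push({ name: 'Bob' });

console.log(reassigned, modelQueue.waiting.length);
```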
Queue object methods
Note that in tress you can't overwrite methods.
push(job, [callback]) - Adds a new job to the queue. Instead of a single job, an array of jobs can be submitted.
unshift(job, [callback]) - Adds a new job to the front of the queue. Instead of a single job, an array of jobs can be submitted.
Note that if you pass a callback to push or unshift as the second argument, tress calls this callback once the worker has finished processing the job.
pause() - Pauses the processing of jobs until resume() is called.
resume() - Resumes the processing of queued jobs when the queue is paused.
kill() - Removes the drain callback and empties the remaining jobs from the queue, forcing it to go idle.
length() - Returns the number of items waiting to be processed.
running() - Returns the number of items currently being processed.
workersList() - Returns the array of items currently being processed.
idle() - Returns false if there are items waiting or being processed, or true if not.
save(callback) (new) - Calls the callback with an object that contains the arrays of waiting, failed, and finished jobs. If there are any active jobs at that moment, they are concatenated to the waiting array.
load(data) (new) - Loads new arrays from the data object into the waiting, failed, and finished arrays and sets active to an empty array. Throws an error if started is true.
status(job) (new) - Returns the status of the job ('waiting', 'running', 'finished', 'pending' or 'missing').
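The save/load round trip described above can be modeled with plain objects. This is a sketch of the documented semantics, not the tress source: snapshot and restore are hypothetical helpers, the error message is made up, and placing active jobs ahead of waiting ones is an assumption, since the text only says they are concatenated.

```javascript
// Illustrative model only: plain objects stand in for a queue; not the tress source.

// Build the object that save() is described as passing to its callback.
function snapshot(queue) {
  return {
    // Active jobs are folded back into waiting (order is an assumption).
    waiting: queue.active.concat(queue.waiting),
    failed: queue.failed.slice(),
    finished: queue.finished.slice(),
  };
}

// Apply a snapshot the way load() is described: only before the queue has started.
function restore(queue, data) {
  if (queue.started) {
    throw new Error('cannot load data into a started queue'); // hypothetical message
  }
  queue.waiting = (data.waiting || []).slice();
  queue.failed = (data.failed || []).slice();
  queue.finished = (data.finished || []).slice();
  queue.active = []; // nothing is running right after a load
  return queue;
}

const saved = snapshot({
  started: true,
  waiting: ['c'],
  active: ['b'],
  failed: [],
  finished: ['a'],
});
const fresh = restore({ started: false, waiting: [], active: [], failed: [], finished: [] }, saved);
console.log(fresh.waiting, fresh.finished);
```

With tress itself, the equivalent round trip would use q.save(callback) on the running queue and load(data) on a newly created queue before any job is pushed.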
Queue object callbacks
You can assign a callback function to each of these seven properties. Note that you can't call any of these functions manually through the queue object's property.
saturated - Called when the number of running workers hits the concurrency limit, and further jobs will be queued.
unsaturated - Called when the number of running workers is less than the concurrency and buffer limits, and further jobs will not be queued.
empty - Called when the last item from the queue is given to a worker.
drain - Called when the last item from the queue has returned from the worker.
error (new) - Called when a job fails (the worker calls done with an error as the first argument).
success (new) - Called when a job finishes successfully (the worker calls done with null or undefined as the first argument).
retry (new) - Called when a job is returned to the queue (the worker calls done with a boolean as the first argument).
Note that error/success is called after the job has been moved from active to failed/finished and after the job callback (from push/unshift) has been called.
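The ordering in the note above can be traced with a small model. This is a sketch under stated assumptions, not the tress source: completeJob, demoQueue, and demoJob are hypothetical, and passing the job data as this to the callbacks is inferred from the Synopsis, where the error and success handlers print this.

```javascript
// Illustrative model only: hypothetical helper showing the documented ordering.
function completeJob(queue, job, err, data) {
  // 1. Move the job from active to failed or finished.
  const index = queue.active.indexOf(job);
  if (index !== -1) queue.active.splice(index, 1);
  (err ? queue.failed : queue.finished).push(job);

  // 2. Call the per-job callback that was passed to push/unshift, if any.
  if (typeof job.callback === 'function') job.callback.call(job.data, err, data);

  // 3. Call the queue-level error or success callback.
  const handler = err ? queue.error : queue.success;
  if (typeof handler === 'function') handler.call(job.data, err ? err : data);
}

const events = [];
const demoQueue = {
  active: [],
  failed: [],
  finished: [],
  success: function (data) {
    // By now the job is already in finished and the job callback has run.
    events.push('success saw finished=' + demoQueue.finished.length);
  },
};
const demoJob = {
  data: { name: 'Bob' },
  callback: function () { events.push('job callback'); },
};
demoQueue.active.push(demoJob);
completeJob(demoQueue, demoJob, null, 'ok');
console.log(events);
```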
License
MIT