What is bull?
Bull is a Node library that implements a fast and robust queue system based on Redis. It is designed to handle background jobs and message queues, providing features like job scheduling, concurrency control, and job prioritization.
What are bull's main functionalities?
Job Creation
This feature allows you to create and add jobs to a queue. The code sample demonstrates how to create a queue named 'myQueue' and add a job with data { foo: 'bar' } to it.
const Queue = require('bull');
const myQueue = new Queue('myQueue');
myQueue.add({ foo: 'bar' });
Job Processing
This feature allows you to define a processor for the jobs in the queue. The code sample shows how to process jobs in 'myQueue' by logging the job data and performing some processing.
myQueue.process(async (job) => {
  console.log(job.data);
  // Process the job
});
Job Scheduling
This feature allows you to schedule jobs to be processed at a later time. The code sample demonstrates how to add a job to 'myQueue' that will be processed after a delay of 60 seconds.
myQueue.add({ foo: 'bar' }, { delay: 60000 });
Job Prioritization
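This feature allows you to set the priority of jobs in the queue. The code sample shows how to add a job with a priority of 1 to 'myQueue'. Jobs with higher priority are processed before jobs with lower priority; in current Bull releases a lower number means a higher priority, with 1 being the highest.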
myQueue.add({ foo: 'bar' }, { priority: 1 });
Concurrency Control
This feature allows you to control the number of concurrent job processors. The code sample demonstrates how to process up to 5 jobs concurrently in 'myQueue'.
myQueue.process(5, async (job) => {
  console.log(job.data);
  // Process the job
});
Other packages similar to bull
kue
Kue is another Redis-based priority job queue for Node.js. It provides a similar set of features to Bull, including job creation, processing, scheduling, and prioritization. However, Kue has a more extensive UI for monitoring and managing jobs.
agenda
Agenda is a lightweight job scheduling library for Node.js that uses MongoDB for persistence. It offers features like job scheduling, concurrency control, and job prioritization. Unlike Bull, which uses Redis, Agenda uses MongoDB, making it a good choice for applications already using MongoDB.
bee-queue
Bee-Queue is a simple, fast, and robust job/task queue for Node.js, backed by Redis. It is designed for high performance and low latency, making it suitable for real-time applications. Bee-Queue focuses on simplicity and performance, whereas Bull offers more advanced features and flexibility.
Bull Job Manager
The fastest, most reliable Redis-based queue for Node.js.
Carefully written for rock solid stability and atomicity.
Follow manast for news and updates regarding this library.
Is your work on bull sponsored by a company? Please let us know!
Features:
- Minimal CPU usage thanks to a polling-free design.
- Robust design based on Redis.
- Delayed jobs.
- Retries.
- Priority.
- Concurrency.
- Pause/resume (globally or locally).
- Automatic recovery from process crashes.
UIs:
There are a few third party UIs that can be used for easier administration of the queues (not in any particular order):
We also have an official UI, which is at the moment a bare-bones project: bull-ui
Install:
npm install bull
Note that bull needs Redis version 2.8.11 or higher to work properly.
Quick Guide
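var Queue = require('bull');

var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
var pdfQueue = Queue('pdf transcoding', 6379, '127.0.0.1');

videoQueue.process(function(job, done){
  // job.data contains the custom data passed when the job was created.

  // Transcode the video asynchronously and report progress.
  job.progress(42);

  // The calls below are alternatives; a real processor uses only one of them.
  // Call done when finished...
  done();
  // ...or pass an error if the job failed...
  done(Error('error transcoding'));
  // ...or pass a result.
  done(null, { framerate: 29.5 });

  // An unhandled exception thrown by the processor is also handled.
  throw (Error('some unexpected error'));
});

audioQueue.process(function(job, done){
  // Same alternatives as above, for audio.
  job.progress(42);
  done();
  done(Error('error transcoding'));
  done(null, { samplerate: 48000 });
  throw (Error('some unexpected error'));
});

imageQueue.process(function(job, done){
  // Same alternatives as above, for images.
  job.progress(42);
  done();
  done(Error('error transcoding'));
  done(null, { width: 1280, height: 720 });
  throw (Error('some unexpected error'));
});

pdfQueue.process(function(job){
  // Processors can also return promises instead of using the done callback.
  return pdfAsyncProcessor();
});

videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});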
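Alternatively, you can return promises instead of using the `done` callback:
videoQueue.process(function(job){ // note: no done parameter here
  // The returns below are alternatives; a real processor uses only one of them.
  // Simply return a promise...
  return fetchVideo(job.data.url).then(transcodeVideo);

  // ...or reject it to signal that the job failed...
  return Promise.reject(new Error('error transcoding'));

  // ...or resolve it with a result, which is passed to the "completed" event.
  return Promise.resolve({ framerate: 29.5 });

  // Throwing an unhandled exception...
  throw new Error('some unexpected error');
  // ...has the same effect as returning a rejected promise.
  return Promise.reject(new Error('some unexpected error'));
});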
A queue can be paused and resumed globally (pass `true` to pause processing for
just this worker):
queue.pause().then(function(){
  // the queue is paused now
});

queue.resume().then(function(){
  // the queue is resumed now
});
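A queue also emits some useful events:
queue
.on('ready', function() {
  // The queue is ready to process jobs.
})
.on('error', function(error) {
  // An error occurred.
})
.on('active', function(job, jobPromise){
  // A job has started.
})
.on('stalled', function(job){
  // A job was considered stalled.
})
.on('progress', function(job, progress){
  // A job's progress was updated.
})
.on('completed', function(job, result){
  // A job completed with the given result.
})
.on('failed', function(job, err){
  // A job failed with the given error.
})
.on('paused', function(){
  // The queue has been paused.
})
.on('resumed', function(job){
  // The queue has been resumed.
})
.on('cleaned', function(jobs, type) {
  // jobs is an array of cleaned jobs, type is the type of job cleaned.
});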
Events are local by default, i.e., they only fire on the listeners that are registered on the given worker.
If you need to listen to events globally, set the last argument to true:
queue.on('completed', listener, true);
Queues are cheap, so if you need many of them just create new ones with different
names:
var userJohn = Queue('john');
var userLisa = Queue('lisa');
.
.
.
Queues are robust and can be run in parallel in several threads or processes
without any risk of hazards or queue corruption. Check this simple example
using cluster to parallelize jobs across processes:
var
  Queue = require('bull'),
  cluster = require('cluster');

var numWorkers = 8;
var queue = Queue("test concurrent queue", 6379, '127.0.0.1');

if(cluster.isMaster){
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  cluster.on('online', function(worker) {
    for(var i=0; i<500; i++){
      queue.add({foo: 'bar'});
    }
  });

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
}else{
  queue.process(function(job, jobDone){
    console.log("Job done by worker", cluster.worker.id, job.jobId);
    jobDone();
  });
}
Important Notes
The queue aims for an "at most once" working strategy. When a worker is processing a job, it will keep the job locked until the work is done. However, it is important that the worker does not block the event loop for too long; otherwise other workers could pick up the job, believing that the worker processing it has stalled, and the job would then be processed more than once.
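One way to keep a CPU-heavy processor from blocking the event loop is to split the work into slices and yield between them. The following is a minimal sketch of that idea, not part of bull itself: `sumSquaresInChunks` and the `values` payload field are hypothetical names chosen for illustration, and `chunkSize` is assumed to be a positive integer.

```javascript
// Hypothetical CPU-heavy task: sum the squares of a large array in slices,
// yielding to the event loop between slices via setImmediate.
function sumSquaresInChunks(values, chunkSize) {
  return new Promise(function (resolve) {
    var total = 0;
    var index = 0;

    function runChunk() {
      var end = Math.min(index + chunkSize, values.length);
      for (; index < end; index++) {
        total += values[index] * values[index];
      }
      if (index < values.length) {
        // Yield so that timers and I/O callbacks get a chance to run.
        setImmediate(runChunk);
      } else {
        resolve(total);
      }
    }

    runChunk();
  });
}

// Possible use inside a promise-style processor (queue is assumed to exist):
// queue.process(function (job) {
//   return sumSquaresInChunks(job.data.values, 1000);
// });
```

A smaller `chunkSize` yields more often at the cost of some throughput; the right value depends on how expensive each item is.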
Useful patterns
#### Message Queue
Bull can also be used for persistent message queues. This is quite a useful
feature in some use cases. For example, you can have two servers that need to
communicate with each other. By using a queue, the servers do not need to be online
at the same time, which creates a very robust communication channel. You can treat
add as send and process as receive:
Server A:
var Queue = require('bull');

var sendQueue = Queue("Server B");
var receiveQueue = Queue("Server A");

receiveQueue.process(function(msg, done){
  // msg is a job; the payload passed to add is in msg.data.
  console.log("Received message", msg.data.msg);
  done();
});

sendQueue.add({msg:"Hello"});
Server B:
var Queue = require('bull');

var sendQueue = Queue("Server A");
var receiveQueue = Queue("Server B");

receiveQueue.process(function(msg, done){
  // msg is a job; the payload passed to add is in msg.data.
  console.log("Received message", msg.data.msg);
  done();
});

sendQueue.add({msg:"World"});
#### Returning job completions
A common pattern is where you have a cluster of queue processors that just
process jobs as fast as they can, and some other services that need to take the
results of these processors and do something with them, maybe storing them in a
database.
The most robust and scalable way to accomplish this is by combining the standard
job queue with the message queue pattern: a service sends jobs to the cluster
just by opening a job queue and adding jobs to it, and the cluster will start
processing as fast as it can. Every time a job gets completed in the cluster, a
message is sent to a results message queue with the result data. Some other
service listens to this queue and stores the results in a database.
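The wiring described above can be sketched in a few lines. This is an illustration under stated assumptions rather than a reference implementation: `forwardResults` and `createFakeQueue` are hypothetical helpers, and the fake queue is an in-memory stand-in that mimics only `add` and the 'completed' event so the sketch runs without Redis. With real queues, the listener would be registered on the worker that processes the jobs.

```javascript
var EventEmitter = require('events').EventEmitter;

// In-memory stand-in for a queue so the sketch runs without Redis.
// It mimics only the two calls used here: add(data) and on('completed', fn).
function createFakeQueue() {
  var queue = new EventEmitter();
  queue.jobs = [];
  queue.add = function (data) {
    queue.jobs.push(data);
    return Promise.resolve(data);
  };
  return queue;
}

// Every time a job completes on jobQueue, send a message with its result
// to resultsQueue, where another service can pick it up and store it.
function forwardResults(jobQueue, resultsQueue) {
  jobQueue.on('completed', function (job, result) {
    resultsQueue.add({ jobId: job.jobId, result: result });
  });
}

var jobQueue = createFakeQueue();
var resultsQueue = createFakeQueue();
forwardResults(jobQueue, resultsQueue);

// Simulate the cluster finishing a job.
jobQueue.emit('completed', { jobId: 1 }, { framerate: 29.5 });

// The results queue now holds one message carrying jobId 1 and its result.
console.log(resultsQueue.jobs);
```

The storing service would then call `process` on the results queue, exactly as in the message queue pattern.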
## Documentation
Reference
### Queue(queueName, redisPort, redisHost, [redisOpts])
### Queue(queueName, redisConnectionString, [redisOpts])
This is the Queue constructor. It creates a new Queue that is persisted in
Redis. Every time the same queue is instantiated, it tries to process all the
old jobs that may exist from a previous unfinished session.
Arguments
queueName {String} A unique name for this Queue.
redisPort {Number} A port where redis server is running.
redisHost {String} A host specified as IP or domain where redis is running.
redisOptions {Object} Options to pass to the redis client. https:
Alternatively, it's possible to pass a connection string to create a new queue.
Arguments
queueName {String} A unique name for this Queue.
redisConnectionString {String} A connection string containing the redis server host, port and (optional) authentication.
redisOptions {Object} Options to pass to the redis client. https:
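To make the parts of a connection string concrete, here is a small sketch that splits one into host, port and password using Node's built-in `URL` class. It is not bull's own parser, and both the helper name `parseRedisConnectionString` and the `redis://:password@host:port` layout are assumptions made for illustration.

```javascript
// Split a Redis connection string into the pieces a client needs.
// Uses Node's built-in WHATWG URL class; this is not bull's own parser.
function parseRedisConnectionString(connectionString) {
  var parsed = new URL(connectionString);
  return {
    host: parsed.hostname,
    // Fall back to 6379, the default Redis port, when none is given.
    port: parsed.port ? Number(parsed.port) : 6379,
    // Authentication is optional, so the password may be absent.
    password: parsed.password ? decodeURIComponent(parsed.password) : undefined
  };
}

var conn = parseRedisConnectionString('redis://:secret@127.0.0.1:6380');
console.log(conn.host, conn.port, conn.password);
```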
#### Queue##process([concurrency,] function(job[, done]))
Defines a processing function for the jobs placed into a given Queue.
The callback is called every time a job is placed in the queue. It is passed
an instance of the job as first argument.
If the callback signature contains the second optional `done` argument,
the callback will be passed a `done` callback to be called after the job
has been completed. The `done` callback can be called with an Error instance,
to signal that the job did not complete successfully, or with a result as
second argument (e.g.: `done(null, result);`) when the job is successful.
Errors will be passed as a second argument to the "failed" event;
results, as a second argument to the "completed" event.
If, however, the callback signature does not contain the `done` argument,
a promise must be returned to signal job completion.
If the promise is rejected, the error will be passed as
a second argument to the "failed" event.
If it is resolved, its value will be the "completed" event's second argument.
Note: in order to determine whether job completion is signaled by
returning a promise or calling the `done` callback, Bull looks at
the length property of the callback you pass to it.
So watch out, as the following won't work:
queue.process(function(job, done) {
  return Promise.resolve();
});
This, however, will:
queue.process(function(job) {
  return Promise.resolve();
});
You can specify a concurrency. Bull will then call your handler in parallel, up to this maximum number of jobs at a time.
Arguments
job {Job} The job to process.
done {Function} The done callback to be called after the job has been completed.
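The note on the length property can be checked in plain JavaScript. The sketch below assumes the check amounts to comparing the handler's declared parameter count; `expectsDoneCallback` is a hypothetical helper, not a function exported by bull.

```javascript
// Function.prototype.length is the number of declared parameters.
function expectsDoneCallback(handler) {
  return handler.length > 1;
}

var callbackStyle = function (job, done) { done(); };
var promiseStyle = function (job) { return Promise.resolve(); };

console.log(expectsDoneCallback(callbackStyle)); // true
console.log(expectsDoneCallback(promiseStyle));  // false

// Pitfall: rest parameters are not counted in length.
var restStyle = function (job, ...rest) {};
console.log(expectsDoneCallback(restStyle)); // false
```

This is why a handler that declares `done` but returns a promise never completes: it is treated as callback-style, and the callback is never called.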
#### Queue##add(data, opts)
Creates a new job and adds it to the queue. If the queue is empty the job
will be executed directly, otherwise it will be placed in the queue and
executed as soon as possible.
Arguments
data {PlainObject} A plain object with arguments that will be passed to
the job processing function in job.data.
opts A plain object with arguments that will be passed to the job
processing function in job.opts.
{
  delay {Number} An amount of milliseconds to wait until this job can be processed. Note that for accurate delays, both server and clients should have their clocks synchronized. [optional]
  attempts {Number} The total number of attempts to try the job until it completes.
  backoff {Number|Object} Backoff setting for automatic retries if the job fails.
  backoff.type {String} Backoff type, which can be either `fixed` or `exponential`.
  backoff.delay {Number} Backoff delay, in milliseconds.
  lifo {Boolean} A boolean which, if true, adds the job to the right of the queue
    instead of the left (default false).
  timeout {Number} The number of milliseconds after which the job should fail
    with a timeout error. [optional]
  jobId {Number|String} Override the job ID. By default, the job ID is a unique
    integer, but you can use this setting to override it.
    If you use this option, it is up to you to ensure the
    jobId is unique. If you attempt to add a job with an id that
    already exists, it will not be added.
  removeOnComplete {Boolean} A boolean which, if true, removes the job when it successfully
    completes. Default behavior is to keep the job in the completed queue.
}
returns {Promise} A promise that resolves when the job has been successfully
added to the queue (or rejects if some error occurred). On success, the promise
resolves to the new Job.
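The difference between the `fixed` and `exponential` backoff types is easiest to see with numbers. The sketch below is an assumption, not bull's implementation: this document does not state the exact exponential formula, so the helper `backoffDelay` simply doubles the delay with each attempt already made.

```javascript
// attemptsMade: number of attempts already made (1 after the first failure).
function backoffDelay(backoff, attemptsMade) {
  // A bare number is treated as a fixed delay in milliseconds.
  var settings = typeof backoff === 'number'
    ? { type: 'fixed', delay: backoff }
    : backoff;

  if (settings.type === 'exponential') {
    // Assumed formula: the delay doubles with each attempt.
    return settings.delay * Math.pow(2, attemptsMade - 1);
  }
  return settings.delay;
}

console.log(backoffDelay(1000, 3));                                   // 1000
console.log(backoffDelay({ type: 'exponential', delay: 1000 }, 1));   // 1000
console.log(backoffDelay({ type: 'exponential', delay: 1000 }, 3));   // 4000

// Options of this shape would be passed to add (queue is assumed to exist):
// queue.add({ foo: 'bar' }, { attempts: 5, backoff: { type: 'exponential', delay: 1000 } });
```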
#### Queue##pause([isLocal])
Returns a promise that resolves when the queue is paused. A paused queue will not
process new jobs until resumed, but current jobs being processed will continue until
they are finalized. The pause can be either global or local. If global, all workers in all queue instances for a given queue will be paused. If local, just this worker will stop processing new jobs after the current lock expires. This can be useful to stop a worker from taking new jobs prior to shutting down.
Pausing a queue that is already paused does nothing.
Arguments
isLocal {Boolean} True to only pause the local worker. Defaults to false.
returns {Promise} A promise that resolves when the queue is paused.
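A local pause followed by `close` gives a simple shutdown sequence for a single worker. The sketch below uses only `pause(true)` and `close()` as documented in this reference; `shutdownWorker` and the recording `fakeQueue` are hypothetical, the latter standing in for a real queue so the sketch runs without Redis.

```javascript
// Stop taking new jobs on this worker, then close the Redis client.
function shutdownWorker(queue) {
  return queue.pause(true).then(function () {
    return queue.close();
  });
}

// Minimal stand-in that records calls, so the sketch runs without Redis.
var calls = [];
var fakeQueue = {
  pause: function (isLocal) {
    calls.push('pause(' + isLocal + ')');
    return Promise.resolve();
  },
  close: function () {
    calls.push('close');
    return Promise.resolve();
  }
};

shutdownWorker(fakeQueue).then(function () {
  console.log(calls.join(' -> ')); // pause(true) -> close
});

// With a real queue this could be hooked to a signal, for example:
// process.on('SIGTERM', function () { shutdownWorker(queue); });
```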
#### Queue##resume([isLocal])
Returns a promise that resolves when the queue is resumed after being paused.
The resume can be either local or global. If global, all workers in all queue
instances for a given queue will be resumed. If local, only this worker will be
resumed. Note that resuming a queue globally will not resume workers that have been
paused locally; for those, `resume(true)` must be called directly on their instances.
Resuming a queue that is not paused does nothing.
Arguments
isLocal {Boolean} True to resume only the local worker. Defaults to false.
returns {Promise} A promise that resolves when the queue is resumed.
#### Queue##count()
Returns a promise that resolves with the number of jobs in the queue, waiting or
paused. Since there may be other processes adding or processing jobs, this
value may be accurate only for a very short time.
Arguments
returns {Promise} A promise that resolves with the current jobs count.
#### Queue##empty()
Empties a queue deleting all the input lists and associated jobs.
Arguments
returns {Promise} A promise that resolves when the queue is emptied.
#### Queue##close()
Closes the underlying redis client. Use this to perform a graceful
shutdown.
var Queue = require('bull');
var _ = require('lodash'); // provides _.after
var queue = Queue('example');

// Close the queue once 100 jobs have completed.
var after100 = _.after(100, function () {
  queue.close().then(function () { console.log('done') })
});

queue.on('completed', after100);
`close` can be called from anywhere, with one caveat: if called
from within a job handler the queue won't close until after
the job has been processed. The following therefore won't work, because
the handler waits for close and close waits for the handler:
queue.process(function (job, jobDone) {
  handle(job);
  queue.close().then(jobDone);
});
Instead, do this:
queue.process(function (job, jobDone) {
  handle(job);
  queue.close();
  jobDone();
});
Or this:
queue.process(function (job) {
  queue.close();
  return handle(job).then(...);
});
Arguments
returns {Promise} A promise that resolves when the redis client closes.
#### Queue##getJob(jobId)
Returns a promise that will return the job instance associated with the jobId
parameter. If the specified job cannot be located, the promise will resolve
to null.
Arguments
jobId {String} A string identifying the ID of the job to look up.
returns {Promise} A promise that resolves with the job instance when the job
has been retrieved from the queue, or null otherwise.
#### Queue##clean(grace, [type], [limit])
Tells the queue to remove jobs of a specific type created outside of a grace period.
Example
// Clean all jobs that completed more than 5 seconds ago.
queue.clean(5000);
// Clean all jobs that failed more than 10 seconds ago.
queue.clean(10000, 'failed');
queue.on('cleaned', function (job, type) {
  console.log('Cleaned %s %s jobs', job.length, type);
});
Arguments
grace {int} Grace period in milliseconds.
type {string} type of job to clean. Values are completed, wait, active,
delayed, and failed. Defaults to completed.
limit {int} maximum amount of jobs to clean per call. If not provided will clean all matching jobs.
returns {Promise} A promise that resolves with an array of removed jobs.
Events
The cleaner emits the `cleaned` event anytime the queue is cleaned.
queue.on('cleaned', function (jobs, type) {});
jobs {Array} An array of jobs that have been cleaned.
type {String} The type of job cleaned. Options are completed, wait, active,
delayed, or failed.
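How `grace`, `type` and `limit` combine can be pictured with a plain filter over job records. This is a sketch of the selection rule only, not bull's implementation: `selectJobsToClean` is a hypothetical helper, and the `state` and `timestamp` fields on each record are assumed names for the job type and its creation time in milliseconds.

```javascript
// Pick the jobs a clean(grace, type, limit) call would target, assuming each
// record has a `state` and a creation `timestamp` (hypothetical field names).
function selectJobsToClean(jobs, grace, type, limit, now) {
  type = type || 'completed'; // same default as documented for clean
  var selected = jobs.filter(function (job) {
    return job.state === type && now - job.timestamp > grace;
  });
  // Without a limit, every matching job is selected.
  return limit ? selected.slice(0, limit) : selected;
}

var now = 10000;
var jobs = [
  { id: 1, state: 'completed', timestamp: 1000 },
  { id: 2, state: 'completed', timestamp: 9000 },
  { id: 3, state: 'failed', timestamp: 1000 }
];

// Only job 1 is completed and older than the 5 second grace period.
console.log(selectJobsToClean(jobs, 5000, undefined, undefined, now));
```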
### PriorityQueue(queueName, redisPort, redisHost, [redisOpts])
This is the constructor of the priority queue. It works the same as a normal queue, with the same functions and parameters.
The only difference is that Queue#add() accepts an option opts.priority that can take one of
["low", "normal", "medium", "high", "critical"]. If no option is provided, "normal" is used.
The priority queue will process higher priority jobs more often than lower priority ones.
var PriorityQueue = require("bull/lib/priority-queue");

var queue = new PriorityQueue("myPriorityQueues");

queue.add({todo: "Improve feature"}, {priority: "normal"});
queue.add({todo: "Read 9gags"}, {priority: "low"});
queue.add({todo: "Fix my test unit"}, {priority: "critical"});

queue.process(function(job, done) {
  console.log("I have to: " + job.data.todo);
  done();
});
Warning: A priority queue uses 5 times more Redis connections than a normal queue.
### Job
A job includes all data needed to perform its execution, as well as the progress
method needed to update its progress.
The most important property for the user is Job##data that includes the
object that was passed to Queue##add, and that is normally used to
perform the job.
#### Job##remove()
Removes a job from the queue and from all the lists where it may be included.
Arguments
returns {Promise} A promise that resolves when the job is removed.
#### Job##retry()
Rerun a Job that has failed.
Arguments
returns {Promise} A promise that resolves when the job is scheduled for retry.
#### Job##discard()
Ensures this job is never run again, even if attemptsMade is less than job.attempts.
Arguments
returns {Promise} A promise that resolves when the job has been discarded.
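The relationship between attempts, attemptsMade and discard can be summarized as a small predicate. This is only a sketch of the rule as described here, not bull's code: `willRetry` is a hypothetical helper and `discarded` is an assumed flag standing in for whatever discard records internally.

```javascript
// A failed job is retried only while attempts remain and it was not discarded.
function willRetry(job) {
  return !job.discarded && job.attemptsMade < job.attempts;
}

console.log(willRetry({ attempts: 3, attemptsMade: 1, discarded: false })); // true
console.log(willRetry({ attempts: 3, attemptsMade: 3, discarded: false })); // false
console.log(willRetry({ attempts: 3, attemptsMade: 1, discarded: true }));  // false
```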
#### Debugging
To see debug statements, set or add bull to the NODE_DEBUG environment variable.
export NODE_DEBUG=bull
## License
(The MIT License)
Copyright (c) 2013 Manuel Astudillo manuel@optimalbits.com
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
'Software'), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.