What is bull?
Bull is a Node library that implements a fast and robust queue system based on Redis. It is designed to handle background jobs and message queues, providing features like job scheduling, concurrency control, and job prioritization.
What are bull's main functionalities?
Job Creation
This feature allows you to create and add jobs to a queue. The code sample demonstrates how to create a queue named 'myQueue' and add a job with data { foo: 'bar' } to it.
const Queue = require('bull');
const myQueue = new Queue('myQueue');
myQueue.add({ foo: 'bar' });
Job Processing
This feature allows you to define a processor for the jobs in the queue. The code sample shows how to process jobs in 'myQueue' by logging the job data and performing some processing.
myQueue.process(async (job) => {
  console.log(job.data);
  // Process the job
});
Job Scheduling
This feature allows you to schedule jobs to be processed at a later time. The code sample demonstrates how to add a job to 'myQueue' that will be processed after a delay of 60 seconds.
myQueue.add({ foo: 'bar' }, { delay: 60000 });
Job Prioritization
This feature allows you to set the priority of jobs in the queue. The code sample shows how to add a job with a priority of 1 to 'myQueue'. In Bull, a lower number means a higher priority (1 is the highest), and higher-priority jobs are processed before lower-priority ones.
myQueue.add({ foo: 'bar' }, { priority: 1 });
Concurrency Control
This feature allows you to control the number of concurrent job processors. The code sample demonstrates how to process up to 5 jobs concurrently in 'myQueue'.
myQueue.process(5, async (job) => {
  console.log(job.data);
  // Process the job
});
Other packages similar to bull
kue
Kue is another Redis-based priority job queue for Node.js. It provides a similar set of features to Bull, including job creation, processing, scheduling, and prioritization. However, Kue has a more extensive UI for monitoring and managing jobs.
agenda
Agenda is a lightweight job scheduling library for Node.js that uses MongoDB for persistence. It offers features like job scheduling, concurrency control, and job prioritization. Unlike Bull, which uses Redis, Agenda uses MongoDB, making it a good choice for applications already using MongoDB.
bee-queue
Bee-Queue is a simple, fast, and robust job/task queue for Node.js, backed by Redis. It is designed for high performance and low latency, making it suitable for real-time applications. Bee-Queue focuses on simplicity and performance, whereas Bull offers more advanced features and flexibility.
Is your work on Bull sponsored by a company? Please let us know!
Features
And coming up on the roadmap...
UIs
There are a few third-party UIs that you can use for monitoring:
Feature Comparison
Since there are a few job queue solutions, here is a table comparing them to help you choose the one that
best suits your needs.
| Feature | Bull | Kue | Bee | Agenda |
| --- | --- | --- | --- | --- |
| Backend | redis | redis | redis | mongo |
| Priorities | ✓ | ✓ | | ✓ |
| Concurrency | ✓ | ✓ | ✓ | ✓ |
| Delayed jobs | ✓ | ✓ | | ✓ |
| Pause/Resume | ✓ | ✓ | | |
| Repeatable jobs | ✓ | | | ✓ |
| Atomic ops | ✓ | | ✓ | |
| Persistence | ✓ | ✓ | ✓ | ✓ |
| Optimized for | Jobs / Messages | Jobs | Messages | Jobs |
Install
npm install bull --save
yarn add bull
Requirements: Bull requires a Redis version greater than or equal to 2.8.11.
Quick Guide
var Queue = require('bull');
var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}});
var imageQueue = new Queue('image transcoding');
var pdfQueue = new Queue('pdf transcoding');
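videoQueue.process(function(job, done){
  // job.data contains the custom data passed when the job was created

  // Transcode the video asynchronously and report progress
  job.progress(42);

  // The calls below are alternatives; use only one per job.
  // Call done when finished
  done();

  // or pass an error if something went wrong
  done(new Error('error transcoding'));

  // or pass a result
  done(null, { framerate: 29.5 });

  // A thrown exception is also handled correctly
  throw new Error('some unexpected error');
});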
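audioQueue.process(function(job, done){
  // Transcode the audio asynchronously and report progress
  job.progress(42);

  // Alternatives, as for videoQueue: finish, fail, or finish with a result
  done();
  done(new Error('error transcoding'));
  done(null, { samplerate: 48000 });

  // A thrown exception is also handled correctly
  throw new Error('some unexpected error');
});
imageQueue.process(function(job, done){
  // Transcode the image asynchronously and report progress
  job.progress(42);

  // Alternatives, as for videoQueue: finish, fail, or finish with a result
  done();
  done(new Error('error transcoding'));
  done(null, { width: 1280, height: 720 });

  // A thrown exception is also handled correctly
  throw new Error('some unexpected error');
});
pdfQueue.process(function(job){
  // Processors can also return a promise instead of using the done callback
  return pdfAsyncProcessor();
});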
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
Alternatively, you can return promises instead of using the done
callback:
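videoQueue.process(function(job){ // note: no done callback in the signature
  // The returns below are alternatives; use only one per job.
  // Simply return a promise
  return fetchVideo(job.data.url).then(transcodeVideo);

  // A rejected promise fails the job
  return Promise.reject(new Error('error transcoding'));

  // A resolved value becomes the job result
  return Promise.resolve({ framerate: 29.5 });

  // A thrown exception is also handled correctly
  throw new Error('some unexpected error');
  // which is equivalent to
  return Promise.reject(new Error('some unexpected error'));
});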
A job can be added to a queue and processed repeatedly according to a cron specification:
paymentsQueue.process(function(job){
// Check payments
});
// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, {repeat: {cron: '15 3 * * *'}});
A queue can be paused and resumed globally (pass true to pause processing for
just this worker):
queue.pause().then(function(){
  // queue is paused now
});
queue.resume().then(function(){
  // queue is resumed now
});
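As a hedged sketch of the per-worker variant mentioned above: the helper below pauses only the current worker, runs a task, and then resumes. The names withLocalPause and task are hypothetical, and the sketch assumes that queue.pause(true) and queue.resume(true) return promises in the same way as the calls shown above.

```javascript
// Hypothetical helper: pause this worker only, run a task, then resume.
// `queue` is assumed to be a Bull queue; passing `true` requests a local pause.
async function withLocalPause(queue, task) {
  await queue.pause(true);      // this worker stops picking up new jobs
  try {
    return await task();
  } finally {
    await queue.resume(true);   // always resume, even if the task throws
  }
}

// Usage with Bull (not executed here; needs a running Redis):
// const Queue = require('bull');
// const queue = new Queue('maintenance demo');
// withLocalPause(queue, async () => { /* do maintenance work */ });
```

The try/finally keeps the worker from staying paused if the task fails.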
A queue also emits some useful events, for example:
queue.on('completed', function(job, result){
  // Job completed with output result
})
For more information on events, including the full list of events that are fired, check out the Events reference.
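To illustrate how event listeners might be wired up, here is a minimal sketch that logs completed and failed jobs. The helper name attachJobLogging and the log callback are hypothetical. A plain Node EventEmitter stands in for the queue so the snippet runs without Redis, and the sketch assumes the 'failed' event passes the job and an error to the listener.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: attach basic logging to a queue's job events.
function attachJobLogging(queue, log) {
  queue.on('completed', function (job, result) {
    log('completed', job.data, result);
  });
  queue.on('failed', function (job, err) {
    log('failed', job.data, err.message);
  });
  return queue;
}

// Stand-in emitter for demonstration; with Bull, pass the real queue instead.
const demoQueue = attachJobLogging(new EventEmitter(), console.log);
demoQueue.emit('completed', { data: { video: 'video1.mov' } }, { framerate: 29.5 });
demoQueue.emit('failed', { data: { video: 'video2.mov' } }, new Error('error transcoding'));
```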
Queues are cheap, so if you need many of them just create new ones with different
names:
var userJohn = new Queue('john');
var userLisa = new Queue('lisa');
.
.
.
Queues are robust and can be run in parallel in several threads or processes
without any risk of hazards or queue corruption. Check this simple example
using cluster to parallelize jobs across processes:
var Queue = require('bull');
var cluster = require('cluster');

var numWorkers = 8;
var queue = new Queue("test concurrent queue");

if(cluster.isMaster){
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }

  // Add 500 jobs each time a worker comes online
  cluster.on('online', function(worker) {
    for(var i = 0; i < 500; i++){
      queue.add({foo: 'bar'});
    }
  });

  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
}else{
  queue.process(function(job, jobDone){
    console.log("Job done by worker", cluster.worker.id, job.id);
    jobDone();
  });
}
Documentation
For the full documentation, check out the reference and common patterns:
- Reference — the full reference material for Bull.
- Patterns — a set of examples for common patterns.
- License — the Bull license—it's MIT.
If you see anything that could use more docs, please submit a pull request!
Important Notes
The queue aims for an "at least once" working strategy. This means that in some situations a job
could be processed more than once. This mostly happens when a worker fails to keep a lock
for a given job during the total duration of the processing.
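Because a job may run more than once, it can help to make processors idempotent. The following is only a sketch of one possible approach, not something Bull provides: makeIdempotent, paymentId and chargeCustomer are hypothetical names, and the in-memory Set stands in for a durable store that would need an atomic check-and-set in a real deployment.

```javascript
// Hypothetical wrapper: skip work that already finished for a given key.
// `doneKeys` stands in for a durable store; a Set keeps the sketch self-contained.
function makeIdempotent(keyOf, handler, doneKeys = new Set()) {
  return async function (job) {
    const key = keyOf(job);
    if (doneKeys.has(key)) {
      return { skipped: true };   // a previous attempt already finished
    }
    const result = await handler(job);
    doneKeys.add(key);            // record success only after the work completes
    return result;
  };
}

// Usage with Bull (not executed here):
// paymentsQueue.process(makeIdempotent(job => job.data.paymentId, chargeCustomer));
```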
When a worker is processing a job it will keep the job "locked" so other workers can't process it.
It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled -
and being restarted as a result. Locking is implemented internally by creating a lock that lasts lockDuration and renewing it every lockRenewTime (which is usually half of lockDuration). If lockDuration elapses before the lock can be renewed,
the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:
- The Node process running your job processor unexpectedly terminates.
- Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).
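One way to break a CPU-heavy processor into smaller parts is to do the work in slices and yield to the event loop between them. The sketch below is an illustration under assumptions, not Bull's own mechanism: sumSquaresInChunks and yieldToEventLoop are hypothetical helpers, and the summing loop stands in for real work.

```javascript
// Yield control back to the event loop so timers (such as lock renewal) can run.
function yieldToEventLoop() {
  return new Promise(function (resolve) { setImmediate(resolve); });
}

// Hypothetical CPU-heavy task: sum squares in slices of `chunkSize` items.
async function sumSquaresInChunks(numbers, chunkSize) {
  let total = 0;
  for (let start = 0; start < numbers.length; start += chunkSize) {
    const end = Math.min(start + chunkSize, numbers.length);
    for (let i = start; i < end; i++) {
      total += numbers[i] * numbers[i];
    }
    await yieldToEventLoop();   // let pending timers and I/O callbacks run
  }
  return total;
}

// Usage inside a processor (not executed here):
// queue.process(job => sumSquaresInChunks(job.data.numbers, 1000));
```

A smaller chunk size yields more often at the cost of some throughput.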
As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.
As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).
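The settings named above can be sketched as follows. This assumes they are passed under a settings key in the queue options, as in the Bull 3 reference. buildLockOptions, reportStalledJobs and the report callback are hypothetical names, and a plain EventEmitter stands in for the queue so the snippet runs without Redis.

```javascript
const { EventEmitter } = require('events');

// Build queue options where the lock is renewed at half the lock duration.
function buildLockOptions(lockDurationMs, maxStalledCount) {
  return {
    settings: {
      lockDuration: lockDurationMs,
      lockRenewTime: lockDurationMs / 2,
      maxStalledCount: maxStalledCount
    }
  };
}

// Forward every stalled job to a monitoring callback (hypothetical `report`).
function reportStalledJobs(queue, report) {
  queue.on('stalled', function (job) {
    report('job stalled', job.data);
  });
  return queue;
}

// With Bull (not executed here):
// const queue = new Queue('video transcoding', buildLockOptions(60000, 1));
// reportStalledJobs(queue, console.error);

// Stand-in emitter for demonstration only.
reportStalledJobs(new EventEmitter(), console.error)
  .emit('stalled', { data: { foo: 'bar' } });
```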