📻 News and updates
Follow me on Twitter for important news and updates.
🛠 Tutorials
You can find tutorials and news in this blog: https://blog.taskforce.sh/
BullMQ
If you want to start using the next major version of Bull, written entirely in TypeScript, you are welcome to try the new repo here. Otherwise, you are very welcome to keep using Bull, which is a safe, battle-tested codebase.
If you need high-quality production Redis instances for your Bull projects, please consider subscribing to RedisGreen, leaders in Redis hosting that works perfectly with Bull. Use the promo code "BULLMQ" when signing up to help us sponsor the development of Bull!
Official FrontEnd
Supercharge your queues with a professional front end:
- Get a complete overview of all your queues.
- Inspect jobs, search, retry, or promote delayed jobs.
- Metrics and statistics.
- and many more features.
Sign up at Taskforce.sh
Bull Features
And coming up on the roadmap...
UIs
There are a few third-party UIs that you can use for monitoring:
BullMQ
Bull v3
Bull <= v2
Monitoring & Alerting
Feature Comparison
Since there are a few job queue solutions, here is a table comparing them:
Feature | Bull | Kue | Bee | Agenda |
---|---|---|---|---|
Backend | redis | redis | redis | mongo |
Priorities | ✓ | ✓ | | ✓ |
Concurrency | ✓ | ✓ | ✓ | ✓ |
Delayed jobs | ✓ | ✓ | | ✓ |
Global events | ✓ | ✓ | | |
Rate Limiter | ✓ | | | |
Pause/Resume | ✓ | ✓ | | |
Sandboxed worker | ✓ | | | |
Repeatable jobs | ✓ | | | ✓ |
Atomic ops | ✓ | | ✓ | |
Persistence | ✓ | ✓ | ✓ | ✓ |
UI | ✓ | ✓ | | ✓ |
Optimized for | Jobs / Messages | Jobs | Messages | Jobs |
Install
npm install bull --save
or
yarn add bull
Requirements: Bull requires a Redis version greater than or equal to 2.8.18.
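The requirement above can be checked at startup. The following is only a sketch: it assumes you already have the server's version string (for example from the `redis_version` field that Redis reports in `INFO server`), and `compareVersions` and `meetsMinimumRedisVersion` are hypothetical helpers, not part of Bull.

```javascript
// Minimum Redis version stated in the requirements above.
const MINIMUM_REDIS_VERSION = '2.8.18';

// Compares dotted version strings numerically, field by field.
// Returns -1, 0 or 1. Missing fields count as 0 ("2.8" equals "2.8.0").
function compareVersions(a, b) {
  const partsA = a.split('.').map(Number);
  const partsB = b.split('.').map(Number);
  const length = Math.max(partsA.length, partsB.length);
  for (let i = 0; i < length; i++) {
    const diff = (partsA[i] || 0) - (partsB[i] || 0);
    if (diff !== 0) {
      return diff < 0 ? -1 : 1;
    }
  }
  return 0;
}

// Hypothetical helper: true when the given server version is new enough.
function meetsMinimumRedisVersion(serverVersion) {
  return compareVersions(serverVersion, MINIMUM_REDIS_VERSION) >= 0;
}

console.log(meetsMinimumRedisVersion('2.8.17')); // prints false
console.log(meetsMinimumRedisVersion('2.10.0')); // prints true
```

The comparison is numeric rather than lexicographic, so `2.10.0` is correctly treated as newer than `2.8.18`.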
TypeScript Definitions
npm install @types/bull --save-dev
yarn add --dev @types/bull
Definitions are currently maintained in the DefinitelyTyped repo.
Contributing
We welcome all types of contributions, either code fixes, new features or doc improvements.
Code formatting is enforced by prettier.
For commits, please follow the Conventional Commits convention.
All code must pass lint rules and test suites before it can be merged into develop.
Quick Guide
Basic Usage
const Queue = require('bull');
const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '127.0.0.1', password: 'foobared' } });
const imageQueue = new Queue('image transcoding');
const pdfQueue = new Queue('pdf transcoding');
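videoQueue.process(function (job, done) {
  // job.data contains the custom data passed when the job was created.
  // The calls below are alternatives; a real processor uses exactly one of them.
  // Report progress while transcoding asynchronously.
  job.progress(42);
  // Call done when finished...
  done();
  // ...or pass an error if something went wrong...
  done(new Error('error transcoding'));
  // ...or pass a result.
  done(null, { framerate: 29.5 });
  // A thrown exception is also handled correctly.
  throw new Error('some unexpected error');
});
audioQueue.process(function (job, done) {
  // Alternatives: report progress, then call done once (plain, with an
  // error, or with a result), or throw.
  job.progress(42);
  done();
  done(new Error('error transcoding'));
  done(null, { samplerate: 48000 });
  throw new Error('some unexpected error');
});
imageQueue.process(function (job, done) {
  // Alternatives: report progress, then call done once (plain, with an
  // error, or with a result), or throw.
  job.progress(42);
  done();
  done(new Error('error transcoding'));
  done(null, { width: 1280, height: 720 });
  throw new Error('some unexpected error');
});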
pdfQueue.process(function (job) {
return pdfAsyncProcessor();
});
videoQueue.add({ video: 'http://example.com/video1.mov' });
audioQueue.add({ audio: 'http://example.com/audio1.mp3' });
imageQueue.add({ image: 'http://example.com/image1.tiff' });
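The processors above list every way a job can finish side by side. As a rough sketch of what a single processor might look like in practice, the function below calls `done` exactly once on each path. The validation rule and the `fakeJob` stand-in are assumptions made for illustration so the snippet runs without Bull or Redis.

```javascript
// Callback-style processor: every path calls `done` exactly once.
function imageProcessor(job, done) {
  if (!job.data || !job.data.image) {
    // Failure path: report the error and stop.
    done(new Error('missing image URL'));
    return;
  }
  job.progress(42);
  // Success path: pass null as the error, then the result.
  done(null, { width: 1280, height: 720 });
}

// Hypothetical stand-in for a Bull job; it only records what was called.
const calls = [];
const fakeJob = {
  data: { image: 'http://example.com/image1.tiff' },
  progress: (percent) => calls.push(['progress', percent]),
};

imageProcessor(fakeJob, (err, result) => calls.push(['done', err, result]));
console.log(calls);

// With Bull and Redis available, the same function would be registered with:
// imageQueue.process(imageProcessor);
```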
Using promises
Alternatively, you can return promises instead of using the done callback:
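videoQueue.process(function (job) {
  // No done callback here: simply return a promise.
  // The statements below are alternatives; a real processor takes one path.
  return fetchVideo(job.data.url).then(transcodeVideo);
  // A rejected promise fails the job.
  return Promise.reject(new Error('error transcoding'));
  // The resolved value is passed to the "completed" event.
  return Promise.resolve({ framerate: 29.5 });
  // A thrown exception is also handled correctly...
  throw new Error('some unexpected error');
  // ...and is equivalent to:
  return Promise.reject(new Error('some unexpected error'));
});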
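Because a processor only has to return a promise, an `async` function fits the same shape. The sketch below is one possible way to write it, not the library's prescribed form. `transcodeVideo` here is a hypothetical stand-in for real work, and the `job.data.video` field follows the `videoQueue.add` call in the basic usage example.

```javascript
// Hypothetical stand-in for real transcoding work.
async function transcodeVideo(url) {
  return { url, framerate: 29.5 };
}

// Promise-style processor: takes only `job` and returns a promise.
// A thrown error rejects the promise, which fails the job.
async function videoProcessor(job) {
  if (!job.data || !job.data.video) {
    throw new Error('missing video URL');
  }
  await job.progress(0);
  const result = await transcodeVideo(job.data.video);
  await job.progress(100);
  // The resolved value becomes the job's result.
  return result;
}

// With Bull and Redis available, register it with:
// videoQueue.process(videoProcessor);
```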
Separate processes
The process function can also be run in a separate process. This has several advantages:
- The process is sandboxed so if it crashes it does not affect the worker.
- You can run blocking code without affecting the queue (jobs will not stall).
- Much better utilization of multi-core CPUs.
- Fewer connections to Redis.
In order to use this feature just create a separate file with the processor:
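// processor.js
module.exports = function (job) {
  // Do some heavy work here; this placeholder result stands for its output.
  const result = { jobId: job.id };
  return Promise.resolve(result);
};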
And define the processor like this:
// Single process:
queue.process('/path/to/my/processor.js');
// You can use concurrency as well:
queue.process(5, '/path/to/my/processor.js');
// And named processors:
queue.process('my processor', 5, '/path/to/my/processor.js');
Repeated jobs
A job can be added to a queue and processed repeatedly according to a cron specification:
paymentsQueue.process(function (job) {
  // Check payments
});
// Repeat the payment job once every day at 03:15
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });
As a tip, check your expressions here to verify they are correct: cron expression generator
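The expression `15 3 * * *` reads, in standard five-field cron order, as minute 15 of hour 3 on every day. A small helper can build such options from an hour and a minute. This is only a sketch, and `dailyAt` is a hypothetical function, not part of Bull.

```javascript
// Hypothetical helper: builds Bull job options that repeat once a day.
function dailyAt(hour, minute) {
  if (!Number.isInteger(hour) || hour < 0 || hour > 23) {
    throw new RangeError('hour must be an integer from 0 to 23');
  }
  if (!Number.isInteger(minute) || minute < 0 || minute > 59) {
    throw new RangeError('minute must be an integer from 0 to 59');
  }
  // Cron field order: minute, hour, day of month, month, day of week.
  return { repeat: { cron: `${minute} ${hour} * * *` } };
}

const everyDayAt0315 = dailyAt(3, 15);
console.log(everyDayAt0315.repeat.cron); // prints "15 3 * * *"

// With Bull and Redis available:
// paymentsQueue.add(paymentsData, everyDayAt0315);
```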
Pause / Resume
A queue can be paused and resumed globally (pass true to pause processing for just this worker):
queue.pause().then(function () {
  // queue is paused now
});
queue.resume().then(function () {
  // queue is resumed now
});
Events
A queue emits some useful events, for example...
queue.on('completed', function (job, result) {
  // Job completed with output result!
});
For more information on events, including the full list of events that are fired, check out the Events reference.
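As an illustration of working with several events at once, the sketch below tallies job outcomes. A Bull queue is an event emitter, so a plain Node `EventEmitter` stands in for it here and the snippet runs without Redis. `trackOutcomes` is a hypothetical helper, and the `failed` event with its `(job, err)` arguments comes from Bull's event list.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: counts completed and failed jobs on a queue.
function trackOutcomes(queue) {
  const stats = { completed: 0, failed: 0, lastError: null };
  queue.on('completed', (job, result) => {
    stats.completed += 1;
  });
  queue.on('failed', (job, err) => {
    stats.failed += 1;
    stats.lastError = err.message;
  });
  return stats;
}

// Stand-in for a real queue; a real one would emit these events itself.
const fakeQueue = new EventEmitter();
const stats = trackOutcomes(fakeQueue);
fakeQueue.emit('completed', { id: 1 }, { framerate: 29.5 });
fakeQueue.emit('failed', { id: 2 }, new Error('error transcoding'));
console.log(stats);
```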
Queues performance
Queues are cheap, so if you need many of them just create new ones with different
names:
const userJohn = new Queue('john');
const userLisa = new Queue('lisa');
.
.
.
However, every queue instance requires new Redis connections. Check how to reuse connections, or use named processors to achieve a similar result.
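Connection reuse is done through the `createClient` queue option, which Bull calls with a connection type of `client`, `subscriber` or `bclient`. The sketch below shares the first two and opens a fresh connection for each blocking `bclient`. `makeCreateClient` and the injected `connect` function are assumptions for illustration. In a real setup `connect` would create a Redis client (for example with ioredis), and recent Bull versions may place extra requirements on the options of shared connections, so check the patterns documentation.

```javascript
// Hypothetical factory: returns a function suitable for Bull's
// `createClient` option. `connect(redisOpts)` must return a new Redis client.
function makeCreateClient(connect) {
  let sharedClient = null;
  let sharedSubscriber = null;
  return function createClient(type, redisOpts) {
    switch (type) {
      case 'client':
        // Regular commands: one connection shared by all queues.
        if (!sharedClient) sharedClient = connect(redisOpts);
        return sharedClient;
      case 'subscriber':
        // Pub/sub: one connection shared by all queues.
        if (!sharedSubscriber) sharedSubscriber = connect(redisOpts);
        return sharedSubscriber;
      case 'bclient':
        // Blocking commands: always a new connection.
        return connect(redisOpts);
      default:
        throw new Error(`Unexpected connection type: ${type}`);
    }
  };
}

// With Bull and ioredis available (assumed setup, not run here):
// const Redis = require('ioredis');
// const createClient = makeCreateClient((opts) => new Redis(REDIS_URL, opts));
// const userJohn = new Queue('john', { createClient });
// const userLisa = new Queue('lisa', { createClient });
```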
Cluster support
NOTE: From version 3.2.0 and above it is recommended to use threaded processors instead.
Queues are robust and can be run in parallel in several threads or processes
without any risk of hazards or queue corruption. Check this simple example
using cluster to parallelize jobs across processes:
const Queue = require('bull');
const cluster = require('cluster');
const numWorkers = 8;
const queue = new Queue('test concurrent queue');
if (cluster.isMaster) {
  for (let i = 0; i < numWorkers; i++) {
    cluster.fork();
  }
  cluster.on('online', function (worker) {
    // Let's create a few jobs for the queue workers
    for (let i = 0; i < 500; i++) {
      queue.add({ foo: 'bar' });
    }
  });
  cluster.on('exit', function (worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
} else {
  queue.process(function (job, jobDone) {
    console.log('Job done by worker', cluster.worker.id, job.id);
    jobDone();
  });
}
Documentation
For the full documentation, check out the reference and common patterns:
- Guide — Your starting point for developing with Bull.
- Reference — Reference document with all objects and methods available.
- Patterns — a set of examples for common patterns.
- License — the Bull license—it's MIT.
If you see anything that could use more docs, please submit a pull request!
Important Notes
The queue aims for an "at least once" working strategy. This means that in some situations, a job
could be processed more than once. This mostly happens when a worker fails to keep a lock
for a given job during the total duration of the processing.
When a worker is processing a job it will keep the job "locked" so other workers can't process it.
It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled -
and being restarted as a result. Locking is implemented internally by creating a lock that lasts lockDuration and renewing it every lockRenewTime (which is usually half of lockDuration). If lockDuration elapses before the lock can be renewed, the job is considered stalled and is automatically restarted, so it will be processed twice. This can happen when:
- The Node process running your job processor unexpectedly terminates.
- Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the lockDuration setting (with the tradeoff being that it will take longer to recognize a real stalled job).
As such, you should always listen for the stalled event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.
As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount times (default: 1).
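The settings and the stalled event described above can be sketched as follows. `buildLockSettings` and `reportStalledJobs` are hypothetical helpers, the 60-second lock is an arbitrary example value, and a plain `EventEmitter` stands in for the queue so the snippet runs without Redis. The sketch assumes these settings are passed under the `settings` key of the queue options.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: queue options with a custom lock duration.
// lockRenewTime is set to half of lockDuration, as described above.
function buildLockSettings(lockDurationMs, maxStalledCount = 1) {
  return {
    settings: {
      lockDuration: lockDurationMs,
      lockRenewTime: lockDurationMs / 2,
      maxStalledCount,
    },
  };
}

// Hypothetical helper: forwards stalled jobs to an error-reporting function.
function reportStalledJobs(queue, report) {
  queue.on('stalled', (job) => {
    report(`job ${job.id} stalled and will be reprocessed`);
  });
}

const options = buildLockSettings(60000);
console.log(options.settings.lockRenewTime); // prints 30000

// Stand-in for a real queue; with Bull this would be
// new Queue('video transcoding', options).
const fakeQueue = new EventEmitter();
const reports = [];
reportStalledJobs(fakeQueue, (message) => reports.push(message));
fakeQueue.emit('stalled', { id: 7 });
console.log(reports);
```

A longer lockDuration gives CPU-heavy processors more slack, at the cost of slower detection of jobs that have really stalled.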