Product
Socket Now Supports uv.lock Files
Socket now supports uv.lock files to ensure consistent, secure dependency resolution for Python projects and enhance supply chain security.
Bull is a Node library that implements a fast and robust queue system based on Redis. It is designed to handle background jobs and message queues, providing features like job scheduling, concurrency control, and job prioritization.
Job Creation
This feature allows you to create and add jobs to a queue. The code sample demonstrates how to create a queue named 'myQueue' and add a job with data { foo: 'bar' } to it.
const Queue = require('bull');
const myQueue = new Queue('myQueue');
myQueue.add({ foo: 'bar' });
Job Processing
This feature allows you to define a processor for the jobs in the queue. The code sample shows how to process jobs in 'myQueue' by logging the job data and performing some processing.
myQueue.process(async (job) => {
console.log(job.data);
// Process the job
});
Job Scheduling
This feature allows you to schedule jobs to be processed at a later time. The code sample demonstrates how to add a job to 'myQueue' that will be processed after a delay of 60 seconds.
myQueue.add({ foo: 'bar' }, { delay: 60000 });
Job Prioritization
This feature allows you to set the priority of jobs in the queue. The code sample shows how to add a job with a priority of 1 to 'myQueue'. Higher priority jobs are processed before lower priority ones.
myQueue.add({ foo: 'bar' }, { priority: 1 });
Concurrency Control
This feature allows you to control the number of concurrent job processors. The code sample demonstrates how to process up to 5 jobs concurrently in 'myQueue'.
myQueue.process(5, async (job) => {
console.log(job.data);
// Process the job
});
Kue is another Redis-based priority job queue for Node.js. It provides a similar set of features to Bull, including job creation, processing, scheduling, and prioritization. However, Kue has a more extensive UI for monitoring and managing jobs.
Agenda is a lightweight job scheduling library for Node.js that uses MongoDB for persistence. It offers features like job scheduling, concurrency control, and job prioritization. Unlike Bull, which uses Redis, Agenda uses MongoDB, making it a good choice for applications already using MongoDB.
Bee-Queue is a simple, fast, and robust job/task queue for Node.js, backed by Redis. It is designed for high performance and low latency, making it suitable for real-time applications. Bee-Queue focuses on simplicity and performance, whereas Bull offers more advanced features and flexibility.
The fastest, most reliable, Redis-based queue for Node.
Carefully written for rock solid stability and atomicity.
Sponsors · Features · UIs · Install · Quick Guide · Documentation
Check the new Guide!
Follow me on Twitter for important news and updates.
You can find tutorials and news in this blog: https://blog.taskforce.sh/
If you want to start using the next major version of Bull written entirely in Typescript you are welcome to the new repo here. Otherwise you are very welcome to still use Bull, which is a safe, battle tested codebase.
If you need high quality production Redis instances for your Bull projects, please consider subscribing to RedisGreen, leaders in Redis hosting that works perfectly with Bull. Use the promo code "BULLMQ" when signing up to help us sponsor the development of Bull!
Supercharge your queues with a professional front end:
Sign up at Taskforce.sh
And coming up on the roadmap...
There are a few third-party UIs that you can use for monitoring:
BullMQ
Bull v3
Bull <= v2
Since there are a few job queue solutions, here is a table comparing them:
Feature | Bull | Kue | Bee | Agenda |
---|---|---|---|---|
Backend | redis | redis | redis | mongo |
Priorities | ✓ | ✓ | ✓ | |
Concurrency | ✓ | ✓ | ✓ | ✓ |
Delayed jobs | ✓ | ✓ | ✓ | |
Global events | ✓ | ✓ | ||
Rate Limiter | ✓ | |||
Pause/Resume | ✓ | ✓ | ||
Sandboxed worker | ✓ | |||
Repeatable jobs | ✓ | ✓ | ||
Atomic ops | ✓ | ✓ | ||
Persistence | ✓ | ✓ | ✓ | ✓ |
UI | ✓ | ✓ | ✓ | |
Optimized for | Jobs / Messages | Jobs | Messages | Jobs |
npm install bull --save
or
yarn add bull
Requirements: Bull requires a Redis version greater than or equal to 2.8.18
.
npm install @types/bull --save-dev
yarn add --dev @types/bull
Definitions are currently maintained in the DefinitelyTyped repo.
We welcome all types of contributions, either code fixes, new features or doc improvements. Code formatting is enforced by prettier. For commits please follow conventional commits convention. All code must pass lint rules and test suites before it can be merged into develop.
const Queue = require('bull');
const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '127.0.0.1', password: 'foobared' } }); // Specify Redis connection using object
const imageQueue = new Queue('image transcoding');
const pdfQueue = new Queue('pdf transcoding');
videoQueue.process(function (job, done) {
// job.data contains the custom data passed when the job was created
// job.id contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give a error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
audioQueue.process(function (job, done) {
// transcode audio asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give a error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { samplerate: 48000 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
imageQueue.process(function (job, done) {
// transcode image asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give a error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { width: 1280, height: 720 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
pdfQueue.process(function (job) {
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({ video: 'http://example.com/video1.mov' });
audioQueue.add({ audio: 'http://example.com/audio1.mp3' });
imageQueue.add({ image: 'http://example.com/image1.tiff' });
Alternatively, you can use return promises instead of using the done
callback:
videoQueue.process(function (job) { // don't forget to remove the done callback!
// Simply return a promise
return fetchVideo(job.data.url).then(transcodeVideo);
// Handles promise rejection
return Promise.reject(new Error('error transcoding'));
// Passes the value the promise is resolved with to the "completed" event
return Promise.resolve({ framerate: 29.5 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
// same as
return Promise.reject(new Error('some unexpected error'));
});
The process function can also be run in a separate process. This has several advantages:
In order to use this feature just create a separate file with the processor:
// processor.js
module.exports = function (job) {
// Do some heavy work
return Promise.resolve(result);
}
And define the processor like this:
// Single process:
queue.process('/path/to/my/processor.js');
// You can use concurrency as well:
queue.process(5, '/path/to/my/processor.js');
// and named processors:
queue.process('my processor', 5, '/path/to/my/processor.js');
A job can be added to a queue and processed repeatedly according to a cron specification:
paymentsQueue.process(function (job) {
// Check payments
});
// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });
As a tip, check your expressions here to verify they are correct: cron expression generator
A queue can be paused and resumed globally (pass true
to pause processing for
just this worker):
queue.pause().then(function () {
// queue is paused now
});
queue.resume().then(function () {
// queue is resumed now
})
A queue emits some useful events, for example...
.on('completed', function (job, result) {
// Job completed with output result!
})
For more information on events, including the full list of events that are fired, check out the Events reference
Queues are cheap, so if you need many of them just create new ones with different names:
const userJohn = new Queue('john');
const userLisa = new Queue('lisa');
.
.
.
However every queue instance will require new redis connections, check how to reuse connections or you can also use named processors to achieve a similar result.
NOTE: From version 3.2.0 and above it is recommended to use threaded processors instead.
Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:
const Queue = require('bull');
const cluster = require('cluster');
const numWorkers = 8;
const queue = new Queue('test concurrent queue');
if (cluster.isMaster) {
for (let i = 0; i < numWorkers; i++) {
cluster.fork();
}
cluster.on('online', function (worker) {
// Let's create a few jobs for the queue workers
for (let i = 0; i < 500; i++) {
queue.add({ foo: 'bar' });
};
});
cluster.on('exit', function (worker, code, signal) {
console.log('worker ' + worker.process.pid + ' died');
});
} else {
queue.process(function (job, jobDone) {
console.log('Job done by worker', cluster.worker.id, job.id);
jobDone();
});
}
For the full documentation, check out the reference and common patterns:
If you see anything that could use more docs, please submit a pull request!
The queue aims for an "at least once" working strategy. This means that in some situations, a job could be processed more than once. This mostly happens when a worker fails to keep a lock for a given job during the total duration of the processing.
When a worker is processing a job it will keep the job "locked" so other workers can't process it.
It's important to understand how locking works to prevent your jobs from losing their lock - becoming stalled -
and being restarted as a result. Locking is implemented internally by creating a lock for lockDuration
on interval
lockRenewTime
(which is usually half lockDuration
). If lockDuration
elapses before the lock can be renewed,
the job will be considered stalled and is automatically restarted; it will be double processed. This can happen when:
lockDuration
setting (with the tradeoff being that it will take longer to recognize a real stalled job).As such, you should always listen for the stalled
event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.
As a safeguard so problematic jobs won't get restarted indefinitely (e.g. if the job processor always crashes its Node process), jobs will be recovered from a stalled state a maximum of maxStalledCount
times (default: 1
).
FAQs
Job manager
The npm package bull receives a total of 389,422 weekly downloads. As such, bull popularity was classified as popular.
We found that bull demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Product
Socket now supports uv.lock files to ensure consistent, secure dependency resolution for Python projects and enhance supply chain security.
Research
Security News
Socket researchers have discovered multiple malicious npm packages targeting Solana private keys, abusing Gmail to exfiltrate the data and drain Solana wallets.
Security News
PEP 770 proposes adding SBOM support to Python packages to improve transparency and catch hidden non-Python dependencies that security tools often miss.