Bull is a Node library that implements a fast and robust queue system based on Redis. It is designed to handle background jobs and message queues, providing features like job scheduling, concurrency control, and job prioritization.
Job Creation
This feature allows you to create and add jobs to a queue. The code sample demonstrates how to create a queue named 'myQueue' and add a job with data { foo: 'bar' } to it.
const Queue = require('bull');
const myQueue = new Queue('myQueue');
myQueue.add({ foo: 'bar' });
Job Processing
This feature allows you to define a processor for the jobs in the queue. The code sample shows how to process jobs in 'myQueue' by logging the job data and performing some processing.
myQueue.process(async (job) => {
  console.log(job.data);
  // Process the job
});
Job Scheduling
This feature allows you to schedule jobs to be processed at a later time. The code sample demonstrates how to add a job to 'myQueue' that will be processed after a delay of 60 seconds.
myQueue.add({ foo: 'bar' }, { delay: 60000 });
Job Prioritization
This feature allows you to set the priority of jobs in the queue. The code sample shows how to add a job with a priority of 1 to 'myQueue'. In recent Bull versions, 1 is the highest priority and larger numbers mean lower priority, so this job is processed before jobs with larger priority values.
myQueue.add({ foo: 'bar' }, { priority: 1 });
Concurrency Control
This feature allows you to control the number of concurrent job processors. The code sample demonstrates how to process up to 5 jobs concurrently in 'myQueue'.
myQueue.process(5, async (job) => {
  console.log(job.data);
  // Process the job
});
Kue is another Redis-based priority job queue for Node.js. It provides a similar set of features to Bull, including job creation, processing, scheduling, and prioritization. However, Kue has a more extensive UI for monitoring and managing jobs.
Agenda is a lightweight job scheduling library for Node.js that uses MongoDB for persistence. It offers features like job scheduling, concurrency control, and job prioritization. Unlike Bull, which uses Redis, Agenda uses MongoDB, making it a good choice for applications already using MongoDB.
Bee-Queue is a simple, fast, and robust job/task queue for Node.js, backed by Redis. It is designed for high performance and low latency, making it suitable for real-time applications. Bee-Queue focuses on simplicity and performance, whereas Bull offers more advanced features and flexibility.
A lightweight, robust and fast job processing queue. Carefully written for rock solid stability and atomicity.
It uses Redis for persistence, so the queue is not lost if the server goes down for any reason.
Follow manast for news and updates regarding this library.
npm install bull
Note that you need Redis version 2.8.11 or higher for bull to work properly.
var Queue = require('bull');
var videoQueue = Queue('video transcoding', 6379, '127.0.0.1');
var audioQueue = Queue('audio transcoding', 6379, '127.0.0.1');
var imageQueue = Queue('image transcoding', 6379, '127.0.0.1');
videoQueue.process(function(job, done){
  // job.data contains the custom data passed when the job was created
  // job.jobId contains the id of this job
  // transcode video asynchronously and report progress
  job.progress(42);
  // call done when finished
  done();
  // or pass an error if something went wrong
  // done(Error('error transcoding'));
  // if the job throws an unhandled exception, it is also handled correctly
  // throw (Error('some unexpected error'));
});
audioQueue.process(function(job, done){
  // transcode audio asynchronously and report progress
  job.progress(42);
  // call done when finished
  done();
  // or pass an error if something went wrong
  // done(Error('error transcoding'));
  // if the job throws an unhandled exception, it is also handled correctly
  // throw (Error('some unexpected error'));
});
imageQueue.process(function(job, done){
  // transcode image asynchronously and report progress
  job.progress(42);
  // call done when finished
  done();
  // or pass an error if something went wrong
  // done(Error('error transcoding'));
  // if the job throws an unhandled exception, it is also handled correctly
  // throw (Error('some unexpected error'));
});
videoQueue.add({video: 'http://example.com/video1.mov'});
audioQueue.add({audio: 'http://example.com/audio1.mp3'});
imageQueue.add({image: 'http://example.com/image1.tiff'});
A queue can be paused and resumed:
queue.pause().then(function(){
  // queue is paused now
});
queue.resume().then(function(){
  // queue is resumed now
});
A queue also emits some useful events:
queue.on('completed', function(job){
// Job completed!
})
.on('failed', function(job, err){
// Job failed with reason err!
})
.on('progress', function(job, progress){
// Job progress updated!
})
.on('paused', function(){
// The queue has been paused
})
.on('resumed', function(job){
// The queue has been resumed
})
Queues are cheap, so if you need many of them just create new ones with different names:
var userJohn = Queue('john');
var userLisa = Queue('lisa');
.
.
.
Queues are robust and can be run in parallel in several threads or processes without any risk of hazards or queue corruption. Check this simple example using cluster to parallelize jobs across processes:
var
  Queue = require('bull'),
  cluster = require('cluster');
var numWorkers = 8;
var queue = Queue("test concurrent queue", 6379, '127.0.0.1');
if(cluster.isMaster){
  for (var i = 0; i < numWorkers; i++) {
    cluster.fork();
  }
  cluster.on('online', function(worker) {
    // Let's create a few jobs for the queue workers
    for(var i=0; i<500; i++){
      queue.add({foo: 'bar'});
    }
  });
  cluster.on('exit', function(worker, code, signal) {
    console.log('worker ' + worker.process.pid + ' died');
  });
}else{
  queue.process(function(job, jobDone){
    console.log("Job done by worker", cluster.worker.id, job.jobId);
    jobDone();
  });
}
#### Message Queue
Bull can also be used for persistent message queues. This is quite a useful feature in some use cases. For example, you can have two servers that need to communicate with each other. By using a queue, the servers do not need to be online at the same time, which creates a very robust communication channel. You can treat add as send and process as receive:
Server A:
var Queue = require('bull');
var sendQueue = Queue("Server B");
var receiveQueue = Queue("Server A");
receiveQueue.process(function(msg, done){
  // msg is a job; the sent payload is available in msg.data
  console.log("Received message", msg.data.msg);
  done();
});
sendQueue.add({msg:"Hello"});
Server B:
var Queue = require('bull');
var sendQueue = Queue("Server A");
var receiveQueue = Queue("Server B");
receiveQueue.process(function(msg, done){
  // msg is a job; the sent payload is available in msg.data
  console.log("Received message", msg.data.msg);
  done();
});
sendQueue.add({msg:"World"});
#### Returning job completions
A common pattern is to have a cluster of queue processors that process jobs as fast as they can, and some other services that need to take the results of these processors and do something with them, such as storing them in a database.
The most robust and scalable way to accomplish this is to combine the standard job queue with the message queue pattern. A service sends jobs to the cluster simply by opening a job queue and adding jobs to it, and the cluster starts processing them as fast as it can. Every time a job is completed in the cluster, a message with the result data is sent to a results message queue. Another service listens to that queue and stores the results in a database.
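The pattern described above could be wired up roughly as follows. This is a minimal sketch under stated assumptions, not a prescribed implementation: the helper names `startWorker` and `startResultCollector`, the `doWork` and `saveResult` callbacks, and the shape of the result message are all hypothetical. Both helpers take already-constructed queues (for example, two Bull queues with different names), which keeps the wiring explicit.

```javascript
// Hypothetical helpers illustrating the job queue + results queue pattern.
// `jobQueue` and `resultQueue` are assumed to be Bull queues, or any objects
// exposing the same `process` and `add` methods.

// Worker side: process each job, then send the outcome as a message.
function startWorker(jobQueue, resultQueue, doWork) {
  jobQueue.process(function (job, done) {
    var result;
    try {
      result = doWork(job.data); // synchronous work, for brevity
    } catch (err) {
      return done(err); // report the failure to the job queue
    }
    // Publish the result; complete the job only once the message is queued.
    resultQueue
      .add({ jobId: job.jobId, result: result })
      .then(function () { done(); }, function (err) { done(err); });
  });
}

// Collector side: receive result messages and store them.
function startResultCollector(resultQueue, saveResult) {
  resultQueue.process(function (msg, done) {
    saveResult(msg.data); // e.g. write to a database
    done();
  });
}
```

In a real deployment `startWorker` would run in each cluster process and `startResultCollector` in the separate service that owns the database.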
## Documentation
This is the Queue constructor. It creates a new Queue that is persisted in Redis. Every time the same queue is instantiated, it tries to process all the old jobs that may exist from a previous unfinished session.
Arguments
queueName {String} A unique name for this Queue.
redisPort {Number} A port where redis server is running.
redisHost {String} A host specified as IP or domain where redis is running.
redisOptions {Object} Options to pass to the redis client. https://github.com/mranney/node_redis
Defines a processing function for the jobs placed into a given Queue.
The callback is called every time a job is placed in the queue. It receives an instance of the job and a done callback to be called after the job has been completed. done can be called with an Error instance to signal that the job did not complete successfully.
You can specify a concurrency. Bull will then call your handler in parallel, respecting this maximum number.
Arguments
job {Object} The job instance to process; its payload is available in job.data.
done {Function} The done callback to be called after the job has been completed.
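As a rough illustration of the concurrency setting, the sketch below registers a handler that runs at most a given number of jobs in parallel and reports success or failure through done. It assumes the concurrency is passed as the first argument to process, as in the `process(5, ...)` example earlier on this page. The helper name `registerProcessor` and the `work` callback are hypothetical.

```javascript
// Hypothetical helper: registers a processor that handles at most
// `maxParallel` jobs at the same time on the given queue.
// `work` may return a value or a promise; a thrown error or a rejected
// promise is reported to the queue through done(err).
function registerProcessor(queue, maxParallel, work) {
  queue.process(maxParallel, function (job, done) {
    Promise.resolve()
      .then(function () { return work(job.data); })
      .then(
        function () { done(); },       // job completed successfully
        function (err) { done(err); }  // job failed
      );
  });
}
```

With a real queue this might be called as `registerProcessor(videoQueue, 5, transcode)`, where `transcode` is whatever function does the actual work.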
Creates a new job and adds it to the queue. If the queue is empty the job will be executed directly, otherwise it will be placed in the queue and executed as soon as possible.
Arguments
data {PlainObject} A plain object with arguments that will be passed
to the job processing function in job.data.
opts {PlainObject} A plain object with arguments that will be passed
to the job processing function in job.opts
opts.delay {Number} The number of milliseconds to wait until this job
can be processed. Note that for accurate delays, both server and clients
should have their clocks synchronized.
opts.lifo {Boolean} A boolean which, if true, adds the job to the right
of the queue instead of the left (default false)
opts.timeout {Number} The number of milliseconds after which the job
should fail with a timeout error [optional]
returns {Promise} A promise that resolves when the job has been successfully
added to the queue (or rejects if some error occurred).
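The options listed above can be combined with any job payload. The following sketch adds three jobs, each using one of the documented options. The helper name `addExampleJobs` and the payloads are hypothetical; only the `delay`, `lifo`, and `timeout` option names come from the list above.

```javascript
// Hypothetical helper: adds three jobs, one per documented option.
// `queue` is assumed to be a Bull queue (or any object with the same
// promise-returning `add(data, opts)` method).
function addExampleJobs(queue) {
  return Promise.all([
    // Becomes processable after 60 seconds.
    queue.add({ task: 'send-reminder' }, { delay: 60000 }),
    // Added to the right of the queue instead of the left.
    queue.add({ task: 'refresh-cache' }, { lifo: true }),
    // Fails with a timeout error if it runs longer than 30 seconds.
    queue.add({ task: 'build-report' }, { timeout: 30000 })
  ]);
}
```

The returned promise resolves once all three jobs have been added and rejects if any of the `add` calls fails.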
Returns a promise that resolves when the queue is paused. The pause is global, meaning that all workers in all queue instances for a given queue will be paused. A paused queue will not process new jobs until resumed, but current jobs being processed will continue until they are finalized.
Pausing a queue that is already paused does nothing.
Arguments
returns {Promise} A promise that resolves when the queue is paused.
Returns a promise that resolves when the queue is resumed after being paused. The resume is global, meaning that all workers in all queue instances for a given queue will be resumed.
Resuming a queue that is not paused does nothing.
Arguments
returns {Promise} A promise that resolves when the queue is resumed.
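One way to use the pause and resume promises together is to wrap a maintenance task so that the queue is always resumed afterwards. This is a minimal sketch; the helper name `withPausedQueue` and the `task` callback are hypothetical, and only `pause()` and `resume()` come from the text above.

```javascript
// Hypothetical helper: pauses the queue, runs `task`, and resumes the
// queue afterwards, whether the task succeeded or failed.
function withPausedQueue(queue, task) {
  return queue.pause().then(function () {
    return Promise.resolve()
      .then(task)
      .then(
        function (value) {
          // Task succeeded: resume, then pass the task's result through.
          return queue.resume().then(function () { return value; });
        },
        function (err) {
          // Task failed: resume anyway, then re-throw the original error.
          return queue.resume().then(function () { throw err; });
        }
      );
  });
}
```

Because the pause is global, jobs that are already being processed by any worker continue to run while `task` executes; only new jobs are held back.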
Returns a promise that resolves with the number of jobs in the queue, waiting or paused. Since other processes may be adding or processing jobs, this value may be accurate only for a very short time.
Arguments
returns {Promise} A promise that resolves with the current jobs count.
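The method heading for this entry is missing from the page, so the sketch below assumes it documents a `count()` method on the queue. The helper name `isBacklogged` and the `limit` parameter are hypothetical.

```javascript
// Hypothetical helper: resolves to true when the number of waiting or
// paused jobs exceeds `limit`. Assumes `queue.count()` returns a promise
// for the current job count, as described above.
function isBacklogged(queue, limit) {
  return queue.count().then(function (pending) {
    // The count is only a snapshot; other processes may change it at any time.
    return pending > limit;
  });
}
```

Since the value can go stale immediately, a check like this is better suited to monitoring or throttling hints than to exact accounting.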
Empties a queue, deleting all the input lists and associated jobs.
Arguments
returns {Promise} A promise that resolves when the queue is emptied.
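The method heading for this entry is also missing, so this sketch assumes the method is `empty()` and that a `count()` method is available to take a snapshot first. The helper name `emptyAndReport` is hypothetical.

```javascript
// Hypothetical helper: empties the queue and resolves with the number of
// waiting jobs seen just before emptying. Assumes `queue.count()` and
// `queue.empty()` both return promises, as described in this section.
function emptyAndReport(queue) {
  return queue.count().then(function (before) {
    return queue.empty().then(function () {
      // `before` is approximate: jobs may have been added or taken
      // between the count and the empty call.
      return before;
    });
  });
}
```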
Returns a promise that resolves with the job instance associated with the jobId
parameter. If the specified job cannot be located, the promise resolves with null.
Arguments
jobId {String} A string identifying the ID of the job to look up.
returns {Promise} A promise that resolves with the job instance when the job
has been retrieved from the queue, or null otherwise.
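A lookup has to handle the null case explicitly. The sketch below assumes this entry documents a `getJob(jobId)` method on the queue (the heading is missing from the page); the helper name `describeJob` and the returned strings are hypothetical.

```javascript
// Hypothetical helper: looks up a job by id and returns a short
// description, or a "not found" message when the lookup yields null.
function describeJob(queue, jobId) {
  return queue.getJob(jobId).then(function (job) {
    if (job === null) {
      return 'job ' + jobId + ' not found';
    }
    return 'job ' + jobId + ' data: ' + JSON.stringify(job.data);
  });
}
```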
This is the constructor for a priority queue. It works the same as a normal queue, with the same functions and parameters. The only difference is that Queue#add() accepts an option opts.priority, which can take one of ["low", "normal", "medium", "high", "critical"]. If no option is provided, "normal" is used.
The priority queue processes higher-priority jobs more often than lower-priority ones.
var PriorityQueue = require("bull/lib/priority-queue");
var queue = new PriorityQueue("myPriorityQueues");
queue.add({todo: "Improve feature"}, {priority: "normal"});
queue.add({todo: "Read 9gags"}, {priority: "low"});
queue.add({todo: "Fix my test unit"}, {priority: "critical"});
queue.process(function(job, done) {
console.log("I have to: " + job.data.todo);
done();
});
Warning: A priority queue uses 5 times more Redis connections than a normal queue.
### Job
A job includes all data needed to perform its execution, as well as the progress method needed to update its progress.
The most important property for the user is Job##data that includes the object that was passed to Queue##add, and that is normally used to perform the job.
Removes a job from the queue and from all the lists where it may be included.
Arguments
returns {Promise} A promise that resolves when the job is removed.
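Removal is typically preceded by a lookup. The sketch below assumes the job instance exposes a promise-returning `remove()` method and that the queue exposes `getJob(jobId)`; both names are inferred from the descriptions on this page, whose method headings are missing. The helper name `removeJobById` is hypothetical.

```javascript
// Hypothetical helper: removes the job with the given id if it exists.
// Resolves to true when a job was found and removed, false otherwise.
function removeJobById(queue, jobId) {
  return queue.getJob(jobId).then(function (job) {
    if (!job) {
      return false; // nothing to remove
    }
    return job.remove().then(function () { return true; });
  });
}
```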
## License
(The MIT License)
Copyright (c) 2013 Manuel Astudillo manuel@optimalbits.com
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the 'Software'), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
v0.2.3
FAQs
Job manager
The npm package bull receives a total of 600,292 weekly downloads. As such, bull popularity was classified as popular.
We found that bull demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.