![PyPI Now Supports iOS and Android Wheels for Mobile Python Development](https://cdn.sanity.io/images/cgdhsj6q/production/96416c872705517a6a65ad9646ce3e7caef623a0-1024x1024.webp?w=400&fit=max&auto=format)
Security News
PyPI Now Supports iOS and Android Wheels for Mobile Python Development
PyPI now supports iOS and Android wheels, making it easier for Python developers to distribute mobile packages.
node-resque
Advanced tools
Delayed Tasks in nodejs. A very opinionated but compatible API with resque and resque scheduler
I learn best by examples:
/////////////////////////
// REQUIRE THE PACKAGE //
/////////////////////////
var NR = require("node-resque");
///////////////////////////
// SET UP THE CONNECTION //
///////////////////////////
var connectionDetails = {
host: "127.0.0.1",
password: "",
port: 6379,
database: 0,
}
//////////////////////////////
// DEFINE YOUR WORKER TASKS //
//////////////////////////////
var jobs = {
"add": {
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
"subtract": {
perform: function(a,b,callback){
var answer = a - b;
callback(null, answer);
},
},
"multiply": function(a,b,callback) {
callback(null, a * b);
},
};
////////////////////
// START A WORKER //
////////////////////
var worker = new NR.worker({connection: connectionDetails, queues: ['math']}, jobs, function(){
worker.workerCleanup(); // optional: cleanup any previous improperly shutdown workers on this host
worker.start();
});
///////////////////////
// START A SCHEDULER //
///////////////////////
var scheduler = new NR.scheduler({connection: connectionDetails}, function(){
scheduler.start();
});
/////////////////////////
// REGISTER FOR EVENTS //
/////////////////////////
worker.on('start', function(){ console.log("worker started"); })
worker.on('end', function(){ console.log("worker ended"); })
worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); })
worker.on('poll', function(queue){ console.log("worker polling " + queue); })
worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); })
worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
worker.on('failure', function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); })
worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); })
worker.on('pause', function(){ console.log("worker paused"); })
scheduler.on('start', function(){ console.log("scheduler started"); })
scheduler.on('end', function(){ console.log("scheduler ended"); })
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); })
scheduler.on('poll', function(){ console.log("scheduler polling"); })
scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); })
scheduler.on('transferred_job', function(timestamp, job){ console.log("scheduler enquing job " + timestamp + " >> " + JSON.stringify(job)); })
////////////////////////
// CONNECT TO A QUEUE //
////////////////////////
var queue = new NR.queue({connection: connectionDetails}, jobs, function(){
queue.enqueue('math', "add", [1,2]);
queue.enqueue('math', "add", [2,3]);
queue.enqueueIn(3000, 'math', "subtract", [2,1]);
});
new queue
requires only the "queue" variable to be set. You can also pass the jobs
hash to it.
new worker
has some additonal options:
options = {
looping: true,
timeout: 5000,
queues: "*",
name: os.hostname() + ":" + process.pid
}
The configuration hash passed to new worker
, new scheduler
or new queue
can also take a connection
option.
var connectionDetails = {
package: "redis",
host: "127.0.0.1",
password: "",
port: 6379,
database: 0,
namespace: "resque",
}
var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, function(){
worker.start();
});
You can also pass redis client directly.
// assume you already initialize redis client before
var connectionDetails = { redis: redisClient }
var worker = new NR.worker({connection: connectionDetails, queues: 'math'}, jobs, function(){
worker.start();
});
worker.end()
before shutting down your application if you want to properly clear your worker status from resquebeforeEnqueue
or afterEnqueue
, be sure to pass the jobs
argument to the new Queue
constructorfailed
queue. You can then inspect these jobs, write a plugin to manage them, move them back to the normal queues, etc. Failure behavior by default is just to enter the failed
queue, but there are many options. Check out these examples from the ruby ecosystem for insperation:
hostname:pid+unique_id
. For example:var name = os.hostname() + ":" + process.pid + "+" + counter;
var worker = new NR.worker({connection: connectionDetails, queues: 'math', 'name' : name}, jobs);
Additonal methods provided on the queue
object:
You can use the queue object to check on your wokrers:
{ 'host:pid': 'queue1, queue2', 'host:pid': 'queue1, queue2' }
{"run_at":"Fri Dec 12 2014 14:01:16 GMT-0800 (PST)","queue":"test_queue","payload":{"class":"slowJob","queue":"test_queue","args":[null]},"worker":"workerA"}
queue.workingOn
with the worker names as keys.From time to time, your jobs/workers may fail. Resque workers will move failed jobs to a special failed
queue which will store the original arguments of your job, the failing stack trace, and additional medatadata.
You can work with these failed jobs with the following methods:
queue.failedCount = function(callback)
failedCount
is the number of jobs in the failed queuequeue.failed = function(start, stop, callback)
failedJobs
is an array listing the data of the failed jobs. Each element looks like:{ worker: 'busted-worker-3',
queue: 'busted-queue',
payload: { class: 'busted_job', queue: 'busted-queue', args: [ 1, 2, 3 ] },
exception: 'ERROR_NAME',
error: 'I broke',
failed_at: 'Sun Apr 26 2015 14:00:44 GMT+0100 (BST)' }
queue.removeFailed = function(failedJob, callback)
failedJob
is an expanded node object representing the failed job, retrieved via queue.failed
queue.retryAndRemoveFailed = function(failedJob, callback)
failedJob
is an expanded node object representing the failed job, retrieved via queue.failed
Sometimes a worker crashes is a severe way, and it doesn't get the time/chance to notifiy redis that it is leaving the pool (this happens all the time on PAAS providers like Heroku). When this happens, you will not only need to extract the job from the now-zombie worker's "working on" status, but also remove the stuck worker. To aid you in these edge cases, ``queue.cleanOldWorkers(age, callback)` is available.
Because there are no 'heartbeats' in resque, it is imposable for the application to know if a worker has been working on a long job or it is dead. You are required to provide an "age" for how long a worker has been "working", and all those older than that age will be removed, and the job they are working on moved to the error queue (where you can then use queue.retryAndRemoveFailed
) to re-enqueue the job.
If you know the name of a worker that should be removed, you can also call queue.forceCleanWorker(workerName, callback)
directly, and that will also remove the worker and move any job it was working on into the error queue.
Just like ruby's resque, you can write worker plugins. They look look like this. The 4 hooks you have are before_enqueue
, after_enqueue
, before_perform
, and after_perform
var myPlugin = function(worker, func, queue, job, args, options){
var self = this;
self.name = 'myPlugin';
self.worker = worker;
self.queue = queue;
self.func = func;
self.job = job;
self.args = args;
self.options = options;
}
////////////////////
// PLUGIN METHODS //
////////////////////
myPlugin.prototype.before_enqueue = function(callback){
// console.log("** before_enqueue")
callback(null, true);
}
myPlugin.prototype.after_enqueue = function(callback){
// console.log("** after_enqueue")
callback(null, true);
}
myPlugin.prototype.before_perform = function(callback){
// console.log("** before_perform")
callback(null, true);
}
myPlugin.prototype.after_perform = function(callback){
// console.log("** after_perform")
callback(null, true);
}
And then your plugin can be invoked within a job like this:
var jobs = {
"add": {
plugins: [ 'myPlugin' ],
pluginOptions: {
myPlugin: { thing: 'stuff' },
},
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
}
notes
(error, toRun)
. if toRun = false
on beforeEnqueue
, the job beign inqueued will be thrown away, and if toRun = false
on beforePerfporm
, the job will be reEnqued and not run at this time. However, it doesn't really matter what toRun
returns on the after
hooks.worker.error
in your plugin. If worker.error
is null, no error will be logged in the resque error queue.var jobs = {
"add": {
plugins: [ require('myplugin') ],
pluginOptions: {
myPlugin: { thing: 'stuff' },
},
perform: function(a,b,callback){
var answer = a + b;
callback(null, answer);
},
},
}
node-resque provides a wrapper around the worker
object which will auto-scale the number of resque workers. This will process more than one job at a time as long as there is idle CPU within the event loop. For example, if you have a slow job that sends email via SMTP (with low rendering overhead), we can process many jobs at a time, but if you have a math-heavy operation, we'll stick to 1. The multiWorker
handles this by spawngning more and more node-resque workers and managing the pool.
var NR = require(__dirname + "/../index.js");
var connectionDetails = {
package: "redis",
host: "127.0.0.1",
password: ""
}
var multiWorker = new NR.multiWorker({
connection: connectionDetails,
queues: ['slowQueue'],
minTaskProcessors: 1,
maxTaskProcessors: 100,
checkTimeout: 1000,
maxEventLoopDelay: 10,
toDisconnectProcessors: true,
}, jobs, function(){
// normal worker emitters
multiWorker.on('start', function(workerId){ console.log("worker["+workerId+"] started"); })
multiWorker.on('end', function(workerId){ console.log("worker["+workerId+"] ended"); })
multiWorker.on('cleaning_worker', function(workerId, worker, pid){ console.log("cleaning old worker " + worker); })
multiWorker.on('poll', function(workerId, queue){ console.log("worker["+workerId+"] polling " + queue); })
multiWorker.on('job', function(workerId, queue, job){ console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); })
multiWorker.on('reEnqueue', function(workerId, queue, job, plugin){ console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); })
multiWorker.on('success', function(workerId, queue, job, result){ console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); })
multiWorker.on('failure', function(workerId, queue, job, failure){ console.log("worker["+workerId+"] job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); })
multiWorker.on('error', function(workerId, queue, job, error){ console.log("worker["+workerId+"] error " + queue + " " + JSON.stringify(job) + " >> " + error); })
multiWorker.on('pause', function(workerId){ console.log("worker["+workerId+"] paused"); })
// multiWorker emitters
multiWorker.on('internalError', function(error){ console.log(error); })
multiWorker.on('multiWorkerAction', function(verb, delay){ console.log("*** checked for worker status: " + verb + " (event loop delay: " + delay + "ms)"); })
multiWorker.start();
});
This package was featued heavily in this presentation I gave about background jobs + node.js. It contains more examples!
Most of this code was inspired by / stolen from coffee-resque and coffee-resque-scheduler. Thanks!
FAQs
an opinionated implementation of resque in node
The npm package node-resque receives a total of 10,354 weekly downloads. As such, node-resque popularity was classified as popular.
We found that node-resque demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 0 open source maintainers collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Security News
PyPI now supports iOS and Android wheels, making it easier for Python developers to distribute mobile packages.
Security News
Create React App is officially deprecated due to React 19 issues and lack of maintenance—developers should switch to Vite or other modern alternatives.
Security News
Oracle seeks to dismiss fraud claims in the JavaScript trademark dispute, delaying the case and avoiding questions about its right to the name.