node-resque
Advanced tools
Comparing version 0.16.5 to 0.17.0
@@ -89,2 +89,3 @@ ///////////////////////// | ||
scheduler.on('poll', function(){ console.log("scheduler polling"); }) | ||
scheduler.on('master', function(state){ console.log("scheduler became master"); }) | ||
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); }) | ||
@@ -91,0 +92,0 @@ scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); }) |
@@ -1,5 +0,7 @@ | ||
// TODO: Locking like ruby does | ||
// To read notes about the master locking scheme, check out: | ||
// https://github.com/resque/resque-scheduler/blob/master/lib/resque/scheduler/locking.rb | ||
var EventEmitter = require('events').EventEmitter; | ||
var util = require("util"); | ||
var os = require("os"); | ||
var connection = require(__dirname + "/connection.js").connection; | ||
@@ -20,4 +22,6 @@ var queue = require(__dirname + "/queue.js").queue; | ||
} | ||
self.options = options; | ||
self.running = false; | ||
self.options = options; | ||
self.name = self.options.name; | ||
self.master = false; | ||
self.running = false; | ||
self.queue = new queue({connection: options.connection}, jobs, function(err){ | ||
@@ -34,3 +38,5 @@ self.connection = self.queue.connection; | ||
return { | ||
timeout: 5000, | ||
timeout: 5000, // in ms | ||
masterLockTimeout: 60 * 3, // in seconds | ||
name: os.hostname() + ":" + process.pid, // assumes only one worker per node process | ||
}; | ||
@@ -54,12 +60,18 @@ }; | ||
self.running = false; | ||
if(self.processing === false){ | ||
clearTimeout(self.timer); | ||
self.queue.end(function() { | ||
self.emit('end'); | ||
self.connection.disconnect(); | ||
process.nextTick(function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
self.releaseMasterLock(function(error, wasMaster){ | ||
if(error){ self.emit('error', error); } | ||
self.queue.end(function() { | ||
self.emit('end'); | ||
self.connection.disconnect(); | ||
process.nextTick(function(){ | ||
if(typeof callback === 'function'){ callback(); } | ||
}); | ||
}); | ||
}); | ||
}else if(self.processing === true){ | ||
} | ||
else{ | ||
setTimeout(function(){ | ||
@@ -75,18 +87,27 @@ self.end(callback); | ||
clearTimeout(self.timer); | ||
self.emit('poll'); | ||
self.nextDelayedTimestamp(function(err, timestamp){ | ||
if(!err && timestamp){ | ||
self.emit('working_timestamp', timestamp); | ||
self.enqueueDelayedItemsForTimestamp(timestamp, function(err){ | ||
if(err){ self.emit('error', err); } | ||
self.poll(callback); | ||
self.tryForMaster(function(error, isMaster){ | ||
if(isMaster){ | ||
if(!self.master){ | ||
self.master = true; | ||
self.emit('master'); | ||
} | ||
self.emit('poll'); | ||
self.nextDelayedTimestamp(function(error, timestamp){ | ||
if(!error && timestamp){ | ||
self.emit('working_timestamp', timestamp); | ||
self.enqueueDelayedItemsForTimestamp(timestamp, function(error){ | ||
if(error){ self.emit('error', error); } | ||
self.poll(callback); | ||
}); | ||
}else{ | ||
if(error){ self.emit('error', error); } | ||
self.processing = false; | ||
self.pollAgainLater(); | ||
if(typeof callback === 'function'){ callback(); } | ||
} | ||
}); | ||
}else{ | ||
if(err){ self.emit('error', err); } | ||
self.master = false; | ||
self.processing = false; | ||
if(self.running === true){ | ||
self.timer = setTimeout((function() { | ||
self.poll(); | ||
}), self.options.timeout); | ||
} | ||
self.pollAgainLater(); | ||
if(typeof callback === 'function'){ callback(); } | ||
@@ -97,8 +118,56 @@ } | ||
scheduler.prototype.pollAgainLater = function(){ | ||
var self = this; | ||
if(self.running === true){ | ||
self.timer = setTimeout((function() { | ||
self.poll(); | ||
}), self.options.timeout); | ||
} | ||
}; | ||
scheduler.prototype.tryForMaster = function(callback) { | ||
var self = this; | ||
var masterKey = self.connection.key('resque_scheduler_master_lock'); | ||
self.connection.redis.setnx(masterKey, self.options.name, function(error, locked){ | ||
if(error){ return callback(error); } | ||
else if(locked === true || locked === 1){ | ||
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){ | ||
return callback(error, true); | ||
}); | ||
}else{ | ||
self.connection.redis.get(masterKey, function(error, value){ | ||
if(error){ return callback(error); } | ||
else if(value === self.options.name){ | ||
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){ | ||
return callback(error, true); | ||
}); | ||
}else{ | ||
return callback(null, false); | ||
} | ||
}); | ||
} | ||
}); | ||
}; | ||
scheduler.prototype.releaseMasterLock = function(callback){ | ||
var self = this; | ||
var masterKey = self.connection.key('resque_scheduler_master_lock'); | ||
self.tryForMaster(function(error, isMaster){ | ||
if(error){ return callback(error); } | ||
else if(!isMaster){ return callback(null, false); } | ||
else{ | ||
self.connection.redis.del(masterKey, function(error, delted){ | ||
self.master = false; | ||
callback(error, (delted === 1)); | ||
}); | ||
} | ||
}); | ||
}; | ||
scheduler.prototype.nextDelayedTimestamp = function(callback) { | ||
var self = this; | ||
var time = Math.round(new Date().getTime() / 1000); | ||
self.connection.redis.zrangebyscore(self.connection.key('delayed_queue_schedule'), '-inf', time, 'limit', 0, 1, function(err, items) { | ||
if (err || items === null || items.length === 0) { | ||
callback(err); | ||
self.connection.redis.zrangebyscore(self.connection.key('delayed_queue_schedule'), '-inf', time, 'limit', 0, 1, function(error, items) { | ||
if (error || items === null || items.length === 0) { | ||
callback(error); | ||
} else { | ||
@@ -112,4 +181,4 @@ callback(null, items[0]); | ||
var self = this; | ||
self.nextItemForTimestamp(timestamp, function(err, job){ | ||
if (!err && job ) { | ||
self.nextItemForTimestamp(timestamp, function(error, job){ | ||
if (!error && job ) { | ||
self.transfer(timestamp, job, function(){ | ||
@@ -119,3 +188,3 @@ self.enqueueDelayedItemsForTimestamp(timestamp, callback); | ||
} else { | ||
callback(err); | ||
callback(error); | ||
} | ||
@@ -128,10 +197,10 @@ }); | ||
var key = self.connection.key("delayed:" + timestamp); | ||
self.connection.redis.lpop(key, function(err, job){ | ||
if(err){ | ||
callback(err); | ||
self.connection.redis.lpop(key, function(error, job){ | ||
if(error){ | ||
callback(error); | ||
}else{ | ||
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(err){ | ||
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(error){ | ||
self.cleanupTimestamp(timestamp, function(){ | ||
if (err) { | ||
callback(err); | ||
if (error) { | ||
callback(error); | ||
} else { | ||
@@ -148,4 +217,4 @@ callback(null, JSON.parse(job)); | ||
var self = this; | ||
self.queue.enqueue(job.queue, job.class, job.args, function(err){ | ||
if(err){ self.emit('error', err); } | ||
self.queue.enqueue(job.queue, job.class, job.args, function(error){ | ||
if(error){ self.emit('error', error); } | ||
self.emit('transferred_job', timestamp, job); | ||
@@ -159,3 +228,3 @@ callback(); | ||
var key = self.connection.key("delayed:" + timestamp); | ||
self.connection.redis.llen(key, function(err, len) { | ||
self.connection.redis.llen(key, function(error, len) { | ||
if (len === 0) { | ||
@@ -162,0 +231,0 @@ self.connection.redis.del(key); |
@@ -5,3 +5,4 @@ { | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.16.5", | ||
"license": "Apache-2.0", | ||
"version": "0.17.0", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -28,5 +29,6 @@ "repository": { | ||
"devDependencies": { | ||
"mocha": "latest", | ||
"should": "latest", | ||
"fakeredis": "latest" | ||
"mocha": "latest", | ||
"should": "latest", | ||
"fakeredis": "latest", | ||
"node-schedule": "latest" | ||
}, | ||
@@ -33,0 +35,0 @@ "scripts": { |
@@ -88,2 +88,3 @@ # node-resque | ||
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); }) | ||
scheduler.on('master', function(){ console.log("scheduler became master"); }) | ||
scheduler.on('poll', function(){ console.log("scheduler polling"); }) | ||
@@ -245,2 +246,22 @@ scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); }) | ||
## Job Schedules | ||
You may want to use node-resque to schedule jobs every minute/hour/day, like a distribued CRON job. There are a nuber of excelent node packages to help you with this, like (node-schedule)[https://github.com/tejasmanohar/node-schedule] and (node-cron)[https://github.com/ncb000gt/node-cron]. Node-resque makes it possible for you to use the packages to schedule jobs with. | ||
Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduluing the jobs. To help you with this, you can inspect which of the scheduler processes is corrently acting as master, and flag only the master scheduler process to run the schedule. A full example can be found at [/examples/scheduledJobs.js](https://github.com/taskrabbit/node-resque/blob/master/examples/scheduledJobs.js), but the relevent section is: | ||
``` javascript | ||
var schedule = require('node-schedule'); | ||
var queue = new NR.queue({connection: connectionDetails}, jobs, function(){ | ||
schedule.scheduleJob('10,20,30,40,50 * * * * *', function(){ // do this job every 10 seconds, cron style | ||
// we want to ensure that only one instance of this job is scheduled in our enviornment at once, | ||
// no matter how many schedulers we have running | ||
if(scheduler.master){ | ||
console.log(">>> enquing a job"); | ||
queue.enqueue('time', "ticktock", new Date().toString() ); | ||
} | ||
}); | ||
}); | ||
``` | ||
## Plugins | ||
@@ -247,0 +268,0 @@ |
@@ -42,2 +42,27 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
describe('locking', function(){ | ||
before(function(done){ specHelper.connect(done); }); | ||
beforeEach(function(done){ specHelper.cleanup(done); }); | ||
after(function(done){ specHelper.cleanup(done); }); | ||
it('should only have one master; and can failover', function(done){ | ||
sheduler_1 = new specHelper.NR.scheduler({connection: specHelper.connectionDetails, name: 'scheduler_1', timeout: specHelper.timeout}); | ||
sheduler_2 = new specHelper.NR.scheduler({connection: specHelper.connectionDetails, name: 'scheduler_2', timeout: specHelper.timeout}); | ||
sheduler_1.start(); | ||
sheduler_2.start(); | ||
setTimeout(function(){ | ||
sheduler_1.master.should.equal(true); | ||
sheduler_2.master.should.equal(false); | ||
sheduler_1.end(); | ||
setTimeout(function(){ | ||
sheduler_1.master.should.equal(false); | ||
sheduler_2.master.should.equal(true); | ||
sheduler_2.end(function(){ done(); }); | ||
}, (specHelper.timeout * 2)); | ||
}, (specHelper.timeout * 2)); | ||
}); | ||
}); | ||
describe('[with connection]', function() { | ||
@@ -54,14 +79,5 @@ before(function(done){ | ||
beforeEach(function(done) { | ||
specHelper.cleanup(function(){ | ||
done(); | ||
}); | ||
}); | ||
beforeEach(function(done){ specHelper.cleanup(done); }); | ||
after(function(done){ specHelper.cleanup(done); }); | ||
after(function(done){ | ||
specHelper.cleanup(function(){ | ||
done(); | ||
}); | ||
}); | ||
it("can boot", function(done){ | ||
@@ -68,0 +84,0 @@ scheduler.start(); |
509251
41
3574
394
4
38