New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

node-resque

Package Overview
Dependencies
Maintainers
3
Versions
181
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

node-resque - npm Package Compare versions

Comparing version 0.16.5 to 0.17.0

examples/scheduledJobs.js

1

examples/example.js

@@ -89,2 +89,3 @@ /////////////////////////

scheduler.on('poll', function(){ console.log("scheduler polling"); })
scheduler.on('master', function(state){ console.log("scheduler became master"); })
scheduler.on('error', function(error){ console.log("scheduler error >> " + error); })

@@ -91,0 +92,0 @@ scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); })

145

lib/scheduler.js

@@ -1,5 +0,7 @@

// TODO: Locking like ruby does
// To read notes about the master locking scheme, check out:
// https://github.com/resque/resque-scheduler/blob/master/lib/resque/scheduler/locking.rb
var EventEmitter = require('events').EventEmitter;
var util = require("util");
var os = require("os");
var connection = require(__dirname + "/connection.js").connection;

@@ -20,4 +22,6 @@ var queue = require(__dirname + "/queue.js").queue;

}
self.options = options;
self.running = false;
self.options = options;
self.name = self.options.name;
self.master = false;
self.running = false;
self.queue = new queue({connection: options.connection}, jobs, function(err){

@@ -34,3 +38,5 @@ self.connection = self.queue.connection;

return {
timeout: 5000,
timeout: 5000, // in ms
masterLockTimeout: 60 * 3, // in seconds
name: os.hostname() + ":" + process.pid, // assumes only one worker per node process
};

@@ -54,12 +60,18 @@ };

self.running = false;
if(self.processing === false){
clearTimeout(self.timer);
self.queue.end(function() {
self.emit('end');
self.connection.disconnect();
process.nextTick(function(){
if(typeof callback === 'function'){ callback(); }
self.releaseMasterLock(function(error, wasMaster){
if(error){ self.emit('error', error); }
self.queue.end(function() {
self.emit('end');
self.connection.disconnect();
process.nextTick(function(){
if(typeof callback === 'function'){ callback(); }
});
});
});
}else if(self.processing === true){
}
else{
setTimeout(function(){

@@ -75,18 +87,27 @@ self.end(callback);

clearTimeout(self.timer);
self.emit('poll');
self.nextDelayedTimestamp(function(err, timestamp){
if(!err && timestamp){
self.emit('working_timestamp', timestamp);
self.enqueueDelayedItemsForTimestamp(timestamp, function(err){
if(err){ self.emit('error', err); }
self.poll(callback);
self.tryForMaster(function(error, isMaster){
if(isMaster){
if(!self.master){
self.master = true;
self.emit('master');
}
self.emit('poll');
self.nextDelayedTimestamp(function(error, timestamp){
if(!error && timestamp){
self.emit('working_timestamp', timestamp);
self.enqueueDelayedItemsForTimestamp(timestamp, function(error){
if(error){ self.emit('error', error); }
self.poll(callback);
});
}else{
if(error){ self.emit('error', error); }
self.processing = false;
self.pollAgainLater();
if(typeof callback === 'function'){ callback(); }
}
});
}else{
if(err){ self.emit('error', err); }
self.master = false;
self.processing = false;
if(self.running === true){
self.timer = setTimeout((function() {
self.poll();
}), self.options.timeout);
}
self.pollAgainLater();
if(typeof callback === 'function'){ callback(); }

@@ -97,8 +118,56 @@ }

scheduler.prototype.pollAgainLater = function(){
var self = this;
if(self.running === true){
self.timer = setTimeout((function() {
self.poll();
}), self.options.timeout);
}
};
scheduler.prototype.tryForMaster = function(callback) {
var self = this;
var masterKey = self.connection.key('resque_scheduler_master_lock');
self.connection.redis.setnx(masterKey, self.options.name, function(error, locked){
if(error){ return callback(error); }
else if(locked === true || locked === 1){
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){
return callback(error, true);
});
}else{
self.connection.redis.get(masterKey, function(error, value){
if(error){ return callback(error); }
else if(value === self.options.name){
self.connection.redis.expire(masterKey, self.options.masterLockTimeout, function(error){
return callback(error, true);
});
}else{
return callback(null, false);
}
});
}
});
};
scheduler.prototype.releaseMasterLock = function(callback){
var self = this;
var masterKey = self.connection.key('resque_scheduler_master_lock');
self.tryForMaster(function(error, isMaster){
if(error){ return callback(error); }
else if(!isMaster){ return callback(null, false); }
else{
self.connection.redis.del(masterKey, function(error, delted){
self.master = false;
callback(error, (delted === 1));
});
}
});
};
scheduler.prototype.nextDelayedTimestamp = function(callback) {
var self = this;
var time = Math.round(new Date().getTime() / 1000);
self.connection.redis.zrangebyscore(self.connection.key('delayed_queue_schedule'), '-inf', time, 'limit', 0, 1, function(err, items) {
if (err || items === null || items.length === 0) {
callback(err);
self.connection.redis.zrangebyscore(self.connection.key('delayed_queue_schedule'), '-inf', time, 'limit', 0, 1, function(error, items) {
if (error || items === null || items.length === 0) {
callback(error);
} else {

@@ -112,4 +181,4 @@ callback(null, items[0]);

var self = this;
self.nextItemForTimestamp(timestamp, function(err, job){
if (!err && job ) {
self.nextItemForTimestamp(timestamp, function(error, job){
if (!error && job ) {
self.transfer(timestamp, job, function(){

@@ -119,3 +188,3 @@ self.enqueueDelayedItemsForTimestamp(timestamp, callback);

} else {
callback(err);
callback(error);
}

@@ -128,10 +197,10 @@ });

var key = self.connection.key("delayed:" + timestamp);
self.connection.redis.lpop(key, function(err, job){
if(err){
callback(err);
self.connection.redis.lpop(key, function(error, job){
if(error){
callback(error);
}else{
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(err){
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(error){
self.cleanupTimestamp(timestamp, function(){
if (err) {
callback(err);
if (error) {
callback(error);
} else {

@@ -148,4 +217,4 @@ callback(null, JSON.parse(job));

var self = this;
self.queue.enqueue(job.queue, job.class, job.args, function(err){
if(err){ self.emit('error', err); }
self.queue.enqueue(job.queue, job.class, job.args, function(error){
if(error){ self.emit('error', error); }
self.emit('transferred_job', timestamp, job);

@@ -159,3 +228,3 @@ callback();

var key = self.connection.key("delayed:" + timestamp);
self.connection.redis.llen(key, function(err, len) {
self.connection.redis.llen(key, function(error, len) {
if (len === 0) {

@@ -162,0 +231,0 @@ self.connection.redis.del(key);

@@ -5,3 +5,4 @@ {

"description": "an opinionated implementation of resque in node",
"version": "0.16.5",
"license": "Apache-2.0",
"version": "0.17.0",
"homepage": "http://github.com/taskrabbit/node-resque",

@@ -28,5 +29,6 @@ "repository": {

"devDependencies": {
"mocha": "latest",
"should": "latest",
"fakeredis": "latest"
"mocha": "latest",
"should": "latest",
"fakeredis": "latest",
"node-schedule": "latest"
},

@@ -33,0 +35,0 @@ "scripts": {

@@ -88,2 +88,3 @@ # node-resque

scheduler.on('error', function(error){ console.log("scheduler error >> " + error); })
scheduler.on('master', function(){ console.log("scheduler became master"); })
scheduler.on('poll', function(){ console.log("scheduler polling"); })

@@ -245,2 +246,22 @@ scheduler.on('working_timestamp', function(timestamp){ console.log("scheduler working timestamp " + timestamp); })

## Job Schedules
You may want to use node-resque to schedule jobs every minute/hour/day, like a distribued CRON job. There are a nuber of excelent node packages to help you with this, like (node-schedule)[https://github.com/tejasmanohar/node-schedule] and (node-cron)[https://github.com/ncb000gt/node-cron]. Node-resque makes it possible for you to use the packages to schedule jobs with.
Assuming you are running node-resque across multiple machines, you will need to ensure that only one of your processes is actually scheduluing the jobs. To help you with this, you can inspect which of the scheduler processes is corrently acting as master, and flag only the master scheduler process to run the schedule. A full example can be found at [/examples/scheduledJobs.js](https://github.com/taskrabbit/node-resque/blob/master/examples/scheduledJobs.js), but the relevent section is:
``` javascript
var schedule = require('node-schedule');
var queue = new NR.queue({connection: connectionDetails}, jobs, function(){
schedule.scheduleJob('10,20,30,40,50 * * * * *', function(){ // do this job every 10 seconds, cron style
// we want to ensure that only one instance of this job is scheduled in our enviornment at once,
// no matter how many schedulers we have running
if(scheduler.master){
console.log(">>> enquing a job");
queue.enqueue('time', "ticktock", new Date().toString() );
}
});
});
```
## Plugins

@@ -247,0 +268,0 @@

@@ -42,2 +42,27 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper;

describe('locking', function(){
before(function(done){ specHelper.connect(done); });
beforeEach(function(done){ specHelper.cleanup(done); });
after(function(done){ specHelper.cleanup(done); });
it('should only have one master; and can failover', function(done){
sheduler_1 = new specHelper.NR.scheduler({connection: specHelper.connectionDetails, name: 'scheduler_1', timeout: specHelper.timeout});
sheduler_2 = new specHelper.NR.scheduler({connection: specHelper.connectionDetails, name: 'scheduler_2', timeout: specHelper.timeout});
sheduler_1.start();
sheduler_2.start();
setTimeout(function(){
sheduler_1.master.should.equal(true);
sheduler_2.master.should.equal(false);
sheduler_1.end();
setTimeout(function(){
sheduler_1.master.should.equal(false);
sheduler_2.master.should.equal(true);
sheduler_2.end(function(){ done(); });
}, (specHelper.timeout * 2));
}, (specHelper.timeout * 2));
});
});
describe('[with connection]', function() {

@@ -54,14 +79,5 @@ before(function(done){

beforeEach(function(done) {
specHelper.cleanup(function(){
done();
});
});
beforeEach(function(done){ specHelper.cleanup(done); });
after(function(done){ specHelper.cleanup(done); });
after(function(done){
specHelper.cleanup(function(){
done();
});
});
it("can boot", function(done){

@@ -68,0 +84,0 @@ scheduler.start();

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc