node-resque
Advanced tools
Comparing version 0.1.0 to 0.2.0
@@ -20,21 +20,8 @@ // If a job with the same name, queue, and args is already in the delayed queue(s), do not enqueue it again | ||
var self = this; | ||
var delayedKeyMatcher = self.worker.connection.key('delayed', "*"); | ||
var found = false; | ||
self.worker.connection.redis.keys(delayedKeyMatcher, function(err, timestamps){ | ||
var started = 0; | ||
timestamps.forEach(function(timestamp){ | ||
started++; | ||
self.worker.connection.redis.lrange(timestamp, 0, -1, function(err, jobs){ | ||
for(var i in jobs){ | ||
var job = JSON.parse(jobs[i]); | ||
if(job.class == self.func && job.queue == self.queue && JSON.stringify(job.args) == JSON.stringify(self.args)){ | ||
found = true; | ||
break; | ||
} | ||
} | ||
started--; | ||
if(started == 0){ callback(null, !found); } | ||
}); | ||
}); | ||
if(started == 0){ callback(null, !found); } | ||
self.worker.queueObject.scheduledAt(self.queue, self.func, self.args, function(err, timestamps){ | ||
if(timestamps.length > 0){ | ||
callback(null, false); | ||
}else{ | ||
callback(null, true); | ||
} | ||
}); | ||
@@ -41,0 +28,0 @@ } |
@@ -32,2 +32,10 @@ var util = require('util'); | ||
queue.prototype.encode = function(q, func, args){ | ||
return JSON.stringify({ | ||
"class": func, | ||
queue: q, | ||
args: args || [] | ||
}); | ||
} | ||
queue.prototype.enqueue = function(q, func, args, callback){ | ||
@@ -41,6 +49,3 @@ var self = this; | ||
self.connection.redis.sadd(self.connection.key('queues'), q, function(){ | ||
self.connection.redis.rpush(self.connection.key('queue', q), JSON.stringify({ | ||
"class": func, | ||
args: args || [] | ||
}), function(){ | ||
self.connection.redis.rpush(self.connection.key('queue', q), self.encode(q, func, args), function(){ | ||
self.runPlugins('after_enqueue', func, q, job, args, function(err, toRun){ | ||
@@ -58,11 +63,9 @@ if(typeof callback == "function"){ callback(); } | ||
var self = this; | ||
var item = JSON.stringify({ | ||
"class": func, | ||
queue: q, | ||
args: args || [] | ||
}); | ||
var item = self.encode(q, func, args); | ||
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms | ||
self.connection.redis.rpush(self.connection.key("delayed:" + rTimestamp), item, function(){ | ||
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ | ||
if(typeof callback == "function"){ callback(); } | ||
self.connection.redis.sadd(self.connection.key("timestamps:" + item), self.connection.key("delayed:" + rTimestamp), function(){ | ||
self.connection.redis.zadd(self.connection.key('delayed_queue_schedule'), rTimestamp, rTimestamp, function(){ | ||
if(typeof callback == "function"){ callback(); } | ||
}); | ||
}); | ||
@@ -94,2 +97,49 @@ }); | ||
queue.prototype.del = function(q, func, args, count, callback){ | ||
var self = this; | ||
if(typeof count == "function" && callback == null){ | ||
callback = count; | ||
count = 0; // remove all enqueued items that match | ||
} | ||
self.connection.redis.lrem(self.connection.key('queue', q), count, self.encode(q, func, args), function(err, count){ | ||
if(typeof callback == "function"){ callback(err, count); } | ||
}); | ||
} | ||
queue.prototype.delDelayed = function(q, func, args, callback){ | ||
var self = this; | ||
var search = self.encode(q, func, args); | ||
var timestamps = self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ | ||
if(members.length == 0 ){ if(typeof callback == "function"){ callback(err, null); } } | ||
else{ | ||
var started = 0; | ||
var timestamps = []; | ||
members.forEach(function(key){ | ||
started++; | ||
self.connection.redis.lrem(key, 0, search, function(){ | ||
self.connection.redis.srem(self.connection.key("timestamps:" + search), key, function(){ | ||
timestamps.push(key.split(":")[key.split(":").length - 1]); | ||
started--; | ||
if(started == 0){ | ||
if(typeof callback == "function"){ callback(err, timestamps); } | ||
} | ||
}) | ||
}) | ||
}); | ||
} | ||
}) | ||
} | ||
queue.prototype.scheduledAt = function(q, func, args, callback){ | ||
var self = this; | ||
var search = self.encode(q, func, args); | ||
self.connection.redis.smembers(self.connection.key("timestamps:" + search), function(err, members){ | ||
var timestamps = []; | ||
members.forEach(function(key){ | ||
timestamps.push(key.split(":")[key.split(":").length - 1]); | ||
}) | ||
if(typeof callback == "function"){ callback(err, timestamps); } | ||
}); | ||
} | ||
exports.queue = queue; |
@@ -108,8 +108,10 @@ // TODO: Locking like ruby does | ||
self.connection.redis.lpop(key, function(err, job){ | ||
self.cleanupTimestamp(timestamp, function(){ | ||
if (err) { | ||
callback(err); | ||
} else { | ||
callback(false, JSON.parse(job)); | ||
} | ||
self.connection.redis.srem(self.connection.key("timestamps:" + job), key, function(){ | ||
self.cleanupTimestamp(timestamp, function(){ | ||
if (err) { | ||
callback(err); | ||
} else { | ||
callback(false, JSON.parse(job)); | ||
} | ||
}); | ||
}); | ||
@@ -116,0 +118,0 @@ }); |
@@ -5,3 +5,3 @@ { | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.1.0", | ||
"version": "0.2.0", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -12,3 +12,3 @@ "repository": { | ||
}, | ||
"main": "index.js", | ||
"main": "index.js", | ||
"keywords": ["delayed", "queue", "resque", "redis"], | ||
@@ -15,0 +15,0 @@ "engines": { |
# node-resque | ||
Delayed Tasks in nodejs. A very opinionated but compatible API with [resque](https://github.com/resque/resque) and [resque scheduler](https://github.com/resque/resque-scheduler) | ||
[Find me on the NPM @ node-resque](https://npmjs.org/package/node-resque) | ||
[![Build Status](https://secure.travis-ci.org/taskrabbit/node-resque.png?branch=master)](http://travis-ci.org/taskrabbit/node-resque) | ||
@@ -137,3 +139,17 @@ | ||
``` | ||
## Queue Managment | ||
Additonal methods provided on the `queue` object: | ||
- **queue.prototype.queues** = function(callback) | ||
- callback(error, array_of_queues) | ||
- **queue.prototype.length** = function(q, callback) | ||
- callback(error, number_of_elements_in_queue) | ||
- **queue.prototype.del** = function(q, func, args, count, callback) | ||
- callback(error, number_of_items_deleted) | ||
- **queue.prototype.delDelayed** = function(q, func, args, callback) | ||
- callback(error, timestamps_the_job_was_removed_from) | ||
- **queue.prototype.scheduledAt** = function(q, func, args, callback) | ||
- callback(error, timestamps_the_job_is_scheduled_for) | ||
## Plugins | ||
@@ -140,0 +156,0 @@ |
@@ -149,13 +149,11 @@ describe('plugins', function(){ | ||
queue.enqueueIn((10 * 1000) ,specHelper.queue, "uniqueJob", [1,2], function(){ | ||
setTimeout(function(){ | ||
queue.enqueue(specHelper.queue, "uniqueJob", [1,2], function(){ | ||
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){ | ||
queue.length(specHelper.queue, function(err, queueLen){ | ||
delayedLen.should.equal(1); | ||
queueLen.should.equal(0); | ||
done(); | ||
}); | ||
queue.enqueue(specHelper.queue, "uniqueJob", [1,2], function(){ | ||
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){ | ||
queue.length(specHelper.queue, function(err, queueLen){ | ||
delayedLen.should.equal(1); | ||
queueLen.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}, 1001) | ||
}); | ||
}); | ||
@@ -166,13 +164,11 @@ }); | ||
queue.enqueueIn((10 * 1000) ,specHelper.queue, "uniqueJob", [1,2], function(){ | ||
setTimeout(function(){ | ||
queue.enqueue(specHelper.queue, "uniqueJob", [3,4], function(){ | ||
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){ | ||
queue.length(specHelper.queue, function(err, queueLen){ | ||
delayedLen.should.equal(1); | ||
queueLen.should.equal(1); | ||
done(); | ||
}); | ||
queue.enqueue(specHelper.queue, "uniqueJob", [3,4], function(){ | ||
specHelper.redis.zcount(specHelper.namespace + ":delayed_queue_schedule", '-inf', '+inf', function(err, delayedLen){ | ||
queue.length(specHelper.queue, function(err, queueLen){ | ||
delayedLen.should.equal(1); | ||
queueLen.should.equal(1); | ||
done(); | ||
}); | ||
}); | ||
}, 1001) | ||
}); | ||
}); | ||
@@ -179,0 +175,0 @@ }); |
@@ -9,8 +9,12 @@ describe('queue', function(){ | ||
specHelper.connect(function(){ | ||
specHelper.cleanup(function(){ | ||
done(); | ||
}); | ||
done(); | ||
}); | ||
}); | ||
beforeEach(function(done){ | ||
specHelper.cleanup(function(){ | ||
done(); | ||
}); | ||
}) | ||
after(function(done){ | ||
@@ -83,2 +87,36 @@ specHelper.cleanup(function(){ | ||
it('can find previously scheduled jobs', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.scheduledAt(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('can deleted an enqued job', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(1); | ||
queue.del(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can deleted a delayed job', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.delDelayed(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); |
57641
1483
222