node-resque
Advanced tools
Comparing version 0.10.1 to 0.10.2
@@ -166,2 +166,43 @@ var util = require('util'); | ||
queue.prototype.timestamps = function(callback){ | ||
var self = this; | ||
var results = []; | ||
self.connection.redis.keys(self.connection.key("delayed:*"), function(err, timestamps){ | ||
timestamps.forEach(function(timestamp){ | ||
var parts = timestamp.split(":"); | ||
results.push(parseInt(parts[(parts.length - 1)]) * 1000); | ||
}); | ||
results.sort(); | ||
callback(err, results); | ||
}); | ||
} | ||
queue.prototype.delayedAt = function(timestamp, callback){ | ||
var self = this; | ||
var rTimestamp = Math.round(timestamp / 1000); // assume timestamp is in ms | ||
var tasks = []; | ||
self.connection.redis.lrange(self.connection.key("delayed:" + rTimestamp), 0, -1, function(err, items){ | ||
items.forEach(function(i){ | ||
tasks.push( JSON.parse(i) ); | ||
}); | ||
callback(err, tasks, rTimestamp); | ||
}); | ||
} | ||
queue.prototype.allDelayed = function(callback){ | ||
var self = this; | ||
var started = 0; | ||
var results = {}; | ||
self.timestamps(function(err, timestamps){ | ||
timestamps.forEach(function(timestamp){ | ||
started++; | ||
self.delayedAt(timestamp, function(err, tasks, rTimestamp){ | ||
results[(rTimestamp * 1000)] = tasks; | ||
started--; | ||
if(started === 0){ callback(err, results) } | ||
}); | ||
}); | ||
}); | ||
} | ||
queue.prototype.workers = function(callback){ | ||
@@ -168,0 +209,0 @@ var self = this; |
@@ -5,3 +5,3 @@ { | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.10.1", | ||
"version": "0.10.2", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
@@ -8,0 +8,0 @@ "repository": { |
@@ -168,2 +168,13 @@ # node-resque | ||
## Delayed Status | ||
- **queue.timestamps** = function(callback) | ||
- callback(error, timestamps) | ||
- **queue.delayedAt** = function(timestamp, callback) | ||
- callback(error, jobs_enqueued_at_this_timestamp) | ||
- **queue.allDelayed** = function(timestamp) | ||
- callback(error, jobsHash) | ||
- jobsHash is an object with its keys being timestamps, and the vales are arrays of jobs at each time. | ||
- note that this operation can be very slow and very ram-heavy | ||
## Worker Status | ||
@@ -170,0 +181,0 @@ |
@@ -160,2 +160,52 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
describe('delayed status', function(){ | ||
beforeEach(function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'job1', [1,2,3], function(){ | ||
queue.enqueueAt(10000, specHelper.queue, 'job2', [1,2,3], function(){ | ||
queue.enqueueAt(20000, specHelper.queue, 'job3', [1,2,3], function(){ | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can list the timestamps that exist', function(done){ | ||
queue.timestamps(function(err, timestamps){ | ||
should.not.exist(err); | ||
timestamps.length.should.equal(2); | ||
timestamps[0].should.equal(10000); | ||
timestamps[1].should.equal(20000); | ||
done(); | ||
}); | ||
}); | ||
it('can list the jobs delayed at a timestamp', function(done){ | ||
queue.delayedAt(10000, function(err, tasks_a){ | ||
should.not.exist(err); | ||
tasks_a.length.should.equal(2); | ||
tasks_a[0].class.should.equal('job1'); | ||
tasks_a[1].class.should.equal('job2'); | ||
queue.delayedAt(20000, function(err, tasks_b){ | ||
should.not.exist(err); | ||
tasks_b.length.should.equal(1); | ||
tasks_b[0].class.should.equal('job3'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('can also return a hash with all delayed tasks', function(done){ | ||
queue.allDelayed(function(err, hash){ | ||
Object.keys(hash).length.should.equal(2); | ||
Object.keys(hash)[0].should.equal('10000'); | ||
Object.keys(hash)[1].should.equal('20000'); | ||
hash['10000'].length.should.equal(2); | ||
hash['20000'].length.should.equal(1); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
describe('worker status', function(){ | ||
@@ -162,0 +212,0 @@ var workerA; |
116677
2674
273