node-resque
Advanced tools
Comparing version 0.4.1 to 0.5.0
@@ -66,3 +66,3 @@ ///////////////////////// | ||
pluginOptions: { | ||
myPlugin: {messagePrefix: '!!!'}, | ||
myPlugin: {messagePrefix: '[ custom logger plugin ]'}, | ||
}, | ||
@@ -69,0 +69,0 @@ perform: function(a,b,callback){ |
@@ -10,3 +10,13 @@ var connection = function(options){ | ||
self.options = options; | ||
self.connected = false; | ||
if(options.redis == null){ | ||
self.connected = false; | ||
}else{ | ||
// already connected, so the 'connect' event won't be fired | ||
// we need another test | ||
self.connected = true; | ||
options.redis.get('test_key', function(err){ | ||
if(err){ self.connected = false; } | ||
}) | ||
} | ||
} | ||
@@ -27,3 +37,5 @@ | ||
var self = this; | ||
if(self.connected === false){ | ||
if(self.options.fake === true){ | ||
callack(); | ||
}else if(self.connected === false){ | ||
var err = new Error('not connected to redis'); | ||
@@ -61,3 +73,3 @@ if(typeof parentCallback === 'function'){ | ||
}); | ||
}else if(self.options.fake != true){ | ||
}else if(options.fake != true){ | ||
self.redis.select(options.database, function(err){ | ||
@@ -69,5 +81,6 @@ self.redis.info(function(err, data){ | ||
}else{ | ||
self.redis.info(function(err, data){ | ||
callback(err); | ||
}); | ||
// fakeredis cannot use the 'info' command; but you can assume you are connected | ||
process.nextTick(function(){ | ||
callback(); | ||
}) | ||
} | ||
@@ -74,0 +87,0 @@ } |
@@ -82,3 +82,3 @@ // If a job with the same name, queue, and args is already running, put this job back in the queue and try later | ||
}else{ | ||
return 1000; // in ms | ||
return 1001; // in ms | ||
} | ||
@@ -85,0 +85,0 @@ } |
@@ -26,3 +26,2 @@ var os = require("os"); | ||
self.job = null; | ||
self.activePlugins = []; | ||
@@ -129,3 +128,3 @@ self.runPlugin = pluginRunner.runPlugin | ||
d.on('error', function(err){ | ||
self.completeJob(err, null, callback); | ||
self.completeJob(err, null, true, callback); | ||
}); | ||
@@ -148,3 +147,3 @@ d.run(function(){ | ||
else if(toRun == false){ | ||
self.completeJob(null, null, callback); | ||
self.completeJob(null, null, false, callback); | ||
}else{ | ||
@@ -168,3 +167,3 @@ self.workingOn(job); | ||
}else{ | ||
self.completeJob(err, result, callback); | ||
self.completeJob(err, result, true, callback); | ||
} | ||
@@ -177,3 +176,3 @@ }); | ||
}else{ | ||
self.completeJob(new Error("Missing Job: " + job["class"]), null, callback); | ||
self.completeJob(new Error("Missing Job: " + job["class"]), null, true, callback); | ||
} | ||
@@ -184,44 +183,19 @@ } | ||
worker.prototype.completeJob = function(err, result, callback){ | ||
worker.prototype.completeJob = function(err, result, toRespond, callback){ | ||
var self = this; | ||
var job = self.job; | ||
var onComplete = function(){ | ||
if(err != null){ | ||
self.fail(err, job); | ||
if(err != null){ | ||
self.fail(err, job); | ||
}else if(toRespond){ | ||
self.succeed(result, job); | ||
} | ||
self.doneWorking(); | ||
self.job = null; | ||
process.nextTick((function() { | ||
if(self.options.looping){ | ||
return self.poll(); | ||
}else{ | ||
self.succeed(result, job); | ||
callback(); | ||
} | ||
self.doneWorking(); | ||
self.job = null; | ||
self.activePlugins = []; | ||
process.nextTick((function() { | ||
if(self.options.looping){ | ||
return self.poll(); | ||
}else{ | ||
callback(); | ||
} | ||
})); | ||
} | ||
if(self.activePlugins.length === 0){ | ||
onComplete(); | ||
}else{ | ||
var completedCallbacks = 0; | ||
self.activePlugins.forEach(function(plugin){ | ||
if(typeof plugin.jobComplete == 'function'){ | ||
plugin.jobComplete(function(){ | ||
completedCallbacks++; | ||
if(self.activePlugins.length == completedCallbacks){ | ||
onComplete(); | ||
} | ||
}); | ||
}else{ | ||
completedCallbacks++; | ||
if(self.activePlugins.length == completedCallbacks){ | ||
onComplete(); | ||
} | ||
} | ||
}); | ||
} | ||
})); | ||
}; | ||
@@ -228,0 +202,0 @@ |
{ | ||
"author": "Evan Tahler <evantahler@gmail.com>", | ||
"name": "node-resque", | ||
"author": "Evan Tahler <evantahler@gmail.com>", | ||
"name": "node-resque", | ||
"description": "an opinionated implementation of resque in node", | ||
"version": "0.4.1", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
"version": "0.5.0", | ||
"homepage": "http://github.com/taskrabbit/node-resque", | ||
"repository": { | ||
@@ -12,3 +12,9 @@ "type": "git", | ||
"main": "index.js", | ||
"keywords": ["delayed", "queue", "resque", "redis", "worker"], | ||
"keywords": [ | ||
"delayed", | ||
"queue", | ||
"resque", | ||
"redis", | ||
"worker" | ||
], | ||
"engines": { | ||
@@ -18,11 +24,13 @@ "node": ">=0.8.0" | ||
"dependencies": { | ||
"redis": "~0.10.x" | ||
"redis": "~0.10.x", | ||
"async": "^0.2.10" | ||
}, | ||
"devDependencies": { | ||
"mocha": "latest", | ||
"should": "latest" | ||
"mocha": "latest", | ||
"should": "latest", | ||
"fakeredis": "latest" | ||
}, | ||
"scripts": { | ||
"test": "mocha" | ||
"test": "node ./test/runner" | ||
} | ||
} |
var redis = require('redis'); | ||
var fakeredis = require('fakeredis'); | ||
var namespace = "resque_test"; | ||
var queue = "test_queue"; | ||
var toFakeredis = true; | ||
if(process.env.fakeredis == 'false'){ toFakeredis = false; } | ||
exports.specHelper = { | ||
toFakeredis: toFakeredis, | ||
NR: require(__dirname + "/../index.js"), | ||
@@ -11,2 +16,3 @@ namespace: namespace, | ||
connectionDetails: { | ||
fake: toFakeredis, | ||
host: "127.0.0.1", | ||
@@ -21,12 +27,24 @@ password: "", | ||
var self = this; | ||
self.redis = redis.createClient(self.connectionDetails.port, self.connectionDetails.host, self.connectionDetails.options); | ||
if(self.connectionDetails.password != null && self.connectionDetails.password != ""){ | ||
self.redis.auth(self.connectionDetails.password, function(err){ | ||
if(toFakeredis != true){ | ||
self.redis = redis.createClient(self.connectionDetails.port, self.connectionDetails.host, self.connectionDetails.options); | ||
if(self.connectionDetails.password != null && self.connectionDetails.password != ""){ | ||
self.redis.auth(self.connectionDetails.password, function(err){ | ||
self.redis.select(self.connectionDetails.database, function(err){ | ||
self.connectionDetails.redis = self.redis; | ||
callback(err); | ||
}); | ||
}); | ||
}else{ | ||
self.redis.select(self.connectionDetails.database, function(err){ | ||
self.connectionDetails.redis = self.redis; | ||
callback(err); | ||
}); | ||
}); | ||
} | ||
}else{ | ||
self.redis = fakeredis.createClient('test'); | ||
self.redis.select(self.connectionDetails.database, function(err){ | ||
callback(err); | ||
process.nextTick(function(){ | ||
self.connectionDetails.redis = self.redis; | ||
callback(err); | ||
}); | ||
}); | ||
@@ -51,5 +69,5 @@ } | ||
var self = this; | ||
self.worker = new self.NR.worker({connection: self.connectionDetails, queues: self.queue, timeout: self.timeout}, jobs, function(){ | ||
self.scheduler = new self.NR.scheduler({connection: self.connectionDetails, timeout: self.timeout}, function(){ | ||
self.queue = new self.NR.queue({connection: self.connectionDetails}, function(){ | ||
self.worker = new self.NR.worker({connection: {redis: self.redis}, queues: self.queue, timeout: self.timeout}, jobs, function(){ | ||
self.scheduler = new self.NR.scheduler({connection: {redis: self.redis}, timeout: self.timeout}, function(){ | ||
self.queue = new self.NR.queue({connection: {redis: self.redis}}, function(){ | ||
callback(); | ||
@@ -56,0 +74,0 @@ }); |
@@ -33,19 +33,13 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
it('can add a normal job', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.popFromQueue(function(err, obj){ | ||
should.exist(obj); | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
describe('[with connection]', function(){ | ||
beforeEach(function(done){ | ||
queue = new specHelper.NR.queue({connection: specHelper.connectionDetails, queue: specHelper.queue}, function(){ | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}) | ||
it('can add delayed job (enqueueAt)', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.redis.zscore(specHelper.namespace + ":delayed_queue_schedule", "10", function(err, score){ | ||
score.should.equal("10"); | ||
specHelper.redis.lpop(specHelper.namespace + ":delayed:" + "10", function(err, obj){ | ||
it('can add a normal job', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.popFromQueue(function(err, obj){ | ||
should.exist(obj); | ||
@@ -55,69 +49,85 @@ obj = JSON.parse(obj); | ||
// obj['args'].should.equal([1,2,3]); | ||
done(); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can add delayed job (enqueueIn)', function(done){ | ||
var now = Math.round( new Date().getTime() / 1000 ) + 5; | ||
queue.enqueueIn(5 * 1000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.redis.zscore(specHelper.namespace + ":delayed_queue_schedule", now, function(err, score){ | ||
score.should.equal(String(now)); | ||
specHelper.redis.lpop(specHelper.namespace + ":delayed:" + now, function(err, obj){ | ||
should.exist(obj); | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
done(); | ||
it('can add delayed job (enqueueAt)', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.redis.zscore(specHelper.namespace + ":delayed_queue_schedule", "10", function(err, score){ | ||
String(score).should.equal("10"); | ||
specHelper.redis.lpop(specHelper.namespace + ":delayed:" + "10", function(err, obj){ | ||
should.exist(obj); | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can get the number of jobs currently enqueued', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
it('can add delayed job (enqueueIn)', function(done){ | ||
var now = Math.round( new Date().getTime() / 1000 ) + 5; | ||
queue.enqueueIn(5 * 1000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
specHelper.redis.zscore(specHelper.namespace + ":delayed_queue_schedule", now, function(err, score){ | ||
String(score).should.equal(String(now)); | ||
specHelper.redis.lpop(specHelper.namespace + ":delayed:" + now, function(err, obj){ | ||
should.exist(obj); | ||
obj = JSON.parse(obj); | ||
obj['class'].should.equal('someJob'); | ||
// obj['args'].should.equal([1,2,3]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can get the number of jobs currently enqueued', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(2); | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(2); | ||
done(); | ||
}) | ||
}); | ||
}) | ||
}); | ||
it('can find previously scheduled jobs', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.scheduledAt(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
}) | ||
}); | ||
}); | ||
}) | ||
}); | ||
}); | ||
it('can find previously scheduled jobs', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.scheduledAt(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
it('can deleted an enqued job', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(1); | ||
queue.del(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can deleted an enqued job', function(done){ | ||
queue.enqueue(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(1); | ||
queue.del(specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.length(specHelper.queue, function(err, len){ | ||
len.should.equal(0); | ||
done(); | ||
}); | ||
}); | ||
it('can deleted a delayed job', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.delDelayed(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('can deleted a delayed job', function(done){ | ||
queue.enqueueAt(10000, specHelper.queue, 'someJob', [1,2,3], function(){ | ||
queue.delDelayed(specHelper.queue, 'someJob', [1,2,3], function(err, timestamps){ | ||
timestamps.length.should.equal(1); | ||
timestamps[0].should.equal('10'); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -6,3 +6,3 @@ var specHelper = require(__dirname + "/../_specHelper.js").specHelper; | ||
var jobDelay = 100; | ||
var jobDelay = 1000; | ||
@@ -89,2 +89,3 @@ var jobs = { | ||
done(); | ||
worker1.end(); | ||
} | ||
@@ -100,2 +101,3 @@ return this.worker.connection.key('customKey', Math.max.apply(Math.max, this.args)); | ||
} | ||
worker1 = new specHelper.NR.worker({connection: specHelper.connectionDetails, timeout: specHelper.timeout, queues: specHelper.queue}, jobs, function(){ | ||
@@ -105,3 +107,5 @@ queue.enqueue(specHelper.queue, "jobLockAdd", [1,2], function(){ | ||
}); | ||
}) | ||
}); | ||
worker1.on('error', function(queue, job, error){ console.log(error.stack); }) | ||
}) | ||
@@ -108,0 +112,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Environment variable access
Supply chain riskPackage accesses environment variables, which may be a sign of credential stuffing or data theft.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
81678
32
2071
2
3
27
+ Addedasync@^0.2.10
+ Addedasync@0.2.10(transitive)