query-swarm
Advanced tools
Comparing version 0.2.0 to 0.3.0
@@ -56,2 +56,4 @@ 'use strict'; | ||
lockTimeout: typeof options.lockTimeout === 'number' ? options.lockTimeout : 20000, | ||
// the max number of attempts to process a task | ||
maxProcessingRetries: typeof options.maxProcessingRetries === 'number' ? options.maxProcessingRetries : 0, | ||
// maximum concurrent workers per process | ||
@@ -138,3 +140,3 @@ get concurrency() { return concurrency; }, | ||
// acquire the distributed throttle | ||
redis.set(self.id + ':throttle', null, 'PX', self.options.throttle, 'NX', function(err, throttle) { | ||
self.redlock.lock(self.id + ':throttle', self.options.throttle, function(err, throttle) { | ||
if(err || !throttle) { | ||
@@ -141,0 +143,0 @@ lock.unlock(); |
@@ -23,2 +23,11 @@ 'use strict'; | ||
function RequeueError(task) { | ||
this.name = 'RequeueError'; | ||
this.message = 'Unable to requeue task. Retrying.'; | ||
this.task = task; | ||
} | ||
util.inherits(RequeueError, Error); | ||
module.exports = function(redis) { | ||
@@ -33,2 +42,3 @@ | ||
// null || setTimeoutObject from waiting job | ||
this.immediate = null; | ||
this.timeout = null; | ||
@@ -56,6 +66,7 @@ } | ||
// if we have a timout set, just clear it and return | ||
if(self.timeout !== null){ | ||
clearImmediate(self.timeout); | ||
// if we have a timeout or immediate set, just clear it and return | ||
if(self.timeout !== null || self.immediate !== null){ | ||
clearImmediate(self.immediate); | ||
clearTimeout(self.timeout); | ||
self.immediate = null; | ||
self.timeout = null; | ||
@@ -86,4 +97,5 @@ self.active = false; | ||
// we're done with the timeout, so remove it | ||
clearImmediate(self.timeout); | ||
clearImmediate(self.immediate); | ||
clearTimeout(self.timeout); | ||
self.immediate = null; | ||
self.timeout = null; | ||
@@ -101,3 +113,3 @@ | ||
return setImmediate(self.next.bind(self), method, false); | ||
return (self.immediate = setImmediate(self.next.bind(self), method, false)); | ||
}; | ||
@@ -133,6 +145,8 @@ | ||
// run the user-provided worker | ||
self.master.worker(JSON.parse(task), function(err, result){ | ||
self.master.worker(JSON.parse(task), function(err, result, forceDeadletter){ | ||
if(err) { | ||
self.master.emit('error', err, 'error running the user-proviced worker'); | ||
return self.deadletter(task); | ||
self.master.emit('error', err, 'error running the user-provided worker'); | ||
// checking maxProcessingRetries here prevents an unnecessary zsore in the requeue method if retries are not enabled | ||
return self.master.options.maxProcessingRetries && !forceDeadletter ? self.requeue(task) : self.deadletter(task); | ||
} | ||
@@ -158,2 +172,5 @@ | ||
// remove from retry list | ||
.zrem(self.master.id + ':retries', task) | ||
.exec(function(err){ | ||
@@ -166,2 +183,4 @@ if(err){ | ||
self.master.emit('error', new DeadletterError(task)); | ||
// save a reference to this timer somewhere so it can be cleared on stop? | ||
setTimeout(self.deadletter.bind(self), self.master.options.retryDelay, task); | ||
@@ -179,11 +198,54 @@ } | ||
// remove from processing list | ||
redis.lrem(self.master.id + ':processing', 1, task, function(err){ | ||
if(err){ | ||
self.master.emit('error', err, 'error acknowledging task'); | ||
self.master.emit('error', new AcknowledgeError(task)); | ||
setTimeout(self.acknowledge.bind(self), self.master.options.retryDelay, task); | ||
redis.multi() | ||
// remove from processing list | ||
.lrem(self.master.id + ':processing', 1, task) | ||
// remove from retry list | ||
.zrem(self.master.id + ':retries', task) | ||
.exec(function(err){ | ||
if(err){ | ||
self.master.emit('error', err, 'error acknowledging task'); | ||
self.master.emit('error', new AcknowledgeError(task)); | ||
// save a reference to this timer somewhere so it can be cleared on stop? | ||
setTimeout(self.acknowledge.bind(self), self.master.options.retryDelay, task); | ||
} | ||
self.master.emit('acknowledge', task); | ||
return self.next('consume'); | ||
}); | ||
}; | ||
Worker.prototype.requeue = function(task){ | ||
var self = this; | ||
redis.zscore(self.master.id + ':retries', task, function(err, score) { | ||
if (err) { | ||
self.master.emit('error', err, 'error requeueing task'); | ||
self.master.emit('error', new RequeueError(task)); | ||
// save a reference to this timer somewhere so it can be cleared on stop? | ||
return setTimeout(self.requeue.bind(self), self.master.options.retryDelay, task); | ||
} | ||
self.master.emit('acknowledge', task); | ||
return self.next('consume'); | ||
if (score >= self.master.options.maxProcessingRetries) { | ||
return self.deadletter(task); | ||
} | ||
// move from processing list back to queue and increment retry score | ||
redis.multi() | ||
.lrem(self.master.id + ':processing', 1, task) | ||
.lpush(self.master.id + ':queue', task) | ||
.zincrby(self.master.id + ':retries', 1, task) | ||
.exec(function(err, replies) { | ||
if(err){ | ||
self.master.emit('error', err, 'error requeueing task'); | ||
self.master.emit('error', new RequeueError(task)); | ||
// save a reference to this timer somewhere so it can be cleared on stop? | ||
return setTimeout(self.requeue.bind(self), self.master.options.retryDelay, task); | ||
} | ||
self.master.emit('requeue', task); | ||
return self.next('consume'); | ||
}); | ||
}); | ||
@@ -190,0 +252,0 @@ }; |
{ | ||
"name": "query-swarm", | ||
"version": "0.2.0", | ||
"version": "0.3.0", | ||
"description": "Safely distribute query-driven tasks over a swarm of parallel functions on a single process or across multiple processes/machines.", | ||
@@ -5,0 +5,0 @@ "keywords": "schedule, queue, task, job, worker, redis, distributed, async", |
@@ -75,1 +75,6 @@ Query Swarm [![Build Status](https://travis-ci.org/the-control-group/query-swarm.svg?branch=master)](https://travis-ci.org/the-control-group/query-swarm) | ||
- acknowledge: task | ||
- requeue: task | ||
Caveats | ||
------- | ||
Processing retry functionality via the `maxProcessingRetries` option, assumes your tasks are all unique at any given point in time. |
@@ -21,2 +21,14 @@ 'use strict'; | ||
after(function(done) { | ||
redis.del('QuerySwarm:test:cursor', done); | ||
}); | ||
before(function(done) { | ||
redis.scan('0',function(err, results) { | ||
if (err) return done(err); | ||
if (results[1].length !== 0) return done(new Error('Redis is not empty')); | ||
done(); | ||
}) | ||
}) | ||
describe('Start/Stop', function(){ | ||
@@ -203,2 +215,109 @@ var q = 0; | ||
describe('Requeue', function(){ | ||
it('should requeue up to maxProcessingRetries', function(done) { | ||
var r = 0; | ||
var d = 0; | ||
var e = 0; | ||
var q = 0; | ||
var w = 0; | ||
var tasks = [1,1]; | ||
var expectedWorkerRuns = tasks.length * 2; | ||
var expectedRequeues = tasks.length; | ||
var expectedDeadletters = tasks.length; | ||
var expectedErrors = tasks.length * 2; | ||
var swarm = new QuerySwarm( | ||
'QuerySwarm:test:'+this.test.fullTitle(), | ||
function(cursor, callback) { | ||
q++; | ||
callback(null, cursor, tasks.splice(0,1)); | ||
}, | ||
function(task, callback) { | ||
w++; | ||
callback(new Error('test')) | ||
}, | ||
{ | ||
throttle: 10, | ||
threshold: 4, | ||
retryDelay: 50, | ||
lockTimeout: 200, | ||
concurrency: 2, | ||
maxProcessingRetries: 1, | ||
} | ||
); | ||
swarm.on('error', function(){ | ||
e++; | ||
}); | ||
swarm.on('requeue', function(task) { | ||
r++; | ||
assert.equal(task, 1); | ||
}); | ||
swarm.on('deadletter', function(task) { | ||
d++; | ||
assert.equal(task, 1); | ||
}) | ||
swarm.start(); | ||
setTimeout(function(){ | ||
swarm.destroy(function(){ | ||
assert.equal(w, expectedWorkerRuns); | ||
assert.equal(r, expectedRequeues); | ||
assert.equal(d, expectedDeadletters); | ||
assert.equal(e, expectedErrors); | ||
done(); | ||
}); | ||
},500) | ||
}); | ||
it('should requeue up to maxProcessingRetries unless the worker specifies force_deadletter', function(done) { | ||
var r = 0; | ||
var d = 0; | ||
var e = 0; | ||
var q = 0; | ||
var w = 0; | ||
var tasks = [1,1]; | ||
var expectedWorkerRuns = tasks.length; | ||
var expectedRequeues = 0; | ||
var expectedDeadletters = tasks.length; | ||
var expectedErrors = tasks.length; | ||
var swarm = new QuerySwarm( | ||
'QuerySwarm:test:'+this.test.fullTitle(), | ||
function(cursor, callback) { | ||
q++; | ||
callback(null, cursor, tasks.splice(0,1)); | ||
}, | ||
function(task, callback) { | ||
w++; | ||
callback(new Error('test'), null, true) | ||
}, | ||
{ | ||
throttle: 10, | ||
threshold: 4, | ||
retryDelay: 50, | ||
lockTimeout: 200, | ||
concurrency: 2, | ||
maxProcessingRetries: 1, | ||
} | ||
); | ||
swarm.on('error', function(){ | ||
e++; | ||
}); | ||
swarm.on('requeue', function(task) { | ||
r++; | ||
assert.equal(task, 1); | ||
}); | ||
swarm.on('deadletter', function(task) { | ||
d++; | ||
assert.equal(task, 1); | ||
}) | ||
swarm.start(); | ||
setTimeout(function(){ | ||
swarm.destroy(function(){ | ||
assert.equal(w, expectedWorkerRuns); | ||
assert.equal(r, expectedRequeues); | ||
assert.equal(d, expectedDeadletters); | ||
assert.equal(e, expectedErrors); | ||
done(); | ||
}); | ||
},500) | ||
}); | ||
}); | ||
describe('Errors', function(){ | ||
@@ -205,0 +324,0 @@ |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
623417
726
80
12
1