query-swarm
Advanced tools
Comparing version 0.0.5 to 0.0.6
@@ -5,2 +5,3 @@ 'use strict'; | ||
var EventEmitter = require('events').EventEmitter; | ||
var Redlock = require('redlock'); | ||
var async = require('async'); | ||
@@ -10,3 +11,3 @@ | ||
this.name = 'UndeadLock'; | ||
this.message = 'The lock expired before the query returned'; | ||
this.message = 'The lock expired before the query returned.'; | ||
this.cursor = cursor; | ||
@@ -53,3 +54,3 @@ this.tasks = tasks; | ||
threshold: typeof options.threshold === 'number' ? options.threshold : 10, | ||
// the max duration a query can run before we try again | ||
// the duration a worker waits between between polling an empty queue | ||
retryDelay: typeof options.retryDelay === 'number' ? options.retryDelay : 500, | ||
@@ -74,2 +75,5 @@ // the max duration a query can run before we try again | ||
self.options.concurrency = typeof options.concurrency === 'number' ? options.concurrency : 10; | ||
// create instance or redlock that won't retry for a lock | ||
self.redlock = new Redlock({ retryCount: 0 }, redis); | ||
} | ||
@@ -109,3 +113,3 @@ | ||
redis.del(self.id + ':lock', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', callback); | ||
redis.del(self.id + ':lock', self.id + ':throttle', self.id + ':cursor', self.id + ':queue', self.id + ':processing', self.id + ':deadletter', callback); | ||
}); | ||
@@ -130,16 +134,19 @@ return self; | ||
// acquire a redis lock | ||
var timeout = Date.now() + self.options.lockTimeout; | ||
redis.set(self.id + ':lock', null, 'PX', self.options.lockTimeout + 1000, 'NX', function(err, result) { | ||
if(result === null) | ||
self.redlock.lock(self.id + ':lock', self.options.lockTimeout, function(err, lock) { | ||
if(err || !lock) | ||
return; | ||
// acquire the distributed throttle | ||
redis.set(self.id + ':throttle', null, 'PX', self.throttle, 'NX', function(err, result) { | ||
if(result === null) | ||
redis.set(self.id + ':throttle', null, 'PX', self.options.throttle, 'NX', function(err, throttle) { | ||
if(err || !throttle) { | ||
lock.unlock(); | ||
return; | ||
} | ||
// get the cursor | ||
redis.get(self.id + ':cursor', function(err, cursor) { | ||
if(err) | ||
if(err) { | ||
lock.unlock(); | ||
return self.emit('error', err, 'error getting cursor'); | ||
} | ||
@@ -151,7 +158,4 @@ cursor = JSON.parse(cursor); | ||
if(err) { | ||
self.emit('error', err, 'error running the user-provided query'); | ||
if(Date.now() >= timeout) return; | ||
return redis.del(self.id + ':lock', function(err) { | ||
if(err) self.emit('error', err, 'error releasing lock'); | ||
}); | ||
lock.unlock(); | ||
return self.emit('error', err, 'error running the user-provided query'); | ||
} | ||
@@ -162,5 +166,7 @@ | ||
// VERY BAD: we've exceeded our lock timeout | ||
if(Date.now() >= timeout) | ||
if(Date.now() >= lock.expiration) | ||
return self.master.emit('error', new OrphanedQuery(cursor, tasks)); | ||
// TODO: extend the lock if its expiration has fallen below a configurable buffer | ||
var multi = redis.multi(); | ||
@@ -178,7 +184,7 @@ | ||
// release the lock | ||
multi = multi.del(self.id + ':lock'); | ||
multi.exec(function(err) { | ||
// release the lock | ||
lock.unlock(); | ||
// TODO: this *could* be a bad place to have an error for different reasons | ||
@@ -185,0 +191,0 @@ // we need to check which command threw and act accordingly: |
{ | ||
"name": "query-swarm", | ||
"version": "0.0.5", | ||
"version": "0.0.6", | ||
"description": "Safely distribute query-driven tasks over a swarm of parallel functions on a single process or across multiple processes/machines.", | ||
@@ -31,9 +31,9 @@ "keywords": "schedule, queue, task, job, worker, redis, distributed, async", | ||
"async": "^0.9.0", | ||
"redis": "^0.11.0" | ||
"redis": "^0.11.0", | ||
"redlock": "^0.2.0" | ||
}, | ||
"devDependencies": { | ||
"async": "^0.9.0", | ||
"chai": "^1.9.2", | ||
"chai": "^3.0.0", | ||
"mocha": "^2.0.1" | ||
} | ||
} |
@@ -66,4 +66,10 @@ 'use strict'; | ||
setTimeout(function(){ | ||
// the swarm should query once | ||
assert.equal(q, 1); | ||
assert.ok(w > 9 && w < 12); // can be 10 or 11, because we have 2 workers | ||
// and consume the first 10 jobs before issuing a stop command; | ||
// could have processed 10 or 11 jobs, because we have 2 workers | ||
assert.isAbove(w, 9); | ||
assert.isBelow(w, 12); | ||
done(); | ||
@@ -76,4 +82,10 @@ }, 500); | ||
setTimeout(function(){ | ||
// the swarm should query one more time | ||
assert.equal(q, 2); | ||
assert.ok(w > 19 && w < 22); // can be 20 or 21, because we have 2 workers | ||
// and consume the next 10 jobs before issuing a stop command; | ||
// could have processed 20 or 21 jobs, because we have 2 workers | ||
assert.isAbove(w, 19); | ||
assert.isBelow(w, 22); | ||
done(); | ||
@@ -80,0 +92,0 @@ }, 500); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
618791
2
557
3
+ Addedredlock@^0.2.0
+ Addedredlock@0.2.0(transitive)