Comparing version 0.1.9 to 0.1.10
@@ -98,3 +98,3 @@ "use strict"; | ||
'end'].join('\n'); | ||
return this.queue.client.evalAsync(script, 1, this.lockKey(), token).then(function(result){ | ||
@@ -105,8 +105,8 @@ return result === 1; | ||
Job.prototype.completed = function(){ | ||
return this._done('completed'); | ||
Job.prototype.moveToCompleted = function(){ | ||
return this._moveToSet('completed'); | ||
} | ||
Job.prototype.failed = function(err){ | ||
return this._done('failed'); | ||
Job.prototype.moveToFailed = function(err){ | ||
return this._moveToSet('failed'); | ||
} | ||
@@ -149,3 +149,3 @@ | ||
return this.queue.client.evalAsync( | ||
script, | ||
script, | ||
keys.length, | ||
@@ -167,16 +167,16 @@ keys[0], | ||
.sismemberAsync(this.queue.toKey(list), this.jobId).then(function(isMember){ | ||
return isMember === 1; | ||
}); | ||
return isMember === 1; | ||
}); | ||
} | ||
Job.prototype._done = function(list){ | ||
Job.prototype._moveToSet = function(set){ | ||
var queue = this.queue; | ||
var activeList = queue.toKey('active'); | ||
var completedList = queue.toKey(list); | ||
var destinationSet = queue.toKey(set); | ||
var multi = queue.multi(); | ||
return multi | ||
.lrem(activeList, 0, this.jobId) | ||
.sadd(completedList, this.jobId) | ||
.sadd(destinationSet, this.jobId) | ||
.execAsync(); | ||
@@ -183,0 +183,0 @@ } |
155
lib/queue.js
@@ -29,2 +29,3 @@ "use strict"; | ||
var LOCK_RENEW_TIME = 5000; // 5 seconds is the renew time. | ||
var CLIENT_CLOSE_TIMEOUT_MS = 5000; | ||
@@ -84,4 +85,12 @@ var Queue = function Queue(name, redisPort, redisHost, redisOptions){ | ||
Queue.prototype.close = function(){ | ||
this.client.end(); | ||
this.bclient.end(); | ||
var _this = this; | ||
var timeoutMsg = 'Timed out while waiting for redis clients to close'; | ||
return new Promise(function(resolve, reject) { | ||
var triggerEvent = _.after(2, resolve); | ||
_this.client.end(); | ||
_this.bclient.end(); | ||
_this.client.stream.on('close', triggerEvent); | ||
_this.client.stream.on('close', triggerEvent); | ||
}).timeout(CLIENT_CLOSE_TIMEOUT_MS, timeoutMsg); | ||
} | ||
@@ -98,7 +107,7 @@ | ||
this.handler = handler; | ||
this.run().catch(function(err){ | ||
console.log(err); | ||
}); | ||
this.handler = handler; | ||
}; | ||
@@ -141,4 +150,4 @@ | ||
var multi = this.multi(); | ||
multi.llen(this.toKey('wait')) | ||
multi.llen(this.toKey('paused')) | ||
multi.llen(this.toKey('wait')); | ||
multi.llen(this.toKey('paused')); | ||
@@ -173,14 +182,5 @@ return multi.execAsync().then(function(res){ | ||
return multi.execAsync().then(function(res){ | ||
var waiting = res[0]; | ||
var paused = res[1]; | ||
return multi.execAsync().spread(function(waiting, paused){ | ||
var jobKeys = (paused.concat(waiting)).map(_this.toKey, _this); | ||
var jobKeys = _.map(waiting, function(jobId){ | ||
return _this.toKey(jobId); | ||
}); | ||
jobKeys = jobKeys.concat(_.map(paused, function(jobId){ | ||
return _this.toKey(jobId); | ||
})); | ||
if(jobKeys.length){ | ||
@@ -213,11 +213,7 @@ var multi = _this.multi(); | ||
if(_this.processing){ | ||
_this.once('completed', function(){ | ||
_this.emit('paused'); | ||
resolve(); | ||
}); | ||
_this.once('completed', resolve); | ||
}else{ | ||
_this.emit('paused'); | ||
resolve(); | ||
} | ||
}); | ||
}).then(this.emit.bind(this, 'paused')); | ||
@@ -242,6 +238,3 @@ return this.paused; | ||
Queue.prototype.run = function(){ | ||
var _this = this; | ||
return this.processStalledJobs().then(function(){ | ||
return _this.processJobs(); | ||
}); | ||
return this.processStalledJobs().then(this.processJobs.bind(this)); | ||
} | ||
@@ -259,8 +252,8 @@ | ||
return Job.fromId(_this, jobId); | ||
})).then(function(jobs){ | ||
var tasks = jobs.map(function(job){ | ||
return _.bind(_this.processStalledJob, _this, job); | ||
}); | ||
return sequence(tasks); | ||
})) | ||
}).then(function(jobs){ | ||
var tasks = jobs.map(function(job){ | ||
return _this.processStalledJob.bind(_this, job); | ||
}); | ||
return sequence(tasks); | ||
}); | ||
@@ -286,9 +279,9 @@ } | ||
return this.getNextJob().then(function(job){ | ||
return _this.processJob(job); | ||
}).then(function(){ | ||
if(!_this.paused){ | ||
return _this.processJobs(); | ||
} | ||
}); | ||
return this.getNextJob() | ||
.then(this.processJob.bind(this)) | ||
.then(function(){ | ||
if(!_this.paused){ | ||
return _this.processJobs(); | ||
} | ||
}); | ||
} | ||
@@ -298,51 +291,41 @@ | ||
var _this = this; | ||
var deferred = Promise.defer(); | ||
var lockRenewTimeout; | ||
var lockRenewer = function(){ | ||
job.takeLock(_this.token, true); | ||
job.renewLock(); | ||
lockRenewTimeout = setTimeout(lockRenewer, _this.LOCK_RENEW_TIME/2); | ||
}; | ||
var runHandler = Promise.promisify(this.handler.bind(this)); | ||
if(!this.paused){ | ||
this.processing = true; | ||
try{ | ||
lockRenewer(); | ||
_this.handler(job, function(err, data){ | ||
if(err){ | ||
failed(err); | ||
}else{ | ||
completed(data); | ||
} | ||
}); | ||
} catch(err){ | ||
failed(err) | ||
} | ||
}else{ | ||
deferred.resolve(); | ||
function finishProcessing(){ | ||
clearTimeout(lockRenewTimeout); | ||
_this.processing = false; | ||
} | ||
function completed(data){ | ||
var promise = job.completed(); | ||
promise.then(function(){ | ||
clearTimeout(lockRenewTimeout); | ||
_this.processing = false; | ||
_this.emit('completed', job, data); | ||
function handleCompleted(data){ | ||
return job.moveToCompleted().then(function(){ | ||
_this.emit('completed', job, data); | ||
}); | ||
deferred.resolve(promise); | ||
} | ||
function failed(err){ | ||
var promise = job.failed(err); | ||
promise.then(function(){ | ||
job.releaseLock(_this.token).then(function(){ | ||
clearTimeout(lockRenewTimeout); | ||
_this.processing = false; | ||
_this.emit('failed', job, err); | ||
}); | ||
}); | ||
deferred.resolve(promise); | ||
function handleFailed(err){ | ||
var error = err.cause || err; //Handle explicit rejection | ||
return job.moveToFailed(err) | ||
.then(job.releaseLock.bind(job, _this.token)) | ||
.then(function(){ | ||
_this.emit('failed', job, error); | ||
}); | ||
} | ||
return deferred.promise; | ||
return new Promise(function (resolve, reject) { | ||
if(_this.paused){ | ||
return resolve(); | ||
} | ||
_this.processing = true; | ||
lockRenewer(); | ||
return runHandler(job) | ||
.then(handleCompleted, handleFailed) | ||
.then(finishProcessing) | ||
.then(resolve, reject); | ||
}); | ||
} | ||
@@ -354,6 +337,4 @@ | ||
Queue.prototype.getNextJob = function(){ | ||
var _this = this; | ||
return this.moveJob('wait', 'active').then(function(jobId){ | ||
return Job.fromId(_this, jobId); | ||
}); | ||
var getJobFromId = Job.fromId.bind(null, this); //should this be a queue method? | ||
return this.moveJob('wait', 'active').then(getJobFromId); | ||
} | ||
@@ -398,2 +379,4 @@ | ||
var _this = this; | ||
var key = this.toKey(queueType); | ||
var jobs; | ||
@@ -403,8 +386,3 @@ start = _.isUndefined(start) ? 0 : start; | ||
var key = this.toKey(queueType); | ||
var jobs; | ||
if(isList){ | ||
start = _.isUndefined(start) ? 0 : start; | ||
end = _.isUndefined(end) ? -1 : end; | ||
jobs = this.client.lrangeAsync(key, start, end); | ||
@@ -416,7 +394,4 @@ }else{ | ||
return jobs.then(function(jobIds){ | ||
if(jobIds.length){ | ||
return Promise.all(_.map(jobIds, function(jobId){ | ||
return Job.fromId(_this, jobId); | ||
})); | ||
} | ||
var jobsFromId = jobIds.map(Job.fromId.bind(null, _this)); | ||
return Promise.all(jobsFromId); | ||
}); | ||
@@ -423,0 +398,0 @@ } |
{ | ||
"name": "bull", | ||
"version": "0.1.9", | ||
"version": "0.1.10", | ||
"description": "Job manager", | ||
@@ -20,3 +20,3 @@ "main": "index.js", | ||
"dependencies": { | ||
"bluebird": "^2.3.0", | ||
"bluebird": "~2.3.0", | ||
"lodash": "~2.2.1", | ||
@@ -23,0 +23,0 @@ "node-uuid": "~1.4.1", |
@@ -17,2 +17,3 @@ Bull Job Manager | ||
[![BuildStatus](https://secure.travis-ci.org/OptimalBits/bull.png?branch=master)](http://travis-ci.org/OptimalBits/bull) | ||
[![NPM version](https://badge.fury.io/js/bull.svg)](http://badge.fury.io/js/bull) | ||
@@ -19,0 +20,0 @@ Follow [manast](http://twitter.com/manast) for news and updates regarding this library. |
@@ -8,3 +8,3 @@ var Job = require('../lib/job'); | ||
var queue; | ||
before(function(done){ | ||
@@ -27,9 +27,9 @@ queue = new Queue('test', 6379, '127.0.0.1'); | ||
expect(job).to.have.property('data'); | ||
expect(job.data.foo).to.be.equal('bar'); | ||
Job.fromId(queue, job.jobId).then(function(storedJob){ | ||
expect(storedJob).to.have.property('jobId'); | ||
expect(storedJob).to.have.property('data'); | ||
expect(storedJob.data.foo).to.be.equal('bar'); | ||
@@ -46,3 +46,3 @@ done(); | ||
}); | ||
it('remove', function(done){ | ||
@@ -52,5 +52,5 @@ Job.create(queue, 1, {foo: 'bar'}).then(function(job){ | ||
expect(job).to.have.property('data'); | ||
expect(job.data.foo).to.be.equal('bar'); | ||
job.remove().then(function(){ | ||
@@ -70,4 +70,4 @@ Job.fromId(queue, job.jobId).then(function(storedJob){ | ||
}) | ||
describe('Locking', function(){ | ||
@@ -78,3 +78,3 @@ it('take a lock', function(done){ | ||
expect(job).to.have.property('data'); | ||
return job.takeLock('123').then(function(lockTaken){ | ||
@@ -88,3 +88,3 @@ expect(lockTaken).to.be(true); | ||
}); | ||
it('take an already taken lock', function(done){ | ||
@@ -94,3 +94,3 @@ Job.create(queue, 2, {foo: 'bar'}).then(function(job){ | ||
expect(job).to.have.property('data'); | ||
return job.takeLock('123').then(function(lockTaken){ | ||
@@ -108,3 +108,3 @@ expect(lockTaken).to.be(true); | ||
}); | ||
it('renew a taken lock', function(done){ | ||
@@ -114,3 +114,3 @@ Job.create(queue, 3, {foo: 'bar'}).then(function(job){ | ||
expect(job).to.have.property('data'); | ||
return job.takeLock('123').then(function(lockTaken){ | ||
@@ -128,3 +128,3 @@ expect(lockTaken).to.be(true); | ||
}); | ||
it('release a lock', function(done){ | ||
@@ -134,3 +134,3 @@ Job.create(queue, 4, {foo: 'bar'}).then(function(job){ | ||
expect(job).to.have.property('data'); | ||
return job.takeLock('123').then(function(lockTaken){ | ||
@@ -153,4 +153,4 @@ expect(lockTaken).to.be(true); | ||
}) | ||
it('report progress', function(done){ | ||
@@ -175,4 +175,4 @@ Job.create(queue, 2, {foo: 'bar'}).then(function(job){ | ||
}); | ||
it('completed', function(done){ | ||
it('moveToCompleted', function(done){ | ||
Job.create(queue, 3, {foo: 'bar'}).then(function(job){ | ||
@@ -182,3 +182,3 @@ return job.isCompleted().then(function(isCompleted){ | ||
}).then(function(){ | ||
return job.completed(); | ||
return job.moveToCompleted(); | ||
}).then(function(){ | ||
@@ -194,4 +194,4 @@ return job.isCompleted().then(function(isCompleted){ | ||
}); | ||
it('failed', function(done){ | ||
it('moveToFailed', function(done){ | ||
Job.create(queue, 4, {foo: 'bar'}).then(function(job){ | ||
@@ -201,3 +201,3 @@ return job.isFailed().then(function(isFailed){ | ||
}).then(function(){ | ||
return job.failed(Error("test error")); | ||
return job.moveToFailed(Error("test error")); | ||
}).then(function(){ | ||
@@ -212,5 +212,5 @@ return job.isFailed().then(function(isFailed){ | ||
}); | ||
}); | ||
}); | ||
@@ -8,16 +8,31 @@ var Job = require('../lib/job'); | ||
function buildQueue() { | ||
return new Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
} | ||
function cleanupQueue(queue, done){ | ||
queue.empty() | ||
.then(queue.close.bind(queue)) | ||
.finally(done) | ||
} | ||
describe('Queue', function(){ | ||
var queue; | ||
beforeEach(function(done){ | ||
queue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
done(); | ||
afterEach(function(done){ | ||
if(queue){ | ||
cleanupQueue(queue, done); | ||
queue = undefined; | ||
} else { | ||
done(); | ||
} | ||
}); | ||
afterEach(function(done){ | ||
queue.empty().then(function(){ | ||
queue.close(); | ||
done(); | ||
describe('.close', function () { | ||
it('should return a promise', function (done) { | ||
var testQueue = new Queue('test'); | ||
var closePromise = testQueue.close().finally(done); | ||
expect(closePromise).to.be.a(Promise); | ||
}); | ||
}) | ||
}); | ||
@@ -108,2 +123,3 @@ it('create a queue with standard redis opts', function(done){ | ||
it('process a job', function(done){ | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -124,2 +140,3 @@ expect(job.data.foo).to.be.equal('bar') | ||
it('process a job that updates progress', function(done){ | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -146,2 +163,3 @@ expect(job.data.foo).to.be.equal('bar') | ||
it('process a job that returns data in the process handler', function(done){ | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -270,34 +288,28 @@ expect(job.data.foo).to.be.equal('bar') | ||
it('does not process a job that is being processed when a new queue starts', function(done){ | ||
var jobId; | ||
queue.add({foo: 'bar'}).then(function(job){ | ||
jobId = job.jobId; | ||
}); | ||
var err = null; | ||
var anotherQueue; | ||
queue.process(function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar') | ||
queue = buildQueue(); | ||
if(jobId !== job.jobId){ | ||
done(Error("Missmatch job ids")); | ||
} | ||
queue.add({foo: 'bar'}).then(function(addedJob){ | ||
queue.process(function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar') | ||
setTimeout(function(){ | ||
jobDone(); | ||
}, 100); | ||
}); | ||
if(addedJob.jobId !== job.jobId){ | ||
err = new Error('Processed job id does not match that of added job'); | ||
} | ||
queue.on('completed', function(job){ | ||
anotherQueue.close(); | ||
done(); | ||
}); | ||
setTimeout(jobDone, 100); | ||
}); | ||
var anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
setTimeout(function(){ | ||
anotherQueue = Queue(STD_QUEUE_NAME, 6379, '127.0.0.1'); | ||
anotherQueue.process(function(job, jobDone){ | ||
if(job.jobId === jobId){ | ||
done(Error("SHOULD NOT PROCESS")); | ||
} | ||
err = new Error('The second queue should not have received a job to process'); | ||
jobDone(); | ||
}); | ||
}, 50); | ||
queue.on('completed', function(){ | ||
cleanupQueue(anotherQueue, done.bind(null, err)); | ||
}); | ||
}); | ||
}); | ||
@@ -309,6 +321,8 @@ | ||
var jobError = Error("Job Failed"); | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
expect(job.data.foo).to.be.equal('bar') | ||
jobDone(jobError); | ||
}) | ||
}); | ||
@@ -332,2 +346,5 @@ queue.add({foo: 'bar'}).then(function(job){ | ||
var jobError = new Error("Job Failed"); | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -379,2 +396,4 @@ expect(job.data.foo).to.be.equal('bar') | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -398,2 +417,4 @@ expect(job.data.num).to.be.equal(counter); | ||
queue = buildQueue(); | ||
for(var i=1; i<=maxJobs; i++){ | ||
@@ -420,2 +441,4 @@ added.push(queue.add({foo: 'bar', num: i})); | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -446,2 +469,4 @@ expect(ispaused).to.be(false); | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -513,2 +538,3 @@ expect(ispaused).to.be(false); | ||
it('should get waitting jobs', function(done){ | ||
queue = buildQueue(); | ||
Promise.join(queue.add({foo: 'bar'}), queue.add({baz: 'qux'})).then(function(){ | ||
@@ -528,2 +554,3 @@ queue.getWaiting().then(function(jobs){ | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -545,2 +572,3 @@ queue.getActive().then(function(jobs){ | ||
queue = buildQueue(); | ||
queue.add(data).then(function(job) { | ||
@@ -558,2 +586,3 @@ queue.getJob(job.jobId).then(function(returnedJob) { | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -583,2 +612,4 @@ jobDone(); | ||
queue = buildQueue(); | ||
queue.process(function(job, jobDone){ | ||
@@ -603,5 +634,3 @@ jobDone(Error("Forced error")); | ||
}); | ||
}); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
54048
16
376
1354
+ Addedbluebird@2.3.11(transitive)
- Removedbluebird@2.11.0(transitive)
Updatedbluebird@~2.3.0