Comparing version 0.6.0 to 0.7.0
@@ -0,1 +1,10 @@ | ||
v0.7.0 | ||
====== | ||
- store the return value from the job handlers. | ||
- store stacktraces. | ||
- improvements in delayed jobs. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v0.6.0...v0.7.0) | ||
v0.4.0 | ||
@@ -2,0 +11,0 @@ ====== |
145
lib/job.js
@@ -32,2 +32,3 @@ /*eslint-env node */ | ||
} | ||
this.returnvalue = null; | ||
this.attemptsMade = 0; | ||
@@ -47,3 +48,3 @@ }; | ||
return Job.fromData(queue, +jobId, jobData); | ||
} else{ | ||
}else{ | ||
return jobData; | ||
@@ -62,3 +63,5 @@ } | ||
attempts: this.attempts, | ||
attemptsMade: this.attemptsMade | ||
attemptsMade: this.attemptsMade, | ||
stacktrace: JSON.stringify(this.stacktrace || null), | ||
returnvalue: JSON.stringify(this.returnvalue || null) | ||
}; | ||
@@ -136,4 +139,9 @@ }; | ||
Job.prototype.moveToCompleted = function(){ | ||
return this._moveToSet('completed'); | ||
Job.prototype.moveToCompleted = function(returnvalue){ | ||
var _this = this; | ||
this.returnvalue = returnvalue; | ||
return this._saveAttempt().then(function() { | ||
// Move to completed | ||
return _this._moveToSet('completed', returnvalue); | ||
}); | ||
}; | ||
@@ -144,12 +152,3 @@ | ||
this.stacktrace.push(err.stack); | ||
if(isNaN(this.attemptsMade)){ | ||
this.attemptsMade = 1; | ||
}else{ | ||
this.attemptsMade++; | ||
} | ||
// Update job states | ||
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), { | ||
stacktrace: JSON.stringify(this.stacktrace), | ||
attemptsMade: this.attemptsMade | ||
}).then(function() { | ||
return this._saveAttempt().then(function() { | ||
// Check if an automatic retry should be performed | ||
@@ -176,2 +175,34 @@ if(_this.attemptsMade < _this.attempts){ | ||
Job.prototype.promote = function(){ | ||
var queue = this.queue; | ||
var jobId = this.jobId; | ||
var script = [ | ||
'if redis.call("ZREM", KEYS[1], ARGV[1]) == 1 then', | ||
' redis.call("LPUSH", KEYS[2], ARGV[1])', | ||
' return 0', | ||
'else', | ||
' return -1', | ||
'end' | ||
].join('\n'); | ||
var keys = _.map([ | ||
'delayed', | ||
'wait'], function(name){ | ||
return queue.toKey(name); | ||
} | ||
); | ||
return queue.client.evalAsync( | ||
script, | ||
keys.length, | ||
keys[0], | ||
keys[1], | ||
jobId).then(function(result){ | ||
if(result === -1){ | ||
throw new Error('Job ' + jobId + ' is not in a delayed state'); | ||
} | ||
}); | ||
}; | ||
Job.prototype.retry = function(){ | ||
@@ -325,9 +356,6 @@ var key = this.queue.toKey('wait'); | ||
Job.prototype._moveToSet = function(setName, delayTimestamp){ | ||
Job.prototype._moveToSet = function(set, context){ | ||
var queue = this.queue; | ||
var jobId = this.jobId; | ||
delayTimestamp = +delayTimestamp || 0; | ||
delayTimestamp = delayTimestamp < 0 ? 0 : delayTimestamp; | ||
// | ||
@@ -340,14 +368,46 @@ // Bake in the job id first 12 bits into the timestamp | ||
// | ||
if(delayTimestamp > 0){ | ||
delayTimestamp = delayTimestamp * 0x1000 + (jobId & 0xfff); | ||
if(set === 'delayed') { | ||
context = +context || 0; | ||
context = context < 0 ? 0 : context; | ||
if(context > 0){ | ||
context = context * 0x1000 + (jobId & 0xfff); | ||
} | ||
} | ||
// this lua script takes three keys and two arguments | ||
// keys: | ||
// - the expanded key for the active set | ||
// - the expanded key for the destination set | ||
// - the expanded key for the job | ||
// | ||
// arguments: | ||
// - json serilized context which is: | ||
// - delayedTimestamp when the destination set is 'delayed' | ||
// - stacktrace when the destination set is 'failed' | ||
// - returnvalue of the handler when the destination set is 'completed' | ||
// - the id of the job | ||
// | ||
// it checks whether KEYS[2] the destination set ends with 'delayed', 'completed' | ||
// or 'failed'. And then adds the context to the jobhash and adds the job to | ||
// the destination set. Finally it removes the job from the active list. | ||
// | ||
// it returns either 0 for success or -1 for failure. | ||
var script = [ | ||
'if redis.call("EXISTS", KEYS[3]) == 1 then', | ||
' if string.find(KEYS[2], "delayed$") ~= nil then', | ||
' local score = tonumber(ARGV[1])', | ||
' if score ~= 0 then', | ||
' redis.call("ZADD", KEYS[2], score, ARGV[2])', | ||
' redis.call("PUBLISH", KEYS[2], (score / 0x1000))', | ||
' if score ~= 0 then', | ||
' redis.call("ZADD", KEYS[2], score, ARGV[2])', | ||
' redis.call("PUBLISH", KEYS[2], (score / 0x1000))', | ||
' else', | ||
' redis.call("SADD", KEYS[2], ARGV[2])', | ||
' end', | ||
' elseif string.find(KEYS[2], "completed$") ~= nil then', | ||
' redis.call("HSET", KEYS[3], "returnvalue", ARGV[1])', | ||
' redis.call("SADD", KEYS[2], ARGV[2])', | ||
' elseif string.find(KEYS[2], "failed$") ~= nil then', | ||
' redis.call("HSET", KEYS[3], "stacktrace", ARGV[1])', | ||
' redis.call("SADD", KEYS[2], ARGV[2])', | ||
' else', | ||
' redis.call("SADD", KEYS[2], ARGV[2])', | ||
' return -1', | ||
' end', | ||
@@ -363,4 +423,5 @@ ' redis.call("LREM", KEYS[1], 0, ARGV[2])', | ||
'active', | ||
setName, | ||
jobId], function(name){ | ||
set, | ||
jobId | ||
], function(name){ | ||
return queue.toKey(name); | ||
@@ -376,6 +437,6 @@ } | ||
keys[2], | ||
delayTimestamp, | ||
JSON.stringify(context), | ||
jobId).then(function(result){ | ||
if(result === -1){ | ||
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + setName); | ||
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + set); | ||
} | ||
@@ -417,3 +478,3 @@ }); | ||
if(result === -1){ | ||
throw new Error('Missing Job ' + jobId + ' when trying to add to delayed'); | ||
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + delayTimestamp + ' or unknown destination set'); | ||
} | ||
@@ -476,2 +537,17 @@ }); | ||
Job.prototype._saveAttempt = function(){ | ||
if(isNaN(this.attemptsMade)){ | ||
this.attemptsMade = 1; | ||
}else{ | ||
this.attemptsMade++; | ||
} | ||
var params = { | ||
attemptsMade: this.attemptsMade | ||
}; | ||
if(this.stacktrace){ | ||
params.stacktrace = JSON.stringify(this.stacktrace); | ||
} | ||
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params); | ||
}; | ||
/** | ||
@@ -491,11 +567,16 @@ */ | ||
try{ | ||
_traces = JSON.stringify(data.stacktrace); | ||
_traces = JSON.parse(data.stacktrace); | ||
if(!(_traces instanceof Array)){ | ||
_traces = []; | ||
} | ||
} catch (err) { | ||
}catch (err){ | ||
_traces = []; | ||
} | ||
job.stacktrace = _traces; | ||
try{ | ||
job.returnvalue = JSON.parse(data.returnvalue); | ||
}catch (e){ | ||
//swallow exception because the returnvalue got corrupted somehow. | ||
} | ||
return job; | ||
@@ -502,0 +583,0 @@ }; |
@@ -27,4 +27,4 @@ /*eslint-env node */ | ||
job -> wait -> active | ||
| ^ | \ | ||
v | | -- > failed | ||
| ^ \ | ||
v | -- > failed | ||
delayed | ||
@@ -561,3 +561,3 @@ */ | ||
_this.processing--; | ||
return job.moveToCompleted().then(function(){ | ||
return job.moveToCompleted(data).then(function(){ | ||
_this.emit('completed', job, data); | ||
@@ -564,0 +564,0 @@ }); |
{ | ||
"name": "bull", | ||
"version": "0.6.0", | ||
"version": "0.7.0", | ||
"description": "Job manager", | ||
@@ -20,5 +20,5 @@ "main": "index.js", | ||
"dependencies": { | ||
"bluebird": "^2.9.30", | ||
"lodash": "^3.9.3", | ||
"node-uuid": "^1.4.3", | ||
"bluebird": "^2.10.2", | ||
"lodash": "^3.10.1", | ||
"node-uuid": "^1.4.7", | ||
"redis": "^0.12.1", | ||
@@ -31,9 +31,9 @@ "semver": "^4.2.0" | ||
"gulp-eslint": "^0.13.2", | ||
"mocha": "^2.2.5", | ||
"sinon": "^1.14.1" | ||
"mocha": "^2.3.4", | ||
"sinon": "^1.17.2" | ||
}, | ||
"scripts": { | ||
"test": "gulp && mocha test/test_* --reporter spec", | ||
"test": "gulp && mocha test/test_* --reporter spec --timeout 5000", | ||
"postpublish": "git push && git push --tags" | ||
} | ||
} |
@@ -142,2 +142,9 @@ Bull Job Manager | ||
```javascript | ||
.on('ready', function() { | ||
// Queue ready for job | ||
// All Redis connections are done | ||
}) | ||
.on('error', function(error) { | ||
// Error | ||
}) | ||
.on('active', function(job, jobPromise){ | ||
@@ -150,3 +157,3 @@ // Job started | ||
}) | ||
queue.on('completed', function(job, result){ | ||
.on('completed', function(job, result){ | ||
// Job completed with output result! | ||
@@ -387,3 +394,4 @@ }) | ||
returns {Promise} A promise that resolves when the job has been succesfully | ||
added to the queue (or rejects if some error occured). | ||
added to the queue (or rejects if some error occured). On success, the promise | ||
resolves to the new Job. | ||
``` | ||
@@ -390,0 +398,0 @@ |
@@ -180,6 +180,7 @@ /*eslint-env node */ | ||
}).then(function(){ | ||
return job.moveToCompleted(); | ||
return job.moveToCompleted('succeeded'); | ||
}).then(function(){ | ||
return job.isCompleted().then(function(isCompleted){ | ||
expect(isCompleted).to.be(true); | ||
expect(job.returnvalue).to.be('succeeded'); | ||
}); | ||
@@ -262,67 +263,107 @@ }); | ||
it('get job status', function() { | ||
var client = Promise.promisifyAll(redis.createClient()); | ||
return Job.create(queue, 100, {foo: 'baz'}).then(function(job) { | ||
return job.isStuck().then(function(yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('stuck'); | ||
return job.moveToCompleted(); | ||
}).then(function (){ | ||
return job.isCompleted(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('completed'); | ||
return client.sremAsync(queue.toKey('completed'), job.jobId); | ||
}).then(function(){ | ||
return job.moveToDelayed(Date.now() + 10000); | ||
}).then(function (){ | ||
return job.isDelayed(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('delayed'); | ||
return client.zremAsync(queue.toKey('delayed'), job.jobId); | ||
}); | ||
describe('.promote', function() { | ||
it('can promote a delayed job to be executed immediately', function() { | ||
return Job.create(queue, 8, {foo: 'bar'}, {delay: 1500}).then(function(job){ | ||
var delay = job.timestamp + job.delay; | ||
return job._addToDelayed(delay).then(function() { | ||
return job.isDelayed().then(function(isDelayed) { | ||
expect(isDelayed).to.be(true); | ||
}).then(function() { | ||
return job.promote(); | ||
}).then(function() { | ||
return job.isDelayed().then(function(isDelayed) { | ||
expect(isDelayed).to.be(false); | ||
return job.isWaiting().then(function(isWaiting) { | ||
expect(isWaiting).to.be(true); | ||
return; | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('should not promote a job that is not delayed', function() { | ||
return Job.create(queue, 9, {foo: 'bar'}).then(function(job){ | ||
return job.isDelayed().then(function(isDelayed) { | ||
expect(isDelayed).to.be(false); | ||
}).then(function() { | ||
return job.moveToFailed(new Error('test')); | ||
}).then(function (){ | ||
return job.isFailed(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('failed'); | ||
return client.sremAsync(queue.toKey('failed'), job.jobId); | ||
}).then(function(res) { | ||
expect(res).to.be(1); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('stuck'); | ||
return client.lpushAsync(queue.toKey('paused'), job.jobId); | ||
return job.promote(); | ||
}).then(function() { | ||
return job.isPaused(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('paused'); | ||
return client.rpopAsync(queue.toKey('paused')); | ||
}).then(function() { | ||
return client.lpushAsync(queue.toKey('wait'), job.jobId); | ||
}).then(function() { | ||
return job.isWaiting(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('waiting'); | ||
throw new Error('Job should not be promoted!'); | ||
}).catch(function(err) { | ||
expect(err).to.be.ok(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
it('get job status', function() { | ||
var client = Promise.promisifyAll(redis.createClient()); | ||
return Job.create(queue, 100, {foo: 'baz'}).then(function(job) { | ||
return job.isStuck().then(function(yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('stuck'); | ||
return job.moveToCompleted(); | ||
}).then(function (){ | ||
return job.isCompleted(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('completed'); | ||
return client.sremAsync(queue.toKey('completed'), job.jobId); | ||
}).then(function(){ | ||
return job.moveToDelayed(Date.now() + 10000); | ||
}).then(function (){ | ||
return job.isDelayed(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('delayed'); | ||
return client.zremAsync(queue.toKey('delayed'), job.jobId); | ||
}).then(function() { | ||
return job.moveToFailed(new Error('test')); | ||
}).then(function (){ | ||
return job.isFailed(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('failed'); | ||
return client.sremAsync(queue.toKey('failed'), job.jobId); | ||
}).then(function(res) { | ||
expect(res).to.be(1); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('stuck'); | ||
return client.lpushAsync(queue.toKey('paused'), job.jobId); | ||
}).then(function() { | ||
return job.isPaused(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('paused'); | ||
return client.rpopAsync(queue.toKey('paused')); | ||
}).then(function() { | ||
return client.lpushAsync(queue.toKey('wait'), job.jobId); | ||
}).then(function() { | ||
return job.isWaiting(); | ||
}).then(function (yes) { | ||
expect(yes).to.be(true); | ||
return job.getState(); | ||
}).then(function(state) { | ||
expect(state).to.be('waiting'); | ||
}); | ||
}); | ||
}); | ||
}); |
@@ -283,2 +283,3 @@ /*eslint-env node */ | ||
expect(data).to.be.eql(37); | ||
expect(job.returnvalue).to.be.eql(37); | ||
done(); | ||
@@ -288,2 +289,25 @@ }); | ||
it('process a job that returns data in the process handler and the returnvalue gets stored in the database', function (done) { | ||
queue = buildQueue(); | ||
queue.process(function (job, jobDone) { | ||
expect(job.data.foo).to.be.equal('bar'); | ||
jobDone(null, 37); | ||
}); | ||
queue.add({ foo: 'bar' }).then(function (job) { | ||
expect(job.jobId).to.be.ok(); | ||
expect(job.data.foo).to.be('bar'); | ||
}).catch(done); | ||
queue.on('completed', function (job, data) { | ||
expect(job).to.be.ok(); | ||
expect(data).to.be.eql(37); | ||
expect(job.returnvalue).to.be.eql(37); | ||
queue.client.hgetAsync(queue.toKey(job.jobId), 'returnvalue').then(function (retval) { | ||
expect(JSON.parse(retval)).to.be.eql(37); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
it('process a job that returns a promise', function (done) { | ||
@@ -290,0 +314,0 @@ queue = buildQueue(); |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
152663
4044
667
Updatedbluebird@^2.10.2
Updatedlodash@^3.10.1
Updatednode-uuid@^1.4.7