Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 0.6.0 to 0.7.0

9

CHANGELOG.md

@@ -0,1 +1,10 @@

v0.7.0
======
- store the return value from the job handlers.
- store stacktraces.
- improvements in delayed jobs.
[Changes](https://github.com/OptimalBits/bull/compare/v0.6.0...v0.7.0)
v0.4.0

@@ -2,0 +11,0 @@ ======

145

lib/job.js

@@ -32,2 +32,3 @@ /*eslint-env node */

}
this.returnvalue = null;
this.attemptsMade = 0;

@@ -47,3 +48,3 @@ };

return Job.fromData(queue, +jobId, jobData);
} else{
}else{
return jobData;

@@ -62,3 +63,5 @@ }

attempts: this.attempts,
attemptsMade: this.attemptsMade
attemptsMade: this.attemptsMade,
stacktrace: JSON.stringify(this.stacktrace || null),
returnvalue: JSON.stringify(this.returnvalue || null)
};

@@ -136,4 +139,9 @@ };

Job.prototype.moveToCompleted = function(){
return this._moveToSet('completed');
Job.prototype.moveToCompleted = function(returnvalue){
var _this = this;
this.returnvalue = returnvalue;
return this._saveAttempt().then(function() {
// Move to completed
return _this._moveToSet('completed', returnvalue);
});
};

@@ -144,12 +152,3 @@

this.stacktrace.push(err.stack);
if(isNaN(this.attemptsMade)){
this.attemptsMade = 1;
}else{
this.attemptsMade++;
}
// Update job states
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), {
stacktrace: JSON.stringify(this.stacktrace),
attemptsMade: this.attemptsMade
}).then(function() {
return this._saveAttempt().then(function() {
// Check if an automatic retry should be performed

@@ -176,2 +175,34 @@ if(_this.attemptsMade < _this.attempts){

Job.prototype.promote = function(){
var queue = this.queue;
var jobId = this.jobId;
var script = [
'if redis.call("ZREM", KEYS[1], ARGV[1]) == 1 then',
' redis.call("LPUSH", KEYS[2], ARGV[1])',
' return 0',
'else',
' return -1',
'end'
].join('\n');
var keys = _.map([
'delayed',
'wait'], function(name){
return queue.toKey(name);
}
);
return queue.client.evalAsync(
script,
keys.length,
keys[0],
keys[1],
jobId).then(function(result){
if(result === -1){
throw new Error('Job ' + jobId + ' is not in a delayed state');
}
});
};
Job.prototype.retry = function(){

@@ -325,9 +356,6 @@ var key = this.queue.toKey('wait');

Job.prototype._moveToSet = function(setName, delayTimestamp){
Job.prototype._moveToSet = function(set, context){
var queue = this.queue;
var jobId = this.jobId;
delayTimestamp = +delayTimestamp || 0;
delayTimestamp = delayTimestamp < 0 ? 0 : delayTimestamp;
//

@@ -340,14 +368,46 @@ // Bake in the job id first 12 bits into the timestamp

//
if(delayTimestamp > 0){
delayTimestamp = delayTimestamp * 0x1000 + (jobId & 0xfff);
if(set === 'delayed') {
context = +context || 0;
context = context < 0 ? 0 : context;
if(context > 0){
context = context * 0x1000 + (jobId & 0xfff);
}
}
// this lua script takes three keys and two arguments
// keys:
// - the expanded key for the active set
// - the expanded key for the destination set
// - the expanded key for the job
//
// arguments:
// - json serilized context which is:
// - delayedTimestamp when the destination set is 'delayed'
// - stacktrace when the destination set is 'failed'
// - returnvalue of the handler when the destination set is 'completed'
// - the id of the job
//
// it checks whether KEYS[2] the destination set ends with 'delayed', 'completed'
// or 'failed'. And then adds the context to the jobhash and adds the job to
// the destination set. Finally it removes the job from the active list.
//
// it returns either 0 for success or -1 for failure.
var script = [
'if redis.call("EXISTS", KEYS[3]) == 1 then',
' if string.find(KEYS[2], "delayed$") ~= nil then',
' local score = tonumber(ARGV[1])',
' if score ~= 0 then',
' redis.call("ZADD", KEYS[2], score, ARGV[2])',
' redis.call("PUBLISH", KEYS[2], (score / 0x1000))',
' if score ~= 0 then',
' redis.call("ZADD", KEYS[2], score, ARGV[2])',
' redis.call("PUBLISH", KEYS[2], (score / 0x1000))',
' else',
' redis.call("SADD", KEYS[2], ARGV[2])',
' end',
' elseif string.find(KEYS[2], "completed$") ~= nil then',
' redis.call("HSET", KEYS[3], "returnvalue", ARGV[1])',
' redis.call("SADD", KEYS[2], ARGV[2])',
' elseif string.find(KEYS[2], "failed$") ~= nil then',
' redis.call("HSET", KEYS[3], "stacktrace", ARGV[1])',
' redis.call("SADD", KEYS[2], ARGV[2])',
' else',
' redis.call("SADD", KEYS[2], ARGV[2])',
' return -1',
' end',

@@ -363,4 +423,5 @@ ' redis.call("LREM", KEYS[1], 0, ARGV[2])',

'active',
setName,
jobId], function(name){
set,
jobId
], function(name){
return queue.toKey(name);

@@ -376,6 +437,6 @@ }

keys[2],
delayTimestamp,
JSON.stringify(context),
jobId).then(function(result){
if(result === -1){
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + setName);
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + set);
}

@@ -417,3 +478,3 @@ });

if(result === -1){
throw new Error('Missing Job ' + jobId + ' when trying to add to delayed');
throw new Error('Missing Job ' + jobId + ' when trying to move from active to ' + delayTimestamp + ' or unknown destination set');
}

@@ -476,2 +537,17 @@ });

Job.prototype._saveAttempt = function(){
if(isNaN(this.attemptsMade)){
this.attemptsMade = 1;
}else{
this.attemptsMade++;
}
var params = {
attemptsMade: this.attemptsMade
};
if(this.stacktrace){
params.stacktrace = JSON.stringify(this.stacktrace);
}
return this.queue.client.hmsetAsync(this.queue.toKey(this.jobId), params);
};
/**

@@ -491,11 +567,16 @@ */

try{
_traces = JSON.stringify(data.stacktrace);
_traces = JSON.parse(data.stacktrace);
if(!(_traces instanceof Array)){
_traces = [];
}
} catch (err) {
}catch (err){
_traces = [];
}
job.stacktrace = _traces;
try{
job.returnvalue = JSON.parse(data.returnvalue);
}catch (e){
//swallow exception because the returnvalue got corrupted somehow.
}
return job;

@@ -502,0 +583,0 @@ };

@@ -27,4 +27,4 @@ /*eslint-env node */

job -> wait -> active
| ^ | \
v | | -- > failed
| ^ \
v | -- > failed
delayed

@@ -561,3 +561,3 @@ */

_this.processing--;
return job.moveToCompleted().then(function(){
return job.moveToCompleted(data).then(function(){
_this.emit('completed', job, data);

@@ -564,0 +564,0 @@ });

{
"name": "bull",
"version": "0.6.0",
"version": "0.7.0",
"description": "Job manager",

@@ -20,5 +20,5 @@ "main": "index.js",

"dependencies": {
"bluebird": "^2.9.30",
"lodash": "^3.9.3",
"node-uuid": "^1.4.3",
"bluebird": "^2.10.2",
"lodash": "^3.10.1",
"node-uuid": "^1.4.7",
"redis": "^0.12.1",

@@ -31,9 +31,9 @@ "semver": "^4.2.0"

"gulp-eslint": "^0.13.2",
"mocha": "^2.2.5",
"sinon": "^1.14.1"
"mocha": "^2.3.4",
"sinon": "^1.17.2"
},
"scripts": {
"test": "gulp && mocha test/test_* --reporter spec",
"test": "gulp && mocha test/test_* --reporter spec --timeout 5000",
"postpublish": "git push && git push --tags"
}
}

@@ -142,2 +142,9 @@ Bull Job Manager

```javascript
.on('ready', function() {
// Queue ready for job
// All Redis connections are done
})
.on('error', function(error) {
// Error
})
.on('active', function(job, jobPromise){

@@ -150,3 +157,3 @@ // Job started

})
queue.on('completed', function(job, result){
.on('completed', function(job, result){
// Job completed with output result!

@@ -387,3 +394,4 @@ })

returns {Promise} A promise that resolves when the job has been succesfully
added to the queue (or rejects if some error occured).
added to the queue (or rejects if some error occured). On success, the promise
resolves to the new Job.
```

@@ -390,0 +398,0 @@

@@ -180,6 +180,7 @@ /*eslint-env node */

}).then(function(){
return job.moveToCompleted();
return job.moveToCompleted('succeeded');
}).then(function(){
return job.isCompleted().then(function(isCompleted){
expect(isCompleted).to.be(true);
expect(job.returnvalue).to.be('succeeded');
});

@@ -262,67 +263,107 @@ });

it('get job status', function() {
var client = Promise.promisifyAll(redis.createClient());
return Job.create(queue, 100, {foo: 'baz'}).then(function(job) {
return job.isStuck().then(function(yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('stuck');
return job.moveToCompleted();
}).then(function (){
return job.isCompleted();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('completed');
return client.sremAsync(queue.toKey('completed'), job.jobId);
}).then(function(){
return job.moveToDelayed(Date.now() + 10000);
}).then(function (){
return job.isDelayed();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('delayed');
return client.zremAsync(queue.toKey('delayed'), job.jobId);
});
describe('.promote', function() {
it('can promote a delayed job to be executed immediately', function() {
return Job.create(queue, 8, {foo: 'bar'}, {delay: 1500}).then(function(job){
var delay = job.timestamp + job.delay;
return job._addToDelayed(delay).then(function() {
return job.isDelayed().then(function(isDelayed) {
expect(isDelayed).to.be(true);
}).then(function() {
return job.promote();
}).then(function() {
return job.isDelayed().then(function(isDelayed) {
expect(isDelayed).to.be(false);
return job.isWaiting().then(function(isWaiting) {
expect(isWaiting).to.be(true);
return;
});
});
});
});
});
});
it('should not promote a job that is not delayed', function() {
return Job.create(queue, 9, {foo: 'bar'}).then(function(job){
return job.isDelayed().then(function(isDelayed) {
expect(isDelayed).to.be(false);
}).then(function() {
return job.moveToFailed(new Error('test'));
}).then(function (){
return job.isFailed();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('failed');
return client.sremAsync(queue.toKey('failed'), job.jobId);
}).then(function(res) {
expect(res).to.be(1);
return job.getState();
}).then(function(state) {
expect(state).to.be('stuck');
return client.lpushAsync(queue.toKey('paused'), job.jobId);
return job.promote();
}).then(function() {
return job.isPaused();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('paused');
return client.rpopAsync(queue.toKey('paused'));
}).then(function() {
return client.lpushAsync(queue.toKey('wait'), job.jobId);
}).then(function() {
return job.isWaiting();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('waiting');
throw new Error('Job should not be promoted!');
}).catch(function(err) {
expect(err).to.be.ok();
});
});
});
});
it('get job status', function() {
var client = Promise.promisifyAll(redis.createClient());
return Job.create(queue, 100, {foo: 'baz'}).then(function(job) {
return job.isStuck().then(function(yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('stuck');
return job.moveToCompleted();
}).then(function (){
return job.isCompleted();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('completed');
return client.sremAsync(queue.toKey('completed'), job.jobId);
}).then(function(){
return job.moveToDelayed(Date.now() + 10000);
}).then(function (){
return job.isDelayed();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('delayed');
return client.zremAsync(queue.toKey('delayed'), job.jobId);
}).then(function() {
return job.moveToFailed(new Error('test'));
}).then(function (){
return job.isFailed();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('failed');
return client.sremAsync(queue.toKey('failed'), job.jobId);
}).then(function(res) {
expect(res).to.be(1);
return job.getState();
}).then(function(state) {
expect(state).to.be('stuck');
return client.lpushAsync(queue.toKey('paused'), job.jobId);
}).then(function() {
return job.isPaused();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('paused');
return client.rpopAsync(queue.toKey('paused'));
}).then(function() {
return client.lpushAsync(queue.toKey('wait'), job.jobId);
}).then(function() {
return job.isWaiting();
}).then(function (yes) {
expect(yes).to.be(true);
return job.getState();
}).then(function(state) {
expect(state).to.be('waiting');
});
});
});
});

@@ -283,2 +283,3 @@ /*eslint-env node */

expect(data).to.be.eql(37);
expect(job.returnvalue).to.be.eql(37);
done();

@@ -288,2 +289,25 @@ });

it('process a job that returns data in the process handler and the returnvalue gets stored in the database', function (done) {
queue = buildQueue();
queue.process(function (job, jobDone) {
expect(job.data.foo).to.be.equal('bar');
jobDone(null, 37);
});
queue.add({ foo: 'bar' }).then(function (job) {
expect(job.jobId).to.be.ok();
expect(job.data.foo).to.be('bar');
}).catch(done);
queue.on('completed', function (job, data) {
expect(job).to.be.ok();
expect(data).to.be.eql(37);
expect(job.returnvalue).to.be.eql(37);
queue.client.hgetAsync(queue.toKey(job.jobId), 'returnvalue').then(function (retval) {
expect(JSON.parse(retval)).to.be.eql(37);
done();
});
});
});
it('process a job that returns a promise', function (done) {

@@ -290,0 +314,0 @@ queue = buildQueue();

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc