Comparing version 3.0.0-rc.2 to 3.0.0-rc.3
@@ -0,1 +1,10 @@ | ||
v.3.0.0-rc.3 | ||
============ | ||
- Fixed #579. | ||
- Lazy subscription to events for better performance. | ||
- Corrected calculation of next repeat job. #563. | ||
[Changes](https://github.com/OptimalBits/bull/compare/v3.0.0-rc.2...v3.0.0-rc.3) | ||
v.3.0.0-rc.2 | ||
@@ -7,2 +16,4 @@ ============ | ||
[Changes](https://github.com/OptimalBits/bull/compare/v3.0.0-rc.1...v3.0.0-rc.2) | ||
v.3.0.0-rc.1 | ||
@@ -9,0 +20,0 @@ ============ |
@@ -52,3 +52,3 @@ /*eslint-env node */ | ||
var opts = job.opts; | ||
var jobData = job.toData(); | ||
var jobData = job.toJSON(); | ||
var toKey = _.bind(queue.toKey, queue); | ||
@@ -80,3 +80,3 @@ return scripts.addJob(queue.client, toKey, jobData, { | ||
if(!_.isEmpty(jobData)){ | ||
return Job.fromData(queue, jobData, jobId); | ||
return Job.fromJSON(queue, jobData, jobId); | ||
}else{ | ||
@@ -98,3 +98,3 @@ return null; | ||
var opts = _.extend({}, this.opts || {}); | ||
return { | ||
var json = { | ||
id: this.id, | ||
@@ -112,8 +112,4 @@ name: this.name, | ||
}; | ||
}; | ||
Job.prototype.toData = function(){ | ||
var json = this.toJSON(); | ||
var whitelist = ['data', 'opts', 'stacktrace', 'returnvalue']; | ||
var whitelist = ['data', 'opts', 'stacktrace', 'failedReason', 'returnvalue']; | ||
_.extend(json, _.mapValues(_.pick(json, whitelist), JSON.stringify)); | ||
@@ -155,3 +151,3 @@ | ||
this.returnvalue = returnValue || 0; | ||
return scripts.moveToCompleted(this, this.returnvalue, this.opts.removeOnComplete, ignoreLock); | ||
return scripts.moveToCompleted(this, JSON.stringify(this.returnvalue), this.opts.removeOnComplete, ignoreLock); | ||
}; | ||
@@ -165,2 +161,3 @@ | ||
var _this = this; | ||
this.failedReason = err.message; | ||
return new Promise(function(resolve, reject){ | ||
@@ -429,13 +426,7 @@ var command; | ||
Job.fromData = function(queue, raw, jobId){ | ||
raw = _.extend({}, raw); | ||
raw.data = JSON.parse(raw.data); | ||
raw.opts = JSON.parse(raw.opts); | ||
Job.fromJSON = function(queue, json, jobId){ | ||
json = _.extend({}, json); | ||
json.data = JSON.parse(json.data); | ||
json.opts = JSON.parse(json.opts); | ||
var job = Job.fromJSON(queue, raw, jobId); | ||
return job; | ||
}; | ||
Job.fromJSON = function(queue, json, jobId){ | ||
var job = new Job(queue, json.name || Job.DEFAULT_JOB_NAME, json.data, json.opts); | ||
@@ -467,3 +458,3 @@ | ||
} | ||
return job; | ||
@@ -470,0 +461,0 @@ }; |
219
lib/queue.js
@@ -52,3 +52,3 @@ /*eslint-env node */ | ||
*/ | ||
var MINIMUM_REDIS_VERSION = '2.8.11'; | ||
var MINIMUM_REDIS_VERSION = '2.8.18'; | ||
@@ -180,29 +180,2 @@ var MAX_TIMEOUT_MS = Math.pow(2, 31) - 1; // 32 bit signed | ||
// | ||
// Only setup listeners if .on/.addEventListener called, or process function defined. | ||
// | ||
this._setupQueueEventListeners(); | ||
this.delayedTimestamp = Number.MAX_VALUE; | ||
this.isReady().then(function(){ | ||
// TODO: These are only useful if a process function has been defined. | ||
// | ||
// Init delay timestamp. | ||
// | ||
scripts.updateDelaySet(_this, Date.now()).then(function(timestamp){ | ||
if(timestamp){ | ||
_this.updateDelayTimer(timestamp); | ||
} | ||
}); | ||
// | ||
// Create a guardian timer to revive delayTimer if necessary | ||
// This is necessary when redis connection is unstable, which can cause the pub/sub to fail | ||
// | ||
_this.guardianTimer = setGuardianTimer(_this); | ||
}); | ||
this.errorRetryTimer = {}; | ||
// Bind these methods to avoid constant rebinding and/or creating closures | ||
@@ -252,14 +225,65 @@ // in processJobs etc. | ||
var _on = Queue.prototype.on; | ||
Queue.prototype.on = function(eventName){ | ||
_on.apply(this, arguments); | ||
return this._registerEvent(eventName); | ||
}; | ||
var _once = Queue.prototype.once; | ||
Queue.prototype.once = function(eventName){ | ||
_once.apply(this, arguments); | ||
return this._registerEvent(eventName); | ||
}; | ||
Queue.prototype._init = function(name){ | ||
var _this = this; | ||
this._initializing = _this.eclient.psubscribe(_this.toKey('') + '*') | ||
.then(function(){ | ||
return commands(_this.client); | ||
_this._initializing = commands(_this.client).then(function(){ | ||
debuglog(name + ' queue ready'); | ||
}, function(err){ | ||
_this.emit('error', err, 'Error initializing queue'); | ||
throw err; | ||
}); | ||
}; | ||
Queue.prototype._initProcess = function(){ | ||
var _this = this; | ||
if(!this._initializingProcess){ | ||
var newJobs = _this.newJobsHandler = function(){ | ||
_this.emit('newjobs'); | ||
}; | ||
// | ||
// Only setup listeners if .on/.addEventListener called, or process function defined. | ||
// | ||
this.delayedTimestamp = Number.MAX_VALUE; | ||
this._initializingProcess = this.isReady().then(function(){ | ||
return Promise.join( | ||
_this._registerEvent('delayed'), | ||
_this.on('added', newJobs), | ||
_this.on('global:resumed', newJobs), | ||
_this.on('wait-finished', newJobs)); | ||
}).then(function(){ | ||
debuglog(name + ' queue ready'); | ||
}, function(err){ | ||
_this.emit('error', err, 'Error initializing queue'); | ||
throw err; | ||
// | ||
// Init delay timestamp. | ||
// | ||
return scripts.updateDelaySet(_this, Date.now()).then(function(timestamp){ | ||
if(timestamp){ | ||
_this.updateDelayTimer(timestamp); | ||
} | ||
}); | ||
}).then(function(){ | ||
// | ||
// Create a guardian timer to revive delayTimer if necessary | ||
// This is necessary when redis connection is unstable, which can cause the pub/sub to fail | ||
// | ||
_this.guardianTimer = setGuardianTimer(_this); | ||
}); | ||
this.errorRetryTimer = {}; | ||
} | ||
return this._initializingProcess; | ||
}; | ||
@@ -283,7 +307,7 @@ | ||
this.eclient.on('pmessage', function(channel, pattern, message){ | ||
var keyAndToken = pattern.split('@'); | ||
this.eclient.on('pmessage', function(pattern, channel, message){ | ||
var keyAndToken = channel.split('@'); | ||
var key = keyAndToken[0]; | ||
var token = keyAndToken[1]; | ||
var token = keyAndToken[1]; | ||
switch(key){ | ||
@@ -293,2 +317,19 @@ case activeKey: | ||
break; | ||
case addedKey: | ||
_this.emit('added', message); | ||
if(_this.token === token){ | ||
_this.emit('waiting', message, null); | ||
} | ||
// | ||
// This cannot be correct, since we would end having one global event per worker | ||
// Idea. Get rid of 'added' event and only have a 'waiting' event. | ||
token && _this.emit('global:waiting', message, null); | ||
break; | ||
} | ||
}); | ||
this.eclient.on('message', function(channel, message){ | ||
var key = channel.split('@')[0]; | ||
switch(key){ | ||
case progressKey: | ||
@@ -305,9 +346,2 @@ var jobAndProgress = message.split(':'); | ||
break; | ||
case addedKey: | ||
_this.emit('added', message); | ||
if(_this.token === token){ | ||
_this.emit('waiting', message, null); | ||
} | ||
token && _this.emit('global:waiting', message, null); | ||
break; | ||
case completedKey: | ||
@@ -327,2 +361,29 @@ var data = JSON.parse(message); | ||
Queue.prototype._registerEvent = function(eventName){ | ||
var internalEvents = ['added', 'waiting', 'delayed']; | ||
if(eventName.startsWith('global:') || internalEvents.indexOf(eventName) !== -1){ | ||
if(!this.registeredEvents){ | ||
this._setupQueueEventListeners(); | ||
this.registeredEvents = this.registeredEvents || {}; | ||
} | ||
eventName = eventName.replace('global:', ''); | ||
if(eventName === 'waiting'){ | ||
eventName = 'added'; | ||
} | ||
if(!this.registeredEvents[eventName]){ | ||
var channel = this.toKey(eventName); | ||
if(eventName === 'added' || eventName === 'active'){ | ||
return this.registeredEvents[eventName] = this.eclient.psubscribe(channel + '*'); | ||
} else { | ||
return this.registeredEvents[eventName] = this.eclient.subscribe(channel); | ||
} | ||
} | ||
} | ||
return Promise.resolve(); | ||
}; | ||
Queue.ErrorMessages = errors.Messages; | ||
@@ -343,2 +404,3 @@ | ||
Queue.prototype.disconnect = function(){ | ||
// TODO: Only quit clients that we "own". | ||
var clients = [this.client, this.eclient].filter(function(client){ | ||
@@ -354,6 +416,9 @@ return client.status === 'ready'; | ||
}); | ||
return Promise.all(clients.map(function(client){ | ||
return client.quit(); | ||
})).then(function(){ | ||
return ended; | ||
if(clients.length){ | ||
return ended; | ||
} | ||
}); | ||
@@ -370,2 +435,6 @@ }; | ||
return this.closing = this.isReady().then(function(){ | ||
if(_this._initializingProcess){ | ||
return _this._initializingProcess; | ||
} | ||
}).then(function(){ | ||
return _this._clearTimers(); | ||
@@ -377,3 +446,11 @@ }).then(function(){ | ||
}).then(function(){ | ||
_this.closed = true; | ||
if(_this.newJobsHandler){ | ||
// | ||
// remove newjobs listeners | ||
// | ||
_this.removeListener('added', _this.newJobsHandler); | ||
_this.removeListener('global:resumed', _this.newJobsHandler); | ||
_this.removeListener('wait-finished', _this.newJobsHandler); | ||
_this.closed = true; | ||
} | ||
}); | ||
@@ -423,3 +500,3 @@ }; | ||
var _this = this; | ||
return this.isReady().then(function(){ | ||
return this._initProcess().then(function(){ | ||
return _this.start(concurrency); | ||
@@ -434,3 +511,3 @@ }); | ||
function nextRepeatableJob(queue, name, data, opts){ | ||
function nextRepeatableJob(queue, name, data, opts, isRepeat){ | ||
var repeat = opts.repeat; | ||
@@ -441,4 +518,6 @@ var repeatKey = queue.toKey('repeat') + ':' + name + ':' + repeat.cron; | ||
// Get millis for this repeatable job. | ||
// Only use `millis` from the `repeatKey` when the job is a repeat, otherwise, we want | ||
// `Date.now()` to ensure we try to add the next iteration only | ||
// | ||
return queue.client.get(repeatKey).then(function(millis){ | ||
return (isRepeat ? queue.client.get(repeatKey) : Promise.resolve(Date.now())).then(function(millis){ | ||
if(millis){ | ||
@@ -459,6 +538,6 @@ return parseInt(millis); | ||
} | ||
if(nextMillis){ | ||
nextMillis = nextMillis.getTime(); | ||
var delay = nextMillis - millis; | ||
var delay = nextMillis - Date.now(); | ||
@@ -666,3 +745,3 @@ // | ||
this.delayTimer = setTimeout(function(){ | ||
var delayUpdate = function(){ | ||
scripts.updateDelaySet(_this, _this.delayedTimestamp).then(function(nextTimestamp){ | ||
@@ -679,3 +758,9 @@ if(nextTimestamp){ | ||
_this.delayedTimestamp = Number.MAX_VALUE; | ||
}, delay); | ||
}; | ||
if(delay){ | ||
this.delayTimer = setTimeout(delayUpdate, delay); | ||
} else { | ||
this.delayTimer = delayUpdate(); | ||
} | ||
} | ||
@@ -797,2 +882,3 @@ }; | ||
_this.emit('completed', job, result, 'active'); | ||
return null; | ||
}); | ||
@@ -806,2 +892,3 @@ } | ||
_this.emit('failed', job, error, 'active'); | ||
return null; | ||
}); | ||
@@ -836,2 +923,3 @@ } | ||
/** | ||
@@ -852,22 +940,11 @@ Returns a promise that resolves to the next job in queue. | ||
var newJobs = new Promise(function(_resolve){ | ||
// Needs to wrap to ignore the emitted value, or the promise will not resolve. | ||
resolve = function(){ | ||
_resolve(); | ||
}; | ||
_this.on('added', resolve); | ||
_this.on('global:resumed', resolve); | ||
_this.on('wait-finished', resolve); | ||
resolve = _resolve; | ||
_this.on('newjobs', resolve); | ||
}); | ||
var removeListeners = function(){ | ||
_this.removeListener('added', resolve); | ||
_this.removeListener('global:resumed', resolve); | ||
_this.removeListener('wait-finished', resolve); | ||
}; | ||
return scripts.moveToActive(this).spread(function(jobData, jobId){ | ||
if(jobData){ | ||
var job = Job.fromData(_this, jobData, jobId); | ||
var job = Job.fromJSON(_this, jobData, jobId); | ||
if(job.opts.repeat){ | ||
return nextRepeatableJob(_this, job.name, job.data, job.opts).then(function(){ | ||
return nextRepeatableJob(_this, job.name, job.data, job.opts, true).then(function(){ | ||
return job; | ||
@@ -881,3 +958,3 @@ }); | ||
}).finally(function(){ | ||
removeListeners(); | ||
_this.removeListener('newjobs', resolve); | ||
}); | ||
@@ -884,0 +961,0 @@ }; |
@@ -60,3 +60,3 @@ /** | ||
var keys = _.map([src, dst, 'meta-paused', 'paused'], function(name){ | ||
var keys = _.map([src, dst, 'meta-paused', pause ? 'paused' : 'resumed'], function(name){ | ||
return queue.toKey(name); | ||
@@ -111,3 +111,4 @@ }); | ||
job[propVal] = val; | ||
var json = job.toJSON(); | ||
json[propVal] = val; | ||
@@ -121,3 +122,3 @@ var args = [ | ||
shouldRemove ? '1' : '0', | ||
JSON.stringify({job: job.toJSON(), val: val}) | ||
JSON.stringify({job: json, val: val}) | ||
]; | ||
@@ -124,0 +125,0 @@ |
{ | ||
"name": "bull", | ||
"version": "3.0.0-rc.2", | ||
"version": "3.0.0-rc.3", | ||
"description": "Job manager", | ||
@@ -23,6 +23,6 @@ "main": "./lib/queue", | ||
"debuglog": "^1.0.0", | ||
"ioredis": "^3.0.0", | ||
"ioredis": "^3.1.1", | ||
"lodash": "^4.17.4", | ||
"semver": "^5.3.0", | ||
"uuid": "^3.0.1" | ||
"uuid": "^3.1.0" | ||
}, | ||
@@ -33,3 +33,3 @@ "devDependencies": { | ||
"expect.js": "^0.3.1", | ||
"mocha": "^3.3.0", | ||
"mocha": "^3.4.2", | ||
"sinon": "^1.17.7" | ||
@@ -36,0 +36,0 @@ }, |
@@ -92,5 +92,11 @@ | ||
- [matador](https://github.com/ShaneK/Matador) | ||
**Bull v3** | ||
- [Arena](https://github.com/mixmaxhq/arena) | ||
**Bull <= v2** | ||
- [Matador](https://github.com/ShaneK/Matador) | ||
- [react-bull](https://github.com/kfatehi/react-bull) | ||
- [toureiro](https://github.com/Epharmix/Toureiro) | ||
- [Toureiro](https://github.com/Epharmix/Toureiro) | ||
@@ -126,3 +132,3 @@ --- | ||
_**Requirements:** Bull requires a Redis version greater than or equal to `2.8.11`._ | ||
_**Requirements:** Bull requires a Redis version greater than or equal to `2.8.18`._ | ||
@@ -145,3 +151,3 @@ --- | ||
// job.data contains the custom data passed when the job was created | ||
// job.jobId contains id of this job. | ||
// job.id contains id of this job. | ||
@@ -300,3 +306,3 @@ // transcode video asynchronously and report progress | ||
queue.process(function(job, jobDone){ | ||
console.log("Job done by worker", cluster.worker.id, job.jobId); | ||
console.log("Job done by worker", cluster.worker.id, job.id); | ||
jobDone(); | ||
@@ -303,0 +309,0 @@ }); |
@@ -414,6 +414,2 @@ | ||
```js | ||
.on('ready', function() { | ||
// Redis is connected and the queue is ready to accept jobs. | ||
}) | ||
.on('error', function(error) { | ||
@@ -420,0 +416,0 @@ // An error occured. |
@@ -24,3 +24,5 @@ /*eslint-env node */ | ||
// error event has to be observed or the exception will bubble up | ||
}).process(function (job, jobDone) { | ||
}); | ||
queue.process(function (job, jobDone) { | ||
expect(job.data.foo).to.be.equal('bar'); | ||
@@ -27,0 +29,0 @@ jobDone(); |
@@ -16,3 +16,2 @@ /*eslint-env node */ | ||
describe('repeat', function () { | ||
var sandbox = sinon.sandbox.create(); | ||
var queue; | ||
@@ -19,0 +18,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
286315
56
5138
344
Updatedioredis@^3.1.1
Updateduuid@^3.1.0