Comparing version 3.0.0-rc.4 to 3.0.0-rc.5
@@ -0,1 +1,10 @@ | ||
v.3.0.0-rc.5 | ||
============ | ||
- Improved performance, specially when having many concurrent workers. | ||
- Fixed #609 using zsets for storing repeatable jobs. | ||
- Fixed #608 Event chaining no longer works. | ||
- Improved getters. | ||
- Fixed #601 Add multiple repeatable jobs with the same cron pattern | ||
v.3.0.0-rc.4 | ||
@@ -149,6 +158,6 @@ ============ | ||
- Fixed #397, Error: Unable to renew nonexisting lock | ||
- Fixed #402, Job.prototype.finished contains error in promise | ||
- Fixed #402, Job.prototype.finished contains error in promise | ||
- Fixed #371, "Unexpected token u in JSON at position 0" while processing job | ||
- New feature #363, "method to permanently fail a job" | ||
- Fix job.progress() to return the correct progress | ||
- Fix job.progress() to return the correct progress | ||
@@ -155,0 +164,0 @@ [Changes](https://github.com/OptimalBits/bull/compare/v2.0.0...v2.1.0) |
@@ -0,5 +1,7 @@ | ||
'use strict'; | ||
module.exports.Messages = { | ||
RETRY_JOB_NOT_EXIST: 'Couldn\'t retry job: The job doesn\'t exist', | ||
RETRY_JOB_IS_LOCKED: 'Couldn\'t retry job: The job is locked', | ||
RETRY_JOB_NOT_FAILED: 'Couldn\'t retry job: The job has been already retried or has not failed' | ||
RETRY_JOB_NOT_FAILED: 'Couldn\'t retry job: The job has been already retried or has not failed' | ||
}; |
129
lib/job.js
@@ -6,2 +6,3 @@ /*eslint-env node */ | ||
var _ = require('lodash'); | ||
var utils = require('./utils'); | ||
var scripts = require('./scripts'); | ||
@@ -30,10 +31,5 @@ var debuglog = require('debuglog')('bull'); | ||
this.opts = _.defaults(opts, { | ||
attempts: 1, | ||
delay: 0, | ||
timestamp: Date.now() | ||
}); | ||
// defaults | ||
this.opts = setDefaultOpts(opts); | ||
this.opts.attempts = parseInt(this.opts.attempts); | ||
this.name = name; | ||
@@ -48,4 +44,18 @@ this.queue = queue; | ||
this.attemptsMade = 0; | ||
this.toKey = _.bind(queue.toKey, queue); | ||
}; | ||
function setDefaultOpts(opts){ | ||
var _opts = Object.assign({}, opts); | ||
_opts.attempts = typeof _opts.attempts == 'undefined' ? 1 : _opts.attempts; | ||
_opts.delay = typeof _opts.delay == 'undefined' ? 0 : _opts.delay; | ||
_opts.timestamp = typeof _opts.timestamp == 'undefined' ? Date.now() : _opts.timestamp; | ||
_opts.attempts = parseInt(_opts.attempts); | ||
return _opts; | ||
} | ||
Job.DEFAULT_JOB_NAME = '__default__'; | ||
@@ -55,8 +65,8 @@ | ||
var opts = job.opts; | ||
var jobData = job.toData(); | ||
var toKey = _.bind(queue.toKey, queue); | ||
return scripts.addJob(queue.client, toKey, jobData, { | ||
return scripts.addJob(queue.client, queue, jobData, { | ||
lifo: opts.lifo, | ||
customJobId: opts.jobId, | ||
priority: opts.priority | ||
priority: opts.priority | ||
}, queue.token); | ||
@@ -67,8 +77,10 @@ } | ||
var job = new Job(queue, name, data, opts); | ||
return queue.isReady().then(function(){ | ||
return addJob(queue, job).then(function(jobId){ | ||
job.id = jobId; | ||
debuglog('Job added', jobId); | ||
return job; | ||
}); | ||
return addJob(queue, job); | ||
}).then(function(jobId){ | ||
job.id = jobId; | ||
job.lockKey = job.toKey(jobId) + ':lock'; | ||
debuglog('Job added', jobId); | ||
return job; | ||
}); | ||
@@ -83,7 +95,3 @@ }; | ||
return queue.client.hgetall(queue.toKey(jobId)).then(function(jobData){ | ||
if(!_.isEmpty(jobData)){ | ||
return Job.fromJSON(queue, jobData, jobId); | ||
}else{ | ||
return null; | ||
} | ||
return utils.isEmpty(jobData) ? null : Job.fromJSON(queue, jobData, jobId); | ||
}); | ||
@@ -101,3 +109,3 @@ }; | ||
Job.prototype.toJSON = function(){ | ||
var opts = _.extend({}, this.opts || {}); | ||
var opts = Object.assign({}, this.opts); | ||
return { | ||
@@ -121,6 +129,10 @@ id: this.id, | ||
Job.prototype.toData = function(){ | ||
var whitelist = ['data', 'opts', 'stacktrace', 'failedReason', 'returnvalue']; | ||
var json = this.toJSON(); | ||
_.extend(json, _.mapValues(_.pick(json, whitelist), JSON.stringify)); | ||
json.data = JSON.stringify(json.data); | ||
json.opts = JSON.stringify(json.opts); | ||
json.stacktrace = JSON.stringify(json.opts); | ||
json.failedReason = JSON.stringify(json.failedReason); | ||
json.returnvalue = JSON.stringify(json.returnvalue); | ||
return json; | ||
@@ -133,3 +145,3 @@ }; | ||
Job.prototype.lockKey = function(){ | ||
return this.queue.toKey(this.id) + ':lock'; | ||
return this.toKey(this.id) + ':lock'; | ||
}; | ||
@@ -161,3 +173,10 @@ | ||
this.returnvalue = returnValue || 0; | ||
return scripts.moveToCompleted(this, JSON.stringify(this.returnvalue), this.opts.removeOnComplete, ignoreLock); | ||
returnValue = utils.tryCatch(JSON.stringify, JSON, [returnValue]); | ||
if(returnValue === utils.errorObject) { | ||
var err = utils.errorObject.value; | ||
return Promise.reject(err); | ||
} | ||
return scripts.moveToCompleted(this, returnValue, this.opts.removeOnComplete, ignoreLock); | ||
}; | ||
@@ -340,10 +359,10 @@ | ||
} else { | ||
return JSON.parse(job.returnvalue); | ||
return job.returnvalue && JSON.parse(job.returnvalue); | ||
} | ||
}); | ||
}); | ||
}else{ | ||
return new Promise(function(resolve, reject){ | ||
var interval; | ||
function onCompleted(job, resultValue){ | ||
if(String(job.id) === String(_this.id)){ | ||
function onCompleted(jobId, resultValue){ | ||
if(String(jobId) === String(_this.id)){ | ||
var result = undefined; | ||
@@ -361,4 +380,4 @@ try{ | ||
function onFailed(job, failedReason){ | ||
if(String(job.id) === String(_this.id)){ | ||
function onFailed(jobId, failedReason){ | ||
if(String(jobId) === String(_this.id)){ | ||
reject(new Error(failedReason)); | ||
@@ -443,7 +462,6 @@ removeListeners(); | ||
Job.fromJSON = function(queue, json, jobId){ | ||
json = _.extend({}, json); | ||
json.data = JSON.parse(json.data); | ||
json.opts = JSON.parse(json.opts); | ||
var data = JSON.parse(json.data || '{}'); | ||
var opts = JSON.parse(json.opts || '{}'); | ||
var job = new Job(queue, json.name || Job.DEFAULT_JOB_NAME, json.data, json.opts); | ||
var job = new Job(queue, json.name || Job.DEFAULT_JOB_NAME, data, opts); | ||
@@ -457,3 +475,3 @@ job.id = json.id || jobId; | ||
} | ||
if(json.processedOn){ | ||
@@ -465,23 +483,30 @@ job.processedOn = parseInt(json.processedOn); | ||
job.attemptsMade = parseInt(json.attemptsMade || 0); | ||
job.stacktrace = getTraces(json.stacktrace); | ||
job.returnvalue = getReturnValue(json.returnvalue); | ||
return job; | ||
}; | ||
function getTraces(stacktrace){ | ||
var _traces; | ||
try{ | ||
_traces = JSON.parse(json.stacktrace); | ||
if(!(_traces instanceof Array)){ | ||
_traces = []; | ||
} | ||
}catch (err){ | ||
_traces = []; | ||
_traces = utils.tryCatch(JSON.parse, JSON, [stacktrace]); | ||
if(_traces === utils.errorObject || !(_traces instanceof Array)) { | ||
return []; | ||
} else { | ||
return _traces; | ||
} | ||
} | ||
job.stacktrace = _traces; | ||
try{ | ||
job.returnvalue = JSON.parse(json.returnvalue); | ||
}catch (e){ | ||
//swallow exception because the returnvalue got corrupted somehow. | ||
debuglog('corrupted returnvalue: ' + json.returnvalue, e); | ||
function getReturnValue(_value){ | ||
var value = utils.tryCatch(JSON.parse, JSON, [_value]); | ||
if(value !== utils.errorObject){ | ||
return value; | ||
}else{ | ||
debuglog('corrupted returnvalue: ' + _value, value); | ||
} | ||
} | ||
return job; | ||
}; | ||
module.exports = Job; |
421
lib/queue.js
@@ -10,2 +10,3 @@ /*eslint-env node */ | ||
var util = require('util'); | ||
var utils = require('./utils'); | ||
var url = require('url'); | ||
@@ -23,3 +24,2 @@ var Job = require('./job'); | ||
var commands = require('./commands/'); | ||
var worker = require('./worker'); | ||
@@ -120,3 +120,4 @@ /** | ||
var lazyClient = redisClientGetter(opts, function (type, client) { | ||
this.clients = []; | ||
var lazyClient = redisClientGetter(this, opts, function (type, client) { | ||
// bubble up Redis error events | ||
@@ -147,2 +148,5 @@ client.on('error', _this.emit.bind(_this, 'error')); | ||
get: lazyClient('subscriber') | ||
}, | ||
bclient: { | ||
get: lazyClient('bclient') | ||
} | ||
@@ -156,2 +160,4 @@ }); | ||
} | ||
}).catch(function(err){ | ||
// Ignore this error. | ||
}); | ||
@@ -171,3 +177,3 @@ } | ||
retryProcessDelay: 5000, | ||
drainDelay: 2000 | ||
drainDelay: 2 | ||
}); | ||
@@ -191,5 +197,22 @@ | ||
this.getJobFromId = Job.fromId.bind(null, this); | ||
var keys = {}; | ||
_.each([ | ||
'', | ||
'wait', | ||
'paused', | ||
'meta-paused', | ||
'active', | ||
'id', | ||
'delayed', | ||
'priority', | ||
'unlocked-check', | ||
'completed', | ||
'failed'], function(key){ | ||
keys[key] = _this.toKey(key); | ||
}); | ||
this.keys = keys; | ||
}; | ||
function redisClientGetter(options, initCallback) { | ||
function redisClientGetter(queue, options, initCallback) { | ||
var createClient = _.isFunction(options.createClient) | ||
@@ -205,2 +228,3 @@ ? options.createClient | ||
var client = connections[type] = createClient(type, options.redis); | ||
queue.clients.push(client); | ||
return initCallback(type, client), client; | ||
@@ -228,4 +252,5 @@ }; | ||
return setInterval(function() { | ||
if(queue.delayedTimestamp < Date.now() || queue.delayedTimestamp - Date.now() > queue.settings.guardInterval){ | ||
scripts.updateDelaySet(queue, Date.now()).then(function(timestamp){ | ||
var now = Date.now(); | ||
if(queue.delayedTimestamp < now || queue.delayedTimestamp - now > queue.settings.guardInterval){ | ||
scripts.updateDelaySet(queue, now).then(function(timestamp){ | ||
if(timestamp){ | ||
@@ -238,6 +263,2 @@ queue.updateDelayTimer(timestamp); | ||
} | ||
// | ||
// Trigger a getNextJob (if worker is idling) | ||
// | ||
queue.emit('added'); | ||
}, queue.settings.guardInterval); | ||
@@ -248,4 +269,10 @@ } | ||
_.extend(Queue.prototype, worker); | ||
// | ||
// Extend Queue with "aspects" | ||
// | ||
require('./getters')(Queue); | ||
require('./worker')(Queue); | ||
require('./repeatable')(Queue); | ||
// -- | ||
Queue.prototype.off = Queue.prototype.removeListener; | ||
@@ -256,4 +283,4 @@ | ||
Queue.prototype.on = function(eventName){ | ||
_on.apply(this, arguments); | ||
return this._registerEvent(eventName); | ||
this._registerEvent(eventName); | ||
return _on.apply(this, arguments); | ||
}; | ||
@@ -264,4 +291,4 @@ | ||
Queue.prototype.once = function(eventName){ | ||
_once.apply(this, arguments); | ||
return this._registerEvent(eventName); | ||
this._registerEvent(eventName); | ||
return _once.apply(this, arguments); | ||
}; | ||
@@ -272,6 +299,2 @@ | ||
if(!this._initializingProcess){ | ||
var newJobs = _this.newJobsHandler = function(){ | ||
_this.emit('newjobs'); | ||
}; | ||
// | ||
@@ -282,7 +305,3 @@ // Only setup listeners if .on/.addEventListener called, or process function defined. | ||
this._initializingProcess = this.isReady().then(function(){ | ||
return Promise.join( | ||
_this._registerEvent('delayed'), | ||
_this.on('added', newJobs), | ||
_this.on('global:resumed', newJobs), | ||
_this.on('wait-finished', newJobs)); | ||
return _this._registerEvent('delayed'); | ||
}).then(function(){ | ||
@@ -323,3 +342,3 @@ // | ||
var resumedKey = _this.toKey('resumed'); | ||
var addedKey = _this.toKey('added'); | ||
var waitingKey = _this.toKey('waiting'); | ||
var completedKey = _this.toKey('completed'); | ||
@@ -332,3 +351,2 @@ var failedKey = _this.toKey('failed'); | ||
var token = keyAndToken[1]; | ||
switch(key){ | ||
@@ -338,10 +356,6 @@ case activeKey: | ||
break; | ||
case addedKey: | ||
_this.emit('added', message); | ||
case waitingKey: | ||
if(_this.token === token){ | ||
_this.emit('waiting', message, null); | ||
} | ||
// | ||
// This cannot be correct, since we would end having one global event per worker | ||
// Idea. Get rid of 'added' event and only have a 'waiting' event. | ||
token && _this.emit('global:waiting', message, null); | ||
@@ -354,3 +368,2 @@ break; | ||
var key = channel.split('@')[0]; | ||
switch(key){ | ||
@@ -370,9 +383,7 @@ case progressKey: | ||
var data = JSON.parse(message); | ||
var job = Job.fromJSON(_this, data.job); | ||
_this.emit('global:completed', job, data.val, 'active'); | ||
_this.emit('global:completed', data.jobId, data.val, 'active'); | ||
break; | ||
case failedKey: | ||
var data = JSON.parse(message); | ||
var job = Job.fromJSON(_this, data.job); | ||
_this.emit('global:failed', job, data.val, 'active'); | ||
_this.emit('global:failed', data.jobId, data.val, 'active'); | ||
break; | ||
@@ -384,3 +395,3 @@ } | ||
Queue.prototype._registerEvent = function(eventName){ | ||
var internalEvents = ['added', 'waiting', 'delayed']; | ||
var internalEvents = ['waiting', 'delayed']; | ||
@@ -393,15 +404,19 @@ if(eventName.startsWith('global:') || internalEvents.indexOf(eventName) !== -1){ | ||
eventName = eventName.replace('global:', ''); | ||
var _eventName = eventName.replace('global:', ''); | ||
if(eventName === 'waiting'){ | ||
eventName = 'added'; | ||
} | ||
if(!this.registeredEvents[eventName]){ | ||
var channel = this.toKey(eventName); | ||
if(eventName === 'added' || eventName === 'active'){ | ||
return this.registeredEvents[eventName] = this.eclient.psubscribe(channel + '*'); | ||
if(!this.registeredEvents[_eventName]){ | ||
var registering; | ||
var _this = this; | ||
var channel = this.toKey(_eventName); | ||
if(_eventName === 'active' || _eventName === 'waiting'){ | ||
registering = this.registeredEvents[_eventName] = this.eclient.psubscribe(channel + '*'); | ||
} else { | ||
return this.registeredEvents[eventName] = this.eclient.subscribe(channel); | ||
registering = this.registeredEvents[_eventName] = this.eclient.subscribe(channel); | ||
} | ||
registering.then(function(){ | ||
_this.emit('registered:' + eventName); | ||
}); | ||
return registering; | ||
}else{ | ||
return this.registeredEvents[_eventName]; | ||
} | ||
@@ -428,4 +443,4 @@ } | ||
// TODO: Only quit clients that we "own". | ||
var clients = [this.client, this.eclient].filter(function(client){ | ||
return client.status === 'ready' || client.status === 'connecting'; | ||
var clients = this.clients.filter(function(client){ | ||
return client.status !== 'end'; | ||
}); | ||
@@ -444,3 +459,9 @@ | ||
// In any case, no further cmds will be accepted by this client. | ||
client.quit(); | ||
return client.quit().catch(function(err){ | ||
if(err.message !== 'Connection is closed.'){ | ||
throw err; | ||
} | ||
}).timeout(500).catch(function(){ | ||
client.disconnect(); | ||
}); | ||
})).then(function(){ | ||
@@ -471,11 +492,3 @@ if(clients.length){ | ||
}).finally(function(){ | ||
if(_this.newJobsHandler){ | ||
// | ||
// remove newjobs listeners | ||
// | ||
_this.removeListener('added', _this.newJobsHandler); | ||
_this.removeListener('global:resumed', _this.newJobsHandler); | ||
_this.removeListener('wait-finished', _this.newJobsHandler); | ||
_this.closed = true; | ||
} | ||
_this.closed = true; | ||
}); | ||
@@ -530,58 +543,6 @@ }; | ||
// | ||
// This code will be called everytime a job is going to be processed if the job has a repeat option. (from delay -> active). | ||
// | ||
var parser = require('cron-parser'); | ||
function nextRepeatableJob(queue, name, data, opts, isRepeat){ | ||
var repeat = opts.repeat; | ||
var repeatKey = queue.toKey('repeat') + ':' + name + ':' + repeat.cron; | ||
// | ||
// Get millis for this repeatable job. | ||
// Only use `millis` from the `repeatKey` when the job is a repeat, otherwise, we want | ||
// `Date.now()` to ensure we try to add the next iteration only | ||
// | ||
return (isRepeat ? queue.client.get(repeatKey) : Promise.resolve(Date.now())).then(function(millis){ | ||
if(millis){ | ||
return parseInt(millis); | ||
}else{ | ||
return Date.now(); | ||
} | ||
}).then(function(millis){ | ||
var interval = parser.parseExpression(repeat.cron, _.defaults({ | ||
currentDate: new Date(millis) | ||
}, repeat)); | ||
var nextMillis; | ||
try{ | ||
nextMillis = interval.next(); | ||
} catch(e){ | ||
// Ignore error | ||
} | ||
if(nextMillis){ | ||
nextMillis = nextMillis.getTime(); | ||
var delay = nextMillis - Date.now(); | ||
// | ||
// Generate unique job id for this iteration. | ||
// | ||
var customId = 'repeat:' + name + ':' + nextMillis; | ||
// | ||
// Set key and add job should be atomic. | ||
// | ||
return queue.client.set(repeatKey, nextMillis).then(function(){ | ||
return Job.create(queue, name, data, _.extend(_.clone(opts), { | ||
jobId: customId, | ||
delay: delay < 0 ? 0 : delay, | ||
timestamp: Date.now() | ||
})); | ||
}); | ||
} | ||
}); | ||
}; | ||
Queue.prototype.start = function(concurrency){ | ||
var _this = this; | ||
return this.run(concurrency).catch(function(err){ | ||
@@ -593,2 +554,3 @@ _this.emit('error', err, 'error running queue'); | ||
Queue.prototype.setHandler = function(name, handler){ | ||
@@ -629,4 +591,12 @@ if(this.handlers[name]) { | ||
Queue.prototype.add = function(name, data, opts){ | ||
if(typeof name !== 'string'){ | ||
opts = data; | ||
data = name; | ||
name = Job.DEFAULT_JOB_NAME; | ||
} | ||
if(opts && opts.repeat){ | ||
return nextRepeatableJob(this, name || Job.DEFAULT_JOB_NAME, data, opts); | ||
var _this = this; | ||
return this.isReady().then(function(){ | ||
return _this.nextRepeatableJob(name, data, opts); | ||
}); | ||
}else{ | ||
@@ -638,16 +608,2 @@ return Job.create(this, name, data, opts); | ||
/** | ||
Returns the number of jobs waiting to be processed. | ||
*/ | ||
Queue.prototype.count = function(){ | ||
var multi = this.multi(); | ||
multi.llen(this.toKey('wait')); | ||
multi.llen(this.toKey('paused')); | ||
multi.zcard(this.toKey('delayed')); | ||
return multi.exec().then(function(res){ | ||
return Math.max(res[0][1], res[1][1]) + res[2][1]; | ||
}); | ||
}; | ||
/** | ||
Empties the queue. | ||
@@ -764,8 +720,9 @@ | ||
var _this = this; | ||
var now = Date.now(); | ||
newDelayedTimestamp = Math.round(newDelayedTimestamp); | ||
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + Date.now())){ | ||
if(newDelayedTimestamp < _this.delayedTimestamp && newDelayedTimestamp < (MAX_TIMEOUT_MS + now)){ | ||
clearTimeout(this.delayTimer); | ||
this.delayedTimestamp = newDelayedTimestamp; | ||
var nextDelayedJob = newDelayedTimestamp - Date.now(); | ||
var nextDelayedJob = newDelayedTimestamp - now; | ||
var delay = nextDelayedJob <= 0 ? 0 : nextDelayedJob; | ||
@@ -776,3 +733,3 @@ | ||
if(nextTimestamp){ | ||
nextTimestamp = nextTimestamp < Date.now() ? Date.now() : nextTimestamp; | ||
nextTimestamp = nextTimestamp < now ? now : nextTimestamp; | ||
}else{ | ||
@@ -901,8 +858,2 @@ nextTimestamp = Number.MAX_VALUE; | ||
function handleCompleted(result){ | ||
try{ | ||
JSON.stringify(result); | ||
}catch(err){ | ||
return handleFailed(err); | ||
} | ||
return job.moveToCompleted(result).then(function(){ | ||
@@ -925,5 +876,7 @@ _this.emit('completed', job, result, 'active'); | ||
var handler = _this.handlers[job.name]; | ||
if(!handler){ | ||
return handleFailed(Error('Missing process handler for job type ' + job.name)); | ||
}else{ | ||
var jobPromise = handler(job); | ||
@@ -940,3 +893,3 @@ | ||
return jobPromise.then(handleCompleted, handleFailed).finally(function(){ | ||
return jobPromise.then(handleCompleted).catch(handleFailed).finally(function(){ | ||
stopTimer(); | ||
@@ -963,170 +916,28 @@ }); | ||
// | ||
// Listen for new jobs, during moveToActive or after. | ||
// Waiting for new jobs to arrive | ||
// | ||
var _resolve; | ||
var newJobs = new Promise(function(resolve){ | ||
_resolve = resolve; | ||
_this.once('newjobs', _resolve); | ||
}); | ||
return scripts.moveToActive(this).spread(function(jobData, jobId){ | ||
if(jobData){ | ||
var job = Job.fromJSON(_this, jobData, jobId); | ||
if(job.opts.repeat){ | ||
return nextRepeatableJob(_this, job.name, job.data, job.opts, true).then(function(){ | ||
return job; | ||
}); | ||
} | ||
return job; | ||
}else{ | ||
return newJobs | ||
.timeout(_this.settings.drainDelay) | ||
.catch(Promise.TimeoutError, function(){ | ||
_this.emit('drained'); | ||
return newJobs; | ||
}); | ||
return this.bclient.blpop(this.toKey('wait:added'), _this.settings.drainDelay).then(function(result){ | ||
if(!result){ | ||
_this.emit('drained'); | ||
return; | ||
} | ||
}) | ||
.finally(function(){ | ||
_this.removeListener('newjobs', _resolve); | ||
}); | ||
}; | ||
Queue.prototype.getJob = function(jobId){ | ||
return Job.fromId(this, jobId); | ||
}; | ||
// Job counts by type | ||
// Queue#getJobCountByTypes('completed') => completed count | ||
// Queue#getJobCountByTypes('completed,failed') => completed + failed count | ||
// Queue#getJobCountByTypes('completed', 'failed') => completed + failed count | ||
// Queue#getJobCountByTypes('completed,waiting', 'failed') => completed + waiting + failed count | ||
Queue.prototype.getJobCountByTypes = function() { | ||
var _this = this; | ||
var args = _.compact(Array.prototype.slice.call(arguments)); | ||
var types = _.compact(args.join(',').replace(/ /g, '').split(',')); | ||
var multi = this.multi(); | ||
_.each(types, function(type) { | ||
var key = _this.toKey(type); | ||
switch(type) { | ||
case 'completed': | ||
case 'failed': | ||
case 'delayed': | ||
multi.zcard(key); | ||
break; | ||
case 'active': | ||
case 'wait': | ||
case 'paused': | ||
multi.llen(key); | ||
break; | ||
} | ||
}); | ||
return multi.exec().then(function(res){ | ||
return res.map(function(v) { | ||
return v[1]; | ||
}).reduce(function(a, b) { | ||
return a + b; | ||
}); | ||
}) || 0; | ||
}; | ||
/** | ||
* Returns all the job counts for every list/set in the queue. | ||
* | ||
*/ | ||
Queue.prototype.getJobCounts = function(){ | ||
var types = ['waiting', 'active', 'completed', 'failed', 'delayed']; | ||
var counts = {}; | ||
return this.client.multi() | ||
.llen(this.toKey('wait')) | ||
.llen(this.toKey('active')) | ||
.zcard(this.toKey('completed')) | ||
.zcard(this.toKey('failed')) | ||
.zcard(this.toKey('delayed')) | ||
.exec().then(function(result){ | ||
result.forEach(function(res, index){ | ||
counts[types[index]] = res[1] || 0; | ||
}); | ||
return counts; | ||
}); | ||
}; | ||
Queue.prototype.getCompletedCount = function() { | ||
return this.client.zcard(this.toKey('completed')); | ||
}; | ||
Queue.prototype.getFailedCount = function() { | ||
return this.client.zcard(this.toKey('failed')); | ||
}; | ||
Queue.prototype.getDelayedCount = function() { | ||
return this.client.zcard(this.toKey('delayed')); | ||
}; | ||
Queue.prototype.getActiveCount = function() { | ||
return this.client.llen(this.toKey('active')); | ||
}; | ||
Queue.prototype.getWaitingCount = function() { | ||
return this.client.llen(this.toKey('wait')); | ||
}; | ||
Queue.prototype.getPausedCount = function() { | ||
return this.client.llen(this.toKey('paused')); | ||
}; | ||
Queue.prototype.getWaiting = function(start, end){ | ||
return Promise.join( | ||
this.getJobs('wait', 'LIST', true, start, end), | ||
this.getJobs('paused', 'LIST', true, start, end)).spread(function(waiting, paused){ | ||
return _.concat(waiting, paused); | ||
}); | ||
}; | ||
Queue.prototype.getActive = function(start, end){ | ||
return this.getJobs('active', 'LIST', true, start, end); | ||
}; | ||
Queue.prototype.getDelayed = function(start, end){ | ||
return this.getJobs('delayed', 'ZSET', true, start, end); | ||
}; | ||
Queue.prototype.getCompleted = function(start, end){ | ||
return this.getJobs('completed', 'ZSET', false, start, end); | ||
}; | ||
Queue.prototype.getFailed = function(start, end){ | ||
return this.getJobs('failed', 'ZSET', false, start, end); | ||
}; | ||
Queue.prototype.getJobs = function(queueType, type, asc, start, end){ | ||
var _this = this; | ||
var key = this.toKey(queueType); | ||
var jobs; | ||
start = _.isUndefined(start) ? 0 : start; | ||
end = _.isUndefined(end) ? -1 : end; | ||
switch(type){ | ||
case 'LIST': | ||
if(asc){ | ||
jobs = this.client.lrange(key, -(end + 1), -(start + 1)).then(function(result){ | ||
return result.reverse(); | ||
}); | ||
var key = result[0], jobId = result[1]; | ||
return scripts.moveToActive(_this).spread(function(jobData, jobId){ | ||
if(jobData){ | ||
var job = Job.fromJSON(_this, jobData, jobId); | ||
if(job.opts.repeat){ | ||
return _this.nextRepeatableJob(job.name, job.data, job.opts, true).then(function(){ | ||
return job; | ||
}); | ||
} | ||
return job; | ||
}else{ | ||
jobs = this.client.lrange(key, start, end); | ||
_this.emit('drained'); | ||
} | ||
break; | ||
case 'ZSET': | ||
jobs = asc ? this.client.zrange(key, start, end) : this.client.zrevrange(key, start, end); | ||
break; | ||
} | ||
return jobs.then(function(jobIds){ | ||
var jobsFromId = jobIds.map(_this.getJobFromId); | ||
return Promise.all(jobsFromId); | ||
}); | ||
}, function(err){ | ||
// Swallow error | ||
if(err.message !== 'Connection is closed.'){ | ||
console.error('BLPOP', err); | ||
} | ||
}); | ||
@@ -1189,4 +1000,2 @@ }; | ||
var _this = this; | ||
_this.emit('wait-finished'); | ||
return new Promise(function(resolve){ | ||
@@ -1202,3 +1011,4 @@ Promise.all(_this.processing).finally(function(){ | ||
// | ||
var getRedisVersion = function getRedisVersion(client){ | ||
function getRedisVersion(client){ | ||
return client.info().then(function(doc){ | ||
@@ -1216,1 +1026,4 @@ var prefix = 'redis_version:'; | ||
module.exports = Queue; | ||
@@ -30,11 +30,16 @@ /** | ||
addJob: function(client, toKey, job, opts, token){ | ||
var keys = _.map(['wait', 'paused', 'meta-paused', 'added', 'id', 'delayed', 'priority'], function(name){ | ||
return toKey(name); | ||
}); | ||
addJob: function(client, queue, job, opts, token){ | ||
keys[3] = keys[3] + '@' + token; | ||
var queueKeys = queue.keys; | ||
var keys = [ | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys['meta-paused'], | ||
queueKeys.id, | ||
queueKeys.delayed, | ||
queueKeys.priority | ||
]; | ||
var args = [ | ||
toKey(''), | ||
queueKeys[''], | ||
_.isUndefined(opts.customJobId) ? '' : opts.customJobId, | ||
@@ -46,8 +51,9 @@ job.name, | ||
job.delay, | ||
job.delay ? job.timestamp + job.delay : '0', | ||
job.delay ? job.timestamp + job.delay : 0, | ||
opts.priority || 0, | ||
opts.lifo ? 'LIFO' : 'FIFO' | ||
opts.lifo ? 'RPUSH' : 'LPUSH', | ||
queue.token | ||
]; | ||
return client.addJob(keys.concat(args)); | ||
keys = keys.concat(args); | ||
return client.addJob(keys); | ||
}, | ||
@@ -70,5 +76,4 @@ | ||
moveToActive: function(queue){ | ||
var keys = _.map(['wait', 'active', 'priority'], function(name){ | ||
return queue.toKey(name); | ||
}); | ||
var queueKeys = queue.keys; | ||
var keys = [queueKeys.wait, queueKeys.active, queueKeys.priority]; | ||
@@ -78,3 +83,3 @@ keys[3] = keys[1] + '@' + queue.token; | ||
var args = [ | ||
queue.toKey(''), | ||
queueKeys[''], | ||
queue.token, | ||
@@ -115,5 +120,2 @@ queue.settings.lockDuration, | ||
var json = job.toData(); | ||
json[propVal] = val; | ||
var args = [ | ||
@@ -126,3 +128,3 @@ job.id, | ||
shouldRemove ? '1' : '0', | ||
JSON.stringify({job: json, val: val}) | ||
JSON.stringify({jobId: job.id, val: val}), | ||
]; | ||
@@ -148,3 +150,3 @@ | ||
}, | ||
// TODO: add a retention argument for completed and finished jobs (in time). | ||
@@ -160,3 +162,3 @@ moveToCompleted: function(job, returnvalue, removeOnComplete, ignoreLock){ | ||
moveToFailed: function(job, failedReason, removeOnFailed, ignoreLock){ | ||
var args = scripts.moveToFailedArgs(job, failedReason, 'failedReason', removeOnFailed, 'failed', ignoreLock); | ||
var args = scripts.moveToFailedArgs(job, failedReason, 'failedReason', removeOnFailed, 'failed', ignoreLock); | ||
return scripts.moveToFinished(args); | ||
@@ -232,3 +234,3 @@ }, | ||
takeLock: function(queue, job){ | ||
return queue.client.takeLock([job.lockKey(), queue.token, queue.settings.lockDuration]); | ||
return queue.client.takeLock([job.lockKey, queue.token, queue.settings.lockDuration]); | ||
}, | ||
@@ -241,13 +243,9 @@ | ||
updateDelaySet: function(queue, delayedTimestamp){ | ||
var keys = _.map([ | ||
'delayed', | ||
'active', | ||
'wait', | ||
'added'], | ||
function(name){ | ||
return queue.toKey(name); | ||
}); | ||
var keys = [ | ||
queue.keys.delayed, | ||
queue.keys.active, | ||
queue.keys.wait | ||
]; | ||
var args = [queue.toKey(''), delayedTimestamp]; | ||
var args = [queue.toKey(''), delayedTimestamp]; | ||
return queue.client.updateDelaySet(keys.concat(args)); | ||
@@ -257,3 +255,3 @@ }, | ||
/** | ||
* Looks for unlocked jobs in the active queue. | ||
* Looks for unlocked jobs in the active queue. | ||
* | ||
@@ -266,6 +264,11 @@ * The job was being worked on, but the worker process died and it failed to renew the lock. | ||
moveUnlockedJobsToWait: function(queue){ | ||
var keys = _.map(['active', 'wait', 'failed', 'added'], function(key){ | ||
return queue.toKey(key); | ||
}); | ||
var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now()]; | ||
var keys = [ | ||
queue.keys.active, | ||
queue.keys.wait, | ||
queue.keys.failed, | ||
queue.keys['meta-paused'], | ||
queue.keys.paused, | ||
queue.keys['unlocked-check'] | ||
]; | ||
var args = [queue.settings.maxStalledCount, queue.toKey(''), Date.now(), queue.settings.stalledInterval]; | ||
return queue.client.moveUnlockedJobsToWait(keys.concat(args)); | ||
@@ -349,3 +352,3 @@ }, | ||
var keys = _.map(['active', 'wait', jobId, 'added'], function(name){ | ||
var keys = _.map(['active', 'wait', jobId], function(name){ | ||
return queue.toKey(name); | ||
@@ -355,3 +358,3 @@ }); | ||
var pushCmd = (job.opts.lifo ? 'R' : 'L') + 'PUSH'; | ||
return keys.concat([pushCmd, jobId, ignoreLock ? '0' : job.queue.token]); | ||
@@ -381,4 +384,3 @@ }, | ||
queue.toKey(options.state), | ||
queue.toKey('wait'), | ||
queue.toKey('added') + '@' + queue.token | ||
queue.toKey('wait') | ||
]; | ||
@@ -388,3 +390,4 @@ | ||
job.id, | ||
(job.opts.lifo ? 'R' : 'L') + 'PUSH' | ||
(job.opts.lifo ? 'R' : 'L') + 'PUSH', | ||
queue.token | ||
]; | ||
@@ -391,0 +394,0 @@ |
@@ -77,3 +77,2 @@ /*eslint-env node */ | ||
var id = uuid.v4(); | ||
var now = Date.now(); | ||
var timer = setTimeout(function (timerInstance, timeoutId) { | ||
@@ -93,3 +92,2 @@ timerInstance.clear(timeoutId); | ||
name: name, | ||
created: now, | ||
timer: timer | ||
@@ -96,0 +94,0 @@ }; |
@@ -0,8 +1,11 @@ | ||
/*eslint-env node */ | ||
'use strict'; | ||
module.exports = function(Queue){ | ||
Queue.prototype.setWorkerName = function(){ | ||
return this.client.client('setname', this.clientName()); | ||
}; | ||
module.exports = { | ||
setWorkerName: function(){ | ||
return this.client.client('setname', this.clientName()); | ||
}, | ||
getWorkers: function(){ | ||
Queue.prototype.getWorkers = function(){ | ||
var _this = this; | ||
@@ -12,10 +15,13 @@ return this.client.client('list').then(function(clients){ | ||
}); | ||
}, | ||
base64Name: function(){ | ||
}; | ||
Queue.prototype.base64Name = function(){ | ||
return (new Buffer(this.name)).toString('base64'); | ||
}, | ||
clientName: function(){ | ||
}; | ||
Queue.prototype.clientName = function(){ | ||
return this.keyPrefix + ':' + this.base64Name(); | ||
}, | ||
parseClientList: function(list){ | ||
}; | ||
Queue.prototype.parseClientList = function(list){ | ||
var _this = this; | ||
@@ -40,4 +46,4 @@ var lines = list.split('\n'); | ||
return clients; | ||
} | ||
}; | ||
}; | ||
{ | ||
"name": "bull", | ||
"version": "3.0.0-rc.4", | ||
"version": "3.0.0-rc.5", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "main": "./lib/queue", |
@@ -20,3 +20,3 @@ | ||
feature in some usecases. For example, you can have two servers that need to | ||
communicate with each other. By using a queue the servers do not need to be online at the same time, this create a very robust communication channel. You can treat `add` as *send* and `process` as *receive*: | ||
communicate with each other. By using a queue the servers do not need to be online at the same time, this creates a very robust communication channel. You can treat `add` as *send* and `process` as *receive*: | ||
@@ -126,2 +126,2 @@ Server A: | ||
NODE_DEBUG=bull node ./your-script.js | ||
``` | ||
``` |
@@ -115,2 +115,3 @@ | ||
| Delayed jobs | ✓ | ✓ | | ✓ | | ||
| Global events | ✓ | | | | | ||
| Pause/Resume | ✓ | ✓ | | | | ||
@@ -120,2 +121,3 @@ | Repeatable jobs | ✓ | | | ✓ | | ||
| Persistence | ✓ | ✓ | ✓ | ✓ | | ||
| UI | ✓ | ✓ | | ✓ | | ||
| Optimized for | Jobs / Messages | Jobs | Messages | Jobs | | ||
@@ -156,3 +158,3 @@ | ||
var videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379'); | ||
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1'}}); // Specify Redis connection using object | ||
var audioQueue = new Queue('audio transcoding', {redis: {port: 6379, host: '127.0.0.1', password: 'foobared'}}); // Specify Redis connection using object | ||
var imageQueue = new Queue('image transcoding'); | ||
@@ -159,0 +161,0 @@ var pdfQueue = new Queue('pdf transcoding'); |
@@ -180,4 +180,5 @@ /*eslint-env node */ | ||
}); | ||
queue.once('failed', function (job) { | ||
queue.once('waiting', function (jobId2) { | ||
queue.once('global:waiting', function (jobId2) { | ||
Job.fromId(queue, jobId2).then(function(job2){ | ||
@@ -188,3 +189,5 @@ expect(job2.data.foo).to.be.equal('bar'); | ||
}); | ||
job.retry(); | ||
queue.once('registered:global:waiting', function(){ | ||
job.retry(); | ||
}); | ||
}); | ||
@@ -191,0 +194,0 @@ }); |
@@ -34,2 +34,40 @@ /*eslint-env node */ | ||
it('should create multiple jobs if they have the same cron pattern', function(done) { | ||
var cron = '*/10 * * * * *'; | ||
var customJobIds = ['customjobone', 'customjobtwo']; | ||
Promise.all([ | ||
queue.add({}, { jobId: customJobIds[0], repeat: { cron: cron }}), | ||
queue.add({}, { jobId: customJobIds[1], repeat: { cron: cron }}) | ||
]).then(function() { | ||
return queue.count(); | ||
}).then(function(count) { | ||
expect(count).to.be.eql(2); | ||
done(); | ||
}).catch(done); | ||
}); | ||
it('should get repeatable jobs with different cron pattern', function(done) { | ||
var crons = ['10 * * * * *', '2 * * 1 * 2', '1 * * 5 * *', '2 * * 4 * *']; | ||
Promise.all([ | ||
queue.add('first', {}, { repeat: { cron: crons[0], endDate: 12345 }}), | ||
queue.add('second', {}, { repeat: { cron: crons[1], endDate: 54321 }}), | ||
queue.add('third', {}, { repeat: { cron: crons[2], tz: 'Africa/Abidjan' }}), | ||
queue.add('fourth', {}, { repeat: { cron: crons[3], tz: 'Africa/Accra' }}), | ||
]).then(function() { | ||
return queue.getRepeatableCount(); | ||
}).then(function(count){ | ||
expect(count).to.be.eql(4); | ||
return queue.getRepeatableJobs(0, -1, true); | ||
}).then(function(jobs){ | ||
expect(jobs).to.be.and.an('array').and.have.length(4); | ||
expect(jobs[0]).to.include({cron: '2 * * 1 * 2', next: 2000, endDate: 54321}); | ||
expect(jobs[1]).to.include({cron: '10 * * * * *', next: 10000, endDate: 12345 }); | ||
expect(jobs[2]).to.include({cron: '2 * * 4 * *', next: 259202000, tz: 'Africa/Accra'}); | ||
expect(jobs[3]).to.include({cron: '1 * * 5 * *', next: 345601000, tz: 'Africa/Abidjan'}); | ||
done(); | ||
}).catch(done); | ||
}); | ||
it('should repeat every 2 seconds', function (done) { | ||
@@ -36,0 +74,0 @@ var _this = this; |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is too big to display
304062
62
5459
358