Comparing version 3.13.0 to 3.14.0
# Changelog | ||
## v.3.14.0 | ||
- feat(queue): add removeJobs function | ||
- fix: clamp negative job delay values to 0 to prevent thrashing | ||
- fix: use DEFAULT_JOB_NAME (#1585) | ||
- fix: remove the lazy client error handler on close (#1605) | ||
- fix: prevent exceeding the maximum stack call size when emptying large queues (#1660) | ||
[Changes](https://github.com/OptimalBits/bull/compare/v3.13.0...v3.14.0) | ||
## v.3.13.0 | ||
feat: add "preventParsingData" job option to prevent data parsing | ||
fix: queue.clean clean job logs as well | ||
fix: whenCurrentJobsFinished should wait for all jobs | ||
- feat: add "preventParsingData" job option to prevent data parsing | ||
- fix: queue.clean clean job logs as well | ||
- fix: whenCurrentJobsFinished should wait for all jobs | ||
@@ -9,0 +19,0 @@ [Changes](https://github.com/OptimalBits/bull/compare/v3.12.1...v3.13.0) |
160
lib/job.js
@@ -11,2 +11,3 @@ 'use strict'; | ||
const FINISHED_WATCHDOG = 5000; | ||
const DEFAULT_JOB_NAME = '__default__'; | ||
@@ -27,3 +28,3 @@ /** | ||
data = name; | ||
name = '__default__'; | ||
name = DEFAULT_JOB_NAME; | ||
} | ||
@@ -38,3 +39,3 @@ | ||
this._progress = 0; | ||
this.delay = this.opts.delay; | ||
this.delay = this.opts.delay < 0 ? 0 : this.opts.delay; | ||
this.timestamp = this.opts.timestamp; | ||
@@ -62,3 +63,3 @@ this.stacktrace = []; | ||
Job.DEFAULT_JOB_NAME = '__default__'; | ||
Job.DEFAULT_JOB_NAME = DEFAULT_JOB_NAME; | ||
@@ -125,2 +126,8 @@ function addJob(queue, client, job) { | ||
Job.remove = async function(queue, pattern) { | ||
await queue.isReady(); | ||
const removed = await scripts.removeWithPattern(queue, pattern); | ||
removed.forEach(jobId => queue.emit('removed', jobId)); | ||
}; | ||
Job.prototype.progress = function(progress) { | ||
@@ -215,18 +222,20 @@ if (_.isUndefined(progress)) { | ||
) { | ||
this.returnvalue = returnValue || 0; | ||
return this.queue.isReady().then(() => { | ||
this.returnvalue = returnValue || 0; | ||
returnValue = utils.tryCatch(JSON.stringify, JSON, [returnValue]); | ||
if (returnValue === utils.errorObject) { | ||
const err = utils.errorObject.value; | ||
return Promise.reject(err); | ||
} | ||
this.finishedOn = Date.now(); | ||
returnValue = utils.tryCatch(JSON.stringify, JSON, [returnValue]); | ||
if (returnValue === utils.errorObject) { | ||
const err = utils.errorObject.value; | ||
return Promise.reject(err); | ||
} | ||
this.finishedOn = Date.now(); | ||
return scripts.moveToCompleted( | ||
this, | ||
returnValue, | ||
this.opts.removeOnComplete, | ||
ignoreLock, | ||
notFetch | ||
); | ||
return scripts.moveToCompleted( | ||
this, | ||
returnValue, | ||
this.opts.removeOnComplete, | ||
ignoreLock, | ||
notFetch | ||
); | ||
}); | ||
}; | ||
@@ -246,59 +255,61 @@ | ||
this.failedReason = err.message; | ||
return new Promise(async (resolve, reject) => { | ||
let command; | ||
const multi = this.queue.client.multi(); | ||
this._saveAttempt(multi, err); | ||
return this.queue.isReady().then(() => { | ||
return new Promise(async (resolve, reject) => { | ||
let command; | ||
const multi = this.queue.client.multi(); | ||
this._saveAttempt(multi, err); | ||
// Check if an automatic retry should be performed | ||
let moveToFailed = false; | ||
if (this.attemptsMade < this.opts.attempts && !this._discarded) { | ||
// Check if backoff is needed | ||
const delay = await backoffs.calculate( | ||
this.opts.backoff, | ||
this.attemptsMade, | ||
this.queue.settings.backoffStrategies, | ||
err | ||
); | ||
// Check if an automatic retry should be performed | ||
let moveToFailed = false; | ||
if (this.attemptsMade < this.opts.attempts && !this._discarded) { | ||
// Check if backoff is needed | ||
const delay = await backoffs.calculate( | ||
this.opts.backoff, | ||
this.attemptsMade, | ||
this.queue.settings.backoffStrategies, | ||
err | ||
); | ||
if (delay === -1) { | ||
// If delay is -1, we should no continue retrying | ||
if (delay === -1) { | ||
// If delay is -1, we should no continue retrying | ||
moveToFailed = true; | ||
} else if (delay) { | ||
// If so, move to delayed (need to unlock job in this case!) | ||
const args = scripts.moveToDelayedArgs( | ||
this.queue, | ||
this.id, | ||
Date.now() + delay, | ||
ignoreLock | ||
); | ||
multi.moveToDelayed(args); | ||
command = 'delayed'; | ||
} else { | ||
// If not, retry immediately | ||
multi.retryJob(scripts.retryJobArgs(this, ignoreLock)); | ||
command = 'retry'; | ||
} | ||
} else { | ||
// If not, move to failed | ||
moveToFailed = true; | ||
} else if (delay) { | ||
// If so, move to delayed (need to unlock job in this case!) | ||
const args = scripts.moveToDelayedArgs( | ||
this.queue, | ||
this.id, | ||
Date.now() + delay, | ||
} | ||
if (moveToFailed) { | ||
this.finishedOn = Date.now(); | ||
const args = scripts.moveToFailedArgs( | ||
this, | ||
err.message, | ||
this.opts.removeOnFail, | ||
ignoreLock | ||
); | ||
multi.moveToDelayed(args); | ||
command = 'delayed'; | ||
} else { | ||
// If not, retry immediately | ||
multi.retryJob(scripts.retryJobArgs(this, ignoreLock)); | ||
command = 'retry'; | ||
multi.moveToFinished(args); | ||
command = 'failed'; | ||
} | ||
} else { | ||
// If not, move to failed | ||
moveToFailed = true; | ||
} | ||
if (moveToFailed) { | ||
this.finishedOn = Date.now(); | ||
const args = scripts.moveToFailedArgs( | ||
this, | ||
err.message, | ||
this.opts.removeOnFail, | ||
ignoreLock | ||
); | ||
multi.moveToFinished(args); | ||
command = 'failed'; | ||
} | ||
return multi.exec().then(results => { | ||
const code = _.last(results)[1]; | ||
if (code < 0) { | ||
return reject(scripts.finishedErrors(code, this.id, command)); | ||
} | ||
resolve(); | ||
}, reject); | ||
return multi.exec().then(results => { | ||
const code = _.last(results)[1]; | ||
if (code < 0) { | ||
return reject(scripts.finishedErrors(code, this.id, command)); | ||
} | ||
resolve(); | ||
}, reject); | ||
}); | ||
}); | ||
@@ -314,8 +325,9 @@ }; | ||
const jobId = this.id; | ||
return scripts.promote(queue, jobId).then(result => { | ||
if (result === -1) { | ||
throw new Error('Job ' + jobId + ' is not in a delayed state'); | ||
} | ||
}); | ||
return queue.isReady().then(() => | ||
scripts.promote(queue, jobId).then(result => { | ||
if (result === -1) { | ||
throw new Error('Job ' + jobId + ' is not in a delayed state'); | ||
} | ||
}) | ||
); | ||
}; | ||
@@ -322,0 +334,0 @@ |
@@ -147,3 +147,5 @@ 'use strict'; | ||
// bubble up Redis error events | ||
client.on('error', this.emit.bind(this, 'error')); | ||
const handler = this.emit.bind(this, 'error'); | ||
client.on('error', handler); | ||
this.once('close', () => client.removeListener('error', handler)); | ||
@@ -377,3 +379,3 @@ if (type === 'client') { | ||
this.eclient.on('pmessage', (pattern, channel, message) => { | ||
const pmessageHandler = (pattern, channel, message) => { | ||
const keyAndToken = channel.split('@'); | ||
@@ -399,5 +401,5 @@ const key = keyAndToken[0]; | ||
} | ||
}); | ||
}; | ||
this.eclient.on('message', (channel, message) => { | ||
const messageHandler = (channel, message) => { | ||
const key = channel.split('@')[0]; | ||
@@ -442,2 +444,10 @@ switch (key) { | ||
} | ||
}; | ||
this.eclient.on('pmessage', pmessageHandler); | ||
this.eclient.on('message', messageHandler); | ||
this.once('close', () => { | ||
this.eclient.removeListener('pmessage', pmessageHandler); | ||
this.eclient.removeListener('message', messageHandler); | ||
}); | ||
@@ -537,2 +547,6 @@ }; | ||
Queue.prototype.removeJobs = function(pattern) { | ||
return Job.remove(this, pattern); | ||
}; | ||
Queue.prototype.close = function(doNotWaitJobs) { | ||
@@ -569,2 +583,3 @@ if (this.closing) { | ||
this.closed = true; | ||
this.emit('close'); | ||
})); | ||
@@ -759,3 +774,5 @@ }; | ||
multi.del.apply(multi, jobKeys); | ||
for (let i = 0; i < jobKeys.length; i += 10000) { | ||
multi.del.apply(multi, jobKeys.slice(i, i + 10000)); | ||
} | ||
return multi.exec(); | ||
@@ -762,0 +779,0 @@ } |
@@ -302,2 +302,31 @@ /** | ||
async removeWithPattern(queue, pattern) { | ||
const keys = _.map( | ||
[ | ||
'active', | ||
'wait', | ||
'delayed', | ||
'paused', | ||
'completed', | ||
'failed', | ||
'priority' | ||
], | ||
name => { | ||
return queue.toKey(name); | ||
} | ||
); | ||
const allRemoved = []; | ||
let cursor = '0', | ||
removed; | ||
do { | ||
[cursor, removed] = await queue.client.removeJobs( | ||
keys.concat([queue.toKey(''), pattern, cursor]) | ||
); | ||
allRemoved.push.apply(allRemoved, removed); | ||
} while (cursor !== '0'); | ||
return allRemoved; | ||
}, | ||
extendLock(queue, jobId) { | ||
@@ -304,0 +333,0 @@ return queue.client.extendLock([ |
{ | ||
"name": "bull", | ||
"version": "3.13.0", | ||
"version": "3.14.0", | ||
"description": "Job manager", | ||
@@ -5,0 +5,0 @@ "engines": { |
@@ -423,3 +423,3 @@ | ||
1. The Node process running your job processor unexpectedly terminates. | ||
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job). | ||
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see [#488](https://github.com/OptimalBits/bull/issues/488) for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job). | ||
@@ -426,0 +426,0 @@ As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs are likely getting double-processed. |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
305693
40
3015
20