Comparing version 3.16.0 to 3.17.0
# Changelog | ||
## v.3.17.0 | ||
- feat: better rate limiter (#1816) (@manast) | ||
- feat(sandbox): kill child workers gracefully (#1802) (@GabrielCastro) | ||
[Changes](https://github.com/OptimalBits/bull/compare/v3.16.0...v3.17.0) | ||
## v.3.16.0 | ||
@@ -4,0 +11,0 @@ |
@@ -7,3 +7,6 @@ 'use strict'; | ||
const getPort = require('get-port'); | ||
const { killAsync } = require('./utils'); | ||
const CHILD_KILL_TIMEOUT = 30000; | ||
const ChildPool = function ChildPool() { | ||
@@ -82,4 +85,4 @@ if (!(this instanceof ChildPool)) { | ||
ChildPool.prototype.kill = function(child, signal) { | ||
child.kill(signal || 'SIGKILL'); | ||
this.remove(child); | ||
return killAsync(child, signal || 'SIGKILL', CHILD_KILL_TIMEOUT); | ||
}; | ||
@@ -89,10 +92,10 @@ | ||
const children = _.values(this.retained).concat(this.getAllFree()); | ||
this.retained = {}; | ||
this.free = {}; | ||
const allKillPromises = []; | ||
children.forEach(child => { | ||
// TODO: We may want to use SIGKILL if the process does not die after some time. | ||
this.kill(child, 'SIGTERM'); | ||
allKillPromises.push(this.kill(child, 'SIGTERM')); | ||
}); | ||
this.retained = {}; | ||
this.free = {}; | ||
return Promise.all(allKillPromises).then(() => {}); | ||
}; | ||
@@ -108,8 +111,18 @@ | ||
const initChild = function(child, processFile) { | ||
return new Promise(resolve => { | ||
child.send({ cmd: 'init', value: processFile }, resolve); | ||
async function initChild(child, processFile) { | ||
const onComplete = new Promise(resolve => { | ||
const onMessageHandler = msg => { | ||
if (msg.cmd === 'init-complete') { | ||
resolve(); | ||
child.off('message', onMessageHandler); | ||
} | ||
}; | ||
child.on('message', onMessageHandler); | ||
}); | ||
}; | ||
await new Promise(resolve => | ||
child.send({ cmd: 'init', value: processFile }, resolve) | ||
); | ||
await onComplete; | ||
} | ||
module.exports = ChildPool; |
@@ -10,2 +10,3 @@ /** | ||
let processor; | ||
let currentJobPromise; | ||
@@ -17,2 +18,9 @@ //TODO remove for node >= 10 | ||
// same as process.send but waits until the send is complete | ||
// the async version is used below because otherwise | ||
// the termination handler may exit before the parent | ||
// process has recived the messages it requires | ||
const processSendAsync = promisify(process.send.bind(process)); | ||
// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify | ||
@@ -35,2 +43,16 @@ if (!('toJSON' in Error.prototype)) { | ||
async function waitForCurrentJobAndExit() { | ||
status = 'TERMINATING'; | ||
try { | ||
await currentJobPromise; | ||
} finally { | ||
// it's an exit handler | ||
// eslint-disable-next-line no-process-exit | ||
process.exit(process.exitCode || 0); | ||
} | ||
} | ||
process.on('SIGTERM', waitForCurrentJobAndExit); | ||
process.on('SIGINT', waitForCurrentJobAndExit); | ||
process.on('message', msg => { | ||
@@ -57,2 +79,5 @@ switch (msg.cmd) { | ||
status = 'IDLE'; | ||
process.send({ | ||
cmd: 'init-complete' | ||
}); | ||
break; | ||
@@ -68,23 +93,23 @@ | ||
status = 'STARTED'; | ||
Promise.resolve(processor(wrapJob(msg.job)) || {}) | ||
.then( | ||
result => { | ||
process.send({ | ||
cmd: 'completed', | ||
value: result | ||
}); | ||
}, | ||
err => { | ||
if (!err.message) { | ||
err = new Error(err); | ||
} | ||
process.send({ | ||
cmd: 'failed', | ||
value: err | ||
}); | ||
currentJobPromise = (async () => { | ||
try { | ||
const result = (await processor(wrapJob(msg.job))) || {}; | ||
await processSendAsync({ | ||
cmd: 'completed', | ||
value: result | ||
}); | ||
} catch (err) { | ||
if (!err.message) { | ||
// eslint-disable-next-line no-ex-assign | ||
err = new Error(err); | ||
} | ||
) | ||
.finally(() => { | ||
await processSendAsync({ | ||
cmd: 'failed', | ||
value: err | ||
}); | ||
} finally { | ||
status = 'IDLE'; | ||
}); | ||
currentJobPromise = null; | ||
} | ||
})(); | ||
break; | ||
@@ -91,0 +116,0 @@ case 'stop': |
144
lib/queue.js
@@ -279,3 +279,3 @@ 'use strict'; | ||
return function() { | ||
// getter function | ||
// Memoized connection | ||
if (connections[type] != null) { | ||
@@ -500,44 +500,33 @@ return connections[type]; | ||
function redisClientDisconnect(client) { | ||
if (client.status === 'end') { | ||
return Promise.resolve(); | ||
} | ||
let _resolve, _reject; | ||
return new Promise((resolve, reject) => { | ||
_resolve = resolve; | ||
_reject = reject; | ||
client.once('end', resolve); | ||
client.once('error', reject); | ||
async function redisClientDisconnect(client) { | ||
if (client.status !== 'end') { | ||
let _resolve, _reject; | ||
return new Promise((resolve, reject) => { | ||
_resolve = resolve; | ||
_reject = reject; | ||
client.once('end', _resolve); | ||
pTimeout( | ||
client.quit().catch(err => { | ||
if (err.message !== 'Connection is closed.') { | ||
throw err; | ||
pTimeout( | ||
client.quit().catch(err => { | ||
if (err.message !== 'Connection is closed.') { | ||
throw err; | ||
} | ||
}), | ||
500 | ||
).catch(() => { | ||
client.once('error', _reject); | ||
client.disconnect(); | ||
if (['connecting', 'reconnecting'].includes(client.status)) { | ||
resolve(); | ||
} | ||
}), | ||
500 | ||
).catch(() => { | ||
client.disconnect(); | ||
}); | ||
}).finally(() => { | ||
client.removeListener('end', _resolve); | ||
client.removeListener('error', _reject); | ||
}); | ||
}).finally(() => { | ||
client.removeListener('end', _resolve); | ||
client.removeListener('error', _reject); | ||
}); | ||
} | ||
} | ||
Queue.prototype.disconnect = function() { | ||
// | ||
// TODO: Only quit clients that we "own". | ||
// | ||
const clients = this.clients.filter(client => { | ||
return client.status !== 'end'; | ||
}); | ||
return Promise.all(clients.map(redisClientDisconnect)) | ||
.catch(err => { | ||
return console.error(err); | ||
}) | ||
.then(() => { | ||
return null; | ||
}); | ||
Queue.prototype.disconnect = async function() { | ||
await Promise.all(this.clients.map(redisClientDisconnect)); | ||
}; | ||
@@ -550,2 +539,3 @@ | ||
Queue.prototype.close = function(doNotWaitJobs) { | ||
let isReady = true; | ||
if (this.closing) { | ||
@@ -557,25 +547,27 @@ return this.closing; | ||
.then( | ||
() => { | ||
return this._initializingProcess; | ||
}, | ||
(/*err*/) => { | ||
// Ignore this error and try to close anyway. | ||
() => this._initializingProcess, | ||
err => { | ||
console.error(err); | ||
isReady = false; | ||
} | ||
) | ||
.finally(() => { | ||
return this._clearTimers(); | ||
}) | ||
.finally(() => this._clearTimers()) | ||
.then(() => isReady && this.pause(true, doNotWaitJobs)) | ||
.then(() => { | ||
return this.pause(true, doNotWaitJobs); | ||
if (!this.childPool) { | ||
return; | ||
} | ||
const cleanPromise = this.childPool.clean().catch(() => { | ||
// Ignore this error and try to close anyway. | ||
}); | ||
if (doNotWaitJobs) { | ||
return; | ||
} | ||
return cleanPromise; | ||
}) | ||
.then( | ||
() => { | ||
return this.disconnect(); | ||
}, | ||
(/*err*/) => { | ||
// Ignore this error and try to close anyway. | ||
} | ||
async () => this.disconnect(), | ||
err => console.error(err) | ||
) | ||
.finally(() => { | ||
this.childPool && this.childPool.clean(); | ||
this.closed = true; | ||
@@ -754,22 +746,29 @@ this.emit('close'); | ||
Queue.prototype.empty = function() { | ||
// Get all jobids and empty all lists atomically. | ||
const queueKeys = this.keys; | ||
let multi = this.multi(); | ||
multi.lrange(this.toKey('wait'), 0, -1); | ||
multi.lrange(this.toKey('paused'), 0, -1); | ||
multi.del(this.toKey('wait')); | ||
multi.del(this.toKey('paused')); | ||
multi.del(this.toKey('meta-paused')); | ||
multi.del(this.toKey('delayed')); | ||
multi.del(this.toKey('priority')); | ||
multi.lrange(queueKeys.wait, 0, -1); | ||
multi.lrange(queueKeys.paused, 0, -1); | ||
multi.keys(this.toKey('*:limited')); | ||
multi.del( | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys['meta-paused'], | ||
queueKeys.delayed, | ||
queueKeys.priority, | ||
queueKeys.limiter, | ||
`${queueKeys.limiter}:index` | ||
); | ||
return multi.exec().then(res => { | ||
let waiting = res[0], | ||
paused = res[1]; | ||
let [waiting, paused, limited] = res; | ||
waiting = waiting[1]; | ||
paused = paused[1]; | ||
limited = limited[1]; | ||
const jobKeys = paused.concat(waiting).map(this.toKey, this); | ||
if (jobKeys.length) { | ||
if (jobKeys.length || limited.length) { | ||
multi = this.multi(); | ||
@@ -780,2 +779,7 @@ | ||
} | ||
for (let i = 0; i < limited.length; i += 10000) { | ||
multi.del.apply(multi, limited.slice(i, i + 10000)); | ||
} | ||
return multi.exec(); | ||
@@ -817,5 +821,5 @@ } | ||
// Force reconnection of blocking connection to abort blocking redis call immediately. | ||
return redisClientDisconnect(this.bclient).then(() => { | ||
return this.bclient.connect(); | ||
}); | ||
return redisClientDisconnect(this.bclient).then(() => | ||
this.bclient.connect() | ||
); | ||
} | ||
@@ -1237,5 +1241,3 @@ | ||
return Promise.all(this.processing).then(() => { | ||
return forcedReconnection; | ||
}); | ||
return Promise.all(this.processing).then(() => forcedReconnection); | ||
}; | ||
@@ -1242,0 +1244,0 @@ |
@@ -124,3 +124,4 @@ /** | ||
queueKeys.priority, | ||
queueKeys.active + '@' + queue.token | ||
queueKeys.active + '@' + queue.token, | ||
queueKeys.delayed | ||
]; | ||
@@ -285,19 +286,14 @@ | ||
remove(queue, jobId) { | ||
const keys = _.map( | ||
[ | ||
'active', | ||
'wait', | ||
'delayed', | ||
'paused', | ||
'completed', | ||
'failed', | ||
'priority', | ||
jobId, | ||
jobId + ':logs' | ||
], | ||
name => { | ||
return queue.toKey(name); | ||
} | ||
); | ||
const keys = [ | ||
queue.keys.active, | ||
queue.keys.wait, | ||
queue.keys.delayed, | ||
queue.keys.paused, | ||
queue.keys.completed, | ||
queue.keys.failed, | ||
queue.keys.priority, | ||
queue.toKey(jobId), | ||
queue.toKey(`${jobId}:logs`), | ||
queue.keys.limiter | ||
]; | ||
return queue.client.removeJob(keys.concat([jobId, queue.token])); | ||
@@ -307,16 +303,12 @@ }, | ||
async removeWithPattern(queue, pattern) { | ||
const keys = _.map( | ||
[ | ||
'active', | ||
'wait', | ||
'delayed', | ||
'paused', | ||
'completed', | ||
'failed', | ||
'priority' | ||
], | ||
name => { | ||
return queue.toKey(name); | ||
} | ||
); | ||
const keys = [ | ||
queue.keys.active, | ||
queue.keys.wait, | ||
queue.keys.delayed, | ||
queue.keys.paused, | ||
queue.keys.completed, | ||
queue.keys.failed, | ||
queue.keys.priority, | ||
queue.keys.limiter | ||
]; | ||
@@ -417,2 +409,3 @@ const allRemoved = []; | ||
queue.toKey(set), | ||
queue.keys.limiter, | ||
queue.toKey(''), | ||
@@ -419,0 +412,0 @@ ts, |
{ | ||
"name": "bull", | ||
"version": "3.16.0", | ||
"version": "3.17.0", | ||
"description": "Job manager", | ||
@@ -27,8 +27,8 @@ "engines": { | ||
"ioredis": "^4.14.1", | ||
"lodash": "^4.17.15", | ||
"lodash": "^4.17.19", | ||
"p-timeout": "^3.2.0", | ||
"promise.prototype.finally": "^3.1.2", | ||
"semver": "^6.3.0", | ||
"semver": "^7.3.2", | ||
"util.promisify": "^1.0.1", | ||
"uuid": "^8.2.0" | ||
"uuid": "^8.3.0" | ||
}, | ||
@@ -39,3 +39,3 @@ "devDependencies": { | ||
"chai": "^4.2.0", | ||
"coveralls": "^3.0.9", | ||
"coveralls": "^3.1.0", | ||
"delay": "^4.3.0", | ||
@@ -46,8 +46,9 @@ "eslint": "^7.4.0", | ||
"expect.js": "^0.3.1", | ||
"husky": "^1.3.1", | ||
"husky": "^4.2.5", | ||
"istanbul": "^0.4.5", | ||
"lint-staged": "^8.2.1", | ||
"mocha": "^6.2.2", | ||
"mocha": "^8.1.1", | ||
"mocha-lcov-reporter": "^1.3.0", | ||
"moment": "^2.24.0", | ||
"nyc": "^15.1.0", | ||
"p-reflect": "^1.0.0", | ||
@@ -60,5 +61,5 @@ "prettier": "^1.19.1", | ||
"lint": "eslint lib test *.js", | ||
"test": "NODE_ENV=test mocha 'test/test_*'", | ||
"test:nolint": "NODE_ENV=test mocha 'test/test_*'", | ||
"coveralls": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- --exit -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage", | ||
"test": "NODE_ENV=test nyc mocha -- 'test/test_*' --recursive --exit", | ||
"test:nolint": "NODE_ENV=test mocha 'test/test_*' --recursive --exit", | ||
"coverage": "nyc report --reporter=text-lcov | coveralls", | ||
"postpublish": "git push && git push --tags", | ||
@@ -65,0 +66,0 @@ "prettier": "prettier --config package.json --write '**/*.js'", |
Sorry, the diff of this file is not supported yet
917346
116
3093
19
+ Addedsemver@7.6.3(transitive)
- Removedsemver@6.3.1(transitive)
Updatedlodash@^4.17.19
Updatedsemver@^7.3.2
Updateduuid@^8.3.0