New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 3.16.0 to 3.17.0

.nyc_output/0c34bd47-d663-4c22-b371-a89f424b464e.json

7

CHANGELOG.md
# Changelog
## v.3.17.0
- feat: better rate limiter (#1816) (@manast)
- feat(sandbox): kill child workers gracefully (#1802) (@GabrielCastro)
[Changes](https://github.com/OptimalBits/bull/compare/v3.16.0...v3.17.0)
## v.3.16.0

@@ -4,0 +11,0 @@

33

lib/process/child-pool.js

@@ -7,3 +7,6 @@ 'use strict';

const getPort = require('get-port');
const { killAsync } = require('./utils');
const CHILD_KILL_TIMEOUT = 30000;
const ChildPool = function ChildPool() {

@@ -82,4 +85,4 @@ if (!(this instanceof ChildPool)) {

ChildPool.prototype.kill = function(child, signal) {
child.kill(signal || 'SIGKILL');
this.remove(child);
return killAsync(child, signal || 'SIGKILL', CHILD_KILL_TIMEOUT);
};

@@ -89,10 +92,10 @@

const children = _.values(this.retained).concat(this.getAllFree());
this.retained = {};
this.free = {};
const allKillPromises = [];
children.forEach(child => {
// TODO: We may want to use SIGKILL if the process does not die after some time.
this.kill(child, 'SIGTERM');
allKillPromises.push(this.kill(child, 'SIGTERM'));
});
this.retained = {};
this.free = {};
return Promise.all(allKillPromises).then(() => {});
};

@@ -108,8 +111,18 @@

const initChild = function(child, processFile) {
return new Promise(resolve => {
child.send({ cmd: 'init', value: processFile }, resolve);
async function initChild(child, processFile) {
const onComplete = new Promise(resolve => {
const onMessageHandler = msg => {
if (msg.cmd === 'init-complete') {
resolve();
child.off('message', onMessageHandler);
}
};
child.on('message', onMessageHandler);
});
};
await new Promise(resolve =>
child.send({ cmd: 'init', value: processFile }, resolve)
);
await onComplete;
}
module.exports = ChildPool;

@@ -10,2 +10,3 @@ /**

let processor;
let currentJobPromise;

@@ -17,2 +18,9 @@ //TODO remove for node >= 10

// same as process.send but waits until the send is complete
// the async version is used below because otherwise
// the termination handler may exit before the parent
// process has recived the messages it requires
const processSendAsync = promisify(process.send.bind(process));
// https://stackoverflow.com/questions/18391212/is-it-not-possible-to-stringify-an-error-using-json-stringify

@@ -35,2 +43,16 @@ if (!('toJSON' in Error.prototype)) {

async function waitForCurrentJobAndExit() {
status = 'TERMINATING';
try {
await currentJobPromise;
} finally {
// it's an exit handler
// eslint-disable-next-line no-process-exit
process.exit(process.exitCode || 0);
}
}
process.on('SIGTERM', waitForCurrentJobAndExit);
process.on('SIGINT', waitForCurrentJobAndExit);
process.on('message', msg => {

@@ -57,2 +79,5 @@ switch (msg.cmd) {

status = 'IDLE';
process.send({
cmd: 'init-complete'
});
break;

@@ -68,23 +93,23 @@

status = 'STARTED';
Promise.resolve(processor(wrapJob(msg.job)) || {})
.then(
result => {
process.send({
cmd: 'completed',
value: result
});
},
err => {
if (!err.message) {
err = new Error(err);
}
process.send({
cmd: 'failed',
value: err
});
currentJobPromise = (async () => {
try {
const result = (await processor(wrapJob(msg.job))) || {};
await processSendAsync({
cmd: 'completed',
value: result
});
} catch (err) {
if (!err.message) {
// eslint-disable-next-line no-ex-assign
err = new Error(err);
}
)
.finally(() => {
await processSendAsync({
cmd: 'failed',
value: err
});
} finally {
status = 'IDLE';
});
currentJobPromise = null;
}
})();
break;

@@ -91,0 +116,0 @@ case 'stop':

@@ -279,3 +279,3 @@ 'use strict';

return function() {
// getter function
// Memoized connection
if (connections[type] != null) {

@@ -500,44 +500,33 @@ return connections[type];

function redisClientDisconnect(client) {
if (client.status === 'end') {
return Promise.resolve();
}
let _resolve, _reject;
return new Promise((resolve, reject) => {
_resolve = resolve;
_reject = reject;
client.once('end', resolve);
client.once('error', reject);
async function redisClientDisconnect(client) {
if (client.status !== 'end') {
let _resolve, _reject;
return new Promise((resolve, reject) => {
_resolve = resolve;
_reject = reject;
client.once('end', _resolve);
pTimeout(
client.quit().catch(err => {
if (err.message !== 'Connection is closed.') {
throw err;
pTimeout(
client.quit().catch(err => {
if (err.message !== 'Connection is closed.') {
throw err;
}
}),
500
).catch(() => {
client.once('error', _reject);
client.disconnect();
if (['connecting', 'reconnecting'].includes(client.status)) {
resolve();
}
}),
500
).catch(() => {
client.disconnect();
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
});
}).finally(() => {
client.removeListener('end', _resolve);
client.removeListener('error', _reject);
});
}
}
Queue.prototype.disconnect = function() {
//
// TODO: Only quit clients that we "own".
//
const clients = this.clients.filter(client => {
return client.status !== 'end';
});
return Promise.all(clients.map(redisClientDisconnect))
.catch(err => {
return console.error(err);
})
.then(() => {
return null;
});
Queue.prototype.disconnect = async function() {
await Promise.all(this.clients.map(redisClientDisconnect));
};

@@ -550,2 +539,3 @@

Queue.prototype.close = function(doNotWaitJobs) {
let isReady = true;
if (this.closing) {

@@ -557,25 +547,27 @@ return this.closing;

.then(
() => {
return this._initializingProcess;
},
(/*err*/) => {
// Ignore this error and try to close anyway.
() => this._initializingProcess,
err => {
console.error(err);
isReady = false;
}
)
.finally(() => {
return this._clearTimers();
})
.finally(() => this._clearTimers())
.then(() => isReady && this.pause(true, doNotWaitJobs))
.then(() => {
return this.pause(true, doNotWaitJobs);
if (!this.childPool) {
return;
}
const cleanPromise = this.childPool.clean().catch(() => {
// Ignore this error and try to close anyway.
});
if (doNotWaitJobs) {
return;
}
return cleanPromise;
})
.then(
() => {
return this.disconnect();
},
(/*err*/) => {
// Ignore this error and try to close anyway.
}
async () => this.disconnect(),
err => console.error(err)
)
.finally(() => {
this.childPool && this.childPool.clean();
this.closed = true;

@@ -754,22 +746,29 @@ this.emit('close');

Queue.prototype.empty = function() {
// Get all jobids and empty all lists atomically.
const queueKeys = this.keys;
let multi = this.multi();
multi.lrange(this.toKey('wait'), 0, -1);
multi.lrange(this.toKey('paused'), 0, -1);
multi.del(this.toKey('wait'));
multi.del(this.toKey('paused'));
multi.del(this.toKey('meta-paused'));
multi.del(this.toKey('delayed'));
multi.del(this.toKey('priority'));
multi.lrange(queueKeys.wait, 0, -1);
multi.lrange(queueKeys.paused, 0, -1);
multi.keys(this.toKey('*:limited'));
multi.del(
queueKeys.wait,
queueKeys.paused,
queueKeys['meta-paused'],
queueKeys.delayed,
queueKeys.priority,
queueKeys.limiter,
`${queueKeys.limiter}:index`
);
return multi.exec().then(res => {
let waiting = res[0],
paused = res[1];
let [waiting, paused, limited] = res;
waiting = waiting[1];
paused = paused[1];
limited = limited[1];
const jobKeys = paused.concat(waiting).map(this.toKey, this);
if (jobKeys.length) {
if (jobKeys.length || limited.length) {
multi = this.multi();

@@ -780,2 +779,7 @@

}
for (let i = 0; i < limited.length; i += 10000) {
multi.del.apply(multi, limited.slice(i, i + 10000));
}
return multi.exec();

@@ -817,5 +821,5 @@ }

// Force reconnection of blocking connection to abort blocking redis call immediately.
return redisClientDisconnect(this.bclient).then(() => {
return this.bclient.connect();
});
return redisClientDisconnect(this.bclient).then(() =>
this.bclient.connect()
);
}

@@ -1237,5 +1241,3 @@

return Promise.all(this.processing).then(() => {
return forcedReconnection;
});
return Promise.all(this.processing).then(() => forcedReconnection);
};

@@ -1242,0 +1244,0 @@

@@ -124,3 +124,4 @@ /**

queueKeys.priority,
queueKeys.active + '@' + queue.token
queueKeys.active + '@' + queue.token,
queueKeys.delayed
];

@@ -285,19 +286,14 @@

remove(queue, jobId) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority',
jobId,
jobId + ':logs'
],
name => {
return queue.toKey(name);
}
);
const keys = [
queue.keys.active,
queue.keys.wait,
queue.keys.delayed,
queue.keys.paused,
queue.keys.completed,
queue.keys.failed,
queue.keys.priority,
queue.toKey(jobId),
queue.toKey(`${jobId}:logs`),
queue.keys.limiter
];
return queue.client.removeJob(keys.concat([jobId, queue.token]));

@@ -307,16 +303,12 @@ },

async removeWithPattern(queue, pattern) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority'
],
name => {
return queue.toKey(name);
}
);
const keys = [
queue.keys.active,
queue.keys.wait,
queue.keys.delayed,
queue.keys.paused,
queue.keys.completed,
queue.keys.failed,
queue.keys.priority,
queue.keys.limiter
];

@@ -417,2 +409,3 @@ const allRemoved = [];

queue.toKey(set),
queue.keys.limiter,
queue.toKey(''),

@@ -419,0 +412,0 @@ ts,

{
"name": "bull",
"version": "3.16.0",
"version": "3.17.0",
"description": "Job manager",

@@ -27,8 +27,8 @@ "engines": {

"ioredis": "^4.14.1",
"lodash": "^4.17.15",
"lodash": "^4.17.19",
"p-timeout": "^3.2.0",
"promise.prototype.finally": "^3.1.2",
"semver": "^6.3.0",
"semver": "^7.3.2",
"util.promisify": "^1.0.1",
"uuid": "^8.2.0"
"uuid": "^8.3.0"
},

@@ -39,3 +39,3 @@ "devDependencies": {

"chai": "^4.2.0",
"coveralls": "^3.0.9",
"coveralls": "^3.1.0",
"delay": "^4.3.0",

@@ -46,8 +46,9 @@ "eslint": "^7.4.0",

"expect.js": "^0.3.1",
"husky": "^1.3.1",
"husky": "^4.2.5",
"istanbul": "^0.4.5",
"lint-staged": "^8.2.1",
"mocha": "^6.2.2",
"mocha": "^8.1.1",
"mocha-lcov-reporter": "^1.3.0",
"moment": "^2.24.0",
"nyc": "^15.1.0",
"p-reflect": "^1.0.0",

@@ -60,5 +61,5 @@ "prettier": "^1.19.1",

"lint": "eslint lib test *.js",
"test": "NODE_ENV=test mocha 'test/test_*'",
"test:nolint": "NODE_ENV=test mocha 'test/test_*'",
"coveralls": "istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- --exit -R spec && cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js && rm -rf ./coverage",
"test": "NODE_ENV=test nyc mocha -- 'test/test_*' --recursive --exit",
"test:nolint": "NODE_ENV=test mocha 'test/test_*' --recursive --exit",
"coverage": "nyc report --reporter=text-lcov | coveralls",
"postpublish": "git push && git push --tags",

@@ -65,0 +66,0 @@ "prettier": "prettier --config package.json --write '**/*.js'",

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc