Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bull

Package Overview
Dependencies
Maintainers
1
Versions
198
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bull - npm Package Compare versions

Comparing version 3.13.0 to 3.14.0

lib/commands/removeJobs-7.lua

16

CHANGELOG.md
# Changelog
## v.3.14.0
- feat(queue): add removeJobs function
- fix: clamp negative job delay values to 0 to prevent thrashing
- fix: use DEFAULT_JOB_NAME (#1585)
- fix: remove the lazy client error handler on close (#1605)
- fix: prevent exceeding the maximum stack call size when emptying large queues (#1660)
[Changes](https://github.com/OptimalBits/bull/compare/v3.13.0...v3.14.0)
## v.3.13.0
feat: add "preventParsingData" job option to prevent data parsing
fix: queue.clean clean job logs as well
fix: whenCurrentJobsFinished should wait for all jobs
- feat: add "preventParsingData" job option to prevent data parsing
- fix: queue.clean clean job logs as well
- fix: whenCurrentJobsFinished should wait for all jobs

@@ -9,0 +19,0 @@ [Changes](https://github.com/OptimalBits/bull/compare/v3.12.1...v3.13.0)

160

lib/job.js

@@ -11,2 +11,3 @@ 'use strict';

const FINISHED_WATCHDOG = 5000;
const DEFAULT_JOB_NAME = '__default__';

@@ -27,3 +28,3 @@ /**

data = name;
name = '__default__';
name = DEFAULT_JOB_NAME;
}

@@ -38,3 +39,3 @@

this._progress = 0;
this.delay = this.opts.delay;
this.delay = this.opts.delay < 0 ? 0 : this.opts.delay;
this.timestamp = this.opts.timestamp;

@@ -62,3 +63,3 @@ this.stacktrace = [];

Job.DEFAULT_JOB_NAME = '__default__';
Job.DEFAULT_JOB_NAME = DEFAULT_JOB_NAME;

@@ -125,2 +126,8 @@ function addJob(queue, client, job) {

Job.remove = async function(queue, pattern) {
await queue.isReady();
const removed = await scripts.removeWithPattern(queue, pattern);
removed.forEach(jobId => queue.emit('removed', jobId));
};
Job.prototype.progress = function(progress) {

@@ -215,18 +222,20 @@ if (_.isUndefined(progress)) {

) {
this.returnvalue = returnValue || 0;
return this.queue.isReady().then(() => {
this.returnvalue = returnValue || 0;
returnValue = utils.tryCatch(JSON.stringify, JSON, [returnValue]);
if (returnValue === utils.errorObject) {
const err = utils.errorObject.value;
return Promise.reject(err);
}
this.finishedOn = Date.now();
returnValue = utils.tryCatch(JSON.stringify, JSON, [returnValue]);
if (returnValue === utils.errorObject) {
const err = utils.errorObject.value;
return Promise.reject(err);
}
this.finishedOn = Date.now();
return scripts.moveToCompleted(
this,
returnValue,
this.opts.removeOnComplete,
ignoreLock,
notFetch
);
return scripts.moveToCompleted(
this,
returnValue,
this.opts.removeOnComplete,
ignoreLock,
notFetch
);
});
};

@@ -246,59 +255,61 @@

this.failedReason = err.message;
return new Promise(async (resolve, reject) => {
let command;
const multi = this.queue.client.multi();
this._saveAttempt(multi, err);
return this.queue.isReady().then(() => {
return new Promise(async (resolve, reject) => {
let command;
const multi = this.queue.client.multi();
this._saveAttempt(multi, err);
// Check if an automatic retry should be performed
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this._discarded) {
// Check if backoff is needed
const delay = await backoffs.calculate(
this.opts.backoff,
this.attemptsMade,
this.queue.settings.backoffStrategies,
err
);
// Check if an automatic retry should be performed
let moveToFailed = false;
if (this.attemptsMade < this.opts.attempts && !this._discarded) {
// Check if backoff is needed
const delay = await backoffs.calculate(
this.opts.backoff,
this.attemptsMade,
this.queue.settings.backoffStrategies,
err
);
if (delay === -1) {
// If delay is -1, we should no continue retrying
if (delay === -1) {
// If delay is -1, we should no continue retrying
moveToFailed = true;
} else if (delay) {
// If so, move to delayed (need to unlock job in this case!)
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
ignoreLock
);
multi.moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(this, ignoreLock));
command = 'retry';
}
} else {
// If not, move to failed
moveToFailed = true;
} else if (delay) {
// If so, move to delayed (need to unlock job in this case!)
const args = scripts.moveToDelayedArgs(
this.queue,
this.id,
Date.now() + delay,
}
if (moveToFailed) {
this.finishedOn = Date.now();
const args = scripts.moveToFailedArgs(
this,
err.message,
this.opts.removeOnFail,
ignoreLock
);
multi.moveToDelayed(args);
command = 'delayed';
} else {
// If not, retry immediately
multi.retryJob(scripts.retryJobArgs(this, ignoreLock));
command = 'retry';
multi.moveToFinished(args);
command = 'failed';
}
} else {
// If not, move to failed
moveToFailed = true;
}
if (moveToFailed) {
this.finishedOn = Date.now();
const args = scripts.moveToFailedArgs(
this,
err.message,
this.opts.removeOnFail,
ignoreLock
);
multi.moveToFinished(args);
command = 'failed';
}
return multi.exec().then(results => {
const code = _.last(results)[1];
if (code < 0) {
return reject(scripts.finishedErrors(code, this.id, command));
}
resolve();
}, reject);
return multi.exec().then(results => {
const code = _.last(results)[1];
if (code < 0) {
return reject(scripts.finishedErrors(code, this.id, command));
}
resolve();
}, reject);
});
});

@@ -314,8 +325,9 @@ };

const jobId = this.id;
return scripts.promote(queue, jobId).then(result => {
if (result === -1) {
throw new Error('Job ' + jobId + ' is not in a delayed state');
}
});
return queue.isReady().then(() =>
scripts.promote(queue, jobId).then(result => {
if (result === -1) {
throw new Error('Job ' + jobId + ' is not in a delayed state');
}
})
);
};

@@ -322,0 +334,0 @@

@@ -147,3 +147,5 @@ 'use strict';

// bubble up Redis error events
client.on('error', this.emit.bind(this, 'error'));
const handler = this.emit.bind(this, 'error');
client.on('error', handler);
this.once('close', () => client.removeListener('error', handler));

@@ -377,3 +379,3 @@ if (type === 'client') {

this.eclient.on('pmessage', (pattern, channel, message) => {
const pmessageHandler = (pattern, channel, message) => {
const keyAndToken = channel.split('@');

@@ -399,5 +401,5 @@ const key = keyAndToken[0];

}
});
};
this.eclient.on('message', (channel, message) => {
const messageHandler = (channel, message) => {
const key = channel.split('@')[0];

@@ -442,2 +444,10 @@ switch (key) {

}
};
this.eclient.on('pmessage', pmessageHandler);
this.eclient.on('message', messageHandler);
this.once('close', () => {
this.eclient.removeListener('pmessage', pmessageHandler);
this.eclient.removeListener('message', messageHandler);
});

@@ -537,2 +547,6 @@ };

Queue.prototype.removeJobs = function(pattern) {
return Job.remove(this, pattern);
};
Queue.prototype.close = function(doNotWaitJobs) {

@@ -569,2 +583,3 @@ if (this.closing) {

this.closed = true;
this.emit('close');
}));

@@ -759,3 +774,5 @@ };

multi.del.apply(multi, jobKeys);
for (let i = 0; i < jobKeys.length; i += 10000) {
multi.del.apply(multi, jobKeys.slice(i, i + 10000));
}
return multi.exec();

@@ -762,0 +779,0 @@ }

@@ -302,2 +302,31 @@ /**

async removeWithPattern(queue, pattern) {
const keys = _.map(
[
'active',
'wait',
'delayed',
'paused',
'completed',
'failed',
'priority'
],
name => {
return queue.toKey(name);
}
);
const allRemoved = [];
let cursor = '0',
removed;
do {
[cursor, removed] = await queue.client.removeJobs(
keys.concat([queue.toKey(''), pattern, cursor])
);
allRemoved.push.apply(allRemoved, removed);
} while (cursor !== '0');
return allRemoved;
},
extendLock(queue, jobId) {

@@ -304,0 +333,0 @@ return queue.client.extendLock([

{
"name": "bull",
"version": "3.13.0",
"version": "3.14.0",
"description": "Job manager",

@@ -5,0 +5,0 @@ "engines": {

@@ -423,3 +423,3 @@

1. The Node process running your job processor unexpectedly terminates.
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see #488 for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job).
2. Your job processor was too CPU-intensive and stalled the Node event loop, and as a result, Bull couldn't renew the job lock (see [#488](https://github.com/OptimalBits/bull/issues/488) for how we might better detect this). You can fix this by breaking your job processor into smaller parts so that no single part can block the Node event loop. Alternatively, you can pass a larger value for the `lockDuration` setting (with the tradeoff being that it will take longer to recognize a real stalled job).

@@ -426,0 +426,0 @@ As such, you should always listen for the `stalled` event and log this to your error monitoring system, as this means your jobs are likely getting double-processed.

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc