Socket
Socket
Sign inDemoInstall

bullmq

Package Overview
Dependencies
36
Maintainers
1
Versions
479
Alerts
File Explorer

Advanced tools

Install Socket

Detect and block malicious and high-risk dependencies

Install

Comparing version 4.17.0 to 5.0.0

dist/cjs/commands/addDelayedJob-6.lua

13

dist/cjs/classes/flow-producer.js

@@ -20,14 +20,7 @@ "use strict";

class FlowProducer extends events_1.EventEmitter {
constructor(opts = {}, Connection = redis_connection_1.RedisConnection) {
constructor(opts = { connection: {} }, Connection = redis_connection_1.RedisConnection) {
super();
this.opts = opts;
this.opts = Object.assign({ prefix: 'bull' }, opts);
if (!opts.connection) {
console.warn([
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '));
}
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts === null || opts === void 0 ? void 0 : opts.connection), false, opts.skipVersionCheck);
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts.connection), false, opts.skipVersionCheck);
this.connection.on('error', (error) => this.emit('error', error));

@@ -281,3 +274,3 @@ this.connection.on('close', () => {

toKey: (type) => queueKeys.toKey(node.queueName, type),
opts: { prefix },
opts: { prefix, connection: {} },
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),

@@ -284,0 +277,0 @@ closing: this.closing,

@@ -64,2 +64,7 @@ "use strict";

/**
* Number of attempts when job is moved to active.
* @defaultValue 0
*/
this.attemptsStarted = 0;
/**
* Number of attempts after the job has failed.

@@ -160,3 +165,4 @@ * @defaultValue 0

job.failedReason = json.failedReason;
job.attemptsMade = parseInt(json.attemptsMade || '0');
job.attemptsStarted = parseInt(json.ats || '0');
job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');
job.stacktrace = getTraces(json.stacktrace);

@@ -246,2 +252,3 @@ if (typeof json.returnvalue === 'string') {

attemptsMade: this.attemptsMade,
attemptsStarted: this.attemptsStarted,
finishedOn: this.finishedOn,

@@ -369,3 +376,4 @@ processedOn: this.processedOn,

const result = await this.scripts.moveToFinished(this.id, args);
this.finishedOn = args[14];
this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
this.attemptsMade += 1;
return result;

@@ -394,3 +402,3 @@ }

let finishedOn, delay;
if (this.attemptsMade < this.opts.attempts &&
if (this.attemptsMade + 1 < this.opts.attempts &&
!this.discarded &&

@@ -400,3 +408,3 @@ !(err instanceof unrecoverable_error_1.UnrecoverableError || err.name == 'UnrecoverableError')) {

// Check if backoff is needed
delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade, err, this, opts.settings && opts.settings.backoffStrategy);
delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy);
if (delay === -1) {

@@ -423,3 +431,3 @@ moveToFailed = true;

multi.moveToFinished(args);
finishedOn = args[14];
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
command = 'failed';

@@ -442,2 +450,3 @@ }

}
this.attemptsMade += 1;
}

@@ -679,5 +688,6 @@ /**

*/
moveToDelayed(timestamp, token) {
async moveToDelayed(timestamp, token) {
const delay = timestamp - Date.now();
return this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token);
const movedToDelayed = await this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token, { skipAttempt: true });
return movedToDelayed;
}

@@ -691,4 +701,5 @@ /**

*/
moveToWaitingChildren(token, opts = {}) {
return this.scripts.moveToWaitingChildren(this.id, token, opts);
async moveToWaitingChildren(token, opts = {}) {
const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts);
return movedToWaitingChildren;
}

@@ -758,4 +769,3 @@ /**

if (`${parseInt(this.id, 10)}` === this.id) {
//TODO: throw an error in next breaking change
console.warn('Custom Ids should not be integers: https://github.com/taskforcesh/bullmq/pull/1569');
throw new Error('Custom Ids cannot be integers');
}

@@ -762,0 +772,0 @@ if (this.opts.priority) {

@@ -26,3 +26,3 @@ "use strict";

*/
constructor(name, opts = {}, Connection = redis_connection_1.RedisConnection) {
constructor(name, opts = { connection: {} }, Connection = redis_connection_1.RedisConnection) {
super();

@@ -36,10 +36,3 @@ this.name = name;

}
if (!opts.connection) {
console.warn([
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '));
}
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts === null || opts === void 0 ? void 0 : opts.connection), opts.blockingConnection, opts.skipVersionCheck);
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts.connection), opts.blockingConnection, opts.skipVersionCheck);
this.connection.on('error', (error) => this.emit('error', error));

@@ -46,0 +39,0 @@ this.connection.on('close', () => {

@@ -15,3 +15,5 @@ "use strict";

class QueueEvents extends queue_base_1.QueueBase {
constructor(name, _a = {}, Connection) {
constructor(name, _a = {
connection: {},
}, Connection) {
var { connection, autorun = true } = _a, opts = tslib_1.__rest(_a, ["connection", "autorun"]);

@@ -18,0 +20,0 @@ super(name, Object.assign(Object.assign({}, opts), { connection: (0, utils_1.isRedisInstance)(connection)

@@ -28,2 +28,3 @@ "use strict";

'pc',
'marker', // marker key
].forEach(key => {

@@ -30,0 +31,0 @@ keys[key] = this.toKey(name, key);

@@ -15,6 +15,3 @@ "use strict";

].join(' ');
const deprecationMessage = [
'BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null.',
'On the next versions having this settings will throw an exception',
].join(' ');
const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.';
class RedisConnection extends events_1.EventEmitter {

@@ -50,3 +47,3 @@ constructor(opts, shared = false, blocking = true, skipVersionCheck = false) {

}
this.checkBlockingOptions(deprecationMessage, this.opts);
this.checkBlockingOptions(deprecationMessage, this.opts, true);
}

@@ -67,5 +64,10 @@ this.skipVersionCheck =

}
checkBlockingOptions(msg, options) {
checkBlockingOptions(msg, options, throwError = false) {
if (this.blocking && options && options.maxRetriesPerRequest) {
console.error(msg);
if (throwError) {
throw new Error(msg);
}
else {
console.error(msg);
}
}

@@ -72,0 +74,0 @@ }

@@ -34,2 +34,3 @@ /**

undefined,
undefined,
];

@@ -51,4 +52,3 @@ }

const keys = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,

@@ -66,4 +66,3 @@ queueKeys.id,

const keys = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,

@@ -137,2 +136,3 @@ queueKeys.id,

queueKeys.events,
queueKeys.marker,
];

@@ -155,3 +155,3 @@ keys.push(pack(args), job.data, encodedOpts);

const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name));
keys.push(this.queue.keys.events);
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker);
return client.pause(keys.concat([pause ? 'paused' : 'resumed']));

@@ -218,2 +218,3 @@ }

keys[12] = metricsKey;
keys[13] = this.queue.keys.marker;
const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

@@ -226,3 +227,2 @@ const args = [

target,
JSON.stringify({ jobId: job.id, val: val }),
!fetchNext || this.queue.closing ? 0 : 1,

@@ -236,3 +236,2 @@ queueKeys[''],

attempts: job.opts.attempts,
attemptsMade: job.attemptsMade,
maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints)

@@ -406,2 +405,3 @@ ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints

this.queue.keys.pc,
this.queue.keys.marker,
];

@@ -416,3 +416,3 @@ return keys.concat([

// Note: We have an issue here with jobs using custom job ids
moveToDelayedArgs(jobId, timestamp, token, delay) {
moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) {
//

@@ -429,16 +429,12 @@ // Bake in the job id first 12 bits into the timestamp

}
const queueKeys = this.queue.keys;
const keys = [
'wait',
'active',
'prioritized',
'delayed',
jobId,
].map(name => {
return this.queue.toKey(name);
});
keys.push.apply(keys, [
this.queue.keys.events,
this.queue.keys.paused,
this.queue.keys.meta,
]);
queueKeys.marker,
queueKeys.active,
queueKeys.prioritized,
queueKeys.delayed,
this.queue.toKey(jobId),
queueKeys.events,
queueKeys.meta,
];
return keys.concat([

@@ -451,2 +447,3 @@ this.queue.keys[''],

delay,
opts.skipAttempt ? '1' : '0',
]);

@@ -471,5 +468,5 @@ }

}
async moveToDelayed(jobId, timestamp, delay, token = '0') {
async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) {
const client = await this.queue.client;
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay);
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await client.moveToDelayed(args);

@@ -522,11 +519,13 @@ if (result < 0) {

const keys = [
'active',
'wait',
'paused',
jobId,
'meta',
].map(name => {
return this.queue.toKey(name);
});
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc);
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.toKey(jobId),
this.queue.keys.meta,
this.queue.keys.events,
this.queue.keys.delayed,
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
];
const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';

@@ -600,3 +599,3 @@ return keys.concat([

}
async moveToActive(client, token, jobId) {
async moveToActive(client, token) {
const opts = this.queue.opts;

@@ -615,2 +614,3 @@ const queueKeys = this.queue.keys;

queueKeys.pc,
queueKeys.marker,
];

@@ -620,3 +620,2 @@ const args = [

Date.now(),
jobId || '',
pack({

@@ -641,2 +640,3 @@ token,

this.queue.keys.events,
this.queue.keys.marker,
];

@@ -643,0 +643,0 @@ const args = [this.queue.toKey(''), jobId];

@@ -32,3 +32,3 @@ "use strict";

}
constructor(name, processor, opts = {}, Connection) {
constructor(name, processor, opts, Connection) {
super(name, Object.assign(Object.assign({}, opts), { blockingConnection: true }), Connection);

@@ -42,2 +42,5 @@ this.abortDelayController = null;

this.running = false;
if (!opts || !opts.connection) {
throw new Error('Worker requires a connection');
}
this.opts = Object.assign({ drainDelay: 5, concurrency: 1, lockDuration: 30000, maxStalledCount: 1, stalledInterval: 30000, autorun: true, runRetryDelay: 15000 }, this.opts);

@@ -178,3 +181,3 @@ if (this.opts.stalledInterval <= 0) {

if (this.waiting && numTotal > 1) {
// We have a job waiting but we have others that we could start processing already
// We are waiting for jobs but we have others that we could start processing already
break;

@@ -200,5 +203,3 @@ }

job = await asyncFifoQueue.fetch();
} while (!job &&
asyncFifoQueue.numTotal() > 0 &&
asyncFifoQueue.numQueued() > 0);
} while (!job && asyncFifoQueue.numQueued() > 0);
if (job) {

@@ -239,6 +240,8 @@ const token = job.token;

if (this.drained && block && !this.limitUntil && !this.waiting) {
this.waiting = this.waitForJob(bclient);
this.waiting = this.waitForJob(bclient, this.blockUntil);
try {
const jobId = await this.waiting;
return this.moveToActive(client, token, jobId);
this.blockUntil = await this.waiting;
if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) {
return this.moveToActive(client, token);
}
}

@@ -273,20 +276,10 @@ catch (err) {

}
async moveToActive(client, token, jobId) {
// If we get the special delayed job ID, we pick the delay as the next
// block timeout.
if (jobId && jobId.startsWith('0:')) {
this.blockUntil = parseInt(jobId.split(':')[1]) || 0;
// Remove marker from active list.
await client.lrem(this.keys.active, 1, jobId);
if (this.blockUntil > 0) {
return;
}
}
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, jobId);
async moveToActive(client, token) {
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token);
this.updateDelays(limitUntil, delayUntil);
return this.nextJobFromJobData(jobData, id, token);
}
async waitForJob(bclient) {
async waitForJob(bclient, blockUntil) {
if (this.paused) {
return;
return Infinity;
}

@@ -296,6 +289,3 @@ try {

if (!this.closing) {
let blockTimeout = Math.max(this.blockUntil
? (this.blockUntil - Date.now()) / 1000
: opts.drainDelay, 0);
let jobId;
let blockTimeout = Math.max(blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, 0);
// Blocking for less than 50ms is useless.

@@ -310,9 +300,13 @@ if (blockTimeout > 0.05) {

blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
jobId = await bclient.brpoplpush(this.keys.wait, this.keys.active, blockTimeout);
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
if (result) {
const [_key, member, score] = result;
if (member) {
return parseInt(score);
}
}
}
else {
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active);
}
this.blockUntil = 0;
return jobId;
return 0;
}

@@ -331,2 +325,3 @@ }

}
return Infinity;
}

@@ -333,0 +328,0 @@ /**

@@ -80,3 +80,3 @@ "use strict";

Add delay marker if needed.
]]
]]
-- Includes

@@ -96,43 +96,26 @@ --[[

end
local function addDelayMarkerIfNeeded(targetKey, delayedKey)
local waitLen = rcall("LLEN", targetKey)
if waitLen <= 1 then
local function addDelayMarkerIfNeeded(markerKey, delayedKey)
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
-- Check if there is already a marker with older timestamp
-- if there is, we need to replace it.
if waitLen == 1 then
local marker = rcall("LINDEX", targetKey, 0)
local oldTimestamp = tonumber(marker:sub(3))
if oldTimestamp and oldTimestamp > nextTimestamp then
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp)
end
else
-- if there is no marker, then we need to add one
rcall("LPUSH", targetKey, "0:" .. nextTimestamp)
end
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "0")
end
end
end
--[[
Function to add job considering priority.
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
-- Includes
local function isQueuePaused(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "paused") == 1
end
--[[
Function priority marker to wait if needed
in order to wake up our workers and to respect priority
order as much as possible
Function to add job considering priority.
]]
local function addPriorityMarkerIfNeeded(waitKey)
local waitLen = rcall("LLEN", waitKey)
if waitLen == 0 then
rcall("LPUSH", waitKey, "0:0")
end
end
local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey)
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff)
rcall("ZADD", prioritizedKey, score, jobId)
if not paused then
addPriorityMarkerIfNeeded(waitKey)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end

@@ -151,31 +134,40 @@ end

end
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey,
parentQueueKey .. ":paused")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId,
"delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
else
if priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused,
parentId, parentQueueKey .. ":pc")
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId,
"prev", "waiting-children")
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE",
parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentPausedKey = parentQueueKey .. ":paused"
local parentMetaKey = parentQueueKey .. ":meta"
local parentMarkerKey = parentQueueKey .. ":marker"
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed",
"jobId", parentId, "delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
else
if priority == 0 then
local parentTarget, _paused =
getTargetQueueList(parentMetaKey, parentWaitKey,
parentPausedKey)
rcall("RPUSH", parentTarget, parentId)
rcall("ZADD", parentMarkerKey, 0, "0")
else
local isPaused = isQueuePaused(parentMetaKey)
addJobWithPriority(parentMarkerKey,
parentQueueKey .. ":prioritized", priority,
parentId, parentQueueKey .. ":pc", isPaused)
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting",
"jobId", parentId, "prev", "waiting-children")
end
end
end
end

@@ -250,4 +242,2 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,

-- Check if this job is a child of another job, if so add it to the parents dependencies
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status.
-- fail in this case.
if parentDependenciesKey ~= nil then

@@ -254,0 +244,0 @@ rcall("SADD", parentDependenciesKey, jobIdKey)

@@ -16,2 +16,3 @@ "use strict";

if ARGV[i] == "wait" or ARGV[i] == "paused" then
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
local marker = rcall("LINDEX", stateKey, -1)

@@ -18,0 +19,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then

@@ -43,2 +43,3 @@ "use strict";

if ARGV[i] == "wait" or ARGV[i] == "paused" then
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
local marker = rcall("LINDEX", stateKey, -1)

@@ -45,0 +46,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then

"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
const tslib_1 = require("tslib");
tslib_1.__exportStar(require("./addDelayedJob-7"), exports);
tslib_1.__exportStar(require("./addDelayedJob-6"), exports);
tslib_1.__exportStar(require("./addParentJob-4"), exports);
tslib_1.__exportStar(require("./addPrioritizedJob-8"), exports);
tslib_1.__exportStar(require("./addStandardJob-6"), exports);
tslib_1.__exportStar(require("./addPrioritizedJob-7"), exports);
tslib_1.__exportStar(require("./addStandardJob-7"), exports);
tslib_1.__exportStar(require("./changeDelay-3"), exports);
tslib_1.__exportStar(require("./changePriority-5"), exports);
tslib_1.__exportStar(require("./changePriority-6"), exports);
tslib_1.__exportStar(require("./cleanJobsInSet-2"), exports);

@@ -22,10 +22,10 @@ tslib_1.__exportStar(require("./drain-4"), exports);

tslib_1.__exportStar(require("./moveStalledJobsToWait-8"), exports);
tslib_1.__exportStar(require("./moveToActive-10"), exports);
tslib_1.__exportStar(require("./moveToDelayed-8"), exports);
tslib_1.__exportStar(require("./moveToFinished-13"), exports);
tslib_1.__exportStar(require("./moveToActive-11"), exports);
tslib_1.__exportStar(require("./moveToDelayed-7"), exports);
tslib_1.__exportStar(require("./moveToFinished-14"), exports);
tslib_1.__exportStar(require("./moveToWaitingChildren-4"), exports);
tslib_1.__exportStar(require("./obliterate-2"), exports);
tslib_1.__exportStar(require("./paginate-1"), exports);
tslib_1.__exportStar(require("./pause-5"), exports);
tslib_1.__exportStar(require("./promote-7"), exports);
tslib_1.__exportStar(require("./pause-7"), exports);
tslib_1.__exportStar(require("./promote-8"), exports);
tslib_1.__exportStar(require("./releaseLock-1"), exports);

@@ -35,3 +35,3 @@ tslib_1.__exportStar(require("./removeJob-1"), exports);

tslib_1.__exportStar(require("./reprocessJob-6"), exports);
tslib_1.__exportStar(require("./retryJob-9"), exports);
tslib_1.__exportStar(require("./retryJob-10"), exports);
tslib_1.__exportStar(require("./saveStacktrace-1"), exports);

@@ -38,0 +38,0 @@ tslib_1.__exportStar(require("./updateData-1"), exports);

@@ -209,2 +209,3 @@ "use strict";

for i, jobId in ipairs(stalling) do
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
if string.sub(jobId, 1, 2) == "0:" then

@@ -211,0 +212,0 @@ -- If the jobId is a delay marker ID we just remove it.

@@ -23,3 +23,4 @@ "use strict";

local rcall = redis.call
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token)
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp,
lockKey, jobKey, token)
if token ~= "0" then

@@ -43,3 +44,4 @@ if rcall("GET", lockKey) == token then

if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
end

@@ -49,3 +51,4 @@ return 1

if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
end

@@ -52,0 +55,0 @@ return 1

@@ -17,14 +17,7 @@ import { EventEmitter } from 'events';

export class FlowProducer extends EventEmitter {
constructor(opts = {}, Connection = RedisConnection) {
constructor(opts = { connection: {} }, Connection = RedisConnection) {
super();
this.opts = opts;
this.opts = Object.assign({ prefix: 'bull' }, opts);
if (!opts.connection) {
console.warn([
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '));
}
this.connection = new Connection(opts.connection, isRedisInstance(opts === null || opts === void 0 ? void 0 : opts.connection), false, opts.skipVersionCheck);
this.connection = new Connection(opts.connection, isRedisInstance(opts.connection), false, opts.skipVersionCheck);
this.connection.on('error', (error) => this.emit('error', error));

@@ -278,3 +271,3 @@ this.connection.on('close', () => {

toKey: (type) => queueKeys.toKey(node.queueName, type),
opts: { prefix },
opts: { prefix, connection: {} },
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName),

@@ -281,0 +274,0 @@ closing: this.closing,

@@ -62,2 +62,7 @@ import { ChainableCommander } from 'ioredis';

/**
* Number of attempts when job is moved to active.
* @defaultValue 0
*/
attemptsStarted: number;
/**
* Number of attempts after the job has failed.

@@ -64,0 +69,0 @@ * @defaultValue 0

@@ -61,2 +61,7 @@ import { __rest } from "tslib";

/**
* Number of attempts when job is moved to active.
* @defaultValue 0
*/
this.attemptsStarted = 0;
/**
* Number of attempts after the job has failed.

@@ -157,3 +162,4 @@ * @defaultValue 0

job.failedReason = json.failedReason;
job.attemptsMade = parseInt(json.attemptsMade || '0');
job.attemptsStarted = parseInt(json.ats || '0');
job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0');
job.stacktrace = getTraces(json.stacktrace);

@@ -243,2 +249,3 @@ if (typeof json.returnvalue === 'string') {

attemptsMade: this.attemptsMade,
attemptsStarted: this.attemptsStarted,
finishedOn: this.finishedOn,

@@ -366,3 +373,4 @@ processedOn: this.processedOn,

const result = await this.scripts.moveToFinished(this.id, args);
this.finishedOn = args[14];
this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
this.attemptsMade += 1;
return result;

@@ -391,3 +399,3 @@ }

let finishedOn, delay;
if (this.attemptsMade < this.opts.attempts &&
if (this.attemptsMade + 1 < this.opts.attempts &&
!this.discarded &&

@@ -397,3 +405,3 @@ !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')) {

// Check if backoff is needed
delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade, err, this, opts.settings && opts.settings.backoffStrategy);
delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy);
if (delay === -1) {

@@ -420,3 +428,3 @@ moveToFailed = true;

multi.moveToFinished(args);
finishedOn = args[14];
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1];
command = 'failed';

@@ -439,2 +447,3 @@ }

}
this.attemptsMade += 1;
}

@@ -676,5 +685,6 @@ /**

*/
moveToDelayed(timestamp, token) {
async moveToDelayed(timestamp, token) {
const delay = timestamp - Date.now();
return this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token);
const movedToDelayed = await this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token, { skipAttempt: true });
return movedToDelayed;
}

@@ -688,4 +698,5 @@ /**

*/
moveToWaitingChildren(token, opts = {}) {
return this.scripts.moveToWaitingChildren(this.id, token, opts);
async moveToWaitingChildren(token, opts = {}) {
const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts);
return movedToWaitingChildren;
}

@@ -755,4 +766,3 @@ /**

if (`${parseInt(this.id, 10)}` === this.id) {
//TODO: throw an error in next breaking change
console.warn('Custom Ids should not be integers: https://github.com/taskforcesh/bullmq/pull/1569');
throw new Error('Custom Ids cannot be integers');
}

@@ -759,0 +769,0 @@ if (this.opts.priority) {

@@ -23,3 +23,3 @@ import { EventEmitter } from 'events';

*/
constructor(name, opts = {}, Connection = RedisConnection) {
constructor(name, opts = { connection: {} }, Connection = RedisConnection) {
super();

@@ -33,10 +33,3 @@ this.name = name;

}
if (!opts.connection) {
console.warn([
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer',
'without providing explicitly a connection or connection options is deprecated. This behaviour will',
'be removed in the next major release',
].join(' '));
}
this.connection = new Connection(opts.connection, isRedisInstance(opts === null || opts === void 0 ? void 0 : opts.connection), opts.blockingConnection, opts.skipVersionCheck);
this.connection = new Connection(opts.connection, isRedisInstance(opts.connection), opts.blockingConnection, opts.skipVersionCheck);
this.connection.on('error', (error) => this.emit('error', error));

@@ -43,0 +36,0 @@ this.connection.on('close', () => {

@@ -12,3 +12,5 @@ import { __rest } from "tslib";

export class QueueEvents extends QueueBase {
constructor(name, _a = {}, Connection) {
constructor(name, _a = {
connection: {},
}, Connection) {
var { connection, autorun = true } = _a, opts = __rest(_a, ["connection", "autorun"]);

@@ -15,0 +17,0 @@ super(name, Object.assign(Object.assign({}, opts), { connection: isRedisInstance(connection)

@@ -25,2 +25,3 @@ export class QueueKeys {

'pc',
'marker', // marker key
].forEach(key => {

@@ -27,0 +28,0 @@ keys[key] = this.toKey(name, key);

@@ -12,6 +12,3 @@ import { EventEmitter } from 'events';

].join(' ');
const deprecationMessage = [
'BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null.',
'On the next versions having this settings will throw an exception',
].join(' ');
const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.';
export class RedisConnection extends EventEmitter {

@@ -47,3 +44,3 @@ constructor(opts, shared = false, blocking = true, skipVersionCheck = false) {

}
this.checkBlockingOptions(deprecationMessage, this.opts);
this.checkBlockingOptions(deprecationMessage, this.opts, true);
}

@@ -64,5 +61,10 @@ this.skipVersionCheck =

}
checkBlockingOptions(msg, options) {
checkBlockingOptions(msg, options, throwError = false) {
if (this.blocking && options && options.maxRetriesPerRequest) {
console.error(msg);
if (throwError) {
throw new Error(msg);
}
else {
console.error(msg);
}
}

@@ -69,0 +71,0 @@ }

@@ -5,3 +5,3 @@ /**

/// <reference types="node" />
import { JobJson, JobJsonRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentOpts, RedisClient, KeepJobs } from '../interfaces';
import { JobJson, JobJsonRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentOpts, RedisClient, KeepJobs, MoveToDelayedOpts } from '../interfaces';
import { JobState, JobType, FinishedStatus, FinishedPropValAttribute, MinimalQueue, RedisJobOptions } from '../types';

@@ -44,6 +44,6 @@ import { ChainableCommander } from 'ioredis';

private changePriorityArgs;
moveToDelayedArgs(jobId: string, timestamp: number, token: string, delay: number): (string | number)[];
moveToDelayedArgs(jobId: string, timestamp: number, token: string, delay: number, opts?: MoveToDelayedOpts): (string | number)[];
saveStacktraceArgs(jobId: string, stacktrace: string, failedReason: string): string[];
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): string[];
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string): Promise<void>;
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string, opts?: MoveToDelayedOpts): Promise<void>;
/**

@@ -85,3 +85,3 @@ * Move parent job to waiting-children state.

reprocessJob<T = any, R = any, N extends string = string>(job: MinimalJob<T, R, N>, state: 'failed' | 'completed'): Promise<void>;
moveToActive(client: RedisClient, token: string, jobId?: string): Promise<any[]>;
moveToActive(client: RedisClient, token: string): Promise<any[]>;
promote(jobId: string): Promise<void>;

@@ -88,0 +88,0 @@ /**

@@ -32,2 +32,3 @@ /**

undefined,
undefined,
];

@@ -49,4 +50,3 @@ }

const keys = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,

@@ -64,4 +64,3 @@ queueKeys.id,

const keys = [
queueKeys.wait,
queueKeys.paused,
queueKeys.marker,
queueKeys.meta,

@@ -135,2 +134,3 @@ queueKeys.id,

queueKeys.events,
queueKeys.marker,
];

@@ -153,3 +153,3 @@ keys.push(pack(args), job.data, encodedOpts);

const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name));
keys.push(this.queue.keys.events);
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker);
return client.pause(keys.concat([pause ? 'paused' : 'resumed']));

@@ -216,2 +216,3 @@ }

keys[12] = metricsKey;
keys[13] = this.queue.keys.marker;
const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs);

@@ -224,3 +225,2 @@ const args = [

target,
JSON.stringify({ jobId: job.id, val: val }),
!fetchNext || this.queue.closing ? 0 : 1,

@@ -234,3 +234,2 @@ queueKeys[''],

attempts: job.opts.attempts,
attemptsMade: job.attemptsMade,
maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints)

@@ -404,2 +403,3 @@ ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints

this.queue.keys.pc,
this.queue.keys.marker,
];

@@ -414,3 +414,3 @@ return keys.concat([

// Note: We have an issue here with jobs using custom job ids
moveToDelayedArgs(jobId, timestamp, token, delay) {
moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) {
//

@@ -427,16 +427,12 @@ // Bake in the job id first 12 bits into the timestamp

}
const queueKeys = this.queue.keys;
const keys = [
'wait',
'active',
'prioritized',
'delayed',
jobId,
].map(name => {
return this.queue.toKey(name);
});
keys.push.apply(keys, [
this.queue.keys.events,
this.queue.keys.paused,
this.queue.keys.meta,
]);
queueKeys.marker,
queueKeys.active,
queueKeys.prioritized,
queueKeys.delayed,
this.queue.toKey(jobId),
queueKeys.events,
queueKeys.meta,
];
return keys.concat([

@@ -449,2 +445,3 @@ this.queue.keys[''],

delay,
opts.skipAttempt ? '1' : '0',
]);

@@ -469,5 +466,5 @@ }

}
async moveToDelayed(jobId, timestamp, delay, token = '0') {
async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) {
const client = await this.queue.client;
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay);
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts);
const result = await client.moveToDelayed(args);

@@ -520,11 +517,13 @@ if (result < 0) {

const keys = [
'active',
'wait',
'paused',
jobId,
'meta',
].map(name => {
return this.queue.toKey(name);
});
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc);
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.paused,
this.queue.toKey(jobId),
this.queue.keys.meta,
this.queue.keys.events,
this.queue.keys.delayed,
this.queue.keys.prioritized,
this.queue.keys.pc,
this.queue.keys.marker,
];
const pushCmd = (lifo ? 'R' : 'L') + 'PUSH';

@@ -598,3 +597,3 @@ return keys.concat([

}
async moveToActive(client, token, jobId) {
async moveToActive(client, token) {
const opts = this.queue.opts;

@@ -613,2 +612,3 @@ const queueKeys = this.queue.keys;

queueKeys.pc,
queueKeys.marker,
];

@@ -618,3 +618,2 @@ const args = [

Date.now(),
jobId || '',
pack({

@@ -639,2 +638,3 @@ token,

this.queue.keys.events,
this.queue.keys.marker,
];

@@ -641,0 +641,0 @@ const args = [this.queue.toKey(''), jobId];

@@ -147,3 +147,3 @@ /// <reference types="node" />

rateLimit(expireTimeMs: number): Promise<void>;
protected moveToActive(client: RedisClient, token: string, jobId?: string): Promise<Job<DataType, ResultType, NameType>>;
protected moveToActive(client: RedisClient, token: string): Promise<Job<DataType, ResultType, NameType>>;
private waitForJob;

@@ -150,0 +150,0 @@ /**

@@ -29,3 +29,3 @@ import * as fs from 'fs';

}
constructor(name, processor, opts = {}, Connection) {
constructor(name, processor, opts, Connection) {
super(name, Object.assign(Object.assign({}, opts), { blockingConnection: true }), Connection);

@@ -39,2 +39,5 @@ this.abortDelayController = null;

this.running = false;
if (!opts || !opts.connection) {
throw new Error('Worker requires a connection');
}
this.opts = Object.assign({ drainDelay: 5, concurrency: 1, lockDuration: 30000, maxStalledCount: 1, stalledInterval: 30000, autorun: true, runRetryDelay: 15000 }, this.opts);

@@ -175,3 +178,3 @@ if (this.opts.stalledInterval <= 0) {

if (this.waiting && numTotal > 1) {
// We have a job waiting but we have others that we could start processing already
// We are waiting for jobs but we have others that we could start processing already
break;

@@ -197,5 +200,3 @@ }

job = await asyncFifoQueue.fetch();
} while (!job &&
asyncFifoQueue.numTotal() > 0 &&
asyncFifoQueue.numQueued() > 0);
} while (!job && asyncFifoQueue.numQueued() > 0);
if (job) {

@@ -236,6 +237,8 @@ const token = job.token;

if (this.drained && block && !this.limitUntil && !this.waiting) {
this.waiting = this.waitForJob(bclient);
this.waiting = this.waitForJob(bclient, this.blockUntil);
try {
const jobId = await this.waiting;
return this.moveToActive(client, token, jobId);
this.blockUntil = await this.waiting;
if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) {
return this.moveToActive(client, token);
}
}

@@ -270,20 +273,10 @@ catch (err) {

}
async moveToActive(client, token, jobId) {
// If we get the special delayed job ID, we pick the delay as the next
// block timeout.
if (jobId && jobId.startsWith('0:')) {
this.blockUntil = parseInt(jobId.split(':')[1]) || 0;
// Remove marker from active list.
await client.lrem(this.keys.active, 1, jobId);
if (this.blockUntil > 0) {
return;
}
}
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, jobId);
async moveToActive(client, token) {
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token);
this.updateDelays(limitUntil, delayUntil);
return this.nextJobFromJobData(jobData, id, token);
}
async waitForJob(bclient) {
async waitForJob(bclient, blockUntil) {
if (this.paused) {
return;
return Infinity;
}

@@ -293,6 +286,3 @@ try {

if (!this.closing) {
let blockTimeout = Math.max(this.blockUntil
? (this.blockUntil - Date.now()) / 1000
: opts.drainDelay, 0);
let jobId;
let blockTimeout = Math.max(blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, 0);
// Blocking for less than 50ms is useless.

@@ -307,9 +297,13 @@ if (blockTimeout > 0.05) {

blockTimeout = Math.min(blockTimeout, maximumBlockTimeout);
jobId = await bclient.brpoplpush(this.keys.wait, this.keys.active, blockTimeout);
// Markers should only be used for un-blocking, so we will handle them in this
// function only.
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout);
if (result) {
const [_key, member, score] = result;
if (member) {
return parseInt(score);
}
}
}
else {
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active);
}
this.blockUntil = 0;
return jobId;
return 0;
}

@@ -328,2 +322,3 @@ }

}
return Infinity;
}

@@ -330,0 +325,0 @@ /**

@@ -10,2 +10,3 @@ import { RedisJobOptions } from '../types';

attemptsMade: number;
attemptsStarted: number;
finishedOn?: number;

@@ -28,3 +29,3 @@ processedOn?: number;

progress: string;
attemptsMade: string;
attemptsMade?: string;
finishedOn?: string;

@@ -39,2 +40,4 @@ processedOn?: string;

rjk?: string;
atm?: string;
ats?: string;
}

@@ -5,2 +5,5 @@ import { JobsOptions, JobJsonSandbox } from '../types';

export type BulkJobOptions = Omit<JobsOptions, 'repeat'>;
export interface MoveToDelayedOpts {
skipAttempt?: boolean;
}
export interface MoveToWaitingChildrenOpts {

@@ -7,0 +10,0 @@ child?: {

@@ -15,3 +15,3 @@ import { AdvancedRepeatOptions } from './advanced-options';

*/
connection?: ConnectionOptions;
connection: ConnectionOptions;
/**

@@ -18,0 +18,0 @@ * Denotes commands should retry indefinitely.

@@ -77,3 +77,3 @@ const content = `--[[

Add delay marker if needed.
]]
]]
-- Includes

@@ -93,43 +93,26 @@ --[[

end
local function addDelayMarkerIfNeeded(targetKey, delayedKey)
local waitLen = rcall("LLEN", targetKey)
if waitLen <= 1 then
local function addDelayMarkerIfNeeded(markerKey, delayedKey)
local nextTimestamp = getNextDelayedTimestamp(delayedKey)
if nextTimestamp ~= nil then
-- Check if there is already a marker with older timestamp
-- if there is, we need to replace it.
if waitLen == 1 then
local marker = rcall("LINDEX", targetKey, 0)
local oldTimestamp = tonumber(marker:sub(3))
if oldTimestamp and oldTimestamp > nextTimestamp then
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp)
end
else
-- if there is no marker, then we need to add one
rcall("LPUSH", targetKey, "0:" .. nextTimestamp)
end
-- Replace the score of the marker with the newest known
-- next timestamp.
rcall("ZADD", markerKey, nextTimestamp, "0")
end
end
end
--[[
Function to add job considering priority.
Function to check for the meta.paused key to decide if we are paused or not
(since an empty list and !EXISTS are not really the same).
]]
-- Includes
local function isQueuePaused(queueMetaKey)
return rcall("HEXISTS", queueMetaKey, "paused") == 1
end
--[[
Function priority marker to wait if needed
in order to wake up our workers and to respect priority
order as much as possible
Function to add job considering priority.
]]
local function addPriorityMarkerIfNeeded(waitKey)
local waitLen = rcall("LLEN", waitKey)
if waitLen == 0 then
rcall("LPUSH", waitKey, "0:0")
end
end
local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey)
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused)
local prioCounter = rcall("INCR", priorityCounterKey)
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff)
rcall("ZADD", prioritizedKey, score, jobId)
if not paused then
addPriorityMarkerIfNeeded(waitKey)
if not isPaused then
rcall("ZADD", markerKey, 0, "0")
end

@@ -148,31 +131,40 @@ end

end
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey,
parentQueueKey .. ":paused")
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId,
"delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey)
else
if priority == 0 then
rcall("RPUSH", parentTarget, parentId)
else
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused,
parentId, parentQueueKey .. ":pc")
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId,
"prev", "waiting-children")
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey,
parentKey, parentId, timestamp)
local isParentActive = rcall("ZSCORE",
parentQueueKey .. ":waiting-children", parentId)
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId)
local parentWaitKey = parentQueueKey .. ":wait"
local parentPausedKey = parentQueueKey .. ":paused"
local parentMetaKey = parentQueueKey .. ":meta"
local parentMarkerKey = parentQueueKey .. ":marker"
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay")
local priority = tonumber(jobAttributes[1]) or 0
local delay = tonumber(jobAttributes[2]) or 0
if delay > 0 then
local delayedTimestamp = tonumber(timestamp) + delay
local score = delayedTimestamp * 0x1000
local parentDelayedKey = parentQueueKey .. ":delayed"
rcall("ZADD", parentDelayedKey, score, parentId)
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed",
"jobId", parentId, "delay", delayedTimestamp)
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey)
else
if priority == 0 then
local parentTarget, _paused =
getTargetQueueList(parentMetaKey, parentWaitKey,
parentPausedKey)
rcall("RPUSH", parentTarget, parentId)
rcall("ZADD", parentMarkerKey, 0, "0")
else
local isPaused = isQueuePaused(parentMetaKey)
addJobWithPriority(parentMarkerKey,
parentQueueKey .. ":prioritized", priority,
parentId, parentQueueKey .. ":pc", isPaused)
end
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting",
"jobId", parentId, "prev", "waiting-children")
end
end
end
end

@@ -247,4 +239,2 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey,

-- Check if this job is a child of another job, if so add it to the parents dependencies
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status.
-- fail in this case.
if parentDependenciesKey ~= nil then

@@ -251,0 +241,0 @@ rcall("SADD", parentDependenciesKey, jobIdKey)

@@ -13,2 +13,3 @@ const content = `--[[

if ARGV[i] == "wait" or ARGV[i] == "paused" then
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
local marker = rcall("LINDEX", stateKey, -1)

@@ -15,0 +16,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then

@@ -40,2 +40,3 @@ const content = `--[[

if ARGV[i] == "wait" or ARGV[i] == "paused" then
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
local marker = rcall("LINDEX", stateKey, -1)

@@ -42,0 +43,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then

@@ -1,7 +0,7 @@

export * from './addDelayedJob-7';
export * from './addDelayedJob-6';
export * from './addParentJob-4';
export * from './addPrioritizedJob-8';
export * from './addStandardJob-6';
export * from './addPrioritizedJob-7';
export * from './addStandardJob-7';
export * from './changeDelay-3';
export * from './changePriority-5';
export * from './changePriority-6';
export * from './cleanJobsInSet-2';

@@ -19,10 +19,10 @@ export * from './drain-4';

export * from './moveStalledJobsToWait-8';
export * from './moveToActive-10';
export * from './moveToDelayed-8';
export * from './moveToFinished-13';
export * from './moveToActive-11';
export * from './moveToDelayed-7';
export * from './moveToFinished-14';
export * from './moveToWaitingChildren-4';
export * from './obliterate-2';
export * from './paginate-1';
export * from './pause-5';
export * from './promote-7';
export * from './pause-7';
export * from './promote-8';
export * from './releaseLock-1';

@@ -32,5 +32,5 @@ export * from './removeJob-1';

export * from './reprocessJob-6';
export * from './retryJob-9';
export * from './retryJob-10';
export * from './saveStacktrace-1';
export * from './updateData-1';
export * from './updateProgress-3';

@@ -1,7 +0,7 @@

export * from './addDelayedJob-7';
export * from './addDelayedJob-6';
export * from './addParentJob-4';
export * from './addPrioritizedJob-8';
export * from './addStandardJob-6';
export * from './addPrioritizedJob-7';
export * from './addStandardJob-7';
export * from './changeDelay-3';
export * from './changePriority-5';
export * from './changePriority-6';
export * from './cleanJobsInSet-2';

@@ -19,10 +19,10 @@ export * from './drain-4';

export * from './moveStalledJobsToWait-8';
export * from './moveToActive-10';
export * from './moveToDelayed-8';
export * from './moveToFinished-13';
export * from './moveToActive-11';
export * from './moveToDelayed-7';
export * from './moveToFinished-14';
export * from './moveToWaitingChildren-4';
export * from './obliterate-2';
export * from './paginate-1';
export * from './pause-5';
export * from './promote-7';
export * from './pause-7';
export * from './promote-8';
export * from './releaseLock-1';

@@ -32,3 +32,3 @@ export * from './removeJob-1';

export * from './reprocessJob-6';
export * from './retryJob-9';
export * from './retryJob-10';
export * from './saveStacktrace-1';

@@ -35,0 +35,0 @@ export * from './updateData-1';

@@ -206,2 +206,3 @@ const content = `--[[

for i, jobId in ipairs(stalling) do
-- Markers in waitlist DEPRECATED in v5: Remove in v6.
if string.sub(jobId, 1, 2) == "0:" then

@@ -208,0 +209,0 @@ -- If the jobId is a delay marker ID we just remove it.

@@ -20,3 +20,4 @@ const content = `--[[

local rcall = redis.call
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token)
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp,
lockKey, jobKey, token)
if token ~= "0" then

@@ -40,3 +41,4 @@ if rcall("GET", lockKey) == token then

if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
end

@@ -46,3 +48,4 @@ return 1

if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1])
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4],
ARGV[1])
end

@@ -49,0 +52,0 @@ return 1

{
"name": "bullmq",
"version": "4.17.0",
"version": "5.0.0",
"description": "Queue for messages and jobs based on Redis",

@@ -5,0 +5,0 @@ "homepage": "https://bullmq.io/",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc