bullmq
Advanced tools
Comparing version 4.17.0 to 5.0.0
@@ -20,14 +20,7 @@ "use strict"; | ||
class FlowProducer extends events_1.EventEmitter { | ||
constructor(opts = {}, Connection = redis_connection_1.RedisConnection) { | ||
constructor(opts = { connection: {} }, Connection = redis_connection_1.RedisConnection) { | ||
super(); | ||
this.opts = opts; | ||
this.opts = Object.assign({ prefix: 'bull' }, opts); | ||
if (!opts.connection) { | ||
console.warn([ | ||
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer', | ||
'without providing explicitly a connection or connection options is deprecated. This behaviour will', | ||
'be removed in the next major release', | ||
].join(' ')); | ||
} | ||
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts === null || opts === void 0 ? void 0 : opts.connection), false, opts.skipVersionCheck); | ||
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts.connection), false, opts.skipVersionCheck); | ||
this.connection.on('error', (error) => this.emit('error', error)); | ||
@@ -281,3 +274,3 @@ this.connection.on('close', () => { | ||
toKey: (type) => queueKeys.toKey(node.queueName, type), | ||
opts: { prefix }, | ||
opts: { prefix, connection: {} }, | ||
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName), | ||
@@ -284,0 +277,0 @@ closing: this.closing, |
@@ -64,2 +64,7 @@ "use strict"; | ||
/** | ||
* Number of attempts when job is moved to active. | ||
* @defaultValue 0 | ||
*/ | ||
this.attemptsStarted = 0; | ||
/** | ||
* Number of attempts after the job has failed. | ||
@@ -160,3 +165,4 @@ * @defaultValue 0 | ||
job.failedReason = json.failedReason; | ||
job.attemptsMade = parseInt(json.attemptsMade || '0'); | ||
job.attemptsStarted = parseInt(json.ats || '0'); | ||
job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0'); | ||
job.stacktrace = getTraces(json.stacktrace); | ||
@@ -246,2 +252,3 @@ if (typeof json.returnvalue === 'string') { | ||
attemptsMade: this.attemptsMade, | ||
attemptsStarted: this.attemptsStarted, | ||
finishedOn: this.finishedOn, | ||
@@ -369,3 +376,4 @@ processedOn: this.processedOn, | ||
const result = await this.scripts.moveToFinished(this.id, args); | ||
this.finishedOn = args[14]; | ||
this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1]; | ||
this.attemptsMade += 1; | ||
return result; | ||
@@ -394,3 +402,3 @@ } | ||
let finishedOn, delay; | ||
if (this.attemptsMade < this.opts.attempts && | ||
if (this.attemptsMade + 1 < this.opts.attempts && | ||
!this.discarded && | ||
@@ -400,3 +408,3 @@ !(err instanceof unrecoverable_error_1.UnrecoverableError || err.name == 'UnrecoverableError')) { | ||
// Check if backoff is needed | ||
delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade, err, this, opts.settings && opts.settings.backoffStrategy); | ||
delay = await backoffs_1.Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy); | ||
if (delay === -1) { | ||
@@ -423,3 +431,3 @@ moveToFailed = true; | ||
multi.moveToFinished(args); | ||
finishedOn = args[14]; | ||
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1]; | ||
command = 'failed'; | ||
@@ -442,2 +450,3 @@ } | ||
} | ||
this.attemptsMade += 1; | ||
} | ||
@@ -679,5 +688,6 @@ /** | ||
*/ | ||
moveToDelayed(timestamp, token) { | ||
async moveToDelayed(timestamp, token) { | ||
const delay = timestamp - Date.now(); | ||
return this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token); | ||
const movedToDelayed = await this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token, { skipAttempt: true }); | ||
return movedToDelayed; | ||
} | ||
@@ -691,4 +701,5 @@ /** | ||
*/ | ||
moveToWaitingChildren(token, opts = {}) { | ||
return this.scripts.moveToWaitingChildren(this.id, token, opts); | ||
async moveToWaitingChildren(token, opts = {}) { | ||
const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts); | ||
return movedToWaitingChildren; | ||
} | ||
@@ -758,4 +769,3 @@ /** | ||
if (`${parseInt(this.id, 10)}` === this.id) { | ||
//TODO: throw an error in next breaking change | ||
console.warn('Custom Ids should not be integers: https://github.com/taskforcesh/bullmq/pull/1569'); | ||
throw new Error('Custom Ids cannot be integers'); | ||
} | ||
@@ -762,0 +772,0 @@ if (this.opts.priority) { |
@@ -26,3 +26,3 @@ "use strict"; | ||
*/ | ||
constructor(name, opts = {}, Connection = redis_connection_1.RedisConnection) { | ||
constructor(name, opts = { connection: {} }, Connection = redis_connection_1.RedisConnection) { | ||
super(); | ||
@@ -36,10 +36,3 @@ this.name = name; | ||
} | ||
if (!opts.connection) { | ||
console.warn([ | ||
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer', | ||
'without providing explicitly a connection or connection options is deprecated. This behaviour will', | ||
'be removed in the next major release', | ||
].join(' ')); | ||
} | ||
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts === null || opts === void 0 ? void 0 : opts.connection), opts.blockingConnection, opts.skipVersionCheck); | ||
this.connection = new Connection(opts.connection, (0, utils_1.isRedisInstance)(opts.connection), opts.blockingConnection, opts.skipVersionCheck); | ||
this.connection.on('error', (error) => this.emit('error', error)); | ||
@@ -46,0 +39,0 @@ this.connection.on('close', () => { |
@@ -15,3 +15,5 @@ "use strict"; | ||
class QueueEvents extends queue_base_1.QueueBase { | ||
constructor(name, _a = {}, Connection) { | ||
constructor(name, _a = { | ||
connection: {}, | ||
}, Connection) { | ||
var { connection, autorun = true } = _a, opts = tslib_1.__rest(_a, ["connection", "autorun"]); | ||
@@ -18,0 +20,0 @@ super(name, Object.assign(Object.assign({}, opts), { connection: (0, utils_1.isRedisInstance)(connection) |
@@ -28,2 +28,3 @@ "use strict"; | ||
'pc', | ||
'marker', // marker key | ||
].forEach(key => { | ||
@@ -30,0 +31,0 @@ keys[key] = this.toKey(name, key); |
@@ -15,6 +15,3 @@ "use strict"; | ||
].join(' '); | ||
const deprecationMessage = [ | ||
'BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null.', | ||
'On the next versions having this settings will throw an exception', | ||
].join(' '); | ||
const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.'; | ||
class RedisConnection extends events_1.EventEmitter { | ||
@@ -50,3 +47,3 @@ constructor(opts, shared = false, blocking = true, skipVersionCheck = false) { | ||
} | ||
this.checkBlockingOptions(deprecationMessage, this.opts); | ||
this.checkBlockingOptions(deprecationMessage, this.opts, true); | ||
} | ||
@@ -67,5 +64,10 @@ this.skipVersionCheck = | ||
} | ||
checkBlockingOptions(msg, options) { | ||
checkBlockingOptions(msg, options, throwError = false) { | ||
if (this.blocking && options && options.maxRetriesPerRequest) { | ||
console.error(msg); | ||
if (throwError) { | ||
throw new Error(msg); | ||
} | ||
else { | ||
console.error(msg); | ||
} | ||
} | ||
@@ -72,0 +74,0 @@ } |
@@ -34,2 +34,3 @@ /** | ||
undefined, | ||
undefined, | ||
]; | ||
@@ -51,4 +52,3 @@ } | ||
const keys = [ | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys.marker, | ||
queueKeys.meta, | ||
@@ -66,4 +66,3 @@ queueKeys.id, | ||
const keys = [ | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys.marker, | ||
queueKeys.meta, | ||
@@ -137,2 +136,3 @@ queueKeys.id, | ||
queueKeys.events, | ||
queueKeys.marker, | ||
]; | ||
@@ -155,3 +155,3 @@ keys.push(pack(args), job.data, encodedOpts); | ||
const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name)); | ||
keys.push(this.queue.keys.events); | ||
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker); | ||
return client.pause(keys.concat([pause ? 'paused' : 'resumed'])); | ||
@@ -218,2 +218,3 @@ } | ||
keys[12] = metricsKey; | ||
keys[13] = this.queue.keys.marker; | ||
const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); | ||
@@ -226,3 +227,2 @@ const args = [ | ||
target, | ||
JSON.stringify({ jobId: job.id, val: val }), | ||
!fetchNext || this.queue.closing ? 0 : 1, | ||
@@ -236,3 +236,2 @@ queueKeys[''], | ||
attempts: job.opts.attempts, | ||
attemptsMade: job.attemptsMade, | ||
maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints) | ||
@@ -406,2 +405,3 @@ ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints | ||
this.queue.keys.pc, | ||
this.queue.keys.marker, | ||
]; | ||
@@ -416,3 +416,3 @@ return keys.concat([ | ||
// Note: We have an issue here with jobs using custom job ids | ||
moveToDelayedArgs(jobId, timestamp, token, delay) { | ||
moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) { | ||
// | ||
@@ -429,16 +429,12 @@ // Bake in the job id first 12 bits into the timestamp | ||
} | ||
const queueKeys = this.queue.keys; | ||
const keys = [ | ||
'wait', | ||
'active', | ||
'prioritized', | ||
'delayed', | ||
jobId, | ||
].map(name => { | ||
return this.queue.toKey(name); | ||
}); | ||
keys.push.apply(keys, [ | ||
this.queue.keys.events, | ||
this.queue.keys.paused, | ||
this.queue.keys.meta, | ||
]); | ||
queueKeys.marker, | ||
queueKeys.active, | ||
queueKeys.prioritized, | ||
queueKeys.delayed, | ||
this.queue.toKey(jobId), | ||
queueKeys.events, | ||
queueKeys.meta, | ||
]; | ||
return keys.concat([ | ||
@@ -451,2 +447,3 @@ this.queue.keys[''], | ||
delay, | ||
opts.skipAttempt ? '1' : '0', | ||
]); | ||
@@ -471,5 +468,5 @@ } | ||
} | ||
async moveToDelayed(jobId, timestamp, delay, token = '0') { | ||
async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) { | ||
const client = await this.queue.client; | ||
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay); | ||
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); | ||
const result = await client.moveToDelayed(args); | ||
@@ -522,11 +519,13 @@ if (result < 0) { | ||
const keys = [ | ||
'active', | ||
'wait', | ||
'paused', | ||
jobId, | ||
'meta', | ||
].map(name => { | ||
return this.queue.toKey(name); | ||
}); | ||
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc); | ||
this.queue.keys.active, | ||
this.queue.keys.wait, | ||
this.queue.keys.paused, | ||
this.queue.toKey(jobId), | ||
this.queue.keys.meta, | ||
this.queue.keys.events, | ||
this.queue.keys.delayed, | ||
this.queue.keys.prioritized, | ||
this.queue.keys.pc, | ||
this.queue.keys.marker, | ||
]; | ||
const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; | ||
@@ -600,3 +599,3 @@ return keys.concat([ | ||
} | ||
async moveToActive(client, token, jobId) { | ||
async moveToActive(client, token) { | ||
const opts = this.queue.opts; | ||
@@ -615,2 +614,3 @@ const queueKeys = this.queue.keys; | ||
queueKeys.pc, | ||
queueKeys.marker, | ||
]; | ||
@@ -620,3 +620,2 @@ const args = [ | ||
Date.now(), | ||
jobId || '', | ||
pack({ | ||
@@ -641,2 +640,3 @@ token, | ||
this.queue.keys.events, | ||
this.queue.keys.marker, | ||
]; | ||
@@ -643,0 +643,0 @@ const args = [this.queue.toKey(''), jobId]; |
@@ -32,3 +32,3 @@ "use strict"; | ||
} | ||
constructor(name, processor, opts = {}, Connection) { | ||
constructor(name, processor, opts, Connection) { | ||
super(name, Object.assign(Object.assign({}, opts), { blockingConnection: true }), Connection); | ||
@@ -42,2 +42,5 @@ this.abortDelayController = null; | ||
this.running = false; | ||
if (!opts || !opts.connection) { | ||
throw new Error('Worker requires a connection'); | ||
} | ||
this.opts = Object.assign({ drainDelay: 5, concurrency: 1, lockDuration: 30000, maxStalledCount: 1, stalledInterval: 30000, autorun: true, runRetryDelay: 15000 }, this.opts); | ||
@@ -178,3 +181,3 @@ if (this.opts.stalledInterval <= 0) { | ||
if (this.waiting && numTotal > 1) { | ||
// We have a job waiting but we have others that we could start processing already | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
@@ -200,5 +203,3 @@ } | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && | ||
asyncFifoQueue.numTotal() > 0 && | ||
asyncFifoQueue.numQueued() > 0); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
@@ -239,6 +240,8 @@ const token = job.token; | ||
if (this.drained && block && !this.limitUntil && !this.waiting) { | ||
this.waiting = this.waitForJob(bclient); | ||
this.waiting = this.waitForJob(bclient, this.blockUntil); | ||
try { | ||
const jobId = await this.waiting; | ||
return this.moveToActive(client, token, jobId); | ||
this.blockUntil = await this.waiting; | ||
if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) { | ||
return this.moveToActive(client, token); | ||
} | ||
} | ||
@@ -273,20 +276,10 @@ catch (err) { | ||
} | ||
async moveToActive(client, token, jobId) { | ||
// If we get the special delayed job ID, we pick the delay as the next | ||
// block timeout. | ||
if (jobId && jobId.startsWith('0:')) { | ||
this.blockUntil = parseInt(jobId.split(':')[1]) || 0; | ||
// Remove marker from active list. | ||
await client.lrem(this.keys.active, 1, jobId); | ||
if (this.blockUntil > 0) { | ||
return; | ||
} | ||
} | ||
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, jobId); | ||
async moveToActive(client, token) { | ||
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token); | ||
this.updateDelays(limitUntil, delayUntil); | ||
return this.nextJobFromJobData(jobData, id, token); | ||
} | ||
async waitForJob(bclient) { | ||
async waitForJob(bclient, blockUntil) { | ||
if (this.paused) { | ||
return; | ||
return Infinity; | ||
} | ||
@@ -296,6 +289,3 @@ try { | ||
if (!this.closing) { | ||
let blockTimeout = Math.max(this.blockUntil | ||
? (this.blockUntil - Date.now()) / 1000 | ||
: opts.drainDelay, 0); | ||
let jobId; | ||
let blockTimeout = Math.max(blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, 0); | ||
// Blocking for less than 50ms is useless. | ||
@@ -310,9 +300,13 @@ if (blockTimeout > 0.05) { | ||
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); | ||
jobId = await bclient.brpoplpush(this.keys.wait, this.keys.active, blockTimeout); | ||
// Markers should only be used for un-blocking, so we will handle them in this | ||
// function only. | ||
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); | ||
if (result) { | ||
const [_key, member, score] = result; | ||
if (member) { | ||
return parseInt(score); | ||
} | ||
} | ||
} | ||
else { | ||
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active); | ||
} | ||
this.blockUntil = 0; | ||
return jobId; | ||
return 0; | ||
} | ||
@@ -331,2 +325,3 @@ } | ||
} | ||
return Infinity; | ||
} | ||
@@ -333,0 +328,0 @@ /** |
@@ -80,3 +80,3 @@ "use strict"; | ||
Add delay marker if needed. | ||
]] | ||
]] | ||
-- Includes | ||
@@ -96,43 +96,26 @@ --[[ | ||
end | ||
local function addDelayMarkerIfNeeded(targetKey, delayedKey) | ||
local waitLen = rcall("LLEN", targetKey) | ||
if waitLen <= 1 then | ||
local function addDelayMarkerIfNeeded(markerKey, delayedKey) | ||
local nextTimestamp = getNextDelayedTimestamp(delayedKey) | ||
if nextTimestamp ~= nil then | ||
-- Check if there is already a marker with older timestamp | ||
-- if there is, we need to replace it. | ||
if waitLen == 1 then | ||
local marker = rcall("LINDEX", targetKey, 0) | ||
local oldTimestamp = tonumber(marker:sub(3)) | ||
if oldTimestamp and oldTimestamp > nextTimestamp then | ||
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp) | ||
end | ||
else | ||
-- if there is no marker, then we need to add one | ||
rcall("LPUSH", targetKey, "0:" .. nextTimestamp) | ||
end | ||
-- Replace the score of the marker with the newest known | ||
-- next timestamp. | ||
rcall("ZADD", markerKey, nextTimestamp, "0") | ||
end | ||
end | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
]] | ||
-- Includes | ||
local function isQueuePaused(queueMetaKey) | ||
return rcall("HEXISTS", queueMetaKey, "paused") == 1 | ||
end | ||
--[[ | ||
Function priority marker to wait if needed | ||
in order to wake up our workers and to respect priority | ||
order as much as possible | ||
Function to add job considering priority. | ||
]] | ||
local function addPriorityMarkerIfNeeded(waitKey) | ||
local waitLen = rcall("LLEN", waitKey) | ||
if waitLen == 0 then | ||
rcall("LPUSH", waitKey, "0:0") | ||
end | ||
end | ||
local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey) | ||
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) | ||
local prioCounter = rcall("INCR", priorityCounterKey) | ||
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) | ||
rcall("ZADD", prioritizedKey, score, jobId) | ||
if not paused then | ||
addPriorityMarkerIfNeeded(waitKey) | ||
if not isPaused then | ||
rcall("ZADD", markerKey, 0, "0") | ||
end | ||
@@ -151,31 +134,40 @@ end | ||
end | ||
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp) | ||
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId) | ||
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then | ||
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) | ||
local parentWaitKey = parentQueueKey .. ":wait" | ||
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey, | ||
parentQueueKey .. ":paused") | ||
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") | ||
local priority = tonumber(jobAttributes[1]) or 0 | ||
local delay = tonumber(jobAttributes[2]) or 0 | ||
if delay > 0 then | ||
local delayedTimestamp = tonumber(timestamp) + delay | ||
local score = delayedTimestamp * 0x1000 | ||
local parentDelayedKey = parentQueueKey .. ":delayed" | ||
rcall("ZADD", parentDelayedKey, score, parentId) | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, | ||
"delay", delayedTimestamp) | ||
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey) | ||
else | ||
if priority == 0 then | ||
rcall("RPUSH", parentTarget, parentId) | ||
else | ||
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused, | ||
parentId, parentQueueKey .. ":pc") | ||
end | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, | ||
"prev", "waiting-children") | ||
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, | ||
parentKey, parentId, timestamp) | ||
local isParentActive = rcall("ZSCORE", | ||
parentQueueKey .. ":waiting-children", parentId) | ||
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then | ||
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) | ||
local parentWaitKey = parentQueueKey .. ":wait" | ||
local parentPausedKey = parentQueueKey .. ":paused" | ||
local parentMetaKey = parentQueueKey .. ":meta" | ||
local parentMarkerKey = parentQueueKey .. ":marker" | ||
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") | ||
local priority = tonumber(jobAttributes[1]) or 0 | ||
local delay = tonumber(jobAttributes[2]) or 0 | ||
if delay > 0 then | ||
local delayedTimestamp = tonumber(timestamp) + delay | ||
local score = delayedTimestamp * 0x1000 | ||
local parentDelayedKey = parentQueueKey .. ":delayed" | ||
rcall("ZADD", parentDelayedKey, score, parentId) | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", | ||
"jobId", parentId, "delay", delayedTimestamp) | ||
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) | ||
else | ||
if priority == 0 then | ||
local parentTarget, _paused = | ||
getTargetQueueList(parentMetaKey, parentWaitKey, | ||
parentPausedKey) | ||
rcall("RPUSH", parentTarget, parentId) | ||
rcall("ZADD", parentMarkerKey, 0, "0") | ||
else | ||
local isPaused = isQueuePaused(parentMetaKey) | ||
addJobWithPriority(parentMarkerKey, | ||
parentQueueKey .. ":prioritized", priority, | ||
parentId, parentQueueKey .. ":pc", isPaused) | ||
end | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", | ||
"jobId", parentId, "prev", "waiting-children") | ||
end | ||
end | ||
end | ||
end | ||
@@ -250,4 +242,2 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey, | ||
-- Check if this job is a child of another job, if so add it to the parents dependencies | ||
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status. | ||
-- fail in this case. | ||
if parentDependenciesKey ~= nil then | ||
@@ -254,0 +244,0 @@ rcall("SADD", parentDependenciesKey, jobIdKey) |
@@ -16,2 +16,3 @@ "use strict"; | ||
if ARGV[i] == "wait" or ARGV[i] == "paused" then | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
local marker = rcall("LINDEX", stateKey, -1) | ||
@@ -18,0 +19,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then |
@@ -43,2 +43,3 @@ "use strict"; | ||
if ARGV[i] == "wait" or ARGV[i] == "paused" then | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
local marker = rcall("LINDEX", stateKey, -1) | ||
@@ -45,0 +46,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
const tslib_1 = require("tslib"); | ||
tslib_1.__exportStar(require("./addDelayedJob-7"), exports); | ||
tslib_1.__exportStar(require("./addDelayedJob-6"), exports); | ||
tslib_1.__exportStar(require("./addParentJob-4"), exports); | ||
tslib_1.__exportStar(require("./addPrioritizedJob-8"), exports); | ||
tslib_1.__exportStar(require("./addStandardJob-6"), exports); | ||
tslib_1.__exportStar(require("./addPrioritizedJob-7"), exports); | ||
tslib_1.__exportStar(require("./addStandardJob-7"), exports); | ||
tslib_1.__exportStar(require("./changeDelay-3"), exports); | ||
tslib_1.__exportStar(require("./changePriority-5"), exports); | ||
tslib_1.__exportStar(require("./changePriority-6"), exports); | ||
tslib_1.__exportStar(require("./cleanJobsInSet-2"), exports); | ||
@@ -22,10 +22,10 @@ tslib_1.__exportStar(require("./drain-4"), exports); | ||
tslib_1.__exportStar(require("./moveStalledJobsToWait-8"), exports); | ||
tslib_1.__exportStar(require("./moveToActive-10"), exports); | ||
tslib_1.__exportStar(require("./moveToDelayed-8"), exports); | ||
tslib_1.__exportStar(require("./moveToFinished-13"), exports); | ||
tslib_1.__exportStar(require("./moveToActive-11"), exports); | ||
tslib_1.__exportStar(require("./moveToDelayed-7"), exports); | ||
tslib_1.__exportStar(require("./moveToFinished-14"), exports); | ||
tslib_1.__exportStar(require("./moveToWaitingChildren-4"), exports); | ||
tslib_1.__exportStar(require("./obliterate-2"), exports); | ||
tslib_1.__exportStar(require("./paginate-1"), exports); | ||
tslib_1.__exportStar(require("./pause-5"), exports); | ||
tslib_1.__exportStar(require("./promote-7"), exports); | ||
tslib_1.__exportStar(require("./pause-7"), exports); | ||
tslib_1.__exportStar(require("./promote-8"), exports); | ||
tslib_1.__exportStar(require("./releaseLock-1"), exports); | ||
@@ -35,3 +35,3 @@ tslib_1.__exportStar(require("./removeJob-1"), exports); | ||
tslib_1.__exportStar(require("./reprocessJob-6"), exports); | ||
tslib_1.__exportStar(require("./retryJob-9"), exports); | ||
tslib_1.__exportStar(require("./retryJob-10"), exports); | ||
tslib_1.__exportStar(require("./saveStacktrace-1"), exports); | ||
@@ -38,0 +38,0 @@ tslib_1.__exportStar(require("./updateData-1"), exports); |
@@ -209,2 +209,3 @@ "use strict"; | ||
for i, jobId in ipairs(stalling) do | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
if string.sub(jobId, 1, 2) == "0:" then | ||
@@ -211,0 +212,0 @@ -- If the jobId is a delay marker ID we just remove it. |
@@ -23,3 +23,4 @@ "use strict"; | ||
local rcall = redis.call | ||
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token) | ||
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, | ||
lockKey, jobKey, token) | ||
if token ~= "0" then | ||
@@ -43,3 +44,4 @@ if rcall("GET", lockKey) == token then | ||
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], | ||
ARGV[1]) | ||
end | ||
@@ -49,3 +51,4 @@ return 1 | ||
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], | ||
ARGV[1]) | ||
end | ||
@@ -52,0 +55,0 @@ return 1 |
@@ -17,14 +17,7 @@ import { EventEmitter } from 'events'; | ||
export class FlowProducer extends EventEmitter { | ||
constructor(opts = {}, Connection = RedisConnection) { | ||
constructor(opts = { connection: {} }, Connection = RedisConnection) { | ||
super(); | ||
this.opts = opts; | ||
this.opts = Object.assign({ prefix: 'bull' }, opts); | ||
if (!opts.connection) { | ||
console.warn([ | ||
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer', | ||
'without providing explicitly a connection or connection options is deprecated. This behaviour will', | ||
'be removed in the next major release', | ||
].join(' ')); | ||
} | ||
this.connection = new Connection(opts.connection, isRedisInstance(opts === null || opts === void 0 ? void 0 : opts.connection), false, opts.skipVersionCheck); | ||
this.connection = new Connection(opts.connection, isRedisInstance(opts.connection), false, opts.skipVersionCheck); | ||
this.connection.on('error', (error) => this.emit('error', error)); | ||
@@ -278,3 +271,3 @@ this.connection.on('close', () => { | ||
toKey: (type) => queueKeys.toKey(node.queueName, type), | ||
opts: { prefix }, | ||
opts: { prefix, connection: {} }, | ||
qualifiedName: queueKeys.getQueueQualifiedName(node.queueName), | ||
@@ -281,0 +274,0 @@ closing: this.closing, |
@@ -62,2 +62,7 @@ import { ChainableCommander } from 'ioredis'; | ||
/** | ||
* Number of attempts when job is moved to active. | ||
* @defaultValue 0 | ||
*/ | ||
attemptsStarted: number; | ||
/** | ||
* Number of attempts after the job has failed. | ||
@@ -64,0 +69,0 @@ * @defaultValue 0 |
@@ -61,2 +61,7 @@ import { __rest } from "tslib"; | ||
/** | ||
* Number of attempts when job is moved to active. | ||
* @defaultValue 0 | ||
*/ | ||
this.attemptsStarted = 0; | ||
/** | ||
* Number of attempts after the job has failed. | ||
@@ -157,3 +162,4 @@ * @defaultValue 0 | ||
job.failedReason = json.failedReason; | ||
job.attemptsMade = parseInt(json.attemptsMade || '0'); | ||
job.attemptsStarted = parseInt(json.ats || '0'); | ||
job.attemptsMade = parseInt(json.attemptsMade || json.atm || '0'); | ||
job.stacktrace = getTraces(json.stacktrace); | ||
@@ -243,2 +249,3 @@ if (typeof json.returnvalue === 'string') { | ||
attemptsMade: this.attemptsMade, | ||
attemptsStarted: this.attemptsStarted, | ||
finishedOn: this.finishedOn, | ||
@@ -366,3 +373,4 @@ processedOn: this.processedOn, | ||
const result = await this.scripts.moveToFinished(this.id, args); | ||
this.finishedOn = args[14]; | ||
this.finishedOn = args[this.scripts.moveToFinishedKeys.length + 1]; | ||
this.attemptsMade += 1; | ||
return result; | ||
@@ -391,3 +399,3 @@ } | ||
let finishedOn, delay; | ||
if (this.attemptsMade < this.opts.attempts && | ||
if (this.attemptsMade + 1 < this.opts.attempts && | ||
!this.discarded && | ||
@@ -397,3 +405,3 @@ !(err instanceof UnrecoverableError || err.name == 'UnrecoverableError')) { | ||
// Check if backoff is needed | ||
delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade, err, this, opts.settings && opts.settings.backoffStrategy); | ||
delay = await Backoffs.calculate(this.opts.backoff, this.attemptsMade + 1, err, this, opts.settings && opts.settings.backoffStrategy); | ||
if (delay === -1) { | ||
@@ -420,3 +428,3 @@ moveToFailed = true; | ||
multi.moveToFinished(args); | ||
finishedOn = args[14]; | ||
finishedOn = args[this.scripts.moveToFinishedKeys.length + 1]; | ||
command = 'failed'; | ||
@@ -439,2 +447,3 @@ } | ||
} | ||
this.attemptsMade += 1; | ||
} | ||
@@ -676,5 +685,6 @@ /** | ||
*/ | ||
moveToDelayed(timestamp, token) { | ||
async moveToDelayed(timestamp, token) { | ||
const delay = timestamp - Date.now(); | ||
return this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token); | ||
const movedToDelayed = await this.scripts.moveToDelayed(this.id, timestamp, delay > 0 ? delay : 0, token, { skipAttempt: true }); | ||
return movedToDelayed; | ||
} | ||
@@ -688,4 +698,5 @@ /** | ||
*/ | ||
moveToWaitingChildren(token, opts = {}) { | ||
return this.scripts.moveToWaitingChildren(this.id, token, opts); | ||
async moveToWaitingChildren(token, opts = {}) { | ||
const movedToWaitingChildren = await this.scripts.moveToWaitingChildren(this.id, token, opts); | ||
return movedToWaitingChildren; | ||
} | ||
@@ -755,4 +766,3 @@ /** | ||
if (`${parseInt(this.id, 10)}` === this.id) { | ||
//TODO: throw an error in next breaking change | ||
console.warn('Custom Ids should not be integers: https://github.com/taskforcesh/bullmq/pull/1569'); | ||
throw new Error('Custom Ids cannot be integers'); | ||
} | ||
@@ -759,0 +769,0 @@ if (this.opts.priority) { |
@@ -23,3 +23,3 @@ import { EventEmitter } from 'events'; | ||
*/ | ||
constructor(name, opts = {}, Connection = RedisConnection) { | ||
constructor(name, opts = { connection: {} }, Connection = RedisConnection) { | ||
super(); | ||
@@ -33,10 +33,3 @@ this.name = name; | ||
} | ||
if (!opts.connection) { | ||
console.warn([ | ||
'BullMQ: DEPRECATION WARNING! Optional instantiation of Queue, Worker, QueueEvents and FlowProducer', | ||
'without providing explicitly a connection or connection options is deprecated. This behaviour will', | ||
'be removed in the next major release', | ||
].join(' ')); | ||
} | ||
this.connection = new Connection(opts.connection, isRedisInstance(opts === null || opts === void 0 ? void 0 : opts.connection), opts.blockingConnection, opts.skipVersionCheck); | ||
this.connection = new Connection(opts.connection, isRedisInstance(opts.connection), opts.blockingConnection, opts.skipVersionCheck); | ||
this.connection.on('error', (error) => this.emit('error', error)); | ||
@@ -43,0 +36,0 @@ this.connection.on('close', () => { |
@@ -12,3 +12,5 @@ import { __rest } from "tslib"; | ||
export class QueueEvents extends QueueBase { | ||
constructor(name, _a = {}, Connection) { | ||
constructor(name, _a = { | ||
connection: {}, | ||
}, Connection) { | ||
var { connection, autorun = true } = _a, opts = __rest(_a, ["connection", "autorun"]); | ||
@@ -15,0 +17,0 @@ super(name, Object.assign(Object.assign({}, opts), { connection: isRedisInstance(connection) |
@@ -25,2 +25,3 @@ export class QueueKeys { | ||
'pc', | ||
'marker', // marker key | ||
].forEach(key => { | ||
@@ -27,0 +28,0 @@ keys[key] = this.toKey(name, key); |
@@ -12,6 +12,3 @@ import { EventEmitter } from 'events'; | ||
].join(' '); | ||
const deprecationMessage = [ | ||
'BullMQ: DEPRECATION WARNING! Your redis options maxRetriesPerRequest must be null.', | ||
'On the next versions having this settings will throw an exception', | ||
].join(' '); | ||
const deprecationMessage = 'BullMQ: Your redis options maxRetriesPerRequest must be null.'; | ||
export class RedisConnection extends EventEmitter { | ||
@@ -47,3 +44,3 @@ constructor(opts, shared = false, blocking = true, skipVersionCheck = false) { | ||
} | ||
this.checkBlockingOptions(deprecationMessage, this.opts); | ||
this.checkBlockingOptions(deprecationMessage, this.opts, true); | ||
} | ||
@@ -64,5 +61,10 @@ this.skipVersionCheck = | ||
} | ||
checkBlockingOptions(msg, options) { | ||
checkBlockingOptions(msg, options, throwError = false) { | ||
if (this.blocking && options && options.maxRetriesPerRequest) { | ||
console.error(msg); | ||
if (throwError) { | ||
throw new Error(msg); | ||
} | ||
else { | ||
console.error(msg); | ||
} | ||
} | ||
@@ -69,0 +71,0 @@ } |
@@ -5,3 +5,3 @@ /** | ||
/// <reference types="node" /> | ||
import { JobJson, JobJsonRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentOpts, RedisClient, KeepJobs } from '../interfaces'; | ||
import { JobJson, JobJsonRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentOpts, RedisClient, KeepJobs, MoveToDelayedOpts } from '../interfaces'; | ||
import { JobState, JobType, FinishedStatus, FinishedPropValAttribute, MinimalQueue, RedisJobOptions } from '../types'; | ||
@@ -44,6 +44,6 @@ import { ChainableCommander } from 'ioredis'; | ||
private changePriorityArgs; | ||
moveToDelayedArgs(jobId: string, timestamp: number, token: string, delay: number): (string | number)[]; | ||
moveToDelayedArgs(jobId: string, timestamp: number, token: string, delay: number, opts?: MoveToDelayedOpts): (string | number)[]; | ||
saveStacktraceArgs(jobId: string, stacktrace: string, failedReason: string): string[]; | ||
moveToWaitingChildrenArgs(jobId: string, token: string, opts?: MoveToWaitingChildrenOpts): string[]; | ||
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string): Promise<void>; | ||
moveToDelayed(jobId: string, timestamp: number, delay: number, token?: string, opts?: MoveToDelayedOpts): Promise<void>; | ||
/** | ||
@@ -85,3 +85,3 @@ * Move parent job to waiting-children state. | ||
reprocessJob<T = any, R = any, N extends string = string>(job: MinimalJob<T, R, N>, state: 'failed' | 'completed'): Promise<void>; | ||
moveToActive(client: RedisClient, token: string, jobId?: string): Promise<any[]>; | ||
moveToActive(client: RedisClient, token: string): Promise<any[]>; | ||
promote(jobId: string): Promise<void>; | ||
@@ -88,0 +88,0 @@ /** |
@@ -32,2 +32,3 @@ /** | ||
undefined, | ||
undefined, | ||
]; | ||
@@ -49,4 +50,3 @@ } | ||
const keys = [ | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys.marker, | ||
queueKeys.meta, | ||
@@ -64,4 +64,3 @@ queueKeys.id, | ||
const keys = [ | ||
queueKeys.wait, | ||
queueKeys.paused, | ||
queueKeys.marker, | ||
queueKeys.meta, | ||
@@ -135,2 +134,3 @@ queueKeys.id, | ||
queueKeys.events, | ||
queueKeys.marker, | ||
]; | ||
@@ -153,3 +153,3 @@ keys.push(pack(args), job.data, encodedOpts); | ||
const keys = [src, dst, 'meta', 'prioritized'].map((name) => this.queue.toKey(name)); | ||
keys.push(this.queue.keys.events); | ||
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.marker); | ||
return client.pause(keys.concat([pause ? 'paused' : 'resumed'])); | ||
@@ -216,2 +216,3 @@ } | ||
keys[12] = metricsKey; | ||
keys[13] = this.queue.keys.marker; | ||
const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); | ||
@@ -224,3 +225,2 @@ const args = [ | ||
target, | ||
JSON.stringify({ jobId: job.id, val: val }), | ||
!fetchNext || this.queue.closing ? 0 : 1, | ||
@@ -234,3 +234,2 @@ queueKeys[''], | ||
attempts: job.opts.attempts, | ||
attemptsMade: job.attemptsMade, | ||
maxMetricsSize: ((_b = opts.metrics) === null || _b === void 0 ? void 0 : _b.maxDataPoints) | ||
@@ -404,2 +403,3 @@ ? (_c = opts.metrics) === null || _c === void 0 ? void 0 : _c.maxDataPoints | ||
this.queue.keys.pc, | ||
this.queue.keys.marker, | ||
]; | ||
@@ -414,3 +414,3 @@ return keys.concat([ | ||
// Note: We have an issue here with jobs using custom job ids | ||
moveToDelayedArgs(jobId, timestamp, token, delay) { | ||
moveToDelayedArgs(jobId, timestamp, token, delay, opts = {}) { | ||
// | ||
@@ -427,16 +427,12 @@ // Bake in the job id first 12 bits into the timestamp | ||
} | ||
const queueKeys = this.queue.keys; | ||
const keys = [ | ||
'wait', | ||
'active', | ||
'prioritized', | ||
'delayed', | ||
jobId, | ||
].map(name => { | ||
return this.queue.toKey(name); | ||
}); | ||
keys.push.apply(keys, [ | ||
this.queue.keys.events, | ||
this.queue.keys.paused, | ||
this.queue.keys.meta, | ||
]); | ||
queueKeys.marker, | ||
queueKeys.active, | ||
queueKeys.prioritized, | ||
queueKeys.delayed, | ||
this.queue.toKey(jobId), | ||
queueKeys.events, | ||
queueKeys.meta, | ||
]; | ||
return keys.concat([ | ||
@@ -449,2 +445,3 @@ this.queue.keys[''], | ||
delay, | ||
opts.skipAttempt ? '1' : '0', | ||
]); | ||
@@ -469,5 +466,5 @@ } | ||
} | ||
async moveToDelayed(jobId, timestamp, delay, token = '0') { | ||
async moveToDelayed(jobId, timestamp, delay, token = '0', opts = {}) { | ||
const client = await this.queue.client; | ||
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay); | ||
const args = this.moveToDelayedArgs(jobId, timestamp, token, delay, opts); | ||
const result = await client.moveToDelayed(args); | ||
@@ -520,11 +517,13 @@ if (result < 0) { | ||
const keys = [ | ||
'active', | ||
'wait', | ||
'paused', | ||
jobId, | ||
'meta', | ||
].map(name => { | ||
return this.queue.toKey(name); | ||
}); | ||
keys.push(this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc); | ||
this.queue.keys.active, | ||
this.queue.keys.wait, | ||
this.queue.keys.paused, | ||
this.queue.toKey(jobId), | ||
this.queue.keys.meta, | ||
this.queue.keys.events, | ||
this.queue.keys.delayed, | ||
this.queue.keys.prioritized, | ||
this.queue.keys.pc, | ||
this.queue.keys.marker, | ||
]; | ||
const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; | ||
@@ -598,3 +597,3 @@ return keys.concat([ | ||
} | ||
async moveToActive(client, token, jobId) { | ||
async moveToActive(client, token) { | ||
const opts = this.queue.opts; | ||
@@ -613,2 +612,3 @@ const queueKeys = this.queue.keys; | ||
queueKeys.pc, | ||
queueKeys.marker, | ||
]; | ||
@@ -618,3 +618,2 @@ const args = [ | ||
Date.now(), | ||
jobId || '', | ||
pack({ | ||
@@ -639,2 +638,3 @@ token, | ||
this.queue.keys.events, | ||
this.queue.keys.marker, | ||
]; | ||
@@ -641,0 +641,0 @@ const args = [this.queue.toKey(''), jobId]; |
@@ -147,3 +147,3 @@ /// <reference types="node" /> | ||
rateLimit(expireTimeMs: number): Promise<void>; | ||
protected moveToActive(client: RedisClient, token: string, jobId?: string): Promise<Job<DataType, ResultType, NameType>>; | ||
protected moveToActive(client: RedisClient, token: string): Promise<Job<DataType, ResultType, NameType>>; | ||
private waitForJob; | ||
@@ -150,0 +150,0 @@ /** |
@@ -29,3 +29,3 @@ import * as fs from 'fs'; | ||
} | ||
constructor(name, processor, opts = {}, Connection) { | ||
constructor(name, processor, opts, Connection) { | ||
super(name, Object.assign(Object.assign({}, opts), { blockingConnection: true }), Connection); | ||
@@ -39,2 +39,5 @@ this.abortDelayController = null; | ||
this.running = false; | ||
if (!opts || !opts.connection) { | ||
throw new Error('Worker requires a connection'); | ||
} | ||
this.opts = Object.assign({ drainDelay: 5, concurrency: 1, lockDuration: 30000, maxStalledCount: 1, stalledInterval: 30000, autorun: true, runRetryDelay: 15000 }, this.opts); | ||
@@ -175,3 +178,3 @@ if (this.opts.stalledInterval <= 0) { | ||
if (this.waiting && numTotal > 1) { | ||
// We have a job waiting but we have others that we could start processing already | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
@@ -197,5 +200,3 @@ } | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && | ||
asyncFifoQueue.numTotal() > 0 && | ||
asyncFifoQueue.numQueued() > 0); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
@@ -236,6 +237,8 @@ const token = job.token; | ||
if (this.drained && block && !this.limitUntil && !this.waiting) { | ||
this.waiting = this.waitForJob(bclient); | ||
this.waiting = this.waitForJob(bclient, this.blockUntil); | ||
try { | ||
const jobId = await this.waiting; | ||
return this.moveToActive(client, token, jobId); | ||
this.blockUntil = await this.waiting; | ||
if (this.blockUntil <= 0 || this.blockUntil - Date.now() < 10) { | ||
return this.moveToActive(client, token); | ||
} | ||
} | ||
@@ -270,20 +273,10 @@ catch (err) { | ||
} | ||
async moveToActive(client, token, jobId) { | ||
// If we get the special delayed job ID, we pick the delay as the next | ||
// block timeout. | ||
if (jobId && jobId.startsWith('0:')) { | ||
this.blockUntil = parseInt(jobId.split(':')[1]) || 0; | ||
// Remove marker from active list. | ||
await client.lrem(this.keys.active, 1, jobId); | ||
if (this.blockUntil > 0) { | ||
return; | ||
} | ||
} | ||
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token, jobId); | ||
async moveToActive(client, token) { | ||
const [jobData, id, limitUntil, delayUntil] = await this.scripts.moveToActive(client, token); | ||
this.updateDelays(limitUntil, delayUntil); | ||
return this.nextJobFromJobData(jobData, id, token); | ||
} | ||
async waitForJob(bclient) { | ||
async waitForJob(bclient, blockUntil) { | ||
if (this.paused) { | ||
return; | ||
return Infinity; | ||
} | ||
@@ -293,6 +286,3 @@ try { | ||
if (!this.closing) { | ||
let blockTimeout = Math.max(this.blockUntil | ||
? (this.blockUntil - Date.now()) / 1000 | ||
: opts.drainDelay, 0); | ||
let jobId; | ||
let blockTimeout = Math.max(blockUntil ? (blockUntil - Date.now()) / 1000 : opts.drainDelay, 0); | ||
// Blocking for less than 50ms is useless. | ||
@@ -307,9 +297,13 @@ if (blockTimeout > 0.05) { | ||
blockTimeout = Math.min(blockTimeout, maximumBlockTimeout); | ||
jobId = await bclient.brpoplpush(this.keys.wait, this.keys.active, blockTimeout); | ||
// Markers should only be used for un-blocking, so we will handle them in this | ||
// function only. | ||
const result = await bclient.bzpopmin(this.keys.marker, blockTimeout); | ||
if (result) { | ||
const [_key, member, score] = result; | ||
if (member) { | ||
return parseInt(score); | ||
} | ||
} | ||
} | ||
else { | ||
jobId = await bclient.rpoplpush(this.keys.wait, this.keys.active); | ||
} | ||
this.blockUntil = 0; | ||
return jobId; | ||
return 0; | ||
} | ||
@@ -328,2 +322,3 @@ } | ||
} | ||
return Infinity; | ||
} | ||
@@ -330,0 +325,0 @@ /** |
@@ -10,2 +10,3 @@ import { RedisJobOptions } from '../types'; | ||
attemptsMade: number; | ||
attemptsStarted: number; | ||
finishedOn?: number; | ||
@@ -28,3 +29,3 @@ processedOn?: number; | ||
progress: string; | ||
attemptsMade: string; | ||
attemptsMade?: string; | ||
finishedOn?: string; | ||
@@ -39,2 +40,4 @@ processedOn?: string; | ||
rjk?: string; | ||
atm?: string; | ||
ats?: string; | ||
} |
@@ -5,2 +5,5 @@ import { JobsOptions, JobJsonSandbox } from '../types'; | ||
export type BulkJobOptions = Omit<JobsOptions, 'repeat'>; | ||
export interface MoveToDelayedOpts { | ||
skipAttempt?: boolean; | ||
} | ||
export interface MoveToWaitingChildrenOpts { | ||
@@ -7,0 +10,0 @@ child?: { |
@@ -15,3 +15,3 @@ import { AdvancedRepeatOptions } from './advanced-options'; | ||
*/ | ||
connection?: ConnectionOptions; | ||
connection: ConnectionOptions; | ||
/** | ||
@@ -18,0 +18,0 @@ * Denotes commands should retry indefinitely. |
@@ -77,3 +77,3 @@ const content = `--[[ | ||
Add delay marker if needed. | ||
]] | ||
]] | ||
-- Includes | ||
@@ -93,43 +93,26 @@ --[[ | ||
end | ||
local function addDelayMarkerIfNeeded(targetKey, delayedKey) | ||
local waitLen = rcall("LLEN", targetKey) | ||
if waitLen <= 1 then | ||
local function addDelayMarkerIfNeeded(markerKey, delayedKey) | ||
local nextTimestamp = getNextDelayedTimestamp(delayedKey) | ||
if nextTimestamp ~= nil then | ||
-- Check if there is already a marker with older timestamp | ||
-- if there is, we need to replace it. | ||
if waitLen == 1 then | ||
local marker = rcall("LINDEX", targetKey, 0) | ||
local oldTimestamp = tonumber(marker:sub(3)) | ||
if oldTimestamp and oldTimestamp > nextTimestamp then | ||
rcall("LSET", targetKey, 0, "0:" .. nextTimestamp) | ||
end | ||
else | ||
-- if there is no marker, then we need to add one | ||
rcall("LPUSH", targetKey, "0:" .. nextTimestamp) | ||
end | ||
-- Replace the score of the marker with the newest known | ||
-- next timestamp. | ||
rcall("ZADD", markerKey, nextTimestamp, "0") | ||
end | ||
end | ||
end | ||
--[[ | ||
Function to add job considering priority. | ||
Function to check for the meta.paused key to decide if we are paused or not | ||
(since an empty list and !EXISTS are not really the same). | ||
]] | ||
-- Includes | ||
local function isQueuePaused(queueMetaKey) | ||
return rcall("HEXISTS", queueMetaKey, "paused") == 1 | ||
end | ||
--[[ | ||
Function priority marker to wait if needed | ||
in order to wake up our workers and to respect priority | ||
order as much as possible | ||
Function to add job considering priority. | ||
]] | ||
local function addPriorityMarkerIfNeeded(waitKey) | ||
local waitLen = rcall("LLEN", waitKey) | ||
if waitLen == 0 then | ||
rcall("LPUSH", waitKey, "0:0") | ||
end | ||
end | ||
local function addJobWithPriority(waitKey, prioritizedKey, priority, paused, jobId, priorityCounterKey) | ||
local function addJobWithPriority(markerKey, prioritizedKey, priority, jobId, priorityCounterKey, isPaused) | ||
local prioCounter = rcall("INCR", priorityCounterKey) | ||
local score = priority * 0x100000000 + bit.band(prioCounter, 0xffffffffffff) | ||
rcall("ZADD", prioritizedKey, score, jobId) | ||
if not paused then | ||
addPriorityMarkerIfNeeded(waitKey) | ||
if not isPaused then | ||
rcall("ZADD", markerKey, 0, "0") | ||
end | ||
@@ -148,31 +131,40 @@ end | ||
end | ||
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp) | ||
local isParentActive = rcall("ZSCORE", parentQueueKey .. ":waiting-children", parentId) | ||
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then | ||
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) | ||
local parentWaitKey = parentQueueKey .. ":wait" | ||
local parentTarget, paused = getTargetQueueList(parentQueueKey .. ":meta", parentWaitKey, | ||
parentQueueKey .. ":paused") | ||
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") | ||
local priority = tonumber(jobAttributes[1]) or 0 | ||
local delay = tonumber(jobAttributes[2]) or 0 | ||
if delay > 0 then | ||
local delayedTimestamp = tonumber(timestamp) + delay | ||
local score = delayedTimestamp * 0x1000 | ||
local parentDelayedKey = parentQueueKey .. ":delayed" | ||
rcall("ZADD", parentDelayedKey, score, parentId) | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", "jobId", parentId, | ||
"delay", delayedTimestamp) | ||
addDelayMarkerIfNeeded(parentTarget, parentDelayedKey) | ||
else | ||
if priority == 0 then | ||
rcall("RPUSH", parentTarget, parentId) | ||
else | ||
addJobWithPriority(parentWaitKey, parentQueueKey .. ":prioritized", priority, paused, | ||
parentId, parentQueueKey .. ":pc") | ||
end | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", "jobId", parentId, | ||
"prev", "waiting-children") | ||
local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, | ||
parentKey, parentId, timestamp) | ||
local isParentActive = rcall("ZSCORE", | ||
parentQueueKey .. ":waiting-children", parentId) | ||
if rcall("SCARD", parentDependenciesKey) == 0 and isParentActive then | ||
rcall("ZREM", parentQueueKey .. ":waiting-children", parentId) | ||
local parentWaitKey = parentQueueKey .. ":wait" | ||
local parentPausedKey = parentQueueKey .. ":paused" | ||
local parentMetaKey = parentQueueKey .. ":meta" | ||
local parentMarkerKey = parentQueueKey .. ":marker" | ||
local jobAttributes = rcall("HMGET", parentKey, "priority", "delay") | ||
local priority = tonumber(jobAttributes[1]) or 0 | ||
local delay = tonumber(jobAttributes[2]) or 0 | ||
if delay > 0 then | ||
local delayedTimestamp = tonumber(timestamp) + delay | ||
local score = delayedTimestamp * 0x1000 | ||
local parentDelayedKey = parentQueueKey .. ":delayed" | ||
rcall("ZADD", parentDelayedKey, score, parentId) | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "delayed", | ||
"jobId", parentId, "delay", delayedTimestamp) | ||
addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) | ||
else | ||
if priority == 0 then | ||
local parentTarget, _paused = | ||
getTargetQueueList(parentMetaKey, parentWaitKey, | ||
parentPausedKey) | ||
rcall("RPUSH", parentTarget, parentId) | ||
rcall("ZADD", parentMarkerKey, 0, "0") | ||
else | ||
local isPaused = isQueuePaused(parentMetaKey) | ||
addJobWithPriority(parentMarkerKey, | ||
parentQueueKey .. ":prioritized", priority, | ||
parentId, parentQueueKey .. ":pc", isPaused) | ||
end | ||
rcall("XADD", parentQueueKey .. ":events", "*", "event", "waiting", | ||
"jobId", parentId, "prev", "waiting-children") | ||
end | ||
end | ||
end | ||
end | ||
@@ -247,4 +239,2 @@ local function updateParentDepsIfNeeded(parentKey, parentQueueKey, parentDependenciesKey, | ||
-- Check if this job is a child of another job, if so add it to the parents dependencies | ||
-- TODO: Should not be possible to add a child job to a parent that is not in the "waiting-children" status. | ||
-- fail in this case. | ||
if parentDependenciesKey ~= nil then | ||
@@ -251,0 +241,0 @@ rcall("SADD", parentDependenciesKey, jobIdKey) |
@@ -13,2 +13,3 @@ const content = `--[[ | ||
if ARGV[i] == "wait" or ARGV[i] == "paused" then | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
local marker = rcall("LINDEX", stateKey, -1) | ||
@@ -15,0 +16,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then |
@@ -40,2 +40,3 @@ const content = `--[[ | ||
if ARGV[i] == "wait" or ARGV[i] == "paused" then | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
local marker = rcall("LINDEX", stateKey, -1) | ||
@@ -42,0 +43,0 @@ if marker and string.sub(marker, 1, 2) == "0:" then |
@@ -1,7 +0,7 @@ | ||
export * from './addDelayedJob-7'; | ||
export * from './addDelayedJob-6'; | ||
export * from './addParentJob-4'; | ||
export * from './addPrioritizedJob-8'; | ||
export * from './addStandardJob-6'; | ||
export * from './addPrioritizedJob-7'; | ||
export * from './addStandardJob-7'; | ||
export * from './changeDelay-3'; | ||
export * from './changePriority-5'; | ||
export * from './changePriority-6'; | ||
export * from './cleanJobsInSet-2'; | ||
@@ -19,10 +19,10 @@ export * from './drain-4'; | ||
export * from './moveStalledJobsToWait-8'; | ||
export * from './moveToActive-10'; | ||
export * from './moveToDelayed-8'; | ||
export * from './moveToFinished-13'; | ||
export * from './moveToActive-11'; | ||
export * from './moveToDelayed-7'; | ||
export * from './moveToFinished-14'; | ||
export * from './moveToWaitingChildren-4'; | ||
export * from './obliterate-2'; | ||
export * from './paginate-1'; | ||
export * from './pause-5'; | ||
export * from './promote-7'; | ||
export * from './pause-7'; | ||
export * from './promote-8'; | ||
export * from './releaseLock-1'; | ||
@@ -32,5 +32,5 @@ export * from './removeJob-1'; | ||
export * from './reprocessJob-6'; | ||
export * from './retryJob-9'; | ||
export * from './retryJob-10'; | ||
export * from './saveStacktrace-1'; | ||
export * from './updateData-1'; | ||
export * from './updateProgress-3'; |
@@ -1,7 +0,7 @@ | ||
export * from './addDelayedJob-7'; | ||
export * from './addDelayedJob-6'; | ||
export * from './addParentJob-4'; | ||
export * from './addPrioritizedJob-8'; | ||
export * from './addStandardJob-6'; | ||
export * from './addPrioritizedJob-7'; | ||
export * from './addStandardJob-7'; | ||
export * from './changeDelay-3'; | ||
export * from './changePriority-5'; | ||
export * from './changePriority-6'; | ||
export * from './cleanJobsInSet-2'; | ||
@@ -19,10 +19,10 @@ export * from './drain-4'; | ||
export * from './moveStalledJobsToWait-8'; | ||
export * from './moveToActive-10'; | ||
export * from './moveToDelayed-8'; | ||
export * from './moveToFinished-13'; | ||
export * from './moveToActive-11'; | ||
export * from './moveToDelayed-7'; | ||
export * from './moveToFinished-14'; | ||
export * from './moveToWaitingChildren-4'; | ||
export * from './obliterate-2'; | ||
export * from './paginate-1'; | ||
export * from './pause-5'; | ||
export * from './promote-7'; | ||
export * from './pause-7'; | ||
export * from './promote-8'; | ||
export * from './releaseLock-1'; | ||
@@ -32,3 +32,3 @@ export * from './removeJob-1'; | ||
export * from './reprocessJob-6'; | ||
export * from './retryJob-9'; | ||
export * from './retryJob-10'; | ||
export * from './saveStacktrace-1'; | ||
@@ -35,0 +35,0 @@ export * from './updateData-1'; |
@@ -206,2 +206,3 @@ const content = `--[[ | ||
for i, jobId in ipairs(stalling) do | ||
-- Markers in waitlist DEPRECATED in v5: Remove in v6. | ||
if string.sub(jobId, 1, 2) == "0:" then | ||
@@ -208,0 +209,0 @@ -- If the jobId is a delay marker ID we just remove it. |
@@ -20,3 +20,4 @@ const content = `--[[ | ||
local rcall = redis.call | ||
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, lockKey, token) | ||
local function moveToWaitingChildren (activeKey, waitingChildrenKey, jobId, timestamp, | ||
lockKey, jobKey, token) | ||
if token ~= "0" then | ||
@@ -40,3 +41,4 @@ if rcall("GET", lockKey) == token then | ||
if rcall("SISMEMBER", KEYS[4] .. ":dependencies", ARGV[2]) ~= 0 then | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], | ||
ARGV[1]) | ||
end | ||
@@ -46,3 +48,4 @@ return 1 | ||
if rcall("SCARD", KEYS[4] .. ":dependencies") ~= 0 then | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], ARGV[1]) | ||
return moveToWaitingChildren(KEYS[2], KEYS[3], ARGV[4], ARGV[3], KEYS[1], KEYS[4], | ||
ARGV[1]) | ||
end | ||
@@ -49,0 +52,0 @@ return 1 |
{ | ||
"name": "bullmq", | ||
"version": "4.17.0", | ||
"version": "5.0.0", | ||
"description": "Queue for messages and jobs based on Redis", | ||
@@ -5,0 +5,0 @@ "homepage": "https://bullmq.io/", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1467107
22812