Comparing version 5.40.0 to 5.40.1
@@ -45,2 +45,3 @@ "use strict"; | ||
this.running = false; | ||
this.mainLoopRunning = null; | ||
if (!opts || !opts.connection) { | ||
@@ -191,66 +192,70 @@ throw new Error('Worker requires a connection'); | ||
this.running = true; | ||
if (this.closing) { | ||
if (this.closing || this.paused) { | ||
return; | ||
} | ||
await this.startStalledCheckTimer(); | ||
const jobsInProgress = new Set(); | ||
this.startLockExtenderTimer(jobsInProgress); | ||
const asyncFifoQueue = (this.asyncFifoQueue = | ||
new async_fifo_queue_1.AsyncFifoQueue()); | ||
let tokenPostfix = 0; | ||
const client = await this.client; | ||
const bclient = await this.blockingConnection.client; | ||
this.mainLoopRunning = this.mainLoop(client, bclient); | ||
// We must await here or finally will be called too early. | ||
await this.mainLoopRunning; | ||
} | ||
finally { | ||
this.running = false; | ||
} | ||
} | ||
/** | ||
* This is the main loop in BullMQ. Its goals are to fetch jobs from the queue | ||
* as efficiently as possible, providing concurrency and minimal unnecessary calls | ||
* to Redis. | ||
*/ | ||
async mainLoop(client, bclient) { | ||
const asyncFifoQueue = (this.asyncFifoQueue = new async_fifo_queue_1.AsyncFifoQueue()); | ||
const jobsInProgress = new Set(); | ||
this.startLockExtenderTimer(jobsInProgress); | ||
let tokenPostfix = 0; | ||
while (!this.closing && !this.paused) { | ||
let numTotal = asyncFifoQueue.numTotal(); | ||
/** | ||
* This is the main loop in BullMQ. Its goals are to fetch jobs from the queue | ||
* as efficiently as possible, providing concurrency and minimal unnecessary calls | ||
* to Redis. | ||
* This inner loop tries to fetch jobs concurrently, but if we are waiting for a job | ||
* to arrive at the queue we should not try to fetch more jobs (as it would be pointless) | ||
*/ | ||
while (!this.closing) { | ||
let numTotal = asyncFifoQueue.numTotal(); | ||
/** | ||
* This inner loop tries to fetch jobs concurrently, but if we are waiting for a job | ||
* to arrive at the queue we should not try to fetch more jobs (as it would be pointless) | ||
*/ | ||
while (!this.waiting && | ||
numTotal < this._concurrency && | ||
(!this.limitUntil || numTotal == 0)) { | ||
const token = `${this.id}:${tokenPostfix++}`; | ||
const fetchedJob = this.retryIfFailed(() => this._getNextJob(client, bclient, token, { block: true }), this.opts.runRetryDelay); | ||
asyncFifoQueue.add(fetchedJob); | ||
numTotal = asyncFifoQueue.numTotal(); | ||
if (this.waiting && numTotal > 1) { | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
} | ||
// We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls | ||
// to Redis in high concurrency scenarios. | ||
const job = await fetchedJob; | ||
// No more jobs waiting but we have others that could start processing already | ||
if (!job && numTotal > 1) { | ||
break; | ||
} | ||
// If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting | ||
// for processing this job. | ||
if (this.blockUntil) { | ||
break; | ||
} | ||
while (!this.closing && | ||
!this.paused && | ||
!this.waiting && | ||
numTotal < this._concurrency && | ||
(!this.limitUntil || numTotal == 0)) { | ||
const token = `${this.id}:${tokenPostfix++}`; | ||
const fetchedJob = this.retryIfFailed(() => this._getNextJob(client, bclient, token, { block: true }), this.opts.runRetryDelay); | ||
asyncFifoQueue.add(fetchedJob); | ||
numTotal = asyncFifoQueue.numTotal(); | ||
if (this.waiting && numTotal > 1) { | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
} | ||
// Since there can be undefined jobs in the queue (when a job fails or queue is empty) | ||
// we iterate until we find a job. | ||
let job; | ||
do { | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
const token = job.token; | ||
asyncFifoQueue.add(this.retryIfFailed(() => this.processJob(job, token, () => asyncFifoQueue.numTotal() <= this._concurrency, jobsInProgress), this.opts.runRetryDelay)); | ||
// We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls | ||
// to Redis in high concurrency scenarios. | ||
const job = await fetchedJob; | ||
// No more jobs waiting but we have others that could start processing already | ||
if (!job && numTotal > 1) { | ||
break; | ||
} | ||
// If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting | ||
// for processing this job. | ||
if (this.blockUntil) { | ||
break; | ||
} | ||
} | ||
this.running = false; | ||
return await asyncFifoQueue.waitAll(); | ||
// Since there can be undefined jobs in the queue (when a job fails or queue is empty) | ||
// we iterate until we find a job. | ||
let job; | ||
do { | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
const token = job.token; | ||
asyncFifoQueue.add(this.retryIfFailed(() => this.processJob(job, token, () => asyncFifoQueue.numTotal() <= this._concurrency, jobsInProgress), this.opts.runRetryDelay)); | ||
} | ||
} | ||
catch (error) { | ||
this.running = false; | ||
throw error; | ||
} | ||
return asyncFifoQueue.waitAll(); | ||
} | ||
@@ -279,8 +284,3 @@ /** | ||
if (this.paused) { | ||
if (block) { | ||
await this.paused; | ||
} | ||
else { | ||
return; | ||
} | ||
return; | ||
} | ||
@@ -460,5 +460,2 @@ if (this.closing) { | ||
var _a, _b; | ||
if (!job || this.closing || this.paused) { | ||
return; | ||
} | ||
const srcPropagationMedatada = (_b = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata; | ||
@@ -545,2 +542,3 @@ return this.trace(enums_1.SpanKind.CONSUMER, 'process', this.name, async (span) => { | ||
await this.trace(enums_1.SpanKind.INTERNAL, 'pause', this.name, async (span) => { | ||
var _a; | ||
span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
@@ -552,10 +550,5 @@ [enums_1.TelemetryAttributes.WorkerId]: this.id, | ||
if (!this.paused) { | ||
this.paused = new Promise(resolve => { | ||
this.resumeWorker = function () { | ||
resolve(); | ||
this.paused = null; // Allow pause to be checked externally for paused state. | ||
this.resumeWorker = null; | ||
}; | ||
}); | ||
this.paused = true; | ||
await (!doNotWaitActive && this.whenCurrentJobsFinished()); | ||
(_a = this.stalledCheckStopper) === null || _a === void 0 ? void 0 : _a.call(this); | ||
this.emit('paused'); | ||
@@ -570,3 +563,3 @@ } | ||
resume() { | ||
if (this.resumeWorker) { | ||
if (!this.running) { | ||
this.trace(enums_1.SpanKind.INTERNAL, 'resume', this.name, span => { | ||
@@ -577,3 +570,4 @@ span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
}); | ||
this.resumeWorker(); | ||
this.paused = false; | ||
this.run(); | ||
this.emit('resumed'); | ||
@@ -682,3 +676,3 @@ }); | ||
async stalledChecker() { | ||
while (!this.closing) { | ||
while (!(this.closing || this.paused)) { | ||
try { | ||
@@ -747,4 +741,4 @@ await this.checkConnectionError(() => this.moveStalledJobsToWait()); | ||
} | ||
if (this.asyncFifoQueue) { | ||
await this.asyncFifoQueue.waitAll(); | ||
if (this.mainLoopRunning) { | ||
await this.mainLoopRunning; | ||
} | ||
@@ -751,0 +745,0 @@ reconnect && (await this.blockingConnection.reconnect()); |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.version = void 0; | ||
exports.version = '5.40.0'; | ||
exports.version = '5.40.1'; | ||
//# sourceMappingURL=version.js.map |
@@ -111,3 +111,2 @@ /// <reference types="node" /> | ||
private limitUntil; | ||
private resumeWorker; | ||
private stalledCheckStopper?; | ||
@@ -117,5 +116,6 @@ private waiting; | ||
protected _jobScheduler: JobScheduler; | ||
protected paused: Promise<void>; | ||
protected paused: boolean; | ||
protected processFn: Processor<DataType, ResultType, NameType>; | ||
protected running: boolean; | ||
protected mainLoopRunning: Promise<void> | null; | ||
static RateLimitError(): Error; | ||
@@ -142,2 +142,8 @@ constructor(name: string, processor?: string | URL | null | Processor<DataType, ResultType, NameType>, opts?: WorkerOptions, Connection?: typeof RedisConnection); | ||
/** | ||
* This is the main loop in BullMQ. Its goals are to fetch jobs from the queue | ||
* as efficiently as possible, providing concurrency and minimal unnecessary calls | ||
* to Redis. | ||
*/ | ||
private mainLoop; | ||
/** | ||
* Returns a promise that resolves to the next job in queue. | ||
@@ -144,0 +150,0 @@ * @param token - worker token to be assigned to retrieved job |
@@ -42,2 +42,3 @@ import * as fs from 'fs'; | ||
this.running = false; | ||
this.mainLoopRunning = null; | ||
if (!opts || !opts.connection) { | ||
@@ -188,66 +189,70 @@ throw new Error('Worker requires a connection'); | ||
this.running = true; | ||
if (this.closing) { | ||
if (this.closing || this.paused) { | ||
return; | ||
} | ||
await this.startStalledCheckTimer(); | ||
const jobsInProgress = new Set(); | ||
this.startLockExtenderTimer(jobsInProgress); | ||
const asyncFifoQueue = (this.asyncFifoQueue = | ||
new AsyncFifoQueue()); | ||
let tokenPostfix = 0; | ||
const client = await this.client; | ||
const bclient = await this.blockingConnection.client; | ||
this.mainLoopRunning = this.mainLoop(client, bclient); | ||
// We must await here or finally will be called too early. | ||
await this.mainLoopRunning; | ||
} | ||
finally { | ||
this.running = false; | ||
} | ||
} | ||
/** | ||
* This is the main loop in BullMQ. Its goals are to fetch jobs from the queue | ||
* as efficiently as possible, providing concurrency and minimal unnecessary calls | ||
* to Redis. | ||
*/ | ||
async mainLoop(client, bclient) { | ||
const asyncFifoQueue = (this.asyncFifoQueue = new AsyncFifoQueue()); | ||
const jobsInProgress = new Set(); | ||
this.startLockExtenderTimer(jobsInProgress); | ||
let tokenPostfix = 0; | ||
while (!this.closing && !this.paused) { | ||
let numTotal = asyncFifoQueue.numTotal(); | ||
/** | ||
* This is the main loop in BullMQ. Its goals are to fetch jobs from the queue | ||
* as efficiently as possible, providing concurrency and minimal unnecessary calls | ||
* to Redis. | ||
* This inner loop tries to fetch jobs concurrently, but if we are waiting for a job | ||
* to arrive at the queue we should not try to fetch more jobs (as it would be pointless) | ||
*/ | ||
while (!this.closing) { | ||
let numTotal = asyncFifoQueue.numTotal(); | ||
/** | ||
* This inner loop tries to fetch jobs concurrently, but if we are waiting for a job | ||
* to arrive at the queue we should not try to fetch more jobs (as it would be pointless) | ||
*/ | ||
while (!this.waiting && | ||
numTotal < this._concurrency && | ||
(!this.limitUntil || numTotal == 0)) { | ||
const token = `${this.id}:${tokenPostfix++}`; | ||
const fetchedJob = this.retryIfFailed(() => this._getNextJob(client, bclient, token, { block: true }), this.opts.runRetryDelay); | ||
asyncFifoQueue.add(fetchedJob); | ||
numTotal = asyncFifoQueue.numTotal(); | ||
if (this.waiting && numTotal > 1) { | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
} | ||
// We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls | ||
// to Redis in high concurrency scenarios. | ||
const job = await fetchedJob; | ||
// No more jobs waiting but we have others that could start processing already | ||
if (!job && numTotal > 1) { | ||
break; | ||
} | ||
// If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting | ||
// for processing this job. | ||
if (this.blockUntil) { | ||
break; | ||
} | ||
while (!this.closing && | ||
!this.paused && | ||
!this.waiting && | ||
numTotal < this._concurrency && | ||
(!this.limitUntil || numTotal == 0)) { | ||
const token = `${this.id}:${tokenPostfix++}`; | ||
const fetchedJob = this.retryIfFailed(() => this._getNextJob(client, bclient, token, { block: true }), this.opts.runRetryDelay); | ||
asyncFifoQueue.add(fetchedJob); | ||
numTotal = asyncFifoQueue.numTotal(); | ||
if (this.waiting && numTotal > 1) { | ||
// We are waiting for jobs but we have others that we could start processing already | ||
break; | ||
} | ||
// Since there can be undefined jobs in the queue (when a job fails or queue is empty) | ||
// we iterate until we find a job. | ||
let job; | ||
do { | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
const token = job.token; | ||
asyncFifoQueue.add(this.retryIfFailed(() => this.processJob(job, token, () => asyncFifoQueue.numTotal() <= this._concurrency, jobsInProgress), this.opts.runRetryDelay)); | ||
// We await here so that we fetch jobs in sequence, this is important to avoid unnecessary calls | ||
// to Redis in high concurrency scenarios. | ||
const job = await fetchedJob; | ||
// No more jobs waiting but we have others that could start processing already | ||
if (!job && numTotal > 1) { | ||
break; | ||
} | ||
// If there are potential jobs to be processed and blockUntil is set, we should exit to avoid waiting | ||
// for processing this job. | ||
if (this.blockUntil) { | ||
break; | ||
} | ||
} | ||
this.running = false; | ||
return await asyncFifoQueue.waitAll(); | ||
// Since there can be undefined jobs in the queue (when a job fails or queue is empty) | ||
// we iterate until we find a job. | ||
let job; | ||
do { | ||
job = await asyncFifoQueue.fetch(); | ||
} while (!job && asyncFifoQueue.numQueued() > 0); | ||
if (job) { | ||
const token = job.token; | ||
asyncFifoQueue.add(this.retryIfFailed(() => this.processJob(job, token, () => asyncFifoQueue.numTotal() <= this._concurrency, jobsInProgress), this.opts.runRetryDelay)); | ||
} | ||
} | ||
catch (error) { | ||
this.running = false; | ||
throw error; | ||
} | ||
return asyncFifoQueue.waitAll(); | ||
} | ||
@@ -276,8 +281,3 @@ /** | ||
if (this.paused) { | ||
if (block) { | ||
await this.paused; | ||
} | ||
else { | ||
return; | ||
} | ||
return; | ||
} | ||
@@ -457,5 +457,2 @@ if (this.closing) { | ||
var _a, _b; | ||
if (!job || this.closing || this.paused) { | ||
return; | ||
} | ||
const srcPropagationMedatada = (_b = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata; | ||
@@ -542,2 +539,3 @@ return this.trace(SpanKind.CONSUMER, 'process', this.name, async (span) => { | ||
await this.trace(SpanKind.INTERNAL, 'pause', this.name, async (span) => { | ||
var _a; | ||
span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
@@ -549,10 +547,5 @@ [TelemetryAttributes.WorkerId]: this.id, | ||
if (!this.paused) { | ||
this.paused = new Promise(resolve => { | ||
this.resumeWorker = function () { | ||
resolve(); | ||
this.paused = null; // Allow pause to be checked externally for paused state. | ||
this.resumeWorker = null; | ||
}; | ||
}); | ||
this.paused = true; | ||
await (!doNotWaitActive && this.whenCurrentJobsFinished()); | ||
(_a = this.stalledCheckStopper) === null || _a === void 0 ? void 0 : _a.call(this); | ||
this.emit('paused'); | ||
@@ -567,3 +560,3 @@ } | ||
resume() { | ||
if (this.resumeWorker) { | ||
if (!this.running) { | ||
this.trace(SpanKind.INTERNAL, 'resume', this.name, span => { | ||
@@ -574,3 +567,4 @@ span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
}); | ||
this.resumeWorker(); | ||
this.paused = false; | ||
this.run(); | ||
this.emit('resumed'); | ||
@@ -679,3 +673,3 @@ }); | ||
async stalledChecker() { | ||
while (!this.closing) { | ||
while (!(this.closing || this.paused)) { | ||
try { | ||
@@ -744,4 +738,4 @@ await this.checkConnectionError(() => this.moveStalledJobsToWait()); | ||
} | ||
if (this.asyncFifoQueue) { | ||
await this.asyncFifoQueue.waitAll(); | ||
if (this.mainLoopRunning) { | ||
await this.mainLoopRunning; | ||
} | ||
@@ -748,0 +742,0 @@ reconnect && (await this.blockingConnection.reconnect()); |
@@ -1,1 +0,1 @@ | ||
export declare const version = "5.40.0"; | ||
export declare const version = "5.40.1"; |
@@ -1,2 +0,2 @@ | ||
export const version = '5.40.0'; | ||
export const version = '5.40.1'; | ||
//# sourceMappingURL=version.js.map |
{ | ||
"name": "bullmq", | ||
"version": "5.40.0", | ||
"version": "5.40.1", | ||
"description": "Queue for messages and jobs based on Redis", | ||
@@ -5,0 +5,0 @@ "homepage": "https://bullmq.io/", |
@@ -47,3 +47,3 @@ <div align="center"> | ||
[<img src="http://taskforce.sh/assets/logo_square.png" width="150" alt="Taskforce.sh, Inc" style="padding: 200px"/>](https://taskforce.sh) | ||
[<img src="https://taskforce.sh/assets/logo_square.png" width="150" alt="Taskforce.sh, Inc" style="padding: 200px"/>](https://taskforce.sh) | ||
@@ -242,3 +242,3 @@ Supercharge your queues with a professional front end: | ||
Fork the repo, make some changes, submit a pull-request! Here is the [contributing](contributing.md) doc that has more details. | ||
Fork the repo, make some changes, submit a pull-request! Here is the [contributing](https://github.com/taskforcesh/bullmq/blob/master/contributing.md) doc that has more details. | ||
@@ -245,0 +245,0 @@ # Thanks |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1941860
31237