@vendure/job-queue-plugin
Advanced tools
Comparing version 2.2.0-next.1 to 2.2.0-next.2
{ | ||
"name": "@vendure/job-queue-plugin", | ||
"version": "2.2.0-next.1", | ||
"version": "2.2.0-next.2", | ||
"license": "MIT", | ||
@@ -26,10 +26,10 @@ "main": "package/index.js", | ||
"@google-cloud/pubsub": "^2.8.0", | ||
"@vendure/common": "2.2.0-next.1", | ||
"@vendure/core": "2.2.0-next.1", | ||
"bullmq": "^3.15.5", | ||
"ioredis": "^5.3.0", | ||
"@vendure/common": "2.2.0-next.2", | ||
"@vendure/core": "2.2.0-next.2", | ||
"bullmq": "^5.1.7", | ||
"ioredis": "^5.3.2", | ||
"rimraf": "^3.0.2", | ||
"typescript": "4.9.5" | ||
}, | ||
"gitHead": "558d9fc1be780beed364221e83421bfb2942aa66" | ||
"gitHead": "4c145666b147a13b38e3d19422727743d0a8218e" | ||
} |
@@ -18,2 +18,6 @@ import { JobListOptions } from '@vendure/common/lib/generated-types'; | ||
private queueNameProcessFnMap; | ||
private cancellationSub; | ||
private cancelRunningJob$; | ||
private readonly CANCEL_JOB_CHANNEL; | ||
private readonly CANCELLED_JOB_LIST_NAME; | ||
init(injector: Injector): Promise<void>; | ||
@@ -28,4 +32,6 @@ destroy(): Promise<void>; | ||
start<Data extends JobData<Data> = object>(queueName: string, process: (job: Job<Data>) => Promise<any>): Promise<void>; | ||
private subscribeToCancellationEvents; | ||
private stopped; | ||
stop<Data extends JobData<Data> = object>(queueName: string, process: (job: Job<Data>) => Promise<any>): Promise<void>; | ||
private setActiveJobAsCancelled; | ||
private createVendureJob; | ||
@@ -32,0 +38,0 @@ private getState; |
@@ -10,2 +10,4 @@ "use strict"; | ||
const ioredis_1 = require("ioredis"); | ||
const rxjs_1 = require("rxjs"); | ||
const operators_1 = require("rxjs/operators"); | ||
const constants_1 = require("./constants"); | ||
@@ -26,2 +28,10 @@ const redis_health_indicator_1 = require("./redis-health-indicator"); | ||
this.queueNameProcessFnMap = new Map(); | ||
this.cancelRunningJob$ = new rxjs_1.Subject(); | ||
this.CANCEL_JOB_CHANNEL = 'cancel-job'; | ||
this.CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs'; | ||
this.subscribeToCancellationEvents = (channel, jobId) => { | ||
if (channel === this.CANCEL_JOB_CHANNEL && jobId) { | ||
this.cancelRunningJob$.next(jobId); | ||
} | ||
}; | ||
this.stopped = false; | ||
@@ -70,3 +80,3 @@ } | ||
this.workerProcessor = async (bullJob) => { | ||
var _a, _b; | ||
var _a, _b, _c; | ||
const queueName = bullJob.name; | ||
@@ -77,5 +87,13 @@ core_1.Logger.debug(`Job ${(_a = bullJob.id) !== null && _a !== void 0 ? _a : ''} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${(_b = bullJob.opts.attempts) !== null && _b !== void 0 ? _b : 1})`); | ||
const job = await this.createVendureJob(bullJob); | ||
const completed$ = new rxjs_1.Subject(); | ||
try { | ||
// eslint-disable-next-line | ||
job.on('progress', _job => bullJob.updateProgress(_job.progress)); | ||
this.cancelRunningJob$ | ||
.pipe((0, operators_1.filter)(jobId => jobId === job.id), (0, operators_1.takeUntil)(completed$)) | ||
.subscribe(() => { | ||
var _a; | ||
core_1.Logger.info(`Setting job ${(_a = job.id) !== null && _a !== void 0 ? _a : ''} as cancelled`, constants_1.loggerCtx); | ||
job.cancel(); | ||
}); | ||
const result = await processFn(job); | ||
@@ -88,5 +106,14 @@ await bullJob.updateProgress(100); | ||
} | ||
finally { | ||
if (job.id) { | ||
await this.redisConnection.srem(this.CANCELLED_JOB_LIST_NAME, (_c = job.id) === null || _c === void 0 ? void 0 : _c.toString()); | ||
} | ||
completed$.next(); | ||
completed$.complete(); | ||
} | ||
} | ||
throw new core_1.InternalServerError(`No processor defined for the queue "${queueName}"`); | ||
}; | ||
// Subscription-mode Redis connection for the cancellation messages | ||
this.cancellationSub = new ioredis_1.Redis(this.connectionOptions); | ||
} | ||
@@ -114,14 +141,16 @@ async destroy() { | ||
if (await bullJob.isActive()) { | ||
// Not yet possible in BullMQ, see | ||
// https://github.com/taskforcesh/bullmq/issues/632 | ||
throw new core_1.InternalServerError('Cannot cancel a running job'); | ||
} | ||
try { | ||
await bullJob.remove(); | ||
await this.setActiveJobAsCancelled(jobId); | ||
return this.createVendureJob(bullJob); | ||
} | ||
catch (e) { | ||
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`; | ||
core_1.Logger.error(message, constants_1.loggerCtx); | ||
throw new core_1.InternalServerError(message); | ||
else { | ||
try { | ||
const job = await this.createVendureJob(bullJob); | ||
await bullJob.remove(); | ||
return job; | ||
} | ||
catch (e) { | ||
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`; | ||
core_1.Logger.error(message, constants_1.loggerCtx); | ||
throw new core_1.InternalServerError(message); | ||
} | ||
} | ||
@@ -218,6 +247,6 @@ } | ||
.on('closing', e => core_1.Logger.verbose(`BullMQ Worker closing: ${e}`, constants_1.loggerCtx)) | ||
.on('closed', () => core_1.Logger.verbose('BullMQ Worker closed')) | ||
.on('closed', () => core_1.Logger.verbose('BullMQ Worker closed', constants_1.loggerCtx)) | ||
.on('failed', (job, error) => { | ||
var _a, _b, _c, _d; | ||
core_1.Logger.warn(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : '(unknown id)'} [${(_b = job === null || job === void 0 ? void 0 : job.name) !== null && _b !== void 0 ? _b : 'unknown name'}] failed (attempt ${(_c = job === null || job === void 0 ? void 0 : job.attemptsMade) !== null && _c !== void 0 ? _c : 'unknown'} of ${(_d = job === null || job === void 0 ? void 0 : job.opts.attempts) !== null && _d !== void 0 ? _d : 1})`); | ||
core_1.Logger.warn(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : '(unknown id)'} [${(_b = job === null || job === void 0 ? void 0 : job.name) !== null && _b !== void 0 ? _b : 'unknown name'}] failed (attempt ${(_c = job === null || job === void 0 ? void 0 : job.attemptsMade) !== null && _c !== void 0 ? _c : 'unknown'} of ${(_d = job === null || job === void 0 ? void 0 : job.opts.attempts) !== null && _d !== void 0 ? _d : 1})`, constants_1.loggerCtx); | ||
}) | ||
@@ -229,4 +258,6 @@ .on('stalled', (jobId) => { | ||
var _a; | ||
core_1.Logger.debug(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : 'unknown id'} [${job.name}] completed`); | ||
core_1.Logger.debug(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : 'unknown id'} [${job.name}] completed`, constants_1.loggerCtx); | ||
}); | ||
await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL); | ||
this.cancellationSub.on('message', this.subscribeToCancellationEvents); | ||
} | ||
@@ -238,3 +269,19 @@ } | ||
try { | ||
await Promise.all([this.queue.close(), this.worker.close()]); | ||
core_1.Logger.info(`Closing worker`, constants_1.loggerCtx); | ||
let timer; | ||
const checkActive = async () => { | ||
const activeCount = await this.queue.getActiveCount(); | ||
if (0 < activeCount) { | ||
const activeJobs = await this.queue.getActive(); | ||
core_1.Logger.info(`Waiting on ${activeCount} active ${activeCount > 1 ? 'jobs' : 'job'} (${activeJobs.map(j => j.id).join(', ')})...`, constants_1.loggerCtx); | ||
timer = setTimeout(checkActive, 2000); | ||
} | ||
}; | ||
timer = setTimeout(checkActive, 2000); | ||
await this.worker.close(); | ||
core_1.Logger.info(`Worker closed`, constants_1.loggerCtx); | ||
await this.queue.close(); | ||
clearTimeout(timer); | ||
core_1.Logger.info(`Queue closed`, constants_1.loggerCtx); | ||
this.cancellationSub.off('message', this.subscribeToCancellationEvents); | ||
} | ||
@@ -246,2 +293,9 @@ catch (e) { | ||
} | ||
async setActiveJobAsCancelled(jobId) { | ||
// Not yet possible natively in BullMQ, see | ||
// https://github.com/taskforcesh/bullmq/issues/632 | ||
// So we have our own custom method of marking a job as cancelled. | ||
await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId); | ||
await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString()); | ||
} | ||
async createVendureJob(bullJob) { | ||
@@ -265,3 +319,4 @@ const jobJson = bullJob.toJSON(); | ||
async getState(bullJob) { | ||
const jobJson = bullJob.toJSON(); | ||
var _a; | ||
const jobId = (_a = bullJob.id) === null || _a === void 0 ? void 0 : _a.toString(); | ||
if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) { | ||
@@ -271,3 +326,9 @@ return generated_types_1.JobState.PENDING; | ||
if (await bullJob.isActive()) { | ||
return generated_types_1.JobState.RUNNING; | ||
const isCancelled = jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId)); | ||
if (isCancelled) { | ||
return generated_types_1.JobState.CANCELLED; | ||
} | ||
else { | ||
return generated_types_1.JobState.RUNNING; | ||
} | ||
} | ||
@@ -283,5 +344,2 @@ if (await bullJob.isDelayed()) { | ||
} | ||
if (!jobJson.finishedOn) { | ||
return generated_types_1.JobState.CANCELLED; | ||
} | ||
throw new core_1.InternalServerError('Could not determine job state'); | ||
@@ -288,0 +346,0 @@ // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it. |
@@ -28,3 +28,3 @@ import { Job } from '@vendure/core'; | ||
*/ | ||
queueOptions?: Exclude<QueueOptions, 'connection'>; | ||
queueOptions?: Omit<QueueOptions, 'connection'>; | ||
/** | ||
@@ -36,3 +36,3 @@ * @description | ||
*/ | ||
workerOptions?: Exclude<WorkerOptions, 'connection'>; | ||
workerOptions?: Omit<WorkerOptions, 'connection'>; | ||
/** | ||
@@ -39,0 +39,0 @@ * @description |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
86770
1383