Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@vendure/job-queue-plugin

Package Overview
Dependencies
Maintainers
1
Versions
147
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@vendure/job-queue-plugin - npm Package Compare versions

Comparing version 2.2.0-next.1 to 2.2.0-next.2

12

package.json
{
"name": "@vendure/job-queue-plugin",
"version": "2.2.0-next.1",
"version": "2.2.0-next.2",
"license": "MIT",

@@ -26,10 +26,10 @@ "main": "package/index.js",

"@google-cloud/pubsub": "^2.8.0",
"@vendure/common": "2.2.0-next.1",
"@vendure/core": "2.2.0-next.1",
"bullmq": "^3.15.5",
"ioredis": "^5.3.0",
"@vendure/common": "2.2.0-next.2",
"@vendure/core": "2.2.0-next.2",
"bullmq": "^5.1.7",
"ioredis": "^5.3.2",
"rimraf": "^3.0.2",
"typescript": "4.9.5"
},
"gitHead": "558d9fc1be780beed364221e83421bfb2942aa66"
"gitHead": "4c145666b147a13b38e3d19422727743d0a8218e"
}

@@ -18,2 +18,6 @@ import { JobListOptions } from '@vendure/common/lib/generated-types';

private queueNameProcessFnMap;
private cancellationSub;
private cancelRunningJob$;
private readonly CANCEL_JOB_CHANNEL;
private readonly CANCELLED_JOB_LIST_NAME;
init(injector: Injector): Promise<void>;

@@ -28,4 +32,6 @@ destroy(): Promise<void>;

start<Data extends JobData<Data> = object>(queueName: string, process: (job: Job<Data>) => Promise<any>): Promise<void>;
private subscribeToCancellationEvents;
private stopped;
stop<Data extends JobData<Data> = object>(queueName: string, process: (job: Job<Data>) => Promise<any>): Promise<void>;
private setActiveJobAsCancelled;
private createVendureJob;

@@ -32,0 +38,0 @@ private getState;

@@ -10,2 +10,4 @@ "use strict";

const ioredis_1 = require("ioredis");
const rxjs_1 = require("rxjs");
const operators_1 = require("rxjs/operators");
const constants_1 = require("./constants");

@@ -26,2 +28,10 @@ const redis_health_indicator_1 = require("./redis-health-indicator");

this.queueNameProcessFnMap = new Map();
this.cancelRunningJob$ = new rxjs_1.Subject();
this.CANCEL_JOB_CHANNEL = 'cancel-job';
this.CANCELLED_JOB_LIST_NAME = 'vendure:cancelled-jobs';
this.subscribeToCancellationEvents = (channel, jobId) => {
if (channel === this.CANCEL_JOB_CHANNEL && jobId) {
this.cancelRunningJob$.next(jobId);
}
};
this.stopped = false;

@@ -70,3 +80,3 @@ }

this.workerProcessor = async (bullJob) => {
var _a, _b;
var _a, _b, _c;
const queueName = bullJob.name;

@@ -77,5 +87,13 @@ core_1.Logger.debug(`Job ${(_a = bullJob.id) !== null && _a !== void 0 ? _a : ''} [${queueName}] starting (attempt ${bullJob.attemptsMade + 1} of ${(_b = bullJob.opts.attempts) !== null && _b !== void 0 ? _b : 1})`);

const job = await this.createVendureJob(bullJob);
const completed$ = new rxjs_1.Subject();
try {
// eslint-disable-next-line
job.on('progress', _job => bullJob.updateProgress(_job.progress));
this.cancelRunningJob$
.pipe((0, operators_1.filter)(jobId => jobId === job.id), (0, operators_1.takeUntil)(completed$))
.subscribe(() => {
var _a;
core_1.Logger.info(`Setting job ${(_a = job.id) !== null && _a !== void 0 ? _a : ''} as cancelled`, constants_1.loggerCtx);
job.cancel();
});
const result = await processFn(job);

@@ -88,5 +106,14 @@ await bullJob.updateProgress(100);

}
finally {
if (job.id) {
await this.redisConnection.srem(this.CANCELLED_JOB_LIST_NAME, (_c = job.id) === null || _c === void 0 ? void 0 : _c.toString());
}
completed$.next();
completed$.complete();
}
}
throw new core_1.InternalServerError(`No processor defined for the queue "${queueName}"`);
};
// Subscription-mode Redis connection for the cancellation messages
this.cancellationSub = new ioredis_1.Redis(this.connectionOptions);
}

@@ -114,14 +141,16 @@ async destroy() {

if (await bullJob.isActive()) {
// Not yet possible in BullMQ, see
// https://github.com/taskforcesh/bullmq/issues/632
throw new core_1.InternalServerError('Cannot cancel a running job');
}
try {
await bullJob.remove();
await this.setActiveJobAsCancelled(jobId);
return this.createVendureJob(bullJob);
}
catch (e) {
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
core_1.Logger.error(message, constants_1.loggerCtx);
throw new core_1.InternalServerError(message);
else {
try {
const job = await this.createVendureJob(bullJob);
await bullJob.remove();
return job;
}
catch (e) {
const message = `Error when cancelling job: ${JSON.stringify(e.message)}`;
core_1.Logger.error(message, constants_1.loggerCtx);
throw new core_1.InternalServerError(message);
}
}

@@ -218,6 +247,6 @@ }

.on('closing', e => core_1.Logger.verbose(`BullMQ Worker closing: ${e}`, constants_1.loggerCtx))
.on('closed', () => core_1.Logger.verbose('BullMQ Worker closed'))
.on('closed', () => core_1.Logger.verbose('BullMQ Worker closed', constants_1.loggerCtx))
.on('failed', (job, error) => {
var _a, _b, _c, _d;
core_1.Logger.warn(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : '(unknown id)'} [${(_b = job === null || job === void 0 ? void 0 : job.name) !== null && _b !== void 0 ? _b : 'unknown name'}] failed (attempt ${(_c = job === null || job === void 0 ? void 0 : job.attemptsMade) !== null && _c !== void 0 ? _c : 'unknown'} of ${(_d = job === null || job === void 0 ? void 0 : job.opts.attempts) !== null && _d !== void 0 ? _d : 1})`);
core_1.Logger.warn(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : '(unknown id)'} [${(_b = job === null || job === void 0 ? void 0 : job.name) !== null && _b !== void 0 ? _b : 'unknown name'}] failed (attempt ${(_c = job === null || job === void 0 ? void 0 : job.attemptsMade) !== null && _c !== void 0 ? _c : 'unknown'} of ${(_d = job === null || job === void 0 ? void 0 : job.opts.attempts) !== null && _d !== void 0 ? _d : 1})`, constants_1.loggerCtx);
})

@@ -229,4 +258,6 @@ .on('stalled', (jobId) => {

var _a;
core_1.Logger.debug(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : 'unknown id'} [${job.name}] completed`);
core_1.Logger.debug(`Job ${(_a = job === null || job === void 0 ? void 0 : job.id) !== null && _a !== void 0 ? _a : 'unknown id'} [${job.name}] completed`, constants_1.loggerCtx);
});
await this.cancellationSub.subscribe(this.CANCEL_JOB_CHANNEL);
this.cancellationSub.on('message', this.subscribeToCancellationEvents);
}

@@ -238,3 +269,19 @@ }

try {
await Promise.all([this.queue.close(), this.worker.close()]);
core_1.Logger.info(`Closing worker`, constants_1.loggerCtx);
let timer;
const checkActive = async () => {
const activeCount = await this.queue.getActiveCount();
if (0 < activeCount) {
const activeJobs = await this.queue.getActive();
core_1.Logger.info(`Waiting on ${activeCount} active ${activeCount > 1 ? 'jobs' : 'job'} (${activeJobs.map(j => j.id).join(', ')})...`, constants_1.loggerCtx);
timer = setTimeout(checkActive, 2000);
}
};
timer = setTimeout(checkActive, 2000);
await this.worker.close();
core_1.Logger.info(`Worker closed`, constants_1.loggerCtx);
await this.queue.close();
clearTimeout(timer);
core_1.Logger.info(`Queue closed`, constants_1.loggerCtx);
this.cancellationSub.off('message', this.subscribeToCancellationEvents);
}

@@ -246,2 +293,9 @@ catch (e) {

}
async setActiveJobAsCancelled(jobId) {
// Not yet possible natively in BullMQ, see
// https://github.com/taskforcesh/bullmq/issues/632
// So we have our own custom method of marking a job as cancelled.
await this.redisConnection.publish(this.CANCEL_JOB_CHANNEL, jobId);
await this.redisConnection.sadd(this.CANCELLED_JOB_LIST_NAME, jobId.toString());
}
async createVendureJob(bullJob) {

@@ -265,3 +319,4 @@ const jobJson = bullJob.toJSON();

async getState(bullJob) {
const jobJson = bullJob.toJSON();
var _a;
const jobId = (_a = bullJob.id) === null || _a === void 0 ? void 0 : _a.toString();
if ((await bullJob.isWaiting()) || (await bullJob.isWaitingChildren())) {

@@ -271,3 +326,9 @@ return generated_types_1.JobState.PENDING;

if (await bullJob.isActive()) {
return generated_types_1.JobState.RUNNING;
const isCancelled = jobId && (await this.redisConnection.sismember(this.CANCELLED_JOB_LIST_NAME, jobId));
if (isCancelled) {
return generated_types_1.JobState.CANCELLED;
}
else {
return generated_types_1.JobState.RUNNING;
}
}

@@ -283,5 +344,2 @@ if (await bullJob.isDelayed()) {

}
if (!jobJson.finishedOn) {
return generated_types_1.JobState.CANCELLED;
}
throw new core_1.InternalServerError('Could not determine job state');

@@ -288,0 +346,0 @@ // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it.

@@ -28,3 +28,3 @@ import { Job } from '@vendure/core';

*/
queueOptions?: Exclude<QueueOptions, 'connection'>;
queueOptions?: Omit<QueueOptions, 'connection'>;
/**

@@ -36,3 +36,3 @@ * @description

*/
workerOptions?: Exclude<WorkerOptions, 'connection'>;
workerOptions?: Omit<WorkerOptions, 'connection'>;
/**

@@ -39,0 +39,0 @@ * @description

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc