Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

bullmq

Package Overview
Dependencies
Maintainers
1
Versions
575
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

bullmq - npm Package Compare versions

Comparing version 5.34.7 to 5.34.8

dist/cjs/commands/updateJobScheduler-6.lua

93

dist/cjs/classes/job-scheduler.js

@@ -72,20 +72,19 @@ "use strict";

}
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), iterationCount, newOffset);
}
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), iterationCount, newOffset);
if (override) {
const jobId = await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), job_1.Job.optsAsJSON(opts), {

@@ -105,58 +104,18 @@ name: jobName,

return job;
});
}
else {
this.scripts.updateJobSchedulerNextMillis(multi, jobSchedulerId, nextMillis);
}
return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
else {
const jobId = await this.scripts.updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, job_1.Job.optsAsJSON(mergedOpts), producerId);
if (jobId) {
const job = new this.Job(this, jobName, jobData, mergedOpts, jobId);
job.id = jobId;
span === null || span === void 0 ? void 0 : span.setAttributes({
[enums_1.TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[enums_1.TelemetryAttributes.JobId]: job.id,
});
return job;
}
}
const job = this.createNextJob(multi, jobName, nextMillis, newOffset, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: Object.assign(Object.assign({}, filteredRepeatOpts), { offset: newOffset }), telemetry }), jobData, iterationCount, producerId);
const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`);
}
// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1];
span === null || span === void 0 ? void 0 : span.setAttributes({
[enums_1.TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[enums_1.TelemetryAttributes.JobId]: job.id,
});
return job;
});
}
}
createNextJob(client, name, nextMillis, offset, jobSchedulerId, opts, data, currentCount,
// The job id of the job that produced this next iteration
producerId) {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId,
nextMillis,
});
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset);
const job = new this.Job(this, name, data, mergedOpts, jobId);
job.addJob(client);
if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}
return job;
}
getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset) {

@@ -163,0 +122,0 @@ var _a;

@@ -222,2 +222,25 @@ /**

}
async updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, delayedJobOpts,
// The job id of the job that produced this next iteration
producerId) {
const client = await this.queue.client;
const queueKeys = this.queue.keys;
const keys = [
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
];
const args = [
nextMillis,
jobSchedulerId,
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
];
return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
}
async updateRepeatableJobMillis(client, customKey, nextMillis, legacyCustomKey) {

@@ -232,5 +255,2 @@ const args = [

}
async updateJobSchedulerNextMillis(client, jobSchedulerId, nextMillis) {
return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId);
}
removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {

@@ -237,0 +257,0 @@ const queueKeys = this.queue.keys;

@@ -52,3 +52,6 @@ "use strict";

--[[
Add marker if needed when a job is available.
Adds a delayed job to the queue by doing the following:
- Creates a new job key with the job data.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
]]

@@ -55,0 +58,0 @@ -- Includes

@@ -40,3 +40,6 @@ "use strict";

--[[
Add marker if needed when a job is available.
Adds a delayed job to the queue by doing the following:
- Creates a new job key with the job data.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
]]

@@ -43,0 +46,0 @@ -- Includes

@@ -47,4 +47,5 @@ "use strict";

tslib_1.__exportStar(require("./updateData-1"), exports);
tslib_1.__exportStar(require("./updateJobScheduler-6"), exports);
tslib_1.__exportStar(require("./updateProgress-3"), exports);
tslib_1.__exportStar(require("./updateRepeatableJobMillis-1"), exports);
//# sourceMappingURL=index.js.map
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.version = void 0;
exports.version = '5.34.7';
exports.version = '5.34.8';
//# sourceMappingURL=version.js.map

@@ -13,3 +13,2 @@ import { JobSchedulerJson, RepeatBaseOptions, RepeatOptions } from '../interfaces';

}): Promise<Job<T, R, N> | undefined>;
private createNextJob;
private getNextJobOpts;

@@ -16,0 +15,0 @@ removeJobScheduler(jobSchedulerId: string): Promise<number>;

@@ -69,20 +69,19 @@ import { __rest } from "tslib";

}
const multi = (await this.client).multi();
if (nextMillis) {
if (override) {
return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), iterationCount, newOffset);
}
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), iterationCount, newOffset);
if (override) {
const jobId = await this.scripts.addJobScheduler(jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), Job.optsAsJSON(opts), {

@@ -102,58 +101,18 @@ name: jobName,

return job;
});
}
else {
this.scripts.updateJobSchedulerNextMillis(multi, jobSchedulerId, nextMillis);
}
return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => {
var _a, _b;
let telemetry = opts.telemetry;
if (srcPropagationMedatada) {
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext;
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) ||
(!omitContext && srcPropagationMedatada);
if (telemetryMetadata || omitContext) {
telemetry = {
metadata: telemetryMetadata,
omitContext,
};
}
else {
const jobId = await this.scripts.updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, Job.optsAsJSON(mergedOpts), producerId);
if (jobId) {
const job = new this.Job(this, jobName, jobData, mergedOpts, jobId);
job.id = jobId;
span === null || span === void 0 ? void 0 : span.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});
return job;
}
}
const job = this.createNextJob(multi, jobName, nextMillis, newOffset, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: Object.assign(Object.assign({}, filteredRepeatOpts), { offset: newOffset }), telemetry }), jobData, iterationCount, producerId);
const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][]
// Check if there are any errors
const erroredResult = results.find(result => result[0]);
if (erroredResult) {
throw new Error(`Error upserting job scheduler ${jobSchedulerId} - ${erroredResult[0]}`);
}
// Get last result with the job id
const lastResult = results.pop();
job.id = lastResult[1];
span === null || span === void 0 ? void 0 : span.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});
return job;
});
}
}
createNextJob(client, name, nextMillis, offset, jobSchedulerId, opts, data, currentCount,
// The job id of the job that produced this next iteration
producerId) {
//
// Generate unique job id for this iteration.
//
const jobId = this.getSchedulerNextJobId({
jobSchedulerId,
nextMillis,
});
const mergedOpts = this.getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset);
const job = new this.Job(this, name, data, mergedOpts, jobId);
job.addJob(client);
if (producerId) {
const producerJobKey = this.toKey(producerId);
client.hset(producerJobKey, 'nrjid', job.id);
}
return job;
}
getNextJobOpts(nextMillis, jobSchedulerId, opts, currentCount, offset) {

@@ -160,0 +119,0 @@ var _a;

@@ -26,4 +26,4 @@ /**

addJobScheduler(jobSchedulerId: string, nextMillis: number, templateData: string, templateOpts: RedisJobOptions, opts: RepeatableOptions, delayedJobOpts: JobsOptions, producerId?: string): Promise<string>;
updateJobSchedulerNextMillis(jobSchedulerId: string, nextMillis: number, delayedJobOpts: JobsOptions, producerId?: string): Promise<string | null>;
updateRepeatableJobMillis(client: RedisClient, customKey: string, nextMillis: number, legacyCustomKey: string): Promise<string>;
updateJobSchedulerNextMillis(client: RedisClient, jobSchedulerId: string, nextMillis: number): Promise<number>;
private removeRepeatableArgs;

@@ -30,0 +30,0 @@ getRepeatConcatOptions(repeatConcatOptions: string, repeatJobKey: string): string;

@@ -220,2 +220,25 @@ /**

}
async updateJobSchedulerNextMillis(jobSchedulerId, nextMillis, delayedJobOpts,
// The job id of the job that produced this next iteration
producerId) {
const client = await this.queue.client;
const queueKeys = this.queue.keys;
const keys = [
queueKeys.marker,
queueKeys.meta,
queueKeys.id,
queueKeys.delayed,
queueKeys.events,
queueKeys.repeat,
];
const args = [
nextMillis,
jobSchedulerId,
pack(delayedJobOpts),
Date.now(),
queueKeys[''],
producerId ? this.queue.toKey(producerId) : '',
];
return this.execCommand(client, 'updateJobScheduler', keys.concat(args));
}
async updateRepeatableJobMillis(client, customKey, nextMillis, legacyCustomKey) {

@@ -230,5 +253,2 @@ const args = [

}
async updateJobSchedulerNextMillis(client, jobSchedulerId, nextMillis) {
return client.zadd(this.queue.keys.repeat, nextMillis, jobSchedulerId);
}
removeRepeatableArgs(legacyRepeatJobId, repeatConcatOptions, repeatJobKey) {

@@ -235,0 +255,0 @@ const queueKeys = this.queue.keys;

@@ -49,3 +49,6 @@ const content = `--[[

--[[
Add marker if needed when a job is available.
Adds a delayed job to the queue by doing the following:
- Creates a new job key with the job data.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
]]

@@ -52,0 +55,0 @@ -- Includes

@@ -37,3 +37,6 @@ const content = `--[[

--[[
Add marker if needed when a job is available.
Adds a delayed job to the queue by doing the following:
- Creates a new job key with the job data.
- adds to delayed zset.
- Emits a global event 'delayed' if the job is delayed.
]]

@@ -40,0 +43,0 @@ -- Includes

@@ -44,3 +44,4 @@ export * from './addDelayedJob-6';

export * from './updateData-1';
export * from './updateJobScheduler-6';
export * from './updateProgress-3';
export * from './updateRepeatableJobMillis-1';

@@ -44,4 +44,5 @@ export * from './addDelayedJob-6';

export * from './updateData-1';
export * from './updateJobScheduler-6';
export * from './updateProgress-3';
export * from './updateRepeatableJobMillis-1';
//# sourceMappingURL=index.js.map

@@ -1,1 +0,1 @@

export declare const version = "5.34.7";
export declare const version = "5.34.8";

@@ -1,2 +0,2 @@

export const version = '5.34.7';
export const version = '5.34.8';
//# sourceMappingURL=version.js.map
{
"name": "bullmq",
"version": "5.34.7",
"version": "5.34.8",
"description": "Queue for messages and jobs based on Redis",

@@ -5,0 +5,0 @@ "homepage": "https://bullmq.io/",

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc