Comparing version 5.33.1 to 5.34.0
@@ -172,3 +172,4 @@ "use strict"; | ||
const jobId = ((_b = node.opts) === null || _b === void 0 ? void 0 : _b.jobId) || (0, uuid_1.v4)(); | ||
return (0, utils_1.trace)(this.telemetry, enums_1.SpanKind.PRODUCER, node.name, 'addNode', node.queueName, async (span, dstPropagationMetadata) => { | ||
return (0, utils_1.trace)(this.telemetry, enums_1.SpanKind.PRODUCER, node.name, 'addNode', node.queueName, async (span, srcPropagationMedatada) => { | ||
var _a, _b; | ||
span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
@@ -178,3 +179,16 @@ [enums_1.TelemetryAttributes.JobName]: node.name, | ||
}); | ||
const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), node.opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetryMetadata: dstPropagationMetadata }), jobId); | ||
const opts = node.opts; | ||
let telemetry = opts === null || opts === void 0 ? void 0 : opts.telemetry; | ||
if (srcPropagationMedatada && opts) { | ||
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext; | ||
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetry }), jobId); | ||
const parentKey = (0, utils_1.getParentKey)(parent === null || parent === void 0 ? void 0 : parent.parentOpts); | ||
@@ -181,0 +195,0 @@ if (node.children && node.children.length > 0) { |
@@ -6,2 +6,3 @@ "use strict"; | ||
const cron_parser_1 = require("cron-parser"); | ||
const job_1 = require("./job"); | ||
const queue_base_1 = require("./queue-base"); | ||
@@ -60,3 +61,3 @@ const enums_1 = require("../enums"); | ||
if (override) { | ||
this.scripts.addJobScheduler(multi, jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), (0, utils_1.optsAsJSON)(opts), { | ||
this.scripts.addJobScheduler(multi, jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), job_1.Job.optsAsJSON(opts), { | ||
name: jobName, | ||
@@ -73,3 +74,16 @@ endDate: endDate ? new Date(endDate).getTime() : undefined, | ||
return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => { | ||
const job = this.createNextJob(multi, jobName, nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetryMetadata: srcPropagationMedatada }), jobData, iterationCount); | ||
var _a, _b; | ||
let telemetry = opts.telemetry; | ||
if (srcPropagationMedatada) { | ||
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext; | ||
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
const job = this.createNextJob(multi, jobName, nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), jobData, iterationCount); | ||
const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] | ||
@@ -148,3 +162,3 @@ // Check if there are any errors | ||
if (rawOpts) { | ||
template.opts = (0, utils_1.optsFromJSON)(rawOpts); | ||
template.opts = job_1.Job.optsFromJSON(rawOpts); | ||
} | ||
@@ -151,0 +165,0 @@ return template; |
@@ -12,2 +12,12 @@ "use strict"; | ||
const logger = (0, util_1.debuglog)('bull'); | ||
// Simple options decode map. | ||
const optsDecodeMap = { | ||
de: 'deduplication', | ||
fpof: 'failParentOnFailure', | ||
idof: 'ignoreDependencyOnFailure', | ||
kl: 'keepLogs', | ||
rdof: 'removeDependencyOnFailure', | ||
}; | ||
const optsEncodeMap = Object.assign(Object.assign({}, (0, utils_1.invertObject)(optsDecodeMap)), { | ||
/*/ Legacy for backwards compatibility */ debounce: 'de' }); | ||
exports.PRIORITY_LIMIT = 2 ** 21; | ||
@@ -160,3 +170,3 @@ /** | ||
const data = JSON.parse(json.data || '{}'); | ||
const opts = (0, utils_1.optsFromJSON)(json.opts); | ||
const opts = Job.optsFromJSON(json.opts); | ||
const job = new this(queue, json.name, data, opts, json.id || jobId); | ||
@@ -200,2 +210,26 @@ job.progress = JSON.parse(json.progress || '0'); | ||
} | ||
static optsFromJSON(rawOpts) { | ||
const opts = JSON.parse(rawOpts || '{}'); | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (optsDecodeMap[attributeName]) { | ||
options[optsDecodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
if (attributeName === 'tm') { | ||
options.telemetry = Object.assign(Object.assign({}, options.telemetry), { metadata: value }); | ||
} | ||
else if (attributeName === 'omc') { | ||
options.telemetry = Object.assign(Object.assign({}, options.telemetry), { omitContext: value }); | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
/** | ||
@@ -245,3 +279,3 @@ * Fetches a Job from the queue given the passed job id. | ||
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), | ||
opts: (0, utils_1.optsAsJSON)(this.opts), | ||
opts: Job.optsAsJSON(this.opts), | ||
parent: this.parent ? Object.assign({}, this.parent) : undefined, | ||
@@ -263,2 +297,27 @@ parentKey: this.parentKey, | ||
} | ||
static optsAsJSON(opts = {}) { | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const [attributeName, value] of optionEntries) { | ||
if (typeof value === 'undefined') { | ||
continue; | ||
} | ||
if (attributeName in optsEncodeMap) { | ||
const compressableAttribute = attributeName; | ||
const key = optsEncodeMap[compressableAttribute]; | ||
options[key] = value; | ||
} | ||
else { | ||
// Handle complex compressable fields separately | ||
if (attributeName === 'telemetry') { | ||
options.tm = value.metadata; | ||
options.omc = value.omitContext; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
/** | ||
@@ -403,2 +462,7 @@ * Prepares a job to be passed to Sandbox. | ||
return this.queue.trace(enums_1.SpanKind.INTERNAL, this.getSpanOperation('moveToFailed'), this.queue.name, async (span, dstPropagationMedatadata) => { | ||
var _a, _b; | ||
let tm; | ||
if (!((_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.omitContext) && dstPropagationMedatadata) { | ||
tm = dstPropagationMedatadata; | ||
} | ||
let result; | ||
@@ -409,3 +473,3 @@ this.updateStacktrace(err); | ||
stacktrace: JSON.stringify(this.stacktrace), | ||
tm: dstPropagationMedatadata, | ||
tm, | ||
}; | ||
@@ -412,0 +476,0 @@ // |
@@ -142,4 +142,8 @@ "use strict"; | ||
return this.trace(enums_1.SpanKind.PRODUCER, 'add', `${this.name}.${name}`, async (span, srcPropagationMedatada) => { | ||
if (srcPropagationMedatada) { | ||
opts = Object.assign(Object.assign({}, opts), { telemetryMetadata: srcPropagationMedatada }); | ||
var _a; | ||
if (srcPropagationMedatada && !((_a = opts === null || opts === void 0 ? void 0 : opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext)) { | ||
const telemetry = { | ||
metadata: srcPropagationMedatada, | ||
}; | ||
opts = Object.assign(Object.assign({}, opts), { telemetry }); | ||
} | ||
@@ -199,8 +203,20 @@ const job = await this.addJob(name, data, opts); | ||
return await this.Job.createBulk(this, jobs.map(job => { | ||
var _a; | ||
return ({ | ||
var _a, _b, _c, _d, _e, _f; | ||
let telemetry = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry; | ||
if (srcPropagationMedatada) { | ||
const omitContext = (_c = (_b = job.opts) === null || _b === void 0 ? void 0 : _b.telemetry) === null || _c === void 0 ? void 0 : _c.omitContext; | ||
const telemetryMetadata = ((_e = (_d = job.opts) === null || _d === void 0 ? void 0 : _d.telemetry) === null || _e === void 0 ? void 0 : _e.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
return { | ||
name: job.name, | ||
data: job.data, | ||
opts: Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId, tm: span && srcPropagationMedatada }), | ||
}); | ||
opts: Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_f = job.opts) === null || _f === void 0 ? void 0 : _f.jobId, telemetry }), | ||
}; | ||
})); | ||
@@ -207,0 +223,0 @@ }); |
@@ -257,2 +257,3 @@ "use strict"; | ||
async getNextJob(token, { block = true } = {}) { | ||
var _a, _b; | ||
const nextJob = await this._getNextJob(await this.client, await this.blockingConnection.client, token, { block }); | ||
@@ -268,3 +269,3 @@ return this.trace(enums_1.SpanKind.INTERNAL, 'getNextJob', this.name, async (span) => { | ||
return nextJob; | ||
}, nextJob === null || nextJob === void 0 ? void 0 : nextJob.opts.telemetryMetadata); | ||
}, (_b = (_a = nextJob === null || nextJob === void 0 ? void 0 : nextJob.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata); | ||
} | ||
@@ -453,6 +454,7 @@ async _getNextJob(client, bclient, token, { block = true } = {}) { | ||
async processJob(job, token, fetchNextCallback = () => true, jobsInProgress) { | ||
var _a, _b; | ||
if (!job || this.closing || this.paused) { | ||
return; | ||
} | ||
const { telemetryMetadata: srcPropagationMedatada } = job.opts; | ||
const srcPropagationMedatada = (_b = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata; | ||
return this.trace(enums_1.SpanKind.CONSUMER, 'process', this.name, async (span) => { | ||
@@ -459,0 +461,0 @@ span === null || span === void 0 ? void 0 : span.setAttributes({ |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.trace = exports.removeUndefinedFields = exports.optsFromJSON = exports.optsAsJSON = exports.QUEUE_EVENT_SUFFIX = exports.toString = exports.errorToJSON = exports.parseObjectValues = exports.isRedisVersionLowerThan = exports.childSend = exports.asyncSend = exports.isNotConnectionError = exports.DELAY_TIME_1 = exports.DELAY_TIME_5 = exports.clientCommandMessageReg = exports.getParentKey = exports.removeAllQueueData = exports.decreaseMaxListeners = exports.isRedisCluster = exports.isRedisInstance = exports.invertObject = exports.increaseMaxListeners = exports.delay = exports.objectToFlatArray = exports.array2obj = exports.isEmpty = exports.lengthInUtf8Bytes = exports.tryCatch = exports.errorObject = void 0; | ||
exports.trace = exports.removeUndefinedFields = exports.QUEUE_EVENT_SUFFIX = exports.toString = exports.errorToJSON = exports.parseObjectValues = exports.isRedisVersionLowerThan = exports.childSend = exports.asyncSend = exports.isNotConnectionError = exports.DELAY_TIME_1 = exports.DELAY_TIME_5 = exports.clientCommandMessageReg = exports.getParentKey = exports.removeAllQueueData = exports.decreaseMaxListeners = exports.isRedisCluster = exports.isRedisInstance = exports.invertObject = exports.increaseMaxListeners = exports.delay = exports.objectToFlatArray = exports.array2obj = exports.isEmpty = exports.lengthInUtf8Bytes = exports.tryCatch = exports.errorObject = void 0; | ||
const ioredis_1 = require("ioredis"); | ||
@@ -77,8 +77,8 @@ // eslint-disable-next-line @typescript-eslint/ban-ts-comment | ||
exports.increaseMaxListeners = increaseMaxListeners; | ||
const invertObject = (obj) => { | ||
return Object.entries(obj).reduce((encodeMap, [key, value]) => { | ||
encodeMap[value] = key; | ||
return encodeMap; | ||
function invertObject(obj) { | ||
return Object.entries(obj).reduce((result, [key, value]) => { | ||
result[value] = key; | ||
return result; | ||
}, {}); | ||
}; | ||
} | ||
exports.invertObject = invertObject; | ||
@@ -224,47 +224,2 @@ function isRedisInstance(obj) { | ||
exports.QUEUE_EVENT_SUFFIX = ':qe'; | ||
const optsDecodeMap = { | ||
de: 'deduplication', | ||
fpof: 'failParentOnFailure', | ||
idof: 'ignoreDependencyOnFailure', | ||
kl: 'keepLogs', | ||
rdof: 'removeDependencyOnFailure', | ||
tm: 'telemetryMetadata', | ||
}; | ||
const optsEncodeMap = (0, exports.invertObject)(optsDecodeMap); | ||
optsEncodeMap.debounce = 'de'; | ||
function optsAsJSON(opts = {}) { | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (value !== undefined) { | ||
if (optsEncodeMap[attributeName]) { | ||
options[optsEncodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
exports.optsAsJSON = optsAsJSON; | ||
function optsFromJSON(rawOpts) { | ||
const opts = JSON.parse(rawOpts || '{}'); | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (optsDecodeMap[attributeName]) { | ||
options[optsDecodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
return options; | ||
} | ||
exports.optsFromJSON = optsFromJSON; | ||
function removeUndefinedFields(obj) { | ||
@@ -271,0 +226,0 @@ const newObj = {}; |
"use strict"; | ||
Object.defineProperty(exports, "__esModule", { value: true }); | ||
exports.version = void 0; | ||
exports.version = '5.33.1'; | ||
exports.version = '5.34.0'; | ||
//# sourceMappingURL=version.js.map |
@@ -169,3 +169,4 @@ import { EventEmitter } from 'events'; | ||
const jobId = ((_b = node.opts) === null || _b === void 0 ? void 0 : _b.jobId) || v4(); | ||
return trace(this.telemetry, SpanKind.PRODUCER, node.name, 'addNode', node.queueName, async (span, dstPropagationMetadata) => { | ||
return trace(this.telemetry, SpanKind.PRODUCER, node.name, 'addNode', node.queueName, async (span, srcPropagationMedatada) => { | ||
var _a, _b; | ||
span === null || span === void 0 ? void 0 : span.setAttributes({ | ||
@@ -175,3 +176,16 @@ [TelemetryAttributes.JobName]: node.name, | ||
}); | ||
const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), node.opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetryMetadata: dstPropagationMetadata }), jobId); | ||
const opts = node.opts; | ||
let telemetry = opts === null || opts === void 0 ? void 0 : opts.telemetry; | ||
if (srcPropagationMedatada && opts) { | ||
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext; | ||
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
const job = new this.Job(queue, node.name, node.data, Object.assign(Object.assign(Object.assign({}, jobsOpts), opts), { parent: parent === null || parent === void 0 ? void 0 : parent.parentOpts, telemetry }), jobId); | ||
const parentKey = getParentKey(parent === null || parent === void 0 ? void 0 : parent.parentOpts); | ||
@@ -178,0 +192,0 @@ if (node.children && node.children.length > 0) { |
import { __rest } from "tslib"; | ||
import { parseExpression } from 'cron-parser'; | ||
import { Job } from './job'; | ||
import { QueueBase } from './queue-base'; | ||
import { SpanKind, TelemetryAttributes } from '../enums'; | ||
import { array2obj, optsAsJSON, optsFromJSON } from '../utils'; | ||
import { array2obj } from '../utils'; | ||
export class JobScheduler extends QueueBase { | ||
@@ -56,3 +57,3 @@ constructor(name, opts, Connection) { | ||
if (override) { | ||
this.scripts.addJobScheduler(multi, jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), optsAsJSON(opts), { | ||
this.scripts.addJobScheduler(multi, jobSchedulerId, nextMillis, JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), Job.optsAsJSON(opts), { | ||
name: jobName, | ||
@@ -69,3 +70,16 @@ endDate: endDate ? new Date(endDate).getTime() : undefined, | ||
return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${jobName}`, async (span, srcPropagationMedatada) => { | ||
const job = this.createNextJob(multi, jobName, nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetryMetadata: srcPropagationMedatada }), jobData, iterationCount); | ||
var _a, _b; | ||
let telemetry = opts.telemetry; | ||
if (srcPropagationMedatada) { | ||
const omitContext = (_a = opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext; | ||
const telemetryMetadata = ((_b = opts.telemetry) === null || _b === void 0 ? void 0 : _b.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
const job = this.createNextJob(multi, jobName, nextMillis, jobSchedulerId, Object.assign(Object.assign({}, opts), { repeat: filteredRepeatOpts, telemetry }), jobData, iterationCount); | ||
const results = await multi.exec(); // multi.exec returns an array of results [ err, result ][] | ||
@@ -144,3 +158,3 @@ // Check if there are any errors | ||
if (rawOpts) { | ||
template.opts = optsFromJSON(rawOpts); | ||
template.opts = Job.optsFromJSON(rawOpts); | ||
} | ||
@@ -147,0 +161,0 @@ return template; |
import { BulkJobOptions, DependenciesOpts, JobJson, JobJsonRaw, MinimalJob, MoveToWaitingChildrenOpts, ParentKeys, ParentOpts, RedisClient } from '../interfaces'; | ||
import { FinishedStatus, JobsOptions, JobState, JobJsonSandbox, MinimalQueue } from '../types'; | ||
import { FinishedStatus, JobsOptions, JobState, JobJsonSandbox, MinimalQueue, RedisJobOptions } from '../types'; | ||
import { Scripts } from './scripts'; | ||
@@ -166,2 +166,3 @@ import type { QueueEvents } from './queue-events'; | ||
protected setScripts(): void; | ||
static optsFromJSON(rawOpts?: string): JobsOptions; | ||
/** | ||
@@ -192,2 +193,3 @@ * Fetches a Job from the queue given the passed job id. | ||
asJSON(): JobJson; | ||
static optsAsJSON(opts?: JobsOptions): RedisJobOptions; | ||
/** | ||
@@ -194,0 +196,0 @@ * Prepares a job to be passed to Sandbox. |
import { __rest } from "tslib"; | ||
import { debuglog } from 'util'; | ||
import { errorObject, isEmpty, getParentKey, lengthInUtf8Bytes, parseObjectValues, tryCatch, removeUndefinedFields, optsAsJSON, optsFromJSON, } from '../utils'; | ||
import { errorObject, isEmpty, getParentKey, lengthInUtf8Bytes, parseObjectValues, tryCatch, removeUndefinedFields, invertObject, } from '../utils'; | ||
import { Backoffs } from './backoffs'; | ||
@@ -9,2 +9,12 @@ import { Scripts } from './scripts'; | ||
const logger = debuglog('bull'); | ||
// Simple options decode map. | ||
const optsDecodeMap = { | ||
de: 'deduplication', | ||
fpof: 'failParentOnFailure', | ||
idof: 'ignoreDependencyOnFailure', | ||
kl: 'keepLogs', | ||
rdof: 'removeDependencyOnFailure', | ||
}; | ||
const optsEncodeMap = Object.assign(Object.assign({}, invertObject(optsDecodeMap)), { | ||
/*/ Legacy for backwards compatibility */ debounce: 'de' }); | ||
export const PRIORITY_LIMIT = 2 ** 21; | ||
@@ -157,3 +167,3 @@ /** | ||
const data = JSON.parse(json.data || '{}'); | ||
const opts = optsFromJSON(json.opts); | ||
const opts = Job.optsFromJSON(json.opts); | ||
const job = new this(queue, json.name, data, opts, json.id || jobId); | ||
@@ -197,2 +207,26 @@ job.progress = JSON.parse(json.progress || '0'); | ||
} | ||
static optsFromJSON(rawOpts) { | ||
const opts = JSON.parse(rawOpts || '{}'); | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (optsDecodeMap[attributeName]) { | ||
options[optsDecodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
if (attributeName === 'tm') { | ||
options.telemetry = Object.assign(Object.assign({}, options.telemetry), { metadata: value }); | ||
} | ||
else if (attributeName === 'omc') { | ||
options.telemetry = Object.assign(Object.assign({}, options.telemetry), { omitContext: value }); | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
/** | ||
@@ -242,3 +276,3 @@ * Fetches a Job from the queue given the passed job id. | ||
data: JSON.stringify(typeof this.data === 'undefined' ? {} : this.data), | ||
opts: optsAsJSON(this.opts), | ||
opts: Job.optsAsJSON(this.opts), | ||
parent: this.parent ? Object.assign({}, this.parent) : undefined, | ||
@@ -260,2 +294,27 @@ parentKey: this.parentKey, | ||
} | ||
static optsAsJSON(opts = {}) { | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const [attributeName, value] of optionEntries) { | ||
if (typeof value === 'undefined') { | ||
continue; | ||
} | ||
if (attributeName in optsEncodeMap) { | ||
const compressableAttribute = attributeName; | ||
const key = optsEncodeMap[compressableAttribute]; | ||
options[key] = value; | ||
} | ||
else { | ||
// Handle complex compressable fields separately | ||
if (attributeName === 'telemetry') { | ||
options.tm = value.metadata; | ||
options.omc = value.omitContext; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
/** | ||
@@ -400,2 +459,7 @@ * Prepares a job to be passed to Sandbox. | ||
return this.queue.trace(SpanKind.INTERNAL, this.getSpanOperation('moveToFailed'), this.queue.name, async (span, dstPropagationMedatadata) => { | ||
var _a, _b; | ||
let tm; | ||
if (!((_b = (_a = this.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.omitContext) && dstPropagationMedatadata) { | ||
tm = dstPropagationMedatadata; | ||
} | ||
let result; | ||
@@ -406,3 +470,3 @@ this.updateStacktrace(err); | ||
stacktrace: JSON.stringify(this.stacktrace), | ||
tm: dstPropagationMedatadata, | ||
tm, | ||
}; | ||
@@ -409,0 +473,0 @@ // |
@@ -139,4 +139,8 @@ import { v4 } from 'uuid'; | ||
return this.trace(SpanKind.PRODUCER, 'add', `${this.name}.${name}`, async (span, srcPropagationMedatada) => { | ||
if (srcPropagationMedatada) { | ||
opts = Object.assign(Object.assign({}, opts), { telemetryMetadata: srcPropagationMedatada }); | ||
var _a; | ||
if (srcPropagationMedatada && !((_a = opts === null || opts === void 0 ? void 0 : opts.telemetry) === null || _a === void 0 ? void 0 : _a.omitContext)) { | ||
const telemetry = { | ||
metadata: srcPropagationMedatada, | ||
}; | ||
opts = Object.assign(Object.assign({}, opts), { telemetry }); | ||
} | ||
@@ -196,8 +200,20 @@ const job = await this.addJob(name, data, opts); | ||
return await this.Job.createBulk(this, jobs.map(job => { | ||
var _a; | ||
return ({ | ||
var _a, _b, _c, _d, _e, _f; | ||
let telemetry = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry; | ||
if (srcPropagationMedatada) { | ||
const omitContext = (_c = (_b = job.opts) === null || _b === void 0 ? void 0 : _b.telemetry) === null || _c === void 0 ? void 0 : _c.omitContext; | ||
const telemetryMetadata = ((_e = (_d = job.opts) === null || _d === void 0 ? void 0 : _d.telemetry) === null || _e === void 0 ? void 0 : _e.metadata) || | ||
(!omitContext && srcPropagationMedatada); | ||
if (telemetryMetadata || omitContext) { | ||
telemetry = { | ||
metadata: telemetryMetadata, | ||
omitContext, | ||
}; | ||
} | ||
} | ||
return { | ||
name: job.name, | ||
data: job.data, | ||
opts: Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_a = job.opts) === null || _a === void 0 ? void 0 : _a.jobId, tm: span && srcPropagationMedatada }), | ||
}); | ||
opts: Object.assign(Object.assign(Object.assign({}, this.jobsOpts), job.opts), { jobId: (_f = job.opts) === null || _f === void 0 ? void 0 : _f.jobId, telemetry }), | ||
}; | ||
})); | ||
@@ -204,0 +220,0 @@ }); |
@@ -254,2 +254,3 @@ import * as fs from 'fs'; | ||
async getNextJob(token, { block = true } = {}) { | ||
var _a, _b; | ||
const nextJob = await this._getNextJob(await this.client, await this.blockingConnection.client, token, { block }); | ||
@@ -265,3 +266,3 @@ return this.trace(SpanKind.INTERNAL, 'getNextJob', this.name, async (span) => { | ||
return nextJob; | ||
}, nextJob === null || nextJob === void 0 ? void 0 : nextJob.opts.telemetryMetadata); | ||
}, (_b = (_a = nextJob === null || nextJob === void 0 ? void 0 : nextJob.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata); | ||
} | ||
@@ -450,6 +451,7 @@ async _getNextJob(client, bclient, token, { block = true } = {}) { | ||
async processJob(job, token, fetchNextCallback = () => true, jobsInProgress) { | ||
var _a, _b; | ||
if (!job || this.closing || this.paused) { | ||
return; | ||
} | ||
const { telemetryMetadata: srcPropagationMedatada } = job.opts; | ||
const srcPropagationMedatada = (_b = (_a = job.opts) === null || _a === void 0 ? void 0 : _a.telemetry) === null || _b === void 0 ? void 0 : _b.metadata; | ||
return this.trace(SpanKind.CONSUMER, 'process', this.name, async (span) => { | ||
@@ -456,0 +458,0 @@ span === null || span === void 0 ? void 0 : span.setAttributes({ |
@@ -100,6 +100,2 @@ import { BackoffOptions } from './backoff-options'; | ||
prevMillis?: number; | ||
/** | ||
* TelemetryMetadata, provide for context propagation. | ||
*/ | ||
telemetryMetadata?: string; | ||
} |
import { BaseJobOptions, DebounceOptions } from '../interfaces'; | ||
export type JobsOptions = BaseJobOptions & { | ||
/** | ||
* These options will be stored in Redis with smaller | ||
* keys for compactness. | ||
*/ | ||
export type CompressableJobOptions = { | ||
/** | ||
@@ -24,3 +28,18 @@ * Debounce options. | ||
removeDependencyOnFailure?: boolean; | ||
/** | ||
* Telemetry options | ||
*/ | ||
telemetry?: { | ||
/** | ||
* Metadata, used for context propagation. | ||
*/ | ||
metadata?: string; | ||
/** | ||
* If `true` telemetry will omit the context propagation | ||
* @default false | ||
*/ | ||
omitContext?: boolean; | ||
}; | ||
}; | ||
export type JobsOptions = BaseJobOptions & CompressableJobOptions; | ||
/** | ||
@@ -54,2 +73,11 @@ * These fields are the ones stored in Redis with smaller keys for compactness. | ||
tm?: string; | ||
/** | ||
* Omit Context Propagation | ||
*/ | ||
omc?: boolean; | ||
/** | ||
* Deduplication identifier. | ||
* @deprecated use deid | ||
*/ | ||
de?: string; | ||
}; |
@@ -9,3 +9,2 @@ /// <reference types="node" /> | ||
import { SpanKind } from './enums'; | ||
import { JobsOptions, RedisJobOptions } from './types'; | ||
export declare const errorObject: { | ||
@@ -26,3 +25,8 @@ [index: string]: any; | ||
export declare function increaseMaxListeners(emitter: EventEmitter, count: number): void; | ||
export declare const invertObject: (obj: Record<string, string>) => Record<string, string>; | ||
type Invert<T extends Record<PropertyKey, PropertyKey>> = { | ||
[V in T[keyof T]]: { | ||
[K in keyof T]: T[K] extends V ? K : never; | ||
}[keyof T]; | ||
}; | ||
export declare function invertObject<T extends Record<PropertyKey, PropertyKey>>(obj: T): Invert<T>; | ||
export declare function isRedisInstance(obj: any): obj is Redis | Cluster; | ||
@@ -53,4 +57,2 @@ export declare function isRedisCluster(obj: unknown): obj is Cluster; | ||
export declare const QUEUE_EVENT_SUFFIX = ":qe"; | ||
export declare function optsAsJSON(opts?: JobsOptions): RedisJobOptions; | ||
export declare function optsFromJSON(rawOpts?: string): JobsOptions; | ||
export declare function removeUndefinedFields<T extends Record<string, any>>(obj: Record<string, any>): T; | ||
@@ -57,0 +59,0 @@ /** |
@@ -67,8 +67,8 @@ import { Cluster } from 'ioredis'; | ||
} | ||
export const invertObject = (obj) => { | ||
return Object.entries(obj).reduce((encodeMap, [key, value]) => { | ||
encodeMap[value] = key; | ||
return encodeMap; | ||
export function invertObject(obj) { | ||
return Object.entries(obj).reduce((result, [key, value]) => { | ||
result[value] = key; | ||
return result; | ||
}, {}); | ||
}; | ||
} | ||
export function isRedisInstance(obj) { | ||
@@ -201,45 +201,2 @@ if (!obj) { | ||
export const QUEUE_EVENT_SUFFIX = ':qe'; | ||
const optsDecodeMap = { | ||
de: 'deduplication', | ||
fpof: 'failParentOnFailure', | ||
idof: 'ignoreDependencyOnFailure', | ||
kl: 'keepLogs', | ||
rdof: 'removeDependencyOnFailure', | ||
tm: 'telemetryMetadata', | ||
}; | ||
const optsEncodeMap = invertObject(optsDecodeMap); | ||
optsEncodeMap.debounce = 'de'; | ||
export function optsAsJSON(opts = {}) { | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (value !== undefined) { | ||
if (optsEncodeMap[attributeName]) { | ||
options[optsEncodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
} | ||
return options; | ||
} | ||
export function optsFromJSON(rawOpts) { | ||
const opts = JSON.parse(rawOpts || '{}'); | ||
const optionEntries = Object.entries(opts); | ||
const options = {}; | ||
for (const item of optionEntries) { | ||
const [attributeName, value] = item; | ||
if (optsDecodeMap[attributeName]) { | ||
options[optsDecodeMap[attributeName]] = | ||
value; | ||
} | ||
else { | ||
options[attributeName] = value; | ||
} | ||
} | ||
return options; | ||
} | ||
export function removeUndefinedFields(obj) { | ||
@@ -246,0 +203,0 @@ const newObj = {}; |
@@ -1,1 +0,1 @@ | ||
export declare const version = "5.33.1"; | ||
export declare const version = "5.34.0"; |
@@ -1,2 +0,2 @@ | ||
export const version = '5.33.1'; | ||
export const version = '5.34.0'; | ||
//# sourceMappingURL=version.js.map |
{ | ||
"name": "bullmq", | ||
"version": "5.33.1", | ||
"version": "5.34.0", | ||
"description": "Queue for messages and jobs based on Redis", | ||
@@ -5,0 +5,0 @@ "homepage": "https://bullmq.io/", |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
1897453
30497
7
1