@cap-js-community/event-queue
Advanced tools
Comparing version 1.8.6 to 1.8.7
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.8.6", | ||
"version": "1.8.7", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -20,2 +20,3 @@ "use strict"; | ||
const DEFAULT_PRIORITY = Priorities.Medium; | ||
const DEFAULT_INCREASE_PRIORITY = true; | ||
const SUFFIX_PERIODIC = "_PERIODIC"; | ||
@@ -321,2 +322,3 @@ const COMMAND_BLOCK = "EVENT_QUEUE_EVENT_BLOCK"; | ||
multiInstanceProcessing: config.multiInstanceProcessing, | ||
increasePriorityOverTime: config.increasePriorityOverTime, | ||
internalEvent: true, | ||
@@ -369,3 +371,2 @@ }; | ||
this.#eventMap = config.periodicEvents.reduce((result, event) => { | ||
event.priority = event.priority ?? DEFAULT_PRIORITY; | ||
event.type = `${event.type}${SUFFIX_PERIODIC}`; | ||
@@ -384,2 +385,3 @@ event.isPeriodic = true; | ||
event.priority = event.priority ?? DEFAULT_PRIORITY; | ||
event.increasePriorityOverTime = event.increasePriorityOverTime ?? DEFAULT_INCREASE_PRIORITY; | ||
} | ||
@@ -386,0 +388,0 @@ |
@@ -272,3 +272,9 @@ import * as cds from "@sap/cds"; | ||
addToQueue(load: number, label: string, priority?: Priorities, cb?: () => any): Promise<any>; | ||
addToQueue( | ||
load: number, | ||
label: string, | ||
priority: Priorities, | ||
increasePriorityOverTime: boolean, | ||
cb: () => any | ||
): Promise<any>; | ||
@@ -275,0 +281,0 @@ _executeFunction( |
@@ -197,29 +197,35 @@ "use strict"; | ||
const label = `${eventConfig.type}_${eventConfig.subType}`; | ||
return await WorkerQueue.instance.addToQueue(eventConfig.load, label, eventConfig.priority, async () => { | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
await trace( | ||
context, | ||
label, | ||
async () => { | ||
try { | ||
const lockId = `${runId}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
return await WorkerQueue.instance.addToQueue( | ||
eventConfig.load, | ||
label, | ||
eventConfig.priority, | ||
eventConfig.increasePriorityOverTime, | ||
async () => { | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
await trace( | ||
context, | ||
label, | ||
async () => { | ||
try { | ||
const lockId = `${runId}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, { | ||
skipWorkerPool: true, | ||
}); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
}); | ||
} | ||
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, { | ||
skipWorkerPool: true, | ||
}); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
}); | ||
} | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
}); | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
}); | ||
} | ||
); | ||
}) | ||
@@ -279,29 +285,35 @@ ); | ||
const label = `${eventConfig.type}_${eventConfig.subType}`; | ||
return await WorkerQueue.instance.addToQueue(eventConfig.load, label, eventConfig.priority, async () => { | ||
return await cds.tx({}, async ({ context }) => { | ||
await trace( | ||
context, | ||
label, | ||
async () => { | ||
try { | ||
const lockId = `${label}`; | ||
const couldAcquireLock = eventConfig.multiInstanceProcessing | ||
? true | ||
: await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
return await WorkerQueue.instance.addToQueue( | ||
eventConfig.load, | ||
label, | ||
eventConfig.priority, | ||
eventConfig.increasePriorityOverTime, | ||
async () => { | ||
return await cds.tx({}, async ({ context }) => { | ||
await trace( | ||
context, | ||
label, | ||
async () => { | ||
try { | ||
const lockId = `${label}`; | ||
const couldAcquireLock = eventConfig.multiInstanceProcessing | ||
? true | ||
: await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, { | ||
skipWorkerPool: true, | ||
}); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed"); | ||
} | ||
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, { | ||
skipWorkerPool: true, | ||
}); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed"); | ||
} | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
}); | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
}); | ||
} | ||
); | ||
}) | ||
@@ -308,0 +320,0 @@ ); |
@@ -26,2 +26,3 @@ "use strict"; | ||
eventConfig.priority, | ||
eventConfig.increasePriorityOverTime, | ||
AsyncResource.bind(async () => { | ||
@@ -28,0 +29,0 @@ const _exec = async () => { |
@@ -55,3 +55,3 @@ "use strict"; | ||
addToQueue(load, label, priority = Priorities.Medium, cb) { | ||
addToQueue(load, label, priority = Priorities.Medium, increasePriorityOverTime, cb) { | ||
if (load > this.#concurrencyLimit) { | ||
@@ -67,3 +67,3 @@ throw EventQueueError.loadHigherThanLimit(load, label); | ||
const p = new Promise((resolve, reject) => { | ||
this.#queue[priority].push([load, label, cb, resolve, reject, startTime]); | ||
this.#queue[priority].push([load, label, cb, resolve, reject, increasePriorityOverTime, startTime]); | ||
}); | ||
@@ -83,3 +83,8 @@ this.#checkForNext(); | ||
const queueEntry = this.queue[priority][i]; | ||
const startTime = queueEntry[6] ?? queueEntry[5]; | ||
// NOTE: index 5 - increasingPrioOverTime | ||
if (!queueEntry[5]) { | ||
continue; | ||
} | ||
// NOTE: index 6 original time; index 7 shifted time | ||
const startTime = queueEntry[7] ?? queueEntry[6]; | ||
if (Math.round(Number(checkTime - startTime) / NANO_TO_MS) > INCREASE_PRIORITY_AFTER * MIN_TO_MS) { | ||
@@ -94,3 +99,3 @@ const [entry] = this.queue[priority].splice(i, 1); | ||
_executeFunction(load, label, cb, resolve, reject, startTime, priority) { | ||
_executeFunction(priority, load, label, cb, resolve, reject, skipIncreasingPrioOverTime, startTime) { | ||
this.#checkAndLogWaitingTime(startTime, label, priority); | ||
@@ -130,3 +135,3 @@ const promise = Promise.resolve().then(() => cb()); | ||
const [args] = this.#queue[priority].splice(i, 1); | ||
this._executeFunction(...args, priority); | ||
this._executeFunction(priority, ...args); | ||
entryFound = true; | ||
@@ -133,0 +138,0 @@ break; |
211560
5294