@cap-js-community/event-queue
Advanced tools
Comparing version 0.2.3 to 0.2.4
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "0.2.3", | ||
"version": "0.2.4", | ||
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -14,2 +14,3 @@ "use strict"; | ||
const MIN_INTERVAL_SEC = 10; | ||
const DEFAULT_LOAD = 1; | ||
@@ -31,2 +32,3 @@ class Config { | ||
#skipCsnCheck; | ||
#registerAsEventProcessor; | ||
#disableRedis; | ||
@@ -111,2 +113,3 @@ #env; | ||
this.#eventMap = config.events.reduce((result, event) => { | ||
event.load = event.load ?? DEFAULT_LOAD; | ||
this.validateAdHocEvents(result, event); | ||
@@ -117,2 +120,3 @@ result[[event.type, event.subType].join("##")] = event; | ||
this.#eventMap = config.periodicEvents.reduce((result, event) => { | ||
event.load = event.load ?? DEFAULT_LOAD; | ||
const SUFFIX_PERIODIC = "_PERIODIC"; | ||
@@ -281,2 +285,10 @@ event.type = `${event.type}${SUFFIX_PERIODIC}`; | ||
set registerAsEventProcessor(value) { | ||
this.#registerAsEventProcessor = value; | ||
} | ||
get registerAsEventProcessor() { | ||
return this.#registerAsEventProcessor; | ||
} | ||
get isMultiTenancy() { | ||
@@ -283,0 +295,0 @@ return !!cds.requires.multitenancy; |
@@ -19,2 +19,3 @@ "use strict"; | ||
NO_MANUEL_INSERT_OF_PERIODIC: "NO_MANUEL_INSERT_OF_PERIODIC", | ||
LOAD_HIGHER_THAN_LIMIT: "LOAD_HIGHER_THAN_LIMIT", | ||
}; | ||
@@ -63,2 +64,5 @@ | ||
}, | ||
[ERROR_CODES.LOAD_HIGHER_THAN_LIMIT]: { | ||
message: "The defined load of an event is higher than the maximum defined limit. Check your configuration!", | ||
}, | ||
}; | ||
@@ -211,4 +215,14 @@ | ||
} | ||
static loadHigherThanLimit(load) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.LOAD_HIGHER_THAN_LIMIT]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.LOAD_HIGHER_THAN_LIMIT, | ||
info: { load }, | ||
}, | ||
message | ||
); | ||
} | ||
} | ||
module.exports = EventQueueError; |
@@ -101,2 +101,16 @@ "use strict"; | ||
/** | ||
* Process one periodic event | ||
* @param processContext the context valid for the event processing. This context is associated with a valid transaction | ||
* Access to the context is also possible with this.getContextForEventProcessing(key). | ||
* The associated tx can be accessed with this.getTxForEventProcessing(key). | ||
* @param {string} key cluster key generated during the clustering step. By default, this is ID of the event queue entry | ||
* @param {Object} queueEntry this is the queueEntry which should be processed | ||
* @returns {Promise<undefined>} | ||
*/ | ||
// eslint-disable-next-line no-unused-vars | ||
async processPeriodicEvent(processContext, key, queueEntry) { | ||
throw new Error(IMPLEMENT_ERROR_MESSAGE); | ||
} | ||
startPerformanceTracerEvents() { | ||
@@ -258,3 +272,4 @@ this.__performanceLoggerEvents = new PerformanceTracer(this.logger, "Processing events"); | ||
this.logger.error( | ||
`The supplied status tuple doesn't have the required structure. Setting all entries to error. Error: ${error.toString()}`, | ||
"The supplied status tuple doesn't have the required structure. Setting all entries to error.", | ||
error, | ||
{ | ||
@@ -300,3 +315,4 @@ eventType: this.#eventType, | ||
this.logger.error( | ||
`Caught error during event processing - setting queue entry to error. Please catch your promises/exceptions. Error: ${error}`, | ||
"Caught error during event processing - setting queue entry to error. Please catch your promises/exceptions", | ||
error, | ||
{ | ||
@@ -315,10 +331,7 @@ eventType: this.#eventType, | ||
handleErrorDuringPeriodicEventProcessing(error, queueEntry) { | ||
this.logger.error( | ||
`Caught error during event periodic processing. Please catch your promises/exceptions. Error: ${error}`, | ||
{ | ||
eventType: this.#eventType, | ||
eventSubType: this.#eventSubType, | ||
queueEntryId: queueEntry.ID, | ||
} | ||
); | ||
this.logger.error("Caught error during event periodic processing. Please catch your promises/exceptions.", error, { | ||
eventType: this.#eventType, | ||
eventSubType: this.#eventSubType, | ||
queueEntryId: queueEntry.ID, | ||
}); | ||
} | ||
@@ -498,3 +511,3 @@ | ||
handleErrorDuringClustering(error) { | ||
this.logger.error(`Error during clustering of events - setting all queue entries to error. Error: ${error}`, { | ||
this.logger.error("Error during clustering of events - setting all queue entries to error.", error, { | ||
eventType: this.#eventType, | ||
@@ -685,3 +698,4 @@ eventSubType: this.#eventSubType, | ||
this.logger.error( | ||
`Caught error during hook for exceeded events - setting queue entry to error. Please catch your promises/exceptions. Error: ${err}`, | ||
"Caught error during hook for exceeded events - setting queue entry to error. Please catch your promises/exceptions.", | ||
err, | ||
{ | ||
@@ -831,2 +845,6 @@ eventType: this.#eventType, | ||
if (!lockAcquired) { | ||
this.logger.debug("no lock available, exit processing", { | ||
type: this.#eventType, | ||
subType: this.#eventSubType, | ||
}); | ||
return false; | ||
@@ -845,3 +863,3 @@ } | ||
} catch (err) { | ||
this.logger.error("Releasing distributed lock failed. Error:", err.toString()); | ||
this.logger.error("Releasing distributed lock failed.", err); | ||
} | ||
@@ -848,0 +866,0 @@ } |
@@ -91,2 +91,3 @@ "use strict"; | ||
registerAsEventProcessor: config.registerAsEventProcessor, | ||
processEventsAfterPublish: config.processEventsAfterPublish, | ||
multiTenancyEnabled: config.isMultiTenancy, | ||
@@ -93,0 +94,0 @@ redisEnabled: config.redisEnabled, |
@@ -66,9 +66,12 @@ "use strict"; | ||
}); | ||
await tx.run( | ||
DELETE.from(eventConfig.tableNameEventQueue).where( | ||
"ID IN", | ||
exitingWithNotMatchingInterval.map(({ ID }) => ID) | ||
) | ||
); | ||
if (exitingWithNotMatchingInterval.length) { | ||
await tx.run( | ||
DELETE.from(eventConfig.tableNameEventQueue).where( | ||
"ID IN", | ||
exitingWithNotMatchingInterval.map(({ ID }) => ID) | ||
) | ||
); | ||
} | ||
const newOrChangedEvents = newEvents.concat(exitingWithNotMatchingInterval); | ||
@@ -75,0 +78,0 @@ |
@@ -9,3 +9,3 @@ "use strict"; | ||
const { TransactionMode } = require("./constants"); | ||
const { limiter, Funnel } = require("./shared/common"); | ||
const { limiter } = require("./shared/common"); | ||
@@ -17,12 +17,2 @@ const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper"); | ||
const eventQueueRunner = async (context, events) => { | ||
const startTime = new Date(); | ||
const funnel = new Funnel(); | ||
await Promise.allSettled( | ||
events.map((event) => | ||
funnel.run(event.load, async () => processEventQueue(context, event.type, event.subType, startTime)) | ||
) | ||
); | ||
}; | ||
const processEventQueue = async (context, eventType, eventSubType, startTime = new Date()) => { | ||
@@ -116,3 +106,3 @@ let iterationCounter = 0; | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error. Error:", err, { | ||
cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error.", err, { | ||
eventType, | ||
@@ -176,3 +166,3 @@ eventSubType, | ||
try { | ||
await eventTypeInstance.processEvent(tx.context, queueEntry.ID, [queueEntry]); | ||
await eventTypeInstance.processPeriodicEvent(tx.context, queueEntry.ID, queueEntry); | ||
} catch (err) { | ||
@@ -201,3 +191,3 @@ eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("Processing periodic events failed with unexpected error. Error:", err, { | ||
cds.log(COMPONENT_NAME).error("Processing periodic events failed with unexpected error.", err, { | ||
eventType: eventTypeInstance?.eventType, | ||
@@ -286,3 +276,2 @@ eventSubType: eventTypeInstance?.eventSubType, | ||
processEventQueue, | ||
eventQueueRunner, | ||
}; |
@@ -51,3 +51,6 @@ "use strict"; | ||
if (result) { | ||
logger.info("skip publish redis event as no lock is available"); | ||
logger.info("skip publish redis event as no lock is available", { | ||
type, | ||
subType, | ||
}); | ||
return; | ||
@@ -62,3 +65,3 @@ } | ||
} catch (err) { | ||
logger.error(`publish event failed with error: ${err.toString()}`, { | ||
logger.error("publish event failed!", err, { | ||
tenantId, | ||
@@ -65,0 +68,0 @@ type, |
@@ -6,4 +6,4 @@ "use strict"; | ||
const eventQueueConfig = require("./config"); | ||
const { eventQueueRunner, processEventQueue } = require("./processEventQueue"); | ||
const { getWorkerPoolInstance } = require("./shared/WorkerQueue"); | ||
const { processEventQueue } = require("./processEventQueue"); | ||
const WorkerQueue = require("./shared/WorkerQueue"); | ||
const cdsHelper = require("./shared/cdsHelper"); | ||
@@ -25,3 +25,3 @@ const distributedLock = require("./shared/distributedLock"); | ||
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _executeRunForTenant); | ||
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _singleTenantDb); | ||
@@ -80,3 +80,3 @@ const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb); | ||
_executeAllTenants(tenantIds, runId); | ||
return _executeEventsAllTenants(tenantIds, runId); | ||
}; | ||
@@ -93,3 +93,3 @@ | ||
_multiTenancyPeriodicEvents().catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events! Error:", err); | ||
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err); | ||
}); | ||
@@ -99,6 +99,34 @@ } | ||
const _executeAllTenantsGeneric = (tenantIds, runId, fn) => { | ||
const workerQueueInstance = getWorkerPoolInstance(); | ||
const _executeEventsAllTenants = (tenantIds, runId) => { | ||
const events = eventQueueConfig.allEvents; | ||
const promises = []; | ||
tenantIds.forEach((tenantId) => { | ||
workerQueueInstance.addToQueue(async () => { | ||
events.forEach((event) => { | ||
promises.push( | ||
WorkerQueue.instance.addToQueue(event.load, async () => { | ||
try { | ||
const lockId = `${runId}_${event.type}_${event.subType}`; | ||
const tenantContext = new cds.EventContext({ tenant: tenantId }); | ||
const couldAcquireLock = await distributedLock.acquireLock(tenantContext, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(tenantId, event.type, event.subType, true); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
}); | ||
} | ||
}) | ||
); | ||
}); | ||
}); | ||
return promises; | ||
}; | ||
const _executePeriodicEventsAllTenants = (tenantIds, runId) => { | ||
tenantIds.forEach((tenantId) => { | ||
WorkerQueue.instance.addToQueue(1, async () => { | ||
try { | ||
@@ -112,3 +140,3 @@ const tenantContext = new cds.EventContext({ tenant: tenantId }); | ||
} | ||
await fn(tenantId, runId); | ||
await _checkPeriodicEventsSingleTenant(tenantId); | ||
} catch (err) { | ||
@@ -123,30 +151,16 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
const _executeAllTenants = (tenantIds, runId) => _executeAllTenantsGeneric(tenantIds, runId, _executeRunForTenant); | ||
const _executePeriodicEventsAllTenants = (tenantIds, runId) => | ||
_executeAllTenantsGeneric(tenantIds, runId, _checkPeriodicEventsSingleTenant); | ||
const _executeRunForTenant = async (tenantId, runId) => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
const eventsForAutomaticRun = eventQueueConfig.allEvents; | ||
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId); | ||
const context = new cds.EventContext({ | ||
tenant: tenantId, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
const _singleTenantDb = async (tenantId) => { | ||
const events = eventQueueConfig.allEvents; | ||
events.forEach((event) => { | ||
WorkerQueue.instance.addToQueue(event.load, async () => { | ||
try { | ||
await runEventCombinationForTenant(tenantId, event.type, event.subType, true); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
redisEnabled: eventQueueConfig.redisEnabled, | ||
}); | ||
} | ||
}); | ||
cds.context = context; | ||
logger.info("executing eventQueue run", { | ||
tenantId, | ||
subdomain, | ||
...(runId ? { runId } : null), | ||
}); | ||
await eventQueueRunner(context, eventsForAutomaticRun); | ||
} catch (err) { | ||
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, { | ||
tenantId, | ||
redisEnabled: eventQueueConfig.redisEnabled, | ||
}); | ||
} | ||
}); | ||
}; | ||
@@ -206,6 +220,3 @@ | ||
.log(COMPONENT_NAME) | ||
.error( | ||
"calculating offset for first run failed, falling back to default. Runs might be out-of-sync. Error:", | ||
err | ||
); | ||
.error("calculating offset for first run failed, falling back to default. Runs might be out-of-sync.", err); | ||
} | ||
@@ -215,3 +226,3 @@ return offsetDependingOnLastRun; | ||
const runEventCombinationForTenant = async (tenantId, type, subType) => { | ||
const runEventCombinationForTenant = async (tenantId, type, subType, skipWorkerPool) => { | ||
try { | ||
@@ -225,3 +236,11 @@ const subdomain = await getSubdomainForTenantId(tenantId); | ||
cds.context = context; | ||
getWorkerPoolInstance().addToQueue(async () => await processEventQueue(context, type, subType)); | ||
if (skipWorkerPool) { | ||
return await processEventQueue(context, type, subType); | ||
} else { | ||
const config = eventQueueConfig.getEventConfig(type, subType); | ||
return await WorkerQueue.instance.addToQueue( | ||
config.load, | ||
async () => await processEventQueue(context, type, subType) | ||
); | ||
} | ||
} catch (err) { | ||
@@ -243,7 +262,5 @@ const logger = cds.log(COMPONENT_NAME); | ||
_checkAndTriggerPeriodicEventUpdate(tenantIds); | ||
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
return _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
} catch (err) { | ||
logger.error( | ||
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}` | ||
); | ||
logger.error("Couldn't fetch tenant ids for event queue processing! Next try after defined interval.", err); | ||
} | ||
@@ -259,3 +276,3 @@ }; | ||
} catch (err) { | ||
logger.error(`Couldn't fetch tenant ids for updating periodic event processing! Error: ${err}`); | ||
logger.error("Couldn't fetch tenant ids for updating periodic event processing!", err); | ||
} | ||
@@ -266,4 +283,8 @@ }; | ||
const logger = cds.log(COMPONENT_NAME); | ||
if (!eventQueueConfig.updatePeriodicEvents) { | ||
logger.info("updating of periodic events is disabled"); | ||
if (!eventQueueConfig.updatePeriodicEvents || !eventQueueConfig.periodicEvents.length) { | ||
logger.info("updating of periodic events is disabled or no periodic events configured", { | ||
updateEnabled: eventQueueConfig.updatePeriodicEvents, | ||
events: eventQueueConfig.periodicEvents.length, | ||
}); | ||
return; | ||
} | ||
@@ -286,3 +307,3 @@ try { | ||
} catch (err) { | ||
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, { | ||
logger.error("Couldn't update periodic events for tenant! Next try after defined interval.", err, { | ||
tenantId, | ||
@@ -289,0 +310,0 @@ redisEnabled: eventQueueConfig.redisEnabled, |
"use strict"; | ||
const crypto = require("crypto"); | ||
const { floor, abs, min } = Math; | ||
const arrayToFlatMap = (array, key = "ID") => { | ||
@@ -15,70 +12,2 @@ return array.reduce((result, element) => { | ||
/** | ||
* Establish a "Funnel" instance to limit how much | ||
* load can be processed in parallel. This is somewhat | ||
* similar to the limiter function however it has some | ||
* distinctintly different features. The Funnel will | ||
* not know in advance which functions and how many | ||
* loads it will have to process. | ||
*/ | ||
class Funnel { | ||
/** | ||
* Create a funnel with specified capacity | ||
* @param capacity - the capacity of the funnel (integer, sign will be ignored) | ||
*/ | ||
constructor(capacity = 100) { | ||
this.runningPromises = []; | ||
this.capacity = floor(abs(capacity)); | ||
} | ||
/** | ||
* Asynchronously run a function that will put a specified load to the funnel. | ||
* The total amount of load of all running functions shall not | ||
* exceed the capacity of the funnel. If the desired load exceeds the capacity | ||
* the funnel will wait until sufficient capacity is available. | ||
* If a function requires a load >= capacity, then it will run | ||
* exclusively. | ||
* @param load - the load (integer, sign will be ignored) | ||
* @param f | ||
* @param args | ||
* @return {Promise<unknown>} | ||
*/ | ||
async run(load, f, ...args) { | ||
load = min(floor(abs(load)), Number.MAX_SAFE_INTEGER); | ||
// wait for sufficient capacity | ||
while (this.capacity < load && this.runningPromises.length > 0) { | ||
try { | ||
await Promise.race(this.runningPromises); | ||
} catch { | ||
// Yes, we must ignore exceptions here. The | ||
// caller expects exceptions from f and no | ||
// exceptions from other workloads. | ||
// Other exceptions must be handled by the | ||
// other callers. See (*) below. | ||
} | ||
} | ||
// map function call to promise | ||
const p = f.constructor.name === "AsyncFunction" ? f(...args) : Promise.resolve().then(() => f(...args)); | ||
// create promise for book keeping | ||
const workload = p.finally(() => { | ||
// remove workload | ||
this.runningPromises.splice(this.runningPromises.indexOf(workload), 1); | ||
// and reclaim its capacity | ||
this.capacity += load; | ||
}); | ||
// claim the capacity and schedule workload | ||
this.capacity -= load; | ||
this.runningPromises.push(workload); | ||
// make the caller wait for the workload | ||
// this also establish the seemingly missing | ||
// exception handling. See (*) above. | ||
return workload; | ||
} | ||
} | ||
/** | ||
* Defines a promise that resolves when all payloads are processed by the iterator, but limits | ||
@@ -136,2 +65,2 @@ * the number concurrent executions. | ||
module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate, processChunkedSync, hashStringTo32Bit }; | ||
module.exports = { arrayToFlatMap, limiter, isValidDate, processChunkedSync, hashStringTo32Bit }; |
@@ -7,2 +7,4 @@ "use strict"; | ||
const KEY_PREFIX = "EVENT_QUEUE"; | ||
const acquireLock = async (context, key, { tenantScoped = true, expiryTime = config.globalTxTimeout } = {}) => { | ||
@@ -132,3 +134,3 @@ const fullKey = _generateKey(context, tenantScoped, key); | ||
keyParts.push(key); | ||
return keyParts.join("##"); | ||
return `${KEY_PREFIX}_${keyParts.join("##")}`; | ||
}; | ||
@@ -135,0 +137,0 @@ |
@@ -6,21 +6,38 @@ "use strict"; | ||
const config = require("../config"); | ||
const EventQueueError = require("../EventQueueError"); | ||
const COMPONENT_NAME = "eventQueue/WorkerQueue"; | ||
const NANO_TO_MS = 1e6; | ||
const THRESHOLD = { | ||
INFO: 5 * 1000, | ||
WARN: 10 * 1000, | ||
ERROR: 15 * 1000, | ||
}; | ||
let instance = null; | ||
class WorkerQueue { | ||
#concurrencyLimit; | ||
#runningPromises; | ||
#runningLoad; | ||
#queue; | ||
static #instance; | ||
class WorkerQueue { | ||
constructor(concurrency) { | ||
if (Number.isNaN(concurrency) || concurrency <= 0) { | ||
this.__concurrencyLimit = 1; | ||
this.#concurrencyLimit = 1; | ||
} else { | ||
this.__concurrencyLimit = concurrency; | ||
this.#concurrencyLimit = concurrency; | ||
} | ||
this.__runningPromises = []; | ||
this.__queue = []; | ||
this.#runningPromises = []; | ||
this.#runningLoad = 0; | ||
this.#queue = []; | ||
} | ||
addToQueue(cb) { | ||
addToQueue(load, cb) { | ||
if (load > this.#concurrencyLimit) { | ||
throw EventQueueError.loadHigherThanLimit(load); | ||
} | ||
const startTime = process.hrtime.bigint(); | ||
const p = new Promise((resolve, reject) => { | ||
this.__queue.push([cb, resolve, reject]); | ||
this.#queue.push([load, cb, resolve, reject, startTime]); | ||
}); | ||
@@ -31,8 +48,11 @@ this._checkForNext(); | ||
_executeFunction(cb, resolve, reject) { | ||
_executeFunction(load, cb, resolve, reject, startTime) { | ||
this.checkAndLogWaitingTime(startTime); | ||
const promise = Promise.resolve().then(() => cb()); | ||
this.__runningPromises.push(promise); | ||
this.#runningPromises.push(promise); | ||
this.#runningLoad = this.#runningLoad + load; | ||
promise | ||
.finally(() => { | ||
this.__runningPromises.splice(this.__runningPromises.indexOf(promise), 1); | ||
this.#runningLoad = this.#runningLoad - load; | ||
this.#runningPromises.splice(this.#runningPromises.indexOf(promise), 1); | ||
this._checkForNext(); | ||
@@ -44,3 +64,3 @@ }) | ||
.catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Error happened in WorkQueue. Errors should be caught before! Error:", err); | ||
cds.log(COMPONENT_NAME).error("Error happened in WorkQueue. Errors should be caught before!", err); | ||
reject(err); | ||
@@ -51,20 +71,42 @@ }); | ||
_checkForNext() { | ||
if (!this.__queue.length || this.__runningPromises.length >= this.__concurrencyLimit) { | ||
const load = this.#queue[0]?.[0]; | ||
if (!this.#queue.length || this.#runningLoad + load > this.#concurrencyLimit) { | ||
return; | ||
} | ||
const [cb, resolve, reject] = this.__queue.shift(); | ||
this._executeFunction(cb, resolve, reject); | ||
const args = this.#queue.shift(); | ||
this._executeFunction(...args); | ||
} | ||
get runningPromises() { | ||
return this.#runningPromises; | ||
} | ||
/** | ||
@return { WorkerQueue } | ||
**/ | ||
static get instance() { | ||
if (!WorkerQueue.#instance) { | ||
WorkerQueue.#instance = new WorkerQueue(config.parallelTenantProcessing); | ||
} | ||
return WorkerQueue.#instance; | ||
} | ||
checkAndLogWaitingTime(startTime) { | ||
const diffMs = Math.round(Number(process.hrtime.bigint() - startTime) / NANO_TO_MS); | ||
let logLevel; | ||
if (diffMs >= THRESHOLD.ERROR) { | ||
logLevel = "error"; | ||
} else if (diffMs >= THRESHOLD.WARN) { | ||
logLevel = "warn"; | ||
} else if (diffMs >= THRESHOLD.INFO) { | ||
logLevel = "info"; | ||
} else { | ||
logLevel = "debug"; | ||
} | ||
cds.log(COMPONENT_NAME)[logLevel]("Waiting time in worker queue", { | ||
diffMs, | ||
}); | ||
} | ||
} | ||
module.exports = { | ||
getWorkerPoolInstance: () => { | ||
if (!instance) { | ||
instance = new WorkerQueue(config.parallelTenantProcessing); | ||
} | ||
return instance; | ||
}, | ||
_: { | ||
WorkerQueue, | ||
}, | ||
}; | ||
module.exports = WorkerQueue; |
131065
3124