@cap-js-community/event-queue
Advanced tools
Comparing version 0.2.1 to 0.2.2
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "0.2.1", | ||
"version": "0.2.2", | ||
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -7,2 +7,3 @@ "use strict"; | ||
const redis = require("./shared/redis"); | ||
const EventQueueError = require("./EventQueueError"); | ||
@@ -15,2 +16,3 @@ let instance; | ||
const COMPONENT_NAME = "eventQueue/config"; | ||
const MIN_INTERVAL_SEC = 10; | ||
@@ -35,2 +37,3 @@ class Config { | ||
#eventMap; | ||
#updatePeriodicEvents; | ||
constructor() { | ||
@@ -56,7 +59,7 @@ this.#logger = cds.log(COMPONENT_NAME); | ||
getEventConfig(type, subType) { | ||
return this.#eventMap[[type, subType].join("##")]; | ||
return this.#eventMap[this.generateKey(type, subType)]; | ||
} | ||
hasEventAfterCommitFlag(type, subType) { | ||
return this.#eventMap[[type, subType].join("##")]?.processAfterCommit ?? true; | ||
return this.#eventMap[this.generateKey(type, subType)]?.processAfterCommit ?? true; | ||
} | ||
@@ -108,8 +111,45 @@ | ||
this.#config = config; | ||
config.events = config.events ?? []; | ||
config.periodicEvents = config.periodicEvents ?? []; | ||
this.#eventMap = config.events.reduce((result, event) => { | ||
this.validateAdHocEvents(result, event); | ||
result[[event.type, event.subType].join("##")] = event; | ||
return result; | ||
}, {}); | ||
this.#eventMap = config.periodicEvents.reduce((result, event) => { | ||
this.validatePeriodicConfig(result, event); | ||
event.isPeriodic = true; | ||
result[[event.type, event.subType].join("##")] = event; | ||
return result; | ||
}, this.#eventMap); | ||
} | ||
validatePeriodicConfig(eventMap, config) { | ||
if (eventMap[this.generateKey(config.type, config.subType)]) { | ||
throw EventQueueError.duplicateEventRegistration(config.type, config.subType); | ||
} | ||
if (!config.interval || config.interval <= MIN_INTERVAL_SEC) { | ||
throw EventQueueError.invalidInterval(config.type, config.subType, config.interval); | ||
} | ||
if (!config.impl) { | ||
throw EventQueueError.missingImpl(config.type, config.subType); | ||
} | ||
} | ||
validateAdHocEvents(eventMap, config) { | ||
if (eventMap[this.generateKey(config.type, config.subType)]) { | ||
throw EventQueueError.duplicateEventRegistration(config.type, config.subType); | ||
} | ||
if (!config.impl) { | ||
throw EventQueueError.missingImpl(config.type, config.subType); | ||
} | ||
} | ||
generateKey(type, subType) { | ||
return [type, subType].join("##"); | ||
} | ||
get fileContent() { | ||
@@ -123,2 +163,14 @@ return this.#config; | ||
get periodicEvents() { | ||
return this.#config.periodicEvents; | ||
} | ||
isPeriodicEvent(type, subType) { | ||
return this.#eventMap[this.generateKey(type, subType)]?.isPeriodic; | ||
} | ||
get allEvents() { | ||
return this.#config.events.concat(this.#config.periodicEvents); | ||
} | ||
get forUpdateTimeout() { | ||
@@ -220,2 +272,10 @@ return this.#forUpdateTimeout; | ||
set updatePeriodicEvents(value) { | ||
this.#updatePeriodicEvents = value; | ||
} | ||
get updatePeriodicEvents() { | ||
return this.#updatePeriodicEvents; | ||
} | ||
get isMultiTenancy() { | ||
@@ -222,0 +282,0 @@ return !!cds.requires.multitenancy; |
@@ -14,2 +14,5 @@ "use strict"; | ||
dbService.after("CREATE", def, (_, req) => { | ||
if (req.tx._skipEventQueueBroadcase) { | ||
return; | ||
} | ||
req.tx._ = req.tx._ ?? {}; | ||
@@ -16,0 +19,0 @@ req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {}; |
@@ -15,2 +15,6 @@ "use strict"; | ||
NO_VALID_DATE: "NO_VALID_DATE", | ||
INVALID_INTERVAL: "INVALID_INTERVAL", | ||
MISSING_IMPL: "MISSING_IMPL", | ||
DUPLICATE_EVENT_REGISTRATION: "DUPLICATE_EVENT_REGISTRATION", | ||
NO_MANUEL_INSERT_OF_PERIODIC: "NO_MANUEL_INSERT_OF_PERIODIC", | ||
}; | ||
@@ -47,2 +51,14 @@ | ||
}, | ||
[ERROR_CODES.INVALID_INTERVAL]: { | ||
message: "Invalid interval, the value needs to greater than 10 seconds.", | ||
}, | ||
[ERROR_CODES.MISSING_IMPL]: { | ||
message: "Missing path to event class implementation.", | ||
}, | ||
[ERROR_CODES.DUPLICATE_EVENT_REGISTRATION]: { | ||
message: "Duplicate event registration, check the uniqueness of type and subType.", | ||
}, | ||
[ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC]: { | ||
message: "Periodic events are managed by the framework and are not allowed to insert manually.", | ||
}, | ||
}; | ||
@@ -151,4 +167,48 @@ | ||
} | ||
static invalidInterval(type, subType, interval) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.INVALID_INTERVAL]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.INVALID_INTERVAL, | ||
info: { type, subType, interval }, | ||
}, | ||
message | ||
); | ||
} | ||
static missingImpl(type, subType) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.MISSING_IMPL]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.MISSING_IMPL, | ||
info: { type, subType }, | ||
}, | ||
message | ||
); | ||
} | ||
static duplicateEventRegistration(type, subType) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.DUPLICATE_EVENT_REGISTRATION]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.DUPLICATE_EVENT_REGISTRATION, | ||
info: { type, subType }, | ||
}, | ||
message | ||
); | ||
} | ||
static manuelPeriodicEventInsert(type, subType) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC, | ||
info: { type, subType }, | ||
}, | ||
message | ||
); | ||
} | ||
} | ||
module.exports = EventQueueError; |
@@ -10,3 +10,3 @@ "use strict"; | ||
const { arrayToFlatMap } = require("./shared/common"); | ||
const eventScheduler = require("./shared/EventScheduler"); | ||
const eventScheduler = require("./shared/eventScheduler"); | ||
const eventQueueConfig = require("./config"); | ||
@@ -307,2 +307,25 @@ const PerformanceTracer = require("./shared/PerformanceTracer"); | ||
handleErrorDuringPeriodicEventProcessing(error, queueEntry) { | ||
this.logger.error( | ||
`Caught error during event periodic processing. Please catch your promises/exceptions. Error: ${error}`, | ||
{ | ||
eventType: this.#eventType, | ||
eventSubType: this.#eventSubType, | ||
queueEntryId: queueEntry.ID, | ||
} | ||
); | ||
} | ||
async setPeriodicEventStatus(queueEntryIds) { | ||
await this.tx.run( | ||
UPDATE.entity(this.#config.tableNameEventQueue) | ||
.set({ | ||
status: EventProcessingStatus.Done, | ||
}) | ||
.where({ | ||
ID: queueEntryIds, | ||
}) | ||
); | ||
} | ||
/** | ||
@@ -817,2 +840,43 @@ * This function validates for all selected events one status has been submitted. It's also validated that only for | ||
async scheduleNextPeriodEvent(queueEntry) { | ||
const interval = this.__eventConfig.interval; | ||
const newEvent = { | ||
type: this.#eventType, | ||
subType: this.#eventSubType, | ||
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + interval * 1000), | ||
}; | ||
this.tx._skipEventQueueBroadcase = true; | ||
await this.tx.run(INSERT.into(this.#config.tableNameEventQueue).entries({ ...newEvent })); | ||
this.tx._skipEventQueueBroadcase = false; | ||
if (interval < this.#config.runInterval) { | ||
this.#handleDelayedEvents([newEvent]); | ||
} | ||
} | ||
async handleDuplicatedPeriodicEventEntry(queueEntries) { | ||
this.logger.error("More than one open events for the same configuration which is not allowed!", { | ||
eventType: this.#eventType, | ||
eventSubType: this.#eventSubType, | ||
queueEntriesIds: queueEntries.map(({ ID }) => ID), | ||
}); | ||
let queueEntryToUse; | ||
const obsoleteEntries = []; | ||
for (const queueEntry of queueEntries) { | ||
if (!queueEntryToUse) { | ||
queueEntryToUse = queueEntry; | ||
continue; | ||
} | ||
if (queueEntryToUse.startAfter <= queueEntry.queueEntry) { | ||
obsoleteEntries.push(queueEntryToUse); | ||
queueEntryToUse = queueEntry; | ||
} else { | ||
obsoleteEntries.push(queueEntry); | ||
} | ||
} | ||
await this.setPeriodicEventStatus(obsoleteEntries.map(({ ID }) => ID)); | ||
return queueEntryToUse; | ||
} | ||
statusMapContainsError(statusMap) { | ||
@@ -926,4 +990,8 @@ return Object.values(statusMap).includes(EventProcessingStatus.Error); | ||
} | ||
get isPeriodicEvent() { | ||
return this.__eventConfig.isPeriodic; | ||
} | ||
} | ||
module.exports = EventQueueProcessorBase; |
@@ -37,2 +37,3 @@ "use strict"; | ||
["skipCsnCheck", false], | ||
["updatePeriodicEvents", true], | ||
]; | ||
@@ -51,2 +52,3 @@ | ||
skipCsnCheck, | ||
updatePeriodicEvents, | ||
} = {}) => { | ||
@@ -73,3 +75,4 @@ // TODO: initialize check: | ||
disableRedis, | ||
skipCsnCheck | ||
skipCsnCheck, | ||
updatePeriodicEvents | ||
); | ||
@@ -123,4 +126,6 @@ | ||
const errorHandler = (err) => cds.log(COMPONENT).error("error during init runner", err); | ||
if (!configInstance.isMultiTenancy) { | ||
runner.singleTenant(); | ||
runner.singleTenant().catch(errorHandler); | ||
return; | ||
@@ -132,5 +137,5 @@ } | ||
configInstance.attachConfigChangeHandler(); | ||
runner.multiTenancyRedis(); | ||
runner.multiTenancyRedis().catch(errorHandler); | ||
} else { | ||
runner.multiTenancyDb(); | ||
runner.multiTenancyDb().catch(errorHandler); | ||
} | ||
@@ -137,0 +142,0 @@ }; |
@@ -46,2 +46,5 @@ "use strict"; | ||
} | ||
if (baseInstance.isPeriodicEvent) { | ||
return await processPeriodicEvent(baseInstance); | ||
} | ||
eventConfig.startTime = startTime; | ||
@@ -135,2 +138,62 @@ while (shouldContinue) { | ||
// TODO: don't forget to release lock | ||
const processPeriodicEvent = async (eventTypeInstance) => { | ||
let queueEntry; | ||
try { | ||
await executeInNewTransaction( | ||
eventTypeInstance.context, | ||
`eventQueue-periodic-scheduleNext-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
async (tx) => { | ||
eventTypeInstance.processEventContext = tx.context; | ||
const queueEntries = await eventTypeInstance.getQueueEntriesAndSetToInProgress(); | ||
if (!queueEntries.length) { | ||
return; | ||
} | ||
if (queueEntries.length > 1) { | ||
queueEntry = await eventTypeInstance.handleDuplicatedPeriodicEventEntry(queueEntries); | ||
} else { | ||
queueEntry = queueEntries[0]; | ||
} | ||
await eventTypeInstance.scheduleNextPeriodEvent(queueEntry); | ||
} | ||
); | ||
if (!queueEntry) { | ||
return; | ||
} | ||
await executeInNewTransaction( | ||
eventTypeInstance.context, | ||
`eventQueue-periodic-process-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
async (tx) => { | ||
eventTypeInstance.processEventContext = tx.context; | ||
eventTypeInstance.setTxForEventProcessing(queueEntry.ID, cds.tx(tx.context)); | ||
try { | ||
await eventTypeInstance.processEvent(tx.context, queueEntry.ID, [queueEntry]); | ||
} catch (err) { | ||
eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry); | ||
throw new TriggerRollback(); | ||
} | ||
if ( | ||
eventTypeInstance.transactionMode !== TransactionMode.alwaysCommit || | ||
eventTypeInstance.shouldRollbackTransaction(queueEntry.ID) | ||
) { | ||
throw new TriggerRollback(); | ||
} | ||
} | ||
); | ||
await executeInNewTransaction( | ||
eventTypeInstance.context, | ||
`eventQueue-periodic-setStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
async (tx) => { | ||
eventTypeInstance.processEventContext = tx.context; | ||
await eventTypeInstance.setPeriodicEventStatus(queueEntry.ID); | ||
} | ||
); | ||
} finally { | ||
await eventTypeInstance?.handleReleaseLock(); | ||
} | ||
}; | ||
const processEventMap = async (eventTypeInstance) => { | ||
@@ -137,0 +200,0 @@ eventTypeInstance.startPerformanceTracerEvents(); |
@@ -24,2 +24,3 @@ "use strict"; | ||
* } | ||
* @param {Boolean} skipBroadcast - (Optional) If set to true, event broadcasting will be skipped. Defaults to false. | ||
* @throws {EventQueueError} Throws an error if the configuration is not initialized. | ||
@@ -30,3 +31,3 @@ * @throws {EventQueueError} Throws an error if the event type is unknown. | ||
*/ | ||
const publishEvent = async (tx, events) => { | ||
const publishEvent = async (tx, events, skipBroadcast = false) => { | ||
const configInstance = config.getConfigInstance(); | ||
@@ -45,4 +46,11 @@ if (!configInstance.initialized) { | ||
} | ||
if (eventConfig.isPeriodic) { | ||
throw EventQueueError.manuelPeriodicEventInsert(type, subType); | ||
} | ||
} | ||
return await tx.run(INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing)); | ||
tx._skipEventQueueBroadcase = skipBroadcast; | ||
const result = await tx.run(INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing)); | ||
tx._skipEventQueueBroadcase = false; | ||
return result; | ||
}; | ||
@@ -49,0 +57,0 @@ |
@@ -12,2 +12,4 @@ "use strict"; | ||
const { getSubdomainForTenantId } = require("./shared/cdsHelper"); | ||
const periodicEvents = require("./periodicEvents"); | ||
const { hashStringTo32Bit } = require("./shared/common"); | ||
@@ -17,14 +19,17 @@ const COMPONENT_NAME = "eventQueue/runner"; | ||
const EVENT_QUEUE_RUN_TS = "EVENT_QUEUE_RUN_TS"; | ||
const EVENT_QUEUE_RUN_PERIODIC_EVENT = "EVENT_QUEUE_RUN_PERIODIC_EVENT"; | ||
const OFFSET_FIRST_RUN = 10 * 1000; | ||
const singleTenant = () => _scheduleFunction(_executeRunForTenant); | ||
let tenantIdHash; | ||
const multiTenancyDb = () => _scheduleFunction(_multiTenancyDb); | ||
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _executeRunForTenant); | ||
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyRedis); | ||
const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb); | ||
const _scheduleFunction = async (fn) => { | ||
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyRedis); | ||
const _scheduleFunction = async (singleRunFn, periodicFn) => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
const configInstance = eventQueueConfig.getConfigInstance(); | ||
const eventsForAutomaticRun = configInstance.events; | ||
const eventsForAutomaticRun = configInstance.allEvents; | ||
if (!eventsForAutomaticRun.length) { | ||
@@ -41,3 +46,3 @@ logger.warn("no events for automatic run are configured - skipping runner registration"); | ||
} | ||
return fn(); | ||
return periodicFn(); | ||
}; | ||
@@ -52,2 +57,3 @@ | ||
setTimeout(() => { | ||
singleRunFn(); | ||
fnWithRunningCheck(); | ||
@@ -64,2 +70,4 @@ const intervalRunner = new SetIntervalDriftSafe(configInstance.runInterval); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_checkAndTriggerPriodicEventUpdate(tenantIds); | ||
const runId = await _acquireRunId(emptyContext); | ||
@@ -75,16 +83,17 @@ | ||
const _multiTenancyDb = async () => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
logger.info("executing event queue run for single instance and multi tenant"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
} catch (err) { | ||
logger.error( | ||
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}` | ||
); | ||
const _checkAndTriggerPriodicEventUpdate = (tenantIds) => { | ||
const hash = hashStringTo32Bit(JSON.stringify(tenantIds)); | ||
if (!tenantIdHash) { | ||
tenantIdHash = hash; | ||
return; | ||
} | ||
if (tenantIdHash && tenantIdHash !== hash) { | ||
cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!"); | ||
_multiTenancyPeriodicEvents().catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events! Error:", err); | ||
}); | ||
} | ||
}; | ||
const _executeAllTenants = (tenantIds, runId) => { | ||
const _executeAllTenantsGeneric = (tenantIds, runId, fn) => { | ||
const configInstance = eventQueueConfig.getConfigInstance(); | ||
@@ -102,3 +111,3 @@ const workerQueueInstance = getWorkerPoolInstance(); | ||
} | ||
await _executeRunForTenant(tenantId, runId); | ||
await fn(tenantId, runId); | ||
} catch (err) { | ||
@@ -113,2 +122,7 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
const _executeAllTenants = (tenantIds, runId) => _executeAllTenantsGeneric(tenantIds, runId, _executeRunForTenant); | ||
const _executePeriodicEventsAllTenants = (tenantIds, runId) => | ||
_executeAllTenantsGeneric(tenantIds, runId, _checkPeriodicEventsSingleTenant); | ||
const _executeRunForTenant = async (tenantId, runId) => { | ||
@@ -118,3 +132,3 @@ const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
const eventsForAutomaticRun = configInstance.events; | ||
const eventsForAutomaticRun = configInstance.allEvents; | ||
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId); | ||
@@ -223,2 +237,56 @@ const context = new cds.EventContext({ | ||
const _multiTenancyDb = async () => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
logger.info("executing event queue run for single instance and multi tenant"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_checkAndTriggerPriodicEventUpdate(tenantIds); | ||
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
} catch (err) { | ||
logger.error( | ||
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}` | ||
); | ||
} | ||
}; | ||
const _multiTenancyPeriodicEvents = async () => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
logger.info("executing event queue update periodic events"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT); | ||
} catch (err) { | ||
logger.error(`Couldn't fetch tenant ids for updating periodic event processing! Error: ${err}`); | ||
} | ||
}; | ||
const _checkPeriodicEventsSingleTenant = async (tenantId) => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
const configInstance = eventQueueConfig.getConfigInstance(); | ||
if (!configInstance.updatePeriodicEvents) { | ||
logger.info("updating of periodic events is disabled"); | ||
} | ||
try { | ||
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId); | ||
const context = new cds.EventContext({ | ||
tenant: tenantId, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
}); | ||
cds.context = context; | ||
logger.info("executing updating periotic events", { | ||
tenantId, | ||
subdomain, | ||
}); | ||
await cdsHelper.executeInNewTransaction(context, "update-periodic-events", async (tx) => { | ||
await periodicEvents.checkAndInsertPeriodicEvents(tx.context); | ||
}); | ||
} catch (err) { | ||
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, { | ||
tenantId, | ||
redisEnabled: configInstance.redisEnabled, | ||
}); | ||
} | ||
}; | ||
module.exports = { | ||
@@ -225,0 +293,0 @@ singleTenant, |
"use strict"; | ||
const crypto = require("crypto"); | ||
const { floor, abs, min } = Math; | ||
@@ -121,2 +123,14 @@ | ||
module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate }; | ||
const processChunkedSync = (inputs, chunkSize, chunkHandler) => { | ||
let start = 0; | ||
while (start < inputs.length) { | ||
let end = start + chunkSize > inputs.length ? inputs.length : start + chunkSize; | ||
const chunk = inputs.slice(start, end); | ||
chunkHandler(chunk); | ||
start = end; | ||
} | ||
}; | ||
const hashStringTo32Bit = (value) => crypto.createHash("sha256").update(String(value)).digest("base64").slice(0, 32); | ||
module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate, processChunkedSync, hashStringTo32Bit }; |
129553
29
3062