@cap-js-community/event-queue
Advanced tools
Comparing version 1.8.3 to 1.8.4
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.8.3", | ||
"version": "1.8.4", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -61,3 +61,2 @@ "use strict"; | ||
#processEventsAfterPublish; | ||
#skipCsnCheck; | ||
#registerAsEventProcessor; | ||
@@ -82,3 +81,4 @@ #disableRedis; | ||
#crashOnRedisUnavailable; | ||
#tenantIdFilterCb; | ||
#tenantIdFilterTokenInfoCb; | ||
#tenantIdFilterEventProcessingCb; | ||
static #instance; | ||
@@ -99,3 +99,2 @@ constructor() { | ||
this.#processEventsAfterPublish = null; | ||
this.#skipCsnCheck = null; | ||
this.#disableRedis = null; | ||
@@ -299,3 +298,6 @@ this.#env = getEnvInstance(); | ||
if (this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)]) { | ||
return; | ||
const index = this.#config.events.findIndex( | ||
(event) => event.type === CAP_EVENT_TYPE && event.subType === serviceName | ||
); | ||
this.#config.events.splice(index, 1); | ||
} | ||
@@ -537,10 +539,18 @@ | ||
get tenantIdFilterCb() { | ||
return this.#tenantIdFilterCb; | ||
get tenantIdFilterTokenInfo() { | ||
return this.#tenantIdFilterTokenInfoCb; | ||
} | ||
set tenantIdFilterCb(value) { | ||
this.#tenantIdFilterCb = value; | ||
set tenantIdFilterTokenInfo(value) { | ||
this.#tenantIdFilterTokenInfoCb = value; | ||
} | ||
get tenantIdFilterEventProcessing() { | ||
return this.#tenantIdFilterEventProcessingCb; | ||
} | ||
set tenantIdFilterEventProcessing(value) { | ||
this.#tenantIdFilterEventProcessingCb = value; | ||
} | ||
set globalTxTimeout(value) { | ||
@@ -625,10 +635,2 @@ this.#globalTxTimeout = value; | ||
set skipCsnCheck(value) { | ||
this.#skipCsnCheck = value; | ||
} | ||
get skipCsnCheck() { | ||
return this.#skipCsnCheck; | ||
} | ||
set disableRedis(value) { | ||
@@ -635,0 +637,0 @@ this.#disableRedis = value; |
@@ -24,5 +24,5 @@ "use strict"; | ||
TenantIdCheckTypes: { | ||
getAllTenantIds: "getAllTenantIds", | ||
eventProcessing: "eventProcessing", | ||
getTokenInfo: "getTokenInfo", | ||
}, | ||
}; |
@@ -549,2 +549,12 @@ "use strict"; | ||
handleErrorTx(error) { | ||
this.logger.error("Error in commit|rollback transaction, check handlers and constraints!", error, { | ||
eventType: this.#eventType, | ||
eventSubType: this.#eventSubType, | ||
}); | ||
this.__queueEntries.forEach((queueEntry) => { | ||
this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error); | ||
}); | ||
} | ||
handleInvalidPayloadReturned(queueEntry) { | ||
@@ -551,0 +561,0 @@ this.logger.error( |
@@ -15,3 +15,3 @@ import * as cds from "@sap/cds"; | ||
export declare const TenantIdCheckTypes: { | ||
getAllTenantIds: "getAllTenantIds"; | ||
eventProcessing: "eventProcessing"; | ||
getTokenInfo: "getTokenInfo"; | ||
@@ -186,50 +186,59 @@ }; | ||
getEventConfig(type: string, subType: string): any; | ||
isCapOutboxEvent(type: string): boolean; | ||
hasEventAfterCommitFlag(type: string, subType: string): boolean; | ||
_checkRedisIsBound(): boolean; | ||
checkRedisEnabled(): boolean; | ||
publishEventBlockList(): boolean; | ||
crashOnRedisUnavailable(): boolean; | ||
getEventConfig(type: any, subType: any): any; | ||
isCapOutboxEvent(type: any): boolean; | ||
hasEventAfterCommitFlag(type: any, subType: any): any; | ||
shouldBeProcessedInThisApplication(type: any, subType: any): boolean; | ||
checkRedisEnabled(): any; | ||
attachConfigChangeHandler(): void; | ||
attachRedisUnsubscribeHandler(): void; | ||
executeUnsubscribeHandlers(tenantId: string): void; | ||
handleUnsubscribe(tenantId: string): void; | ||
attachUnsubscribeHandler(cb: Function): void; | ||
publishConfigChange(key: string, value: any): void; | ||
blockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void; | ||
executeUnsubscribeHandlers(tenantId: any): void; | ||
handleUnsubscribe(tenantId: any): void; | ||
attachUnsubscribeHandler(cb: any): void; | ||
publishConfigChange(key: any, value: any): void; | ||
blockEvent(type: any, subType: any, isPeriodic: any, tenant?: string): void; | ||
clearPeriodicEventBlockList(): void; | ||
unblockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void; | ||
addCAPOutboxEvent(serviceName: string, config: any): void; | ||
isEventBlocked(type: string, subType: string, isPeriodicEvent: boolean, tenant: string): boolean; | ||
unblockEvent(type: any, subType: any, isPeriodic: any, tenant?: string): void; | ||
addCAPOutboxEvent(serviceName: any, config: any): void; | ||
isEventBlocked(type: any, subType: any, isPeriodicEvent: any, tenant: any): any; | ||
set isEventQueueActive(value: boolean); | ||
get isEventQueueActive(): boolean; | ||
set isEventQueueActive(value: boolean); | ||
set fileContent(config: any); | ||
get fileContent(): any; | ||
get events(): any[]; | ||
get periodicEvents(): any[]; | ||
isPeriodicEvent(type: string, subType: string): boolean; | ||
get allEvents(): any[]; | ||
generateKey(type: any, subType: any): string; | ||
removeEvent(type: any, subType: any): void; | ||
isTenantUnsubscribed(tenantId: any): any; | ||
get events(): any; | ||
get periodicEvents(): any; | ||
isPeriodicEvent(type: any, subType: any): any; | ||
get allEvents(): any; | ||
set forUpdateTimeout(value: number); | ||
get forUpdateTimeout(): number; | ||
set globalTxTimeout(value: number); | ||
get globalTxTimeout(): number; | ||
set forUpdateTimeout(value: number); | ||
set globalTxTimeout(value: number); | ||
get runInterval(): number | null; | ||
set runInterval(value: number); | ||
get redisEnabled(): boolean | null; | ||
set redisEnabled(value: boolean | null); | ||
set publishEventBlockList(value: any); | ||
get publishEventBlockList(): any; | ||
set crashOnRedisUnavailable(value: any); | ||
get crashOnRedisUnavailable(): any; | ||
set tenantIdFilterTokenInfo(value: any); | ||
get tenantIdFilterTokenInfo(): any; | ||
set tenantIdFilterEventProcessing(value: any); | ||
get tenantIdFilterEventProcessing(): any; | ||
set runInterval(value: any); | ||
get runInterval(): any; | ||
set redisEnabled(value: any); | ||
get redisEnabled(): any; | ||
set initialized(value: boolean); | ||
get initialized(): boolean; | ||
set initialized(value: boolean); | ||
set cronTimezone(value: any); | ||
get cronTimezone(): any; | ||
set instanceLoadLimit(value: number); | ||
get instanceLoadLimit(): number; | ||
set instanceLoadLimit(value: number); | ||
set isEventBlockedCb(value: any); | ||
get isEventBlockedCb(): any; | ||
set isEventBlockedCb(value: any); | ||
get tableNameEventQueue(): string; | ||
get tableNameEventLock(): string; | ||
set configFilePath(value: string | null); | ||
get configFilePath(): string | null; | ||
set configFilePath(value: any); | ||
get configFilePath(): any; | ||
set processEventsAfterPublish(value: any); | ||
get processEventsAfterPublish(): any; | ||
set skipCsnCheck(value: any); | ||
get skipCsnCheck(): any; | ||
set disableRedis(value: any); | ||
@@ -253,2 +262,4 @@ get disableRedis(): any; | ||
get insertEventsBeforeCommit(): any; | ||
set enableCAPTelemetry(value: any); | ||
get enableCAPTelemetry(): any; | ||
get isMultiTenancy(): boolean; | ||
@@ -255,0 +266,0 @@ } |
@@ -45,3 +45,2 @@ "use strict"; | ||
["crashOnRedisUnavailable", false], | ||
["tenantIdFilterCb", null], | ||
]; | ||
@@ -70,3 +69,2 @@ | ||
* @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful | ||
* @param {function} [options.tenantIdFilterCb=null] - Allows to set customer filter function to filter the tenants ids which should be processed in the event-queue | ||
*/ | ||
@@ -193,3 +191,3 @@ const initialize = async (options = {}) => { | ||
const isTestProfile = cds.env.profiles.find((profile) => profile.includes("test")); | ||
if (isTestProfile) { | ||
if (isTestProfile || !config.redisEnabled) { | ||
return; | ||
@@ -196,0 +194,0 @@ } |
@@ -15,3 +15,9 @@ "use strict"; | ||
function outboxed(srv, customOpts) { | ||
// outbox max. once | ||
if (!(new.target || customOpts)) { | ||
const former = srv[OUTBOXED]; | ||
if (former) { | ||
return former; | ||
} | ||
} | ||
const logger = cds.log(COMPONENT_NAME); | ||
@@ -25,10 +31,4 @@ const outboxOpts = Object.assign( | ||
if (!new.target) { | ||
const former = srv[OUTBOXED]; | ||
if (former) { | ||
if (outboxOpts.kind === "persistent-outbox") { | ||
config.addCAPOutboxEvent(srv.name, outboxOpts); | ||
} | ||
return former; | ||
} | ||
if (outboxOpts.kind === "persistent-outbox") { | ||
config.addCAPOutboxEvent(srv.name, outboxOpts); | ||
} | ||
@@ -41,8 +41,6 @@ | ||
if (!new.target) { | ||
Object.defineProperty(srv, OUTBOXED, { value: outboxedSrv }); | ||
if (!srv[OUTBOXED]) { | ||
Object.defineProperty(srv, OUTBOXED, { value: outboxedSrv }); | ||
} | ||
} | ||
if (outboxOpts.kind === "persistent-outbox") { | ||
config.addCAPOutboxEvent(srv.name, outboxOpts); | ||
} | ||
outboxedSrv.handle = async function (req) { | ||
@@ -49,0 +47,0 @@ const context = req.context || cds.context; |
@@ -204,52 +204,63 @@ "use strict"; | ||
const processEventMap = async (eventTypeInstance) => { | ||
eventTypeInstance.startPerformanceTracerEvents(); | ||
await eventTypeInstance.beforeProcessingEvents(); | ||
eventTypeInstance.logStartMessage(); | ||
if (eventTypeInstance.commitOnEventLevel) { | ||
eventTypeInstance.txUsageAllowed = false; | ||
const processEventMap = async (instance) => { | ||
instance.startPerformanceTracerEvents(); | ||
await instance.beforeProcessingEvents(); | ||
instance.logStartMessage(); | ||
if (instance.commitOnEventLevel) { | ||
instance.txUsageAllowed = false; | ||
} | ||
await limiter( | ||
eventTypeInstance.parallelEventProcessing, | ||
Object.entries(eventTypeInstance.eventProcessingMap), | ||
instance.parallelEventProcessing, | ||
Object.entries(instance.eventProcessingMap), | ||
async ([key, { queueEntries, payload }]) => { | ||
if (eventTypeInstance.commitOnEventLevel) { | ||
if (instance.commitOnEventLevel) { | ||
let statusMap; | ||
await executeInNewTransaction( | ||
eventTypeInstance.baseContext, | ||
`eventQueue-processEvent-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
instance.baseContext, | ||
`eventQueue-processEvent-${instance.eventType}##${instance.eventSubType}`, | ||
async (tx) => { | ||
statusMap = await _processEvent(eventTypeInstance, tx.context, key, queueEntries, payload); | ||
if ( | ||
eventTypeInstance.statusMapContainsError(statusMap) || | ||
eventTypeInstance.shouldRollbackTransaction(key) | ||
) { | ||
statusMap = await _processEvent(instance, tx.context, key, queueEntries, payload); | ||
const shouldRollback = | ||
instance.statusMapContainsError(statusMap) || instance.shouldRollbackTransaction(key); | ||
if (shouldRollback) { | ||
await tx.rollback(); | ||
await _commitStatusInNewTx(instance, statusMap); | ||
} else { | ||
await instance.persistEventStatus(tx, { | ||
skipChecks: true, | ||
statusMap, | ||
}); | ||
} | ||
} | ||
); | ||
await executeInNewTransaction( | ||
eventTypeInstance.baseContext, | ||
`eventQueue-persistStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
async (tx) => { | ||
eventTypeInstance.processEventContext = tx.context; | ||
await eventTypeInstance.persistEventStatus(tx, { | ||
skipChecks: true, | ||
statusMap, | ||
}); | ||
} | ||
); | ||
} else { | ||
await _processEvent(eventTypeInstance, eventTypeInstance.context, key, queueEntries, payload); | ||
await _processEvent(instance, instance.context, key, queueEntries, payload); | ||
} | ||
} | ||
).finally(() => { | ||
eventTypeInstance.clearEventProcessingContext(); | ||
if (eventTypeInstance.commitOnEventLevel) { | ||
eventTypeInstance.txUsageAllowed = true; | ||
} | ||
}); | ||
eventTypeInstance.endPerformanceTracerEvents(); | ||
) | ||
.catch((err) => { | ||
instance.handleErrorTx(err); | ||
}) | ||
.finally(() => { | ||
instance.clearEventProcessingContext(); | ||
if (instance.commitOnEventLevel) { | ||
instance.txUsageAllowed = true; | ||
} | ||
}); | ||
instance.endPerformanceTracerEvents(); | ||
}; | ||
const _commitStatusInNewTx = async (eventTypeInstance, statusMap) => | ||
await executeInNewTransaction( | ||
eventTypeInstance.baseContext, | ||
`eventQueue-persistStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`, | ||
async (tx) => { | ||
eventTypeInstance.processEventContext = tx.context; | ||
await eventTypeInstance.persistEventStatus(tx, { | ||
skipChecks: true, | ||
statusMap, | ||
}); | ||
} | ||
); | ||
const _checkEventIsBlocked = async (baseInstance) => { | ||
@@ -256,0 +267,0 @@ const isEventBlockedCb = config.isEventBlockedCb; |
@@ -13,2 +13,3 @@ "use strict"; | ||
const trace = require("../shared/openTelemetry"); | ||
const { TenantIdCheckTypes } = require("../constants"); | ||
@@ -63,2 +64,6 @@ const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL"; | ||
if (!config.redisEnabled) { | ||
const tenantShouldBeProcessed = await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId); | ||
if (!tenantShouldBeProcessed) { | ||
return; | ||
} | ||
await _processLocalWithoutRedis(tenantId, events); | ||
@@ -71,2 +76,5 @@ return; | ||
const eventConfig = config.getEventConfig(type, subType); | ||
if (!eventConfig) { | ||
continue; | ||
} | ||
for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) { | ||
@@ -73,0 +81,0 @@ const result = eventConfig.multiInstanceProcessing |
@@ -9,2 +9,3 @@ "use strict"; | ||
const common = require("../shared/common"); | ||
const { TenantIdCheckTypes } = require("../constants"); | ||
@@ -26,2 +27,6 @@ const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL"; | ||
const { lockId, tenantId, type, subType } = JSON.parse(messageData); | ||
const tenantShouldBeProcessed = await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId); | ||
if (!tenantShouldBeProcessed) { | ||
return; | ||
} | ||
logger.debug("received redis event", { | ||
@@ -28,0 +33,0 @@ tenantId, |
@@ -114,13 +114,15 @@ "use strict"; | ||
const dummyContext = new cds.EventContext({}); | ||
const couldAcquireLock = await trace( | ||
dummyContext, | ||
"acquire-lock-master-runner", | ||
async () => { | ||
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
tenantScoped: false, | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
const couldAcquireLock = config.tenantIdFilterEventProcessing | ||
? true | ||
: await trace( | ||
dummyContext, | ||
"acquire-lock-master-runner", | ||
async () => { | ||
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
tenantScoped: false, | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
if (!couldAcquireLock) { | ||
@@ -241,3 +243,3 @@ return; | ||
await trace(tenantContext, "update-periodic-events-for-tenant", async () => { | ||
if (!config.redisEnabled) { | ||
if (!config.redisEnabled && !config.tenantIdFilterEventProcessing) { | ||
const couldAcquireLock = await distributedLock.acquireLock(context, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, { | ||
@@ -438,3 +440,3 @@ expiryTime: eventQueueConfig.runInterval * 0.95, | ||
async () => { | ||
if (config.redisEnabled) { | ||
if (config.redisEnabled && !config.tenantIdFilterEventProcessing) { | ||
const couldAcquireLock = await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, { | ||
@@ -441,0 +443,0 @@ expiryTime: 60 * 1000, // short living lock --> assume we do not have 2 onboards within 1 minute |
@@ -26,2 +26,3 @@ "use strict"; | ||
const logger = cds.log(COMPONENT_NAME); | ||
let transactionRollbackPromise = Promise.resolve(false); | ||
try { | ||
@@ -40,3 +41,11 @@ const user = new cds.User.Privileged({ id: config.userId, tokenInfo: await common.getTokenInfo(context.tenant) }); | ||
tx.context._ = context._ ?? {}; | ||
return await fn(tx, ...parameters); | ||
return new Promise((outerResolve, outerReject) => { | ||
transactionRollbackPromise = new Promise((resolve) => { | ||
tx.context.on("succeeded", () => resolve(false)); | ||
tx.context.on("failed", () => resolve(true)); | ||
fn(tx, ...parameters) | ||
.then(outerResolve) | ||
.catch(outerReject); | ||
}); | ||
}); | ||
} | ||
@@ -56,3 +65,6 @@ ); | ||
}, | ||
async (tx) => fn(tx, ...parameters) | ||
async (tx) => { | ||
await fn(tx, ...parameters); | ||
transactionRollbackPromise = false; | ||
} | ||
); | ||
@@ -74,6 +86,9 @@ } else { | ||
}; | ||
await fn(contextTx, ...parameters).finally(() => (contextTx.rollback = txRollback)); | ||
await fn(contextTx, ...parameters) | ||
.then(() => (transactionRollbackPromise = false)) | ||
.finally(() => (contextTx.rollback = txRollback)); | ||
} | ||
} | ||
} catch (err) { | ||
const transactionRollback = await transactionRollbackPromise; | ||
if (err instanceof VError) { | ||
@@ -83,5 +98,9 @@ Object.assign(err.jse_info, { | ||
}); | ||
throw err; | ||
if (transactionRollback) { | ||
throw err; | ||
} else { | ||
logger.error("business transaction commited but succeeded|done|failed threw a error!", err); | ||
} | ||
} else { | ||
throw new VError( | ||
const nestedError = new VError( | ||
{ | ||
@@ -94,2 +113,7 @@ name: VERROR_CLUSTER_NAME, | ||
); | ||
if (transactionRollback) { | ||
throw err; | ||
} else { | ||
logger.error("business transaction commited but succeeded|done|failed threw a error!", nestedError); | ||
} | ||
} | ||
@@ -118,3 +142,9 @@ } finally { | ||
.map((tenant) => tenant.subscribedTenantId ?? tenant.tenant) | ||
.filter((tenantId) => common.isTenantIdValidCb(TenantIdCheckTypes.getAllTenantIds, tenantId)); | ||
.reduce(async (result, tenantId) => { | ||
result = await result; | ||
if (await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId)) { | ||
result.push(tenantId); | ||
} | ||
return result; | ||
}, []); | ||
}; | ||
@@ -121,0 +151,0 @@ |
@@ -7,2 +7,4 @@ "use strict"; | ||
const xssec = require("@sap/xssec"); | ||
const VError = require("verror"); | ||
const config = require("../config"); | ||
@@ -48,5 +50,20 @@ const { TenantIdCheckTypes } = require("../constants"); | ||
} | ||
return Promise.allSettled(returnPromises); | ||
return promiseAllDone(returnPromises); | ||
}; | ||
const promiseAllDone = async (iterable) => { | ||
const results = await Promise.allSettled(iterable); | ||
const rejects = results.filter((entry) => { | ||
return entry.status === "rejected"; | ||
}); | ||
if (rejects.length === 1) { | ||
return Promise.reject(rejects[0].reason); | ||
} else if (rejects.length > 1) { | ||
return Promise.reject(new VError.MultiError(rejects.map((reject) => reject.reason))); | ||
} | ||
return results.map((entry) => { | ||
return entry.value; | ||
}); | ||
}; | ||
const isValidDate = (value) => { | ||
@@ -120,6 +137,19 @@ if (typeof value === "string") { | ||
const isTenantIdValidCb = (checkType, tenantId) => { | ||
if (config.tenantIdFilterCb) { | ||
return config.tenantIdFilterCb(checkType, tenantId); | ||
} else { | ||
const isTenantIdValidCb = async (checkType, tenantId) => { | ||
let cb; | ||
switch (checkType) { | ||
case TenantIdCheckTypes.getTokenInfo: | ||
cb = config.tenantIdFilterTokenInfo; | ||
break; | ||
case TenantIdCheckTypes.eventProcessing: | ||
cb = config.tenantIdFilterEventProcessing; | ||
break; | ||
default: | ||
cb = async () => true; | ||
} | ||
try { | ||
return cb ? await cb(tenantId) : true; | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("failed in custom tenant id filter callback. Returning true.", err); | ||
return true; | ||
@@ -137,2 +167,3 @@ } | ||
isTenantIdValidCb, | ||
promiseAllDone, | ||
__: { | ||
@@ -139,0 +170,0 @@ clearTokenInfoCache: () => (getTokenInfo._tokenInfoCache = {}), |
210290
5254