@cap-js-community/event-queue
Advanced tools
Comparing version 1.4.5 to 1.4.6
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.4.5", | ||
"version": "1.4.6", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -47,12 +47,12 @@ "main": "src/index.js", | ||
"@sap/xssec": "^3.6.1", | ||
"redis": "^4.6.13", | ||
"redis": "^4.6.14", | ||
"verror": "^1.10.1", | ||
"yaml": "^2.4.1" | ||
"yaml": "^2.4.2" | ||
}, | ||
"devDependencies": { | ||
"@cap-js/hana": "^0.1.0", | ||
"@cap-js/sqlite": "^1.5.0", | ||
"@sap/cds": "^7.8.0", | ||
"@cap-js/hana": "^0.4.0", | ||
"@cap-js/sqlite": "^1.7.1", | ||
"@sap/cds": "^7.9.2", | ||
"@sap/cds-dk": "^7.8.0", | ||
"eslint": "^8.56.0", | ||
"eslint": "^8.57.0", | ||
"eslint-config-prettier": "^9.1.0", | ||
@@ -59,0 +59,0 @@ "eslint-plugin-jest": "^27.9.0", |
@@ -13,3 +13,4 @@ "use strict"; | ||
const REDIS_CONFIG_CHANNEL = "EVENT_QUEUE_CONFIG_CHANNEL"; | ||
const REDIS_CONFIG_BLOCKLIST_CHANNEL = "REDIS_CONFIG_BLOCKLIST_CHANNEL"; | ||
const REDIS_OFFBOARD_TENANT_CHANNEL = "REDIS_OFFBOARD_TENANT_CHANNEL"; | ||
const REDIS_CONFIG_BLOCKLIST_CHANNEL = "EVENT_QUEUE_REDIS_CONFIG_BLOCKLIST_CHANNEL"; | ||
const COMPONENT_NAME = "/eventQueue/config"; | ||
@@ -23,4 +24,4 @@ const MIN_INTERVAL_SEC = 10; | ||
const CAP_EVENT_TYPE = "CAP_OUTBOX"; | ||
const CAP_PARALLEL_DEFAULT = 5; | ||
const DELETE_TENANT_BLOCK_AFTER_MS = 5 * 60 * 1000; | ||
@@ -72,2 +73,4 @@ const BASE_PERIODIC_EVENTS = [ | ||
#insertEventsBeforeCommit; | ||
#unsubscribeHandlers = []; | ||
#unsubscribedTenants = {}; | ||
static #instance; | ||
@@ -132,2 +135,47 @@ constructor() { | ||
attachRedisUnsubscribeHandler() { | ||
this.#logger.info("attached redis handle for unsubscribe events"); | ||
redis.subscribeRedisChannel(this.#redisOptions, REDIS_OFFBOARD_TENANT_CHANNEL, (messageData) => { | ||
try { | ||
const { tenantId } = JSON.parse(messageData); | ||
this.#logger.info("received unsubscribe broadcast event", { tenantId }); | ||
this.executeUnsubscribeHandlers(tenantId); | ||
} catch (err) { | ||
this.#logger.error("could not parse unsubscribe broadcast event", err, { | ||
messageData, | ||
}); | ||
} | ||
}); | ||
} | ||
executeUnsubscribeHandlers(tenantId) { | ||
this.#unsubscribedTenants[tenantId] = true; | ||
setTimeout(() => delete this.#unsubscribedTenants[tenantId], DELETE_TENANT_BLOCK_AFTER_MS); | ||
for (const unsubscribeHandler of this.#unsubscribeHandlers) { | ||
try { | ||
unsubscribeHandler(tenantId); | ||
} catch (err) { | ||
this.#logger.error("could executing unsubscribe handler", err, { | ||
tenantId, | ||
}); | ||
} | ||
} | ||
} | ||
handleUnsubscribe(tenantId) { | ||
if (this.redisEnabled) { | ||
redis | ||
.publishMessage(this.#redisOptions, REDIS_OFFBOARD_TENANT_CHANNEL, JSON.stringify({ tenantId })) | ||
.catch((error) => { | ||
this.#logger.error(`publishing tenant unsubscribe failed. tenantId: ${tenantId}`, error); | ||
}); | ||
} else { | ||
this.executeUnsubscribeHandlers(tenantId); | ||
} | ||
} | ||
attachUnsubscribeHandler(cb) { | ||
this.#unsubscribeHandlers.push(cb); | ||
} | ||
publishConfigChange(key, value) { | ||
@@ -327,2 +375,6 @@ if (!this.redisEnabled) { | ||
isTenantUnsubscribed(tenantId) { | ||
return this.#unsubscribedTenants[tenantId]; | ||
} | ||
get fileContent() { | ||
@@ -329,0 +381,0 @@ return this.#config; |
@@ -139,1 +139,73 @@ import * as cds from "@sap/cds"; | ||
): Promise<any>; | ||
declare class Config { | ||
constructor(); | ||
getEventConfig(type: string, subType: string): any; | ||
isCapOutboxEvent(type: string): boolean; | ||
hasEventAfterCommitFlag(type: string, subType: string): boolean; | ||
_checkRedisIsBound(): boolean; | ||
checkRedisEnabled(): boolean; | ||
attachConfigChangeHandler(): void; | ||
attachRedisUnsubscribeHandler(): void; | ||
executeUnsubscribeHandlers(tenantId: string): void; | ||
handleUnsubscribe(tenantId: string): void; | ||
attachUnsubscribeHandler(cb: Function): void; | ||
publishConfigChange(key: string, value: any): void; | ||
blockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void; | ||
clearPeriodicEventBlockList(): void; | ||
unblockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void; | ||
addCAPOutboxEvent(serviceName: string, config: any): void; | ||
isEventBlocked(type: string, subType: string, isPeriodicEvent: boolean, tenant: string): boolean; | ||
get isEventQueueActive(): boolean; | ||
set isEventQueueActive(value: boolean); | ||
set fileContent(config: any); | ||
get fileContent(): any; | ||
get events(): any[]; | ||
get periodicEvents(): any[]; | ||
isPeriodicEvent(type: string, subType: string): boolean; | ||
get allEvents(): any[]; | ||
get forUpdateTimeout(): number; | ||
get globalTxTimeout(): number; | ||
set forUpdateTimeout(value: number); | ||
set globalTxTimeout(value: number); | ||
get runInterval(): number | null; | ||
set runInterval(value: number); | ||
get redisEnabled(): boolean | null; | ||
set redisEnabled(value: boolean | null); | ||
get initialized(): boolean; | ||
set initialized(value: boolean); | ||
get instanceLoadLimit(): number; | ||
set instanceLoadLimit(value: number); | ||
get isEventBlockedCb(): any; | ||
set isEventBlockedCb(value: any); | ||
get tableNameEventQueue(): string; | ||
get tableNameEventLock(): string; | ||
set configFilePath(value: string | null); | ||
get configFilePath(): string | null; | ||
set processEventsAfterPublish(value: any); | ||
get processEventsAfterPublish(): any; | ||
set skipCsnCheck(value: any); | ||
get skipCsnCheck(): any; | ||
set disableRedis(value: any); | ||
get disableRedis(): any; | ||
set updatePeriodicEvents(value: any); | ||
get updatePeriodicEvents(): any; | ||
set registerAsEventProcessor(value: any); | ||
get registerAsEventProcessor(): any; | ||
set thresholdLoggingEventProcessing(value: any); | ||
get thresholdLoggingEventProcessing(): any; | ||
set useAsCAPOutbox(value: any); | ||
get useAsCAPOutbox(): any; | ||
set userId(value: any); | ||
get userId(): any; | ||
set cleanupLocksAndEventsForDev(value: any); | ||
get cleanupLocksAndEventsForDev(): any; | ||
set redisOptions(value: any); | ||
get redisOptions(): any; | ||
set insertEventsBeforeCommit(value: any); | ||
get insertEventsBeforeCommit(): any; | ||
get isMultiTenancy(): boolean; | ||
} | ||
export const config: Config; |
@@ -131,2 +131,20 @@ "use strict"; | ||
const registerEventProcessors = () => { | ||
cds.on("listening", () => { | ||
cds.connect | ||
.to("cds.xt.DeploymentService") | ||
.then((ds) => { | ||
cds.log(COMPONENT).info("event-queue unsubscribe handler registered", { | ||
redisEnabled: config.redisEnabled, | ||
}); | ||
ds.after("unsubscribe", async (_, req) => { | ||
const { tenant } = req.data; | ||
config.handleUnsubscribe(tenant); | ||
}); | ||
}) | ||
.catch( | ||
() => {} // ignore errors as the DeploymentService is most of the time only available in the mtx sidecar | ||
); | ||
}); | ||
config.redisEnabled && config.attachRedisUnsubscribeHandler(); | ||
if (!config.registerAsEventProcessor) { | ||
@@ -133,0 +151,0 @@ return; |
@@ -271,2 +271,6 @@ "use strict"; | ||
if (!eventBlocked) { | ||
eventBlocked = config.isTenantUnsubscribed(baseInstance.context.tenant); | ||
} | ||
if (eventBlocked) { | ||
@@ -276,2 +280,3 @@ baseInstance.logger.info("skipping run because event is blocked by configuration", { | ||
subType: baseInstance.eventSubType, | ||
tenantUnsubscribed: config.isTenantUnsubscribed(baseInstance.context.tenant), | ||
}); | ||
@@ -278,0 +283,0 @@ } |
@@ -13,3 +13,6 @@ "use strict"; | ||
#scheduledEvents = {}; | ||
constructor() {} | ||
#eventsByTenants = {}; | ||
constructor() { | ||
config.attachUnsubscribeHandler(this.clearForTenant.bind(this)); | ||
} | ||
@@ -28,3 +31,5 @@ scheduleEvent(tenantId, type, subType, startAfter) { | ||
}); | ||
setTimeout(() => { | ||
this.#eventsByTenants[tenantId] ??= {}; | ||
const timeoutId = setTimeout(() => { | ||
delete this.#eventsByTenants[tenantId][timeoutId]; | ||
delete this.#scheduledEvents[key]; | ||
@@ -40,4 +45,9 @@ redisPub.broadcastEvent(tenantId, { type, subType }).catch((err) => { | ||
}, relative).unref(); | ||
this.#eventsByTenants[tenantId][timeoutId] = true; | ||
} | ||
clearForTenant(tenantId) { | ||
Object.values(this.#eventsByTenants[tenantId]).forEach((timeoutId) => clearTimeout(timeoutId)); | ||
} | ||
calculateOffset(type, subType, startAfter) { | ||
@@ -44,0 +54,0 @@ const eventConfig = config.getEventConfig(type, subType); |
174459
4334
Updatedredis@^4.6.14
Updatedyaml@^2.4.2