@cap-js-community/event-queue
Advanced tools
Comparing version 1.0.3 to 1.1.0
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.0.3", | ||
"version": "1.1.0", | ||
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -50,7 +50,7 @@ "main": "src/index.js", | ||
"devDependencies": { | ||
"@sap/cds": "7.5.1", | ||
"@sap/cds-dk": "7.5.0", | ||
"@sap/cds": "7.5.2", | ||
"@sap/cds-dk": "7.5.1", | ||
"eslint": "8.56.0", | ||
"eslint-config-prettier": "9.1.0", | ||
"eslint-plugin-jest": "27.6.1", | ||
"eslint-plugin-jest": "27.6.2", | ||
"eslint-plugin-node": "11.1.0", | ||
@@ -61,3 +61,3 @@ "express": "4.18.2", | ||
"prettier": "2.8.8", | ||
"sqlite3": "5.1.7-rc.0" | ||
"sqlite3": "5.1.7" | ||
}, | ||
@@ -64,0 +64,0 @@ "homepage": "https://cap-js-community.github.io/event-queue/", |
@@ -13,3 +13,3 @@ "use strict"; | ||
const REDIS_CONFIG_BLOCKLIST_CHANNEL = "REDIS_CONFIG_BLOCKLIST_CHANNEL"; | ||
const COMPONENT_NAME = "eventQueue/config"; | ||
const COMPONENT_NAME = "/eventQueue/config"; | ||
const MIN_INTERVAL_SEC = 10; | ||
@@ -20,3 +20,6 @@ const DEFAULT_LOAD = 1; | ||
const COMMAND_UNBLOCK = "EVENT_QUEUE_EVENT_UNBLOCK"; | ||
const CAP_EVENT_TYPE = "CAP_OUTBOX"; | ||
const CAP_PARALLEL_DEFAULT = 5; | ||
const BASE_PERIODIC_EVENTS = [ | ||
@@ -56,2 +59,3 @@ { | ||
#thresholdLoggingEventProcessing; | ||
#useAsCAPOutbox; | ||
static #instance; | ||
@@ -82,2 +86,6 @@ constructor() { | ||
isCapOutboxEvent(type) { | ||
return type === CAP_EVENT_TYPE; | ||
} | ||
hasEventAfterCommitFlag(type, subType) { | ||
@@ -187,2 +195,26 @@ return this.#eventMap[this.generateKey(type, subType)]?.processAfterCommit ?? true; | ||
addCAPOutboxEvent(serviceName, config) { | ||
if (this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)]) { | ||
return; | ||
} | ||
const eventConfig = { | ||
type: CAP_EVENT_TYPE, | ||
subType: serviceName, | ||
load: config.load ?? DEFAULT_LOAD, | ||
impl: "./outbox/EventQueueGenericOutboxHandler", | ||
selectMaxChunkSize: config.chunkSize, | ||
parallelEventProcessing: config.parallelEventProcessing ?? (config.parallel && CAP_PARALLEL_DEFAULT), | ||
retryAttempts: config.maxAttempts, | ||
transactionMode: config.transactionMode, | ||
processAfterCommit: config.processAfterCommit, | ||
eventOutdatedCheck: config.eventOutdatedCheck, | ||
checkForNextChunk: config.checkForNextChunk, | ||
deleteFinishedEventsAfterDays: config.deleteFinishedEventsAfterDays, | ||
internalEvent: true, | ||
}; | ||
this.#config.events.push(eventConfig); | ||
this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)] = eventConfig; | ||
} | ||
#unblockPeriodicEventLocalState(key, tenant) { | ||
@@ -222,3 +254,3 @@ const map = this.#blockedPeriodicEvents[key]; | ||
this.validateAdHocEvents(result, event); | ||
result[[event.type, event.subType].join("##")] = event; | ||
result[this.generateKey(event.type, event.subType)] = event; | ||
return result; | ||
@@ -231,3 +263,3 @@ }, {}); | ||
this.validatePeriodicConfig(result, event); | ||
result[[event.type, event.subType].join("##")] = event; | ||
result[this.generateKey(event.type, event.subType)] = event; | ||
return result; | ||
@@ -267,2 +299,10 @@ }, this.#eventMap); | ||
removeEvent(type, subType) { | ||
const index = this.#config.events.findIndex((event) => event.type === "CAP_OUTBOX"); | ||
if (index >= 0) { | ||
this.#config.events.splice(index, 1); | ||
} | ||
delete this.#eventMap[this.generateKey(type, subType)]; | ||
} | ||
get fileContent() { | ||
@@ -416,2 +456,10 @@ return this.#config; | ||
set useAsCAPOutbox(value) { | ||
this.#useAsCAPOutbox = value; | ||
} | ||
get useAsCAPOutbox() { | ||
return this.#useAsCAPOutbox; | ||
} | ||
get isMultiTenancy() { | ||
@@ -418,0 +466,0 @@ return !!cds.requires.multitenancy; |
@@ -8,3 +8,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/dbHandler"; | ||
const COMPONENT_NAME = "/eventQueue/dbHandler"; | ||
@@ -11,0 +11,0 @@ const registerEventQueueDbHandler = (dbService) => { |
@@ -15,3 +15,3 @@ "use strict"; | ||
const IMPLEMENT_ERROR_MESSAGE = "needs to be reimplemented"; | ||
const COMPONENT_NAME = "eventQueue/EventQueueProcessorBase"; | ||
const COMPONENT_NAME = "/eventQueue/EventQueueProcessorBase"; | ||
@@ -873,3 +873,3 @@ const DEFAULT_RETRY_ATTEMPTS = 3; | ||
async handleDistributedLock() { | ||
async acquireDistributedLock() { | ||
if (this.concurrentEventProcessing) { | ||
@@ -876,0 +876,0 @@ return true; |
@@ -17,2 +17,3 @@ "use strict"; | ||
const { closeMainClient } = require("./shared/redis"); | ||
const eventQueueAsOutbox = require("./outbox/eventQueueAsOutbox"); | ||
@@ -39,2 +40,3 @@ const readFileAsync = promisify(fs.readFile); | ||
["thresholdLoggingEventProcessing", 50], | ||
["useAsCAPOutbox", false], | ||
]; | ||
@@ -54,2 +56,3 @@ | ||
thresholdLoggingEventProcessing, | ||
useAsCAPOutbox, | ||
} = {}) => { | ||
@@ -76,3 +79,4 @@ // TODO: initialize check: | ||
updatePeriodicEvents, | ||
thresholdLoggingEventProcessing | ||
thresholdLoggingEventProcessing, | ||
useAsCAPOutbox | ||
); | ||
@@ -91,2 +95,3 @@ | ||
monkeyPatchCAPOutbox(); | ||
registerEventProcessors(); | ||
@@ -141,2 +146,13 @@ registerCdsShutdown(); | ||
const monkeyPatchCAPOutbox = () => { | ||
if (config.useAsCAPOutbox) { | ||
Object.defineProperty(cds, "outboxed", { | ||
get: () => eventQueueAsOutbox.outboxed, | ||
}); | ||
Object.defineProperty(cds, "unboxed", { | ||
get: () => eventQueueAsOutbox.unboxed, | ||
}); | ||
} | ||
}; | ||
const csnCheck = async () => { | ||
@@ -143,0 +159,0 @@ const eventCsn = cds.model.definitions[config.tableNameEventQueue]; |
@@ -9,3 +9,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/periodicEvents"; | ||
const COMPONENT_NAME = "/eventQueue/periodicEvents"; | ||
const CHUNK_SIZE_INSERT_PERIODIC_EVENTS = 4; | ||
@@ -12,0 +12,0 @@ |
@@ -13,3 +13,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/processEventQueue"; | ||
const COMPONENT_NAME = "/eventQueue/processEventQueue"; | ||
const MAX_EXECUTION_TIME = 5 * 60 * 1000; | ||
@@ -33,3 +33,3 @@ | ||
baseInstance = new EventTypeClass(context, eventType, eventSubType, eventConfig); | ||
const continueProcessing = await baseInstance.handleDistributedLock(); | ||
const continueProcessing = await baseInstance.acquireDistributedLock(); | ||
if (!continueProcessing) { | ||
@@ -36,0 +36,0 @@ return; |
@@ -8,7 +8,7 @@ "use strict"; | ||
const config = require("./config"); | ||
const { runEventCombinationForTenant } = require("./runner"); | ||
const runner = require("./runner"); | ||
const { getSubdomainForTenantId } = require("./shared/cdsHelper"); | ||
const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL"; | ||
const COMPONENT_NAME = "eventQueue/redisPubSub"; | ||
const COMPONENT_NAME = "/eventQueue/redisPubSub"; | ||
@@ -21,6 +21,6 @@ let subscriberClientPromise; | ||
} | ||
redis.subscribeRedisChannel(EVENT_MESSAGE_CHANNEL, messageHandlerProcessEvents); | ||
redis.subscribeRedisChannel(EVENT_MESSAGE_CHANNEL, _messageHandlerProcessEvents); | ||
}; | ||
const messageHandlerProcessEvents = async (messageData) => { | ||
const _messageHandlerProcessEvents = async (messageData) => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
@@ -48,4 +48,26 @@ try { | ||
}; | ||
if (!config.getEventConfig(type, subType)) { | ||
if (config.isCapOutboxEvent(type)) { | ||
try { | ||
const service = await cds.connect.to(subType); | ||
cds.outboxed(service); | ||
} catch (err) { | ||
logger.error("could not connect to outboxed service", err, { | ||
type, | ||
subType, | ||
}); | ||
return; | ||
} | ||
} else { | ||
logger.error("cannot find configuration for published event. Event won't be processed", { | ||
type, | ||
subType, | ||
}); | ||
return; | ||
} | ||
} | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
return await runEventCombinationForTenant(context, type, subType); | ||
return await runner.runEventCombinationForTenant(context, type, subType); | ||
}); | ||
@@ -82,3 +104,3 @@ } catch (err) { | ||
return await cds.tx(context, async ({ context }) => { | ||
return await runEventCombinationForTenant(context, type, subType); | ||
return await runner.runEventCombinationForTenant(context, type, subType); | ||
}); | ||
@@ -129,2 +151,5 @@ } | ||
closeSubscribeClient, | ||
__: { | ||
_messageHandlerProcessEvents, | ||
}, | ||
}; |
@@ -15,3 +15,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/runner"; | ||
const COMPONENT_NAME = "/eventQueue/runner"; | ||
const EVENT_QUEUE_RUN_ID = "EVENT_QUEUE_RUN_ID"; | ||
@@ -325,3 +325,3 @@ const EVENT_QUEUE_RUN_TS = "EVENT_QUEUE_RUN_TS"; | ||
runEventCombinationForTenant, | ||
_: { | ||
__: { | ||
_singleTenantDb, | ||
@@ -328,0 +328,0 @@ _multiTenancyRedis, |
@@ -11,3 +11,3 @@ "use strict"; | ||
const VERROR_CLUSTER_NAME = "ExecuteInNewTransactionError"; | ||
const COMPONENT_NAME = "eventQueue/cdsHelper"; | ||
const COMPONENT_NAME = "/eventQueue/cdsHelper"; | ||
@@ -14,0 +14,0 @@ /** |
@@ -8,3 +8,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/shared/eventScheduler"; | ||
const COMPONENT_NAME = "/eventQueue/shared/eventScheduler"; | ||
@@ -11,0 +11,0 @@ let instance; |
@@ -8,3 +8,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/shared/redis"; | ||
const COMPONENT_NAME = "/eventQueue/shared/redis"; | ||
@@ -11,0 +11,0 @@ let mainClientPromise; |
@@ -8,3 +8,3 @@ "use strict"; | ||
const COMPONENT_NAME = "eventQueue/WorkerQueue"; | ||
const COMPONENT_NAME = "/eventQueue/WorkerQueue"; | ||
const NANO_TO_MS = 1e6; | ||
@@ -11,0 +11,0 @@ const THRESHOLD = { |
150105
32
3629