@cap-js-community/event-queue
Advanced tools
Comparing version 1.6.4 to 1.6.5
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.6.4", | ||
"version": "1.6.5", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -30,3 +30,3 @@ "main": "src/index.js", | ||
"test:deploySchema": "node test-integration/_env/srv/hana/deploy.js", | ||
"test:cleanSchemas": "node test-integration/_env/srv/hana/cleanObsoletSchemas.js", | ||
"test:cleanSchemas": "node test-integration/_env/srv/hana/deleteTestSchema.js ", | ||
"lint": "npm run eslint && npm run prettier", | ||
@@ -33,0 +33,0 @@ "lint:ci": "npm run eslint:ci && npm run prettier:ci", |
@@ -27,2 +27,3 @@ "use strict"; | ||
const SUFFIX_PERIODIC = "_PERIODIC"; | ||
const DEFAULT_RETRY_AFTER = 5 * 60 * 1000; | ||
@@ -41,2 +42,3 @@ class EventQueueProcessorBase { | ||
#lastSuccessfulRunTimestamp; | ||
#retryFailedAfter; | ||
@@ -62,2 +64,3 @@ constructor(context, eventType, eventSubType, config) { | ||
} | ||
this.#retryFailedAfter = this.#eventConfig.retryFailedAfter ?? DEFAULT_RETRY_AFTER; | ||
// NOTE: keep the feature, this might be needed again | ||
@@ -460,2 +463,13 @@ this.__concurrentEventProcessing = false; | ||
} | ||
let startAfter; | ||
if (status === EventProcessingStatus.Error) { | ||
startAfter = new Date(Date.now() + this.#retryFailedAfter); | ||
this.#eventSchedulerInstance.scheduleEvent( | ||
this.__context.tenant, | ||
this.#eventType, | ||
this.#eventSubType, | ||
startAfter | ||
); | ||
} | ||
await tx.run( | ||
@@ -466,2 +480,3 @@ UPDATE.entity(this.#config.tableNameEventQueue) | ||
lastAttemptTimestamp: ts, | ||
...(status === EventProcessingStatus.Error ? { startAfter: startAfter.toISOString() } : {}), | ||
}) | ||
@@ -560,3 +575,4 @@ .where("ID IN", eventIds) | ||
let result = []; | ||
const refDateStartAfter = new Date(Date.now() + this.#config.runInterval * 1.2); | ||
const baseDate = Date.now(); | ||
const refDateStartAfter = new Date(baseDate + this.#config.runInterval * 1.2); | ||
await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => { | ||
@@ -583,3 +599,3 @@ const entries = await tx.run( | ||
"AND lastAttemptTimestamp <=", | ||
new Date(new Date().getTime() - this.#config.globalTxTimeout).toISOString(), | ||
new Date(baseDate - this.#config.globalTxTimeout).toISOString(), | ||
") )", | ||
@@ -595,3 +611,3 @@ ] | ||
"AND lastAttemptTimestamp <=", | ||
new Date(new Date().getTime() - this.#config.globalTxTimeout).toISOString(), | ||
new Date(baseDate - this.#config.globalTxTimeout).toISOString(), | ||
") )", | ||
@@ -598,0 +614,0 @@ ]) |
186242
4650