@cap-js-community/event-queue
Advanced tools
Comparing version 0.1.49 to 0.1.50
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "0.1.49", | ||
"version": "0.1.50", | ||
"description": "event queue for cds", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -24,3 +24,3 @@ # @cap-js-community/event-queue | ||
const cds = require("@sap/cds"); | ||
const eventQueue = require("@sap/cds-event-queue"); | ||
const eventQueue = require("@cap-js-community/event-queue"); | ||
@@ -90,3 +90,3 @@ cds.on("bootstrap", () => { | ||
"cds-event-queue": { | ||
"model": "@sap/cds-event-queue" | ||
"model": "@cap-js-community/event-queue" | ||
} | ||
@@ -93,0 +93,0 @@ } |
@@ -11,2 +11,7 @@ "use strict"; | ||
}, | ||
TransactionMode: { | ||
isolated: "isolated", | ||
alwaysCommit: "alwaysCommit", | ||
alwaysRollback: "alwaysRollback", | ||
}, | ||
}; |
@@ -6,3 +6,3 @@ "use strict"; | ||
const { executeInNewTransaction } = require("./shared/cdsHelper"); | ||
const { EventProcessingStatus } = require("./constants"); | ||
const { EventProcessingStatus, TransactionMode } = require("./constants"); | ||
const distributedLock = require("./shared/distributedLock"); | ||
@@ -52,3 +52,4 @@ const EventQueueError = require("./EventQueueError"); | ||
this.__outdatedCheckEnabled = this.__config.eventOutdatedCheck ?? true; | ||
this.__commitOnEventLevel = this.__config.commitOnEventLevel ?? true; | ||
this.__transactionMode = | ||
this.__config.transactionMode ?? TransactionMode.isolated; | ||
this.__eventsWithExceededTries = []; | ||
@@ -180,3 +181,3 @@ this.__emptyChunkSelected = false; | ||
}); | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -197,3 +198,3 @@ EventProcessingStatus.Done | ||
([key, { queueEntry, payload }]) => { | ||
this._addEntryToProcessingMap(key, queueEntry, payload); | ||
this.addEntryToProcessingMap(key, queueEntry, payload); | ||
} | ||
@@ -203,3 +204,10 @@ ); | ||
_addEntryToProcessingMap(key, queueEntry, payload) { | ||
/** | ||
* This function allows to add entries to the process map. This function is needed if the function clusterQueueEntries | ||
* is redefined. For each entry in the processing map the processEvent function will be called once. | ||
* @param {String} key key for event | ||
* @param {Object} queueEntry queueEntry which should be clustered with this key | ||
* @param {Object} payload payload which should be clustered with this key | ||
*/ | ||
addEntryToProcessingMap(key, queueEntry, payload) { | ||
this.logger.debug("add entry to processing map", { | ||
@@ -220,4 +228,4 @@ key, | ||
* This function sets the status of multiple events to a given status. If the structure of queueEntryProcessingStatusTuple | ||
* is not as expected all events will be set to error. The function respects the config commitOnEventLevel. If | ||
* commitOnEventLevel is true the status will be written to a dedicated map and returned afterwards to handle concurrent | ||
* is not as expected all events will be set to error. The function respects the config transactionMode. If | ||
* transactionMode is isolated the status will be written to a dedicated map and returned afterwards to handle concurrent | ||
* event processing. | ||
@@ -236,10 +244,10 @@ * @param {Array} queueEntries which has been selected from event queue table and been modified by modifyQueueEntry | ||
}); | ||
const statusMap = this.__commitOnEventLevel ? {} : this.__statusMap; | ||
const statusMap = this.commitOnEventLevel ? {} : this.__statusMap; | ||
try { | ||
queueEntryProcessingStatusTuple.forEach(([id, processingStatus]) => | ||
this._determineAndAddEventStatusToMap(id, processingStatus, statusMap) | ||
this.#determineAndAddEventStatusToMap(id, processingStatus, statusMap) | ||
); | ||
} catch (error) { | ||
queueEntries.forEach((queueEntry) => | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -275,3 +283,3 @@ EventProcessingStatus.Error, | ||
_determineAndAddEventStatusToMap( | ||
#determineAndAddEventStatusToMap( | ||
id, | ||
@@ -309,3 +317,3 @@ processingStatus, | ||
queueEntries.forEach((queueEntry) => | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -336,7 +344,7 @@ EventProcessingStatus.Error | ||
}); | ||
this._ensureOnlySelectedQueueEntries(statusMap); | ||
this.#ensureOnlySelectedQueueEntries(statusMap); | ||
if (!skipChecks) { | ||
this._ensureEveryQueueEntryHasStatus(); | ||
this.#ensureEveryQueueEntryHasStatus(); | ||
} | ||
this._ensureEveryStatusIsAllowed(statusMap); | ||
this.#ensureEveryStatusIsAllowed(statusMap); | ||
@@ -421,3 +429,3 @@ const { success, failed, exceeded, invalidAttempts } = Object.entries( | ||
_ensureEveryQueueEntryHasStatus() { | ||
#ensureEveryQueueEntryHasStatus() { | ||
this.__queueEntries.forEach((queueEntry) => { | ||
@@ -438,3 +446,3 @@ if ( | ||
); | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -446,3 +454,3 @@ EventProcessingStatus.Error | ||
_ensureEveryStatusIsAllowed(statusMap) { | ||
#ensureEveryStatusIsAllowed(statusMap) { | ||
Object.entries(statusMap).forEach(([queueEntryId, status]) => { | ||
@@ -473,3 +481,3 @@ if ( | ||
_ensureOnlySelectedQueueEntries(statusMap) { | ||
#ensureOnlySelectedQueueEntries(statusMap) { | ||
Object.keys(statusMap).forEach((queueEntryId) => { | ||
@@ -501,3 +509,3 @@ if (this.__queueEntriesMap[queueEntryId]) { | ||
this.__queueEntries.forEach((queueEntry) => { | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -518,3 +526,3 @@ EventProcessingStatus.Error | ||
); | ||
this._determineAndAddEventStatusToMap( | ||
this.#determineAndAddEventStatusToMap( | ||
queueEntry.ID, | ||
@@ -593,3 +601,3 @@ EventProcessingStatus.Error | ||
const { exceededTries, openEvents } = | ||
this._filterExceededEvents(entries); | ||
this.#filterExceededEvents(entries); | ||
if (exceededTries.length) { | ||
@@ -640,3 +648,3 @@ this.__eventsWithExceededTries = exceededTries; | ||
_filterExceededEvents(events) { | ||
#filterExceededEvents(events) { | ||
return events.reduce( | ||
@@ -840,9 +848,2 @@ (result, event) => { | ||
get shouldTriggerRollback() { | ||
return ( | ||
this.statusMapContainsError(this.__statusMap) || | ||
this.statusMapContainsError(this.__commitedStatusMap) | ||
); | ||
} | ||
get logger() { | ||
@@ -897,5 +898,9 @@ return this.__logger ?? this.__baseLogger; | ||
get commitOnEventLevel() { | ||
return this.__commitOnEventLevel; | ||
return this.__transactionMode === TransactionMode.isolated; | ||
} | ||
get transactionMode() { | ||
return this.__transactionMode; | ||
} | ||
get eventType() { | ||
@@ -902,0 +907,0 @@ return this.__eventType; |
@@ -8,2 +8,3 @@ "use strict"; | ||
const { getConfigInstance } = require("./config"); | ||
const { TransactionMode } = require("./constants"); | ||
const { limiter, Funnel } = require("./shared/common"); | ||
@@ -140,3 +141,4 @@ | ||
if ( | ||
eventTypeInstance.shouldTriggerRollback || | ||
eventTypeInstance.transactionMode !== | ||
TransactionMode.alwaysCommit || | ||
Object.entries(eventTypeInstance.eventProcessingMap).some( | ||
@@ -143,0 +145,0 @@ ([key]) => eventTypeInstance.shouldRollbackTransaction(key) |
@@ -18,3 +18,3 @@ "use strict"; | ||
} | ||
await tx.run( | ||
return await tx.run( | ||
INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing) | ||
@@ -21,0 +21,0 @@ ); |
111816
2664