@cap-js-community/event-queue
Advanced tools
Comparing version 1.8.0 to 1.8.1
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.8.0", | ||
"version": "1.8.1", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -53,6 +53,6 @@ "main": "src/index.js", | ||
"devDependencies": { | ||
"@cap-js/hana": "^1.3.0", | ||
"@cap-js/sqlite": "^1.7.3", | ||
"@sap/cds": "^8.4.2", | ||
"@sap/cds-dk": "^8.4.2", | ||
"@cap-js/hana": "^1.5.2", | ||
"@cap-js/sqlite": "^1.7.8", | ||
"@sap/cds": "^8.6.0", | ||
"@sap/cds-dk": "^8.6.1", | ||
"eslint": "^8.57.0", | ||
@@ -62,3 +62,3 @@ "eslint-config-prettier": "^9.1.0", | ||
"eslint-plugin-node": "^11.1.0", | ||
"express": "^4.21.0", | ||
"express": "^4.21.2", | ||
"hdb": "^0.19.10", | ||
@@ -80,3 +80,3 @@ "jest": "^29.7.0", | ||
"[test]": { | ||
"isRunnerDeactivated": true, | ||
"isEventQueueActive": true, | ||
"registerAsEventProcessor": false, | ||
@@ -83,0 +83,0 @@ "updatePeriodicEvents": false, |
@@ -448,2 +448,7 @@ "use strict"; | ||
} | ||
if (event.multiInstanceProcessing) { | ||
throw EventQueueError.multiInstanceProcessingNotAllowed(event.type, event.subType); | ||
} | ||
this.#basicEventValidation(event); | ||
@@ -457,2 +462,7 @@ } | ||
} | ||
if (this.isMultiTenancy && event.multiInstanceProcessing) { | ||
throw EventQueueError.multiInstanceProcessingNotAllowed(event.type, event.subType); | ||
} | ||
this.#basicEventValidation(event); | ||
@@ -459,0 +469,0 @@ } |
@@ -27,2 +27,3 @@ "use strict"; | ||
APP_INSTANCES_FORMAT: "APP_INSTANCES_FORMAT", | ||
MULTI_INSTANCE_PROCESSING_NOT_ALLOWED: "MULTI_INSTANCE_PROCESSING_NOT_ALLOWED", | ||
}; | ||
@@ -95,2 +96,5 @@ | ||
}, | ||
[ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED]: { | ||
message: "The config multiInstanceProcessing is currently only allowed for ad-hoc events and single-tenant-apps.", | ||
}, | ||
}; | ||
@@ -331,2 +335,13 @@ | ||
static multiInstanceProcessingNotAllowed(type, subType) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED, | ||
info: { type, subType }, | ||
}, | ||
message | ||
); | ||
} | ||
static isRedisConnectionFailure(err) { | ||
@@ -333,0 +348,0 @@ return err instanceof VError && err.name === ERROR_CODES.REDIS_CREATE_CLIENT; |
@@ -6,3 +6,3 @@ "use strict"; | ||
const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper"); | ||
const { executeInNewTransaction } = require("./shared/cdsHelper"); | ||
const { EventProcessingStatus, TransactionMode } = require("./constants"); | ||
@@ -65,4 +65,3 @@ const distributedLock = require("./shared/distributedLock"); | ||
this.#retryFailedAfter = this.#eventConfig.retryFailedAfter ?? DEFAULT_RETRY_AFTER; | ||
// NOTE: keep the feature, this might be needed again | ||
this.__concurrentEventProcessing = false; | ||
this.__concurrentEventProcessing = this.#eventConfig.multiInstanceProcessing; | ||
this.__startTime = this.#eventConfig.startTime ?? new Date(); | ||
@@ -791,3 +790,3 @@ this.__retryAttempts = this.#isPeriodic ? 1 : this.#eventConfig.retryAttempts ?? DEFAULT_RETRY_ATTEMPTS; | ||
); | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
} | ||
@@ -924,3 +923,4 @@ } | ||
this.context, | ||
[this.#eventType, this.#eventSubType].join("##") | ||
[this.#eventType, this.#eventSubType].join("##"), | ||
{ keepTrackOfLock: true } | ||
); | ||
@@ -927,0 +927,0 @@ if (!lockAcquired) { |
@@ -14,2 +14,7 @@ import * as cds from "@sap/cds"; | ||
export declare const TenantIdCheckTypes: { | ||
getAllTenantIds: "getAllTenantIds"; | ||
getTokenInfo: "getTokenInfo"; | ||
}; | ||
export declare const TransactionMode: { | ||
@@ -16,0 +21,0 @@ isolated: "isolated"; |
@@ -11,3 +11,3 @@ "use strict"; | ||
const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper"); | ||
const { executeInNewTransaction } = require("./shared/cdsHelper"); | ||
const trace = require("./shared/openTelemetry"); | ||
@@ -69,3 +69,3 @@ | ||
} | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
}); | ||
@@ -94,3 +94,3 @@ }); | ||
) { | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
} | ||
@@ -173,3 +173,4 @@ }); | ||
eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry); | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
return; | ||
} finally { | ||
@@ -182,3 +183,3 @@ eventTypeInstance.endPerformanceTracerPeriodicEvents(); | ||
) { | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
} | ||
@@ -230,3 +231,3 @@ }); | ||
) { | ||
throw new TriggerRollback(); | ||
await tx.rollback(); | ||
} | ||
@@ -233,0 +234,0 @@ } |
@@ -8,3 +8,3 @@ "use strict"; | ||
const redis = require("../shared/redis"); | ||
const { checkLockExistsAndReturnValue } = require("../shared/distributedLock"); | ||
const distributedLock = require("../shared/distributedLock"); | ||
const config = require("../config"); | ||
@@ -71,3 +71,5 @@ const common = require("../shared/common"); | ||
for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) { | ||
const result = await checkLockExistsAndReturnValue(context, [type, subType].join("##")); | ||
const result = eventConfig.multiInstanceProcessing | ||
? false | ||
: await distributedLock.checkLockExistsAndReturnValue(context, [type, subType].join("##")); | ||
if (result) { | ||
@@ -74,0 +76,0 @@ logger.debug("skip publish redis event as no lock is available", { |
@@ -284,5 +284,7 @@ "use strict"; | ||
const lockId = `${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
const couldAcquireLock = eventConfig.multiInstanceProcessing | ||
? true | ||
: await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
@@ -289,0 +291,0 @@ return; |
@@ -28,3 +28,3 @@ "use strict"; | ||
const _exec = async () => { | ||
if (lockId) { | ||
if (!eventConfig.multiInstanceProcessing && lockId) { | ||
const lockAvailable = await distributedLock.acquireLock(context, lockId); | ||
@@ -31,0 +31,0 @@ if (!lockAvailable) { |
@@ -65,40 +65,32 @@ "use strict"; | ||
} | ||
await fn(contextTx, ...parameters); | ||
const txRollback = contextTx.rollback; | ||
contextTx.rollback = async () => { | ||
// tx should not be managed here as we did not open the tx | ||
// change rollback to no opt - closing tx would cause follow-up usage to fail. | ||
// the process that opened the tx needs to manage it | ||
}; | ||
await fn(contextTx, ...parameters).finally(() => (contextTx.rollback = txRollback)); | ||
} | ||
} | ||
} catch (err) { | ||
if (!(err instanceof TriggerRollback)) { | ||
if (err instanceof VError) { | ||
Object.assign(err.jse_info, { | ||
newTx: info, | ||
}); | ||
throw err; | ||
} else { | ||
throw new VError( | ||
{ | ||
name: VERROR_CLUSTER_NAME, | ||
cause: err, | ||
info, | ||
}, | ||
"Execution in new transaction failed" | ||
); | ||
} | ||
if (err instanceof VError) { | ||
Object.assign(err.jse_info, { | ||
newTx: info, | ||
}); | ||
throw err; | ||
} else { | ||
throw new VError( | ||
{ | ||
name: VERROR_CLUSTER_NAME, | ||
cause: err, | ||
info, | ||
}, | ||
"Execution in new transaction failed" | ||
); | ||
} | ||
return false; | ||
} finally { | ||
logger.debug("Execution in new transaction finished", info); | ||
} | ||
return true; | ||
} | ||
/** | ||
* Error class to be used to force rollback in executionInNewTransaction | ||
* Error will not be logged, as it assumes that error handling has been done before... | ||
*/ | ||
class TriggerRollback extends VError { | ||
constructor() { | ||
super("Rollback triggered"); | ||
} | ||
} | ||
const getAllTenantIds = async () => { | ||
@@ -126,4 +118,3 @@ if (!config.isMultiTenancy) { | ||
executeInNewTransaction, | ||
TriggerRollback, | ||
getAllTenantIds, | ||
}; |
@@ -8,11 +8,14 @@ "use strict"; | ||
const KEY_PREFIX = "EVENT_QUEUE"; | ||
const existingLocks = {}; | ||
const REDIS_COMMAND_OK = "OK"; | ||
const COMPONENT_NAME = "/eventQueue/distributedLock"; | ||
const acquireLock = async (context, key, { tenantScoped = true, expiryTime = config.globalTxTimeout } = {}) => { | ||
const acquireLock = async ( | ||
context, | ||
key, | ||
{ tenantScoped = true, expiryTime = config.globalTxTimeout, keepTrackOfLock = false } = {} | ||
) => { | ||
const fullKey = _generateKey(context, tenantScoped, key); | ||
if (config.redisEnabled) { | ||
return await _acquireLockRedis(context, fullKey, expiryTime); | ||
return await _acquireLockRedis(context, fullKey, expiryTime, { keepTrackOfLock }); | ||
} else { | ||
@@ -27,3 +30,3 @@ return await _acquireLockDB(context, fullKey, expiryTime); | ||
value, | ||
{ tenantScoped = true, expiryTime = config.globalTxTimeout, overrideValue = false } = {} | ||
{ tenantScoped = true, expiryTime = config.globalTxTimeout, overrideValue = false, keepTrackOfLock = false } = {} | ||
) => { | ||
@@ -35,2 +38,3 @@ const fullKey = _generateKey(context, tenantScoped, key); | ||
overrideValue, | ||
keepTrackOfLock, | ||
}); | ||
@@ -63,3 +67,8 @@ } else { | ||
const _acquireLockRedis = async (context, fullKey, expiryTime, { value = "true", overrideValue = false } = {}) => { | ||
const _acquireLockRedis = async ( | ||
context, | ||
fullKey, | ||
expiryTime, | ||
{ value = "true", overrideValue = false, keepTrackOfLock } = {} | ||
) => { | ||
const client = await redis.createMainClientAndConnect(config.redisOptions); | ||
@@ -71,3 +80,3 @@ const result = await client.set(fullKey, value, { | ||
const isOk = result === REDIS_COMMAND_OK; | ||
if (isOk) { | ||
if (isOk && keepTrackOfLock) { | ||
existingLocks[fullKey] = 1; | ||
@@ -154,3 +163,17 @@ } | ||
const shutdownHandler = async () => { | ||
await Promise.allSettled(Object.keys(existingLocks).map((key) => _releaseLockRedis(null, key))); | ||
const logger = cds.log(COMPONENT_NAME); | ||
logger.info("received shutdown event, trying to release all locks", { | ||
numberOfLocks: Object.keys(existingLocks).length, | ||
}); | ||
const result = await Promise.allSettled( | ||
Object.keys(existingLocks).map(async (key) => { | ||
await _releaseLockRedis(null, key); | ||
logger.info("lock released", { key }); | ||
}) | ||
); | ||
const errors = result.filter((promise) => promise.reason); | ||
logger.info("releasing locks finished ", { | ||
numberOfErrors: errors.length, | ||
...(errors.length && { firstError: errors[0] }), | ||
}); | ||
}; | ||
@@ -157,0 +180,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
205222
5120