Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
0
Versions
65
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.8.0 to 1.8.1

14

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.8.0",
"version": "1.8.1",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -53,6 +53,6 @@ "main": "src/index.js",

"devDependencies": {
"@cap-js/hana": "^1.3.0",
"@cap-js/sqlite": "^1.7.3",
"@sap/cds": "^8.4.2",
"@sap/cds-dk": "^8.4.2",
"@cap-js/hana": "^1.5.2",
"@cap-js/sqlite": "^1.7.8",
"@sap/cds": "^8.6.0",
"@sap/cds-dk": "^8.6.1",
"eslint": "^8.57.0",

@@ -62,3 +62,3 @@ "eslint-config-prettier": "^9.1.0",

"eslint-plugin-node": "^11.1.0",
"express": "^4.21.0",
"express": "^4.21.2",
"hdb": "^0.19.10",

@@ -80,3 +80,3 @@ "jest": "^29.7.0",

"[test]": {
"isRunnerDeactivated": true,
"isEventQueueActive": true,
"registerAsEventProcessor": false,

@@ -83,0 +83,0 @@ "updatePeriodicEvents": false,

@@ -448,2 +448,7 @@ "use strict";

}
if (event.multiInstanceProcessing) {
throw EventQueueError.multiInstanceProcessingNotAllowed(event.type, event.subType);
}
this.#basicEventValidation(event);

@@ -457,2 +462,7 @@ }

}
if (this.isMultiTenancy && event.multiInstanceProcessing) {
throw EventQueueError.multiInstanceProcessingNotAllowed(event.type, event.subType);
}
this.#basicEventValidation(event);

@@ -459,0 +469,0 @@ }

@@ -27,2 +27,3 @@ "use strict";

APP_INSTANCES_FORMAT: "APP_INSTANCES_FORMAT",
MULTI_INSTANCE_PROCESSING_NOT_ALLOWED: "MULTI_INSTANCE_PROCESSING_NOT_ALLOWED",
};

@@ -95,2 +96,5 @@

},
[ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED]: {
message: "The config multiInstanceProcessing is currently only allowed for ad-hoc events and single-tenant-apps.",
},
};

@@ -331,2 +335,13 @@

static multiInstanceProcessingNotAllowed(type, subType) {
const { message } = ERROR_CODES_META[ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED];
return new EventQueueError(
{
name: ERROR_CODES.MULTI_INSTANCE_PROCESSING_NOT_ALLOWED,
info: { type, subType },
},
message
);
}
static isRedisConnectionFailure(err) {

@@ -333,0 +348,0 @@ return err instanceof VError && err.name === ERROR_CODES.REDIS_CREATE_CLIENT;

@@ -6,3 +6,3 @@ "use strict";

const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper");
const { executeInNewTransaction } = require("./shared/cdsHelper");
const { EventProcessingStatus, TransactionMode } = require("./constants");

@@ -65,4 +65,3 @@ const distributedLock = require("./shared/distributedLock");

this.#retryFailedAfter = this.#eventConfig.retryFailedAfter ?? DEFAULT_RETRY_AFTER;
// NOTE: keep the feature, this might be needed again
this.__concurrentEventProcessing = false;
this.__concurrentEventProcessing = this.#eventConfig.multiInstanceProcessing;
this.__startTime = this.#eventConfig.startTime ?? new Date();

@@ -791,3 +790,3 @@ this.__retryAttempts = this.#isPeriodic ? 1 : this.#eventConfig.retryAttempts ?? DEFAULT_RETRY_ATTEMPTS;

);
throw new TriggerRollback();
await tx.rollback();
}

@@ -924,3 +923,4 @@ }

this.context,
[this.#eventType, this.#eventSubType].join("##")
[this.#eventType, this.#eventSubType].join("##"),
{ keepTrackOfLock: true }
);

@@ -927,0 +927,0 @@ if (!lockAcquired) {

@@ -14,2 +14,7 @@ import * as cds from "@sap/cds";

export declare const TenantIdCheckTypes: {
getAllTenantIds: "getAllTenantIds";
getTokenInfo: "getTokenInfo";
};
export declare const TransactionMode: {

@@ -16,0 +21,0 @@ isolated: "isolated";

@@ -11,3 +11,3 @@ "use strict";

const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper");
const { executeInNewTransaction } = require("./shared/cdsHelper");
const trace = require("./shared/openTelemetry");

@@ -69,3 +69,3 @@

}
throw new TriggerRollback();
await tx.rollback();
});

@@ -94,3 +94,3 @@ });

) {
throw new TriggerRollback();
await tx.rollback();
}

@@ -173,3 +173,4 @@ });

eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry);
throw new TriggerRollback();
await tx.rollback();
return;
} finally {

@@ -182,3 +183,3 @@ eventTypeInstance.endPerformanceTracerPeriodicEvents();

) {
throw new TriggerRollback();
await tx.rollback();
}

@@ -230,3 +231,3 @@ });

) {
throw new TriggerRollback();
await tx.rollback();
}

@@ -233,0 +234,0 @@ }

@@ -8,3 +8,3 @@ "use strict";

const redis = require("../shared/redis");
const { checkLockExistsAndReturnValue } = require("../shared/distributedLock");
const distributedLock = require("../shared/distributedLock");
const config = require("../config");

@@ -71,3 +71,5 @@ const common = require("../shared/common");

for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) {
const result = await checkLockExistsAndReturnValue(context, [type, subType].join("##"));
const result = eventConfig.multiInstanceProcessing
? false
: await distributedLock.checkLockExistsAndReturnValue(context, [type, subType].join("##"));
if (result) {

@@ -74,0 +76,0 @@ logger.debug("skip publish redis event as no lock is available", {

@@ -284,5 +284,7 @@ "use strict";

const lockId = `${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
const couldAcquireLock = eventConfig.multiInstanceProcessing
? true
: await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {

@@ -289,0 +291,0 @@ return;

@@ -28,3 +28,3 @@ "use strict";

const _exec = async () => {
if (lockId) {
if (!eventConfig.multiInstanceProcessing && lockId) {
const lockAvailable = await distributedLock.acquireLock(context, lockId);

@@ -31,0 +31,0 @@ if (!lockAvailable) {

@@ -65,40 +65,32 @@ "use strict";

}
await fn(contextTx, ...parameters);
const txRollback = contextTx.rollback;
contextTx.rollback = async () => {
// tx should not be managed here as we did not open the tx
// change rollback to no opt - closing tx would cause follow-up usage to fail.
// the process that opened the tx needs to manage it
};
await fn(contextTx, ...parameters).finally(() => (contextTx.rollback = txRollback));
}
}
} catch (err) {
if (!(err instanceof TriggerRollback)) {
if (err instanceof VError) {
Object.assign(err.jse_info, {
newTx: info,
});
throw err;
} else {
throw new VError(
{
name: VERROR_CLUSTER_NAME,
cause: err,
info,
},
"Execution in new transaction failed"
);
}
if (err instanceof VError) {
Object.assign(err.jse_info, {
newTx: info,
});
throw err;
} else {
throw new VError(
{
name: VERROR_CLUSTER_NAME,
cause: err,
info,
},
"Execution in new transaction failed"
);
}
return false;
} finally {
logger.debug("Execution in new transaction finished", info);
}
return true;
}
/**
* Error class to be used to force rollback in executionInNewTransaction
* Error will not be logged, as it assumes that error handling has been done before...
*/
class TriggerRollback extends VError {
constructor() {
super("Rollback triggered");
}
}
const getAllTenantIds = async () => {

@@ -126,4 +118,3 @@ if (!config.isMultiTenancy) {

executeInNewTransaction,
TriggerRollback,
getAllTenantIds,
};

@@ -8,11 +8,14 @@ "use strict";

const KEY_PREFIX = "EVENT_QUEUE";
const existingLocks = {};
const REDIS_COMMAND_OK = "OK";
const COMPONENT_NAME = "/eventQueue/distributedLock";
const acquireLock = async (context, key, { tenantScoped = true, expiryTime = config.globalTxTimeout } = {}) => {
const acquireLock = async (
context,
key,
{ tenantScoped = true, expiryTime = config.globalTxTimeout, keepTrackOfLock = false } = {}
) => {
const fullKey = _generateKey(context, tenantScoped, key);
if (config.redisEnabled) {
return await _acquireLockRedis(context, fullKey, expiryTime);
return await _acquireLockRedis(context, fullKey, expiryTime, { keepTrackOfLock });
} else {

@@ -27,3 +30,3 @@ return await _acquireLockDB(context, fullKey, expiryTime);

value,
{ tenantScoped = true, expiryTime = config.globalTxTimeout, overrideValue = false } = {}
{ tenantScoped = true, expiryTime = config.globalTxTimeout, overrideValue = false, keepTrackOfLock = false } = {}
) => {

@@ -35,2 +38,3 @@ const fullKey = _generateKey(context, tenantScoped, key);

overrideValue,
keepTrackOfLock,
});

@@ -63,3 +67,8 @@ } else {

const _acquireLockRedis = async (context, fullKey, expiryTime, { value = "true", overrideValue = false } = {}) => {
const _acquireLockRedis = async (
context,
fullKey,
expiryTime,
{ value = "true", overrideValue = false, keepTrackOfLock } = {}
) => {
const client = await redis.createMainClientAndConnect(config.redisOptions);

@@ -71,3 +80,3 @@ const result = await client.set(fullKey, value, {

const isOk = result === REDIS_COMMAND_OK;
if (isOk) {
if (isOk && keepTrackOfLock) {
existingLocks[fullKey] = 1;

@@ -154,3 +163,17 @@ }

const shutdownHandler = async () => {
await Promise.allSettled(Object.keys(existingLocks).map((key) => _releaseLockRedis(null, key)));
const logger = cds.log(COMPONENT_NAME);
logger.info("received shutdown event, trying to release all locks", {
numberOfLocks: Object.keys(existingLocks).length,
});
const result = await Promise.allSettled(
Object.keys(existingLocks).map(async (key) => {
await _releaseLockRedis(null, key);
logger.info("lock released", { key });
})
);
const errors = result.filter((promise) => promise.reason);
logger.info("releasing locks finished ", {
numberOfErrors: errors.length,
...(errors.length && { firstError: errors[0] }),
});
};

@@ -157,0 +180,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc