New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
0
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.8.3 to 1.8.4

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.8.3",
"version": "1.8.4",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -61,3 +61,2 @@ "use strict";

#processEventsAfterPublish;
#skipCsnCheck;
#registerAsEventProcessor;

@@ -82,3 +81,4 @@ #disableRedis;

#crashOnRedisUnavailable;
#tenantIdFilterCb;
#tenantIdFilterTokenInfoCb;
#tenantIdFilterEventProcessingCb;
static #instance;

@@ -99,3 +99,2 @@ constructor() {

this.#processEventsAfterPublish = null;
this.#skipCsnCheck = null;
this.#disableRedis = null;

@@ -299,3 +298,6 @@ this.#env = getEnvInstance();

if (this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)]) {
return;
const index = this.#config.events.findIndex(
(event) => event.type === CAP_EVENT_TYPE && event.subType === serviceName
);
this.#config.events.splice(index, 1);
}

@@ -537,10 +539,18 @@

get tenantIdFilterCb() {
return this.#tenantIdFilterCb;
get tenantIdFilterTokenInfo() {
return this.#tenantIdFilterTokenInfoCb;
}
set tenantIdFilterCb(value) {
this.#tenantIdFilterCb = value;
set tenantIdFilterTokenInfo(value) {
this.#tenantIdFilterTokenInfoCb = value;
}
get tenantIdFilterEventProcessing() {
return this.#tenantIdFilterEventProcessingCb;
}
set tenantIdFilterEventProcessing(value) {
this.#tenantIdFilterEventProcessingCb = value;
}
set globalTxTimeout(value) {

@@ -625,10 +635,2 @@ this.#globalTxTimeout = value;

set skipCsnCheck(value) {
this.#skipCsnCheck = value;
}
get skipCsnCheck() {
return this.#skipCsnCheck;
}
set disableRedis(value) {

@@ -635,0 +637,0 @@ this.#disableRedis = value;

@@ -24,5 +24,5 @@ "use strict";

TenantIdCheckTypes: {
getAllTenantIds: "getAllTenantIds",
eventProcessing: "eventProcessing",
getTokenInfo: "getTokenInfo",
},
};

@@ -549,2 +549,12 @@ "use strict";

handleErrorTx(error) {
this.logger.error("Error in commit|rollback transaction, check handlers and constraints!", error, {
eventType: this.#eventType,
eventSubType: this.#eventSubType,
});
this.__queueEntries.forEach((queueEntry) => {
this.#determineAndAddEventStatusToMap(queueEntry.ID, EventProcessingStatus.Error);
});
}
handleInvalidPayloadReturned(queueEntry) {

@@ -551,0 +561,0 @@ this.logger.error(

@@ -15,3 +15,3 @@ import * as cds from "@sap/cds";

export declare const TenantIdCheckTypes: {
getAllTenantIds: "getAllTenantIds";
eventProcessing: "eventProcessing";
getTokenInfo: "getTokenInfo";

@@ -186,50 +186,59 @@ };

getEventConfig(type: string, subType: string): any;
isCapOutboxEvent(type: string): boolean;
hasEventAfterCommitFlag(type: string, subType: string): boolean;
_checkRedisIsBound(): boolean;
checkRedisEnabled(): boolean;
publishEventBlockList(): boolean;
crashOnRedisUnavailable(): boolean;
getEventConfig(type: any, subType: any): any;
isCapOutboxEvent(type: any): boolean;
hasEventAfterCommitFlag(type: any, subType: any): any;
shouldBeProcessedInThisApplication(type: any, subType: any): boolean;
checkRedisEnabled(): any;
attachConfigChangeHandler(): void;
attachRedisUnsubscribeHandler(): void;
executeUnsubscribeHandlers(tenantId: string): void;
handleUnsubscribe(tenantId: string): void;
attachUnsubscribeHandler(cb: Function): void;
publishConfigChange(key: string, value: any): void;
blockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void;
executeUnsubscribeHandlers(tenantId: any): void;
handleUnsubscribe(tenantId: any): void;
attachUnsubscribeHandler(cb: any): void;
publishConfigChange(key: any, value: any): void;
blockEvent(type: any, subType: any, isPeriodic: any, tenant?: string): void;
clearPeriodicEventBlockList(): void;
unblockEvent(type: string, subType: string, isPeriodic: boolean, tenant?: string): void;
addCAPOutboxEvent(serviceName: string, config: any): void;
isEventBlocked(type: string, subType: string, isPeriodicEvent: boolean, tenant: string): boolean;
unblockEvent(type: any, subType: any, isPeriodic: any, tenant?: string): void;
addCAPOutboxEvent(serviceName: any, config: any): void;
isEventBlocked(type: any, subType: any, isPeriodicEvent: any, tenant: any): any;
set isEventQueueActive(value: boolean);
get isEventQueueActive(): boolean;
set isEventQueueActive(value: boolean);
set fileContent(config: any);
get fileContent(): any;
get events(): any[];
get periodicEvents(): any[];
isPeriodicEvent(type: string, subType: string): boolean;
get allEvents(): any[];
generateKey(type: any, subType: any): string;
removeEvent(type: any, subType: any): void;
isTenantUnsubscribed(tenantId: any): any;
get events(): any;
get periodicEvents(): any;
isPeriodicEvent(type: any, subType: any): any;
get allEvents(): any;
set forUpdateTimeout(value: number);
get forUpdateTimeout(): number;
set globalTxTimeout(value: number);
get globalTxTimeout(): number;
set forUpdateTimeout(value: number);
set globalTxTimeout(value: number);
get runInterval(): number | null;
set runInterval(value: number);
get redisEnabled(): boolean | null;
set redisEnabled(value: boolean | null);
set publishEventBlockList(value: any);
get publishEventBlockList(): any;
set crashOnRedisUnavailable(value: any);
get crashOnRedisUnavailable(): any;
set tenantIdFilterTokenInfo(value: any);
get tenantIdFilterTokenInfo(): any;
set tenantIdFilterEventProcessing(value: any);
get tenantIdFilterEventProcessing(): any;
set runInterval(value: any);
get runInterval(): any;
set redisEnabled(value: any);
get redisEnabled(): any;
set initialized(value: boolean);
get initialized(): boolean;
set initialized(value: boolean);
set cronTimezone(value: any);
get cronTimezone(): any;
set instanceLoadLimit(value: number);
get instanceLoadLimit(): number;
set instanceLoadLimit(value: number);
set isEventBlockedCb(value: any);
get isEventBlockedCb(): any;
set isEventBlockedCb(value: any);
get tableNameEventQueue(): string;
get tableNameEventLock(): string;
set configFilePath(value: string | null);
get configFilePath(): string | null;
set configFilePath(value: any);
get configFilePath(): any;
set processEventsAfterPublish(value: any);
get processEventsAfterPublish(): any;
set skipCsnCheck(value: any);
get skipCsnCheck(): any;
set disableRedis(value: any);

@@ -253,2 +262,4 @@ get disableRedis(): any;

get insertEventsBeforeCommit(): any;
set enableCAPTelemetry(value: any);
get enableCAPTelemetry(): any;
get isMultiTenancy(): boolean;

@@ -255,0 +266,0 @@ }

@@ -45,3 +45,2 @@ "use strict";

["crashOnRedisUnavailable", false],
["tenantIdFilterCb", null],
];

@@ -70,3 +69,2 @@

* @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful
* @param {function} [options.tenantIdFilterCb=null] - Allows to set customer filter function to filter the tenants ids which should be processed in the event-queue
*/

@@ -193,3 +191,3 @@ const initialize = async (options = {}) => {

const isTestProfile = cds.env.profiles.find((profile) => profile.includes("test"));
if (isTestProfile) {
if (isTestProfile || !config.redisEnabled) {
return;

@@ -196,0 +194,0 @@ }

@@ -15,3 +15,9 @@ "use strict";

function outboxed(srv, customOpts) {
// outbox max. once
if (!(new.target || customOpts)) {
const former = srv[OUTBOXED];
if (former) {
return former;
}
}
const logger = cds.log(COMPONENT_NAME);

@@ -25,10 +31,4 @@ const outboxOpts = Object.assign(

if (!new.target) {
const former = srv[OUTBOXED];
if (former) {
if (outboxOpts.kind === "persistent-outbox") {
config.addCAPOutboxEvent(srv.name, outboxOpts);
}
return former;
}
if (outboxOpts.kind === "persistent-outbox") {
config.addCAPOutboxEvent(srv.name, outboxOpts);
}

@@ -41,8 +41,6 @@

if (!new.target) {
Object.defineProperty(srv, OUTBOXED, { value: outboxedSrv });
if (!srv[OUTBOXED]) {
Object.defineProperty(srv, OUTBOXED, { value: outboxedSrv });
}
}
if (outboxOpts.kind === "persistent-outbox") {
config.addCAPOutboxEvent(srv.name, outboxOpts);
}
outboxedSrv.handle = async function (req) {

@@ -49,0 +47,0 @@ const context = req.context || cds.context;

@@ -204,52 +204,63 @@ "use strict";

const processEventMap = async (eventTypeInstance) => {
eventTypeInstance.startPerformanceTracerEvents();
await eventTypeInstance.beforeProcessingEvents();
eventTypeInstance.logStartMessage();
if (eventTypeInstance.commitOnEventLevel) {
eventTypeInstance.txUsageAllowed = false;
const processEventMap = async (instance) => {
instance.startPerformanceTracerEvents();
await instance.beforeProcessingEvents();
instance.logStartMessage();
if (instance.commitOnEventLevel) {
instance.txUsageAllowed = false;
}
await limiter(
eventTypeInstance.parallelEventProcessing,
Object.entries(eventTypeInstance.eventProcessingMap),
instance.parallelEventProcessing,
Object.entries(instance.eventProcessingMap),
async ([key, { queueEntries, payload }]) => {
if (eventTypeInstance.commitOnEventLevel) {
if (instance.commitOnEventLevel) {
let statusMap;
await executeInNewTransaction(
eventTypeInstance.baseContext,
`eventQueue-processEvent-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
instance.baseContext,
`eventQueue-processEvent-${instance.eventType}##${instance.eventSubType}`,
async (tx) => {
statusMap = await _processEvent(eventTypeInstance, tx.context, key, queueEntries, payload);
if (
eventTypeInstance.statusMapContainsError(statusMap) ||
eventTypeInstance.shouldRollbackTransaction(key)
) {
statusMap = await _processEvent(instance, tx.context, key, queueEntries, payload);
const shouldRollback =
instance.statusMapContainsError(statusMap) || instance.shouldRollbackTransaction(key);
if (shouldRollback) {
await tx.rollback();
await _commitStatusInNewTx(instance, statusMap);
} else {
await instance.persistEventStatus(tx, {
skipChecks: true,
statusMap,
});
}
}
);
await executeInNewTransaction(
eventTypeInstance.baseContext,
`eventQueue-persistStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
async (tx) => {
eventTypeInstance.processEventContext = tx.context;
await eventTypeInstance.persistEventStatus(tx, {
skipChecks: true,
statusMap,
});
}
);
} else {
await _processEvent(eventTypeInstance, eventTypeInstance.context, key, queueEntries, payload);
await _processEvent(instance, instance.context, key, queueEntries, payload);
}
}
).finally(() => {
eventTypeInstance.clearEventProcessingContext();
if (eventTypeInstance.commitOnEventLevel) {
eventTypeInstance.txUsageAllowed = true;
}
});
eventTypeInstance.endPerformanceTracerEvents();
)
.catch((err) => {
instance.handleErrorTx(err);
})
.finally(() => {
instance.clearEventProcessingContext();
if (instance.commitOnEventLevel) {
instance.txUsageAllowed = true;
}
});
instance.endPerformanceTracerEvents();
};
const _commitStatusInNewTx = async (eventTypeInstance, statusMap) =>
await executeInNewTransaction(
eventTypeInstance.baseContext,
`eventQueue-persistStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
async (tx) => {
eventTypeInstance.processEventContext = tx.context;
await eventTypeInstance.persistEventStatus(tx, {
skipChecks: true,
statusMap,
});
}
);
const _checkEventIsBlocked = async (baseInstance) => {

@@ -256,0 +267,0 @@ const isEventBlockedCb = config.isEventBlockedCb;

@@ -13,2 +13,3 @@ "use strict";

const trace = require("../shared/openTelemetry");
const { TenantIdCheckTypes } = require("../constants");

@@ -63,2 +64,6 @@ const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL";

if (!config.redisEnabled) {
const tenantShouldBeProcessed = await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId);
if (!tenantShouldBeProcessed) {
return;
}
await _processLocalWithoutRedis(tenantId, events);

@@ -71,2 +76,5 @@ return;

const eventConfig = config.getEventConfig(type, subType);
if (!eventConfig) {
continue;
}
for (let i = 0; i < TRIES_FOR_PUBLISH_PERIODIC_EVENT; i++) {

@@ -73,0 +81,0 @@ const result = eventConfig.multiInstanceProcessing

@@ -9,2 +9,3 @@ "use strict";

const common = require("../shared/common");
const { TenantIdCheckTypes } = require("../constants");

@@ -26,2 +27,6 @@ const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL";

const { lockId, tenantId, type, subType } = JSON.parse(messageData);
const tenantShouldBeProcessed = await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId);
if (!tenantShouldBeProcessed) {
return;
}
logger.debug("received redis event", {

@@ -28,0 +33,0 @@ tenantId,

@@ -114,13 +114,15 @@ "use strict";

const dummyContext = new cds.EventContext({});
const couldAcquireLock = await trace(
dummyContext,
"acquire-lock-master-runner",
async () => {
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, {
expiryTime: eventQueueConfig.runInterval * 0.95,
tenantScoped: false,
});
},
{ newRootSpan: true }
);
const couldAcquireLock = config.tenantIdFilterEventProcessing
? true
: await trace(
dummyContext,
"acquire-lock-master-runner",
async () => {
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, {
expiryTime: eventQueueConfig.runInterval * 0.95,
tenantScoped: false,
});
},
{ newRootSpan: true }
);
if (!couldAcquireLock) {

@@ -241,3 +243,3 @@ return;

await trace(tenantContext, "update-periodic-events-for-tenant", async () => {
if (!config.redisEnabled) {
if (!config.redisEnabled && !config.tenantIdFilterEventProcessing) {
const couldAcquireLock = await distributedLock.acquireLock(context, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, {

@@ -438,3 +440,3 @@ expiryTime: eventQueueConfig.runInterval * 0.95,

async () => {
if (config.redisEnabled) {
if (config.redisEnabled && !config.tenantIdFilterEventProcessing) {
const couldAcquireLock = await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, {

@@ -441,0 +443,0 @@ expiryTime: 60 * 1000, // short living lock --> assume we do not have 2 onboards within 1 minute

@@ -26,2 +26,3 @@ "use strict";

const logger = cds.log(COMPONENT_NAME);
let transactionRollbackPromise = Promise.resolve(false);
try {

@@ -40,3 +41,11 @@ const user = new cds.User.Privileged({ id: config.userId, tokenInfo: await common.getTokenInfo(context.tenant) });

tx.context._ = context._ ?? {};
return await fn(tx, ...parameters);
return new Promise((outerResolve, outerReject) => {
transactionRollbackPromise = new Promise((resolve) => {
tx.context.on("succeeded", () => resolve(false));
tx.context.on("failed", () => resolve(true));
fn(tx, ...parameters)
.then(outerResolve)
.catch(outerReject);
});
});
}

@@ -56,3 +65,6 @@ );

},
async (tx) => fn(tx, ...parameters)
async (tx) => {
await fn(tx, ...parameters);
transactionRollbackPromise = false;
}
);

@@ -74,6 +86,9 @@ } else {

};
await fn(contextTx, ...parameters).finally(() => (contextTx.rollback = txRollback));
await fn(contextTx, ...parameters)
.then(() => (transactionRollbackPromise = false))
.finally(() => (contextTx.rollback = txRollback));
}
}
} catch (err) {
const transactionRollback = await transactionRollbackPromise;
if (err instanceof VError) {

@@ -83,5 +98,9 @@ Object.assign(err.jse_info, {

});
throw err;
if (transactionRollback) {
throw err;
} else {
logger.error("business transaction commited but succeeded|done|failed threw a error!", err);
}
} else {
throw new VError(
const nestedError = new VError(
{

@@ -94,2 +113,7 @@ name: VERROR_CLUSTER_NAME,

);
if (transactionRollback) {
throw err;
} else {
logger.error("business transaction commited but succeeded|done|failed threw a error!", nestedError);
}
}

@@ -118,3 +142,9 @@ } finally {

.map((tenant) => tenant.subscribedTenantId ?? tenant.tenant)
.filter((tenantId) => common.isTenantIdValidCb(TenantIdCheckTypes.getAllTenantIds, tenantId));
.reduce(async (result, tenantId) => {
result = await result;
if (await common.isTenantIdValidCb(TenantIdCheckTypes.eventProcessing, tenantId)) {
result.push(tenantId);
}
return result;
}, []);
};

@@ -121,0 +151,0 @@

@@ -7,2 +7,4 @@ "use strict";

const xssec = require("@sap/xssec");
const VError = require("verror");
const config = require("../config");

@@ -48,5 +50,20 @@ const { TenantIdCheckTypes } = require("../constants");

}
return Promise.allSettled(returnPromises);
return promiseAllDone(returnPromises);
};
const promiseAllDone = async (iterable) => {
const results = await Promise.allSettled(iterable);
const rejects = results.filter((entry) => {
return entry.status === "rejected";
});
if (rejects.length === 1) {
return Promise.reject(rejects[0].reason);
} else if (rejects.length > 1) {
return Promise.reject(new VError.MultiError(rejects.map((reject) => reject.reason)));
}
return results.map((entry) => {
return entry.value;
});
};
const isValidDate = (value) => {

@@ -120,6 +137,19 @@ if (typeof value === "string") {

const isTenantIdValidCb = (checkType, tenantId) => {
if (config.tenantIdFilterCb) {
return config.tenantIdFilterCb(checkType, tenantId);
} else {
const isTenantIdValidCb = async (checkType, tenantId) => {
let cb;
switch (checkType) {
case TenantIdCheckTypes.getTokenInfo:
cb = config.tenantIdFilterTokenInfo;
break;
case TenantIdCheckTypes.eventProcessing:
cb = config.tenantIdFilterEventProcessing;
break;
default:
cb = async () => true;
}
try {
return cb ? await cb(tenantId) : true;
} catch (err) {
cds.log(COMPONENT_NAME).error("failed in custom tenant id filter callback. Returning true.", err);
return true;

@@ -137,2 +167,3 @@ }

isTenantIdValidCb,
promiseAllDone,
__: {

@@ -139,0 +170,0 @@ clearTokenInfoCache: () => (getTokenInfo._tokenInfoCache = {}),

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc