New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
7
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.0.3 to 1.1.0

src/outbox/eventQueueAsOutbox.js

10

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.0.3",
"version": "1.1.0",
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -50,7 +50,7 @@ "main": "src/index.js",

"devDependencies": {
"@sap/cds": "7.5.1",
"@sap/cds-dk": "7.5.0",
"@sap/cds": "7.5.2",
"@sap/cds-dk": "7.5.1",
"eslint": "8.56.0",
"eslint-config-prettier": "9.1.0",
"eslint-plugin-jest": "27.6.1",
"eslint-plugin-jest": "27.6.2",
"eslint-plugin-node": "11.1.0",

@@ -61,3 +61,3 @@ "express": "4.18.2",

"prettier": "2.8.8",
"sqlite3": "5.1.7-rc.0"
"sqlite3": "5.1.7"
},

@@ -64,0 +64,0 @@ "homepage": "https://cap-js-community.github.io/event-queue/",

@@ -13,3 +13,3 @@ "use strict";

const REDIS_CONFIG_BLOCKLIST_CHANNEL = "REDIS_CONFIG_BLOCKLIST_CHANNEL";
const COMPONENT_NAME = "eventQueue/config";
const COMPONENT_NAME = "/eventQueue/config";
const MIN_INTERVAL_SEC = 10;

@@ -20,3 +20,6 @@ const DEFAULT_LOAD = 1;

const COMMAND_UNBLOCK = "EVENT_QUEUE_EVENT_UNBLOCK";
const CAP_EVENT_TYPE = "CAP_OUTBOX";
const CAP_PARALLEL_DEFAULT = 5;
const BASE_PERIODIC_EVENTS = [

@@ -56,2 +59,3 @@ {

#thresholdLoggingEventProcessing;
#useAsCAPOutbox;
static #instance;

@@ -82,2 +86,6 @@ constructor() {

isCapOutboxEvent(type) {
return type === CAP_EVENT_TYPE;
}
hasEventAfterCommitFlag(type, subType) {

@@ -187,2 +195,26 @@ return this.#eventMap[this.generateKey(type, subType)]?.processAfterCommit ?? true;

addCAPOutboxEvent(serviceName, config) {
if (this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)]) {
return;
}
const eventConfig = {
type: CAP_EVENT_TYPE,
subType: serviceName,
load: config.load ?? DEFAULT_LOAD,
impl: "./outbox/EventQueueGenericOutboxHandler",
selectMaxChunkSize: config.chunkSize,
parallelEventProcessing: config.parallelEventProcessing ?? (config.parallel && CAP_PARALLEL_DEFAULT),
retryAttempts: config.maxAttempts,
transactionMode: config.transactionMode,
processAfterCommit: config.processAfterCommit,
eventOutdatedCheck: config.eventOutdatedCheck,
checkForNextChunk: config.checkForNextChunk,
deleteFinishedEventsAfterDays: config.deleteFinishedEventsAfterDays,
internalEvent: true,
};
this.#config.events.push(eventConfig);
this.#eventMap[this.generateKey(CAP_EVENT_TYPE, serviceName)] = eventConfig;
}
#unblockPeriodicEventLocalState(key, tenant) {

@@ -222,3 +254,3 @@ const map = this.#blockedPeriodicEvents[key];

this.validateAdHocEvents(result, event);
result[[event.type, event.subType].join("##")] = event;
result[this.generateKey(event.type, event.subType)] = event;
return result;

@@ -231,3 +263,3 @@ }, {});

this.validatePeriodicConfig(result, event);
result[[event.type, event.subType].join("##")] = event;
result[this.generateKey(event.type, event.subType)] = event;
return result;

@@ -267,2 +299,10 @@ }, this.#eventMap);

removeEvent(type, subType) {
const index = this.#config.events.findIndex((event) => event.type === "CAP_OUTBOX");
if (index >= 0) {
this.#config.events.splice(index, 1);
}
delete this.#eventMap[this.generateKey(type, subType)];
}
get fileContent() {

@@ -416,2 +456,10 @@ return this.#config;

set useAsCAPOutbox(value) {
this.#useAsCAPOutbox = value;
}
get useAsCAPOutbox() {
return this.#useAsCAPOutbox;
}
get isMultiTenancy() {

@@ -418,0 +466,0 @@ return !!cds.requires.multitenancy;

@@ -8,3 +8,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/dbHandler";
const COMPONENT_NAME = "/eventQueue/dbHandler";

@@ -11,0 +11,0 @@ const registerEventQueueDbHandler = (dbService) => {

@@ -15,3 +15,3 @@ "use strict";

const IMPLEMENT_ERROR_MESSAGE = "needs to be reimplemented";
const COMPONENT_NAME = "eventQueue/EventQueueProcessorBase";
const COMPONENT_NAME = "/eventQueue/EventQueueProcessorBase";

@@ -873,3 +873,3 @@ const DEFAULT_RETRY_ATTEMPTS = 3;

async handleDistributedLock() {
async acquireDistributedLock() {
if (this.concurrentEventProcessing) {

@@ -876,0 +876,0 @@ return true;

@@ -17,2 +17,3 @@ "use strict";

const { closeMainClient } = require("./shared/redis");
const eventQueueAsOutbox = require("./outbox/eventQueueAsOutbox");

@@ -39,2 +40,3 @@ const readFileAsync = promisify(fs.readFile);

["thresholdLoggingEventProcessing", 50],
["useAsCAPOutbox", false],
];

@@ -54,2 +56,3 @@

thresholdLoggingEventProcessing,
useAsCAPOutbox,
} = {}) => {

@@ -76,3 +79,4 @@ // TODO: initialize check:

updatePeriodicEvents,
thresholdLoggingEventProcessing
thresholdLoggingEventProcessing,
useAsCAPOutbox
);

@@ -91,2 +95,3 @@

monkeyPatchCAPOutbox();
registerEventProcessors();

@@ -141,2 +146,13 @@ registerCdsShutdown();

const monkeyPatchCAPOutbox = () => {
if (config.useAsCAPOutbox) {
Object.defineProperty(cds, "outboxed", {
get: () => eventQueueAsOutbox.outboxed,
});
Object.defineProperty(cds, "unboxed", {
get: () => eventQueueAsOutbox.unboxed,
});
}
};
const csnCheck = async () => {

@@ -143,0 +159,0 @@ const eventCsn = cds.model.definitions[config.tableNameEventQueue];

@@ -9,3 +9,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/periodicEvents";
const COMPONENT_NAME = "/eventQueue/periodicEvents";
const CHUNK_SIZE_INSERT_PERIODIC_EVENTS = 4;

@@ -12,0 +12,0 @@

@@ -13,3 +13,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/processEventQueue";
const COMPONENT_NAME = "/eventQueue/processEventQueue";
const MAX_EXECUTION_TIME = 5 * 60 * 1000;

@@ -33,3 +33,3 @@

baseInstance = new EventTypeClass(context, eventType, eventSubType, eventConfig);
const continueProcessing = await baseInstance.handleDistributedLock();
const continueProcessing = await baseInstance.acquireDistributedLock();
if (!continueProcessing) {

@@ -36,0 +36,0 @@ return;

@@ -8,7 +8,7 @@ "use strict";

const config = require("./config");
const { runEventCombinationForTenant } = require("./runner");
const runner = require("./runner");
const { getSubdomainForTenantId } = require("./shared/cdsHelper");
const EVENT_MESSAGE_CHANNEL = "EVENT_QUEUE_MESSAGE_CHANNEL";
const COMPONENT_NAME = "eventQueue/redisPubSub";
const COMPONENT_NAME = "/eventQueue/redisPubSub";

@@ -21,6 +21,6 @@ let subscriberClientPromise;

}
redis.subscribeRedisChannel(EVENT_MESSAGE_CHANNEL, messageHandlerProcessEvents);
redis.subscribeRedisChannel(EVENT_MESSAGE_CHANNEL, _messageHandlerProcessEvents);
};
const messageHandlerProcessEvents = async (messageData) => {
const _messageHandlerProcessEvents = async (messageData) => {
const logger = cds.log(COMPONENT_NAME);

@@ -48,4 +48,26 @@ try {

};
if (!config.getEventConfig(type, subType)) {
if (config.isCapOutboxEvent(type)) {
try {
const service = await cds.connect.to(subType);
cds.outboxed(service);
} catch (err) {
logger.error("could not connect to outboxed service", err, {
type,
subType,
});
return;
}
} else {
logger.error("cannot find configuration for published event. Event won't be processed", {
type,
subType,
});
return;
}
}
return await cds.tx(tenantContext, async ({ context }) => {
return await runEventCombinationForTenant(context, type, subType);
return await runner.runEventCombinationForTenant(context, type, subType);
});

@@ -82,3 +104,3 @@ } catch (err) {

return await cds.tx(context, async ({ context }) => {
return await runEventCombinationForTenant(context, type, subType);
return await runner.runEventCombinationForTenant(context, type, subType);
});

@@ -129,2 +151,5 @@ }

closeSubscribeClient,
__: {
_messageHandlerProcessEvents,
},
};

@@ -15,3 +15,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/runner";
const COMPONENT_NAME = "/eventQueue/runner";
const EVENT_QUEUE_RUN_ID = "EVENT_QUEUE_RUN_ID";

@@ -325,3 +325,3 @@ const EVENT_QUEUE_RUN_TS = "EVENT_QUEUE_RUN_TS";

runEventCombinationForTenant,
_: {
__: {
_singleTenantDb,

@@ -328,0 +328,0 @@ _multiTenancyRedis,

@@ -11,3 +11,3 @@ "use strict";

const VERROR_CLUSTER_NAME = "ExecuteInNewTransactionError";
const COMPONENT_NAME = "eventQueue/cdsHelper";
const COMPONENT_NAME = "/eventQueue/cdsHelper";

@@ -14,0 +14,0 @@ /**

@@ -8,3 +8,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/shared/eventScheduler";
const COMPONENT_NAME = "/eventQueue/shared/eventScheduler";

@@ -11,0 +11,0 @@ let instance;

@@ -8,3 +8,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/shared/redis";
const COMPONENT_NAME = "/eventQueue/shared/redis";

@@ -11,0 +11,0 @@ let mainClientPromise;

@@ -8,3 +8,3 @@ "use strict";

const COMPONENT_NAME = "eventQueue/WorkerQueue";
const COMPONENT_NAME = "/eventQueue/WorkerQueue";
const NANO_TO_MS = 1e6;

@@ -11,0 +11,0 @@ const THRESHOLD = {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc