@cap-js-community/event-queue - npm package version comparison

Comparing version 0.2.3 to 0.2.4

package.json
{
"name": "@cap-js-community/event-queue",
"version": "0.2.3",
"version": "0.2.4",
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -14,2 +14,3 @@ "use strict";

const MIN_INTERVAL_SEC = 10;
const DEFAULT_LOAD = 1;

@@ -31,2 +32,3 @@ class Config {

#skipCsnCheck;
#registerAsEventProcessor;
#disableRedis;

@@ -111,2 +113,3 @@ #env;

this.#eventMap = config.events.reduce((result, event) => {
event.load = event.load ?? DEFAULT_LOAD;
this.validateAdHocEvents(result, event);

@@ -117,2 +120,3 @@ result[[event.type, event.subType].join("##")] = event;

this.#eventMap = config.periodicEvents.reduce((result, event) => {
event.load = event.load ?? DEFAULT_LOAD;
const SUFFIX_PERIODIC = "_PERIODIC";

@@ -281,2 +285,10 @@ event.type = `${event.type}${SUFFIX_PERIODIC}`;

set registerAsEventProcessor(value) {
this.#registerAsEventProcessor = value;
}
get registerAsEventProcessor() {
return this.#registerAsEventProcessor;
}
get isMultiTenancy() {

@@ -283,0 +295,0 @@ return !!cds.requires.multitenancy;

@@ -19,2 +19,3 @@ "use strict";

NO_MANUEL_INSERT_OF_PERIODIC: "NO_MANUEL_INSERT_OF_PERIODIC",
LOAD_HIGHER_THAN_LIMIT: "LOAD_HIGHER_THAN_LIMIT",
};

@@ -63,2 +64,5 @@

},
[ERROR_CODES.LOAD_HIGHER_THAN_LIMIT]: {
message: "The defined load of an event is higher than the maximum defined limit. Check your configuration!",
},
};

@@ -211,4 +215,14 @@

}
static loadHigherThanLimit(load) {
const { message } = ERROR_CODES_META[ERROR_CODES.LOAD_HIGHER_THAN_LIMIT];
return new EventQueueError(
{
name: ERROR_CODES.LOAD_HIGHER_THAN_LIMIT,
info: { load },
},
message
);
}
}
module.exports = EventQueueError;
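The new LOAD_HIGHER_THAN_LIMIT code and its static factory are thrown by the reworked WorkerQueue (see its diff further below) when a single event declares a load above the queue's concurrency limit. A short, hedged sketch of what a caller sees, assuming the error exposes the name and info it is constructed with:

// Grounded in the factory above; WorkerQueue refers to the module shown later
// in this comparison, and the exact error properties are an assumption.
try {
  WorkerQueue.instance.addToQueue(999, async () => {});
} catch (err) {
  // err is an EventQueueError built from ERROR_CODES.LOAD_HIGHER_THAN_LIMIT
  // with info { load: 999 } and the message defined in ERROR_CODES_META.
}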

@@ -101,2 +101,16 @@ "use strict";

/**
* Process one periodic event
* @param processContext the context valid for the event processing. This context is associated with a valid transaction
* Access to the context is also possible with this.getContextForEventProcessing(key).
* The associated tx can be accessed with this.getTxForEventProcessing(key).
* @param {string} key cluster key generated during the clustering step. By default, this is the ID of the event queue entry
* @param {Object} queueEntry this is the queueEntry which should be processed
* @returns {Promise<undefined>}
*/
// eslint-disable-next-line no-unused-vars
async processPeriodicEvent(processContext, key, queueEntry) {
throw new Error(IMPLEMENT_ERROR_MESSAGE);
}
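The processPeriodicEvent hook added above is a template method: the base implementation only throws IMPLEMENT_ERROR_MESSAGE, so periodic event implementations are expected to override it. A rough sketch of such an override; the base class name and the work done inside are assumptions, and only the signature and the getTxForEventProcessing(key) helper mentioned in the JSDoc come from this diff:

// Hypothetical subclass; EventQueueProcessorBase stands in for the package's
// processor base class, which is not named in this diff.
class CleanupProcessor extends EventQueueProcessorBase {
  async processPeriodicEvent(processContext, key, queueEntry) {
    const tx = this.getTxForEventProcessing(key); // tx bound to this event, per the JSDoc
    const cutoff = new Date(Date.now() - 24 * 60 * 60 * 1000);
    // hypothetical cleanup query executed inside the managed transaction
    await tx.run(DELETE.from("my.app.TempFiles").where("createdAt <=", cutoff));
  }
}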
startPerformanceTracerEvents() {

@@ -258,3 +272,4 @@ this.__performanceLoggerEvents = new PerformanceTracer(this.logger, "Processing events");

this.logger.error(
`The supplied status tuple doesn't have the required structure. Setting all entries to error. Error: ${error.toString()}`,
"The supplied status tuple doesn't have the required structure. Setting all entries to error.",
error,
{

@@ -300,3 +315,4 @@ eventType: this.#eventType,

this.logger.error(
`Caught error during event processing - setting queue entry to error. Please catch your promises/exceptions. Error: ${error}`,
"Caught error during event processing - setting queue entry to error. Please catch your promises/exceptions",
error,
{

@@ -315,10 +331,7 @@ eventType: this.#eventType,

handleErrorDuringPeriodicEventProcessing(error, queueEntry) {
this.logger.error(
`Caught error during event periodic processing. Please catch your promises/exceptions. Error: ${error}`,
{
eventType: this.#eventType,
eventSubType: this.#eventSubType,
queueEntryId: queueEntry.ID,
}
);
this.logger.error("Caught error during event periodic processing. Please catch your promises/exceptions.", error, {
eventType: this.#eventType,
eventSubType: this.#eventSubType,
queueEntryId: queueEntry.ID,
});
}

@@ -498,3 +511,3 @@

handleErrorDuringClustering(error) {
this.logger.error(`Error during clustering of events - setting all queue entries to error. Error: ${error}`, {
this.logger.error("Error during clustering of events - setting all queue entries to error.", error, {
eventType: this.#eventType,

@@ -685,3 +698,4 @@ eventSubType: this.#eventSubType,

this.logger.error(
`Caught error during hook for exceeded events - setting queue entry to error. Please catch your promises/exceptions. Error: ${err}`,
"Caught error during hook for exceeded events - setting queue entry to error. Please catch your promises/exceptions.",
err,
{

@@ -831,2 +845,6 @@ eventType: this.#eventType,

if (!lockAcquired) {
this.logger.debug("no lock available, exit processing", {
type: this.#eventType,
subType: this.#eventSubType,
});
return false;

@@ -845,3 +863,3 @@ }

} catch (err) {
this.logger.error("Releasing distributed lock failed. Error:", err.toString());
this.logger.error("Releasing distributed lock failed.", err);
}

@@ -848,0 +866,0 @@ }

@@ -91,2 +91,3 @@ "use strict";

registerAsEventProcessor: config.registerAsEventProcessor,
processEventsAfterPublish: config.processEventsAfterPublish,
multiTenancyEnabled: config.isMultiTenancy,

@@ -93,0 +94,0 @@ redisEnabled: config.redisEnabled,

@@ -66,9 +66,12 @@ "use strict";

});
await tx.run(
DELETE.from(eventConfig.tableNameEventQueue).where(
"ID IN",
exitingWithNotMatchingInterval.map(({ ID }) => ID)
)
);
if (exitingWithNotMatchingInterval.length) {
await tx.run(
DELETE.from(eventConfig.tableNameEventQueue).where(
"ID IN",
exitingWithNotMatchingInterval.map(({ ID }) => ID)
)
);
}
const newOrChangedEvents = newEvents.concat(exitingWithNotMatchingInterval);

@@ -75,0 +78,0 @@

@@ -9,3 +9,3 @@ "use strict";

const { TransactionMode } = require("./constants");
const { limiter, Funnel } = require("./shared/common");
const { limiter } = require("./shared/common");

@@ -17,12 +17,2 @@ const { executeInNewTransaction, TriggerRollback } = require("./shared/cdsHelper");

const eventQueueRunner = async (context, events) => {
const startTime = new Date();
const funnel = new Funnel();
await Promise.allSettled(
events.map((event) =>
funnel.run(event.load, async () => processEventQueue(context, event.type, event.subType, startTime))
)
);
};
const processEventQueue = async (context, eventType, eventSubType, startTime = new Date()) => {

@@ -116,3 +106,3 @@ let iterationCounter = 0;

} catch (err) {
cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error. Error:", err, {
cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error.", err, {
eventType,

@@ -176,3 +166,3 @@ eventSubType,

try {
await eventTypeInstance.processEvent(tx.context, queueEntry.ID, [queueEntry]);
await eventTypeInstance.processPeriodicEvent(tx.context, queueEntry.ID, queueEntry);
} catch (err) {

@@ -201,3 +191,3 @@ eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry);

} catch (err) {
cds.log(COMPONENT_NAME).error("Processing periodic events failed with unexpected error. Error:", err, {
cds.log(COMPONENT_NAME).error("Processing periodic events failed with unexpected error.", err, {
eventType: eventTypeInstance?.eventType,

@@ -286,3 +276,2 @@ eventSubType: eventTypeInstance?.eventSubType,

processEventQueue,
eventQueueRunner,
};

@@ -51,3 +51,6 @@ "use strict";

if (result) {
logger.info("skip publish redis event as no lock is available");
logger.info("skip publish redis event as no lock is available", {
type,
subType,
});
return;

@@ -62,3 +65,3 @@ }

} catch (err) {
logger.error(`publish event failed with error: ${err.toString()}`, {
logger.error("publish event failed!", err, {
tenantId,

@@ -65,0 +68,0 @@ type,

@@ -6,4 +6,4 @@ "use strict";

const eventQueueConfig = require("./config");
const { eventQueueRunner, processEventQueue } = require("./processEventQueue");
const { getWorkerPoolInstance } = require("./shared/WorkerQueue");
const { processEventQueue } = require("./processEventQueue");
const WorkerQueue = require("./shared/WorkerQueue");
const cdsHelper = require("./shared/cdsHelper");

@@ -25,3 +25,3 @@ const distributedLock = require("./shared/distributedLock");

const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _executeRunForTenant);
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _singleTenantDb);

@@ -80,3 +80,3 @@ const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb);

_executeAllTenants(tenantIds, runId);
return _executeEventsAllTenants(tenantIds, runId);
};

@@ -93,3 +93,3 @@

_multiTenancyPeriodicEvents().catch((err) => {
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events! Error:", err);
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err);
});

@@ -99,6 +99,34 @@ }

const _executeAllTenantsGeneric = (tenantIds, runId, fn) => {
const workerQueueInstance = getWorkerPoolInstance();
const _executeEventsAllTenants = (tenantIds, runId) => {
const events = eventQueueConfig.allEvents;
const promises = [];
tenantIds.forEach((tenantId) => {
workerQueueInstance.addToQueue(async () => {
events.forEach((event) => {
promises.push(
WorkerQueue.instance.addToQueue(event.load, async () => {
try {
const lockId = `${runId}_${event.type}_${event.subType}`;
const tenantContext = new cds.EventContext({ tenant: tenantId });
const couldAcquireLock = await distributedLock.acquireLock(tenantContext, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(tenantId, event.type, event.subType, true);
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
});
}
})
);
});
});
return promises;
};
const _executePeriodicEventsAllTenants = (tenantIds, runId) => {
tenantIds.forEach((tenantId) => {
WorkerQueue.instance.addToQueue(1, async () => {
try {

@@ -112,3 +140,3 @@ const tenantContext = new cds.EventContext({ tenant: tenantId });

}
await fn(tenantId, runId);
await _checkPeriodicEventsSingleTenant(tenantId);
} catch (err) {

@@ -123,30 +151,16 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {

const _executeAllTenants = (tenantIds, runId) => _executeAllTenantsGeneric(tenantIds, runId, _executeRunForTenant);
const _executePeriodicEventsAllTenants = (tenantIds, runId) =>
_executeAllTenantsGeneric(tenantIds, runId, _checkPeriodicEventsSingleTenant);
const _executeRunForTenant = async (tenantId, runId) => {
const logger = cds.log(COMPONENT_NAME);
try {
const eventsForAutomaticRun = eventQueueConfig.allEvents;
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId);
const context = new cds.EventContext({
tenant: tenantId,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
const _singleTenantDb = async (tenantId) => {
const events = eventQueueConfig.allEvents;
events.forEach((event) => {
WorkerQueue.instance.addToQueue(event.load, async () => {
try {
await runEventCombinationForTenant(tenantId, event.type, event.subType, true);
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
redisEnabled: eventQueueConfig.redisEnabled,
});
}
});
cds.context = context;
logger.info("executing eventQueue run", {
tenantId,
subdomain,
...(runId ? { runId } : null),
});
await eventQueueRunner(context, eventsForAutomaticRun);
} catch (err) {
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, {
tenantId,
redisEnabled: eventQueueConfig.redisEnabled,
});
}
});
};

@@ -206,6 +220,3 @@

.log(COMPONENT_NAME)
.error(
"calculating offset for first run failed, falling back to default. Runs might be out-of-sync. Error:",
err
);
.error("calculating offset for first run failed, falling back to default. Runs might be out-of-sync.", err);
}

@@ -215,3 +226,3 @@ return offsetDependingOnLastRun;

const runEventCombinationForTenant = async (tenantId, type, subType) => {
const runEventCombinationForTenant = async (tenantId, type, subType, skipWorkerPool) => {
try {

@@ -225,3 +236,11 @@ const subdomain = await getSubdomainForTenantId(tenantId);

cds.context = context;
getWorkerPoolInstance().addToQueue(async () => await processEventQueue(context, type, subType));
if (skipWorkerPool) {
return await processEventQueue(context, type, subType);
} else {
const config = eventQueueConfig.getEventConfig(type, subType);
return await WorkerQueue.instance.addToQueue(
config.load,
async () => await processEventQueue(context, type, subType)
);
}
} catch (err) {

@@ -243,7 +262,5 @@ const logger = cds.log(COMPONENT_NAME);

_checkAndTriggerPeriodicEventUpdate(tenantIds);
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
return _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
} catch (err) {
logger.error(
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}`
);
logger.error("Couldn't fetch tenant ids for event queue processing! Next try after defined interval.", err);
}

@@ -259,3 +276,3 @@ };

} catch (err) {
logger.error(`Couldn't fetch tenant ids for updating periodic event processing! Error: ${err}`);
logger.error("Couldn't fetch tenant ids for updating periodic event processing!", err);
}

@@ -266,4 +283,8 @@ };

const logger = cds.log(COMPONENT_NAME);
if (!eventQueueConfig.updatePeriodicEvents) {
logger.info("updating of periodic events is disabled");
if (!eventQueueConfig.updatePeriodicEvents || !eventQueueConfig.periodicEvents.length) {
logger.info("updating of periodic events is disabled or no periodic events configured", {
updateEnabled: eventQueueConfig.updatePeriodicEvents,
events: eventQueueConfig.periodicEvents.length,
});
return;
}

@@ -286,3 +307,3 @@ try {

} catch (err) {
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, {
logger.error("Couldn't update periodic events for tenant! Next try after defined interval.", err, {
tenantId,

@@ -289,0 +310,0 @@ redisEnabled: eventQueueConfig.redisEnabled,

"use strict";
const crypto = require("crypto");
const { floor, abs, min } = Math;
const arrayToFlatMap = (array, key = "ID") => {

@@ -15,70 +12,2 @@ return array.reduce((result, element) => {

/**
* Establish a "Funnel" instance to limit how much
* load can be processed in parallel. This is somewhat
* similar to the limiter function however it has some
* distinctly different features. The Funnel will
* not know in advance which functions and how many
* loads it will have to process.
*/
class Funnel {
/**
* Create a funnel with specified capacity
* @param capacity - the capacity of the funnel (integer, sign will be ignored)
*/
constructor(capacity = 100) {
this.runningPromises = [];
this.capacity = floor(abs(capacity));
}
/**
* Asynchronously run a function that will put a specified load to the funnel.
* The total amount of load of all running functions shall not
* exceed the capacity of the funnel. If the desired load exceeds the capacity
* the funnel will wait until sufficient capacity is available.
* If a function requires a load >= capacity, then it will run
* exclusively.
* @param load - the load (integer, sign will be ignored)
* @param f
* @param args
* @return {Promise<unknown>}
*/
async run(load, f, ...args) {
load = min(floor(abs(load)), Number.MAX_SAFE_INTEGER);
// wait for sufficient capacity
while (this.capacity < load && this.runningPromises.length > 0) {
try {
await Promise.race(this.runningPromises);
} catch {
// Yes, we must ignore exceptions here. The
// caller expects exceptions from f and no
// exceptions from other workloads.
// Other exceptions must be handled by the
// other callers. See (*) below.
}
}
// map function call to promise
const p = f.constructor.name === "AsyncFunction" ? f(...args) : Promise.resolve().then(() => f(...args));
// create promise for book keeping
const workload = p.finally(() => {
// remove workload
this.runningPromises.splice(this.runningPromises.indexOf(workload), 1);
// and reclaim its capacity
this.capacity += load;
});
// claim the capacity and schedule workload
this.capacity -= load;
this.runningPromises.push(workload);
// make the caller wait for the workload
// this also establish the seemingly missing
// exception handling. See (*) above.
return workload;
}
}
/**
* Defines a promise that resolves when all payloads are processed by the iterator, but limits

@@ -136,2 +65,2 @@ * the number concurrent executions.

module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate, processChunkedSync, hashStringTo32Bit };
module.exports = { arrayToFlatMap, limiter, isValidDate, processChunkedSync, hashStringTo32Bit };
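With the Funnel removed from shared/common.js, its load-based throttling moves into the WorkerQueue (diff further below), and the dedicated eventQueueRunner in processEventQueue.js disappears with it. The call-site migration is roughly one-to-one; both lines below are taken from this comparison (the 0.2.3 eventQueueRunner and the 0.2.4 runEventCombinationForTenant), with the surrounding error handling omitted:

// 0.2.3, inside eventQueueRunner (wrapped in Promise.allSettled over all events):
funnel.run(event.load, async () => processEventQueue(context, event.type, event.subType, startTime));
// 0.2.4, inside runEventCombinationForTenant (load read from the event config):
WorkerQueue.instance.addToQueue(config.load, async () => await processEventQueue(context, type, subType));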

@@ -7,2 +7,4 @@ "use strict";

const KEY_PREFIX = "EVENT_QUEUE";
const acquireLock = async (context, key, { tenantScoped = true, expiryTime = config.globalTxTimeout } = {}) => {

@@ -132,3 +134,3 @@ const fullKey = _generateKey(context, tenantScoped, key);

keyParts.push(key);
return keyParts.join("##");
return `${KEY_PREFIX}_${keyParts.join("##")}`;
};
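Distributed lock keys are now namespaced with the EVENT_QUEUE prefix. For orientation, this is how the new index.js code above acquires such a lock per tenant-and-event combination before scheduling work; the snippet condenses lines from that diff, and the lock id is illustrative:

// Condensed from the index.js diff; acquireLock signature as shown above.
const tenantContext = new cds.EventContext({ tenant: tenantId });
const lockId = `${runId}_${event.type}_${event.subType}`;
const couldAcquireLock = await distributedLock.acquireLock(tenantContext, lockId, {
  expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) return; // another app instance already runs this combination
// internally the stored key is prefixed, e.g. EVENT_QUEUE_<tenant-scoped key parts>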

@@ -135,0 +137,0 @@

@@ -6,21 +6,38 @@ "use strict";

const config = require("../config");
const EventQueueError = require("../EventQueueError");
const COMPONENT_NAME = "eventQueue/WorkerQueue";
const NANO_TO_MS = 1e6;
const THRESHOLD = {
INFO: 5 * 1000,
WARN: 10 * 1000,
ERROR: 15 * 1000,
};
let instance = null;
class WorkerQueue {
#concurrencyLimit;
#runningPromises;
#runningLoad;
#queue;
static #instance;
class WorkerQueue {
constructor(concurrency) {
if (Number.isNaN(concurrency) || concurrency <= 0) {
this.__concurrencyLimit = 1;
this.#concurrencyLimit = 1;
} else {
this.__concurrencyLimit = concurrency;
this.#concurrencyLimit = concurrency;
}
this.__runningPromises = [];
this.__queue = [];
this.#runningPromises = [];
this.#runningLoad = 0;
this.#queue = [];
}
addToQueue(cb) {
addToQueue(load, cb) {
if (load > this.#concurrencyLimit) {
throw EventQueueError.loadHigherThanLimit(load);
}
const startTime = process.hrtime.bigint();
const p = new Promise((resolve, reject) => {
this.__queue.push([cb, resolve, reject]);
this.#queue.push([load, cb, resolve, reject, startTime]);
});

@@ -31,8 +48,11 @@ this._checkForNext();

_executeFunction(cb, resolve, reject) {
_executeFunction(load, cb, resolve, reject, startTime) {
this.checkAndLogWaitingTime(startTime);
const promise = Promise.resolve().then(() => cb());
this.__runningPromises.push(promise);
this.#runningPromises.push(promise);
this.#runningLoad = this.#runningLoad + load;
promise
.finally(() => {
this.__runningPromises.splice(this.__runningPromises.indexOf(promise), 1);
this.#runningLoad = this.#runningLoad - load;
this.#runningPromises.splice(this.#runningPromises.indexOf(promise), 1);
this._checkForNext();

@@ -44,3 +64,3 @@ })

.catch((err) => {
cds.log(COMPONENT_NAME).error("Error happened in WorkQueue. Errors should be caught before! Error:", err);
cds.log(COMPONENT_NAME).error("Error happened in WorkQueue. Errors should be caught before!", err);
reject(err);

@@ -51,20 +71,42 @@ });

_checkForNext() {
if (!this.__queue.length || this.__runningPromises.length >= this.__concurrencyLimit) {
const load = this.#queue[0]?.[0];
if (!this.#queue.length || this.#runningLoad + load > this.#concurrencyLimit) {
return;
}
const [cb, resolve, reject] = this.__queue.shift();
this._executeFunction(cb, resolve, reject);
const args = this.#queue.shift();
this._executeFunction(...args);
}
get runningPromises() {
return this.#runningPromises;
}
/**
@return { WorkerQueue }
**/
static get instance() {
if (!WorkerQueue.#instance) {
WorkerQueue.#instance = new WorkerQueue(config.parallelTenantProcessing);
}
return WorkerQueue.#instance;
}
checkAndLogWaitingTime(startTime) {
const diffMs = Math.round(Number(process.hrtime.bigint() - startTime) / NANO_TO_MS);
let logLevel;
if (diffMs >= THRESHOLD.ERROR) {
logLevel = "error";
} else if (diffMs >= THRESHOLD.WARN) {
logLevel = "warn";
} else if (diffMs >= THRESHOLD.INFO) {
logLevel = "info";
} else {
logLevel = "debug";
}
cds.log(COMPONENT_NAME)[logLevel]("Waiting time in worker queue", {
diffMs,
});
}
}
module.exports = {
getWorkerPoolInstance: () => {
if (!instance) {
instance = new WorkerQueue(config.parallelTenantProcessing);
}
return instance;
},
_: {
WorkerQueue,
},
};
module.exports = WorkerQueue;
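WorkerQueue is now a load-aware singleton with waiting-time diagnostics: addToQueue(load, cb) defers work until runningLoad + load fits under the concurrency limit (config.parallelTenantProcessing) and logs how long an item waited (debug below 5 s, then info, warn and error at the 5/10/15 s thresholds). For callers inside the package, the accessor changes as sketched below; the callback body is illustrative:

// 0.2.3 accessor (removed):
//   const { getWorkerPoolInstance } = require("./shared/WorkerQueue");
//   getWorkerPoolInstance().addToQueue(async () => { /* work */ });
// 0.2.4 accessor with explicit load; returns a promise for the callback's result:
const WorkerQueue = require("./shared/WorkerQueue");
const resultPromise = WorkerQueue.instance.addToQueue(1, async () => {
  /* work whose weight is 1 relative to config.parallelTenantProcessing */
});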