New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
6
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 0.2.1 to 0.2.2

src/periodicEvents.js

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "0.2.1",
"version": "0.2.2",
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -7,2 +7,3 @@ "use strict";

const redis = require("./shared/redis");
const EventQueueError = require("./EventQueueError");

@@ -15,2 +16,3 @@ let instance;

const COMPONENT_NAME = "eventQueue/config";
const MIN_INTERVAL_SEC = 10;

@@ -35,2 +37,3 @@ class Config {

#eventMap;
#updatePeriodicEvents;
constructor() {

@@ -56,7 +59,7 @@ this.#logger = cds.log(COMPONENT_NAME);

getEventConfig(type, subType) {
return this.#eventMap[[type, subType].join("##")];
return this.#eventMap[this.generateKey(type, subType)];
}
hasEventAfterCommitFlag(type, subType) {
return this.#eventMap[[type, subType].join("##")]?.processAfterCommit ?? true;
return this.#eventMap[this.generateKey(type, subType)]?.processAfterCommit ?? true;
}

@@ -108,8 +111,45 @@

this.#config = config;
config.events = config.events ?? [];
config.periodicEvents = config.periodicEvents ?? [];
this.#eventMap = config.events.reduce((result, event) => {
this.validateAdHocEvents(result, event);
result[[event.type, event.subType].join("##")] = event;
return result;
}, {});
this.#eventMap = config.periodicEvents.reduce((result, event) => {
this.validatePeriodicConfig(result, event);
event.isPeriodic = true;
result[[event.type, event.subType].join("##")] = event;
return result;
}, this.#eventMap);
}
validatePeriodicConfig(eventMap, config) {
if (eventMap[this.generateKey(config.type, config.subType)]) {
throw EventQueueError.duplicateEventRegistration(config.type, config.subType);
}
if (!config.interval || config.interval <= MIN_INTERVAL_SEC) {
throw EventQueueError.invalidInterval(config.type, config.subType, config.interval);
}
if (!config.impl) {
throw EventQueueError.missingImpl(config.type, config.subType);
}
}
validateAdHocEvents(eventMap, config) {
if (eventMap[this.generateKey(config.type, config.subType)]) {
throw EventQueueError.duplicateEventRegistration(config.type, config.subType);
}
if (!config.impl) {
throw EventQueueError.missingImpl(config.type, config.subType);
}
}
generateKey(type, subType) {
return [type, subType].join("##");
}
get fileContent() {

@@ -123,2 +163,14 @@ return this.#config;

get periodicEvents() {
return this.#config.periodicEvents;
}
isPeriodicEvent(type, subType) {
return this.#eventMap[this.generateKey(type, subType)]?.isPeriodic;
}
get allEvents() {
return this.#config.events.concat(this.#config.periodicEvents);
}
get forUpdateTimeout() {

@@ -220,2 +272,10 @@ return this.#forUpdateTimeout;

set updatePeriodicEvents(value) {
this.#updatePeriodicEvents = value;
}
get updatePeriodicEvents() {
return this.#updatePeriodicEvents;
}
get isMultiTenancy() {

@@ -222,0 +282,0 @@ return !!cds.requires.multitenancy;

@@ -14,2 +14,5 @@ "use strict";

dbService.after("CREATE", def, (_, req) => {
if (req.tx._skipEventQueueBroadcase) {
return;
}
req.tx._ = req.tx._ ?? {};

@@ -16,0 +19,0 @@ req.tx._.eventQueuePublishEvents = req.tx._.eventQueuePublishEvents ?? {};

@@ -15,2 +15,6 @@ "use strict";

NO_VALID_DATE: "NO_VALID_DATE",
INVALID_INTERVAL: "INVALID_INTERVAL",
MISSING_IMPL: "MISSING_IMPL",
DUPLICATE_EVENT_REGISTRATION: "DUPLICATE_EVENT_REGISTRATION",
NO_MANUEL_INSERT_OF_PERIODIC: "NO_MANUEL_INSERT_OF_PERIODIC",
};

@@ -47,2 +51,14 @@

},
[ERROR_CODES.INVALID_INTERVAL]: {
message: "Invalid interval, the value needs to greater than 10 seconds.",
},
[ERROR_CODES.MISSING_IMPL]: {
message: "Missing path to event class implementation.",
},
[ERROR_CODES.DUPLICATE_EVENT_REGISTRATION]: {
message: "Duplicate event registration, check the uniqueness of type and subType.",
},
[ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC]: {
message: "Periodic events are managed by the framework and are not allowed to insert manually.",
},
};

@@ -151,4 +167,48 @@

}
static invalidInterval(type, subType, interval) {
const { message } = ERROR_CODES_META[ERROR_CODES.INVALID_INTERVAL];
return new EventQueueError(
{
name: ERROR_CODES.INVALID_INTERVAL,
info: { type, subType, interval },
},
message
);
}
static missingImpl(type, subType) {
const { message } = ERROR_CODES_META[ERROR_CODES.MISSING_IMPL];
return new EventQueueError(
{
name: ERROR_CODES.MISSING_IMPL,
info: { type, subType },
},
message
);
}
static duplicateEventRegistration(type, subType) {
const { message } = ERROR_CODES_META[ERROR_CODES.DUPLICATE_EVENT_REGISTRATION];
return new EventQueueError(
{
name: ERROR_CODES.DUPLICATE_EVENT_REGISTRATION,
info: { type, subType },
},
message
);
}
static manuelPeriodicEventInsert(type, subType) {
const { message } = ERROR_CODES_META[ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC];
return new EventQueueError(
{
name: ERROR_CODES.NO_MANUEL_INSERT_OF_PERIODIC,
info: { type, subType },
},
message
);
}
}
module.exports = EventQueueError;

@@ -10,3 +10,3 @@ "use strict";

const { arrayToFlatMap } = require("./shared/common");
const eventScheduler = require("./shared/EventScheduler");
const eventScheduler = require("./shared/eventScheduler");
const eventQueueConfig = require("./config");

@@ -307,2 +307,25 @@ const PerformanceTracer = require("./shared/PerformanceTracer");

handleErrorDuringPeriodicEventProcessing(error, queueEntry) {
this.logger.error(
`Caught error during event periodic processing. Please catch your promises/exceptions. Error: ${error}`,
{
eventType: this.#eventType,
eventSubType: this.#eventSubType,
queueEntryId: queueEntry.ID,
}
);
}
async setPeriodicEventStatus(queueEntryIds) {
await this.tx.run(
UPDATE.entity(this.#config.tableNameEventQueue)
.set({
status: EventProcessingStatus.Done,
})
.where({
ID: queueEntryIds,
})
);
}
/**

@@ -817,2 +840,43 @@ * This function validates for all selected events one status has been submitted. It's also validated that only for

async scheduleNextPeriodEvent(queueEntry) {
const interval = this.__eventConfig.interval;
const newEvent = {
type: this.#eventType,
subType: this.#eventSubType,
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + interval * 1000),
};
this.tx._skipEventQueueBroadcase = true;
await this.tx.run(INSERT.into(this.#config.tableNameEventQueue).entries({ ...newEvent }));
this.tx._skipEventQueueBroadcase = false;
if (interval < this.#config.runInterval) {
this.#handleDelayedEvents([newEvent]);
}
}
async handleDuplicatedPeriodicEventEntry(queueEntries) {
this.logger.error("More than one open events for the same configuration which is not allowed!", {
eventType: this.#eventType,
eventSubType: this.#eventSubType,
queueEntriesIds: queueEntries.map(({ ID }) => ID),
});
let queueEntryToUse;
const obsoleteEntries = [];
for (const queueEntry of queueEntries) {
if (!queueEntryToUse) {
queueEntryToUse = queueEntry;
continue;
}
if (queueEntryToUse.startAfter <= queueEntry.queueEntry) {
obsoleteEntries.push(queueEntryToUse);
queueEntryToUse = queueEntry;
} else {
obsoleteEntries.push(queueEntry);
}
}
await this.setPeriodicEventStatus(obsoleteEntries.map(({ ID }) => ID));
return queueEntryToUse;
}
statusMapContainsError(statusMap) {

@@ -926,4 +990,8 @@ return Object.values(statusMap).includes(EventProcessingStatus.Error);

}
get isPeriodicEvent() {
return this.__eventConfig.isPeriodic;
}
}
module.exports = EventQueueProcessorBase;

@@ -37,2 +37,3 @@ "use strict";

["skipCsnCheck", false],
["updatePeriodicEvents", true],
];

@@ -51,2 +52,3 @@

skipCsnCheck,
updatePeriodicEvents,
} = {}) => {

@@ -73,3 +75,4 @@ // TODO: initialize check:

disableRedis,
skipCsnCheck
skipCsnCheck,
updatePeriodicEvents
);

@@ -123,4 +126,6 @@

const errorHandler = (err) => cds.log(COMPONENT).error("error during init runner", err);
if (!configInstance.isMultiTenancy) {
runner.singleTenant();
runner.singleTenant().catch(errorHandler);
return;

@@ -132,5 +137,5 @@ }

configInstance.attachConfigChangeHandler();
runner.multiTenancyRedis();
runner.multiTenancyRedis().catch(errorHandler);
} else {
runner.multiTenancyDb();
runner.multiTenancyDb().catch(errorHandler);
}

@@ -137,0 +142,0 @@ };

@@ -46,2 +46,5 @@ "use strict";

}
if (baseInstance.isPeriodicEvent) {
return await processPeriodicEvent(baseInstance);
}
eventConfig.startTime = startTime;

@@ -135,2 +138,62 @@ while (shouldContinue) {

// TODO: don't forget to release lock
const processPeriodicEvent = async (eventTypeInstance) => {
let queueEntry;
try {
await executeInNewTransaction(
eventTypeInstance.context,
`eventQueue-periodic-scheduleNext-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
async (tx) => {
eventTypeInstance.processEventContext = tx.context;
const queueEntries = await eventTypeInstance.getQueueEntriesAndSetToInProgress();
if (!queueEntries.length) {
return;
}
if (queueEntries.length > 1) {
queueEntry = await eventTypeInstance.handleDuplicatedPeriodicEventEntry(queueEntries);
} else {
queueEntry = queueEntries[0];
}
await eventTypeInstance.scheduleNextPeriodEvent(queueEntry);
}
);
if (!queueEntry) {
return;
}
await executeInNewTransaction(
eventTypeInstance.context,
`eventQueue-periodic-process-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
async (tx) => {
eventTypeInstance.processEventContext = tx.context;
eventTypeInstance.setTxForEventProcessing(queueEntry.ID, cds.tx(tx.context));
try {
await eventTypeInstance.processEvent(tx.context, queueEntry.ID, [queueEntry]);
} catch (err) {
eventTypeInstance.handleErrorDuringPeriodicEventProcessing(err, queueEntry);
throw new TriggerRollback();
}
if (
eventTypeInstance.transactionMode !== TransactionMode.alwaysCommit ||
eventTypeInstance.shouldRollbackTransaction(queueEntry.ID)
) {
throw new TriggerRollback();
}
}
);
await executeInNewTransaction(
eventTypeInstance.context,
`eventQueue-periodic-setStatus-${eventTypeInstance.eventType}##${eventTypeInstance.eventSubType}`,
async (tx) => {
eventTypeInstance.processEventContext = tx.context;
await eventTypeInstance.setPeriodicEventStatus(queueEntry.ID);
}
);
} finally {
await eventTypeInstance?.handleReleaseLock();
}
};
const processEventMap = async (eventTypeInstance) => {

@@ -137,0 +200,0 @@ eventTypeInstance.startPerformanceTracerEvents();

@@ -24,2 +24,3 @@ "use strict";

* }
* @param {Boolean} skipBroadcast - (Optional) If set to true, event broadcasting will be skipped. Defaults to false.
* @throws {EventQueueError} Throws an error if the configuration is not initialized.

@@ -30,3 +31,3 @@ * @throws {EventQueueError} Throws an error if the event type is unknown.

*/
const publishEvent = async (tx, events) => {
const publishEvent = async (tx, events, skipBroadcast = false) => {
const configInstance = config.getConfigInstance();

@@ -45,4 +46,11 @@ if (!configInstance.initialized) {

}
if (eventConfig.isPeriodic) {
throw EventQueueError.manuelPeriodicEventInsert(type, subType);
}
}
return await tx.run(INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing));
tx._skipEventQueueBroadcase = skipBroadcast;
const result = await tx.run(INSERT.into(configInstance.tableNameEventQueue).entries(eventsForProcessing));
tx._skipEventQueueBroadcase = false;
return result;
};

@@ -49,0 +57,0 @@

@@ -12,2 +12,4 @@ "use strict";

const { getSubdomainForTenantId } = require("./shared/cdsHelper");
const periodicEvents = require("./periodicEvents");
const { hashStringTo32Bit } = require("./shared/common");

@@ -17,14 +19,17 @@ const COMPONENT_NAME = "eventQueue/runner";

const EVENT_QUEUE_RUN_TS = "EVENT_QUEUE_RUN_TS";
const EVENT_QUEUE_RUN_PERIODIC_EVENT = "EVENT_QUEUE_RUN_PERIODIC_EVENT";
const OFFSET_FIRST_RUN = 10 * 1000;
const singleTenant = () => _scheduleFunction(_executeRunForTenant);
let tenantIdHash;
const multiTenancyDb = () => _scheduleFunction(_multiTenancyDb);
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenant, _executeRunForTenant);
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyRedis);
const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb);
const _scheduleFunction = async (fn) => {
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyRedis);
const _scheduleFunction = async (singleRunFn, periodicFn) => {
const logger = cds.log(COMPONENT_NAME);
const configInstance = eventQueueConfig.getConfigInstance();
const eventsForAutomaticRun = configInstance.events;
const eventsForAutomaticRun = configInstance.allEvents;
if (!eventsForAutomaticRun.length) {

@@ -41,3 +46,3 @@ logger.warn("no events for automatic run are configured - skipping runner registration");

}
return fn();
return periodicFn();
};

@@ -52,2 +57,3 @@

setTimeout(() => {
singleRunFn();
fnWithRunningCheck();

@@ -64,2 +70,4 @@ const intervalRunner = new SetIntervalDriftSafe(configInstance.runInterval);

const tenantIds = await cdsHelper.getAllTenantIds();
_checkAndTriggerPriodicEventUpdate(tenantIds);
const runId = await _acquireRunId(emptyContext);

@@ -75,16 +83,17 @@

const _multiTenancyDb = async () => {
const logger = cds.log(COMPONENT_NAME);
try {
logger.info("executing event queue run for single instance and multi tenant");
const tenantIds = await cdsHelper.getAllTenantIds();
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
} catch (err) {
logger.error(
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}`
);
const _checkAndTriggerPriodicEventUpdate = (tenantIds) => {
const hash = hashStringTo32Bit(JSON.stringify(tenantIds));
if (!tenantIdHash) {
tenantIdHash = hash;
return;
}
if (tenantIdHash && tenantIdHash !== hash) {
cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!");
_multiTenancyPeriodicEvents().catch((err) => {
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events! Error:", err);
});
}
};
const _executeAllTenants = (tenantIds, runId) => {
const _executeAllTenantsGeneric = (tenantIds, runId, fn) => {
const configInstance = eventQueueConfig.getConfigInstance();

@@ -102,3 +111,3 @@ const workerQueueInstance = getWorkerPoolInstance();

}
await _executeRunForTenant(tenantId, runId);
await fn(tenantId, runId);
} catch (err) {

@@ -113,2 +122,7 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {

const _executeAllTenants = (tenantIds, runId) => _executeAllTenantsGeneric(tenantIds, runId, _executeRunForTenant);
const _executePeriodicEventsAllTenants = (tenantIds, runId) =>
_executeAllTenantsGeneric(tenantIds, runId, _checkPeriodicEventsSingleTenant);
const _executeRunForTenant = async (tenantId, runId) => {

@@ -118,3 +132,3 @@ const logger = cds.log(COMPONENT_NAME);

try {
const eventsForAutomaticRun = configInstance.events;
const eventsForAutomaticRun = configInstance.allEvents;
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId);

@@ -223,2 +237,56 @@ const context = new cds.EventContext({

const _multiTenancyDb = async () => {
const logger = cds.log(COMPONENT_NAME);
try {
logger.info("executing event queue run for single instance and multi tenant");
const tenantIds = await cdsHelper.getAllTenantIds();
_checkAndTriggerPriodicEventUpdate(tenantIds);
_executeAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
} catch (err) {
logger.error(
`Couldn't fetch tenant ids for event queue processing! Next try after defined interval. Error: ${err}`
);
}
};
const _multiTenancyPeriodicEvents = async () => {
const logger = cds.log(COMPONENT_NAME);
try {
logger.info("executing event queue update periodic events");
const tenantIds = await cdsHelper.getAllTenantIds();
_executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT);
} catch (err) {
logger.error(`Couldn't fetch tenant ids for updating periodic event processing! Error: ${err}`);
}
};
const _checkPeriodicEventsSingleTenant = async (tenantId) => {
const logger = cds.log(COMPONENT_NAME);
const configInstance = eventQueueConfig.getConfigInstance();
if (!configInstance.updatePeriodicEvents) {
logger.info("updating of periodic events is disabled");
}
try {
const subdomain = await cdsHelper.getSubdomainForTenantId(tenantId);
const context = new cds.EventContext({
tenant: tenantId,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
});
cds.context = context;
logger.info("executing updating periotic events", {
tenantId,
subdomain,
});
await cdsHelper.executeInNewTransaction(context, "update-periodic-events", async (tx) => {
await periodicEvents.checkAndInsertPeriodicEvents(tx.context);
});
} catch (err) {
logger.error(`Couldn't process eventQueue for tenant! Next try after defined interval. Error: ${err}`, {
tenantId,
redisEnabled: configInstance.redisEnabled,
});
}
};
module.exports = {

@@ -225,0 +293,0 @@ singleTenant,

"use strict";
const crypto = require("crypto");
const { floor, abs, min } = Math;

@@ -121,2 +123,14 @@

module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate };
const processChunkedSync = (inputs, chunkSize, chunkHandler) => {
let start = 0;
while (start < inputs.length) {
let end = start + chunkSize > inputs.length ? inputs.length : start + chunkSize;
const chunk = inputs.slice(start, end);
chunkHandler(chunk);
start = end;
}
};
const hashStringTo32Bit = (value) => crypto.createHash("sha256").update(String(value)).digest("base64").slice(0, 32);
module.exports = { arrayToFlatMap, Funnel, limiter, isValidDate, processChunkedSync, hashStringTo32Bit };
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc