New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
3
Versions
73
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.8.6 to 1.8.7

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.8.6",
"version": "1.8.7",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -20,2 +20,3 @@ "use strict";

const DEFAULT_PRIORITY = Priorities.Medium;
const DEFAULT_INCREASE_PRIORITY = true;
const SUFFIX_PERIODIC = "_PERIODIC";

@@ -321,2 +322,3 @@ const COMMAND_BLOCK = "EVENT_QUEUE_EVENT_BLOCK";

multiInstanceProcessing: config.multiInstanceProcessing,
increasePriorityOverTime: config.increasePriorityOverTime,
internalEvent: true,

@@ -369,3 +371,2 @@ };

this.#eventMap = config.periodicEvents.reduce((result, event) => {
event.priority = event.priority ?? DEFAULT_PRIORITY;
event.type = `${event.type}${SUFFIX_PERIODIC}`;

@@ -384,2 +385,3 @@ event.isPeriodic = true;

event.priority = event.priority ?? DEFAULT_PRIORITY;
event.increasePriorityOverTime = event.increasePriorityOverTime ?? DEFAULT_INCREASE_PRIORITY;
}

@@ -386,0 +388,0 @@

@@ -272,3 +272,9 @@ import * as cds from "@sap/cds";

addToQueue(load: number, label: string, priority?: Priorities, cb?: () => any): Promise<any>;
addToQueue(
load: number,
label: string,
priority: Priorities,
increasePriorityOverTime: boolean,
cb: () => any
): Promise<any>;

@@ -275,0 +281,0 @@ _executeFunction(

@@ -197,29 +197,35 @@ "use strict";

const label = `${eventConfig.type}_${eventConfig.subType}`;
return await WorkerQueue.instance.addToQueue(eventConfig.load, label, eventConfig.priority, async () => {
return await cds.tx(tenantContext, async ({ context }) => {
await trace(
context,
label,
async () => {
try {
const lockId = `${runId}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
return await WorkerQueue.instance.addToQueue(
eventConfig.load,
label,
eventConfig.priority,
eventConfig.increasePriorityOverTime,
async () => {
return await cds.tx(tenantContext, async ({ context }) => {
await trace(
context,
label,
async () => {
try {
const lockId = `${runId}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, {
skipWorkerPool: true,
});
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
});
}
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, {
skipWorkerPool: true,
});
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
});
}
},
{ newRootSpan: true }
);
});
});
},
{ newRootSpan: true }
);
});
}
);
})

@@ -279,29 +285,35 @@ );

const label = `${eventConfig.type}_${eventConfig.subType}`;
return await WorkerQueue.instance.addToQueue(eventConfig.load, label, eventConfig.priority, async () => {
return await cds.tx({}, async ({ context }) => {
await trace(
context,
label,
async () => {
try {
const lockId = `${label}`;
const couldAcquireLock = eventConfig.multiInstanceProcessing
? true
: await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
return await WorkerQueue.instance.addToQueue(
eventConfig.load,
label,
eventConfig.priority,
eventConfig.increasePriorityOverTime,
async () => {
return await cds.tx({}, async ({ context }) => {
await trace(
context,
label,
async () => {
try {
const lockId = `${label}`;
const couldAcquireLock = eventConfig.multiInstanceProcessing
? true
: await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, {
skipWorkerPool: true,
});
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed");
}
await runEventCombinationForTenant(context, eventConfig.type, eventConfig.subType, {
skipWorkerPool: true,
});
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed");
}
},
{ newRootSpan: true }
);
});
});
},
{ newRootSpan: true }
);
});
}
);
})

@@ -308,0 +320,0 @@ );

@@ -26,2 +26,3 @@ "use strict";

eventConfig.priority,
eventConfig.increasePriorityOverTime,
AsyncResource.bind(async () => {

@@ -28,0 +29,0 @@ const _exec = async () => {

@@ -55,3 +55,3 @@ "use strict";

addToQueue(load, label, priority = Priorities.Medium, cb) {
addToQueue(load, label, priority = Priorities.Medium, increasePriorityOverTime, cb) {
if (load > this.#concurrencyLimit) {

@@ -67,3 +67,3 @@ throw EventQueueError.loadHigherThanLimit(load, label);

const p = new Promise((resolve, reject) => {
this.#queue[priority].push([load, label, cb, resolve, reject, startTime]);
this.#queue[priority].push([load, label, cb, resolve, reject, increasePriorityOverTime, startTime]);
});

@@ -83,3 +83,8 @@ this.#checkForNext();

const queueEntry = this.queue[priority][i];
const startTime = queueEntry[6] ?? queueEntry[5];
// NOTE: index 5 - increasingPrioOverTime
if (!queueEntry[5]) {
continue;
}
// NOTE: index 6 original time; index 7 shifted time
const startTime = queueEntry[7] ?? queueEntry[6];
if (Math.round(Number(checkTime - startTime) / NANO_TO_MS) > INCREASE_PRIORITY_AFTER * MIN_TO_MS) {

@@ -94,3 +99,3 @@ const [entry] = this.queue[priority].splice(i, 1);

_executeFunction(load, label, cb, resolve, reject, startTime, priority) {
_executeFunction(priority, load, label, cb, resolve, reject, skipIncreasingPrioOverTime, startTime) {
this.#checkAndLogWaitingTime(startTime, label, priority);

@@ -130,3 +135,3 @@ const promise = Promise.resolve().then(() => cb());

const [args] = this.#queue[priority].splice(i, 1);
this._executeFunction(...args, priority);
this._executeFunction(priority, ...args);
entryFound = true;

@@ -133,0 +138,0 @@ break;

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc