Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
7
Versions
65
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.2.5 to 1.2.6

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.2.5",
"version": "1.2.6",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -21,2 +21,3 @@ "use strict";

SCHEMA_TENANT_MISMATCH: "SCHEMA_TENANT_MISMATCH",
GLOBAL_CDS_CONTEXT_MISSMATCH: "GLOBAL_CDS_CONTEXT_MISSMATCH",
};

@@ -71,2 +72,5 @@

},
[ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH]: {
message: "The global cds context does not match the local cds context.",
},
};

@@ -240,4 +244,15 @@

}
static globalCdsContextNotMatchingLocal(globalProperties, localProperties) {
const { message } = ERROR_CODES_META[ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH];
return new EventQueueError(
{
name: ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH,
info: { globalProperties, localProperties },
},
message
);
}
}
module.exports = EventQueueError;

@@ -73,2 +73,4 @@ "use strict";

this.__queueEntries = [];
this.#checkGlobalContextToLocalContext();
}

@@ -537,3 +539,5 @@

const refDateStartAfter = new Date(Date.now() + this.#config.runInterval * 1.2);
this.#checkGlobalContextToLocalContext();
await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => {
this.#checkGlobalContextToLocalContext();
await this.checkTxConsistency(tx);

@@ -697,3 +701,3 @@ const entries = await tx.run(

}
if (txSchema !== serviceManagerSchema) {
if (serviceManagerSchema && txSchema !== serviceManagerSchema) {
const err = EventQueueError.dbClientSchemaMismatch(tx.context.tenant, txSchema, serviceManagerSchema);

@@ -730,2 +734,44 @@ errorHandler(err);

#checkGlobalContextToLocalContext() {
if (!this.#config.enableTxConsistencyCheck) {
return;
}
if (this.__context.tenant !== cds.context.tenant) {
throw EventQueueError.globalCdsContextNotMatchingLocal(
JSON.stringify(
{
correlationId: cds.context.id,
tenantId: cds.context.tenant,
timestamp: cds.context.timestamp,
base: JSON.stringify(
{
correlationId: cds.context.context?.id,
tenantId: cds.context.context?.tenant,
timestamp: cds.context.context?.timestamp,
},
null,
2
),
},
null,
2
),
JSON.stringify(
{
correlationId: this.__context.id,
tenantId: this.__context.tenant,
timestamp: this.__context.timestamp,
base: JSON.stringify({
correlationId: this.__context.context?.id,
tenantId: this.__context.context?.tenant,
timestamp: this.__context.context?.timestamp,
}),
},
null,
2
)
);
}
}
#handleDelayedEvents(delayedEvents) {

@@ -732,0 +778,0 @@ for (const delayedEvent of delayedEvents) {

@@ -29,5 +29,5 @@ "use strict";

const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb);
const multiTenancyDb = () => _scheduleFunction(async () => {}, _multiTenancyDb);
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyRedis);
const multiTenancyRedis = () => _scheduleFunction(async () => {}, _multiTenancyRedis);

@@ -73,3 +73,3 @@ const _scheduleFunction = async (singleRunFn, periodicFn) => {

const tenantIds = await cdsHelper.getAllTenantIds();
_checkAndTriggerPeriodicEventUpdate(tenantIds);
await _checkPeriodicEventUpdate(tenantIds);

@@ -83,13 +83,12 @@ const runId = await _acquireRunId(emptyContext);

return _executeEventsAllTenants(tenantIds, runId);
return await _executeEventsAllTenants(tenantIds, runId);
};
const _checkAndTriggerPeriodicEventUpdate = (tenantIds) => {
const _checkPeriodicEventUpdate = async (tenantIds) => {
const hash = hashStringTo32Bit(JSON.stringify(tenantIds));
if (!tenantIdHash) {
tenantIdHash = hash;
_multiTenancyPeriodicEvents().catch((err) => {
return await _multiTenancyPeriodicEvents(tenantIds).catch((err) => {
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err);
});
return;
}

@@ -99,3 +98,3 @@ if (tenantIdHash && tenantIdHash !== hash) {

cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!");
_multiTenancyPeriodicEvents().catch((err) => {
return await _multiTenancyPeriodicEvents(tenantIds).catch((err) => {
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err);

@@ -115,23 +114,59 @@ });

return product.map(async ([tenantId, event]) => {
const subdomain = await getSubdomainForTenantId(tenantId);
const user = new cds.User.Privileged(config.userId);
const tenantContext = {
tenant: tenantId,
user,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
};
return await cds.tx(tenantContext, async ({ context }) => {
return Promise.allSettled(
product.map(async ([tenantId, event]) => {
const subdomain = await getSubdomainForTenantId(tenantId);
const user = new cds.User.Privileged(config.userId);
const tenantContext = {
tenant: tenantId,
user,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
};
const label = `${event.type}_${event.subType}`;
return await WorkerQueue.instance.addToQueue(event.load, label, async () => {
return await cds.tx(tenantContext, async ({ context }) => {
try {
const lockId = `${runId}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, event.type, event.subType, true);
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
});
}
});
});
})
);
};
const _executePeriodicEventsAllTenants = async (tenantIds, runId) => {
return await Promise.allSettled(
tenantIds.map(async (tenantId) => {
const label = `UPDATE_PERIODIC_EVENTS_${tenantId}`;
return await WorkerQueue.instance.addToQueue(1, label, async () => {
try {
const lockId = `${runId}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
const subdomain = await getSubdomainForTenantId(tenantId);
const user = new cds.User.Privileged(config.userId);
const tenantContext = {
tenant: tenantId,
user,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
};
return await cds.tx(tenantContext, async ({ context }) => {
const couldAcquireLock = await distributedLock.acquireLock(context, runId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await _checkPeriodicEventsSingleTenant(context);
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, event.type, event.subType, true);
} catch (err) {

@@ -143,62 +178,38 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {

});
});
});
})
);
};
const _executePeriodicEventsAllTenants = (tenantIds, runId) => {
tenantIds.forEach((tenantId) => {
const label = `UPDATE_PERIODIC_EVENTS_${tenantId}`;
WorkerQueue.instance.addToQueue(1, label, async () => {
try {
const subdomain = await getSubdomainForTenantId(tenantId);
const user = new cds.User.Privileged(config.userId);
const tenantContext = {
tenant: tenantId,
user,
// NOTE: we need this because of logging otherwise logs would not contain the subdomain
http: { req: { authInfo: { getSubdomain: () => subdomain } } },
};
const _singleTenantDb = async (tenantId) => {
return Promise.allSettled(
eventQueueConfig.allEvents.map(async (event) => {
const label = `${event.type}_${event.subType}`;
const user = new cds.User.Privileged(config.userId);
const tenantContext = {
tenant: tenantId,
user,
};
return await WorkerQueue.instance.addToQueue(event.load, label, async () => {
return await cds.tx(tenantContext, async ({ context }) => {
const couldAcquireLock = await distributedLock.acquireLock(context, runId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
try {
const lockId = `${EVENT_QUEUE_RUN_ID}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, event.type, event.subType, true);
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
redisEnabled: eventQueueConfig.redisEnabled,
});
}
await _checkPeriodicEventsSingleTenant(context);
});
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
});
}
});
});
});
})
);
};
const _singleTenantDb = async (tenantId) => {
return eventQueueConfig.allEvents.map((event) => {
const label = `${event.type}_${event.subType}`;
return WorkerQueue.instance.addToQueue(event.load, label, async () => {
try {
const context = new cds.EventContext({ tenant: tenantId });
const lockId = `${EVENT_QUEUE_RUN_ID}_${label}`;
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, {
expiryTime: eventQueueConfig.runInterval * 0.95,
});
if (!couldAcquireLock) {
return;
}
await runEventCombinationForTenant(context, event.type, event.subType, true);
} catch (err) {
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", {
tenantId,
redisEnabled: eventQueueConfig.redisEnabled,
});
}
});
});
};
const _acquireRunId = async (context) => {

@@ -289,4 +300,4 @@ let runId = randomUUID();

const tenantIds = await cdsHelper.getAllTenantIds();
_checkAndTriggerPeriodicEventUpdate(tenantIds);
return _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
await _checkPeriodicEventUpdate(tenantIds);
return await _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID);
} catch (err) {

@@ -297,8 +308,8 @@ logger.error("Couldn't fetch tenant ids for event queue processing! Next try after defined interval.", err);

const _multiTenancyPeriodicEvents = async () => {
const _multiTenancyPeriodicEvents = async (tenantIds) => {
const logger = cds.log(COMPONENT_NAME);
try {
logger.info("executing event queue update periodic events");
const tenantIds = await cdsHelper.getAllTenantIds();
_executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT);
tenantIds = tenantIds ?? (await cdsHelper.getAllTenantIds());
return await _executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT);
} catch (err) {

@@ -305,0 +316,0 @@ logger.error("Couldn't fetch tenant ids for updating periodic event processing!", err);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc