@cap-js-community/event-queue
Advanced tools
Comparing version 1.2.5 to 1.2.6
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.2.5", | ||
"version": "1.2.6", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -21,2 +21,3 @@ "use strict"; | ||
SCHEMA_TENANT_MISMATCH: "SCHEMA_TENANT_MISMATCH", | ||
GLOBAL_CDS_CONTEXT_MISSMATCH: "GLOBAL_CDS_CONTEXT_MISSMATCH", | ||
}; | ||
@@ -71,2 +72,5 @@ | ||
}, | ||
[ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH]: { | ||
message: "The global cds context does not match the local cds context.", | ||
}, | ||
}; | ||
@@ -240,4 +244,15 @@ | ||
} | ||
static globalCdsContextNotMatchingLocal(globalProperties, localProperties) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.GLOBAL_CDS_CONTEXT_MISSMATCH, | ||
info: { globalProperties, localProperties }, | ||
}, | ||
message | ||
); | ||
} | ||
} | ||
module.exports = EventQueueError; |
@@ -73,2 +73,4 @@ "use strict"; | ||
this.__queueEntries = []; | ||
this.#checkGlobalContextToLocalContext(); | ||
} | ||
@@ -537,3 +539,5 @@ | ||
const refDateStartAfter = new Date(Date.now() + this.#config.runInterval * 1.2); | ||
this.#checkGlobalContextToLocalContext(); | ||
await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => { | ||
this.#checkGlobalContextToLocalContext(); | ||
await this.checkTxConsistency(tx); | ||
@@ -697,3 +701,3 @@ const entries = await tx.run( | ||
} | ||
if (txSchema !== serviceManagerSchema) { | ||
if (serviceManagerSchema && txSchema !== serviceManagerSchema) { | ||
const err = EventQueueError.dbClientSchemaMismatch(tx.context.tenant, txSchema, serviceManagerSchema); | ||
@@ -730,2 +734,44 @@ errorHandler(err); | ||
#checkGlobalContextToLocalContext() { | ||
if (!this.#config.enableTxConsistencyCheck) { | ||
return; | ||
} | ||
if (this.__context.tenant !== cds.context.tenant) { | ||
throw EventQueueError.globalCdsContextNotMatchingLocal( | ||
JSON.stringify( | ||
{ | ||
correlationId: cds.context.id, | ||
tenantId: cds.context.tenant, | ||
timestamp: cds.context.timestamp, | ||
base: JSON.stringify( | ||
{ | ||
correlationId: cds.context.context?.id, | ||
tenantId: cds.context.context?.tenant, | ||
timestamp: cds.context.context?.timestamp, | ||
}, | ||
null, | ||
2 | ||
), | ||
}, | ||
null, | ||
2 | ||
), | ||
JSON.stringify( | ||
{ | ||
correlationId: this.__context.id, | ||
tenantId: this.__context.tenant, | ||
timestamp: this.__context.timestamp, | ||
base: JSON.stringify({ | ||
correlationId: this.__context.context?.id, | ||
tenantId: this.__context.context?.tenant, | ||
timestamp: this.__context.context?.timestamp, | ||
}), | ||
}, | ||
null, | ||
2 | ||
) | ||
); | ||
} | ||
} | ||
#handleDelayedEvents(delayedEvents) { | ||
@@ -732,0 +778,0 @@ for (const delayedEvent of delayedEvents) { |
@@ -29,5 +29,5 @@ "use strict"; | ||
const multiTenancyDb = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyDb); | ||
const multiTenancyDb = () => _scheduleFunction(async () => {}, _multiTenancyDb); | ||
const multiTenancyRedis = () => _scheduleFunction(_multiTenancyPeriodicEvents, _multiTenancyRedis); | ||
const multiTenancyRedis = () => _scheduleFunction(async () => {}, _multiTenancyRedis); | ||
@@ -73,3 +73,3 @@ const _scheduleFunction = async (singleRunFn, periodicFn) => { | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_checkAndTriggerPeriodicEventUpdate(tenantIds); | ||
await _checkPeriodicEventUpdate(tenantIds); | ||
@@ -83,13 +83,12 @@ const runId = await _acquireRunId(emptyContext); | ||
return _executeEventsAllTenants(tenantIds, runId); | ||
return await _executeEventsAllTenants(tenantIds, runId); | ||
}; | ||
const _checkAndTriggerPeriodicEventUpdate = (tenantIds) => { | ||
const _checkPeriodicEventUpdate = async (tenantIds) => { | ||
const hash = hashStringTo32Bit(JSON.stringify(tenantIds)); | ||
if (!tenantIdHash) { | ||
tenantIdHash = hash; | ||
_multiTenancyPeriodicEvents().catch((err) => { | ||
return await _multiTenancyPeriodicEvents(tenantIds).catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err); | ||
}); | ||
return; | ||
} | ||
@@ -99,3 +98,3 @@ if (tenantIdHash && tenantIdHash !== hash) { | ||
cds.log(COMPONENT_NAME).info("tenant id hash changed, triggering updating periodic events!"); | ||
_multiTenancyPeriodicEvents().catch((err) => { | ||
return await _multiTenancyPeriodicEvents(tenantIds).catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Error during triggering updating periodic events!", err); | ||
@@ -115,23 +114,59 @@ }); | ||
return product.map(async ([tenantId, event]) => { | ||
const subdomain = await getSubdomainForTenantId(tenantId); | ||
const user = new cds.User.Privileged(config.userId); | ||
const tenantContext = { | ||
tenant: tenantId, | ||
user, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
}; | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
return Promise.allSettled( | ||
product.map(async ([tenantId, event]) => { | ||
const subdomain = await getSubdomainForTenantId(tenantId); | ||
const user = new cds.User.Privileged(config.userId); | ||
const tenantContext = { | ||
tenant: tenantId, | ||
user, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
}; | ||
const label = `${event.type}_${event.subType}`; | ||
return await WorkerQueue.instance.addToQueue(event.load, label, async () => { | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
try { | ||
const lockId = `${runId}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, event.type, event.subType, true); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
}); | ||
} | ||
}); | ||
}); | ||
}) | ||
); | ||
}; | ||
const _executePeriodicEventsAllTenants = async (tenantIds, runId) => { | ||
return await Promise.allSettled( | ||
tenantIds.map(async (tenantId) => { | ||
const label = `UPDATE_PERIODIC_EVENTS_${tenantId}`; | ||
return await WorkerQueue.instance.addToQueue(1, label, async () => { | ||
try { | ||
const lockId = `${runId}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
const subdomain = await getSubdomainForTenantId(tenantId); | ||
const user = new cds.User.Privileged(config.userId); | ||
const tenantContext = { | ||
tenant: tenantId, | ||
user, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
}; | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
const couldAcquireLock = await distributedLock.acquireLock(context, runId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await _checkPeriodicEventsSingleTenant(context); | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, event.type, event.subType, true); | ||
} catch (err) { | ||
@@ -143,62 +178,38 @@ cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
}); | ||
}); | ||
}); | ||
}) | ||
); | ||
}; | ||
const _executePeriodicEventsAllTenants = (tenantIds, runId) => { | ||
tenantIds.forEach((tenantId) => { | ||
const label = `UPDATE_PERIODIC_EVENTS_${tenantId}`; | ||
WorkerQueue.instance.addToQueue(1, label, async () => { | ||
try { | ||
const subdomain = await getSubdomainForTenantId(tenantId); | ||
const user = new cds.User.Privileged(config.userId); | ||
const tenantContext = { | ||
tenant: tenantId, | ||
user, | ||
// NOTE: we need this because of logging otherwise logs would not contain the subdomain | ||
http: { req: { authInfo: { getSubdomain: () => subdomain } } }, | ||
}; | ||
const _singleTenantDb = async (tenantId) => { | ||
return Promise.allSettled( | ||
eventQueueConfig.allEvents.map(async (event) => { | ||
const label = `${event.type}_${event.subType}`; | ||
const user = new cds.User.Privileged(config.userId); | ||
const tenantContext = { | ||
tenant: tenantId, | ||
user, | ||
}; | ||
return await WorkerQueue.instance.addToQueue(event.load, label, async () => { | ||
return await cds.tx(tenantContext, async ({ context }) => { | ||
const couldAcquireLock = await distributedLock.acquireLock(context, runId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
try { | ||
const lockId = `${EVENT_QUEUE_RUN_ID}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, event.type, event.subType, true); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
redisEnabled: eventQueueConfig.redisEnabled, | ||
}); | ||
} | ||
await _checkPeriodicEventsSingleTenant(context); | ||
}); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
}); | ||
} | ||
}); | ||
}); | ||
}); | ||
}) | ||
); | ||
}; | ||
const _singleTenantDb = async (tenantId) => { | ||
return eventQueueConfig.allEvents.map((event) => { | ||
const label = `${event.type}_${event.subType}`; | ||
return WorkerQueue.instance.addToQueue(event.load, label, async () => { | ||
try { | ||
const context = new cds.EventContext({ tenant: tenantId }); | ||
const lockId = `${EVENT_QUEUE_RUN_ID}_${label}`; | ||
const couldAcquireLock = await distributedLock.acquireLock(context, lockId, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await runEventCombinationForTenant(context, event.type, event.subType, true); | ||
} catch (err) { | ||
cds.log(COMPONENT_NAME).error("executing event-queue run for tenant failed", { | ||
tenantId, | ||
redisEnabled: eventQueueConfig.redisEnabled, | ||
}); | ||
} | ||
}); | ||
}); | ||
}; | ||
const _acquireRunId = async (context) => { | ||
@@ -289,4 +300,4 @@ let runId = randomUUID(); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_checkAndTriggerPeriodicEventUpdate(tenantIds); | ||
return _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
await _checkPeriodicEventUpdate(tenantIds); | ||
return await _executeEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_ID); | ||
} catch (err) { | ||
@@ -297,8 +308,8 @@ logger.error("Couldn't fetch tenant ids for event queue processing! Next try after defined interval.", err); | ||
const _multiTenancyPeriodicEvents = async () => { | ||
const _multiTenancyPeriodicEvents = async (tenantIds) => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
logger.info("executing event queue update periodic events"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
_executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT); | ||
tenantIds = tenantIds ?? (await cdsHelper.getAllTenantIds()); | ||
return await _executePeriodicEventsAllTenants(tenantIds, EVENT_QUEUE_RUN_PERIODIC_EVENT); | ||
} catch (err) { | ||
@@ -305,0 +316,0 @@ logger.error("Couldn't fetch tenant ids for updating periodic event processing!", err); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
157202
3843