@cap-js-community/event-queue
Advanced tools
Comparing version 1.2.1 to 1.2.2
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.2.1", | ||
"version": "1.2.2", | ||
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -59,2 +59,3 @@ "use strict"; | ||
#userId; | ||
#enableTxConsistencyCheck; | ||
static #instance; | ||
@@ -466,2 +467,10 @@ constructor() { | ||
set enableTxConsistencyCheck(value) { | ||
this.#enableTxConsistencyCheck = value; | ||
} | ||
get enableTxConsistencyCheck() { | ||
return this.#enableTxConsistencyCheck; | ||
} | ||
get isMultiTenancy() { | ||
@@ -468,0 +477,0 @@ return !!cds.requires.multitenancy; |
@@ -20,2 +20,3 @@ "use strict"; | ||
LOAD_HIGHER_THAN_LIMIT: "LOAD_HIGHER_THAN_LIMIT", | ||
SCHEMA_TENANT_MISMATCH: "SCHEMA_TENANT_MISMATCH", | ||
}; | ||
@@ -67,2 +68,5 @@ | ||
}, | ||
[ERROR_CODES.SCHEMA_TENANT_MISMATCH]: { | ||
message: "The db client associated to the tenant context does not match! Processing will be skipped.", | ||
}, | ||
}; | ||
@@ -225,4 +229,15 @@ | ||
} | ||
static dbClientSchemaMismatch(tenantId, dbClientSchema, serviceManagerSchema) { | ||
const { message } = ERROR_CODES_META[ERROR_CODES.SCHEMA_TENANT_MISMATCH]; | ||
return new EventQueueError( | ||
{ | ||
name: ERROR_CODES.SCHEMA_TENANT_MISMATCH, | ||
info: { tenantId, dbClientSchema, serviceManagerSchema }, | ||
}, | ||
message | ||
); | ||
} | ||
} | ||
module.exports = EventQueueError; |
@@ -535,2 +535,3 @@ "use strict"; | ||
await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => { | ||
await this.checkTxConsistency(tx); | ||
const entries = await tx.run( | ||
@@ -662,2 +663,42 @@ SELECT.from(this.#config.tableNameEventQueue) | ||
async checkTxConsistency(tx) { | ||
if (!this.#config.enableTxConsistencyCheck) { | ||
return; | ||
} | ||
const errorHandler = (err) => | ||
this.logger.error("tx consistency check failed!", err, { | ||
type: this.eventType, | ||
subType: this.eventSubType, | ||
txTenant: tx.context.tenant, | ||
globalCdsTenant: cds.context.tenant, | ||
}); | ||
let txSchema, serviceManagerSchema; | ||
try { | ||
const mtxServiceManager = require("@sap/cds-mtxs/srv/plugins/hana/srv-mgr"); | ||
const schemaPromise = tx.run("SELECT CURRENT_SCHEMA FROM DUMMY"); | ||
const serviceManagerBindingsPromise = mtxServiceManager.getAll(); | ||
const [schema, serviceManagerBindings] = await Promise.allSettled([schemaPromise, serviceManagerBindingsPromise]); | ||
if (schema.reason) { | ||
errorHandler(schema.reason); | ||
return; | ||
} | ||
if (serviceManagerBindings.reason) { | ||
errorHandler(schema.reason); | ||
return; | ||
} | ||
txSchema = schema.value[0].CURRENT_SCHEMA; | ||
serviceManagerSchema = serviceManagerBindings.value.find((t) => t.labels.tenant_id[0] === tx.context.tenant) | ||
.credentials.schema; | ||
} catch (err) { | ||
errorHandler(err); | ||
} | ||
if (txSchema !== serviceManagerSchema) { | ||
const err = EventQueueError.dbClientSchemaMismatch(tx.context.tenant, txSchema, serviceManagerSchema); | ||
errorHandler(err); | ||
throw err; | ||
} | ||
} | ||
async #selectLastSuccessfulPeriodicTimestamp() { | ||
@@ -664,0 +705,0 @@ const entry = await SELECT.one |
"use strict"; | ||
// TODO: how to deal with fatal logs | ||
// TODO: think about situations where isInitialized need to be checked - publishEvent access config which is not initialized | ||
// TODO: add tests for config --> similar to csn check | ||
// TODO: redis client check reconnect strategy | ||
// TODO: for test | ||
// --> deeper look into the functions e.g. getQueueEntriesAndSetToInProgress | ||
// TODO: add test for commit on event level and stuff like that | ||
module.exports = { | ||
@@ -13,0 +7,0 @@ ...require("./initialize"), |
@@ -40,2 +40,3 @@ "use strict"; | ||
["userId", null], | ||
["enableTxConsistencyCheck", false], | ||
]; | ||
@@ -57,2 +58,3 @@ | ||
userId, | ||
enableTxConsistencyCheck, | ||
} = {}) => { | ||
@@ -81,3 +83,4 @@ // TODO: initialize check: | ||
useAsCAPOutbox, | ||
userId | ||
userId, | ||
enableTxConsistencyCheck | ||
); | ||
@@ -84,0 +87,0 @@ |
@@ -37,3 +37,3 @@ "use strict"; | ||
if (baseInstance.isPeriodicEvent) { | ||
return await processPeriodicEvent(baseInstance); | ||
return await processPeriodicEvent(context, baseInstance); | ||
} | ||
@@ -97,2 +97,5 @@ eventConfig.startTime = startTime; | ||
cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error.", err, { | ||
tenantId: context?.tenant, | ||
tenantIdBase: baseInstance?.context?.tenant, | ||
globalTenantId: cds.context?.tenant, | ||
eventType, | ||
@@ -120,6 +123,3 @@ eventSubType, | ||
const processPeriodicEvent = async (eventTypeInstance) => { | ||
let queueEntry; | ||
let processNext = true; | ||
const processPeriodicEvent = async (context, eventTypeInstance) => { | ||
const isPeriodicEventBlockedCb = config.isPeriodicEventBlockedCb; | ||
@@ -151,2 +151,4 @@ const params = [eventTypeInstance.eventType, eventTypeInstance.eventSubType, eventTypeInstance.context.tenant]; | ||
try { | ||
let queueEntry; | ||
let processNext = true; | ||
while (processNext) { | ||
@@ -214,2 +216,5 @@ await executeInNewTransaction( | ||
eventSubType: eventTypeInstance?.eventSubType, | ||
tenantId: context?.tenant, | ||
tenantIdBase: eventTypeInstance?.context?.tenant, | ||
globalTenantId: cds.context?.tenant, | ||
}); | ||
@@ -216,0 +221,0 @@ } finally { |
@@ -103,12 +103,18 @@ "use strict"; | ||
if (subdomainCache[tenantId]) { | ||
return subdomainCache[tenantId]; | ||
return await subdomainCache[tenantId]; | ||
} | ||
try { | ||
const ssp = await cds.connect.to("cds.xt.SaasProvisioningService"); | ||
const response = await ssp.get("/tenant", { subscribedTenantId: tenantId }); | ||
subdomainCache[tenantId] = response.subscribedSubdomain; | ||
return response.subscribedSubdomain; | ||
} catch (err) { | ||
return null; | ||
} | ||
subdomainCache[tenantId] = new Promise((resolve) => { | ||
cds.connect | ||
.to("cds.xt.SaasProvisioningService") | ||
.then((ssp) => { | ||
ssp | ||
.get("/tenant", { subscribedTenantId: tenantId }) | ||
.then((response) => { | ||
resolve(response.subscribedSubdomain); | ||
}) | ||
.catch(() => resolve(null)); | ||
}) | ||
.catch(() => resolve(null)); | ||
}); | ||
return await subdomainCache[tenantId]; | ||
}; | ||
@@ -115,0 +121,0 @@ |
@@ -73,13 +73,12 @@ "use strict"; | ||
const subscribeRedisChannel = (channel, subscribeCb) => { | ||
const subscribeRedisChannel = (channel, subscribeHandler) => { | ||
const errorHandlerCreateClient = (err) => { | ||
cds.log(COMPONENT_NAME).error(`error from redis client for pub/sub failed for channel ${channel}`, err); | ||
subscriberChannelClientPromise[channel] = null; | ||
setTimeout(() => subscribeRedisChannel(channel, subscribeCb), 5 * 1000).unref(); | ||
setTimeout(() => subscribeRedisChannel(channel, subscribeHandler), 5 * 1000).unref(); | ||
}; | ||
subscriberChannelClientPromise[channel] = createClientAndConnect(errorHandlerCreateClient); | ||
subscriberChannelClientPromise[channel] | ||
subscriberChannelClientPromise[channel] = createClientAndConnect(errorHandlerCreateClient) | ||
.then((client) => { | ||
cds.log(COMPONENT_NAME).info("subscribe redis client connected channel", { channel }); | ||
client.subscribe(channel, subscribeCb).catch(errorHandlerCreateClient); | ||
client.subscribe(channel, subscribeHandler).catch(errorHandlerCreateClient); | ||
}) | ||
@@ -86,0 +85,0 @@ .catch((err) => { |
153157
3724