@cap-js-community/event-queue
Advanced tools
Comparing version 1.7.2 to 1.7.3
@@ -7,2 +7,3 @@ "use strict"; | ||
const eventQueue = require("./src"); | ||
const EventQueueError = require("./src/EventQueueError"); | ||
const COMPONENT_NAME = "/eventQueue/plugin"; | ||
@@ -22,3 +23,8 @@ const SERVE_COMMAND = "serve"; | ||
if (Object.keys(cds.env.eventQueue ?? {}).length) { | ||
module.exports = eventQueue.initialize().catch((err) => cds.log(COMPONENT_NAME).error(err)); | ||
module.exports = eventQueue.initialize().catch((err) => { | ||
if (EventQueueError.isRedisConnectionFailure(err) && eventQueue.config.crashOnRedisUnavailable) { | ||
throw err; | ||
} | ||
cds.log(COMPONENT_NAME).error(err); | ||
}); | ||
} |
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.7.2", | ||
"version": "1.7.3", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -80,2 +80,3 @@ "use strict"; | ||
#publishEventBlockList; | ||
#crashOnRedisUnavailable; | ||
static #instance; | ||
@@ -514,2 +515,10 @@ constructor() { | ||
get crashOnRedisUnavailable() { | ||
return this.#crashOnRedisUnavailable; | ||
} | ||
set crashOnRedisUnavailable(value) { | ||
this.#crashOnRedisUnavailable = value; | ||
} | ||
set globalTxTimeout(value) { | ||
@@ -516,0 +525,0 @@ this.#globalTxTimeout = value; |
@@ -44,3 +44,3 @@ "use strict"; | ||
[ERROR_CODES.REDIS_LOCAL_NO_RECONNECT]: { | ||
message: "disabled reconnect, because we are not running on cloud foundry", | ||
message: "disabled reconnect, because not running on cloud foundry", | ||
}, | ||
@@ -139,3 +139,3 @@ [ERROR_CODES.MISSING_TABLE_DEFINITION]: { | ||
name: ERROR_CODES.REDIS_CREATE_CLIENT, | ||
cause: err, | ||
...(err && { cause: err }), | ||
}, | ||
@@ -330,4 +330,8 @@ message | ||
} | ||
static isRedisConnectionFailure(err) { | ||
return err instanceof VError && err.name === ERROR_CODES.REDIS_CREATE_CLIENT; | ||
} | ||
} | ||
module.exports = EventQueueError; |
@@ -185,2 +185,4 @@ import * as cds from "@sap/cds"; | ||
checkRedisEnabled(): boolean; | ||
publishEventBlockList(): boolean; | ||
crashOnRedisUnavailable(): boolean; | ||
attachConfigChangeHandler(): void; | ||
@@ -187,0 +189,0 @@ attachRedisUnsubscribeHandler(): void; |
@@ -19,2 +19,3 @@ "use strict"; | ||
const distributedLock = require("./shared/distributedLock"); | ||
const EventQueueError = require("./EventQueueError"); | ||
@@ -43,2 +44,3 @@ const readFileAsync = promisify(fs.readFile); | ||
["publishEventBlockList", true], | ||
["crashOnRedisUnavailable", false], | ||
]; | ||
@@ -66,2 +68,3 @@ | ||
* @param {string} [options.publishEventBlockList=true] - If redis is available event blocklist is distributed to all application instances | ||
* @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful | ||
*/ | ||
@@ -95,2 +98,5 @@ const initialize = async (options = {}) => { | ||
config.redisEnabled = await redis.connectionCheck(config.redisOptions); | ||
if (!config.redisEnabled && config.crashOnRedisUnavailable) { | ||
throw EventQueueError.redisConnectionFailure(); | ||
} | ||
} | ||
@@ -146,13 +152,17 @@ config.fileContent = await readConfigFromFile(config.configFilePath); | ||
if (!config.isMultiTenancy) { | ||
runner.singleTenant().catch(errorHandler); | ||
if (config.redisEnabled) { | ||
initEventQueueRedisSubscribe(); | ||
config.attachConfigChangeHandler(); | ||
if (config.isMultiTenancy) { | ||
runner.multiTenancyRedis().catch(errorHandler); | ||
} else { | ||
runner.singleTenantRedis().catch(errorHandler); | ||
} | ||
return; | ||
} | ||
if (config.redisEnabled) { | ||
initEventQueueRedisSubscribe(); | ||
config.attachConfigChangeHandler(); | ||
runner.multiTenancyRedis().catch(errorHandler); | ||
if (config.isMultiTenancy) { | ||
runner.multiTenancyDb().catch(errorHandler); | ||
} else { | ||
runner.multiTenancyDb().catch(errorHandler); | ||
runner.singleTenantDb().catch(errorHandler); | ||
} | ||
@@ -159,0 +169,0 @@ }; |
@@ -30,3 +30,3 @@ "use strict"; | ||
const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantDb); | ||
const singleTenantDb = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantDb); | ||
@@ -37,2 +37,4 @@ const multiTenancyDb = () => _scheduleFunction(async () => {}, _multiTenancyDb); | ||
const singleTenantRedis = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantRedis); | ||
const _scheduleFunction = async (singleRunFn, periodicFn) => { | ||
@@ -305,2 +307,50 @@ const logger = cds.log(COMPONENT_NAME); | ||
const _singleTenantRedis = async () => { | ||
const id = cds.utils.uuid(); | ||
const logger = cds.log(COMPONENT_NAME); | ||
try { | ||
// NOTE: do checks for open events on one app instance distribute from this instance to all others | ||
const dummyContext = new cds.EventContext({}); | ||
const couldAcquireLock = await trace( | ||
dummyContext, | ||
"acquire-lock-master-runner", | ||
async () => { | ||
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, { | ||
expiryTime: eventQueueConfig.runInterval * 0.95, | ||
tenantScoped: false, | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
await trace( | ||
{ id }, | ||
"get-openEvents-and-publish", | ||
async () => { | ||
return await cds.tx({}, async (tx) => { | ||
const entries = await openEvents.getOpenQueueEntries(tx); | ||
logger.info("broadcasting events for run", { | ||
entries: entries.length, | ||
}); | ||
if (!entries.length) { | ||
return; | ||
} | ||
// Do not wait until this is finished - as broadcastEvent has a retry mechanism and can delay this loop | ||
redisPub.broadcastEvent(null, entries).catch((err) => { | ||
logger.error("broadcasting event failed", err, { | ||
entries: entries.length, | ||
}); | ||
}); | ||
}); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
} catch (err) { | ||
logger.info("executing event queue run for single tenant via redis", err); | ||
} | ||
}; | ||
const _acquireRunId = async (context) => { | ||
@@ -407,6 +457,30 @@ let runId = randomUUID(); | ||
const _checkPeriodicEventsSingleTenantOneTime = async () => | ||
eventQueueConfig.updatePeriodicEvents && | ||
cds.tx({}, async (tx) => await periodicEvents.checkAndInsertPeriodicEvents(tx.context)); | ||
const _checkPeriodicEventsSingleTenantOneTime = async () => { | ||
const logger = cds.log(COMPONENT_NAME); | ||
if (!eventQueueConfig.updatePeriodicEvents || !eventQueueConfig.periodicEvents.length) { | ||
logger.info("updating of periodic events is disabled or no periodic events configured", { | ||
updateEnabled: eventQueueConfig.updatePeriodicEvents, | ||
events: eventQueueConfig.periodicEvents.length, | ||
}); | ||
return; | ||
} | ||
const dummyContext = new cds.EventContext({}); | ||
return await trace( | ||
dummyContext, | ||
"update-periodic-events", | ||
async () => { | ||
const couldAcquireLock = await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, { | ||
expiryTime: 60 * 1000, | ||
tenantScoped: false, | ||
}); | ||
if (!couldAcquireLock) { | ||
return; | ||
} | ||
return await cds.tx({}, async (tx) => await periodicEvents.checkAndInsertPeriodicEvents(tx.context)); | ||
}, | ||
{ newRootSpan: true } | ||
); | ||
}; | ||
const _checkPeriodicEventsSingleTenant = async (context) => { | ||
@@ -436,8 +510,10 @@ const logger = cds.log(COMPONENT_NAME); | ||
module.exports = { | ||
singleTenant, | ||
singleTenantDb, | ||
multiTenancyDb, | ||
multiTenancyRedis, | ||
singleTenantRedis, | ||
__: { | ||
_singleTenantDb, | ||
_multiTenancyRedis, | ||
_singleTenantRedis, | ||
_multiTenancyDb, | ||
@@ -444,0 +520,0 @@ _calculateOffsetForFirstRun, |
@@ -90,2 +90,6 @@ "use strict"; | ||
const getAuthInfo = async (tenantId) => { | ||
if (!tenantId) { | ||
return null; | ||
} | ||
if (!cds.requires?.auth?.credentials) { | ||
@@ -92,0 +96,0 @@ return null; // no credentials not authInfo |
@@ -53,21 +53,23 @@ "use strict"; | ||
const createClientAndConnect = async (options, errorHandlerCreateClient) => { | ||
const createClientAndConnect = async (options, errorHandlerCreateClient, isConnectionCheck) => { | ||
try { | ||
const client = _createClientBase(options); | ||
if (!isConnectionCheck) { | ||
client.on("error", (err) => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
client.on("reconnecting", () => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).info("redis client trying reconnect..."); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
} | ||
await client.connect(); | ||
client.on("error", (err) => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
client.on("reconnecting", () => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).info("redis client trying reconnect..."); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
return client; | ||
@@ -123,3 +125,3 @@ } catch (err) { | ||
return new Promise((resolve, reject) => { | ||
createClientAndConnect(options, reject) | ||
createClientAndConnect(options, reject, true) | ||
.then((client) => { | ||
@@ -136,3 +138,6 @@ if (client) { | ||
.then(() => true) | ||
.catch(() => false); | ||
.catch((err) => { | ||
cds.log(COMPONENT_NAME).error("Redis connection check failed! Falling back to NO_REDIS mode", err); | ||
return false; | ||
}); | ||
}; | ||
@@ -139,0 +144,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
202772
5048