@cap-js-community/event-queue
Advanced tools
Comparing version 1.3.5 to 1.3.6
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.3.5", | ||
"version": "1.3.6", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -71,15 +71,19 @@ "use strict"; | ||
const logger = cds.log(COMPONENT_NAME); | ||
const emptyContext = new cds.EventContext({}); | ||
logger.info("executing event queue run for multi instance and tenant"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
await _checkPeriodicEventUpdate(tenantIds); | ||
try { | ||
const emptyContext = new cds.EventContext({}); | ||
logger.info("executing event queue run for multi instance and tenant"); | ||
const tenantIds = await cdsHelper.getAllTenantIds(); | ||
await _checkPeriodicEventUpdate(tenantIds); | ||
const runId = await _acquireRunId(emptyContext); | ||
const runId = await _acquireRunId(emptyContext); | ||
if (!runId) { | ||
logger.error("could not acquire runId, skip processing events!"); | ||
return; | ||
if (!runId) { | ||
logger.error("could not acquire runId, skip processing events!"); | ||
return; | ||
} | ||
return await _executeEventsAllTenants(tenantIds, runId); | ||
} catch (err) { | ||
logger.info("executing event queue run for multi instance and tenant failed", err); | ||
} | ||
return await _executeEventsAllTenants(tenantIds, runId); | ||
}; | ||
@@ -86,0 +90,0 @@ |
@@ -9,5 +9,7 @@ "use strict"; | ||
const COMPONENT_NAME = "/eventQueue/shared/redis"; | ||
const LOG_AFTER_SEC = 5; | ||
let mainClientPromise; | ||
const subscriberChannelClientPromise = {}; | ||
let lastErrorLog = Date.now(); | ||
@@ -22,4 +24,5 @@ const createMainClientAndConnect = () => { | ||
mainClientPromise = null; | ||
setTimeout(createMainClientAndConnect, 5 * 1000).unref(); | ||
setTimeout(createMainClientAndConnect, LOG_AFTER_SEC * 1000).unref(); | ||
}; | ||
mainClientPromise = createClientAndConnect(errorHandlerCreateClient); | ||
@@ -52,17 +55,24 @@ return mainClientPromise; | ||
const createClientAndConnect = async (errorHandlerCreateClient) => { | ||
let client = null; | ||
try { | ||
client = _createClientBase(); | ||
} catch (err) { | ||
throw EventQueueError.redisConnectionFailure(err); | ||
} | ||
const client = _createClientBase(); | ||
await client.connect(); | ||
client.on("error", (err) => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
client.on("error", errorHandlerCreateClient); | ||
try { | ||
await client.connect(); | ||
client.on("reconnecting", () => { | ||
const dateNow = Date.now(); | ||
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) { | ||
cds.log(COMPONENT_NAME).info("redis client trying reconnect..."); | ||
lastErrorLog = dateNow; | ||
} | ||
}); | ||
return client; | ||
} catch (err) { | ||
errorHandlerCreateClient(err); | ||
} | ||
return client; | ||
}; | ||
@@ -74,4 +84,5 @@ | ||
subscriberChannelClientPromise[channel] = null; | ||
setTimeout(() => subscribeRedisChannel(channel, subscribeHandler), 5 * 1000).unref(); | ||
setTimeout(() => subscribeRedisChannel(channel, subscribeHandler), LOG_AFTER_SEC * 1000).unref(); | ||
}; | ||
subscriberChannelClientPromise[channel] = createClientAndConnect(errorHandlerCreateClient) | ||
@@ -78,0 +89,0 @@ .then((client) => { |
163117
4014