@cap-js-community/event-queue
Advanced tools
Comparing version 1.3.3 to 1.3.4
{ | ||
"name": "@cap-js-community/event-queue", | ||
"version": "1.3.3", | ||
"version": "1.3.4", | ||
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.", | ||
@@ -47,3 +47,3 @@ "main": "src/index.js", | ||
"verror": "1.10.1", | ||
"yaml": "2.4.0" | ||
"yaml": "2.4.1" | ||
}, | ||
@@ -71,2 +71,7 @@ "devDependencies": { | ||
"cds": { | ||
"eventQueue": { | ||
"[production]": { | ||
"disableRedis": false | ||
} | ||
}, | ||
"requires": { | ||
@@ -73,0 +78,0 @@ "event-queue": { |
@@ -102,3 +102,4 @@ "use strict"; | ||
checkRedisEnabled() { | ||
this.#redisEnabled = !this.#disableRedis && this._checkRedisIsBound() && this.#env.isOnCF; | ||
this.#redisEnabled = !this.#disableRedis && this._checkRedisIsBound(); | ||
return this.#redisEnabled; | ||
} | ||
@@ -105,0 +106,0 @@ |
@@ -13,2 +13,3 @@ "use strict"; | ||
const PerformanceTracer = require("./shared/PerformanceTracer"); | ||
const { broadcastEvent } = require("./redisPubSub"); | ||
@@ -1006,7 +1007,7 @@ const IMPLEMENT_ERROR_MESSAGE = "needs to be reimplemented"; | ||
async scheduleNextPeriodEvent(queueEntry) { | ||
const intervalInSec = this.#eventConfig.interval * 1000; | ||
const intervalInMs = this.#eventConfig.interval * 1000; | ||
const newEvent = { | ||
type: this.#eventType, | ||
subType: this.#eventSubType, | ||
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + intervalInSec), | ||
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + intervalInMs), | ||
}; | ||
@@ -1020,3 +1021,4 @@ const { relative } = this.#eventSchedulerInstance.calculateOffset( | ||
// more than one interval behind - shift tick to keep up | ||
if (relative < 0 && Math.abs(relative) >= intervalInSec) { | ||
if (relative < 0 && Math.abs(relative) >= intervalInMs) { | ||
const plannedStartAfter = newEvent.startAfter; | ||
newEvent.startAfter = new Date(Date.now() + 5 * 1000); | ||
@@ -1026,2 +1028,3 @@ this.logger.info("interval adjusted because shifted more than one interval", { | ||
eventSubType: this.#eventSubType, | ||
plannedStartAfter, | ||
newStartAfter: newEvent.startAfter, | ||
@@ -1034,3 +1037,3 @@ }); | ||
this.tx._skipEventQueueBroadcase = false; | ||
if (intervalInSec < this.#config.runInterval * 1.5) { | ||
if (intervalInMs < this.#config.runInterval * 1.5) { | ||
this.#handleDelayedEvents([newEvent]); | ||
@@ -1116,2 +1119,14 @@ const { relative: relativeAfterSchedule } = this.#eventSchedulerInstance.calculateOffset( | ||
broadCastEvent() { | ||
setTimeout(() => { | ||
broadcastEvent(this.__baseContext.tenant, this.#eventType, this.#eventSubType).catch((err) => { | ||
this.logger.error("could not execute scheduled event", err, { | ||
tenantId: this.__baseContext.tenant, | ||
type: this.#eventType, | ||
subType: this.#eventSubType, | ||
}); | ||
}); | ||
}, 1000).unref(); | ||
} | ||
get logger() { | ||
@@ -1118,0 +1133,0 @@ return this.__logger ?? this.__baseLogger; |
@@ -15,3 +15,3 @@ "use strict"; | ||
const { initEventQueueRedisSubscribe, closeSubscribeClient } = require("./redisPubSub"); | ||
const { closeMainClient } = require("./shared/redis"); | ||
const redis = require("./shared/redis"); | ||
const eventQueueAsOutbox = require("./outbox/eventQueueAsOutbox"); | ||
@@ -37,3 +37,3 @@ const { getAllTenantIds } = require("./shared/cdsHelper"); | ||
["tableNameEventLock", BASE_TABLES.LOCK], | ||
["disableRedis", false], | ||
["disableRedis", true], | ||
["skipCsnCheck", false], | ||
@@ -89,3 +89,5 @@ ["updatePeriodicEvents", true], | ||
const logger = cds.log(COMPONENT); | ||
config.checkRedisEnabled(); | ||
const redisEnabled = config.checkRedisEnabled(); | ||
let resolveFn; | ||
let initFinished = new Promise((resolve) => (resolveFn = resolve)); | ||
cds.on("connect", (service) => { | ||
@@ -95,5 +97,8 @@ if (service.name === "db") { | ||
config.cleanupLocksAndEventsForDev && registerCleanupForDevDb().catch(() => {}); | ||
registerEventProcessors(); | ||
initFinished.then(registerEventProcessors); | ||
} | ||
}); | ||
if (redisEnabled) { | ||
config.redisEnabled = await redis.connectionCheck(); | ||
} | ||
config.fileContent = await readConfigFromFile(config.configFilePath); | ||
@@ -112,2 +117,3 @@ | ||
}); | ||
resolveFn(); | ||
}; | ||
@@ -227,3 +233,3 @@ | ||
cds.on("shutdown", async () => { | ||
await Promise.allSettled([closeMainClient(), closeSubscribeClient()]); | ||
await Promise.allSettled([redis.closeMainClient(), closeSubscribeClient()]); | ||
}); | ||
@@ -230,0 +236,0 @@ }; |
@@ -120,2 +120,3 @@ "use strict"; | ||
} | ||
eventTypeInstance.logTimeExceeded(iterationCounter); | ||
@@ -122,0 +123,0 @@ return false; |
@@ -7,8 +7,5 @@ "use strict"; | ||
#isLocal; | ||
#isOnCF; | ||
#vcapServices; | ||
constructor() { | ||
this.#isLocal = process.env.USER !== "vcap"; | ||
this.#isOnCF = !this.#isLocal; | ||
try { | ||
@@ -32,9 +29,2 @@ this.#vcapServices = JSON.parse(process.env.VCAP_SERVICES); | ||
set isOnCF(value) { | ||
this.#isOnCF = value; | ||
} | ||
get isOnCF() { | ||
return this.#isOnCF; | ||
} | ||
set vcapServices(value) { | ||
@@ -41,0 +31,0 @@ this.#vcapServices = value; |
@@ -29,25 +29,19 @@ "use strict"; | ||
const env = getEnvInstance(); | ||
if (env.isOnCF) { | ||
try { | ||
const credentials = env.getRedisCredentialsFromEnv(); | ||
const redisIsCluster = credentials.cluster_mode; | ||
const url = credentials.uri.replace(/(?<=rediss:\/\/)[\w-]+?(?=:)/, ""); | ||
if (redisIsCluster) { | ||
return redis.createCluster({ | ||
rootNodes: [{ url }], | ||
// https://github.com/redis/node-redis/issues/1782 | ||
defaults: { | ||
password: credentials.password, | ||
socket: { tls: credentials.tls }, | ||
}, | ||
}); | ||
} | ||
return redis.createClient({ url }); | ||
} catch (err) { | ||
throw EventQueueError.redisConnectionFailure(err); | ||
try { | ||
const credentials = env.getRedisCredentialsFromEnv(); | ||
const redisIsCluster = credentials.cluster_mode; | ||
const url = credentials.uri.replace(/(?<=rediss:\/\/)[\w-]+?(?=:)/, ""); | ||
if (redisIsCluster) { | ||
return redis.createCluster({ | ||
rootNodes: [{ url }], | ||
// https://github.com/redis/node-redis/issues/1782 | ||
defaults: { | ||
password: credentials.password, | ||
socket: { tls: credentials.tls }, | ||
}, | ||
}); | ||
} | ||
} else { | ||
return redis.createClient({ | ||
socket: { reconnectStrategy: _localReconnectStrategy }, | ||
}); | ||
return redis.createClient({ url }); | ||
} catch (err) { | ||
throw EventQueueError.redisConnectionFailure(err); | ||
} | ||
@@ -97,7 +91,12 @@ }; | ||
const _localReconnectStrategy = () => EventQueueError.redisNoReconnect(); | ||
const closeMainClient = async () => { | ||
try { | ||
const client = await mainClientPromise; | ||
await _resilientClientClose(await mainClientPromise); | ||
} catch (err) { | ||
// ignore errors during shutdown | ||
} | ||
}; | ||
const _resilientClientClose = async (client) => { | ||
try { | ||
if (client?.quit) { | ||
@@ -111,2 +110,19 @@ await client.quit(); | ||
const connectionCheck = async () => { | ||
return new Promise((resolve, reject) => { | ||
createClientAndConnect(reject) | ||
.then((client) => { | ||
if (client) { | ||
_resilientClientClose(client); | ||
resolve(); | ||
} else { | ||
reject(new Error()); | ||
} | ||
}) | ||
.catch(reject); | ||
}) | ||
.then(() => true) | ||
.catch(() => false); | ||
}; | ||
module.exports = { | ||
@@ -118,2 +134,3 @@ createClientAndConnect, | ||
closeMainClient, | ||
connectionCheck, | ||
}; |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
161767
3985
+ Addedyaml@2.4.1(transitive)
- Removedyaml@2.4.0(transitive)
Updatedyaml@2.4.1