Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
7
Versions
65
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.3.3 to 1.3.4

9

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.3.3",
"version": "1.3.4",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -47,3 +47,3 @@ "main": "src/index.js",

"verror": "1.10.1",
"yaml": "2.4.0"
"yaml": "2.4.1"
},

@@ -71,2 +71,7 @@ "devDependencies": {

"cds": {
"eventQueue": {
"[production]": {
"disableRedis": false
}
},
"requires": {

@@ -73,0 +78,0 @@ "event-queue": {

@@ -102,3 +102,4 @@ "use strict";

checkRedisEnabled() {
this.#redisEnabled = !this.#disableRedis && this._checkRedisIsBound() && this.#env.isOnCF;
this.#redisEnabled = !this.#disableRedis && this._checkRedisIsBound();
return this.#redisEnabled;
}

@@ -105,0 +106,0 @@

@@ -13,2 +13,3 @@ "use strict";

const PerformanceTracer = require("./shared/PerformanceTracer");
const { broadcastEvent } = require("./redisPubSub");

@@ -1006,7 +1007,7 @@ const IMPLEMENT_ERROR_MESSAGE = "needs to be reimplemented";

async scheduleNextPeriodEvent(queueEntry) {
const intervalInSec = this.#eventConfig.interval * 1000;
const intervalInMs = this.#eventConfig.interval * 1000;
const newEvent = {
type: this.#eventType,
subType: this.#eventSubType,
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + intervalInSec),
startAfter: new Date(new Date(queueEntry.startAfter).getTime() + intervalInMs),
};

@@ -1020,3 +1021,4 @@ const { relative } = this.#eventSchedulerInstance.calculateOffset(

// more than one interval behind - shift tick to keep up
if (relative < 0 && Math.abs(relative) >= intervalInSec) {
if (relative < 0 && Math.abs(relative) >= intervalInMs) {
const plannedStartAfter = newEvent.startAfter;
newEvent.startAfter = new Date(Date.now() + 5 * 1000);

@@ -1026,2 +1028,3 @@ this.logger.info("interval adjusted because shifted more than one interval", {

eventSubType: this.#eventSubType,
plannedStartAfter,
newStartAfter: newEvent.startAfter,

@@ -1034,3 +1037,3 @@ });

this.tx._skipEventQueueBroadcase = false;
if (intervalInSec < this.#config.runInterval * 1.5) {
if (intervalInMs < this.#config.runInterval * 1.5) {
this.#handleDelayedEvents([newEvent]);

@@ -1116,2 +1119,14 @@ const { relative: relativeAfterSchedule } = this.#eventSchedulerInstance.calculateOffset(

broadCastEvent() {
setTimeout(() => {
broadcastEvent(this.__baseContext.tenant, this.#eventType, this.#eventSubType).catch((err) => {
this.logger.error("could not execute scheduled event", err, {
tenantId: this.__baseContext.tenant,
type: this.#eventType,
subType: this.#eventSubType,
});
});
}, 1000).unref();
}
get logger() {

@@ -1118,0 +1133,0 @@ return this.__logger ?? this.__baseLogger;

@@ -15,3 +15,3 @@ "use strict";

const { initEventQueueRedisSubscribe, closeSubscribeClient } = require("./redisPubSub");
const { closeMainClient } = require("./shared/redis");
const redis = require("./shared/redis");
const eventQueueAsOutbox = require("./outbox/eventQueueAsOutbox");

@@ -37,3 +37,3 @@ const { getAllTenantIds } = require("./shared/cdsHelper");

["tableNameEventLock", BASE_TABLES.LOCK],
["disableRedis", false],
["disableRedis", true],
["skipCsnCheck", false],

@@ -89,3 +89,5 @@ ["updatePeriodicEvents", true],

const logger = cds.log(COMPONENT);
config.checkRedisEnabled();
const redisEnabled = config.checkRedisEnabled();
let resolveFn;
let initFinished = new Promise((resolve) => (resolveFn = resolve));
cds.on("connect", (service) => {

@@ -95,5 +97,8 @@ if (service.name === "db") {

config.cleanupLocksAndEventsForDev && registerCleanupForDevDb().catch(() => {});
registerEventProcessors();
initFinished.then(registerEventProcessors);
}
});
if (redisEnabled) {
config.redisEnabled = await redis.connectionCheck();
}
config.fileContent = await readConfigFromFile(config.configFilePath);

@@ -112,2 +117,3 @@

});
resolveFn();
};

@@ -227,3 +233,3 @@

cds.on("shutdown", async () => {
await Promise.allSettled([closeMainClient(), closeSubscribeClient()]);
await Promise.allSettled([redis.closeMainClient(), closeSubscribeClient()]);
});

@@ -230,0 +236,0 @@ };

@@ -120,2 +120,3 @@ "use strict";

}
eventTypeInstance.logTimeExceeded(iterationCounter);

@@ -122,0 +123,0 @@ return false;

@@ -7,8 +7,5 @@ "use strict";

#isLocal;
#isOnCF;
#vcapServices;
constructor() {
this.#isLocal = process.env.USER !== "vcap";
this.#isOnCF = !this.#isLocal;
try {

@@ -32,9 +29,2 @@ this.#vcapServices = JSON.parse(process.env.VCAP_SERVICES);

set isOnCF(value) {
this.#isOnCF = value;
}
get isOnCF() {
return this.#isOnCF;
}
set vcapServices(value) {

@@ -41,0 +31,0 @@ this.#vcapServices = value;

@@ -29,25 +29,19 @@ "use strict";

const env = getEnvInstance();
if (env.isOnCF) {
try {
const credentials = env.getRedisCredentialsFromEnv();
const redisIsCluster = credentials.cluster_mode;
const url = credentials.uri.replace(/(?<=rediss:\/\/)[\w-]+?(?=:)/, "");
if (redisIsCluster) {
return redis.createCluster({
rootNodes: [{ url }],
// https://github.com/redis/node-redis/issues/1782
defaults: {
password: credentials.password,
socket: { tls: credentials.tls },
},
});
}
return redis.createClient({ url });
} catch (err) {
throw EventQueueError.redisConnectionFailure(err);
try {
const credentials = env.getRedisCredentialsFromEnv();
const redisIsCluster = credentials.cluster_mode;
const url = credentials.uri.replace(/(?<=rediss:\/\/)[\w-]+?(?=:)/, "");
if (redisIsCluster) {
return redis.createCluster({
rootNodes: [{ url }],
// https://github.com/redis/node-redis/issues/1782
defaults: {
password: credentials.password,
socket: { tls: credentials.tls },
},
});
}
} else {
return redis.createClient({
socket: { reconnectStrategy: _localReconnectStrategy },
});
return redis.createClient({ url });
} catch (err) {
throw EventQueueError.redisConnectionFailure(err);
}

@@ -97,7 +91,12 @@ };

const _localReconnectStrategy = () => EventQueueError.redisNoReconnect();
const closeMainClient = async () => {
try {
const client = await mainClientPromise;
await _resilientClientClose(await mainClientPromise);
} catch (err) {
// ignore errors during shutdown
}
};
const _resilientClientClose = async (client) => {
try {
if (client?.quit) {

@@ -111,2 +110,19 @@ await client.quit();

const connectionCheck = async () => {
return new Promise((resolve, reject) => {
createClientAndConnect(reject)
.then((client) => {
if (client) {
_resilientClientClose(client);
resolve();
} else {
reject(new Error());
}
})
.catch(reject);
})
.then(() => true)
.catch(() => false);
};
module.exports = {

@@ -118,2 +134,3 @@ createClientAndConnect,

closeMainClient,
connectionCheck,
};
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc