Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
0
Versions
65
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.7.2 to 1.7.3

8

cds-plugin.js

@@ -7,2 +7,3 @@ "use strict";

const eventQueue = require("./src");
const EventQueueError = require("./src/EventQueueError");
const COMPONENT_NAME = "/eventQueue/plugin";

@@ -22,3 +23,8 @@ const SERVE_COMMAND = "serve";

if (Object.keys(cds.env.eventQueue ?? {}).length) {
module.exports = eventQueue.initialize().catch((err) => cds.log(COMPONENT_NAME).error(err));
module.exports = eventQueue.initialize().catch((err) => {
if (EventQueueError.isRedisConnectionFailure(err) && eventQueue.config.crashOnRedisUnavailable) {
throw err;
}
cds.log(COMPONENT_NAME).error(err);
});
}

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.7.2",
"version": "1.7.3",
"description": "An event queue that enables secure transactional processing of asynchronous and periodic events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -80,2 +80,3 @@ "use strict";

#publishEventBlockList;
#crashOnRedisUnavailable;
static #instance;

@@ -514,2 +515,10 @@ constructor() {

get crashOnRedisUnavailable() {
return this.#crashOnRedisUnavailable;
}
set crashOnRedisUnavailable(value) {
this.#crashOnRedisUnavailable = value;
}
set globalTxTimeout(value) {

@@ -516,0 +525,0 @@ this.#globalTxTimeout = value;

@@ -44,3 +44,3 @@ "use strict";

[ERROR_CODES.REDIS_LOCAL_NO_RECONNECT]: {
message: "disabled reconnect, because we are not running on cloud foundry",
message: "disabled reconnect, because not running on cloud foundry",
},

@@ -139,3 +139,3 @@ [ERROR_CODES.MISSING_TABLE_DEFINITION]: {

name: ERROR_CODES.REDIS_CREATE_CLIENT,
cause: err,
...(err && { cause: err }),
},

@@ -330,4 +330,8 @@ message

}
static isRedisConnectionFailure(err) {
return err instanceof VError && err.name === ERROR_CODES.REDIS_CREATE_CLIENT;
}
}
module.exports = EventQueueError;

@@ -185,2 +185,4 @@ import * as cds from "@sap/cds";

checkRedisEnabled(): boolean;
publishEventBlockList(): boolean;
crashOnRedisUnavailable(): boolean;
attachConfigChangeHandler(): void;

@@ -187,0 +189,0 @@ attachRedisUnsubscribeHandler(): void;

@@ -19,2 +19,3 @@ "use strict";

const distributedLock = require("./shared/distributedLock");
const EventQueueError = require("./EventQueueError");

@@ -43,2 +44,3 @@ const readFileAsync = promisify(fs.readFile);

["publishEventBlockList", true],
["crashOnRedisUnavailable", false],
];

@@ -66,2 +68,3 @@

* @param {string} [options.publishEventBlockList=true] - If redis is available event blocklist is distributed to all application instances
* @param {string} [options.crashOnRedisUnavailable=true] - If enabled an error is thrown if the redis connection check is not successful
*/

@@ -95,2 +98,5 @@ const initialize = async (options = {}) => {

config.redisEnabled = await redis.connectionCheck(config.redisOptions);
if (!config.redisEnabled && config.crashOnRedisUnavailable) {
throw EventQueueError.redisConnectionFailure();
}
}

@@ -146,13 +152,17 @@ config.fileContent = await readConfigFromFile(config.configFilePath);

if (!config.isMultiTenancy) {
runner.singleTenant().catch(errorHandler);
if (config.redisEnabled) {
initEventQueueRedisSubscribe();
config.attachConfigChangeHandler();
if (config.isMultiTenancy) {
runner.multiTenancyRedis().catch(errorHandler);
} else {
runner.singleTenantRedis().catch(errorHandler);
}
return;
}
if (config.redisEnabled) {
initEventQueueRedisSubscribe();
config.attachConfigChangeHandler();
runner.multiTenancyRedis().catch(errorHandler);
if (config.isMultiTenancy) {
runner.multiTenancyDb().catch(errorHandler);
} else {
runner.multiTenancyDb().catch(errorHandler);
runner.singleTenantDb().catch(errorHandler);
}

@@ -159,0 +169,0 @@ };

@@ -30,3 +30,3 @@ "use strict";

const singleTenant = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantDb);
const singleTenantDb = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantDb);

@@ -37,2 +37,4 @@ const multiTenancyDb = () => _scheduleFunction(async () => {}, _multiTenancyDb);

const singleTenantRedis = () => _scheduleFunction(_checkPeriodicEventsSingleTenantOneTime, _singleTenantRedis);
const _scheduleFunction = async (singleRunFn, periodicFn) => {

@@ -305,2 +307,50 @@ const logger = cds.log(COMPONENT_NAME);

const _singleTenantRedis = async () => {
const id = cds.utils.uuid();
const logger = cds.log(COMPONENT_NAME);
try {
// NOTE: do checks for open events on one app instance distribute from this instance to all others
const dummyContext = new cds.EventContext({});
const couldAcquireLock = await trace(
dummyContext,
"acquire-lock-master-runner",
async () => {
return await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_RUN_REDIS_CHECK, {
expiryTime: eventQueueConfig.runInterval * 0.95,
tenantScoped: false,
});
},
{ newRootSpan: true }
);
if (!couldAcquireLock) {
return;
}
await trace(
{ id },
"get-openEvents-and-publish",
async () => {
return await cds.tx({}, async (tx) => {
const entries = await openEvents.getOpenQueueEntries(tx);
logger.info("broadcasting events for run", {
entries: entries.length,
});
if (!entries.length) {
return;
}
// Do not wait until this is finished - as broadcastEvent has a retry mechanism and can delay this loop
redisPub.broadcastEvent(null, entries).catch((err) => {
logger.error("broadcasting event failed", err, {
entries: entries.length,
});
});
});
},
{ newRootSpan: true }
);
} catch (err) {
logger.info("executing event queue run for single tenant via redis", err);
}
};
const _acquireRunId = async (context) => {

@@ -407,6 +457,30 @@ let runId = randomUUID();

const _checkPeriodicEventsSingleTenantOneTime = async () =>
eventQueueConfig.updatePeriodicEvents &&
cds.tx({}, async (tx) => await periodicEvents.checkAndInsertPeriodicEvents(tx.context));
const _checkPeriodicEventsSingleTenantOneTime = async () => {
const logger = cds.log(COMPONENT_NAME);
if (!eventQueueConfig.updatePeriodicEvents || !eventQueueConfig.periodicEvents.length) {
logger.info("updating of periodic events is disabled or no periodic events configured", {
updateEnabled: eventQueueConfig.updatePeriodicEvents,
events: eventQueueConfig.periodicEvents.length,
});
return;
}
const dummyContext = new cds.EventContext({});
return await trace(
dummyContext,
"update-periodic-events",
async () => {
const couldAcquireLock = await distributedLock.acquireLock(dummyContext, EVENT_QUEUE_UPDATE_PERIODIC_EVENTS, {
expiryTime: 60 * 1000,
tenantScoped: false,
});
if (!couldAcquireLock) {
return;
}
return await cds.tx({}, async (tx) => await periodicEvents.checkAndInsertPeriodicEvents(tx.context));
},
{ newRootSpan: true }
);
};
const _checkPeriodicEventsSingleTenant = async (context) => {

@@ -436,8 +510,10 @@ const logger = cds.log(COMPONENT_NAME);

module.exports = {
singleTenant,
singleTenantDb,
multiTenancyDb,
multiTenancyRedis,
singleTenantRedis,
__: {
_singleTenantDb,
_multiTenancyRedis,
_singleTenantRedis,
_multiTenancyDb,

@@ -444,0 +520,0 @@ _calculateOffsetForFirstRun,

@@ -90,2 +90,6 @@ "use strict";

const getAuthInfo = async (tenantId) => {
if (!tenantId) {
return null;
}
if (!cds.requires?.auth?.credentials) {

@@ -92,0 +96,0 @@ return null; // no credentials not authInfo

@@ -53,21 +53,23 @@ "use strict";

const createClientAndConnect = async (options, errorHandlerCreateClient) => {
const createClientAndConnect = async (options, errorHandlerCreateClient, isConnectionCheck) => {
try {
const client = _createClientBase(options);
if (!isConnectionCheck) {
client.on("error", (err) => {
const dateNow = Date.now();
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) {
cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err);
lastErrorLog = dateNow;
}
});
client.on("reconnecting", () => {
const dateNow = Date.now();
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) {
cds.log(COMPONENT_NAME).info("redis client trying reconnect...");
lastErrorLog = dateNow;
}
});
}
await client.connect();
client.on("error", (err) => {
const dateNow = Date.now();
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) {
cds.log(COMPONENT_NAME).error("error from redis client for pub/sub failed", err);
lastErrorLog = dateNow;
}
});
client.on("reconnecting", () => {
const dateNow = Date.now();
if (dateNow - lastErrorLog > LOG_AFTER_SEC * 1000) {
cds.log(COMPONENT_NAME).info("redis client trying reconnect...");
lastErrorLog = dateNow;
}
});
return client;

@@ -123,3 +125,3 @@ } catch (err) {

return new Promise((resolve, reject) => {
createClientAndConnect(options, reject)
createClientAndConnect(options, reject, true)
.then((client) => {

@@ -136,3 +138,6 @@ if (client) {

.then(() => true)
.catch(() => false);
.catch((err) => {
cds.log(COMPONENT_NAME).error("Redis connection check failed! Falling back to NO_REDIS mode", err);
return false;
});
};

@@ -139,0 +144,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc