New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@cap-js-community/event-queue

Package Overview
Dependencies
Maintainers
7
Versions
67
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@cap-js-community/event-queue - npm Package Compare versions

Comparing version 1.2.1 to 1.2.2

2

package.json
{
"name": "@cap-js-community/event-queue",
"version": "1.2.1",
"version": "1.2.2",
"description": "An event queue that enables secure transactional processing of asynchronous events, featuring instant event processing with Redis Pub/Sub and load distribution across all application instances.",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -59,2 +59,3 @@ "use strict";

#userId;
#enableTxConsistencyCheck;
static #instance;

@@ -466,2 +467,10 @@ constructor() {

set enableTxConsistencyCheck(value) {
this.#enableTxConsistencyCheck = value;
}
get enableTxConsistencyCheck() {
return this.#enableTxConsistencyCheck;
}
get isMultiTenancy() {

@@ -468,0 +477,0 @@ return !!cds.requires.multitenancy;

@@ -20,2 +20,3 @@ "use strict";

LOAD_HIGHER_THAN_LIMIT: "LOAD_HIGHER_THAN_LIMIT",
SCHEMA_TENANT_MISMATCH: "SCHEMA_TENANT_MISMATCH",
};

@@ -67,2 +68,5 @@

},
[ERROR_CODES.SCHEMA_TENANT_MISMATCH]: {
message: "The db client associated to the tenant context does not match! Processing will be skipped.",
},
};

@@ -225,4 +229,15 @@

}
static dbClientSchemaMismatch(tenantId, dbClientSchema, serviceManagerSchema) {
const { message } = ERROR_CODES_META[ERROR_CODES.SCHEMA_TENANT_MISMATCH];
return new EventQueueError(
{
name: ERROR_CODES.SCHEMA_TENANT_MISMATCH,
info: { tenantId, dbClientSchema, serviceManagerSchema },
},
message
);
}
}
module.exports = EventQueueError;

@@ -535,2 +535,3 @@ "use strict";

await executeInNewTransaction(this.__baseContext, "eventQueue-getQueueEntriesAndSetToInProgress", async (tx) => {
await this.checkTxConsistency(tx);
const entries = await tx.run(

@@ -662,2 +663,42 @@ SELECT.from(this.#config.tableNameEventQueue)

async checkTxConsistency(tx) {
if (!this.#config.enableTxConsistencyCheck) {
return;
}
const errorHandler = (err) =>
this.logger.error("tx consistency check failed!", err, {
type: this.eventType,
subType: this.eventSubType,
txTenant: tx.context.tenant,
globalCdsTenant: cds.context.tenant,
});
let txSchema, serviceManagerSchema;
try {
const mtxServiceManager = require("@sap/cds-mtxs/srv/plugins/hana/srv-mgr");
const schemaPromise = tx.run("SELECT CURRENT_SCHEMA FROM DUMMY");
const serviceManagerBindingsPromise = mtxServiceManager.getAll();
const [schema, serviceManagerBindings] = await Promise.allSettled([schemaPromise, serviceManagerBindingsPromise]);
if (schema.reason) {
errorHandler(schema.reason);
return;
}
if (serviceManagerBindings.reason) {
errorHandler(schema.reason);
return;
}
txSchema = schema.value[0].CURRENT_SCHEMA;
serviceManagerSchema = serviceManagerBindings.value.find((t) => t.labels.tenant_id[0] === tx.context.tenant)
.credentials.schema;
} catch (err) {
errorHandler(err);
}
if (txSchema !== serviceManagerSchema) {
const err = EventQueueError.dbClientSchemaMismatch(tx.context.tenant, txSchema, serviceManagerSchema);
errorHandler(err);
throw err;
}
}
async #selectLastSuccessfulPeriodicTimestamp() {

@@ -664,0 +705,0 @@ const entry = await SELECT.one

"use strict";
// TODO: how to deal with fatal logs
// TODO: think about situations where isInitialized need to be checked - publishEvent access config which is not initialized
// TODO: add tests for config --> similar to csn check
// TODO: redis client check reconnect strategy
// TODO: for test
// --> deeper look into the functions e.g. getQueueEntriesAndSetToInProgress
// TODO: add test for commit on event level and stuff like that
module.exports = {

@@ -13,0 +7,0 @@ ...require("./initialize"),

@@ -40,2 +40,3 @@ "use strict";

["userId", null],
["enableTxConsistencyCheck", false],
];

@@ -57,2 +58,3 @@

userId,
enableTxConsistencyCheck,
} = {}) => {

@@ -81,3 +83,4 @@ // TODO: initialize check:

useAsCAPOutbox,
userId
userId,
enableTxConsistencyCheck
);

@@ -84,0 +87,0 @@

@@ -37,3 +37,3 @@ "use strict";

if (baseInstance.isPeriodicEvent) {
return await processPeriodicEvent(baseInstance);
return await processPeriodicEvent(context, baseInstance);
}

@@ -97,2 +97,5 @@ eventConfig.startTime = startTime;

cds.log(COMPONENT_NAME).error("Processing event queue failed with unexpected error.", err, {
tenantId: context?.tenant,
tenantIdBase: baseInstance?.context?.tenant,
globalTenantId: cds.context?.tenant,
eventType,

@@ -120,6 +123,3 @@ eventSubType,

const processPeriodicEvent = async (eventTypeInstance) => {
let queueEntry;
let processNext = true;
const processPeriodicEvent = async (context, eventTypeInstance) => {
const isPeriodicEventBlockedCb = config.isPeriodicEventBlockedCb;

@@ -151,2 +151,4 @@ const params = [eventTypeInstance.eventType, eventTypeInstance.eventSubType, eventTypeInstance.context.tenant];

try {
let queueEntry;
let processNext = true;
while (processNext) {

@@ -214,2 +216,5 @@ await executeInNewTransaction(

eventSubType: eventTypeInstance?.eventSubType,
tenantId: context?.tenant,
tenantIdBase: eventTypeInstance?.context?.tenant,
globalTenantId: cds.context?.tenant,
});

@@ -216,0 +221,0 @@ } finally {

@@ -103,12 +103,18 @@ "use strict";

if (subdomainCache[tenantId]) {
return subdomainCache[tenantId];
return await subdomainCache[tenantId];
}
try {
const ssp = await cds.connect.to("cds.xt.SaasProvisioningService");
const response = await ssp.get("/tenant", { subscribedTenantId: tenantId });
subdomainCache[tenantId] = response.subscribedSubdomain;
return response.subscribedSubdomain;
} catch (err) {
return null;
}
subdomainCache[tenantId] = new Promise((resolve) => {
cds.connect
.to("cds.xt.SaasProvisioningService")
.then((ssp) => {
ssp
.get("/tenant", { subscribedTenantId: tenantId })
.then((response) => {
resolve(response.subscribedSubdomain);
})
.catch(() => resolve(null));
})
.catch(() => resolve(null));
});
return await subdomainCache[tenantId];
};

@@ -115,0 +121,0 @@

@@ -73,13 +73,12 @@ "use strict";

const subscribeRedisChannel = (channel, subscribeCb) => {
const subscribeRedisChannel = (channel, subscribeHandler) => {
const errorHandlerCreateClient = (err) => {
cds.log(COMPONENT_NAME).error(`error from redis client for pub/sub failed for channel ${channel}`, err);
subscriberChannelClientPromise[channel] = null;
setTimeout(() => subscribeRedisChannel(channel, subscribeCb), 5 * 1000).unref();
setTimeout(() => subscribeRedisChannel(channel, subscribeHandler), 5 * 1000).unref();
};
subscriberChannelClientPromise[channel] = createClientAndConnect(errorHandlerCreateClient);
subscriberChannelClientPromise[channel]
subscriberChannelClientPromise[channel] = createClientAndConnect(errorHandlerCreateClient)
.then((client) => {
cds.log(COMPONENT_NAME).info("subscribe redis client connected channel", { channel });
client.subscribe(channel, subscribeCb).catch(errorHandlerCreateClient);
client.subscribe(channel, subscribeHandler).catch(errorHandlerCreateClient);
})

@@ -86,0 +85,0 @@ .catch((err) => {

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc