Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@opuscapita/event-client

Package Overview
Dependencies
Maintainers
13
Versions
88
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@opuscapita/event-client - npm Package Compare versions

Comparing version 2.2.1 to 2.2.2

2

index.js

@@ -1,3 +0,3 @@

const { EventClient } = require('./lib');
const {EventClient} = require('./lib');
module.exports = EventClient;

@@ -32,13 +32,31 @@ /* eslint max-len: ["error", {"code":160}] */

/* Channel states */
static get CS_NONE() { return 0; }
static get CS_READY() { return 1; }
static get CS_BLOCKED() { return 2; }
static get CS_ERROR() { return 3; }
static get CS_NONE() {
return 0;
}
static get CS_READY() {
return 1;
}
static get CS_BLOCKED() {
return 2;
}
static get CS_ERROR() {
return 3;
}
/* Message states */
static get MSG_CREATED() { return 0; }
static get MSG_SENT() { return 1; }
static get MSG_NACKED() { return 2; }
static get MSG_BLOCKED() { return 3; }
static get MSG_FAILED() { return 4; }
static get MSG_CREATED() {
return 0;
}
static get MSG_SENT() {
return 1;
}
static get MSG_NACKED() {
return 2;
}
static get MSG_BLOCKED() {
return 3;
}
static get MSG_FAILED() {
return 4;
}

@@ -62,4 +80,4 @@ /**

this.logger = logger || new Logger({ context: { name: '@opuscapita/event-client:channel' } });
this.config = extend(true, { allowPing: false }, config);
this.logger = logger || new Logger({context: {name: '@opuscapita/event-client:channel'}});
this.config = extend(true, {allowPing: false}, config);
this.connectionConfig = connectionConfig;

@@ -137,3 +155,3 @@

*/
async registerConsumer({ exchangeName, queueName, topic, callback, prefetchCount }) {
async registerConsumer({exchangeName, queueName, topic, callback, prefetchCount}) {
if (this.consumerExists(topic)) {

@@ -150,7 +168,7 @@ throw new EventError(`The topic "${topic}" has already been registered.`, 409);

// TODO what to do when this fails?
await this.createAndBindQueue({ queueName, exchangeName, topic });
await this.createAndBindQueue({queueName, exchangeName, topic});
this.logger.info('Trying to consume', { queueName, topic });
this.logger.info('Trying to consume', {queueName, topic});
const consumerLogger = new Logger({ context: { "name": 'event-client:channel:registerConsumer#cb' } });
const consumerLogger = new Logger({context: {'name': 'event-client:channel:registerConsumer#cb'}});
const consumer = await channel.consume(queueName, async (message) => {

@@ -172,3 +190,3 @@ try {

if (!this.consumers.has(topic)) {
consumerLogger.warn('Received an empty message but not expecting to consume it - message ignored.', { topic: topic });
consumerLogger.warn('Received an empty message but not expecting to consume it - message ignored.', {topic: topic});
} else {

@@ -183,3 +201,3 @@ consumerLogger.warn('Received an empty message. Reregistering consumer ...');

// Using setImmediate to avoid stack overflow of recursive call
setImmediate(this.registerConsumer.bind(this, { exchangeName, queueName, topic, callback, prefetchCount }));
setImmediate(this.registerConsumer.bind(this, {exchangeName, queueName, topic, callback, prefetchCount}));
}

@@ -201,5 +219,5 @@ } else {

messageId: message.properties.messageId,
error: e });
error: e});
} else {
consumerLogger.warn('Dead event.', { queueName: queueName, error: e });
consumerLogger.warn('Dead event.', {queueName: queueName, error: e});
}

@@ -239,3 +257,3 @@

const channel = await this.getChannel();
return channel.get(queueName, { noAck: autoAck });
return channel.get(queueName, {noAck: autoAck});
}

@@ -253,3 +271,3 @@

async createAndBindQueue({ queueName, exchangeName, topic }) {
async createAndBindQueue({queueName, exchangeName, topic}) {
const channel = await this.getChannel();

@@ -284,3 +302,3 @@

async removeConsumer(topic) {
this.logger.debug('AmqpChannel#removeConsumer: Trying to remove consumer', { topic });
this.logger.debug('AmqpChannel#removeConsumer: Trying to remove consumer', {topic});

@@ -366,3 +384,3 @@ const channel = await this.getChannel();

doPublishWait(message) {
this.logger.debug('AmqpChannel#doPublishWait: Putting message to local wait queue', { message });
this.logger.debug('AmqpChannel#doPublishWait: Putting message to local wait queue', {message});

@@ -413,3 +431,3 @@ const cachedMsg = this._createCacheEntry(message);

*/
this.unackedMsgs.set(msgKey, { ...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_SENT });
this.unackedMsgs.set(msgKey, {...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_SENT});
} else {

@@ -444,3 +462,3 @@ /*

async doPublishConfirm(message, [msgKey, cacheEntry]) {
const { exchangeName, topic, messageBuffer, options } = message;
const {exchangeName, topic, messageBuffer, options} = message;

@@ -467,3 +485,3 @@ return new Promise(async (resolve, reject) => {

this.logger.info('AmqpChannel#doPublishConfirm: Moving message from unackedMsgs to waitQueue', { messageId: message.options.messageId });
this.logger.info('AmqpChannel#doPublishConfirm: Moving message from unackedMsgs to waitQueue', {messageId: message.options.messageId});

@@ -499,7 +517,7 @@ this.waitingCounter--;

if (this.unackedMsgs.has(msgKey) === false) {
this.logger.info('AmqpChannel:doPublishConfirm:confirmCallback: message deleted from unackedMsgs -> rejecting', { id: message.options.messageId });
this.logger.info('AmqpChannel:doPublishConfirm:confirmCallback: message deleted from unackedMsgs -> rejecting', {id: message.options.messageId});
this.waitingCounter--;
reject(false);
} else {
const result = channel.publish(exchangeName, topic, messageBuffer, { ...options, mandatory: true }, (err) => {
const result = channel.publish(exchangeName, topic, messageBuffer, {...options, mandatory: true}, (err) => {
/* Using the confirm callback to capture message, cache, etc. in the closure. */

@@ -519,3 +537,3 @@

if (this.unackedMsgs.has(msgKey)) {
this.unackedMsgs.set(msgKey, { ...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_NACKED });
this.unackedMsgs.set(msgKey, {...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_NACKED});
}

@@ -527,3 +545,3 @@

} else {
this.logger.info('AmqpChannel:doPublishConfirm:confirmCallback: Got ACK from message broker', { id: message.options.messageId });
this.logger.info('AmqpChannel:doPublishConfirm:confirmCallback: Got ACK from message broker', {id: message.options.messageId});

@@ -556,5 +574,5 @@ if (this.confirmFailedQueue.has(msgKey) && this.waitQueue.has(msgKey)) {

msgKey,
{ ...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_FAILED });
{...this.unackedMsgs.get(msgKey), state: AmqpChannel.MSG_FAILED});
}
this.logger.error('AmqpChannel#publish: failed', { id: message.options.messageId, e });
this.logger.error('AmqpChannel#publish: failed', {id: message.options.messageId, e});

@@ -575,3 +593,3 @@ return false;

*/
async doPublishNoConfirm({ exchangeName, topic, messageBuffer, options }) {
async doPublishNoConfirm({exchangeName, topic, messageBuffer, options}) {
const channel = await this.getChannel();

@@ -583,3 +601,3 @@ return channel.publish(exchangeName, topic, messageBuffer, options);

const channel = await this.getChannel();
return await channel.assertExchange(exchangeName, type, { durable: true, autoDelete: false });
return await channel.assertExchange(exchangeName, type, {durable: true, autoDelete: false});
}

@@ -591,3 +609,3 @@

await this.createExchange(`${exchangeName}.dead`, 'fanout');
await channel.assertQueue(`${exchangeName}.dead`, { durable: true, autoDelete: false });
await channel.assertQueue(`${exchangeName}.dead`, {durable: true, autoDelete: false});
await channel.bindQueue(`${exchangeName}.dead`, `${exchangeName}.dead`, '');

@@ -674,3 +692,3 @@ }

this.logger.debug('AmqpChannel#queueExists: RETURN', { exists });
this.logger.debug('AmqpChannel#queueExists: RETURN', {exists});
return exists;

@@ -745,3 +763,3 @@ }

'AmqpChannel#flushWaitQueue: Saving message to logstash - ',
{ value: Buffer.from(JSON.stringify(v)).toString('base64') }
{value: Buffer.from(JSON.stringify(v)).toString('base64')}
);

@@ -768,3 +786,3 @@ }

'AmqpChannel#flushWaitQueue: Failed to deliver message - ',
{ value: Buffer.from(JSON.stringify(v)).toString('base64') }
{value: Buffer.from(JSON.stringify(v)).toString('base64')}
);

@@ -790,3 +808,3 @@ }

onChannelError(err) {
this.logger.error('A channel has been unexpectedly closed', { err });
this.logger.error('A channel has been unexpectedly closed', {err});

@@ -799,3 +817,3 @@ // Not using AmqpChannel#close() here because this would also clear the subscriptions.

onChannelClose(err) {
this.logger.info('AmqpChannel#onChannelClose: onChannelClose handler was triggered on channel', { state: this.state, err });
this.logger.info('AmqpChannel#onChannelClose: onChannelClose handler was triggered on channel', {state: this.state, err});

@@ -811,3 +829,3 @@ this.channel = null; // Not using AmqpChannel#close() here because this would also clear the subscriptions.

onChannelDrain() {
this.logger.info('AmpChannel#onChannelDrain: Received drain event on channel', { channel: this.channel.ch });
this.logger.info('AmpChannel#onChannelDrain: Received drain event on channel', {channel: this.channel.ch});

@@ -821,6 +839,8 @@ if (this.state === AmqpChannel.CS_READY) {

onChannelReturn(message) {
if (message.fields.routingKey === 'ping') {return;}
if (message.fields.routingKey === 'ping') {
return;
}
this.logger.error(
'AmqpChannel#onChannelReturn: Failed to deliver message - routing failure',
{ value: Buffer.from(JSON.stringify(message)).toString('base64')
{value: Buffer.from(JSON.stringify(message)).toString('base64')
});

@@ -852,8 +872,8 @@ }

this.logger.info('AmqpChannel#_registerMissingConsumers: Registering missing consumers');
this.logger.info('AmqpChannel#_registerMissingConsumers: Active consumers', { activeTags });
this.logger.info('AmqpChannel#_registerMissingConsumers: Missing consumers', { missingConsumers });
this.logger.info('AmqpChannel#_registerMissingConsumers: Active consumers', {activeTags});
this.logger.info('AmqpChannel#_registerMissingConsumers: Missing consumers', {missingConsumers});
}
result = await Promise.all(missingConsumers.map(async (c) => {
this.logger.info('Registering...', { topic: c.topic });
this.logger.info('Registering...', {topic: c.topic});

@@ -918,3 +938,5 @@ try {

await this.sendPing(this.channel).
then(() => { this.useWaitQueue = false}).
then(() => {
this.useWaitQueue = false;
}).
catch(() => this.logger.warn('AmqpChannel#registerConnectinListener: Connection was established but got immediatly blocked'));

@@ -965,4 +987,4 @@ }

'ping',
Buffer.from(JSON.stringify({ cmd: 'ping' })),
{ mandatory: true },
Buffer.from(JSON.stringify({cmd: 'ping'})),
{mandatory: true},
(err) => {

@@ -973,3 +995,3 @@ if (timeout) {

this.logger.info('AmqpChannel#sendPing: Confirm for ping received', { channel: channel.ch });
this.logger.info('AmqpChannel#sendPing: Confirm for ping received', {channel: channel.ch});

@@ -994,3 +1016,3 @@ if (err) {

_createCacheEntry({ exchangeName, topic, messageBuffer, options }, state = AmqpChannel.MSG_CREATED) {
_createCacheEntry({exchangeName, topic, messageBuffer, options}, state = AmqpChannel.MSG_CREATED) {
const hmac = crypto.createHmac('sha256', Buffer.from(`${Date.now().toString()}`));

@@ -997,0 +1019,0 @@ hmac.update(messageBuffer);

@@ -10,9 +10,23 @@ const Logger = require('@opuscapita/logger');

/* Connection states */
static get CS_EMPTY() { return 0; }
static get CS_CONNECTING() { return 1; }
static get CS_CONNECTED() { return 2; }
static get CS_BLOCKED() { return 3; }
static get CS_ERROR() { return 4; }
static get CS_CLOSED() { return 5; }
static get CS_DISPOSED() { return 6; }
static get CS_EMPTY() {
return 0;
}
static get CS_CONNECTING() {
return 1;
}
static get CS_CONNECTED() {
return 2;
}
static get CS_BLOCKED() {
return 3;
}
static get CS_ERROR() {
return 4;
}
static get CS_CLOSED() {
return 5;
}
static get CS_DISPOSED() {
return 6;
}

@@ -23,3 +37,3 @@ constructor(config, logger) {

this.connectionState = AmqpConnection.CS_EMPTY;
this.logger = logger || new Logger({ context: { name: '@opuscapita/event-client:connection' } });
this.logger = logger || new Logger({context: {name: '@opuscapita/event-client:connection'}});
this.events = new EventEmitter();

@@ -30,6 +44,6 @@ this.connectTryCount = 0;

createHash('md5').
update(Math.random() + '').
update(`${Math.random()}`).
digest('hex');
logger.contextify({ uniqueId: this.uniqueId });
logger.contextify({uniqueId: this.uniqueId});
}

@@ -51,3 +65,3 @@

this.logger.info('Current connectTryCount', { count: this.connectTryCount });
this.logger.info('Current connectTryCount', {count: this.connectTryCount});

@@ -92,3 +106,7 @@ if (this.connectTryCount < this.config.maxConnectTryCount) {

this.connectionState = AmqpConnection.CS_CLOSED;
this.logger.warn('AmqpConnection: Connection went to CLOSED state:', err);
this.logger.info('AmqpConnection: Connection went to CLOSED state:', err);
// When a channel is no longer needed, it should be closed.
// This can be a normal operation (ex. when checking if queue exists) or unwanted situation (an error).
// An attempt to perform an operation on a closed channel will result in an exception that says
// that the channel has already been closed.
this.events.emit('close');

@@ -104,3 +122,3 @@

this.logger.info('Connection established.', { host: this.config.host, port: this.config.port });
this.logger.info('Connection established.', {host: this.config.host, port: this.config.port});
} else {

@@ -115,3 +133,3 @@ this.connectionState = AmqpConnection.CS_ERROR;

} else {
this.logger.error('Max connectTryCount exceeded', { tryCount: this.connectTryCount });
this.logger.error('Max connectTryCount exceeded', {tryCount: this.connectTryCount});
throw new Error('Max connectTryCount exceeded.');

@@ -129,3 +147,3 @@ }

doConnect() {
this.logger.info('Trying to connect', { host: this.config.host, port: this.config.port });
this.logger.info('Trying to connect', {host: this.config.host, port: this.config.port});

@@ -132,0 +150,0 @@ return amqp.connect({

@@ -34,3 +34,3 @@ /* eslint max-len: ["error", {"code":160}] */

this._logger = this.config.logger || new Logger({ context: { name: '@opuscapita/event-client:client' } });
this._logger = this.config.logger || new Logger({context: {name: '@opuscapita/event-client:client'}});

@@ -50,3 +50,3 @@ this._onEndpointChangedBound = this._onEndpointChanged.bind(this);

if (!this._logger) {
this._logger = new Logger({ context: { name: '@opuscapita/event-client:client' } });
this._logger = new Logger({context: {name: '@opuscapita/event-client:client'}});
this._logger.warn('this._logger was uninitialized for unknown reason');

@@ -164,3 +164,3 @@ }

logger.contextify(extend(true, { }, localContext, options));
logger.info('Emitting event', { topic });
logger.info('Emitting event', {topic});

@@ -170,3 +170,3 @@ const exchangeName = this.exchangeName;

const messageToPublish = { exchangeName, topic, messageBuffer, options };
const messageToPublish = {exchangeName, topic, messageBuffer, options};
let result = false;

@@ -176,3 +176,3 @@ try {

} catch (e) {
logger.error('EventClient: Failed to publish message', { exchangeName, topic, e });
logger.error('EventClient: Failed to publish message', {exchangeName, topic, e});
}

@@ -183,3 +183,3 @@

} else {
logger.error('EventClient: Failed to publish message', { exchangeName, topic, messageToPublish });
logger.error('EventClient: Failed to publish message', {exchangeName, topic, messageToPublish});
throw new EventError('Unkown error: Event could not be published.', 500);

@@ -219,6 +219,6 @@ }

const logger = this.logger.clone();
const { routingKey } = message.fields;
const { contentType, headers } = message.properties;
const {routingKey} = message.fields;
const {contentType, headers} = message.properties;
logger.info('Receiving event for registered topic', { topic: topic, routingKey: routingKey });
logger.info('Receiving event for registered topic', {topic: topic, routingKey: routingKey});

@@ -236,3 +236,3 @@ if (contentType === this.config.parserContentType) {

logger.contextify(extend(true, { }, headers, message.properties));
logger.info(`Passing event to application.`, { routingKey: routingKey });
logger.info('Passing event to application.', {routingKey: routingKey});

@@ -250,6 +250,6 @@ try {

this.callbackErrorCount[topic] = this.callbackErrorCount[topic] + 1 || 1;
logger.info('Processing re-try', { topicRetryCounter: this.callbackErrorCount[topic] });
logger.info('Processing re-try', {topicRetryCounter: this.callbackErrorCount[topic]});
if (this.callbackErrorCount[topic] >= 32) {
logger.info('Event processing failure - will NOT try again.', { topicRetryCounter: this.callbackErrorCount[topic] });
logger.info('Event processing failure - will NOT try again.', {topicRetryCounter: this.callbackErrorCount[topic]});
// TODO: This should be variable and have possibility to use advanced logic, not a hardcoded value of 32

@@ -263,3 +263,3 @@ throw new EventError(`Processing the topic "${topic}" did not succeed after several tries.`, 500);

this.callbackErrorCount[topic] = this.callbackErrorCount[topic] + 1 || 1;
logger.info('Processing fail', { topicRetryCounter: this.callbackErrorCount[topic] });
logger.info('Processing fail', {topicRetryCounter: this.callbackErrorCount[topic]});

@@ -288,3 +288,3 @@ if (this.callbackErrorCount[topic] >= 32) {

} else {
await channel.createAndBindQueue({ queueName, exchangeName, topic });
await channel.createAndBindQueue({queueName, exchangeName, topic});
}

@@ -313,4 +313,4 @@ return true;

if (message) {
const { routingKey, deliveryTag } = message.fields;
const { contentType, headers } = message.properties;
const {routingKey, deliveryTag} = message.fields;
const {contentType, headers} = message.properties;

@@ -350,3 +350,3 @@ if (contentType === this.config.parserContentType) {

await this.getChannel.ackMessage({ fields: { deliveryTag: message.tag } });
await this.getChannel.ackMessage({fields: {deliveryTag: message.tag}});
})());

@@ -367,3 +367,3 @@ }

await this.getChannel.nackMessage({ fields: { deliveryTag: message.tag } }, allUpTo, requeue);
await this.getChannel.nackMessage({fields: {deliveryTag: message.tag}}, allUpTo, requeue);
})());

@@ -512,3 +512,3 @@ }

const channelClosePromises = this.channels.map((c) => {
return c.close().catch((e) => this.logger.error('EventClient#dispose: Failed to close channel', { ch: c.ch, e }));
return c.close().catch((e) => this.logger.error('EventClient#dispose: Failed to close channel', {ch: c.ch, e}));
});

@@ -564,8 +564,12 @@ await Promise.all(channelClosePromises);

const { host, port } = await consul.getEndPoint(config.mqServiceName);
const {host, port} = await consul.getEndPoint(config.mqServiceName);
const [username, password] = await consul.get([config.mqUserKey, config.mqPasswordKey]);
if (!consul.listeners('endpointChanged').some((fn) => fn === this._onEndpointChangedBound)) {consul.on('endpointChanged', this._onEndpointChangedBound);}
if (!consul.listeners('endpointChanged').some((fn) => fn === this._onEndpointChangedBound)) {
consul.on('endpointChanged', this._onEndpointChangedBound);
}
if (!consul.listeners('propertyChanged').some((fn) => fn === this._onPropertyChangedBound)) {consul.on('propertyChanged', this._onPropertyChangedBound);}
if (!consul.listeners('propertyChanged').some((fn) => fn === this._onPropertyChangedBound)) {
consul.on('propertyChanged', this._onPropertyChangedBound);
}

@@ -584,5 +588,7 @@ return {

async _getNewChannel() {
if (!this.consumerConnection) {await this.init();}
if (!this.consumerConnection) {
await this.init();
}
const channel = new AmqpChannel(this.consumerConnection, this.logger, this.connectionConfig, { exchangeName: this.exchangeName });
const channel = new AmqpChannel(this.consumerConnection, this.logger, this.connectionConfig, {exchangeName: this.exchangeName});
this.channels.push(channel);

@@ -606,8 +612,12 @@

const { host, port } = await configService.getEndPoint(config.mqServiceName);
const {host, port} = await configService.getEndPoint(config.mqServiceName);
const [username, password] = await configService.get([config.mqUserKey, config.mqPasswordKey]);
if (this.publisherConnection) {this.publisherConnection.setConfig({ host, port, username, password });}
if (this.publisherConnection) {
this.publisherConnection.setConfig({host, port, username, password});
}
if (this.consumerConnection) {this.consumerConnection.setConfig({ host, port, username, password });}
if (this.consumerConnection) {
this.consumerConnection.setConfig({host, port, username, password});
}

@@ -621,3 +631,3 @@ return true;

if (serviceName === config.mqServiceName) {
this.logger.info('EventClient#_onEndpointChanged: Got on onEndpointChange event', { serviceName });
this.logger.info('EventClient#_onEndpointChanged: Got on onEndpointChange event', {serviceName});
try {

@@ -635,3 +645,3 @@ this._doReconnect();

if (key === config.mqUserKey || key === config.mqPasswordKey) {
this.logger.info('EventClient#_onPropertyChanged: Got on onPropertyChanged event', { key });
this.logger.info('EventClient#_onPropertyChanged: Got on onPropertyChanged event', {key});
try {

@@ -638,0 +648,0 @@ this._doReconnect();

@@ -6,2 +6,2 @@ const AmqpChannel = require('./AmqpChannel');

module.exports = { AmqpChannel, AmqpConnection, EventClient, EventError };
module.exports = {AmqpChannel, AmqpConnection, EventClient, EventError};
{
"name": "@opuscapita/event-client",
"version": "2.2.1",
"version": "2.2.2",
"description": "An event client to handle the event system in microservices architecture.",

@@ -33,10 +33,8 @@ "main": "index.js",

"devDependencies": {
"@opuscapita/eslint-config-opuscapita-bnapp": "^1.3.5",
"@types/amqplib": "^0.8.2",
"@types/mocha": "^9.1.1",
"@types/node": "^18.6.1",
"babel-eslint": "10.1.0",
"eslint": "^8.4.1",
"eslint-config-opuscapita": "2.0.10",
"eslint": "^8.20.0",
"eslint-find-rules": "^3.6.1",
"eslint-plugin-react": "7.26.1",
"mocha": "^9.1.3",

@@ -43,0 +41,0 @@ "mocha-junit-reporter": "^2.0.2",

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc