New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@bettercorp/service-base-plugin-events-rabbitmq

Package Overview
Dependencies
Maintainers
2
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bettercorp/service-base-plugin-events-rabbitmq - npm Package Compare versions

Comparing version 2.0.20211208003051 to 3.0.20211220200946

22

lib/plugins/events-rabbitmq/plugin.d.ts
import { CEvents } from '@bettercorp/service-base/lib/ILib';
export declare class Events extends CEvents {
private appId;
private rabbitQConnection;
private rabbitQConnectionEmitChannel;
private readonly rabbitQConnectionEmitChannelKey;
private rabbitQConnectionEARChannel;
private readonly rabbitQConnectionEARChannelKey;
private readonly rabbitQConnectionEARChannelKeyMine;
private builtInEvents;
private readonly emitExchange;
private readonly earExchange;
private readonly emitExchangeQueue;
private readonly earExchangeQueue;
init(): Promise<void>;
private _connectToAMQP;
private _emitEventAsync;
private _setupEmitHandler;
onEvent<T = any>(plugin: string, pluginName: string | null, event: string, listener: (data: T) => void): Promise<void>;
onEvent<ArgsDataType = any>(callerPluginName: string, pluginName: string, event: string, listener: (data: ArgsDataType) => void): Promise<void>;
emitEvent<T = any>(plugin: string, pluginName: string | null, event: string, data?: T): Promise<void>;
private _setupEARHandler;
onReturnableEvent<T = any>(plugin: string, pluginName: string | null, event: string, listener: (resolve: Function, reject: Function, data: T) => void): Promise<void>;
onReturnableEvent<ArgsDataType = any, ResolveDataType = any, RejectDataType = any>(callerPluginName: string, pluginName: string, event: string, listener: {
(resolve: {
(...args: ResolveDataType[]): void;
}, reject: {
(...args: RejectDataType[]): void;
}, data: ArgsDataType): void;
}): Promise<void>;
emitEventAndReturn<T1 = any, T2 = void>(plugin: string, pluginName: string | null, event: string, data?: T1, timeoutSeconds?: number): Promise<T2>;
}

@@ -5,7 +5,7 @@ "use strict";

const amqplib = require("amqplib");
const crypto_1 = require("crypto");
const ILib_1 = require("@bettercorp/service-base/lib/ILib");
const uuid_1 = require("uuid");
const Tools_1 = require("@bettercorp/tools/lib/Tools");
const EVENT_EMITTER = require("events");
const OS = require("os");
const rabbit_queue_1 = require("rabbit-queue");
class Events extends ILib_1.CEvents {

@@ -16,24 +16,2 @@ constructor() {

this.rabbitQConnectionEARChannelKey = "ar";
this.rabbitQConnectionEARChannelKeyMine = "kr";
this.emitExchange = {
type: 'fanout',
name: 'better-service-emit'
};
this.earExchange = {
type: 'direct',
name: 'better-service-ear',
myResponseName: null
};
this.emitExchangeQueue = {
durable: true,
messageTtl: (60 * 60 * 60 * 6) * 1000 // 6h
//expires: (60*60*360)*1000 // 360 minutes
};
this.earExchangeQueue = {
durable: false,
//exclusive: true,
autoDelete: true,
messageTtl: (60 * 60 * 60) * 1000, // 60 minutes
//expires: (60 * 60 * 60) * 1000 // 60 minutes
};
}

@@ -44,2 +22,3 @@ init() {

try {
self.appId = `${OS.hostname()}-${(0, crypto_1.randomUUID)()}`;
await self._connectToAMQP();

@@ -55,201 +34,84 @@ resolve();

async _connectToAMQP() {
this.log.info(`Connect to ${(await this.getPluginConfig()).endpoint}`);
if (Tools_1.Tools.isNullOrUndefined((await this.getPluginConfig()).credentials)) {
throw new Error('Plugin credentials not defined in sec.config.json');
const pluginConfig = (await this.getPluginConfig());
this.log.info(`Connect to ${pluginConfig.endpoint}`);
let socketOptions = {};
if (!Tools_1.Tools.isNullOrUndefined(pluginConfig.credentials)) {
socketOptions.credentials = amqplib.credentials.plain('radmin', 'TLvGnHd9a9ndmo2nBepNxFXFprQ9eCpEvXp6qKN2YPBqUVN2va');
}
this.rabbitQConnection = await amqplib.connect((await this.getPluginConfig()).endpoint, {
credentials: amqplib.credentials.plain((await this.getPluginConfig()).credentials.username, (await this.getPluginConfig()).credentials.password)
this.rabbitQConnection = new rabbit_queue_1.Rabbit(pluginConfig.endpoint, {
prefetch: pluginConfig.prefetch,
replyPattern: true,
scheduledPublish: false,
prefix: '',
socketOptions
});
const self = this;
this.rabbitQConnection.on("error", (err) => {
if (err.message !== "Connection closing") {
self.log.error('AMQP ERROR', err.message);
}
this.rabbitQConnection.on('log', (component, level, ...args) => {
if (Tools_1.Tools.isFunction(self.log[level]))
return self.log[level](component, args);
if ('trace' === level)
return self.log.debug(component, args);
self.log.info(component, args);
});
this.rabbitQConnection.on("close", () => {
await new Promise((r) => {
this.rabbitQConnection.on('connected', r);
});
this.log.info(`Connected to ${pluginConfig.endpoint}`);
this.rabbitQConnection.on('disconnected', (err = new Error('Rabbitmq Disconnected')) => {
self.log.error(err);
self.log.error('AMQP CONNECTION CLOSED');
self.log.fatal('AMQP Error: Connection closed');
});
this.log.info(`Connected to ${(await this.getPluginConfig()).endpoint}`);
await this._setupEmitHandler();
await this._setupEARHandler();
}
_emitEventAsync(queueKey, queueAttr, plugin, pluginName, event, channel, data, additionalArgs) {
async onEvent(callerPluginName, pluginName, event, listener) {
const self = this;
return new Promise((resolve, reject) => {
self.log.debug(plugin, ` - EMIT: [${queueKey}-${pluginName || plugin}-${event}]`, data);
let qArguments = undefined;
if (!Tools_1.Tools.isNullOrUndefined(additionalArgs)) {
qArguments = JSON.parse(JSON.stringify(queueAttr));
for (let iKey of Object.keys(additionalArgs))
qArguments[iKey] = additionalArgs[iKey];
}
var ok = channel.assertQueue(`${queueKey}-${pluginName || plugin}-${event}`, qArguments || queueAttr);
ok.then(function (_qok) {
// NB: `sentToQueue` and `publish` both return a boolean
// indicating whether it's OK to send again straight away, or
// (when `false`) that you should wait for the event `'drain'`
// to fire before writing again. We're just doing the one write,
// so we'll ignore it.
if (!channel.sendToQueue(`${queueKey}-${pluginName || plugin}-${event}`, Buffer.from(JSON.stringify(data))))
return reject([`Cannot send msg to queue [${queueKey}-${pluginName || plugin}-${event}]`, data]);
self.log.debug(plugin, ` - EMIT: [${queueKey}-${pluginName || plugin}-${event}] - EMITTED`, data);
resolve();
}).catch((x) => {
reject([`Cannot assert queue [${queueKey}-${pluginName || plugin}-${event}]`, x]);
});
self.log.info(callerPluginName, ` - LISTEN: [${self.rabbitQConnectionEmitChannelKey}-${pluginName || callerPluginName}-${event}]`);
this.rabbitQConnection.createQueue(`${self.rabbitQConnectionEmitChannelKey}-${pluginName || callerPluginName}-${event}`, { durable: true }, (msg, ack) => {
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
listener(bodyObj);
ack(null);
}).then(() => {
self.log.info(callerPluginName, ` - LISTEN: [${self.rabbitQConnectionEmitChannelKey}-${pluginName || callerPluginName}-${event}] - LISTENING`);
});
}
async _setupEmitHandler() {
this.log.info(`Open emit channel (${this.emitExchange.name})`);
this.rabbitQConnectionEmitChannel = await this.rabbitQConnection.createChannel();
const self = this;
this.rabbitQConnectionEmitChannel.on("close", () => {
self.log.error('AMQP rabbitQConnectionEmitChannel close');
self.log.fatal('AMQP Error: rabbitQConnectionEmitChannel close');
});
this.rabbitQConnectionEmitChannel.on("error", (err) => {
self.log.error('AMQP rabbitQConnectionEmitChannel error', err);
self.log.fatal('AMQP Error: rabbitQConnectionEmitChannel error');
});
this.rabbitQConnectionEmitChannel.assertExchange(this.emitExchange.name, this.emitExchange.type, this.emitExchangeQueue);
this.log.info(`Open emit channel (${this.emitExchange.name}) - PREFETCH x${(await this.getPluginConfig()).prefetch}`);
this.rabbitQConnectionEmitChannel.prefetch((await this.getPluginConfig()).prefetch);
this.log.info(`Open emit channel (${this.emitExchange.name}) - COMPLETED`);
}
async onEvent(plugin, pluginName, event, listener) {
const self = this;
self.log.info(plugin, ` - LISTEN: [${self.rabbitQConnectionEmitChannelKey}-${pluginName || plugin}-${event}]`);
let ok = self.rabbitQConnectionEmitChannel.assertQueue(`${self.rabbitQConnectionEmitChannelKey}-${pluginName || plugin}-${event}`, self.emitExchangeQueue);
ok = ok.then(function () {
self.log.info(plugin, ` - LISTEN: [${`${self.rabbitQConnectionEmitChannelKey}-${pluginName || plugin}-${event}`}] - LISTENING`);
});
//ok = ok.then(function () { channel.prefetch(1); });
ok = ok.then(function () {
self.rabbitQConnectionEmitChannel.consume(`${self.rabbitQConnectionEmitChannelKey}-${pluginName || plugin}-${event}`, (msg) => {
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
listener(bodyObj);
self.rabbitQConnectionEmitChannel.ack(msg);
}, { noAck: false });
});
}
async emitEvent(plugin, pluginName, event, data) {
this._emitEventAsync(this.rabbitQConnectionEmitChannelKey, this.emitExchangeQueue, plugin, pluginName, event, this.rabbitQConnectionEmitChannel, data).then(this.log.debug).catch(this.log.fatal);
await this.rabbitQConnection.publish(`${this.rabbitQConnectionEmitChannelKey}-${pluginName || plugin}-${event}`, data, {
persistent: false,
expiration: ((60 * 60 * 60 * 6) * 1000),
appId: this.appId,
correlationId: this.appId + (0, crypto_1.randomUUID)()
})
.then(this.log.debug)
.catch(this.log.fatal);
}
async _setupEARHandler() {
this.log.info(`Ready my events name`);
this.earExchange.myResponseName = `${this.cwd}`.replace(/[\W-]/g, '').toLowerCase() +
(((await this.getPluginConfig()).noRandomDebugName === true ? '' : this.appConfig.runningInDebug || !this.appConfig.runningLive ? `-${Math.random()}` : '')) +
((await this.getPluginConfig()).uniqueId || '');
this.log.info(`Ready my events name - ${this.earExchange.myResponseName}`);
this.log.info(`Ready internal events`);
this.builtInEvents = new EVENT_EMITTER();
this.log.info(`Ready internal events - COMPLETED`);
this.log.info(`Open EAR channel (${this.earExchange.name})`);
this.rabbitQConnectionEARChannel = await this.rabbitQConnection.createChannel();
async onReturnableEvent(callerPluginName, pluginName, event, listener) {
this.log.info(callerPluginName, ` - LISTEN EAR: [${this.rabbitQConnectionEARChannelKey}-${pluginName || callerPluginName}-${event}]`);
const self = this;
this.rabbitQConnectionEARChannel.on("close", () => {
self.log.error('AMQP rabbitQConnectionEARChannel close');
self.log.fatal('AMQP Error: rabbitQConnectionEARChannel close');
this.rabbitQConnection
.createQueue(`${self.rabbitQConnectionEARChannelKey}-${pluginName || callerPluginName}-${event}`, { durable: false }, (msg, ack) => {
let body = msg.content.toString();
let bodyObj = JSON.parse(body);
listener((x) => {
self.log.debug(callerPluginName, ` - RETURN OKAY: [${self.rabbitQConnectionEARChannelKey}-${pluginName || callerPluginName}-${event}]`, bodyObj);
ack(null, x);
}, (x) => {
self.log.debug(callerPluginName, ` - RETURN ERROR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || callerPluginName}-${event}]`, bodyObj);
ack(x);
}, bodyObj);
})
.then(() => {
self.log.info(callerPluginName, ` - LISTEN EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || callerPluginName}-${event}] - LISTENING`);
});
this.rabbitQConnectionEARChannel.on("error", (err) => {
self.log.error('AMQP rabbitQConnectionEARChannel error', err);
self.log.fatal('AMQP Error: rabbitQConnectionEARChannel error');
});
this.rabbitQConnectionEARChannel.assertExchange(this.earExchange.name, this.earExchange.type, this.earExchangeQueue);
this.log.info(`Open EAR channel (${this.earExchange.name}) - PREFETCH x${(await this.getPluginConfig()).prefetchEAR}`);
this.rabbitQConnectionEARChannel.prefetch((await this.getPluginConfig()).prefetchEAR);
this.log.info(`Open EAR channel (${this.earExchange.name}) - ${this.earExchange.myResponseName} - LISTEN`);
let eventEARQueue = this.rabbitQConnectionEARChannel.assertQueue(`${this.rabbitQConnectionEARChannelKeyMine}-${OS.hostname()}-${this.earExchange.myResponseName}`, this.earExchangeQueue);
eventEARQueue = eventEARQueue.then(function () {
self.log.info(` - LISTEN: [${self.earExchange.myResponseName}] - LISTENING`);
});
eventEARQueue = eventEARQueue.then(function () {
self.rabbitQConnectionEARChannel.consume(`${self.rabbitQConnectionEARChannelKeyMine}-${OS.hostname()}-${self.earExchange.myResponseName}`, (msg) => {
let body = msg.content.toString();
self.rabbitQConnectionEARChannel.ack(msg);
self.log.debug(`[RECEVIED ${self.earExchange.myResponseName}]:`, body);
const bodyObj = JSON.parse(body);
self.builtInEvents.emit(bodyObj.id, bodyObj);
}, { noAck: false });
});
this.log.info(`Open EAR channel (${this.earExchange.name}) - COMPLETED`);
}
async onReturnableEvent(plugin, pluginName, event, listener) {
const self = this;
self.log.info(plugin, ` - LISTEN EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}]`);
let ok = self.rabbitQConnectionEARChannel.assertQueue(`${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}`, self.earExchangeQueue);
ok = ok.then(function () {
self.log.info(plugin, ` - LISTEN EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}] - LISTENING`);
});
//ok = ok.then(function () { channel.prefetch(1); });
ok = ok.then(function () {
self.rabbitQConnectionEARChannel.consume(`${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}`, (msg) => {
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
listener((x) => {
self.rabbitQConnectionEARChannel.ack(msg);
self.log.debug(plugin, ` - RETURN OKAY: [${self.rabbitQConnectionEARChannelKeyMine}-${pluginName || plugin}-${event}]`, bodyObj);
self._emitEventAsync(self.rabbitQConnectionEARChannelKeyMine, self.earExchangeQueue, plugin, bodyObj.plugin, bodyObj.topic, self.rabbitQConnectionEARChannel, {
data: x,
id: bodyObj.id,
resultSuccess: true
}).then(self.log.debug).catch(self.log.fatal);
}, (x) => {
self.rabbitQConnectionEARChannel.ack(msg);
self.log.debug(plugin, ` - RETURN ERROR: [${self.rabbitQConnectionEARChannelKeyMine}-${pluginName || plugin}-${event}]`, bodyObj);
self._emitEventAsync(self.rabbitQConnectionEARChannelKeyMine, self.earExchangeQueue, plugin, bodyObj.plugin, bodyObj.topic, self.rabbitQConnectionEARChannel, {
data: x,
id: bodyObj.id,
resultSuccess: false
}).then(self.log.debug).catch(self.log.fatal);
}, bodyObj.data);
}, { noAck: false });
});
}
emitEventAndReturn(plugin, pluginName, event, data, timeoutSeconds = 10) {
const self = this;
this.log.debug(plugin, ` - EMIT EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}]`, data);
return new Promise((resolve, reject) => {
const resultKey = `${(0, uuid_1.v4)()}-${new Date().getTime()}${Math.random()}`;
const xtimeoutSeconds = timeoutSeconds || 10;
const additionalArgs = {
"$$TIME": new Date().getTime(),
expiration: (xtimeoutSeconds * 1000) + 5000,
};
if (additionalArgs.expiration >= self.earExchangeQueue)
return reject(`TTL CANNOT BE GREATER THAN: ${self.earExchangeQueue - 2}ms`);
let qArguments = JSON.parse(JSON.stringify(self.earExchangeQueue));
for (let iKey of Object.keys(additionalArgs))
qArguments[iKey] = additionalArgs[iKey];
const listener = (data) => {
if (timeoutTimer === null)
return this.log.debug(plugin, ` - REC EAR TOO LATE: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}]`, data.resultSuccess ? 'SUCCESS' : 'ERRORED', data);
this.log.debug(plugin, ` - REC EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}]`, data.resultSuccess ? 'SUCCESS' : 'ERRORED', data);
clearTimeout(timeoutTimer);
timeoutTimer = null;
if (data.resultSuccess)
resolve(data.data);
else
reject(data.data);
};
let timeoutTimer = setTimeout(() => {
if (timeoutTimer === null)
return;
clearTimeout(timeoutTimer);
timeoutTimer = null;
self.builtInEvents.removeListener(resultKey, listener);
self.log.debug(plugin, ` - EMIT AR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}-${resultKey}]`, 'TIMED OUT');
reject(`NO RESPONSE IN TIME: ${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}/${resultKey} x${(data || {}).timeoutSeconds || 10}s`);
}, timeoutSeconds * 1000);
self.builtInEvents.once(resultKey, listener);
self._emitEventAsync(self.rabbitQConnectionEARChannelKey, self.earExchangeQueue, plugin, pluginName, event, self.rabbitQConnectionEARChannel, {
id: resultKey,
data: data,
plugin: OS.hostname(),
topic: self.earExchange.myResponseName
}, qArguments).then(self.log.debug).catch(self.log.fatal);
self.log.debug(plugin, ` - EMIT EAR: [${self.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}-${resultKey}] - EMITTED`, data);
});
const xtimeoutSeconds = timeoutSeconds || 10;
return this.rabbitQConnection.getReply(`${this.rabbitQConnectionEARChannelKey}-${pluginName || plugin}-${event}`, data, {
persistent: false,
expiration: ((xtimeoutSeconds * 1000) + 5000),
appId: this.appId,
correlationId: this.appId + (0, crypto_1.randomUUID)()
}, '', xtimeoutSeconds * 1000);
}

@@ -256,0 +118,0 @@ }

export interface IPluginConfig {
prefetch: number;
prefetchEAR: number;
endpoint: string;
credentials: IPluginConfig_Credentials;
noRandomDebugName: boolean;
uniqueId: string | null;
}

@@ -15,3 +12,2 @@ export interface IPluginConfig_Credentials {

prefetch: number;
prefetchEAR: number;
endpoint: string;

@@ -22,5 +18,3 @@ credentials: {

};
noRandomDebugName: boolean;
uniqueId: null;
};
export default _default;

@@ -5,4 +5,3 @@ "use strict";

return {
prefetch: 10,
prefetchEAR: 10,
prefetch: 2,
endpoint: "amqp://localhost",

@@ -13,6 +12,4 @@ credentials: {

},
noRandomDebugName: false,
uniqueId: null
};
};
//# sourceMappingURL=sec.config.js.map

@@ -1,1 +0,1 @@

{"name":"@bettercorp/service-base-plugin-events-rabbitmq","license":"AGPL-3.0-only","repository":{"url":"https://gitlab.com/BetterCorp/public/service-base-plugin-events-rabbitmq"},"scripts":{"dev":"nodemon --config node_modules/@bettercorp/service-base/build/nodemon.json","start":"ts-node node_modules/@bettercorp/service-base/lib/index.js","build":"tsc","deploy":"npm publish","version":"node ./node_modules/@bettercorp/service-base/build/version-bcorp.js $0","create":"ts-node node_modules/@bettercorp/service-base/lib/bootstrap.js $0"},"files":["lib/**/*","installer.js"],"main":"lib/index.js","version":"2.0.20211208003051","dependencies":{"@bettercorp/service-base":"^6.0.20211116225325","@bettercorp/tools":"^1.2.20211101100533","@types/amqplib":"^0.8.2","@types/node":"^16.11.7","@types/uuid":"^8.3.1","amqplib":"^0.8.0","uuid":"^8.3.2"},"devDependencies":{"@typescript-eslint/eslint-plugin":"^5.4.0","@typescript-eslint/parser":"^5.4.0","eslint":"^8.2.0","ts-node":"^10.4.0","typescript":"^4.4.4"},"bsb_project":true}
{"name":"@bettercorp/service-base-plugin-events-rabbitmq","license":"AGPL-3.0-only","repository":{"url":"https://gitlab.com/BetterCorp/BetterServiceBase/service-base-plugin-events-rabbitmq"},"scripts":{"dev":"nodemon --config node_modules/@bettercorp/service-base/build/nodemon.json","start":"ts-node node_modules/@bettercorp/service-base/lib/index.js","build":"tsc","version":"node ./node_modules/@bettercorp/service-base/build/version-bcorp.js $0","create":"ts-node node_modules/@bettercorp/service-base/lib/bootstrap.js $0"},"files":["lib/**/*"],"main":"lib/index.js","version":"3.0.20211220200946","dependencies":{"@bettercorp/service-base":"^6.1.20211220164149","@bettercorp/tools":"^1.2.20211101100533","@types/amqplib":"^0.8.2","@types/node":"^16.11.14","@types/uuid":"^8.3.1","amqplib":"^0.8.0","rabbit-queue":"^5.4.1"},"devDependencies":{"@typescript-eslint/eslint-plugin":"^5.8.0","@typescript-eslint/parser":"^5.8.0","eslint":"^8.5.0","ts-node":"^10.4.0","typescript":"^4.5.4"},"bsb_project":true}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc