New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@bettercorp/service-base-plugin-events-rabbitmq

Package Overview
Dependencies
Maintainers
2
Versions
50
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@bettercorp/service-base-plugin-events-rabbitmq - npm Package Compare versions

Comparing version 2.1.20220615203957 to 2.1.20220615203962-ea.0

10

lib/plugins/events-rabbitmq/events/emit.d.ts

@@ -1,4 +0,5 @@

import { Events } from '../plugin';
import { Events } from "../plugin";
export declare class emit {
private uSelf;
private privateQueuesSetup;
private publishChannel;

@@ -11,6 +12,7 @@ private receiveChannel;

init(uSelf: Events): Promise<void>;
onEvent<T = any>(callerPluginName: string, pluginName: string | null, event: string, listener: {
(data: T): Promise<void>;
dispose(): void;
onEvent(callerPluginName: string, pluginName: string, event: string, listener: {
(args: Array<any>): Promise<void>;
}): Promise<void>;
emitEvent<T = any>(callerPluginName: string, pluginName: string | null, event: string, data?: T): Promise<void>;
emitEvent(callerPluginName: string, pluginName: string, event: string, args: Array<any>): Promise<void>;
}

81

lib/plugins/events-rabbitmq/events/emit.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.emit = void 0;
const sec_config_1 = require("../sec.config");
const events_1 = require("events");
const lib_1 = require("./lib");
class emit {
constructor() {
this.channelKey = "2eq";
this.privateQueuesSetup = [];
this.channelKey = "8eq";
this.exchange = {
type: 'fanout',
name: 'better-service2-emit'
type: "fanout",
name: "better-service8-emit",
};
this.exchangeOpts = {
durable: true,
//exclusive: true,
autoDelete: true,

@@ -22,4 +20,4 @@ };

autoDelete: true,
messageTtl: (60 * 60) * 1000,
expires: (60 * 60) * 1000, // 1h
messageTtl: 60 * 60 * 1000,
expires: 60 * 60 * 1000,
};

@@ -29,38 +27,53 @@ }

this.uSelf = uSelf;
this.uSelf.log.info(`Open emit channel (${this.exchange.name})`);
this.uSelf.log.debug(`Open emit channel ({exchangeName})`, {
exchangeName: this.exchange.name,
});
this.publishChannel = await lib_1.LIB.setupChannel(uSelf, uSelf.publishConnection, this.channelKey, this.exchange.name, this.exchange.type, this.exchangeOpts);
this.receiveChannel = await lib_1.LIB.setupChannel(uSelf, uSelf.receiveConnection, this.channelKey, this.exchange.name, this.exchange.type, this.exchangeOpts, 5);
}
dispose() {
this.publishChannel.close();
this.receiveChannel.close();
}
async onEvent(callerPluginName, pluginName, event, listener) {
const self = this;
const thisQueueKey = lib_1.LIB.getQueueKey(this.channelKey, callerPluginName, pluginName, event);
self.uSelf.log.info(callerPluginName, ` - LISTEN: [${thisQueueKey}]`);
await self.receiveChannel.assertQueue(thisQueueKey, self.queueOpts);
self.uSelf.log.info(callerPluginName, ` - LISTEN: [${thisQueueKey}] - LISTENING`);
await self.receiveChannel.consume(thisQueueKey, async (msg) => {
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
try {
await listener(bodyObj);
self.receiveChannel.ack(msg);
}
catch (err) {
self.receiveChannel.reject(msg, true);
}
}, { noAck: false });
self.uSelf.log.debug(`{callerPluginName} - LISTEN: [{thisQueueKey}]`, {
callerPluginName,
thisQueueKey,
});
await self.receiveChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(thisQueueKey, self.queueOpts);
self.uSelf.log.debug(`{callerPluginName} - LISTEN: [{thisQueueKey}] - LISTENING`, { callerPluginName, thisQueueKey });
await self.receiveChannel.consume(thisQueueKey, async (msg) => {
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
try {
await listener(bodyObj);
self.receiveChannel.ack(msg);
}
catch (err) {
self.receiveChannel.nack(msg, true);
}
}, { noAck: false });
});
}
async emitEvent(callerPluginName, pluginName, event, data) {
let dataType = sec_config_1.DataType[typeof data];
if (data instanceof events_1.EventEmitter) {
this.uSelf.log.fatal('We cannot emit streams. Only emitAndReturn');
throw new Error('Not supported transmitting streams this way');
async emitEvent(callerPluginName, pluginName, event, args) {
const self = this;
const thisQueueKey = lib_1.LIB.getQueueKey(this.channelKey, callerPluginName, pluginName, event);
this.uSelf.log.debug(`{callerPluginName} - EMIT: [{thisQueueKey}]`, {
callerPluginName,
thisQueueKey,
});
if (self.privateQueuesSetup.indexOf(thisQueueKey) < 0) {
self.privateQueuesSetup.push(thisQueueKey);
await self.publishChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(thisQueueKey, self.queueOpts);
});
}
const thisQueueKey = lib_1.LIB.getQueueKey(this.channelKey, callerPluginName, pluginName, event);
this.uSelf.log.debug(callerPluginName, ` - EMIT: [${thisQueueKey}]`);
await this.publishChannel.assertQueue(thisQueueKey, this.queueOpts);
if (!this.publishChannel.sendToQueue(thisQueueKey, Buffer.from(JSON.stringify(data)), {
if (!this.publishChannel.sendToQueue(thisQueueKey, args, {
expiration: this.queueOpts.messageTtl,
contentType: dataType,
contentType: "string",
appId: this.uSelf.myId,
timestamp: new Date().getTime()
timestamp: new Date().getTime(),
}))

@@ -67,0 +80,0 @@ throw `Cannot send msg to queue [${thisQueueKey}]`;

/// <reference types="node" />
import { Events } from '../plugin';
import { EventEmitter } from 'events';
import { Events } from "../plugin";
import { EventEmitter } from "events";
export declare class emitAndReturn extends EventEmitter {
private uSelf;
private privateQueuesSetup;
private publishChannel;

@@ -15,6 +16,7 @@ private receiveChannel;

init(uSelf: Events): Promise<void>;
onReturnableEvent<ArgsDataType = any, ReturnDataType = any>(callerPluginName: string, pluginName: string, event: string, listener: {
(data: ArgsDataType): Promise<ReturnDataType>;
dispose(): void;
onReturnableEvent(callerPluginName: string, pluginName: string, event: string, listener: {
(args: Array<any>): Promise<any>;
}): Promise<void>;
emitEventAndReturn<ArgsDataType = any, ReturnDataType = any>(callerPluginName: string, pluginName: string, event: string, data: ArgsDataType, timeoutSeconds?: number): Promise<ReturnDataType>;
emitEventAndReturn(callerPluginName: string, pluginName: string, event: string, timeoutSeconds: number, args: Array<any>): Promise<any>;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.emitAndReturn = void 0;
const sec_config_1 = require("../sec.config");
const events_1 = require("events");

@@ -11,11 +10,11 @@ const crypto_1 = require("crypto");

super(...arguments);
this.channelKey = "2ar";
this.myChannelKey = "2kr";
this.privateQueuesSetup = [];
this.channelKey = "8ar";
this.myChannelKey = "8kr";
this.exchange = {
type: 'direct',
name: 'better-service2-ear'
type: "direct",
name: "better-service8-ear",
};
this.exchangeOpts = {
durable: false,
//exclusive: true,
autoDelete: true,

@@ -27,3 +26,3 @@ };

messageTtl: 60 * 1000,
expires: 60000, // 60s
expires: 60000,
};

@@ -35,3 +34,3 @@ this.myQueueOpts = {

messageTtl: 60 * 1000,
expires: 60000, // 60s
expires: 60000,
};

@@ -43,93 +42,119 @@ }

const myEARQueueKey = lib_1.LIB.getSpecialQueueKey(this.myChannelKey, this.uSelf.myId);
this.uSelf.log.info(`Ready my events name: ${myEARQueueKey}`);
this.uSelf.log.debug(`Ready my events name: {myEARQueueKey}`, {
myEARQueueKey,
});
this.publishChannel = await lib_1.LIB.setupChannel(uSelf, uSelf.publishConnection, this.myChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts);
this.receiveChannel = await lib_1.LIB.setupChannel(uSelf, uSelf.receiveConnection, this.myChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts, 2);
await this.receiveChannel.assertQueue(myEARQueueKey, this.myQueueOpts);
this.uSelf.log.info(` - LISTEN: [${myEARQueueKey}] - LISTEN`);
await this.receiveChannel.consume(myEARQueueKey, (msg) => {
if (msg === null)
return self.uSelf.log.debug(`[RECEVIED ${myEARQueueKey}]... as null`);
try {
let body = msg.content.toString();
self.uSelf.log.debug(`[RECEVIED ${myEARQueueKey}]`);
self.emit(msg.properties.correlationId, JSON.parse(body));
self.receiveChannel.ack(msg);
}
catch (exc) {
self.uSelf.log.fatal(exc);
}
}, { noAck: false });
this.uSelf.log.info(` - LISTEN: [${myEARQueueKey}] - LISTENING`);
this.uSelf.log.info(`Ready my events name: ${myEARQueueKey} OKAY`);
await this.receiveChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(myEARQueueKey, self.myQueueOpts);
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, { myEARQueueKey });
await iChannel.consume(myEARQueueKey, (msg) => {
if (msg === null)
return self.uSelf.log.warn(`[RECEVIED {myEARQueueKey}]... as null`, { myEARQueueKey });
try {
let body = msg.content.toString();
self.uSelf.log.debug(`[RECEVIED {myEARQueueKey}]`, {
myEARQueueKey,
});
self.emit(msg.properties.correlationId, JSON.parse(body));
iChannel.ack(msg);
}
catch (exc) {
self.uSelf.log.fatal("AMPQ Consumed exception: {eMsg}", {
eMsg: exc.message || exc.toString(),
});
}
}, { noAck: false });
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, { myEARQueueKey });
self.uSelf.log.debug(`Ready my events name: {myEARQueueKey} OKAY`, {
myEARQueueKey,
});
});
}
dispose() {
this.publishChannel.close();
this.receiveChannel.close();
}
async onReturnableEvent(callerPluginName, pluginName, event, listener) {
const self = this;
const queueKey = lib_1.LIB.getQueueKey(this.channelKey, callerPluginName, pluginName, event);
self.uSelf.log.info(callerPluginName, ` EAR: ${callerPluginName} listen ${queueKey}`);
await self.receiveChannel.assertQueue(queueKey, self.queueOpts);
await self.receiveChannel.consume(queueKey, async (msg) => {
if (msg === null)
return self.uSelf.log.error('Message received on my EAR queue was null...');
const returnQueue = lib_1.LIB.getSpecialQueueKey(this.myChannelKey, msg.properties.appId);
self.uSelf.log.info(callerPluginName, `EAR: ${callerPluginName} Received: ${queueKey} from ${returnQueue}`);
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
try {
const response = (await listener(bodyObj)) || null;
self.receiveChannel.ack(msg);
self.uSelf.log.info(callerPluginName, `EAR: ${callerPluginName} OKAY: ${queueKey} -> ${returnQueue}`);
if (!self.publishChannel.sendToQueue(returnQueue, Buffer.from(JSON.stringify(response)), {
expiration: 5000,
correlationId: `${msg.properties.correlationId}-resolve`,
contentType: sec_config_1.DataType[typeof response],
appId: self.uSelf.myId,
timestamp: new Date().getTime()
}))
throw `Cannot send msg to queue [${returnQueue}]`;
}
catch (exc) {
self.receiveChannel.ack(msg);
self.uSelf.log.info(callerPluginName, `EAR: ${callerPluginName} ERROR: ${queueKey} -> ${returnQueue}`);
if (!self.publishChannel.sendToQueue(returnQueue, Buffer.from(JSON.stringify(exc)), {
expiration: 5000,
correlationId: `${msg.properties.correlationId}-reject`,
contentType: sec_config_1.DataType[typeof exc],
appId: self.uSelf.myId,
timestamp: new Date().getTime()
}))
throw `Cannot send msg to queue [${returnQueue}]`;
}
}, { noAck: false });
self.uSelf.log.info(callerPluginName, ` EAR: ${callerPluginName} listening ${queueKey}`);
self.uSelf.log.debug(` EAR: {callerPluginName} listen {queueKey}`, {
callerPluginName,
queueKey,
});
await self.receiveChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(queueKey, self.queueOpts);
await iChannel.consume(queueKey, async (msg) => {
if (msg === null)
return self.uSelf.log.error("Message received on my EAR queue was null...");
const returnQueue = lib_1.LIB.getSpecialQueueKey(this.myChannelKey, msg.properties.appId);
self.uSelf.log.debug(`EAR: {callerPluginName} Received: {queueKey} from {returnQueue}`, { callerPluginName, queueKey, returnQueue });
let body = msg.content.toString();
const bodyObj = JSON.parse(body);
try {
const response = (await listener(bodyObj));
iChannel.ack(msg);
self.uSelf.log.debug(`EAR: {callerPluginName} OKAY: {queueKey} -> {returnQueue}`, { callerPluginName, queueKey, returnQueue });
if (!self.publishChannel.sendToQueue(returnQueue, response, {
expiration: 5000,
correlationId: `${msg.properties.correlationId}-resolve`,
contentType: "string",
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${returnQueue}]`;
}
catch (exc) {
iChannel.ack(msg);
self.uSelf.log.error(`EAR: {callerPluginName} ERROR: {queueKey} -> {returnQueue}`, { callerPluginName, queueKey, returnQueue });
if (!self.publishChannel.sendToQueue(returnQueue, exc, {
expiration: 5000,
correlationId: `${msg.properties.correlationId}-reject`,
contentType: "string",
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${returnQueue}]`;
}
}, { noAck: false });
self.uSelf.log.debug(`EAR: {callerPluginName} listening {queueKey}`, {
callerPluginName,
queueKey,
});
});
}
;
emitEventAndReturn(callerPluginName, pluginName, event, data, timeoutSeconds = 5) {
async emitEventAndReturn(callerPluginName, pluginName, event, timeoutSeconds, args) {
const self = this;
const resultKey = `${(0, crypto_1.randomUUID)()}-${new Date().getTime()}${Math.random()}`;
const queueKey = lib_1.LIB.getQueueKey(this.channelKey, callerPluginName, pluginName, event);
this.uSelf.log.info(`EAR: ${callerPluginName} emitting ${queueKey} (${resultKey})`);
return new Promise(async (resolve, reject) => {
this.uSelf.log.debug(`EAR: {callerPluginName} emitting {queueKey} ({resultKey})`, { callerPluginName, queueKey, resultKey });
if (self.privateQueuesSetup.indexOf(queueKey) < 0) {
self.privateQueuesSetup.push(queueKey);
await self.publishChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(queueKey, self.queueOpts);
});
}
return await new Promise(async (resolve, reject) => {
let timeoutHandler = setTimeout(() => {
self.removeAllListeners(`${resultKey}-resolve`);
self.removeAllListeners(`${resultKey}-reject`);
reject('Timeout');
reject("Timeout");
}, timeoutSeconds * 1000);
self.once(`${resultKey}-resolve`, (args) => {
self.once(`${resultKey}-resolve`, (rargs) => {
clearTimeout(timeoutHandler);
resolve(args);
resolve(rargs);
});
self.once(`${resultKey}-reject`, (args) => {
self.once(`${resultKey}-reject`, (rargs) => {
clearTimeout(timeoutHandler);
reject(args);
reject(rargs);
});
await self.publishChannel.assertQueue(queueKey, self.queueOpts);
if (!self.publishChannel.sendToQueue(queueKey, Buffer.from(JSON.stringify(data)), {
expiration: (timeoutSeconds * 1000) + 5000,
if (!self.publishChannel.sendToQueue(queueKey, args, {
expiration: timeoutSeconds * 1000 + 5000,
correlationId: resultKey,
contentType: sec_config_1.DataType[typeof data],
contentType: "string",
appId: self.uSelf.myId,
timestamp: new Date().getTime()
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${queueKey}]`;
self.uSelf.log.info(`EAR: ${callerPluginName} emitted ${queueKey} (${resultKey})`);
self.uSelf.log.debug(`EAR: {callerPluginName} emitted {queueKey} ({resultKey})`, { callerPluginName, queueKey, resultKey });
});

@@ -136,0 +161,0 @@ }

/// <reference types="node" />
import { Readable } from "stream";
/// <reference types="node" />
import { EventEmitter, Readable } from "stream";
import { Events } from "../plugin";
export declare class emitStreamAndReceiveStream {
export declare class emitStreamAndReceiveStream extends EventEmitter {
private readonly staticCommsTimeout;
private uSelf;
private publishChannel;
private receiveChannel;
private eventsChannel;
private streamChannel;
private readonly eventsChannelKey;
private readonly senderChannelKey;
private readonly streamChannelKey;

@@ -16,3 +15,7 @@ private readonly exchange;

private readonly queueOpts;
private get myEventsQueueKey();
private get myStreamQueueKey();
private cleanupSelf;
init(uSelf: Events): Promise<void>;
dispose(): void;
setupChannelsIfNotSetup(): Promise<void>;

@@ -22,3 +25,3 @@ receiveStream(callerPluginName: string, listener: {

}, timeoutSeconds?: number): Promise<string>;
sendStream(callerPluginName: string, streamId: string, stream: Readable): Promise<void>;
sendStream(callerPluginName: string, streamIdf: string, stream: Readable): Promise<void>;
}

@@ -7,12 +7,11 @@ "use strict";

const lib_1 = require("./lib");
class emitStreamAndReceiveStream {
class emitStreamAndReceiveStream extends stream_1.EventEmitter {
constructor() {
// If we try receive or send a stream and the other party is not ready for some reason, we will automatically timeout in 5s.
this.staticCommsTimeout = 30000; //1000;
this.eventsChannelKey = "2se";
this.senderChannelKey = "2ss";
this.streamChannelKey = "2sc";
super(...arguments);
this.staticCommsTimeout = 30000;
this.eventsChannelKey = "8se";
this.streamChannelKey = "8sd";
this.exchange = {
type: "direct",
name: "better-service2-ers",
name: "better-service8-ers",
};

@@ -27,15 +26,104 @@ this.exchangeOpts = {

messageTtl: 60000,
expires: 60000, // 60s
expires: 60000,
};
}
get myEventsQueueKey() {
return lib_1.LIB.getSpecialQueueKey(this.eventsChannelKey, this.uSelf.myId);
}
get myStreamQueueKey() {
return lib_1.LIB.getSpecialQueueKey(this.streamChannelKey, this.uSelf.myId);
}
cleanupSelf(streamId, key) {
this.removeAllListeners(this.eventsChannelKey + key + streamId);
this.removeAllListeners(this.streamChannelKey + key + streamId);
}
async init(uSelf) {
this.uSelf = uSelf;
}
dispose() {
this.removeAllListeners();
if (this.eventsChannel !== undefined)
this.eventsChannel.close();
if (this.streamChannel !== undefined)
this.streamChannel.close();
}
async setupChannelsIfNotSetup() {
if (this.publishChannel === undefined)
this.publishChannel = await lib_1.LIB.setupChannel(this.uSelf, this.uSelf.publishConnection, this.eventsChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts);
if (this.receiveChannel === undefined)
this.receiveChannel = await lib_1.LIB.setupChannel(this.uSelf, this.uSelf.receiveConnection, this.eventsChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts, 2);
if (this.streamChannel === undefined)
const self = this;
if (this.eventsChannel === undefined) {
this.eventsChannel = await lib_1.LIB.setupChannel(this.uSelf, this.uSelf.receiveConnection, this.eventsChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts, 2);
this.uSelf.log.debug(`Ready my events name: {myEARQueueKey}`, {
myEARQueueKey: this.myEventsQueueKey,
});
await this.eventsChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(self.myEventsQueueKey, self.queueOpts);
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, {
myEARQueueKey: self.myEventsQueueKey,
});
await iChannel.consume(self.myEventsQueueKey, (msg) => {
if (msg === null)
return self.uSelf.log.warn(`[RECEVIED {myEARQueueKey}]... as null`, { myEARQueueKey: self.myEventsQueueKey });
try {
let body = JSON.parse(msg.content.toString());
self.uSelf.log.debug(`[RECEVIED Event {myEARQueueKey}] ({correlationId})`, {
myEARQueueKey: self.myEventsQueueKey,
correlationId: msg.properties.correlationId,
});
self.emit(self.eventsChannelKey + msg.properties.correlationId, body, () => {
iChannel.ack(msg);
}, () => {
iChannel.nack(msg);
});
}
catch (exc) {
self.uSelf.log.fatal("AMPQ Consumed exception: {eMsg}", {
eMsg: exc.message || exc.toString(),
});
}
}, { noAck: false });
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, {
myEARQueueKey: self.myEventsQueueKey,
});
self.uSelf.log.debug(`Ready my events name: {myEARQueueKey} OKAY`, {
myEARQueueKey: self.myEventsQueueKey,
});
});
}
if (this.streamChannel === undefined) {
this.streamChannel = await lib_1.LIB.setupChannel(this.uSelf, this.uSelf.receiveConnection, this.streamChannelKey, this.exchange.name, this.exchange.type, this.exchangeOpts, 2);
this.uSelf.log.debug(`Ready my stream name: {myEARQueueKey}`, {
myEARQueueKey: self.myStreamQueueKey,
});
await this.streamChannel.addSetup(async (iChannel) => {
await iChannel.assertQueue(self.myStreamQueueKey, self.queueOpts);
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, {
myEARQueueKey: self.myStreamQueueKey,
});
await iChannel.consume(self.myStreamQueueKey, (msg) => {
if (msg === null)
return self.uSelf.log.warn(`[RECEVIED {myEARQueueKey}]... as null`, { myEARQueueKey: self.myStreamQueueKey });
try {
let body = JSON.parse(msg.content.toString());
self.uSelf.log.debug(`[RECEVIED Stream {myEARQueueKey}]`, {
myEARQueueKey: self.myStreamQueueKey,
});
self.emit(self.streamChannelKey + "r-" + msg.properties.correlationId, body, () => {
iChannel.ack(msg);
}, () => {
iChannel.nack(msg);
});
}
catch (exc) {
self.uSelf.log.fatal("AMPQ Consumed exception: {eMsg}", {
eMsg: exc.message || exc.toString(),
});
}
}, { noAck: false });
self.uSelf.log.debug(`LISTEN: [{myEARQueueKey}]`, {
myEARQueueKey: self.myStreamQueueKey,
});
self.uSelf.log.debug(`Ready my stream name: {myEARQueueKey} OKAY`, {
myEARQueueKey: self.myStreamQueueKey,
});
});
}
}

@@ -45,11 +133,10 @@ receiveStream(callerPluginName, listener, timeoutSeconds = 5) {

let thisTimeoutMS = this.staticCommsTimeout;
const streamReturnRefId = lib_1.LIB.getLocalKey(this.senderChannelKey, streamId);
const streamEventsRefId = lib_1.LIB.getLocalKey(this.eventsChannelKey, streamId);
const streamRefId = lib_1.LIB.getLocalKey(this.streamChannelKey, streamId);
this.uSelf.log.info(`SR: ${callerPluginName} listening to ${streamId}`);
this.uSelf.log.debug(`SR: {callerPluginName} listening to {streamId}`, {
callerPluginName,
streamId,
});
const self = this;
let dstEventsQueueKey;
return new Promise(async (resolve) => {
await self.setupChannelsIfNotSetup();
await self.receiveChannel.assertQueue(streamEventsRefId, self.queueOpts);
await self.streamChannel.assertQueue(streamRefId, self.queueOpts);
let stream = null;

@@ -63,6 +150,7 @@ let lastResponseTimeoutHandler = null;

const cleanup = async () => {
self.cleanupSelf(streamId, "r-");
createTimeout = (e) => {
self.uSelf.log.debug("voided timeout creator: " + e);
self.uSelf.log.debug("voided timeout creator: {e}", { e });
};
self.uSelf.log.debug("Cleanup stuff");
self.uSelf.log.debug("Cleanup stuffR");
if (receiptTimeoutHandler !== null) {

@@ -77,4 +165,2 @@ clearTimeout(receiptTimeoutHandler);

lastResponseTimeoutCount = -2;
await self.receiveChannel.deleteQueue(streamEventsRefId);
await self.streamChannel.deleteQueue(streamRefId);
if (stream !== null && !stream.destroyed) {

@@ -88,12 +174,12 @@ stream.destroy();

await cleanup();
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, {
type: "timeout",
data: err,
})), {
}, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamRefId}]`;
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}]`;
await listener(err, null);

@@ -112,14 +198,14 @@ }, thisTimeoutMS);

const err = new Error("Receive Active Timeout");
self.uSelf.log.error(err);
self.uSelf.log.error("Receive Active Timeout");
await cleanup();
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, {
type: "timeout",
data: err,
})), {
}, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamRefId}]`;
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}]`;
await listener(err, null);

@@ -143,54 +229,72 @@ };

thisTimeoutMS = timeoutSeconds * 1000;
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({ type: "receipt", timeout: thisTimeoutMS })), {
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, { type: "receipt", timeout: thisTimeoutMS }, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamReturnRefId}] ${streamId}`;
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}] ${streamId}`;
try {
stream = new stream_1.Readable({
objectMode: true,
read() {
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({ type: "read" })), {
async read() {
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, { type: "read" }, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamReturnRefId}] ${streamId}`;
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}] ${streamId}`;
},
});
self.uSelf.log.debug(`[R RECEVIED ${streamRefId}] ${streamId}`);
self.uSelf.log.debug(`[R RECEVIED {streamRefId}] {streamId}`, {
streamRefId: dstEventsQueueKey,
streamId,
});
let eventsToListenTo = ["error", "end"];
for (let evnt of eventsToListenTo)
stream.on(evnt, async (e, b) => {
if (evnt === "end")
await cleanup();
if (b === "RECEIVED")
return;
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, {
type: "event",
event: evnt,
data: e || null,
})), {
}, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamReturnRefId}] ${streamId}`;
}))) {
throw `Cannot send msg to queue [${dstEventsQueueKey}] ${streamId}`;
}
if (evnt === "end") {
await cleanup();
}
});
await self.streamChannel.consume(streamRefId, async (sMsg) => {
if (sMsg === null)
return self.uSelf.log.debug(`[R RECEVIED ${streamRefId}]... as null`);
if (sMsg.properties.correlationId === "event") {
let data = JSON.parse(sMsg.content.toString());
stream.emit(data.event, data.data || null, "RECEIVED");
self.streamChannel.ack(sMsg);
self.on(self.streamChannelKey + "r-" + streamId, async (data, ack, nack) => {
if (data === null) {
nack();
return self.uSelf.log.debug(`[R RECEVIED {streamId}]... as null`, { streamId });
}
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, {
type: "receipt",
timeout: thisTimeoutMS,
}, {
expiration: self.queueOpts.messageTtl,
correlationId: "s-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}] ${streamId}`;
if (data.type === "event") {
stream.emit(data.event, data.data !== undefined ? data.data : null);
ack();
return;
}
stream.push(sMsg.content);
self.streamChannel.ack(sMsg);
}, { noAck: false });
if (data.type === "data") {
stream.push(Buffer.from(data.data));
ack();
return;
}
nack();
});
listener(null, stream)

@@ -201,5 +305,4 @@ .then(async () => {

.catch(async (x) => {
self.uSelf.log.error("Stream NOT OK", x);
await cleanup();
self.uSelf.log.fatal(x);
self.uSelf.log.fatal("Stream NOT OK: {e}", { e: x.message });
});

@@ -209,7 +312,6 @@ }

await cleanup();
self.uSelf.log.fatal(exc);
self.uSelf.log.fatal("Stream NOT OK: {e}", { e: exc.message || exc });
}
};
await self.receiveChannel.consume(streamEventsRefId, async (baseMsg) => {
console.log(`streamEventsRefId Received`);
self.on(self.eventsChannelKey + "r-" + streamId, async (data, ack, nack) => {
if (receiptTimeoutHandler !== null) {

@@ -220,36 +322,38 @@ clearTimeout(receiptTimeoutHandler);

updateLastResponseTimer();
if (baseMsg === null)
return self.uSelf.log.debug(`[R RECEVIED ${streamEventsRefId}]... as null`);
let data = JSON.parse(baseMsg.content.toString());
console.log(`streamEventsRefId Received:`, data);
self.receiveChannel.ack(baseMsg);
if (data === null)
return self.uSelf.log.debug(`[R RECEVIED {streamEventsRefId}]... as null`, { streamEventsRefId: dstEventsQueueKey });
if (data.type === "timeout") {
await cleanup();
listener(data.data, null);
ack();
return;
}
if (data.type === "event") {
stream.emit(data.event, data.data || null, "RECEIVED");
return;
}
if (data.type === "start") {
self.uSelf.log.info("Readying to stream");
self.uSelf.log.debug("Readying to stream from: {fromId}", {
fromId: data.myId,
});
dstEventsQueueKey = lib_1.LIB.getSpecialQueueKey(this.eventsChannelKey, data.myId);
await startStream();
self.uSelf.log.info("Starting to stream");
self.uSelf.log.debug("Starting to stream");
ack();
return;
}
}, { noAck: false });
resolve(streamId);
nack();
});
resolve(`${this.uSelf.myId}||${streamId}||${timeoutSeconds}`);
});
}
sendStream(callerPluginName, streamId, stream) {
sendStream(callerPluginName, streamIdf, stream) {
if (streamIdf.split("||").length !== 3)
throw "invalid stream ID";
let streamReceiverId = streamIdf.split("||")[0];
let streamId = streamIdf.split("||")[1];
let streamTimeoutS = Number.parseInt(streamIdf.split("||")[2]);
let thisTimeoutMS = this.staticCommsTimeout;
const dstEventsQueueKey = lib_1.LIB.getSpecialQueueKey(this.eventsChannelKey, streamReceiverId);
const dstStreamQueueKey = lib_1.LIB.getSpecialQueueKey(this.streamChannelKey, streamReceiverId);
const self = this;
const streamReturnRefId = lib_1.LIB.getLocalKey(this.senderChannelKey, streamId);
const streamEventsRefId = lib_1.LIB.getLocalKey(this.eventsChannelKey, streamId);
const streamRefId = lib_1.LIB.getLocalKey(this.streamChannelKey, streamId);
let thisTimeoutMS = self.staticCommsTimeout;
this.uSelf.log.info(`SS: ${callerPluginName} emitting ${streamEventsRefId}`);
this.uSelf.log.info(`SS: {callerPluginName} emitting to {dstEventsQueueKey}/{dstStreamQueueKey}`, { callerPluginName, dstEventsQueueKey, dstStreamQueueKey });
return new Promise(async (resolveI, rejectI) => {
await self.setupChannelsIfNotSetup();
await self.receiveChannel.assertQueue(streamReturnRefId, self.queueOpts);
let lastResponseTimeoutHandler = null;

@@ -261,3 +365,4 @@ let lastResponseTimeoutCount = 1;

const cleanup = async (eType, e) => {
self.uSelf.log.debug("cleanup:", eType);
self.uSelf.log.debug("cleanup: {eType}", { eType });
self.cleanupSelf(streamId, "s-");
stream.destroy(e);

@@ -270,3 +375,2 @@ if (receiptTimeoutHandler !== null)

lastResponseTimeoutHandler = null;
await self.receiveChannel.deleteQueue(streamReturnRefId);
};

@@ -296,12 +400,12 @@ const reject = async (e) => {

await cleanup("active-timeout");
if (!self.publishChannel.sendToQueue(streamReturnRefId, Buffer.from(JSON.stringify({
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, {
type: "timeout",
data: err,
})), {
}, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "r-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamRefId}]`;
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}]`;
rejectI(err);

@@ -315,19 +419,17 @@ };

};
let eventsToListenTo = ["error", "close", "end"];
let eventsToListenTo = ["error", "end"];
for (let evnt of eventsToListenTo) {
stream.on(evnt, async (e, b) => {
await cleanup(evnt);
if (b === "RECEIVED")
return;
if (!self.publishChannel.sendToQueue(streamRefId, Buffer.from(JSON.stringify({ type: "event", event: evnt, data: e || null })), {
stream.on(evnt, async (e, b, ack, nack) => {
if (!(await self.streamChannel.sendToQueue(dstStreamQueueKey, { type: "event", event: evnt, data: e || null }, {
expiration: self.queueOpts.messageTtl,
correlationId: "event",
correlationId: streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamEventsRefId}] ${streamId}`;
}))) {
nack();
throw `Cannot send msg to queue [${dstEventsQueueKey}] ${streamId}`;
}
ack();
if (evnt === "error")
reject(e);
if (evnt === "close")
resolve();
});

@@ -338,19 +440,21 @@ }

const pushData = () => {
if (pushingData)
if (pushingData) {
self.uSelf.log.warn("Stream tried pushing data, but not ready to push data!");
return;
}
pushingData = true;
self.uSelf.log.warn("Switching to push data model.");
stream.on("data", (data) => {
if (!self.publishChannel.sendToQueue(streamRefId, data, {
stream.on("data", async (data) => {
if (!(await self.streamChannel.sendToQueue(dstStreamQueueKey, { type: "data", data }, {
expiration: self.queueOpts.messageTtl,
correlationId: "stream",
correlationId: streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
})) {
}))) {
pushingData = false;
self.uSelf.log.error(`Cannot push msg to queue [${streamRefId}] ${streamId} / switch back to poll model.`);
self.uSelf.log.error(`Cannot push msg to queue [{dstStreamQueueKey}] {streamId} / switch back to poll model.`, { dstStreamQueueKey, streamId });
}
});
};
await self.receiveChannel.consume(streamReturnRefId, async (baseMsg) => {
self.on(self.eventsChannelKey + "s-" + streamId, async (data, ack, nack) => {
if (receiptTimeoutHandler !== null) {

@@ -361,21 +465,25 @@ clearTimeout(receiptTimeoutHandler);

updateLastResponseTimer();
if (baseMsg === null)
return self.uSelf.log.debug(`[S RECEVIED ${streamEventsRefId}]... as null`);
let data = JSON.parse(baseMsg.content.toString());
self.receiveChannel.ack(baseMsg);
if (data === null) {
nack();
return self.uSelf.log.debug(`[S RECEVIED {dstEventsQueueKey}]... as null`, { dstEventsQueueKey });
}
if (data.type === "timeout") {
await reject(new Error("timeout-receiver"));
return;
return ack();
}
if (data.type === "receipt") {
thisTimeoutMS = data.timeout;
return;
return ack();
}
if (data.type === "event") {
if (data.event === "end") {
ack();
return resolve();
}
stream.emit(data.event, data.data || null, "RECEIVED");
return;
return ack();
}
if (data.type === "read") {
if (pushingData)
return;
return ack();
const readData = stream.read();

@@ -386,24 +494,29 @@ if (!stream.readable || readData === null) {

pushData();
return;
return ack();
}
streamStarted = true;
if (!self.publishChannel.sendToQueue(streamRefId, readData, {
if (!(await self.streamChannel.sendToQueue(dstStreamQueueKey, { type: "data", data: readData }, {
expiration: self.queueOpts.messageTtl,
correlationId: "stream",
correlationId: streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamRefId}] ${streamId}`;
}))) {
nack();
throw `Cannot send msg to queue [${dstStreamQueueKey}] ${streamId}`;
}
ack();
return;
}
}, { noAck: false });
self.uSelf.log.info(`SS: ${callerPluginName} setup, ready ${streamEventsRefId}`);
if (!self.publishChannel.sendToQueue(streamEventsRefId, Buffer.from(JSON.stringify({ type: "start" })), {
ack();
});
self.uSelf.log.info(`SS: {callerPluginName} setup, ready {streamEventsRefId}`, { callerPluginName, streamEventsRefId: dstEventsQueueKey });
if (!(await self.eventsChannel.sendToQueue(dstEventsQueueKey, { type: "start", myId: self.uSelf.myId }, {
expiration: self.queueOpts.messageTtl,
correlationId: streamId,
correlationId: "r-" + streamId,
appId: self.uSelf.myId,
timestamp: new Date().getTime(),
}))
throw `Cannot send msg to queue [${streamEventsRefId}]`;
self.uSelf.log.info(`SS: ${callerPluginName} emitted ${streamEventsRefId}`);
})))
throw `Cannot send msg to queue [${dstEventsQueueKey}]`;
thisTimeoutMS = streamTimeoutS * 1000;
self.uSelf.log.info(`SS: {callerPluginName} emitted {dstEventsQueueKey} with timeout of {thisTimeoutMS}`, { callerPluginName, dstEventsQueueKey, thisTimeoutMS });
});

@@ -410,0 +523,0 @@ }

@@ -1,3 +0,3 @@

import * as amqplib from 'amqplib';
import { Events } from '../plugin';
import * as amqplib from "amqp-connection-manager";
import { Events } from "../plugin";
export declare class LIB {

@@ -7,3 +7,3 @@ static getQueueKey(channelKey: string, callerPluginName: string, pluginName: string | null, event: string, addKey?: string): string;

static getSpecialQueueKey(channelKey: string, id: string, addKey?: string): string;
static setupChannel(uSelf: Events, connection: amqplib.Connection, queueKey: string, exName: string, exType: string, exOpts: amqplib.Options.AssertExchange, prefetch?: number): Promise<amqplib.Channel>;
static setupChannel(uSelf: Events, connection: amqplib.AmqpConnectionManager, queueKey: string, exName: string, exType: string, exOpts: amqplib.Options.AssertExchange, prefetch?: number, json?: boolean): Promise<amqplib.ChannelWrapper>;
}

@@ -7,3 +7,3 @@ "use strict";

static getQueueKey(channelKey, callerPluginName, pluginName, event, addKey) {
return `${channelKey}-${pluginName || callerPluginName}-${event}${Tools_1.Tools.isNullOrUndefined(addKey) ? '' : `-${addKey}`}`;
return `${channelKey}-${pluginName || callerPluginName}-${event}${Tools_1.Tools.isNullOrUndefined(addKey) ? "" : `-${addKey}`}`;
}

@@ -14,23 +14,44 @@ static getLocalKey(channelKey, event) {

static getSpecialQueueKey(channelKey, id, addKey) {
return `${channelKey}-${id}${Tools_1.Tools.isNullOrUndefined(addKey) ? '' : `-${addKey}`}`;
return `${channelKey}-${id}${Tools_1.Tools.isNullOrUndefined(addKey) ? "" : `-${addKey}`}`;
}
static async setupChannel(uSelf, connection, queueKey, exName, exType, exOpts, prefetch) {
uSelf.log.info(`Create channel (${queueKey})`);
const channel = await connection.createChannel();
channel.on("close", () => {
uSelf.log.error(`AMQP channel (${queueKey}) close`);
uSelf.log.fatal(`AMQP Error: channel (${queueKey}) close`);
static async setupChannel(uSelf, connection, queueKey, exName, exType, exOpts, prefetch, json = true) {
return new Promise(async (resolve) => {
let returned = false;
uSelf.log.debug(`Create channel ({queueKey})`, { queueKey });
const channel = await connection.createChannel({
json,
setup: async (ichannel) => {
await ichannel.assertExchange(exName, exType, exOpts);
if (!Tools_1.Tools.isNullOrUndefined(prefetch)) {
uSelf.log.debug(`prefetch ({queueKey}) {prefetch}`, {
queueKey,
prefetch: prefetch,
});
await ichannel.prefetch(prefetch);
}
uSelf.log.debug(`setup exchange ({queueKey}) OK`, {
queueKey,
});
if (!returned) {
resolve(channel);
returned = true;
}
},
});
channel.on("close", () => {
uSelf.log.fatal(`AMQP channel ({queueKey}) close`, { queueKey });
});
channel.on("error", (err) => {
uSelf.log.error(`AMQP channel ({queueKey}) error: {err}`, {
queueKey,
err: err.message || err,
});
});
uSelf.log.debug(`Assert exchange ({queueKey}) {exName} {exType}`, {
queueKey,
exName,
exType,
});
uSelf.log.debug(`Ready ({queueKey})`, { queueKey });
});
channel.on("error", (err) => {
uSelf.log.error(`AMQP channel (${queueKey}) error`, err);
uSelf.log.fatal(`AMQP Error: channel (${queueKey}) error`, err);
});
uSelf.log.info(`Assert exchange (${queueKey}) ${exName} ${exType}`);
await channel.assertExchange(exName, exType, exOpts);
if (!Tools_1.Tools.isNullOrUndefined(prefetch)) {
uSelf.log.info(`prefetch (${queueKey}) ${prefetch}`);
await channel.prefetch(prefetch);
}
uSelf.log.info(`Ready (${queueKey})`);
return channel;
}

@@ -37,0 +58,0 @@ }

/// <reference types="node" />
import * as amqplib from 'amqplib';
import { CEvents } from '@bettercorp/service-base/lib/interfaces/events';
import { Readable } from 'stream';
export declare class Events extends CEvents {
publishConnection: amqplib.Connection;
receiveConnection: amqplib.Connection;
import * as amqplib from "amqp-connection-manager";
import { PluginConfig } from "./sec.config";
import { Readable } from "stream";
import { EventsBase } from "@bettercorp/service-base";
export declare class Events extends EventsBase<PluginConfig> {
publishConnection: amqplib.AmqpConnectionManager;
receiveConnection: amqplib.AmqpConnectionManager;
myId: string;

@@ -14,10 +15,13 @@ private ear;

private _connectToAMQP;
onEvent<T = any>(callerPluginName: string, pluginName: string | null, event: string, listener: (data: T) => Promise<void>): Promise<void>;
emitEvent<T = any>(callerPluginName: string, pluginName: string | null, event: string, data?: T): Promise<void>;
onReturnableEvent<ArgsDataType = any, ReturnDataType = any>(callerPluginName: string, pluginName: string, event: string, listener: {
(data: ArgsDataType): Promise<ReturnDataType>;
dispose(): void;
onEvent(callerPluginName: string, pluginName: string, event: string, listener: {
(args: Array<any>): Promise<void>;
}): Promise<void>;
emitEventAndReturn<ArgsDataType = any, ReturnDataType = any>(callerPluginName: string, pluginName: string, event: string, data?: ArgsDataType, timeoutSeconds?: number): Promise<ReturnDataType>;
emitEvent(callerPluginName: string, pluginName: string, event: string, args: Array<any>): Promise<void>;
onReturnableEvent(callerPluginName: string, pluginName: string, event: string, listener: {
(args: Array<any>): Promise<any>;
}): Promise<void>;
emitEventAndReturn(callerPluginName: string, pluginName: string, event: string, timeoutSeconds: number, args: Array<any>): Promise<any>;
receiveStream(callerPluginName: string, listener: (error: Error | null, stream: Readable) => Promise<void>, timeoutSeconds: number): Promise<string>;
sendStream(callerPluginName: string, streamId: string, stream: Readable): Promise<void>;
}
"use strict";
var __createBinding = (this && this.__createBinding) || (Object.create ? (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
var desc = Object.getOwnPropertyDescriptor(m, k);
if (!desc || ("get" in desc ? !m.__esModule : desc.writable || desc.configurable)) {
desc = { enumerable: true, get: function() { return m[k]; } };
}
Object.defineProperty(o, k2, desc);
}) : (function(o, m, k, k2) {
if (k2 === undefined) k2 = k;
o[k2] = m[k];
}));
var __setModuleDefault = (this && this.__setModuleDefault) || (Object.create ? (function(o, v) {
Object.defineProperty(o, "default", { enumerable: true, value: v });
}) : function(o, v) {
o["default"] = v;
});
var __importStar = (this && this.__importStar) || function (mod) {
if (mod && mod.__esModule) return mod;
var result = {};
if (mod != null) for (var k in mod) if (k !== "default" && Object.prototype.hasOwnProperty.call(mod, k)) __createBinding(result, mod, k);
__setModuleDefault(result, mod);
return result;
};
Object.defineProperty(exports, "__esModule", { value: true });
exports.Events = void 0;
const amqplib = require("amqplib");
const events_1 = require("@bettercorp/service-base/lib/interfaces/events");
const amqplib = __importStar(require("amqp-connection-manager"));
const amqplibCore = __importStar(require("amqplib"));
const Tools_1 = require("@bettercorp/tools/lib/Tools");

@@ -12,3 +35,4 @@ const emit_1 = require("./events/emit");

const os_1 = require("os");
class Events extends events_1.CEvents {
const service_base_1 = require("@bettercorp/service-base");
class Events extends service_base_1.EventsBase {
constructor() {

@@ -24,18 +48,25 @@ super(...arguments);

async _connectToAMQP() {
const pluginConfig = (await this.getPluginConfig());
this.log.info(`Connect to ${pluginConfig.endpoint}`);
let socketOptions = {};
const pluginConfig = await this.getPluginConfig();
this.log.info(`Connect to {endpoints}`, {
endpoints: pluginConfig.endpoints,
});
let socketOptions = {
connectionOptions: {},
};
if (!Tools_1.Tools.isNullOrUndefined(pluginConfig.credentials)) {
socketOptions.credentials = amqplib.credentials.plain('radmin', 'TLvGnHd9a9ndmo2nBepNxFXFprQ9eCpEvXp6qKN2YPBqUVN2va');
socketOptions.connectionOptions.credentials =
amqplibCore.credentials.plain(pluginConfig.credentials.username, pluginConfig.credentials.password);
}
this.publishConnection = await amqplib.connect((await this.getPluginConfig()).endpoint, {
credentials: amqplib.credentials.plain((await this.getPluginConfig()).credentials.username, (await this.getPluginConfig()).credentials.password)
this.publishConnection = amqplib.connect(pluginConfig.endpoints, socketOptions);
this.receiveConnection = amqplib.connect(pluginConfig.endpoints, socketOptions);
const self = this;
this.publishConnection.on("connect", (data) => {
self.log.info("AMQP CONNECTED: {url}", { url: data.url });
});
this.receiveConnection = await amqplib.connect((await this.getPluginConfig()).endpoint, {
credentials: amqplib.credentials.plain((await this.getPluginConfig()).credentials.username, (await this.getPluginConfig()).credentials.password)
this.publishConnection.on("connectFailed", (data) => {
self.log.error("AMQP CONNECT FAIL: {url} ({msg})", { url: data.url, msg: data.err.toString() });
});
const self = this;
this.publishConnection.on("error", (err) => {
if (err.message !== "Connection closing") {
self.log.error('AMQP ERROR', err.message);
self.log.error("AMQP ERROR: {message}", { message: err.message });
}

@@ -45,14 +76,20 @@ });

if (err.message !== "Connection closing") {
self.log.error('AMQP ERROR', err.message);
self.log.error("AMQP ERROR: {message}", { message: err.message });
}
});
this.publishConnection.on("close", () => {
self.log.error('AMQP CONNECTION CLOSED');
self.log.fatal('AMQP Error: Connection closed');
self.log.error("AMQP CONNECTION CLOSED");
if (pluginConfig.fatalOnDisconnect)
self.log.fatal("AMQP Error: Connection closed");
});
this.receiveConnection.on("close", () => {
self.log.error('AMQP CONNECTION CLOSED');
self.log.fatal('AMQP Error: Connection closed');
self.log.error("AMQP CONNECTION CLOSED");
if (pluginConfig.fatalOnDisconnect)
self.log.fatal("AMQP Error: Connection closed");
});
this.log.info(`Connected to ${(await this.getPluginConfig()).endpoint}x2`);
this.log.info(`Connected to {endpoints}x2? (s:{sendS}/p:{pubS})`, {
endpoints: (await this.getPluginConfig()).endpoints,
sendS: this.receiveConnection.isConnected(),
pubS: this.publishConnection.isConnected(),
});
this.myId = `${(await this.getPluginConfig()).uniqueId || (0, os_1.hostname)()}-${(0, crypto_1.randomUUID)()}`;

@@ -63,13 +100,20 @@ await this.emit.init(this);

}
dispose() {
this.emit.dispose();
this.ear.dispose();
this.eas.dispose();
this.publishConnection.close();
this.receiveConnection.close();
}
async onEvent(callerPluginName, pluginName, event, listener) {
return await this.emit.onEvent(callerPluginName, pluginName, event, listener);
await this.emit.onEvent(callerPluginName, pluginName, event, listener);
}
async emitEvent(callerPluginName, pluginName, event, data) {
return await this.emit.emitEvent(callerPluginName, pluginName, event, data);
async emitEvent(callerPluginName, pluginName, event, args) {
await this.emit.emitEvent(callerPluginName, pluginName, event, args);
}
async onReturnableEvent(callerPluginName, pluginName, event, listener) {
return await this.ear.onReturnableEvent(callerPluginName, pluginName, event, listener);
await this.ear.onReturnableEvent(callerPluginName, pluginName, event, listener);
}
async emitEventAndReturn(callerPluginName, pluginName, event, data, timeoutSeconds) {
return this.ear.emitEventAndReturn(callerPluginName, pluginName, event, data, timeoutSeconds);
async emitEventAndReturn(callerPluginName, pluginName, event, timeoutSeconds, args) {
return await this.ear.emitEventAndReturn(callerPluginName, pluginName, event, timeoutSeconds, args);
}

@@ -76,0 +120,0 @@ async receiveStream(callerPluginName, listener, timeoutSeconds) {

@@ -0,1 +1,2 @@

import { SecConfig } from "@bettercorp/service-base/lib/interfaces/serviceConfig";
export declare enum DataType {

@@ -12,7 +13,8 @@ string = "string",

}
export interface IPluginConfig {
export interface PluginConfig {
fatalOnDisconnect: boolean;
prefetch: number;
endpoint: string;
endpoints: Array<string>;
credentials: IPluginConfig_Credentials;
uniqueId: string | null;
uniqueId?: string;
}

@@ -23,11 +25,4 @@ export interface IPluginConfig_Credentials {

}
declare const _default: () => {
prefetch: number;
endpoint: string;
credentials: {
username: string;
password: string;
};
uniqueId: null;
};
export default _default;
export declare class Config extends SecConfig<PluginConfig> {
migrate(mappedPluginName: string, existingConfig: PluginConfig): PluginConfig;
}
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.DataType = void 0;
exports.Config = exports.DataType = void 0;
const serviceConfig_1 = require("@bettercorp/service-base/lib/interfaces/serviceConfig");
const Tools_1 = require("@bettercorp/tools/lib/Tools");
var DataType;

@@ -16,13 +18,28 @@ (function (DataType) {

})(DataType = exports.DataType || (exports.DataType = {}));
exports.default = () => {
return {
prefetch: 10,
endpoint: "amqp://localhost",
credentials: {
username: "guest",
password: "guest"
},
uniqueId: null
};
};
class Config extends serviceConfig_1.SecConfig {
migrate(mappedPluginName, existingConfig) {
let resultConfig = {
fatalOnDisconnect: existingConfig.fatalOnDisconnect || true,
prefetch: existingConfig.prefetch || 10,
endpoints: (existingConfig.endpoint !== undefined
? [existingConfig.endpoint]
: existingConfig.endpoints) || ["amqp://localhost"],
credentials: existingConfig.credentials || {},
uniqueId: existingConfig.uniqueId,
};
if (resultConfig.endpoints.length === 0 ||
resultConfig.endpoints.filter((x) => !Tools_1.Tools.isString(x)).length > 0) {
resultConfig.endpoints = ["amqp://localhost"];
}
if (resultConfig.endpoints.length === 1) {
resultConfig.fatalOnDisconnect = true;
}
resultConfig.credentials.username =
(resultConfig.credentials || {}).username || "guest";
resultConfig.credentials.password =
(resultConfig.credentials || {}).password || "guest";
return resultConfig;
}
}
exports.Config = Config;
//# sourceMappingURL=sec.config.js.map

@@ -1,1 +0,42 @@

{"name":"@bettercorp/service-base-plugin-events-rabbitmq","license":"AGPL-3.0-only","repository":{"url":"https://gitlab.com/BetterCorp/public/service-base-plugin-events-rabbitmq"},"scripts":{"dev":"nodemon --config node_modules/@bettercorp/service-base/build/nodemon.json","start":"ts-node node_modules/@bettercorp/service-base/lib/index.js","build":"tsc","deploy":"npm publish","version":"node ./node_modules/@bettercorp/service-base/build/version-bcorp.js $0","create":"ts-node node_modules/@bettercorp/service-base/lib/bootstrap.js $0","_test":"./node_modules/mocha/bin/mocha --reporter mocha-junit-reporter --reporter-options mochaFile=junit.xml","_testDev":"./node_modules/mocha/bin/mocha"},"files":["lib/**/*"],"main":"lib/index.js","version":"2.1.20220615203957","bsb_project":true,"devDependencies":{"@typescript-eslint/eslint-plugin":"^5.28.0","@typescript-eslint/parser":"^5.28.0","eslint":"^8.17.0","mocha":"^10.0.0","mocha-junit-reporter":"^2.0.2","ts-node":"^10.8.1","typescript":"^4.7.3"},"dependencies":{"@bettercorp/service-base":"^7.6.20220531162851","@bettercorp/tools":"^2.0.20220613145514","@types/amqplib":"^0.8.2","@types/node":"^17.0.44","amqplib":"^0.10.0"}}
{
"name": "@bettercorp/service-base-plugin-events-rabbitmq",
"license": "AGPL-3.0-only",
"repository": {
"url": "https://github.com/BetterCorp/service-base-events-rabbitmq"
},
"scripts": {
"dev": "nodemon --config node_modules/@bettercorp/service-base/development/nodemon.json",
"start": "ts-node node_modules/@bettercorp/service-base/lib/cli.js",
"build": "tsc",
"test": "env TS_NODE_COMPILER_OPTIONS='{\"module\": \"commonjs\" }' node ./node_modules/nyc/bin/nyc.js --reporter json --reporter lcov ./node_modules/mocha/bin/mocha.js -r ts-node/register 'src/tests/**/*.ts' --reporter json --reporter-options output=junit.json",
"testDev": "env TS_NODE_COMPILER_OPTIONS='{\"module\": \"commonjs\" }' node ./node_modules/nyc/bin/nyc.js ./node_modules/mocha/bin/mocha.js -r ts-node/register 'src/tests/**/*.ts'"
},
"files": [
"lib/**/*"
],
"main": "lib/index.js",
"version": "2.1.20220615203962-ea.0",
"bsb_project": true,
"devDependencies": {
"@types/amqplib": "^0.8.2",
"@types/assert": "^1.5.6",
"@types/chai": "^4.3.3",
"@types/mocha": "^9.1.1",
"@types/node": "^17.0.44",
"@types/yargs": "^17.0.10",
"@typescript-eslint/eslint-plugin": "^5.28.0",
"@typescript-eslint/parser": "^5.28.0",
"eslint": "^8.17.0",
"mocha": "^10.0.0",
"mocha-junit-reporter": "^2.0.2",
"nyc": "^15.1.0",
"ts-node": "^10.8.1",
"typescript": "^4.7.3"
},
"dependencies": {
"@bettercorp/service-base": "^7.6.20220531162965-ea.0",
"@bettercorp/tools": "^2.0.20220613145514",
"amqp-connection-manager": "^4.1.6",
"amqplib": "^0.10.3"
}
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc