Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

@thecolvinco/nodejs-amqplib

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@thecolvinco/nodejs-amqplib - npm Package Compare versions

Comparing version 1.0.0 to 2.0.0

dist/consumer/CommandConsumer.d.ts

14

dist/main.d.ts

@@ -1,8 +0,6 @@

import { Connection } from 'amqplib';
import retryable from './utils/retryable';
import deadLetter from './utils/deadLetter';
import worker from './utils/worker';
import producer from './utils/producer';
import { createEvent, createCommand, toJSON } from './utils/message';
declare const amqpConnect: (connectionString: string) => Promise<Connection>;
export { amqpConnect, retryable, deadLetter, worker, producer, createEvent, createCommand, toJSON, };
import { createEvent, createCommand, toJSON, producer, worker, deadLetter, retryable, amqpConnect } from './utils';
import { CommandConsumer, DomainEventConsumer } from './consumer';
import { pubSubInitialization, transportsInitialization } from './container';
import { CommandEmitter, DomainEventEmitter, EventDispatcher } from './emitter';
import { commandSubscriber, domainEventSubscriber } from './subscribers';
export { amqpConnect, retryable, deadLetter, worker, producer, createEvent, createCommand, toJSON, CommandConsumer, DomainEventConsumer, pubSubInitialization, transportsInitialization, CommandEmitter, DomainEventEmitter, EventDispatcher, commandSubscriber, domainEventSubscriber };

@@ -0,4 +1,152 @@

import { v4 } from 'uuid';
import { connect } from 'amqplib';
import { v4 } from 'uuid';
import { EventEmitter } from 'events';
import { resolve } from 'path';
const parse = ({
payload,
meta,
type
}) => {
const {
company,
context,
version,
entity,
name
} = meta;
return {
data: {
messageId: v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
const toJSON = message => {
return JSON.parse(message.content.toString());
};
var producer = (function ({
channel,
message,
key,
exchange
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
});
});
}
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
}
}();
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
}
});
var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
} catch (e) {
return Promise.reject(e);
}
});
function _extends() {

@@ -108,153 +256,713 @@ _extends = Object.assign || function (target) {

var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
const amqpConnect = connectionString => {
return connect(connectionString);
};
function _catch(body, recover) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
var result = body();
} catch (e) {
return recover(e);
}
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class EventDispatcher {
constructor({
config
}) {
const _this = this,
_this2 = this,
_this3 = this;
this.asyncDispatch = function ({
eventConfig,
messageBody
}) {
try {
const {
transport
} = eventConfig;
const {
connectionString,
exchange
} = _this.config.transports[transport];
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
type: key
} = messageBody.data;
const message = JSON.stringify(messageBody);
return Promise.resolve(producer({
channel,
message,
key,
exchange
})).then(function () {
connection.close();
return true;
});
});
}, function () {
connection.close();
return false;
});
});
} catch (e) {
return Promise.reject(e);
}
};
this.asyncCommandDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createCommand({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this2.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.asyncEventdDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createEvent({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this3.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.config = config;
}
dispatch({
event
}) {
try {
const _this4 = this;
const eventClassname = event.constructor.name;
const isCommand = /Command$/.test(eventClassname);
const isEvent = /Event$/.test(eventClassname);
const eventConfig = _this4.config.routing[eventClassname];
if (!eventConfig) {
throw Error(`Missed event configuration for event ${eventClassname}`);
}
if (eventConfig.async === true || eventConfig.async === undefined) {
if (isCommand) {
_this4.asyncCommandDispatch({
eventConfig,
event
});
}
if (isEvent) {
_this4.asyncEventdDispatch({
eventConfig,
event
});
}
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
}
function _catch$1(body, recover) {
try {
var result = body();
} catch (e) {
return Promise.reject(e);
return recover(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventEmitter extends EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$1(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
super.addListener(eventName, decoratedFunction);
return this;
}
}
function _catch$2(body, recover) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class CommandEmitter extends EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$2(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
super.addListener(eventName, decoratedFunction);
return this;
}
}
var domainEventSubscriber = {
subscribe: function ({
emitter,
eventDispatcher,
eventName,
commandsPath
}) {
try {
commandsPath.forEach(function (commandPath) {
try {
return Promise.resolve(import(resolve(commandPath.trim()))).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
eventDispatcher.dispatch({
event: new Command({
message
})
});
});
});
} catch (e) {
return Promise.reject(e);
}
}();
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
};
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
var commandSubscriber = {
subscribe: function ({
emitter,
eventName,
handlerPath
}) {
try {
return Promise.resolve(import(resolve(handlerPath.trim()))).then(function ({
default: CommandHandler
}) {
return Promise.resolve(import(resolve(handlerPath.replace('Handler', '').trim()))).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
const handler = new CommandHandler({
command: new Command({
message
})
});
handler.handle();
});
});
});
} catch (e) {
return Promise.reject(e);
}
}
});
};
var producer = (function ({
channel,
message,
key,
exchange
let commandEmitter = null;
let domainEventEmitter = null;
const getCommandEmitter = () => {
return commandEmitter;
};
const getDomainEventsEmitter = () => {
return domainEventEmitter;
};
const subscribersInitialization = function ({
subscribers,
eventDispatcher,
// eslint-disable-next-line no-shadow
domainEventEmitter,
// eslint-disable-next-line no-shadow
commandEmitter
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
function _temp3() {
const _temp = function () {
if (subscribers.commands) {
return Promise.resolve(Promise.all(subscribers.commands.map(function ({
eventName,
handlerPath
}) {
try {
try {
commandSubscriber.subscribe({
emitter: commandEmitter,
eventName,
handlerPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
if (_temp && _temp.then) return _temp.then(function () {});
}
const _temp2 = function () {
if (subscribers.domainEvents) {
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({
eventName,
commandsPath
}) {
try {
try {
domainEventSubscriber.subscribe({
eventDispatcher,
emitter: domainEventEmitter,
eventName,
commandsPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2));
} catch (e) {
return Promise.reject(e);
}
});
};
const parse = ({
payload,
meta,
type
const pubSubInitialization = ({
config
}) => {
const eventDispatcher = new EventDispatcher({
config
});
domainEventEmitter = new DomainEventEmitter();
commandEmitter = new CommandEmitter();
const {
company,
context,
version,
entity,
name
} = meta;
subscribers
} = config;
subscribersInitialization({
subscribers,
eventDispatcher,
domainEventEmitter,
commandEmitter
});
return {
data: {
messageId: v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
eventDispatcher,
domainEventEmitter,
commandEmitter
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
function _catch$3(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
if (result && result.then) {
return result.then(void 0, recover);
}
const toJSON = message => {
return JSON.parse(message.content.toString());
};
return result;
}
const amqpConnect = connectionString => {
return connect(connectionString);
};
class CommandConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
export { amqpConnect, createCommand, createEvent, deadLetter, producer, retryable, toJSON, worker };
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
const {
retryPolicy = null
} = queue;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$3(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const commandEmitter = emitter === null ? getCommandEmitter() : emitter;
const onMessage = _this.workable({
channel,
emitter: commandEmitter,
retryPolicy,
queueName
});
channel.consume(queueName, onMessage, {
noAck: false
});
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter,
retryPolicy,
queueName
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
const onError = () => {
if (!retryPolicy) channel.nack(msg, false, false);
const {
maxRetries,
delay,
retryExchangeName = 'blom.retries.exchange',
onRejected = null
} = retryPolicy;
retryable({
channel,
message: msg,
queue: {
name: queueName
},
retryExchange: {
name: retryExchangeName
},
maxRetries,
delay,
onRejected
});
};
const onSuccess = () => {
channel.ack(msg);
};
emitter.emit(eventName, {
message,
onSuccess,
onError
});
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
function _catch$4(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$4(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter;
const consumerData = {
onMessage: _this.workable({
channel,
emitter: domainEventsEmitter
}),
options: {
noAck: false
}
};
channel.consume(queueName, consumerData.onMessage, consumerData.options);
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
emitter.emit(eventName, {
message
});
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
var transportsInitialization = (function ({
config
}) {
try {
const {
transports
} = config;
Object.entries(transports).forEach(function ([, transport]) {
try {
const {
connectionString,
exchange,
queues = []
} = transport;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
name: exchangeName,
type: exchangeType,
options: exchangeOptions = {}
} = exchange;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
return Promise.resolve(Promise.all(queues.map(function ({
name: queueName,
bindingKey = null,
options: queueOptions = {}
}) {
try {
const pattern = bindingKey || '';
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {
connection.close();
});
});
});
});
} catch (e) {
return Promise.reject(e);
}
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
});
export { CommandConsumer, CommandEmitter, DomainEventConsumer, DomainEventEmitter, EventDispatcher, amqpConnect, commandSubscriber, createCommand, createEvent, deadLetter, domainEventSubscriber, producer, pubSubInitialization, retryable, toJSON, transportsInitialization, worker };
//# sourceMappingURL=main.esm.js.map

@@ -0,4 +1,171 @@

function _interopNamespace(e) {
if (e && e.__esModule) { return e; } else {
var n = {};
if (e) {
Object.keys(e).forEach(function (k) {
var d = Object.getOwnPropertyDescriptor(e, k);
Object.defineProperty(n, k, d.get ? d : {
enumerable: true,
get: function () {
return e[k];
}
});
});
}
n['default'] = e;
return n;
}
}
var uuid = require('uuid');
var amqplib = require('amqplib');
var uuid = require('uuid');
var events = require('events');
var path = require('path');
const parse = ({
payload,
meta,
type
}) => {
const {
company,
context,
version,
entity,
name
} = meta;
return {
data: {
messageId: uuid.v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
const toJSON = message => {
return JSON.parse(message.content.toString());
};
var producer = (function ({
channel,
message,
key,
exchange
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
});
});
}
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
}
}();
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
}
});
var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
} catch (e) {
return Promise.reject(e);
}
});
function _extends() {

@@ -108,160 +275,729 @@ _extends = Object.assign || function (target) {

var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
const amqpConnect = connectionString => {
return amqplib.connect(connectionString);
};
function _catch(body, recover) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
var result = body();
} catch (e) {
return recover(e);
}
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class EventDispatcher {
constructor({
config
}) {
const _this = this,
_this2 = this,
_this3 = this;
this.asyncDispatch = function ({
eventConfig,
messageBody
}) {
try {
const {
transport
} = eventConfig;
const {
connectionString,
exchange
} = _this.config.transports[transport];
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
type: key
} = messageBody.data;
const message = JSON.stringify(messageBody);
return Promise.resolve(producer({
channel,
message,
key,
exchange
})).then(function () {
connection.close();
return true;
});
});
}, function () {
connection.close();
return false;
});
});
} catch (e) {
return Promise.reject(e);
}
};
this.asyncCommandDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createCommand({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this2.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.asyncEventdDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createEvent({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this3.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.config = config;
}
dispatch({
event
}) {
try {
const _this4 = this;
const eventClassname = event.constructor.name;
const isCommand = /Command$/.test(eventClassname);
const isEvent = /Event$/.test(eventClassname);
const eventConfig = _this4.config.routing[eventClassname];
if (!eventConfig) {
throw Error(`Missed event configuration for event ${eventClassname}`);
}
if (eventConfig.async === true || eventConfig.async === undefined) {
if (isCommand) {
_this4.asyncCommandDispatch({
eventConfig,
event
});
}
if (isEvent) {
_this4.asyncEventdDispatch({
eventConfig,
event
});
}
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
}
function _catch$1(body, recover) {
try {
var result = body();
} catch (e) {
return Promise.reject(e);
return recover(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventEmitter extends events.EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$1(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
super.addListener(eventName, decoratedFunction);
return this;
}
}
function _catch$2(body, recover) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class CommandEmitter extends events.EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$2(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
super.addListener(eventName, decoratedFunction);
return this;
}
}
var domainEventSubscriber = {
subscribe: function ({
emitter,
eventDispatcher,
eventName,
commandsPath
}) {
try {
commandsPath.forEach(function (commandPath) {
try {
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(commandPath.trim())))); })).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
eventDispatcher.dispatch({
event: new Command({
message
})
});
});
});
} catch (e) {
return Promise.reject(e);
}
}();
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
};
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
var commandSubscriber = {
subscribe: function ({
emitter,
eventName,
handlerPath
}) {
try {
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(handlerPath.trim())))); })).then(function ({
default: CommandHandler
}) {
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(handlerPath.replace('Handler', '').trim())))); })).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
const handler = new CommandHandler({
command: new Command({
message
})
});
handler.handle();
});
});
});
} catch (e) {
return Promise.reject(e);
}
}
});
};
var producer = (function ({
channel,
message,
key,
exchange
let commandEmitter = null;
let domainEventEmitter = null;
const getCommandEmitter = () => {
return commandEmitter;
};
const getDomainEventsEmitter = () => {
return domainEventEmitter;
};
const subscribersInitialization = function ({
subscribers,
eventDispatcher,
// eslint-disable-next-line no-shadow
domainEventEmitter,
// eslint-disable-next-line no-shadow
commandEmitter
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
function _temp3() {
const _temp = function () {
if (subscribers.commands) {
return Promise.resolve(Promise.all(subscribers.commands.map(function ({
eventName,
handlerPath
}) {
try {
try {
commandSubscriber.subscribe({
emitter: commandEmitter,
eventName,
handlerPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
if (_temp && _temp.then) return _temp.then(function () {});
}
const _temp2 = function () {
if (subscribers.domainEvents) {
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({
eventName,
commandsPath
}) {
try {
try {
domainEventSubscriber.subscribe({
eventDispatcher,
emitter: domainEventEmitter,
eventName,
commandsPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2));
} catch (e) {
return Promise.reject(e);
}
});
};
const parse = ({
payload,
meta,
type
const pubSubInitialization = ({
config
}) => {
const eventDispatcher = new EventDispatcher({
config
});
domainEventEmitter = new DomainEventEmitter();
commandEmitter = new CommandEmitter();
const {
company,
context,
version,
entity,
name
} = meta;
subscribers
} = config;
subscribersInitialization({
subscribers,
eventDispatcher,
domainEventEmitter,
commandEmitter
});
return {
data: {
messageId: uuid.v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
eventDispatcher,
domainEventEmitter,
commandEmitter
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
function _catch$3(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
if (result && result.then) {
return result.then(void 0, recover);
}
const toJSON = message => {
return JSON.parse(message.content.toString());
};
return result;
}
const amqpConnect = connectionString => {
return amqplib.connect(connectionString);
};
class CommandConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
const {
retryPolicy = null
} = queue;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$3(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const commandEmitter = emitter === null ? getCommandEmitter() : emitter;
const onMessage = _this.workable({
channel,
emitter: commandEmitter,
retryPolicy,
queueName
});
channel.consume(queueName, onMessage, {
noAck: false
});
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter,
retryPolicy,
queueName
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
const onError = () => {
if (!retryPolicy) channel.nack(msg, false, false);
const {
maxRetries,
delay,
retryExchangeName = 'blom.retries.exchange',
onRejected = null
} = retryPolicy;
retryable({
channel,
message: msg,
queue: {
name: queueName
},
retryExchange: {
name: retryExchangeName
},
maxRetries,
delay,
onRejected
});
};
const onSuccess = () => {
channel.ack(msg);
};
emitter.emit(eventName, {
message,
onSuccess,
onError
});
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
function _catch$4(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$4(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter;
const consumerData = {
onMessage: _this.workable({
channel,
emitter: domainEventsEmitter
}),
options: {
noAck: false
}
};
channel.consume(queueName, consumerData.onMessage, consumerData.options);
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
emitter.emit(eventName, {
message
});
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
var transportsInitialization = (function ({
config
}) {
try {
const {
transports
} = config;
Object.entries(transports).forEach(function ([, transport]) {
try {
const {
connectionString,
exchange,
queues = []
} = transport;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
name: exchangeName,
type: exchangeType,
options: exchangeOptions = {}
} = exchange;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
return Promise.resolve(Promise.all(queues.map(function ({
name: queueName,
bindingKey = null,
options: queueOptions = {}
}) {
try {
const pattern = bindingKey || '';
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {
connection.close();
});
});
});
});
} catch (e) {
return Promise.reject(e);
}
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
});
exports.CommandConsumer = CommandConsumer;
exports.CommandEmitter = CommandEmitter;
exports.DomainEventConsumer = DomainEventConsumer;
exports.DomainEventEmitter = DomainEventEmitter;
exports.EventDispatcher = EventDispatcher;
exports.amqpConnect = amqpConnect;
exports.commandSubscriber = commandSubscriber;
exports.createCommand = createCommand;
exports.createEvent = createEvent;
exports.deadLetter = deadLetter;
exports.domainEventSubscriber = domainEventSubscriber;
exports.producer = producer;
exports.pubSubInitialization = pubSubInitialization;
exports.retryable = retryable;
exports.toJSON = toJSON;
exports.transportsInitialization = transportsInitialization;
exports.worker = worker;
//# sourceMappingURL=main.js.map

@@ -0,71 +1,67 @@

import { v4 } from 'uuid';
import { connect } from 'amqplib';
import { v4 } from 'uuid';
import { EventEmitter } from 'events';
import { resolve } from 'path';
const assertExchange = async ({
channel,
retryExchange
const parse = ({
payload,
meta,
type
}) => {
const {
'arguments': extraArgs = {},
...options
} = retryExchange.options || {};
const assertExchangeOptions = {
durable: false,
'arguments': {
'x-delayed-type': 'direct',
...extraArgs
},
...options
company,
context,
version,
entity,
name
} = meta;
return {
data: {
messageId: v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
};
await channel.assertExchange(retryExchange.name, 'x-delayed-message', assertExchangeOptions);
};
var retryable = (async ({
channel,
message,
queue,
retryExchange,
maxRetries: _maxRetries = 5,
delay: _delay = 5000,
onRejected
const createEvent = ({
payload,
meta
}) => {
const {
'x-retries': xRetries
} = message.properties.headers;
const retries = xRetries ?? 0;
const nextDelay = (retries + 1) * _delay;
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
if (retries < _maxRetries) {
await assertExchange({
channel,
retryExchange
});
const retryRoutingKey = queue.bindingKey ?? message.fields.routingKey;
await channel.bindQueue(queue.name, retryExchange.name, retryRoutingKey);
channel.publish(retryExchange.name, retryRoutingKey, Buffer.from(message.content.toString()), {
headers: {
'x-retries': retries + 1,
'x-delay': nextDelay
}
});
channel.ack(message);
} else {
if (typeof onRejected === 'function') onRejected(message);
channel.nack(message, false, false);
}
});
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
var deadLetter = (async ({
const toJSON = message => {
return JSON.parse(message.content.toString());
};
var producer = (async ({
channel,
targetQueue,
dlxQueue,
dlxExchange
message,
key,
exchange
}) => {
const exchangeType = dlxExchange.type || 'direct';
await channel.assertExchange(dlxExchange.name, exchangeType);
const {
queue: queueAsserted
} = await channel.assertQueue(dlxQueue.name);
const bindingKey = targetQueue.options?.deadLetterRoutingKey ?? dlxQueue.bindingKey ?? '';
await channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey);
exchange: exchangeAsserted
} = await channel.assertExchange(exchange.name, exchange.type, exchange.options);
channel.publish(exchangeAsserted, key, Buffer.from(message));
await channel.close();
});

@@ -116,70 +112,590 @@

var producer = (async ({
var deadLetter = (async ({
channel,
message,
key,
exchange
targetQueue,
dlxQueue,
dlxExchange
}) => {
const exchangeType = dlxExchange.type || 'direct';
await channel.assertExchange(dlxExchange.name, exchangeType);
const {
exchange: exchangeAsserted
} = await channel.assertExchange(exchange.name, exchange.type, exchange.options);
channel.publish(exchangeAsserted, key, Buffer.from(message));
await channel.close();
queue: queueAsserted
} = await channel.assertQueue(dlxQueue.name);
const bindingKey = targetQueue.options?.deadLetterRoutingKey ?? dlxQueue.bindingKey ?? '';
await channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey);
});
const parse = ({
payload,
meta,
type
const assertExchange = async ({
channel,
retryExchange
}) => {
const {
company,
context,
version,
entity,
name
} = meta;
return {
data: {
messageId: v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
'arguments': extraArgs = {},
...options
} = retryExchange.options || {};
const assertExchangeOptions = {
durable: false,
'arguments': {
'x-delayed-type': 'direct',
...extraArgs
},
...options
};
await channel.assertExchange(retryExchange.name, 'x-delayed-message', assertExchangeOptions);
};
const createEvent = ({
payload,
meta
var retryable = (async ({
channel,
message,
queue,
retryExchange,
maxRetries: _maxRetries = 5,
delay: _delay = 5000,
onRejected
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
const {
'x-retries': xRetries
} = message.properties.headers;
const retries = xRetries ?? 0;
const nextDelay = (retries + 1) * _delay;
if (retries < _maxRetries) {
await assertExchange({
channel,
retryExchange
});
const retryRoutingKey = queue.bindingKey ?? message.fields.routingKey;
await channel.bindQueue(queue.name, retryExchange.name, retryRoutingKey);
channel.publish(retryExchange.name, retryRoutingKey, Buffer.from(message.content.toString()), {
headers: {
'x-retries': retries + 1,
'x-delay': nextDelay
}
});
channel.ack(message);
} else {
if (typeof onRejected === 'function') onRejected(message);
channel.nack(message, false, false);
}
});
const amqpConnect = connectionString => {
return connect(connectionString);
};
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
class EventDispatcher {
constructor({
config
}) {
var _this = this;
this.asyncDispatch = async function ({
eventConfig,
messageBody
}) {
const {
transport
} = eventConfig;
const {
connectionString,
exchange
} = _this.config.transports[transport];
const connection = await amqpConnect(connectionString);
try {
const channel = await connection.createChannel();
const {
type: key
} = messageBody.data;
const message = JSON.stringify(messageBody);
await producer({
channel,
message,
key,
exchange
});
connection.close();
return true;
} catch (error) {
connection.close();
return false;
}
};
this.asyncCommandDispatch = async function ({
eventConfig,
event
}) {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createCommand({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return _this.asyncDispatch({
eventConfig,
messageBody
});
};
this.asyncEventdDispatch = async function ({
eventConfig,
event
}) {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createEvent({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return _this.asyncDispatch({
eventConfig,
messageBody
});
};
this.config = config;
}
async dispatch({
event
}) {
const eventClassname = event.constructor.name;
const isCommand = /Command$/.test(eventClassname);
const isEvent = /Event$/.test(eventClassname);
const eventConfig = this.config.routing[eventClassname];
if (!eventConfig) {
throw Error(`Missed event configuration for event ${eventClassname}`);
}
if (eventConfig.async === true || eventConfig.async === undefined) {
if (isCommand) {
this.asyncCommandDispatch({
eventConfig,
event
});
}
if (isEvent) {
this.asyncEventdDispatch({
eventConfig,
event
});
}
}
}
}
class DomainEventEmitter extends EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = async function ({
message,
onSuccess,
onError
}) {
try {
await originalFunction(message);
if (onSuccess) onSuccess();
} catch (error) {
if (onError) onError({
error
});
}
};
super.addListener(eventName, decoratedFunction);
return this;
}
}
class CommandEmitter extends EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = async function ({
message,
onSuccess,
onError
}) {
try {
await originalFunction(message);
if (onSuccess) onSuccess();
} catch (error) {
if (onError) onError({
error
});
}
};
super.addListener(eventName, decoratedFunction);
return this;
}
}
var domainEventSubscriber = {
async subscribe({
emitter,
eventDispatcher,
eventName,
commandsPath
}) {
commandsPath.forEach(async commandPath => {
const {
default: Command
} = await import(resolve(commandPath.trim()));
emitter.on(eventName, message => {
eventDispatcher.dispatch({
event: new Command({
message
})
});
});
});
}
};
const toJSON = message => {
return JSON.parse(message.content.toString());
var commandSubscriber = {
async subscribe({
emitter,
eventName,
handlerPath
}) {
const {
default: CommandHandler
} = await import(resolve(handlerPath.trim()));
const {
default: Command
} = await import(resolve(handlerPath.replace('Handler', '').trim()));
emitter.on(eventName, message => {
const handler = new CommandHandler({
command: new Command({
message
})
});
handler.handle();
});
}
};
const amqpConnect = connectionString => {
return connect(connectionString);
let commandEmitter = null;
let domainEventEmitter = null;
const getCommandEmitter = () => {
return commandEmitter;
};
const getDomainEventsEmitter = () => {
return domainEventEmitter;
};
export { amqpConnect, createCommand, createEvent, deadLetter, producer, retryable, toJSON, worker };
const subscribersInitialization = async ({
subscribers,
eventDispatcher,
// eslint-disable-next-line no-shadow
domainEventEmitter,
// eslint-disable-next-line no-shadow
commandEmitter
}) => {
if (subscribers.domainEvents) {
await Promise.all(subscribers.domainEvents.map(async ({
eventName,
commandsPath
}) => {
try {
domainEventSubscriber.subscribe({
eventDispatcher,
emitter: domainEventEmitter,
eventName,
commandsPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
}));
}
if (subscribers.commands) {
await Promise.all(subscribers.commands.map(async ({
eventName,
handlerPath
}) => {
try {
commandSubscriber.subscribe({
emitter: commandEmitter,
eventName,
handlerPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
}));
}
};
const pubSubInitialization = ({
config
}) => {
const eventDispatcher = new EventDispatcher({
config
});
domainEventEmitter = new DomainEventEmitter();
commandEmitter = new CommandEmitter();
const {
subscribers
} = config;
subscribersInitialization({
subscribers,
eventDispatcher,
domainEventEmitter,
commandEmitter
});
return {
eventDispatcher,
domainEventEmitter,
commandEmitter
};
};
class CommandConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
async consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
if (!this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
const {
retryPolicy = null
} = queue;
const connection = await amqpConnect(connectionString);
try {
const channel = await connection.createChannel();
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const commandEmitter = emitter === null ? getCommandEmitter() : emitter;
const onMessage = this.workable({
channel,
emitter: commandEmitter,
retryPolicy,
queueName
});
channel.consume(queueName, onMessage, {
noAck: false
});
} catch (error) {
connection.close();
throw error;
}
}
workable({
channel,
emitter,
retryPolicy,
queueName
}) {
return async function (msg) {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
const onError = () => {
if (!retryPolicy) channel.nack(msg, false, false);
const {
maxRetries,
delay,
retryExchangeName = 'blom.retries.exchange',
onRejected = null
} = retryPolicy;
retryable({
channel,
message: msg,
queue: {
name: queueName
},
retryExchange: {
name: retryExchangeName
},
maxRetries,
delay,
onRejected
});
};
const onSuccess = () => {
channel.ack(msg);
};
emitter.emit(eventName, {
message,
onSuccess,
onError
});
} catch (error) {
channel.nack(msg, false, false);
}
};
}
}
class DomainEventConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
async consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
if (!this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
const connection = await amqpConnect(connectionString);
try {
const channel = await connection.createChannel();
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter;
const consumerData = {
onMessage: this.workable({
channel,
emitter: domainEventsEmitter
}),
options: {
noAck: false
}
};
channel.consume(queueName, consumerData.onMessage, consumerData.options);
} catch (error) {
connection.close();
throw error;
}
}
workable({
channel,
emitter
}) {
return async function (msg) {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
emitter.emit(eventName, {
message
});
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false);
}
};
}
}
var transportsInitialization = (async ({
config
}) => {
const {
transports
} = config;
Object.entries(transports).forEach(async ([, transport]) => {
const {
connectionString,
exchange,
queues = []
} = transport;
const connection = await amqpConnect(connectionString);
const channel = await connection.createChannel();
const {
name: exchangeName,
type: exchangeType,
options: exchangeOptions = {}
} = exchange;
await channel.assertExchange(exchangeName, exchangeType, exchangeOptions);
await Promise.all(queues.map(async ({
name: queueName,
bindingKey: _bindingKey = null,
options: queueOptions = {}
}) => {
const pattern = _bindingKey || '';
const {
queue: assertedQueue
} = await channel.assertQueue(queueName, queueOptions);
await channel.bindQueue(assertedQueue, exchangeName, pattern);
}));
connection.close();
});
});
export { CommandConsumer, CommandEmitter, DomainEventConsumer, DomainEventEmitter, EventDispatcher, amqpConnect, commandSubscriber, createCommand, createEvent, deadLetter, domainEventSubscriber, producer, pubSubInitialization, retryable, toJSON, transportsInitialization, worker };
//# sourceMappingURL=main.modern.js.map
(function (global, factory) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('amqplib'), require('uuid')) :
typeof define === 'function' && define.amd ? define(['exports', 'amqplib', 'uuid'], factory) :
(global = global || self, factory(global.nodejsAmqplib = {}, global.amqplib, global.uuid));
}(this, (function (exports, amqplib, uuid) {
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('uuid'), require('amqplib'), require('events'), require('path')) :
typeof define === 'function' && define.amd ? define(['exports', 'uuid', 'amqplib', 'events', 'path'], factory) :
(global = global || self, factory(global.nodejsAmqplib = {}, global.uuid, global.amqplib, global.events, global.path));
}(this, (function (exports, uuid, amqplib, events, path) {
const parse = ({
payload,
meta,
type
}) => {
const {
company,
context,
version,
entity,
name
} = meta;
return {
data: {
messageId: uuid.v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
const toJSON = message => {
return JSON.parse(message.content.toString());
};
var producer = (function ({
channel,
message,
key,
exchange
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
});
});
}
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
}
}();
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
}
});
var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
} catch (e) {
return Promise.reject(e);
}
});
function _extends() {

@@ -110,159 +256,728 @@ _extends = Object.assign || function (target) {

var deadLetter = (function ({
channel,
targetQueue,
dlxQueue,
dlxExchange
}) {
const amqpConnect = connectionString => {
return amqplib.connect(connectionString);
};
function _catch(body, recover) {
try {
const exchangeType = dlxExchange.type || 'direct';
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () {
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({
queue: queueAsserted
}) {
var _ref, _targetQueue$options$, _targetQueue$options;
var result = body();
} catch (e) {
return recover(e);
}
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : '';
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {});
});
});
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class EventDispatcher {
constructor({
config
}) {
const _this = this,
_this2 = this,
_this3 = this;
this.asyncDispatch = function ({
eventConfig,
messageBody
}) {
try {
const {
transport
} = eventConfig;
const {
connectionString,
exchange
} = _this.config.transports[transport];
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
type: key
} = messageBody.data;
const message = JSON.stringify(messageBody);
return Promise.resolve(producer({
channel,
message,
key,
exchange
})).then(function () {
connection.close();
return true;
});
});
}, function () {
connection.close();
return false;
});
});
} catch (e) {
return Promise.reject(e);
}
};
this.asyncCommandDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createCommand({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this2.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.asyncEventdDispatch = function ({
eventConfig,
event
}) {
try {
const [company, context, version,, entity, name] = event.getName().split('.');
const messageBody = createEvent({
payload: event.getPayload(),
meta: {
company,
context,
version,
entity,
name
}
});
return Promise.resolve(_this3.asyncDispatch({
eventConfig,
messageBody
}));
} catch (e) {
return Promise.reject(e);
}
};
this.config = config;
}
dispatch({
event
}) {
try {
const _this4 = this;
const eventClassname = event.constructor.name;
const isCommand = /Command$/.test(eventClassname);
const isEvent = /Event$/.test(eventClassname);
const eventConfig = _this4.config.routing[eventClassname];
if (!eventConfig) {
throw Error(`Missed event configuration for event ${eventClassname}`);
}
if (eventConfig.async === true || eventConfig.async === undefined) {
if (isCommand) {
_this4.asyncCommandDispatch({
eventConfig,
event
});
}
if (isEvent) {
_this4.asyncEventdDispatch({
eventConfig,
event
});
}
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
}
function _catch$1(body, recover) {
try {
var result = body();
} catch (e) {
return Promise.reject(e);
return recover(e);
}
});
var worker = (function ({
channel,
exchange,
queue,
consumer
}) {
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventEmitter extends events.EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$1(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
super.addListener(eventName, decoratedFunction);
return this;
}
}
function _catch$2(body, recover) {
try {
const {
name: exchangeName,
type: exchangeType = 'topic',
options: exchangeOptions
} = exchange;
const {
name: queueName,
bindingKey,
options: queueOptions,
dlx = null
} = queue;
const {
onMessage,
options: consumerOptions
} = consumer;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
function _temp2() {
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () {
return channel.consume(assertedQueue, onMessage, consumerOptions);
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class CommandEmitter extends events.EventEmitter {
on(eventName, originalFunction) {
const decoratedFunction = function ({
message,
onSuccess,
onError
}) {
try {
const _temp = _catch$2(function () {
return Promise.resolve(originalFunction(message)).then(function () {
if (onSuccess) onSuccess();
});
}, function (error) {
if (onError) onError({
error
});
});
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0);
} catch (e) {
return Promise.reject(e);
}
};
const _temp = function () {
if (dlx !== null) {
const {
dlxQueue,
dlxExchange
} = dlx.params;
return Promise.resolve(dlx.func({
channel,
targetQueue: queue,
dlxQueue,
dlxExchange
})).then(function () {});
super.addListener(eventName, decoratedFunction);
return this;
}
}
var domainEventSubscriber = {
subscribe: function ({
emitter,
eventDispatcher,
eventName,
commandsPath
}) {
try {
commandsPath.forEach(function (commandPath) {
try {
return Promise.resolve(import(path.resolve(commandPath.trim()))).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
eventDispatcher.dispatch({
event: new Command({
message
})
});
});
});
} catch (e) {
return Promise.reject(e);
}
}();
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}
};
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp);
});
} catch (e) {
return Promise.reject(e);
var commandSubscriber = {
subscribe: function ({
emitter,
eventName,
handlerPath
}) {
try {
return Promise.resolve(import(path.resolve(handlerPath.trim()))).then(function ({
default: CommandHandler
}) {
return Promise.resolve(import(path.resolve(handlerPath.replace('Handler', '').trim()))).then(function ({
default: Command
}) {
emitter.on(eventName, message => {
const handler = new CommandHandler({
command: new Command({
message
})
});
handler.handle();
});
});
});
} catch (e) {
return Promise.reject(e);
}
}
});
};
var producer = (function ({
channel,
message,
key,
exchange
let commandEmitter = null;
let domainEventEmitter = null;
const getCommandEmitter = () => {
return commandEmitter;
};
const getDomainEventsEmitter = () => {
return domainEventEmitter;
};
const subscribersInitialization = function ({
subscribers,
eventDispatcher,
// eslint-disable-next-line no-shadow
domainEventEmitter,
// eslint-disable-next-line no-shadow
commandEmitter
}) {
try {
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({
exchange: exchangeAsserted
}) {
channel.publish(exchangeAsserted, key, Buffer.from(message));
return Promise.resolve(channel.close()).then(function () {});
});
function _temp3() {
const _temp = function () {
if (subscribers.commands) {
return Promise.resolve(Promise.all(subscribers.commands.map(function ({
eventName,
handlerPath
}) {
try {
try {
commandSubscriber.subscribe({
emitter: commandEmitter,
eventName,
handlerPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
if (_temp && _temp.then) return _temp.then(function () {});
}
const _temp2 = function () {
if (subscribers.domainEvents) {
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({
eventName,
commandsPath
}) {
try {
try {
domainEventSubscriber.subscribe({
eventDispatcher,
emitter: domainEventEmitter,
eventName,
commandsPath
});
} catch (error) {
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {});
}
}();
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2));
} catch (e) {
return Promise.reject(e);
}
});
};
const parse = ({
payload,
meta,
type
const pubSubInitialization = ({
config
}) => {
const eventDispatcher = new EventDispatcher({
config
});
domainEventEmitter = new DomainEventEmitter();
commandEmitter = new CommandEmitter();
const {
company,
context,
version,
entity,
name
} = meta;
subscribers
} = config;
subscribersInitialization({
subscribers,
eventDispatcher,
domainEventEmitter,
commandEmitter
});
return {
data: {
messageId: uuid.v4(),
occurredOn: Date.now(),
attributes: payload,
type: `${company}.${context}.${version}.${type}.${entity}.${name}`
}
eventDispatcher,
domainEventEmitter,
commandEmitter
};
};
const createEvent = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'domain_event'
});
return message;
};
function _catch$3(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
const createCommand = ({
payload,
meta
}) => {
const message = parse({
payload,
meta,
type: 'command'
});
return message;
};
if (result && result.then) {
return result.then(void 0, recover);
}
const toJSON = message => {
return JSON.parse(message.content.toString());
};
return result;
}
const amqpConnect = connectionString => {
return amqplib.connect(connectionString);
};
class CommandConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
const {
retryPolicy = null
} = queue;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$3(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const commandEmitter = emitter === null ? getCommandEmitter() : emitter;
const onMessage = _this.workable({
channel,
emitter: commandEmitter,
retryPolicy,
queueName
});
channel.consume(queueName, onMessage, {
noAck: false
});
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter,
retryPolicy,
queueName
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
const onError = () => {
if (!retryPolicy) channel.nack(msg, false, false);
const {
maxRetries,
delay,
retryExchangeName = 'blom.retries.exchange',
onRejected = null
} = retryPolicy;
retryable({
channel,
message: msg,
queue: {
name: queueName
},
retryExchange: {
name: retryExchangeName
},
maxRetries,
delay,
onRejected
});
};
const onSuccess = () => {
channel.ack(msg);
};
emitter.emit(eventName, {
message,
onSuccess,
onError
});
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
function _catch$4(body, recover) {
try {
var result = body();
} catch (e) {
return recover(e);
}
if (result && result.then) {
return result.then(void 0, recover);
}
return result;
}
class DomainEventConsumer {
constructor({
config
}) {
this.transports = config.transports;
}
consume({
transport,
queueName,
prefetchValue,
emitter = null
}) {
try {
const _this = this;
if (!_this.transports[transport]) {
throw Error(`Transport ${transport} is not defined`);
}
const {
connectionString,
queues
} = _this.transports[transport];
const queue = queues.find(({
name
}) => name === queueName);
if (!queue) {
throw Error(`Queue ${queue} is not defined`);
}
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return _catch$4(function () {
return Promise.resolve(connection.createChannel()).then(function (channel) {
if (prefetchValue) {
channel.prefetch(prefetchValue);
}
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter;
const consumerData = {
onMessage: _this.workable({
channel,
emitter: domainEventsEmitter
}),
options: {
noAck: false
}
};
channel.consume(queueName, consumerData.onMessage, consumerData.options);
});
}, function (error) {
connection.close();
throw error;
});
});
} catch (e) {
return Promise.reject(e);
}
}
workable({
channel,
emitter
}) {
return function (msg) {
try {
const message = toJSON(msg);
const {
data
} = message;
const {
type: eventName
} = data;
try {
emitter.emit(eventName, {
message
});
channel.ack(msg);
} catch (error) {
channel.nack(msg, false, false);
}
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
};
}
}
var transportsInitialization = (function ({
config
}) {
try {
const {
transports
} = config;
Object.entries(transports).forEach(function ([, transport]) {
try {
const {
connectionString,
exchange,
queues = []
} = transport;
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) {
return Promise.resolve(connection.createChannel()).then(function (channel) {
const {
name: exchangeName,
type: exchangeType,
options: exchangeOptions = {}
} = exchange;
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () {
return Promise.resolve(Promise.all(queues.map(function ({
name: queueName,
bindingKey = null,
options: queueOptions = {}
}) {
try {
const pattern = bindingKey || '';
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({
queue: assertedQueue
}) {
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {});
});
} catch (e) {
return Promise.reject(e);
}
}))).then(function () {
connection.close();
});
});
});
});
} catch (e) {
return Promise.reject(e);
}
});
return Promise.resolve();
} catch (e) {
return Promise.reject(e);
}
});
exports.CommandConsumer = CommandConsumer;
exports.CommandEmitter = CommandEmitter;
exports.DomainEventConsumer = DomainEventConsumer;
exports.DomainEventEmitter = DomainEventEmitter;
exports.EventDispatcher = EventDispatcher;
exports.amqpConnect = amqpConnect;
exports.commandSubscriber = commandSubscriber;
exports.createCommand = createCommand;
exports.createEvent = createEvent;
exports.deadLetter = deadLetter;
exports.domainEventSubscriber = domainEventSubscriber;
exports.producer = producer;
exports.pubSubInitialization = pubSubInitialization;
exports.retryable = retryable;
exports.toJSON = toJSON;
exports.transportsInitialization = transportsInitialization;
exports.worker = worker;

@@ -269,0 +984,0 @@

{
"name": "@thecolvinco/nodejs-amqplib",
"description": "RabbitMQ abstraction with some utils",
"version": "1.0.0",
"version": "2.0.0",
"source": "src/main.ts",

@@ -86,2 +86,3 @@ "main": "dist/main.umd.js",

"amqplib": "^0.7.1",
"events": "^3.3.0",
"uuid": "^8.3.2"

@@ -88,0 +89,0 @@ },

@@ -1,136 +0,141 @@

# Colvin nodejs rabbitmq utils
# Colvin nodejs async events
A RabbitMQ package with some utils like retryables to use across diferents projects.
A RabbitMQ package with some utils like retryables
### Usage
#### Consumer/worker example
```javascript
import { amqpConnect, retryable, worker, deadLetter, toJSON } from '@thecolvinco/nodejs-amqplib';
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672';
const args = process.argv.slice(2);
const queueName = args[0] || 'send-invoice-on-shopify-order-created';
const bindingKey = args[1] || 'blom.superapp.1.event.shopify.order-created';
const dlxExchange = args[2] || 'blom.superapp.dlx.direct';
const dlxRoutingKey = args[3] || 'blom.superapp.dlx';
const dlxQueueName = args[4] || 'blom-superapp-dlx-queue';
This library give you some classes, functions, types and interfaces ready to use. The idea behind this project has been inspired from messenger Symfony component so we have to define a config file and a couple of files. Let's see how to start it with.
* Create a config file with a required structure
* Setup your transports and bind the events
* Create your events and commands with their respective handlers
* Create your consumers
const workable = (channel) => {
return async (msg) => {
const payload = toJSON(msg).payload;
// Simulate some retryables
if (payload === 'retry') {
await retryable({
channel,
message: msg,
queue: {
name: queueName,
### Create a config file with a required structure
You have to create a file with this structure. Later you must to import this file so you can put it in any place. Let's see an example explained
```js
export default {
transports: {
accounting_commands: { // A transport unique name as identifier
connectionString: process.env.RABBITMQ_CONNECTION_URL, // A dns connection string like --> amqp://rabbitmq:rabbitmq@rabbitmq-blom:5672
exchange: { // the exchange for this transport where all message will be send it
name: 'blom.accounting.commands.exchange',
type: 'fanout',
},
queues: [
{
name: 'blom.accounting.commands', // A queue name
bindingKey: null, // A queue binding key. You can use null for fanout exchanges or wildcard string for topic ones
options: { // All allowed queue options provided for the ampq node lib
durable: true,
deadLetterExchange: 'blom.accounting.exchange.dlx',
deadLetterRoutingKey: 'blom.accounting.commands.dlx',
},
retryPolicy: { // The retries policiy for this trasnport. The retries will be try in a exponential way (firstly in 2s, secondly in 4s, lastly in 6s)
maxRetries: 3,
delay: 2000,
},
},
retryExchange: {
name: 'retries.exchange',
},
maxRetries: 3,
delay: 1000,
});
} else {
channel.ack(msg);
};
};
};
try {
const connection = await amqpConnect(connectionString);
const channel = await conn.createChannel();
const exchangeData = {
name: 'blom.exchange.topic',
type: 'topic',
};
const queueData = {
name: queueName,
bindingKey,
options: {
durable: true,
deadLetterExchange: dlxExchange,
deadLetterRoutingKey: dlxRoutingKey
],
},
dlx: {
func: deadLetter,
params: {
channel,
dlxQueue: {
name: dlxQueueName,
},
dlxExchange: {
name: dlxExchange,
type: 'direct',
},
// <----- others trasnports goes here
},
subscribers: {
domainEvents: [ // A mandatory key. All your domains events config goes under it
{
eventName: 'blom.superapp.1.domain_event.order.updated', // The event which the built-in domain events subscribers will subscribe it
commandsPath: [ // An array of commands paths witch will be created and dispached. These events (in this case commands) must implement the EventInterface
'server/events/orders/command/SyncronizeOrderCommand.js',
'server/events/orders/command/SendOrderNotificationCommand.js',
],
},
// <--- Other events goes here
],
commands: [
{
eventName: 'blom.accounting.1.command.order.syncronize_order', // The event which the built-in command subscribers will subscribe it
handlerPath: 'server/events/orders/command/SyncronizeOrderCommandHandler.js', // The handler which will be invoke passing their Command as argument
},
// <--- Other commands goes here
],
},
routing: { // A map of events wich will can dispatch through a specific transports
SyncronizeOrderCommand: { // The key is the name of the class ([instance].constructor.name)
transport: 'accounting_commands',
},
};
// <--- Other events goes here
},
};
const consumerData = {
onMessage: workable(channel),
options: {
noAck: false
},
};
```
---
### Setup your transports and bind the events
There are two functions for setting up. You should use this functions before app init.
```js
import { pubSubInitialization, transportsInitialization } from '@thecolvinco/nodejs-amqplib';
import config from 'path-to-your-config-file';
await worker({
channel,
exchange: exchangeData,
queue: queueData,
consumer: consumerData,
});
const {
eventDispatcher,
domainEventEmitter,
commandEmitter,
} = pubSubInitialization({ config });
console.log("Wating for messages....");
} catch(error) {
console.log(error)
}
transportsInitialization({ config });
```
---
### Create your events and commands with their respective handlers
This is an example for a Command
```js
export default class SendOrderNotificationCommand {
constructor({ message }) {
this.message = message;
}
#### Producer example
```javascript
import { amqpConnect, producer, createEvent } from '@thecolvinco/nodejs-amqplib';
getName() {
return 'blom.accounting.1.command.order.send_notification';
}
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672';
const args = process.argv.slice(2);
getPayload() {
return this.message.data.attributes;
}
try {
const connection = await amqpConnect(connectionString);
const channel = await conn.createChannel();
const exchange = {
name: 'blom.exchange.topic',
type: 'topic',
};
const messageBody = createEvent({
payload: args[0] || 'Hello world',
meta: {
company: 'blom',
context: 'superapp',
version: 1,
entity: 'shopify',
name: args[1] || 'order-created',
},
});
const key = messageBody.name;
const message = JSON.stringify(messageBody);
getCorrelationId() {
return this.message.data.messageId || null;
}
}
```
This is an example for a command handler
```js
export default class SendOrderNotificationCommandHandler {
constructor({ command }) {
this.command = command;
}
await producer({
channel,
message,
key,
exchange,
});
handle() {
console.log('Do something!');
}
}
```
---
### Create your consumers
```js
import { CommandConsumer } from '@thecolvinco/nodejs-amqplib';
import config from 'path-to-your-config';
console.log(`[x] Sent ${message} to ${exchange.name} with key ${key}`);
const commandConsumer = new CommandConsumer({ config });
connection.close();
} catch (error) {
console.log(error);
}
commandConsumer.consume({
transport: 'accounting_commands',
queueName: 'blom.accounting.commands',
}).then(() => {
console.info('Waiting for messages....');
}).catch(error => {
console.error(error.message);
process.exit();
});
```
### About

@@ -137,0 +142,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc