@thecolvinco/nodejs-amqplib
Advanced tools
Comparing version 1.0.0 to 2.0.0
@@ -1,8 +0,6 @@ | ||
import { Connection } from 'amqplib'; | ||
import retryable from './utils/retryable'; | ||
import deadLetter from './utils/deadLetter'; | ||
import worker from './utils/worker'; | ||
import producer from './utils/producer'; | ||
import { createEvent, createCommand, toJSON } from './utils/message'; | ||
declare const amqpConnect: (connectionString: string) => Promise<Connection>; | ||
export { amqpConnect, retryable, deadLetter, worker, producer, createEvent, createCommand, toJSON, }; | ||
import { createEvent, createCommand, toJSON, producer, worker, deadLetter, retryable, amqpConnect } from './utils'; | ||
import { CommandConsumer, DomainEventConsumer } from './consumer'; | ||
import { pubSubInitialization, transportsInitialization } from './container'; | ||
import { CommandEmitter, DomainEventEmitter, EventDispatcher } from './emitter'; | ||
import { commandSubscriber, domainEventSubscriber } from './subscribers'; | ||
export { amqpConnect, retryable, deadLetter, worker, producer, createEvent, createCommand, toJSON, CommandConsumer, DomainEventConsumer, pubSubInitialization, transportsInitialization, CommandEmitter, DomainEventEmitter, EventDispatcher, commandSubscriber, domainEventSubscriber }; |
@@ -0,4 +1,152 @@ | ||
import { v4 } from 'uuid'; | ||
import { connect } from 'amqplib'; | ||
import { v4 } from 'uuid'; | ||
import { EventEmitter } from 'events'; | ||
import { resolve } from 'path'; | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
}) => { | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
return { | ||
data: { | ||
messageId: v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
}); | ||
}); | ||
} | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
} | ||
}(); | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
function _extends() { | ||
@@ -108,153 +256,713 @@ _extends = Object.assign || function (target) { | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
const amqpConnect = connectionString => { | ||
return connect(connectionString); | ||
}; | ||
function _catch(body, recover) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class EventDispatcher { | ||
constructor({ | ||
config | ||
}) { | ||
const _this = this, | ||
_this2 = this, | ||
_this3 = this; | ||
this.asyncDispatch = function ({ | ||
eventConfig, | ||
messageBody | ||
}) { | ||
try { | ||
const { | ||
transport | ||
} = eventConfig; | ||
const { | ||
connectionString, | ||
exchange | ||
} = _this.config.transports[transport]; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
type: key | ||
} = messageBody.data; | ||
const message = JSON.stringify(messageBody); | ||
return Promise.resolve(producer({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
})).then(function () { | ||
connection.close(); | ||
return true; | ||
}); | ||
}); | ||
}, function () { | ||
connection.close(); | ||
return false; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncCommandDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createCommand({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this2.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncEventdDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createEvent({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this3.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.config = config; | ||
} | ||
dispatch({ | ||
event | ||
}) { | ||
try { | ||
const _this4 = this; | ||
const eventClassname = event.constructor.name; | ||
const isCommand = /Command$/.test(eventClassname); | ||
const isEvent = /Event$/.test(eventClassname); | ||
const eventConfig = _this4.config.routing[eventClassname]; | ||
if (!eventConfig) { | ||
throw Error(`Missed event configuration for event ${eventClassname}`); | ||
} | ||
if (eventConfig.async === true || eventConfig.async === undefined) { | ||
if (isCommand) { | ||
_this4.asyncCommandDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
if (isEvent) { | ||
_this4.asyncEventdDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
} | ||
function _catch$1(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
return recover(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventEmitter extends EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$1(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
function _catch$2(body, recover) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class CommandEmitter extends EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$2(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
var domainEventSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventDispatcher, | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
commandsPath.forEach(function (commandPath) { | ||
try { | ||
return Promise.resolve(import(resolve(commandPath.trim()))).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
eventDispatcher.dispatch({ | ||
event: new Command({ | ||
message | ||
}) | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}(); | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}; | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
var commandSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
return Promise.resolve(import(resolve(handlerPath.trim()))).then(function ({ | ||
default: CommandHandler | ||
}) { | ||
return Promise.resolve(import(resolve(handlerPath.replace('Handler', '').trim()))).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
const handler = new CommandHandler({ | ||
command: new Command({ | ||
message | ||
}) | ||
}); | ||
handler.handle(); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
let commandEmitter = null; | ||
let domainEventEmitter = null; | ||
const getCommandEmitter = () => { | ||
return commandEmitter; | ||
}; | ||
const getDomainEventsEmitter = () => { | ||
return domainEventEmitter; | ||
}; | ||
const subscribersInitialization = function ({ | ||
subscribers, | ||
eventDispatcher, | ||
// eslint-disable-next-line no-shadow | ||
domainEventEmitter, | ||
// eslint-disable-next-line no-shadow | ||
commandEmitter | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
function _temp3() { | ||
const _temp = function () { | ||
if (subscribers.commands) { | ||
return Promise.resolve(Promise.all(subscribers.commands.map(function ({ | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
try { | ||
commandSubscriber.subscribe({ | ||
emitter: commandEmitter, | ||
eventName, | ||
handlerPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
if (_temp && _temp.then) return _temp.then(function () {}); | ||
} | ||
const _temp2 = function () { | ||
if (subscribers.domainEvents) { | ||
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({ | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
try { | ||
domainEventSubscriber.subscribe({ | ||
eventDispatcher, | ||
emitter: domainEventEmitter, | ||
eventName, | ||
commandsPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2)); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
}; | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
const pubSubInitialization = ({ | ||
config | ||
}) => { | ||
const eventDispatcher = new EventDispatcher({ | ||
config | ||
}); | ||
domainEventEmitter = new DomainEventEmitter(); | ||
commandEmitter = new CommandEmitter(); | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
subscribers | ||
} = config; | ||
subscribersInitialization({ | ||
subscribers, | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}); | ||
return { | ||
data: { | ||
messageId: v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
function _catch$3(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
return result; | ||
} | ||
const amqpConnect = connectionString => { | ||
return connect(connectionString); | ||
}; | ||
class CommandConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
export { amqpConnect, createCommand, createEvent, deadLetter, producer, retryable, toJSON, worker }; | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
const { | ||
retryPolicy = null | ||
} = queue; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$3(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const commandEmitter = emitter === null ? getCommandEmitter() : emitter; | ||
const onMessage = _this.workable({ | ||
channel, | ||
emitter: commandEmitter, | ||
retryPolicy, | ||
queueName | ||
}); | ||
channel.consume(queueName, onMessage, { | ||
noAck: false | ||
}); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter, | ||
retryPolicy, | ||
queueName | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
const onError = () => { | ||
if (!retryPolicy) channel.nack(msg, false, false); | ||
const { | ||
maxRetries, | ||
delay, | ||
retryExchangeName = 'blom.retries.exchange', | ||
onRejected = null | ||
} = retryPolicy; | ||
retryable({ | ||
channel, | ||
message: msg, | ||
queue: { | ||
name: queueName | ||
}, | ||
retryExchange: { | ||
name: retryExchangeName | ||
}, | ||
maxRetries, | ||
delay, | ||
onRejected | ||
}); | ||
}; | ||
const onSuccess = () => { | ||
channel.ack(msg); | ||
}; | ||
emitter.emit(eventName, { | ||
message, | ||
onSuccess, | ||
onError | ||
}); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
function _catch$4(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$4(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter; | ||
const consumerData = { | ||
onMessage: _this.workable({ | ||
channel, | ||
emitter: domainEventsEmitter | ||
}), | ||
options: { | ||
noAck: false | ||
} | ||
}; | ||
channel.consume(queueName, consumerData.onMessage, consumerData.options); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
emitter.emit(eventName, { | ||
message | ||
}); | ||
channel.ack(msg); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
var transportsInitialization = (function ({ | ||
config | ||
}) { | ||
try { | ||
const { | ||
transports | ||
} = config; | ||
Object.entries(transports).forEach(function ([, transport]) { | ||
try { | ||
const { | ||
connectionString, | ||
exchange, | ||
queues = [] | ||
} = transport; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType, | ||
options: exchangeOptions = {} | ||
} = exchange; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
return Promise.resolve(Promise.all(queues.map(function ({ | ||
name: queueName, | ||
bindingKey = null, | ||
options: queueOptions = {} | ||
}) { | ||
try { | ||
const pattern = bindingKey || ''; | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () { | ||
connection.close(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
export { CommandConsumer, CommandEmitter, DomainEventConsumer, DomainEventEmitter, EventDispatcher, amqpConnect, commandSubscriber, createCommand, createEvent, deadLetter, domainEventSubscriber, producer, pubSubInitialization, retryable, toJSON, transportsInitialization, worker }; | ||
//# sourceMappingURL=main.esm.js.map |
978
dist/main.js
@@ -0,4 +1,171 @@ | ||
function _interopNamespace(e) { | ||
if (e && e.__esModule) { return e; } else { | ||
var n = {}; | ||
if (e) { | ||
Object.keys(e).forEach(function (k) { | ||
var d = Object.getOwnPropertyDescriptor(e, k); | ||
Object.defineProperty(n, k, d.get ? d : { | ||
enumerable: true, | ||
get: function () { | ||
return e[k]; | ||
} | ||
}); | ||
}); | ||
} | ||
n['default'] = e; | ||
return n; | ||
} | ||
} | ||
var uuid = require('uuid'); | ||
var amqplib = require('amqplib'); | ||
var uuid = require('uuid'); | ||
var events = require('events'); | ||
var path = require('path'); | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
}) => { | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
return { | ||
data: { | ||
messageId: uuid.v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
}); | ||
}); | ||
} | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
} | ||
}(); | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
function _extends() { | ||
@@ -108,160 +275,729 @@ _extends = Object.assign || function (target) { | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
const amqpConnect = connectionString => { | ||
return amqplib.connect(connectionString); | ||
}; | ||
function _catch(body, recover) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class EventDispatcher { | ||
constructor({ | ||
config | ||
}) { | ||
const _this = this, | ||
_this2 = this, | ||
_this3 = this; | ||
this.asyncDispatch = function ({ | ||
eventConfig, | ||
messageBody | ||
}) { | ||
try { | ||
const { | ||
transport | ||
} = eventConfig; | ||
const { | ||
connectionString, | ||
exchange | ||
} = _this.config.transports[transport]; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
type: key | ||
} = messageBody.data; | ||
const message = JSON.stringify(messageBody); | ||
return Promise.resolve(producer({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
})).then(function () { | ||
connection.close(); | ||
return true; | ||
}); | ||
}); | ||
}, function () { | ||
connection.close(); | ||
return false; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncCommandDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createCommand({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this2.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncEventdDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createEvent({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this3.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.config = config; | ||
} | ||
dispatch({ | ||
event | ||
}) { | ||
try { | ||
const _this4 = this; | ||
const eventClassname = event.constructor.name; | ||
const isCommand = /Command$/.test(eventClassname); | ||
const isEvent = /Event$/.test(eventClassname); | ||
const eventConfig = _this4.config.routing[eventClassname]; | ||
if (!eventConfig) { | ||
throw Error(`Missed event configuration for event ${eventClassname}`); | ||
} | ||
if (eventConfig.async === true || eventConfig.async === undefined) { | ||
if (isCommand) { | ||
_this4.asyncCommandDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
if (isEvent) { | ||
_this4.asyncEventdDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
} | ||
function _catch$1(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
return recover(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventEmitter extends events.EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$1(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
function _catch$2(body, recover) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class CommandEmitter extends events.EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$2(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
var domainEventSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventDispatcher, | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
commandsPath.forEach(function (commandPath) { | ||
try { | ||
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(commandPath.trim())))); })).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
eventDispatcher.dispatch({ | ||
event: new Command({ | ||
message | ||
}) | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}(); | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}; | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
var commandSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(handlerPath.trim())))); })).then(function ({ | ||
default: CommandHandler | ||
}) { | ||
return Promise.resolve(new Promise(function (resolve) { resolve(_interopNamespace(require(path.resolve(handlerPath.replace('Handler', '').trim())))); })).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
const handler = new CommandHandler({ | ||
command: new Command({ | ||
message | ||
}) | ||
}); | ||
handler.handle(); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
let commandEmitter = null; | ||
let domainEventEmitter = null; | ||
const getCommandEmitter = () => { | ||
return commandEmitter; | ||
}; | ||
const getDomainEventsEmitter = () => { | ||
return domainEventEmitter; | ||
}; | ||
const subscribersInitialization = function ({ | ||
subscribers, | ||
eventDispatcher, | ||
// eslint-disable-next-line no-shadow | ||
domainEventEmitter, | ||
// eslint-disable-next-line no-shadow | ||
commandEmitter | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
function _temp3() { | ||
const _temp = function () { | ||
if (subscribers.commands) { | ||
return Promise.resolve(Promise.all(subscribers.commands.map(function ({ | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
try { | ||
commandSubscriber.subscribe({ | ||
emitter: commandEmitter, | ||
eventName, | ||
handlerPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
if (_temp && _temp.then) return _temp.then(function () {}); | ||
} | ||
const _temp2 = function () { | ||
if (subscribers.domainEvents) { | ||
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({ | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
try { | ||
domainEventSubscriber.subscribe({ | ||
eventDispatcher, | ||
emitter: domainEventEmitter, | ||
eventName, | ||
commandsPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2)); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
}; | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
const pubSubInitialization = ({ | ||
config | ||
}) => { | ||
const eventDispatcher = new EventDispatcher({ | ||
config | ||
}); | ||
domainEventEmitter = new DomainEventEmitter(); | ||
commandEmitter = new CommandEmitter(); | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
subscribers | ||
} = config; | ||
subscribersInitialization({ | ||
subscribers, | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}); | ||
return { | ||
data: { | ||
messageId: uuid.v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
function _catch$3(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
return result; | ||
} | ||
const amqpConnect = connectionString => { | ||
return amqplib.connect(connectionString); | ||
}; | ||
class CommandConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
const { | ||
retryPolicy = null | ||
} = queue; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$3(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const commandEmitter = emitter === null ? getCommandEmitter() : emitter; | ||
const onMessage = _this.workable({ | ||
channel, | ||
emitter: commandEmitter, | ||
retryPolicy, | ||
queueName | ||
}); | ||
channel.consume(queueName, onMessage, { | ||
noAck: false | ||
}); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter, | ||
retryPolicy, | ||
queueName | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
const onError = () => { | ||
if (!retryPolicy) channel.nack(msg, false, false); | ||
const { | ||
maxRetries, | ||
delay, | ||
retryExchangeName = 'blom.retries.exchange', | ||
onRejected = null | ||
} = retryPolicy; | ||
retryable({ | ||
channel, | ||
message: msg, | ||
queue: { | ||
name: queueName | ||
}, | ||
retryExchange: { | ||
name: retryExchangeName | ||
}, | ||
maxRetries, | ||
delay, | ||
onRejected | ||
}); | ||
}; | ||
const onSuccess = () => { | ||
channel.ack(msg); | ||
}; | ||
emitter.emit(eventName, { | ||
message, | ||
onSuccess, | ||
onError | ||
}); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
function _catch$4(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$4(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter; | ||
const consumerData = { | ||
onMessage: _this.workable({ | ||
channel, | ||
emitter: domainEventsEmitter | ||
}), | ||
options: { | ||
noAck: false | ||
} | ||
}; | ||
channel.consume(queueName, consumerData.onMessage, consumerData.options); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
emitter.emit(eventName, { | ||
message | ||
}); | ||
channel.ack(msg); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
var transportsInitialization = (function ({ | ||
config | ||
}) { | ||
try { | ||
const { | ||
transports | ||
} = config; | ||
Object.entries(transports).forEach(function ([, transport]) { | ||
try { | ||
const { | ||
connectionString, | ||
exchange, | ||
queues = [] | ||
} = transport; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType, | ||
options: exchangeOptions = {} | ||
} = exchange; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
return Promise.resolve(Promise.all(queues.map(function ({ | ||
name: queueName, | ||
bindingKey = null, | ||
options: queueOptions = {} | ||
}) { | ||
try { | ||
const pattern = bindingKey || ''; | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () { | ||
connection.close(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
exports.CommandConsumer = CommandConsumer; | ||
exports.CommandEmitter = CommandEmitter; | ||
exports.DomainEventConsumer = DomainEventConsumer; | ||
exports.DomainEventEmitter = DomainEventEmitter; | ||
exports.EventDispatcher = EventDispatcher; | ||
exports.amqpConnect = amqpConnect; | ||
exports.commandSubscriber = commandSubscriber; | ||
exports.createCommand = createCommand; | ||
exports.createEvent = createEvent; | ||
exports.deadLetter = deadLetter; | ||
exports.domainEventSubscriber = domainEventSubscriber; | ||
exports.producer = producer; | ||
exports.pubSubInitialization = pubSubInitialization; | ||
exports.retryable = retryable; | ||
exports.toJSON = toJSON; | ||
exports.transportsInitialization = transportsInitialization; | ||
exports.worker = worker; | ||
//# sourceMappingURL=main.js.map |
@@ -0,71 +1,67 @@ | ||
import { v4 } from 'uuid'; | ||
import { connect } from 'amqplib'; | ||
import { v4 } from 'uuid'; | ||
import { EventEmitter } from 'events'; | ||
import { resolve } from 'path'; | ||
const assertExchange = async ({ | ||
channel, | ||
retryExchange | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
}) => { | ||
const { | ||
'arguments': extraArgs = {}, | ||
...options | ||
} = retryExchange.options || {}; | ||
const assertExchangeOptions = { | ||
durable: false, | ||
'arguments': { | ||
'x-delayed-type': 'direct', | ||
...extraArgs | ||
}, | ||
...options | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
return { | ||
data: { | ||
messageId: v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
}; | ||
await channel.assertExchange(retryExchange.name, 'x-delayed-message', assertExchangeOptions); | ||
}; | ||
var retryable = (async ({ | ||
channel, | ||
message, | ||
queue, | ||
retryExchange, | ||
maxRetries: _maxRetries = 5, | ||
delay: _delay = 5000, | ||
onRejected | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const { | ||
'x-retries': xRetries | ||
} = message.properties.headers; | ||
const retries = xRetries ?? 0; | ||
const nextDelay = (retries + 1) * _delay; | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
if (retries < _maxRetries) { | ||
await assertExchange({ | ||
channel, | ||
retryExchange | ||
}); | ||
const retryRoutingKey = queue.bindingKey ?? message.fields.routingKey; | ||
await channel.bindQueue(queue.name, retryExchange.name, retryRoutingKey); | ||
channel.publish(retryExchange.name, retryRoutingKey, Buffer.from(message.content.toString()), { | ||
headers: { | ||
'x-retries': retries + 1, | ||
'x-delay': nextDelay | ||
} | ||
}); | ||
channel.ack(message); | ||
} else { | ||
if (typeof onRejected === 'function') onRejected(message); | ||
channel.nack(message, false, false); | ||
} | ||
}); | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
var deadLetter = (async ({ | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
var producer = (async ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
message, | ||
key, | ||
exchange | ||
}) => { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
await channel.assertExchange(dlxExchange.name, exchangeType); | ||
const { | ||
queue: queueAsserted | ||
} = await channel.assertQueue(dlxQueue.name); | ||
const bindingKey = targetQueue.options?.deadLetterRoutingKey ?? dlxQueue.bindingKey ?? ''; | ||
await channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey); | ||
exchange: exchangeAsserted | ||
} = await channel.assertExchange(exchange.name, exchange.type, exchange.options); | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
await channel.close(); | ||
}); | ||
@@ -116,70 +112,590 @@ | ||
var producer = (async ({ | ||
var deadLetter = (async ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) => { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
await channel.assertExchange(dlxExchange.name, exchangeType); | ||
const { | ||
exchange: exchangeAsserted | ||
} = await channel.assertExchange(exchange.name, exchange.type, exchange.options); | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
await channel.close(); | ||
queue: queueAsserted | ||
} = await channel.assertQueue(dlxQueue.name); | ||
const bindingKey = targetQueue.options?.deadLetterRoutingKey ?? dlxQueue.bindingKey ?? ''; | ||
await channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey); | ||
}); | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
const assertExchange = async ({ | ||
channel, | ||
retryExchange | ||
}) => { | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
return { | ||
data: { | ||
messageId: v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
'arguments': extraArgs = {}, | ||
...options | ||
} = retryExchange.options || {}; | ||
const assertExchangeOptions = { | ||
durable: false, | ||
'arguments': { | ||
'x-delayed-type': 'direct', | ||
...extraArgs | ||
}, | ||
...options | ||
}; | ||
await channel.assertExchange(retryExchange.name, 'x-delayed-message', assertExchangeOptions); | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
var retryable = (async ({ | ||
channel, | ||
message, | ||
queue, | ||
retryExchange, | ||
maxRetries: _maxRetries = 5, | ||
delay: _delay = 5000, | ||
onRejected | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
const { | ||
'x-retries': xRetries | ||
} = message.properties.headers; | ||
const retries = xRetries ?? 0; | ||
const nextDelay = (retries + 1) * _delay; | ||
if (retries < _maxRetries) { | ||
await assertExchange({ | ||
channel, | ||
retryExchange | ||
}); | ||
const retryRoutingKey = queue.bindingKey ?? message.fields.routingKey; | ||
await channel.bindQueue(queue.name, retryExchange.name, retryRoutingKey); | ||
channel.publish(retryExchange.name, retryRoutingKey, Buffer.from(message.content.toString()), { | ||
headers: { | ||
'x-retries': retries + 1, | ||
'x-delay': nextDelay | ||
} | ||
}); | ||
channel.ack(message); | ||
} else { | ||
if (typeof onRejected === 'function') onRejected(message); | ||
channel.nack(message, false, false); | ||
} | ||
}); | ||
const amqpConnect = connectionString => { | ||
return connect(connectionString); | ||
}; | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
class EventDispatcher { | ||
constructor({ | ||
config | ||
}) { | ||
var _this = this; | ||
this.asyncDispatch = async function ({ | ||
eventConfig, | ||
messageBody | ||
}) { | ||
const { | ||
transport | ||
} = eventConfig; | ||
const { | ||
connectionString, | ||
exchange | ||
} = _this.config.transports[transport]; | ||
const connection = await amqpConnect(connectionString); | ||
try { | ||
const channel = await connection.createChannel(); | ||
const { | ||
type: key | ||
} = messageBody.data; | ||
const message = JSON.stringify(messageBody); | ||
await producer({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
}); | ||
connection.close(); | ||
return true; | ||
} catch (error) { | ||
connection.close(); | ||
return false; | ||
} | ||
}; | ||
this.asyncCommandDispatch = async function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createCommand({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return _this.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
}); | ||
}; | ||
this.asyncEventdDispatch = async function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createEvent({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return _this.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
}); | ||
}; | ||
this.config = config; | ||
} | ||
async dispatch({ | ||
event | ||
}) { | ||
const eventClassname = event.constructor.name; | ||
const isCommand = /Command$/.test(eventClassname); | ||
const isEvent = /Event$/.test(eventClassname); | ||
const eventConfig = this.config.routing[eventClassname]; | ||
if (!eventConfig) { | ||
throw Error(`Missed event configuration for event ${eventClassname}`); | ||
} | ||
if (eventConfig.async === true || eventConfig.async === undefined) { | ||
if (isCommand) { | ||
this.asyncCommandDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
if (isEvent) { | ||
this.asyncEventdDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
} | ||
} | ||
} | ||
class DomainEventEmitter extends EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = async function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
await originalFunction(message); | ||
if (onSuccess) onSuccess(); | ||
} catch (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
} | ||
}; | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
class CommandEmitter extends EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = async function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
await originalFunction(message); | ||
if (onSuccess) onSuccess(); | ||
} catch (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
} | ||
}; | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
var domainEventSubscriber = { | ||
async subscribe({ | ||
emitter, | ||
eventDispatcher, | ||
eventName, | ||
commandsPath | ||
}) { | ||
commandsPath.forEach(async commandPath => { | ||
const { | ||
default: Command | ||
} = await import(resolve(commandPath.trim())); | ||
emitter.on(eventName, message => { | ||
eventDispatcher.dispatch({ | ||
event: new Command({ | ||
message | ||
}) | ||
}); | ||
}); | ||
}); | ||
} | ||
}; | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
var commandSubscriber = { | ||
async subscribe({ | ||
emitter, | ||
eventName, | ||
handlerPath | ||
}) { | ||
const { | ||
default: CommandHandler | ||
} = await import(resolve(handlerPath.trim())); | ||
const { | ||
default: Command | ||
} = await import(resolve(handlerPath.replace('Handler', '').trim())); | ||
emitter.on(eventName, message => { | ||
const handler = new CommandHandler({ | ||
command: new Command({ | ||
message | ||
}) | ||
}); | ||
handler.handle(); | ||
}); | ||
} | ||
}; | ||
const amqpConnect = connectionString => { | ||
return connect(connectionString); | ||
let commandEmitter = null; | ||
let domainEventEmitter = null; | ||
const getCommandEmitter = () => { | ||
return commandEmitter; | ||
}; | ||
const getDomainEventsEmitter = () => { | ||
return domainEventEmitter; | ||
}; | ||
export { amqpConnect, createCommand, createEvent, deadLetter, producer, retryable, toJSON, worker }; | ||
const subscribersInitialization = async ({ | ||
subscribers, | ||
eventDispatcher, | ||
// eslint-disable-next-line no-shadow | ||
domainEventEmitter, | ||
// eslint-disable-next-line no-shadow | ||
commandEmitter | ||
}) => { | ||
if (subscribers.domainEvents) { | ||
await Promise.all(subscribers.domainEvents.map(async ({ | ||
eventName, | ||
commandsPath | ||
}) => { | ||
try { | ||
domainEventSubscriber.subscribe({ | ||
eventDispatcher, | ||
emitter: domainEventEmitter, | ||
eventName, | ||
commandsPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
})); | ||
} | ||
if (subscribers.commands) { | ||
await Promise.all(subscribers.commands.map(async ({ | ||
eventName, | ||
handlerPath | ||
}) => { | ||
try { | ||
commandSubscriber.subscribe({ | ||
emitter: commandEmitter, | ||
eventName, | ||
handlerPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
})); | ||
} | ||
}; | ||
const pubSubInitialization = ({ | ||
config | ||
}) => { | ||
const eventDispatcher = new EventDispatcher({ | ||
config | ||
}); | ||
domainEventEmitter = new DomainEventEmitter(); | ||
commandEmitter = new CommandEmitter(); | ||
const { | ||
subscribers | ||
} = config; | ||
subscribersInitialization({ | ||
subscribers, | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}); | ||
return { | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}; | ||
}; | ||
class CommandConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
async consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
if (!this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
const { | ||
retryPolicy = null | ||
} = queue; | ||
const connection = await amqpConnect(connectionString); | ||
try { | ||
const channel = await connection.createChannel(); | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const commandEmitter = emitter === null ? getCommandEmitter() : emitter; | ||
const onMessage = this.workable({ | ||
channel, | ||
emitter: commandEmitter, | ||
retryPolicy, | ||
queueName | ||
}); | ||
channel.consume(queueName, onMessage, { | ||
noAck: false | ||
}); | ||
} catch (error) { | ||
connection.close(); | ||
throw error; | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter, | ||
retryPolicy, | ||
queueName | ||
}) { | ||
return async function (msg) { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
const onError = () => { | ||
if (!retryPolicy) channel.nack(msg, false, false); | ||
const { | ||
maxRetries, | ||
delay, | ||
retryExchangeName = 'blom.retries.exchange', | ||
onRejected = null | ||
} = retryPolicy; | ||
retryable({ | ||
channel, | ||
message: msg, | ||
queue: { | ||
name: queueName | ||
}, | ||
retryExchange: { | ||
name: retryExchangeName | ||
}, | ||
maxRetries, | ||
delay, | ||
onRejected | ||
}); | ||
}; | ||
const onSuccess = () => { | ||
channel.ack(msg); | ||
}; | ||
emitter.emit(eventName, { | ||
message, | ||
onSuccess, | ||
onError | ||
}); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
}; | ||
} | ||
} | ||
class DomainEventConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
async consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
if (!this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
const connection = await amqpConnect(connectionString); | ||
try { | ||
const channel = await connection.createChannel(); | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter; | ||
const consumerData = { | ||
onMessage: this.workable({ | ||
channel, | ||
emitter: domainEventsEmitter | ||
}), | ||
options: { | ||
noAck: false | ||
} | ||
}; | ||
channel.consume(queueName, consumerData.onMessage, consumerData.options); | ||
} catch (error) { | ||
connection.close(); | ||
throw error; | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter | ||
}) { | ||
return async function (msg) { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
emitter.emit(eventName, { | ||
message | ||
}); | ||
channel.ack(msg); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
}; | ||
} | ||
} | ||
var transportsInitialization = (async ({ | ||
config | ||
}) => { | ||
const { | ||
transports | ||
} = config; | ||
Object.entries(transports).forEach(async ([, transport]) => { | ||
const { | ||
connectionString, | ||
exchange, | ||
queues = [] | ||
} = transport; | ||
const connection = await amqpConnect(connectionString); | ||
const channel = await connection.createChannel(); | ||
const { | ||
name: exchangeName, | ||
type: exchangeType, | ||
options: exchangeOptions = {} | ||
} = exchange; | ||
await channel.assertExchange(exchangeName, exchangeType, exchangeOptions); | ||
await Promise.all(queues.map(async ({ | ||
name: queueName, | ||
bindingKey: _bindingKey = null, | ||
options: queueOptions = {} | ||
}) => { | ||
const pattern = _bindingKey || ''; | ||
const { | ||
queue: assertedQueue | ||
} = await channel.assertQueue(queueName, queueOptions); | ||
await channel.bindQueue(assertedQueue, exchangeName, pattern); | ||
})); | ||
connection.close(); | ||
}); | ||
}); | ||
export { CommandConsumer, CommandEmitter, DomainEventConsumer, DomainEventEmitter, EventDispatcher, amqpConnect, commandSubscriber, createCommand, createEvent, deadLetter, domainEventSubscriber, producer, pubSubInitialization, retryable, toJSON, transportsInitialization, worker }; | ||
//# sourceMappingURL=main.modern.js.map |
(function (global, factory) { | ||
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('amqplib'), require('uuid')) : | ||
typeof define === 'function' && define.amd ? define(['exports', 'amqplib', 'uuid'], factory) : | ||
(global = global || self, factory(global.nodejsAmqplib = {}, global.amqplib, global.uuid)); | ||
}(this, (function (exports, amqplib, uuid) { | ||
typeof exports === 'object' && typeof module !== 'undefined' ? factory(exports, require('uuid'), require('amqplib'), require('events'), require('path')) : | ||
typeof define === 'function' && define.amd ? define(['exports', 'uuid', 'amqplib', 'events', 'path'], factory) : | ||
(global = global || self, factory(global.nodejsAmqplib = {}, global.uuid, global.amqplib, global.events, global.path)); | ||
}(this, (function (exports, uuid, amqplib, events, path) { | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
}) => { | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
return { | ||
data: { | ||
messageId: uuid.v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
}); | ||
}); | ||
} | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
} | ||
}(); | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
function _extends() { | ||
@@ -110,159 +256,728 @@ _extends = Object.assign || function (target) { | ||
var deadLetter = (function ({ | ||
channel, | ||
targetQueue, | ||
dlxQueue, | ||
dlxExchange | ||
}) { | ||
const amqpConnect = connectionString => { | ||
return amqplib.connect(connectionString); | ||
}; | ||
function _catch(body, recover) { | ||
try { | ||
const exchangeType = dlxExchange.type || 'direct'; | ||
return Promise.resolve(channel.assertExchange(dlxExchange.name, exchangeType)).then(function () { | ||
return Promise.resolve(channel.assertQueue(dlxQueue.name)).then(function ({ | ||
queue: queueAsserted | ||
}) { | ||
var _ref, _targetQueue$options$, _targetQueue$options; | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const bindingKey = (_ref = (_targetQueue$options$ = (_targetQueue$options = targetQueue.options) == null ? void 0 : _targetQueue$options.deadLetterRoutingKey) != null ? _targetQueue$options$ : dlxQueue.bindingKey) != null ? _ref : ''; | ||
return Promise.resolve(channel.bindQueue(queueAsserted, dlxExchange.name, bindingKey)).then(function () {}); | ||
}); | ||
}); | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class EventDispatcher { | ||
constructor({ | ||
config | ||
}) { | ||
const _this = this, | ||
_this2 = this, | ||
_this3 = this; | ||
this.asyncDispatch = function ({ | ||
eventConfig, | ||
messageBody | ||
}) { | ||
try { | ||
const { | ||
transport | ||
} = eventConfig; | ||
const { | ||
connectionString, | ||
exchange | ||
} = _this.config.transports[transport]; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
type: key | ||
} = messageBody.data; | ||
const message = JSON.stringify(messageBody); | ||
return Promise.resolve(producer({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
})).then(function () { | ||
connection.close(); | ||
return true; | ||
}); | ||
}); | ||
}, function () { | ||
connection.close(); | ||
return false; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncCommandDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createCommand({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this2.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.asyncEventdDispatch = function ({ | ||
eventConfig, | ||
event | ||
}) { | ||
try { | ||
const [company, context, version,, entity, name] = event.getName().split('.'); | ||
const messageBody = createEvent({ | ||
payload: event.getPayload(), | ||
meta: { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} | ||
}); | ||
return Promise.resolve(_this3.asyncDispatch({ | ||
eventConfig, | ||
messageBody | ||
})); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
this.config = config; | ||
} | ||
dispatch({ | ||
event | ||
}) { | ||
try { | ||
const _this4 = this; | ||
const eventClassname = event.constructor.name; | ||
const isCommand = /Command$/.test(eventClassname); | ||
const isEvent = /Event$/.test(eventClassname); | ||
const eventConfig = _this4.config.routing[eventClassname]; | ||
if (!eventConfig) { | ||
throw Error(`Missed event configuration for event ${eventClassname}`); | ||
} | ||
if (eventConfig.async === true || eventConfig.async === undefined) { | ||
if (isCommand) { | ||
_this4.asyncCommandDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
if (isEvent) { | ||
_this4.asyncEventdDispatch({ | ||
eventConfig, | ||
event | ||
}); | ||
} | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
} | ||
function _catch$1(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
return recover(e); | ||
} | ||
}); | ||
var worker = (function ({ | ||
channel, | ||
exchange, | ||
queue, | ||
consumer | ||
}) { | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventEmitter extends events.EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$1(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
function _catch$2(body, recover) { | ||
try { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType = 'topic', | ||
options: exchangeOptions | ||
} = exchange; | ||
const { | ||
name: queueName, | ||
bindingKey, | ||
options: queueOptions, | ||
dlx = null | ||
} = queue; | ||
const { | ||
onMessage, | ||
options: consumerOptions | ||
} = consumer; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
function _temp2() { | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, bindingKey)).then(function () { | ||
return channel.consume(assertedQueue, onMessage, consumerOptions); | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class CommandEmitter extends events.EventEmitter { | ||
on(eventName, originalFunction) { | ||
const decoratedFunction = function ({ | ||
message, | ||
onSuccess, | ||
onError | ||
}) { | ||
try { | ||
const _temp = _catch$2(function () { | ||
return Promise.resolve(originalFunction(message)).then(function () { | ||
if (onSuccess) onSuccess(); | ||
}); | ||
}, function (error) { | ||
if (onError) onError({ | ||
error | ||
}); | ||
}); | ||
return Promise.resolve(_temp && _temp.then ? _temp.then(function () {}) : void 0); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
const _temp = function () { | ||
if (dlx !== null) { | ||
const { | ||
dlxQueue, | ||
dlxExchange | ||
} = dlx.params; | ||
return Promise.resolve(dlx.func({ | ||
channel, | ||
targetQueue: queue, | ||
dlxQueue, | ||
dlxExchange | ||
})).then(function () {}); | ||
super.addListener(eventName, decoratedFunction); | ||
return this; | ||
} | ||
} | ||
var domainEventSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventDispatcher, | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
commandsPath.forEach(function (commandPath) { | ||
try { | ||
return Promise.resolve(import(path.resolve(commandPath.trim()))).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
eventDispatcher.dispatch({ | ||
event: new Command({ | ||
message | ||
}) | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}(); | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}; | ||
return _temp && _temp.then ? _temp.then(_temp2) : _temp2(_temp); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
var commandSubscriber = { | ||
subscribe: function ({ | ||
emitter, | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
return Promise.resolve(import(path.resolve(handlerPath.trim()))).then(function ({ | ||
default: CommandHandler | ||
}) { | ||
return Promise.resolve(import(path.resolve(handlerPath.replace('Handler', '').trim()))).then(function ({ | ||
default: Command | ||
}) { | ||
emitter.on(eventName, message => { | ||
const handler = new CommandHandler({ | ||
command: new Command({ | ||
message | ||
}) | ||
}); | ||
handler.handle(); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
}); | ||
}; | ||
var producer = (function ({ | ||
channel, | ||
message, | ||
key, | ||
exchange | ||
let commandEmitter = null; | ||
let domainEventEmitter = null; | ||
const getCommandEmitter = () => { | ||
return commandEmitter; | ||
}; | ||
const getDomainEventsEmitter = () => { | ||
return domainEventEmitter; | ||
}; | ||
const subscribersInitialization = function ({ | ||
subscribers, | ||
eventDispatcher, | ||
// eslint-disable-next-line no-shadow | ||
domainEventEmitter, | ||
// eslint-disable-next-line no-shadow | ||
commandEmitter | ||
}) { | ||
try { | ||
return Promise.resolve(channel.assertExchange(exchange.name, exchange.type, exchange.options)).then(function ({ | ||
exchange: exchangeAsserted | ||
}) { | ||
channel.publish(exchangeAsserted, key, Buffer.from(message)); | ||
return Promise.resolve(channel.close()).then(function () {}); | ||
}); | ||
function _temp3() { | ||
const _temp = function () { | ||
if (subscribers.commands) { | ||
return Promise.resolve(Promise.all(subscribers.commands.map(function ({ | ||
eventName, | ||
handlerPath | ||
}) { | ||
try { | ||
try { | ||
commandSubscriber.subscribe({ | ||
emitter: commandEmitter, | ||
eventName, | ||
handlerPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
if (_temp && _temp.then) return _temp.then(function () {}); | ||
} | ||
const _temp2 = function () { | ||
if (subscribers.domainEvents) { | ||
return Promise.resolve(Promise.all(subscribers.domainEvents.map(function ({ | ||
eventName, | ||
commandsPath | ||
}) { | ||
try { | ||
try { | ||
domainEventSubscriber.subscribe({ | ||
eventDispatcher, | ||
emitter: domainEventEmitter, | ||
eventName, | ||
commandsPath | ||
}); | ||
} catch (error) { | ||
throw new Error(`Subscriber with event '${eventName}' not found. Error ${error.message}`); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () {}); | ||
} | ||
}(); | ||
return Promise.resolve(_temp2 && _temp2.then ? _temp2.then(_temp3) : _temp3(_temp2)); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
}; | ||
const parse = ({ | ||
payload, | ||
meta, | ||
type | ||
const pubSubInitialization = ({ | ||
config | ||
}) => { | ||
const eventDispatcher = new EventDispatcher({ | ||
config | ||
}); | ||
domainEventEmitter = new DomainEventEmitter(); | ||
commandEmitter = new CommandEmitter(); | ||
const { | ||
company, | ||
context, | ||
version, | ||
entity, | ||
name | ||
} = meta; | ||
subscribers | ||
} = config; | ||
subscribersInitialization({ | ||
subscribers, | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}); | ||
return { | ||
data: { | ||
messageId: uuid.v4(), | ||
occurredOn: Date.now(), | ||
attributes: payload, | ||
type: `${company}.${context}.${version}.${type}.${entity}.${name}` | ||
} | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter | ||
}; | ||
}; | ||
const createEvent = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'domain_event' | ||
}); | ||
return message; | ||
}; | ||
function _catch$3(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
const createCommand = ({ | ||
payload, | ||
meta | ||
}) => { | ||
const message = parse({ | ||
payload, | ||
meta, | ||
type: 'command' | ||
}); | ||
return message; | ||
}; | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
const toJSON = message => { | ||
return JSON.parse(message.content.toString()); | ||
}; | ||
return result; | ||
} | ||
const amqpConnect = connectionString => { | ||
return amqplib.connect(connectionString); | ||
}; | ||
class CommandConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
const { | ||
retryPolicy = null | ||
} = queue; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$3(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const commandEmitter = emitter === null ? getCommandEmitter() : emitter; | ||
const onMessage = _this.workable({ | ||
channel, | ||
emitter: commandEmitter, | ||
retryPolicy, | ||
queueName | ||
}); | ||
channel.consume(queueName, onMessage, { | ||
noAck: false | ||
}); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter, | ||
retryPolicy, | ||
queueName | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
const onError = () => { | ||
if (!retryPolicy) channel.nack(msg, false, false); | ||
const { | ||
maxRetries, | ||
delay, | ||
retryExchangeName = 'blom.retries.exchange', | ||
onRejected = null | ||
} = retryPolicy; | ||
retryable({ | ||
channel, | ||
message: msg, | ||
queue: { | ||
name: queueName | ||
}, | ||
retryExchange: { | ||
name: retryExchangeName | ||
}, | ||
maxRetries, | ||
delay, | ||
onRejected | ||
}); | ||
}; | ||
const onSuccess = () => { | ||
channel.ack(msg); | ||
}; | ||
emitter.emit(eventName, { | ||
message, | ||
onSuccess, | ||
onError | ||
}); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
function _catch$4(body, recover) { | ||
try { | ||
var result = body(); | ||
} catch (e) { | ||
return recover(e); | ||
} | ||
if (result && result.then) { | ||
return result.then(void 0, recover); | ||
} | ||
return result; | ||
} | ||
class DomainEventConsumer { | ||
constructor({ | ||
config | ||
}) { | ||
this.transports = config.transports; | ||
} | ||
consume({ | ||
transport, | ||
queueName, | ||
prefetchValue, | ||
emitter = null | ||
}) { | ||
try { | ||
const _this = this; | ||
if (!_this.transports[transport]) { | ||
throw Error(`Transport ${transport} is not defined`); | ||
} | ||
const { | ||
connectionString, | ||
queues | ||
} = _this.transports[transport]; | ||
const queue = queues.find(({ | ||
name | ||
}) => name === queueName); | ||
if (!queue) { | ||
throw Error(`Queue ${queue} is not defined`); | ||
} | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return _catch$4(function () { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
if (prefetchValue) { | ||
channel.prefetch(prefetchValue); | ||
} | ||
const domainEventsEmitter = emitter === null ? getDomainEventsEmitter() : emitter; | ||
const consumerData = { | ||
onMessage: _this.workable({ | ||
channel, | ||
emitter: domainEventsEmitter | ||
}), | ||
options: { | ||
noAck: false | ||
} | ||
}; | ||
channel.consume(queueName, consumerData.onMessage, consumerData.options); | ||
}); | ||
}, function (error) { | ||
connection.close(); | ||
throw error; | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
} | ||
workable({ | ||
channel, | ||
emitter | ||
}) { | ||
return function (msg) { | ||
try { | ||
const message = toJSON(msg); | ||
const { | ||
data | ||
} = message; | ||
const { | ||
type: eventName | ||
} = data; | ||
try { | ||
emitter.emit(eventName, { | ||
message | ||
}); | ||
channel.ack(msg); | ||
} catch (error) { | ||
channel.nack(msg, false, false); | ||
} | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}; | ||
} | ||
} | ||
var transportsInitialization = (function ({ | ||
config | ||
}) { | ||
try { | ||
const { | ||
transports | ||
} = config; | ||
Object.entries(transports).forEach(function ([, transport]) { | ||
try { | ||
const { | ||
connectionString, | ||
exchange, | ||
queues = [] | ||
} = transport; | ||
return Promise.resolve(amqpConnect(connectionString)).then(function (connection) { | ||
return Promise.resolve(connection.createChannel()).then(function (channel) { | ||
const { | ||
name: exchangeName, | ||
type: exchangeType, | ||
options: exchangeOptions = {} | ||
} = exchange; | ||
return Promise.resolve(channel.assertExchange(exchangeName, exchangeType, exchangeOptions)).then(function () { | ||
return Promise.resolve(Promise.all(queues.map(function ({ | ||
name: queueName, | ||
bindingKey = null, | ||
options: queueOptions = {} | ||
}) { | ||
try { | ||
const pattern = bindingKey || ''; | ||
return Promise.resolve(channel.assertQueue(queueName, queueOptions)).then(function ({ | ||
queue: assertedQueue | ||
}) { | ||
return Promise.resolve(channel.bindQueue(assertedQueue, exchangeName, pattern)).then(function () {}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}))).then(function () { | ||
connection.close(); | ||
}); | ||
}); | ||
}); | ||
}); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
return Promise.resolve(); | ||
} catch (e) { | ||
return Promise.reject(e); | ||
} | ||
}); | ||
exports.CommandConsumer = CommandConsumer; | ||
exports.CommandEmitter = CommandEmitter; | ||
exports.DomainEventConsumer = DomainEventConsumer; | ||
exports.DomainEventEmitter = DomainEventEmitter; | ||
exports.EventDispatcher = EventDispatcher; | ||
exports.amqpConnect = amqpConnect; | ||
exports.commandSubscriber = commandSubscriber; | ||
exports.createCommand = createCommand; | ||
exports.createEvent = createEvent; | ||
exports.deadLetter = deadLetter; | ||
exports.domainEventSubscriber = domainEventSubscriber; | ||
exports.producer = producer; | ||
exports.pubSubInitialization = pubSubInitialization; | ||
exports.retryable = retryable; | ||
exports.toJSON = toJSON; | ||
exports.transportsInitialization = transportsInitialization; | ||
exports.worker = worker; | ||
@@ -269,0 +984,0 @@ |
{ | ||
"name": "@thecolvinco/nodejs-amqplib", | ||
"description": "RabbitMQ abstraction with some utils", | ||
"version": "1.0.0", | ||
"version": "2.0.0", | ||
"source": "src/main.ts", | ||
@@ -86,2 +86,3 @@ "main": "dist/main.umd.js", | ||
"amqplib": "^0.7.1", | ||
"events": "^3.3.0", | ||
"uuid": "^8.3.2" | ||
@@ -88,0 +89,0 @@ }, |
235
README.md
@@ -1,136 +0,141 @@ | ||
# Colvin nodejs rabbitmq utils | ||
# Colvin nodejs async events | ||
A RabbitMQ package with some utils like retryables to use across diferents projects. | ||
A RabbitMQ package with some utils like retryables | ||
### Usage | ||
#### Consumer/worker example | ||
```javascript | ||
import { amqpConnect, retryable, worker, deadLetter, toJSON } from '@thecolvinco/nodejs-amqplib'; | ||
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672'; | ||
const args = process.argv.slice(2); | ||
const queueName = args[0] || 'send-invoice-on-shopify-order-created'; | ||
const bindingKey = args[1] || 'blom.superapp.1.event.shopify.order-created'; | ||
const dlxExchange = args[2] || 'blom.superapp.dlx.direct'; | ||
const dlxRoutingKey = args[3] || 'blom.superapp.dlx'; | ||
const dlxQueueName = args[4] || 'blom-superapp-dlx-queue'; | ||
This library give you some classes, functions, types and interfaces ready to use. The idea behind this project has been inspired from messenger Symfony component so we have to define a config file and a couple of files. Let's see how to start it with. | ||
* Create a config file with a required structure | ||
* Setup your transports and bind the events | ||
* Create your events and commands with their respective handlers | ||
* Create your consumers | ||
const workable = (channel) => { | ||
return async (msg) => { | ||
const payload = toJSON(msg).payload; | ||
// Simulate some retryables | ||
if (payload === 'retry') { | ||
await retryable({ | ||
channel, | ||
message: msg, | ||
queue: { | ||
name: queueName, | ||
### Create a config file with a required structure | ||
You have to create a file with this structure. Later you must to import this file so you can put it in any place. Let's see an example explained | ||
```js | ||
export default { | ||
transports: { | ||
accounting_commands: { // A transport unique name as identifier | ||
connectionString: process.env.RABBITMQ_CONNECTION_URL, // A dns connection string like --> amqp://rabbitmq:rabbitmq@rabbitmq-blom:5672 | ||
exchange: { // the exchange for this transport where all message will be send it | ||
name: 'blom.accounting.commands.exchange', | ||
type: 'fanout', | ||
}, | ||
queues: [ | ||
{ | ||
name: 'blom.accounting.commands', // A queue name | ||
bindingKey: null, // A queue binding key. You can use null for fanout exchanges or wildcard string for topic ones | ||
options: { // All allowed queue options provided for the ampq node lib | ||
durable: true, | ||
deadLetterExchange: 'blom.accounting.exchange.dlx', | ||
deadLetterRoutingKey: 'blom.accounting.commands.dlx', | ||
}, | ||
retryPolicy: { // The retries policiy for this trasnport. The retries will be try in a exponential way (firstly in 2s, secondly in 4s, lastly in 6s) | ||
maxRetries: 3, | ||
delay: 2000, | ||
}, | ||
}, | ||
retryExchange: { | ||
name: 'retries.exchange', | ||
}, | ||
maxRetries: 3, | ||
delay: 1000, | ||
}); | ||
} else { | ||
channel.ack(msg); | ||
}; | ||
}; | ||
}; | ||
try { | ||
const connection = await amqpConnect(connectionString); | ||
const channel = await conn.createChannel(); | ||
const exchangeData = { | ||
name: 'blom.exchange.topic', | ||
type: 'topic', | ||
}; | ||
const queueData = { | ||
name: queueName, | ||
bindingKey, | ||
options: { | ||
durable: true, | ||
deadLetterExchange: dlxExchange, | ||
deadLetterRoutingKey: dlxRoutingKey | ||
], | ||
}, | ||
dlx: { | ||
func: deadLetter, | ||
params: { | ||
channel, | ||
dlxQueue: { | ||
name: dlxQueueName, | ||
}, | ||
dlxExchange: { | ||
name: dlxExchange, | ||
type: 'direct', | ||
}, | ||
// <----- others trasnports goes here | ||
}, | ||
subscribers: { | ||
domainEvents: [ // A mandatory key. All your domains events config goes under it | ||
{ | ||
eventName: 'blom.superapp.1.domain_event.order.updated', // The event which the built-in domain events subscribers will subscribe it | ||
commandsPath: [ // An array of commands paths witch will be created and dispached. These events (in this case commands) must implement the EventInterface | ||
'server/events/orders/command/SyncronizeOrderCommand.js', | ||
'server/events/orders/command/SendOrderNotificationCommand.js', | ||
], | ||
}, | ||
// <--- Other events goes here | ||
], | ||
commands: [ | ||
{ | ||
eventName: 'blom.accounting.1.command.order.syncronize_order', // The event which the built-in command subscribers will subscribe it | ||
handlerPath: 'server/events/orders/command/SyncronizeOrderCommandHandler.js', // The handler which will be invoke passing their Command as argument | ||
}, | ||
// <--- Other commands goes here | ||
], | ||
}, | ||
routing: { // A map of events wich will can dispatch through a specific transports | ||
SyncronizeOrderCommand: { // The key is the name of the class ([instance].constructor.name) | ||
transport: 'accounting_commands', | ||
}, | ||
}; | ||
// <--- Other events goes here | ||
}, | ||
}; | ||
const consumerData = { | ||
onMessage: workable(channel), | ||
options: { | ||
noAck: false | ||
}, | ||
}; | ||
``` | ||
--- | ||
### Setup your transports and bind the events | ||
There are two functions for setting up. You should use this functions before app init. | ||
```js | ||
import { pubSubInitialization, transportsInitialization } from '@thecolvinco/nodejs-amqplib'; | ||
import config from 'path-to-your-config-file'; | ||
await worker({ | ||
channel, | ||
exchange: exchangeData, | ||
queue: queueData, | ||
consumer: consumerData, | ||
}); | ||
const { | ||
eventDispatcher, | ||
domainEventEmitter, | ||
commandEmitter, | ||
} = pubSubInitialization({ config }); | ||
console.log("Wating for messages...."); | ||
} catch(error) { | ||
console.log(error) | ||
} | ||
transportsInitialization({ config }); | ||
``` | ||
--- | ||
### Create your events and commands with their respective handlers | ||
This is an example for a Command | ||
```js | ||
export default class SendOrderNotificationCommand { | ||
constructor({ message }) { | ||
this.message = message; | ||
} | ||
#### Producer example | ||
```javascript | ||
import { amqpConnect, producer, createEvent } from '@thecolvinco/nodejs-amqplib'; | ||
getName() { | ||
return 'blom.accounting.1.command.order.send_notification'; | ||
} | ||
const connectionString = 'amqp://rabbitmq:rabbitmq@localhost:5672'; | ||
const args = process.argv.slice(2); | ||
getPayload() { | ||
return this.message.data.attributes; | ||
} | ||
try { | ||
const connection = await amqpConnect(connectionString); | ||
const channel = await conn.createChannel(); | ||
const exchange = { | ||
name: 'blom.exchange.topic', | ||
type: 'topic', | ||
}; | ||
const messageBody = createEvent({ | ||
payload: args[0] || 'Hello world', | ||
meta: { | ||
company: 'blom', | ||
context: 'superapp', | ||
version: 1, | ||
entity: 'shopify', | ||
name: args[1] || 'order-created', | ||
}, | ||
}); | ||
const key = messageBody.name; | ||
const message = JSON.stringify(messageBody); | ||
getCorrelationId() { | ||
return this.message.data.messageId || null; | ||
} | ||
} | ||
``` | ||
This is an example for a command handler | ||
```js | ||
export default class SendOrderNotificationCommandHandler { | ||
constructor({ command }) { | ||
this.command = command; | ||
} | ||
await producer({ | ||
channel, | ||
message, | ||
key, | ||
exchange, | ||
}); | ||
handle() { | ||
console.log('Do something!'); | ||
} | ||
} | ||
``` | ||
--- | ||
### Create your consumers | ||
```js | ||
import { CommandConsumer } from '@thecolvinco/nodejs-amqplib'; | ||
import config from 'path-to-your-config'; | ||
console.log(`[x] Sent ${message} to ${exchange.name} with key ${key}`); | ||
const commandConsumer = new CommandConsumer({ config }); | ||
connection.close(); | ||
} catch (error) { | ||
console.log(error); | ||
} | ||
commandConsumer.consume({ | ||
transport: 'accounting_commands', | ||
queueName: 'blom.accounting.commands', | ||
}).then(() => { | ||
console.info('Waiting for messages....'); | ||
}).catch(error => { | ||
console.error(error.message); | ||
process.exit(); | ||
}); | ||
``` | ||
### About | ||
@@ -137,0 +142,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Dynamic require
Supply chain riskDynamic require can indicate the package is performing dangerous or unsafe dynamic code execution.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
251154
32
3469
149
5
3
1
+ Addedevents@^3.3.0
+ Addedevents@3.3.0(transitive)