resolve-bus-rabbitmq
Advanced tools
Comparing version 0.17.4 to 0.18.0
import amqp from 'amqplib'; | ||
import createAdapter from './create-adapter'; | ||
import wrapInit from './wrap-init'; | ||
import wrapMethod from './wrap-method'; | ||
import createAdapter from 'resolve-bus-base'; | ||
import onMessage from './on-message'; | ||
import init from './init'; | ||
import publish from './publish'; | ||
import subscribe from './subscribe'; | ||
import dispose from './dispose'; | ||
export default createAdapter.bind(null, wrapInit, wrapMethod, onMessage, init, publish, subscribe, dispose, amqp); | ||
export default createAdapter.bind(null, onMessage, init, publish, dispose, amqp); | ||
//# sourceMappingURL=index.js.map |
@@ -0,4 +1,11 @@ | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
import RabbitMQBusError from './rabbitmq-error'; | ||
import defaultOptions from './default-options'; | ||
const init = async (amqp, pool, onMessage) => { | ||
pool.config = _objectSpread({}, defaultOptions, pool.config); | ||
try { | ||
@@ -5,0 +12,0 @@ const { |
const onMessage = async (pool, message) => { | ||
if (!message || !message.content || typeof message.content.toString !== 'function') { | ||
return; | ||
} | ||
try { | ||
if (!message || !message.content || typeof message.content.toString !== 'function') { | ||
return; | ||
} | ||
const content = message.content.toString(); | ||
const event = JSON.parse(content); | ||
await Promise.resolve(); | ||
const content = message.content.toString(); | ||
const event = JSON.parse(content); | ||
const topics = pool.makeTopicsForEvent(event); | ||
const applicationPromises = []; | ||
try { | ||
await Promise.all(Array.from(pool.handlers).map(handler => handler(event))); | ||
for (const topic of topics) { | ||
const topicHandlers = pool.handlers.get(topic); | ||
if (topicHandlers == null) return; | ||
for (const handler of Array.from(topicHandlers)) { | ||
applicationPromises.push(handler(event)); | ||
} | ||
} | ||
await Promise.all(applicationPromises); | ||
} catch (error) { | ||
@@ -13,0 +23,0 @@ // eslint-disable-next-line no-console |
@@ -10,8 +10,4 @@ "use strict"; | ||
var _createAdapter = _interopRequireDefault(require("./create-adapter")); | ||
var _resolveBusBase = _interopRequireDefault(require("resolve-bus-base")); | ||
var _wrapInit = _interopRequireDefault(require("./wrap-init")); | ||
var _wrapMethod = _interopRequireDefault(require("./wrap-method")); | ||
var _onMessage = _interopRequireDefault(require("./on-message")); | ||
@@ -23,4 +19,2 @@ | ||
var _subscribe = _interopRequireDefault(require("./subscribe")); | ||
var _dispose = _interopRequireDefault(require("./dispose")); | ||
@@ -30,5 +24,5 @@ | ||
var _default = _createAdapter.default.bind(null, _wrapInit.default, _wrapMethod.default, _onMessage.default, _init.default, _publish.default, _subscribe.default, _dispose.default, _amqplib.default); | ||
var _default = _resolveBusBase.default.bind(null, _onMessage.default, _init.default, _publish.default, _dispose.default, _amqplib.default); | ||
exports.default = _default; | ||
//# sourceMappingURL=index.js.map |
@@ -10,5 +10,13 @@ "use strict"; | ||
var _defaultOptions = _interopRequireDefault(require("./default-options")); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
const init = async (amqp, pool, onMessage) => { | ||
pool.config = _objectSpread({}, _defaultOptions.default, pool.config); | ||
try { | ||
@@ -15,0 +23,0 @@ const { |
@@ -9,12 +9,22 @@ "use strict"; | ||
const onMessage = async (pool, message) => { | ||
if (!message || !message.content || typeof message.content.toString !== 'function') { | ||
return; | ||
} | ||
try { | ||
if (!message || !message.content || typeof message.content.toString !== 'function') { | ||
return; | ||
} | ||
const content = message.content.toString(); | ||
const event = JSON.parse(content); | ||
await Promise.resolve(); | ||
const content = message.content.toString(); | ||
const event = JSON.parse(content); | ||
const topics = pool.makeTopicsForEvent(event); | ||
const applicationPromises = []; | ||
try { | ||
await Promise.all(Array.from(pool.handlers).map(handler => handler(event))); | ||
for (const topic of topics) { | ||
const topicHandlers = pool.handlers.get(topic); | ||
if (topicHandlers == null) return; | ||
for (const handler of Array.from(topicHandlers)) { | ||
applicationPromises.push(handler(event)); | ||
} | ||
} | ||
await Promise.all(applicationPromises); | ||
} catch (error) { | ||
@@ -21,0 +31,0 @@ // eslint-disable-next-line no-console |
{ | ||
"name": "resolve-bus-rabbitmq", | ||
"version": "0.17.4", | ||
"version": "0.18.0", | ||
"description": "This package is an adapter for resolve-bus to emit events using RabbitMQ.", | ||
@@ -28,3 +28,5 @@ "engines": { | ||
"dependencies": { | ||
"amqplib": "^0.5.2" | ||
"@babel/runtime": "^7.0.0", | ||
"amqplib": "^0.5.2", | ||
"resolve-bus-base": "0.18.0" | ||
}, | ||
@@ -31,0 +33,0 @@ "devDependencies": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
29308
3
31
282
+ Added@babel/runtime@^7.0.0
+ Addedresolve-bus-base@0.18.0
+ Added@babel/runtime@7.26.7(transitive)
+ Addedregenerator-runtime@0.14.1(transitive)
+ Addedresolve-bus-base@0.18.0(transitive)