resolve-bus-rabbitmq
Advanced tools
Comparing version
116
es/index.js
@@ -1,109 +0,11 @@ | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
import amqp from 'amqplib'; | ||
const defaultOptions = { | ||
exchange: 'exchange', | ||
queueName: '', | ||
channelName: '', | ||
exchangeType: 'fanout', | ||
messageTtl: 2000, | ||
maxLength: 10000 | ||
}; | ||
class RabbitMQBusError extends Error { | ||
constructor(message, cause) { | ||
super(); | ||
this.name = 'RabbitMQ Bus Error'; | ||
this.message = message; | ||
if (cause) { | ||
this.cause = cause; | ||
} | ||
} | ||
} | ||
const init = async ({ | ||
url, | ||
exchange, | ||
exchangeType, | ||
queueName, | ||
messageTtl, | ||
maxLength | ||
}, handler) => { | ||
try { | ||
const connection = await amqp.connect(url); | ||
const channel = await connection.createChannel(); | ||
await channel.assertExchange(exchange, exchangeType, { | ||
durable: false | ||
}); | ||
const queue = await channel.assertQueue(queueName, { | ||
arguments: { | ||
messageTtl: messageTtl, | ||
maxLength: maxLength | ||
} | ||
}); | ||
await channel.bindQueue(queue.queue, exchange); | ||
await channel.consume(queueName, msg => { | ||
if (msg) { | ||
const content = msg.content.toString(); | ||
const message = JSON.parse(content); | ||
handler(message); | ||
} | ||
}, { | ||
noAck: true | ||
}); | ||
return channel; | ||
} catch (e) { | ||
throw new RabbitMQBusError(e.message, e.cause); | ||
} | ||
}; | ||
function createAdapter(options) { | ||
let handler = () => {}; | ||
const config = _objectSpread({}, defaultOptions, options); | ||
let initPromise = null; | ||
const { | ||
exchange, | ||
queueName, | ||
messageTtl | ||
} = config; | ||
return { | ||
init: async () => { | ||
if (!initPromise) { | ||
initPromise = init(config, event => handler(event)); | ||
} | ||
try { | ||
return await initPromise; | ||
} catch (e) { | ||
initPromise = null; | ||
throw e; | ||
} | ||
}, | ||
publish: async event => { | ||
if (!initPromise) { | ||
throw new RabbitMQBusError('Adapter is not initialized'); | ||
} | ||
const channel = await initPromise; | ||
return channel.publish(exchange, queueName, new Buffer(JSON.stringify(event)), // Additional options described here: | ||
// http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish | ||
{ | ||
expiration: messageTtl, | ||
persistent: false | ||
}); | ||
}, | ||
subscribe: async callback => handler = callback, | ||
close: async () => { | ||
initPromise = null; | ||
} | ||
}; | ||
} | ||
export default createAdapter; | ||
import createAdapter from './create-adapter'; | ||
import wrapInit from './wrap-init'; | ||
import wrapMethod from './wrap-method'; | ||
import onMessage from './on-message'; | ||
import init from './init'; | ||
import publish from './publish'; | ||
import subscribe from './subscribe'; | ||
import dispose from './dispose'; | ||
export default createAdapter.bind(null, wrapInit, wrapMethod, onMessage, init, publish, subscribe, dispose, amqp); | ||
//# sourceMappingURL=index.js.map |
109
lib/index.js
@@ -10,112 +10,23 @@ "use strict"; | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
var _createAdapter = _interopRequireDefault(require("./create-adapter")); | ||
function _objectSpread(target) { for (var i = 1; i < arguments.length; i++) { var source = arguments[i] != null ? arguments[i] : {}; var ownKeys = Object.keys(source); if (typeof Object.getOwnPropertySymbols === 'function') { ownKeys = ownKeys.concat(Object.getOwnPropertySymbols(source).filter(function (sym) { return Object.getOwnPropertyDescriptor(source, sym).enumerable; })); } ownKeys.forEach(function (key) { _defineProperty(target, key, source[key]); }); } return target; } | ||
var _wrapInit = _interopRequireDefault(require("./wrap-init")); | ||
function _defineProperty(obj, key, value) { if (key in obj) { Object.defineProperty(obj, key, { value: value, enumerable: true, configurable: true, writable: true }); } else { obj[key] = value; } return obj; } | ||
var _wrapMethod = _interopRequireDefault(require("./wrap-method")); | ||
const defaultOptions = { | ||
exchange: 'exchange', | ||
queueName: '', | ||
channelName: '', | ||
exchangeType: 'fanout', | ||
messageTtl: 2000, | ||
maxLength: 10000 | ||
}; | ||
var _onMessage = _interopRequireDefault(require("./on-message")); | ||
class RabbitMQBusError extends Error { | ||
constructor(message, cause) { | ||
super(); | ||
this.name = 'RabbitMQ Bus Error'; | ||
this.message = message; | ||
var _init = _interopRequireDefault(require("./init")); | ||
if (cause) { | ||
this.cause = cause; | ||
} | ||
} | ||
var _publish = _interopRequireDefault(require("./publish")); | ||
} | ||
var _subscribe = _interopRequireDefault(require("./subscribe")); | ||
const init = async ({ | ||
url, | ||
exchange, | ||
exchangeType, | ||
queueName, | ||
messageTtl, | ||
maxLength | ||
}, handler) => { | ||
try { | ||
const connection = await _amqplib.default.connect(url); | ||
const channel = await connection.createChannel(); | ||
await channel.assertExchange(exchange, exchangeType, { | ||
durable: false | ||
}); | ||
const queue = await channel.assertQueue(queueName, { | ||
arguments: { | ||
messageTtl: messageTtl, | ||
maxLength: maxLength | ||
} | ||
}); | ||
await channel.bindQueue(queue.queue, exchange); | ||
await channel.consume(queueName, msg => { | ||
if (msg) { | ||
const content = msg.content.toString(); | ||
const message = JSON.parse(content); | ||
handler(message); | ||
} | ||
}, { | ||
noAck: true | ||
}); | ||
return channel; | ||
} catch (e) { | ||
throw new RabbitMQBusError(e.message, e.cause); | ||
} | ||
}; | ||
var _dispose = _interopRequireDefault(require("./dispose")); | ||
function createAdapter(options) { | ||
let handler = () => {}; | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
const config = _objectSpread({}, defaultOptions, options); | ||
var _default = _createAdapter.default.bind(null, _wrapInit.default, _wrapMethod.default, _onMessage.default, _init.default, _publish.default, _subscribe.default, _dispose.default, _amqplib.default); | ||
let initPromise = null; | ||
const { | ||
exchange, | ||
queueName, | ||
messageTtl | ||
} = config; | ||
return { | ||
init: async () => { | ||
if (!initPromise) { | ||
initPromise = init(config, event => handler(event)); | ||
} | ||
try { | ||
return await initPromise; | ||
} catch (e) { | ||
initPromise = null; | ||
throw e; | ||
} | ||
}, | ||
publish: async event => { | ||
if (!initPromise) { | ||
throw new RabbitMQBusError('Adapter is not initialized'); | ||
} | ||
const channel = await initPromise; | ||
return channel.publish(exchange, queueName, new Buffer(JSON.stringify(event)), // Additional options described here: | ||
// http://www.squaremobius.net/amqp.node/channel_api.html#channel_publish | ||
{ | ||
expiration: messageTtl, | ||
persistent: false | ||
}); | ||
}, | ||
subscribe: async callback => handler = callback, | ||
close: async () => { | ||
initPromise = null; | ||
} | ||
}; | ||
} | ||
var _default = createAdapter; | ||
exports.default = _default; | ||
//# sourceMappingURL=index.js.map |
{ | ||
"name": "resolve-bus-rabbitmq", | ||
"version": "0.17.2", | ||
"version": "0.17.3", | ||
"description": "This package is an adapter for resolve-bus to emit events using RabbitMQ.", | ||
@@ -5,0 +5,0 @@ "engines": { |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
38966
94.92%47
571.43%383
90.55%1
Infinity%