Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp-message-bus

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-message-bus - npm Package Compare versions

Comparing version 1.1.1 to 2.0.0

5

CHANGELOG.md

@@ -0,1 +1,6 @@

## 2.0.0 - 2017-11-20
* Complete rewrite to separate queues from exchanges.
* Compatible with node v.7+.
## 1.1.1 - 2017-05-11

@@ -2,0 +7,0 @@

771

dist/MessageBus.js

@@ -1,357 +0,292 @@

'use strict';
'use strict';Object.defineProperty(exports, "__esModule", { value: true });var _crypto = require('crypto');var _crypto2 = _interopRequireDefault(_crypto);
var _bluebird = require('bluebird');var _bluebird2 = _interopRequireDefault(_bluebird);
var _isPlainObject = require('lodash/isPlainObject');var _isPlainObject2 = _interopRequireDefault(_isPlainObject);
var _isString = require('lodash/isString');var _isString2 = _interopRequireDefault(_isString);
var _isFunction = require('lodash/isFunction');var _isFunction2 = _interopRequireDefault(_isFunction);
var _isNull = require('lodash/isNull');var _isNull2 = _interopRequireDefault(_isNull);
var _isUndefined = require('lodash/isUndefined');var _isUndefined2 = _interopRequireDefault(_isUndefined);
var _isInteger = require('lodash/isInteger');var _isInteger2 = _interopRequireDefault(_isInteger);
var _inRange = require('lodash/inRange');var _inRange2 = _interopRequireDefault(_inRange);
var _omitBy = require('lodash/omitBy');var _omitBy2 = _interopRequireDefault(_omitBy);
var _typeof = require('typeof');var _typeof2 = _interopRequireDefault(_typeof);
var _amqplib = require('amqplib');var _amqplib2 = _interopRequireDefault(_amqplib);
var _uuid = require('uuid');var _uuid2 = _interopRequireDefault(_uuid);function _interopRequireDefault(obj) {return obj && obj.__esModule ? obj : { default: obj };}function _asyncToGenerator(fn) {return function () {var gen = fn.apply(this, arguments);return new _bluebird2.default(function (resolve, reject) {function step(key, arg) {try {var info = gen[key](arg);var value = info.value;} catch (error) {reject(error);return;}if (info.done) {resolve(value);} else {return _bluebird2.default.resolve(value).then(function (value) {step("next", value);}, function (err) {step("throw", err);});}}return step("next");});};}
Object.defineProperty(exports, "__esModule", {
value: true
});
var _bluebird = require('bluebird');
var _createClass = function () { function defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } return function (Constructor, protoProps, staticProps) { if (protoProps) defineProperties(Constructor.prototype, protoProps); if (staticProps) defineProperties(Constructor, staticProps); return Constructor; }; }();
var _crypto = require('crypto');
var _crypto2 = _interopRequireDefault(_crypto);
var _isPlainObject = require('lodash/isPlainObject');
var _isPlainObject2 = _interopRequireDefault(_isPlainObject);
var _isString = require('lodash/isString');
var _isString2 = _interopRequireDefault(_isString);
var _isFunction = require('lodash/isFunction');
var _isFunction2 = _interopRequireDefault(_isFunction);
var _isUndefined = require('lodash/isUndefined');
var _isUndefined2 = _interopRequireDefault(_isUndefined);
var _isInteger = require('lodash/isInteger');
var _isInteger2 = _interopRequireDefault(_isInteger);
var _inRange = require('lodash/inRange');
var _inRange2 = _interopRequireDefault(_inRange);
var _omitBy = require('lodash/omitBy');
var _omitBy2 = _interopRequireDefault(_omitBy);
var _typeof = require('typeof');
var _typeof2 = _interopRequireDefault(_typeof);
var _amqplib = require('amqplib');
var _amqplib2 = _interopRequireDefault(_amqplib);
var _uuid = require('uuid');
var _uuid2 = _interopRequireDefault(_uuid);
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; }
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
var MessageBus = function () {
class MessageBus {
/**
* Constructs new message bus with the supplied properties.
* @param {Object} spec message bus properties
* @property {string} spec.url AMQP server URL
* @constructor
*/
function MessageBus(spec) {
_classCallCheck(this, MessageBus);
* Constructs new message bus with the supplied properties.
* @param {Object} props message bus properties
* @property {string} props.url AMQP server URL
* @property {string} [props.encryptionKey]
* @constructor
*/
constructor(props) {
if (!(0, _isPlainObject2.default)(props)) {
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`);
}
if (!(0, _isPlainObject2.default)(spec)) throw new TypeError('Invalid "spec" param; expected plain object, received ' + (0, _typeof2.default)(spec));
const {
url,
encryptionKey = null } =
props;
var url = spec.url,
queue = spec.queue,
encryptionKey = spec.encryptionKey;
if (!(0, _isString2.default)(url)) {
throw new TypeError(`Invalid url property; expected string, received ${(0, _typeof2.default)(url)}`);
}
if (!((0, _isString2.default)(encryptionKey) || (0, _isNull2.default)(encryptionKey))) {
throw new TypeError(`Invalid encryptionKey property; expected string, received ${(0, _typeof2.default)(encryptionKey)}`);
}
if (!(0, _isString2.default)(url)) throw new TypeError('Invalid "url" property; expected string, received ' + (0, _typeof2.default)(url));
if (!(0, _isString2.default)(queue)) throw new TypeError('Invalid "queue" property; expected string, received ' + (0, _typeof2.default)(queue));
if (!((0, _isString2.default)(encryptionKey) || (0, _isUndefined2.default)(encryptionKey))) throw new TypeError('Invalid "encryptionKey" property; expected string, received ' + (0, _typeof2.default)(encryptionKey));
this.url = url;
this.queue = queue;
this.encryptionKey = encryptionKey;
this.consumers = new Map();
this.conn = null;
this.publisherChannel = null;
this.subscriberChannel = null;
this.consumerTag = null;
this.incomingChannel = null;
this.outgoingChannel = null;
}
/**
* Encrypts the supplied JSON object and returns a new buffer.
* @param {Object} obj
* @returns {Buffer}
*/
_createClass(MessageBus, [{
key: 'encrypt',
value: function encrypt(json) {
if (this.encryptionKey === undefined) {
return Buffer.from(JSON.stringify(json), 'utf8');
}
var cipher = _crypto2.default.createCipher('aes128', this.encryptionKey);
var buf1 = cipher.update(JSON.stringify(json), 'utf8');
var buf2 = cipher.final();
return Buffer.concat([buf1, buf2], buf1.length + buf2.length);
}
/**
* Decrypts the supplied buffer and returns a JSON object.
* @param {Buffer} buf
* @returns {Object}
*/
}, {
key: 'decrypt',
value: function decrypt(buf) {
if (this.encryptionKey === undefined) {
return JSON.parse(buf.toString('utf8'));
}
var decipher = _crypto2.default.createDecipher('aes128', this.encryptionKey);
var str = [decipher.update(buf, 'utf8'), decipher.final('utf8')].join('');
return JSON.parse(str);
}
/**
* Connects to AMQP server.
* @returns {Promise}
*/
connect() {var _this = this;return _asyncToGenerator(function* () {
// make sure not already connected
if (_this.conn) {
return; // exit
}
}, {
key: 'connect',
value: function () {
var _ref = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee() {
return regeneratorRuntime.wrap(function _callee$(_context) {
while (1) {
switch (_context.prev = _context.next) {
case 0:
if (!(this.conn !== null)) {
_context.next = 2;
break;
}
// create connection
_this.conn = yield _amqplib2.default.connect(_this.url);
_this.conn.on('error', function (err) {
console.error(err);
});
_this.conn.on('close', function () {
_this.reconnect();
});
return _context.abrupt('return');
// create for incoming / outgoing messages
_this.incomingChannel = yield _this.conn.createChannel();
_this.outgoingChannel = yield _this.conn.createConfirmChannel();
case 2:
_context.next = 4;
return _amqplib2.default.connect(this.url);
yield _this.incomingChannel.prefetch(1);})();
}
case 4:
this.conn = _context.sent;
_context.next = 7;
return this.conn.createChannel();
case 7:
this.subscriberChannel = _context.sent;
_context.next = 10;
return this.conn.createConfirmChannel();
case 10:
this.publisherChannel = _context.sent;
_context.next = 13;
return this.subscriberChannel.assertQueue(this.queue, {
durable: true,
maxPriority: 10
});
case 13:
_context.next = 15;
return this.subscriberChannel.prefetch(1);
case 15:
case 'end':
return _context.stop();
}
}
}, _callee, this);
}));
function connect() {
return _ref.apply(this, arguments);
}
return connect;
}()
/**
/**
* Disconnects from AMQP server.
* @returns {Promise}
*/
disconnect() {var _this2 = this;return _asyncToGenerator(function* () {
// make sure not disconnected
if (!_this2.conn) {
return; // exit
}
}, {
key: 'disconnect',
value: function () {
var _ref2 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee2() {
return regeneratorRuntime.wrap(function _callee2$(_context2) {
while (1) {
switch (_context2.prev = _context2.next) {
case 0:
if (!(this.conn === null)) {
_context2.next = 2;
break;
}
// unsubscribe any active consumer(s)
yield _bluebird2.default.all(
Array.from(_this2.consumers.keys()).map(function (consumerTag) {
return _this2.unsubscribe(consumerTag);
}));
return _context2.abrupt('return');
case 2:
if (!(this.consumerTag !== null)) {
_context2.next = 6;
break;
}
// close connection
_this2.conn.removeAllListeners();
yield _this2.conn.close();
_context2.next = 5;
return this.subscriberChannel.cancel(this.consumerTag);
// reset local state
_this2.conn = null;
_this2.incomingChannel = null;
_this2.outgoingChannel = null;})();
}
case 5:
this.consumerTag = null;
/**
* Reconnects to AMQP server.
* @returns {Promise}
*/
reconnect() {var _this3 = this;return _asyncToGenerator(function* () {
_this3.conn.removeAllListeners();
_this3.conn = null; // you need this otherwise connect() will exit prematurily
case 6:
_context2.next = 8;
return this.conn.close();
while (true) {
yield _bluebird2.default.delay(1000);
try {
yield _this3.connect();
break; // exit loop
} catch (err) {
// do nothing
}
}})();
}
case 8:
/**
* Encrypts the supplied payload and returns a new buffer.
* @param {string} encryptionKey
* @returns {Buffer}
*/
encrypt(payload) {
if (this.encryptionKey == null) {
return Buffer.from(JSON.stringify(payload), 'utf8');
}
this.conn = null;
this.subscriberChannel = null;
this.publisherChannel = null;
const cipher = _crypto2.default.createCipher('aes128', this.encryptionKey);
const buf1 = cipher.update(JSON.stringify(payload), 'utf8');
const buf2 = cipher.final();
return Buffer.concat([buf1, buf2], buf1.length + buf2.length);
}
case 11:
case 'end':
return _context2.stop();
}
}
}, _callee2, this);
}));
/**
* Decrypts the supplied buffer and returns its payload.
* @param {Buffer} buf
* @param {string} encryptionKey
* @returns {Object}
*/
decrypt(buf) {
if (this.encryptionKey == null) {
return JSON.parse(buf.toString('utf8'));
}
function disconnect() {
return _ref2.apply(this, arguments);
}
const decipher = _crypto2.default.createDecipher('aes128', this.encryptionKey);
const str = [
decipher.update(buf, 'utf8'),
decipher.final('utf8')].
join('');
return disconnect;
}()
return JSON.parse(str);
}
/**
* Subscribes to the message bus for incoming messages.
* @param {Function<Object, Object, Function>} listener a function with (msg, props, done) arguments
/**
* Subscribes to the designated queue for messages.
* @param {string} queue
* @param {Function<Object, Object, Function>} listener i.e. function(msg, props, done) {}
* @returns {Promise<Function>} resolving to an unsubscribe method
* @see {@link http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue} for further info on option properties.
*/
subscribe(queue, listener) {var _this4 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if (!(0, _isFunction2.default)(listener)) {
throw new TypeError(`Invalid listener; expected function, received ${(0, _typeof2.default)(listener)}`);
}
}, {
key: 'subscribe',
value: function () {
var _ref3 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee4(listener) {
var _this = this;
if (!_this4.conn) {
throw new Error('Cannot subscribe to queue; did you forget to call #connect()');
}
return regeneratorRuntime.wrap(function _callee4$(_context4) {
while (1) {
switch (_context4.prev = _context4.next) {
case 0:
if ((0, _isFunction2.default)(listener)) {
_context4.next = 2;
break;
}
if (_this4.consumerTag) {
throw new Error('Subscription already active; cannot open multiple subscriptions to the same message bus');
}
throw new TypeError('Invalid "callback" param; expected function, received ' + (0, _typeof2.default)(listener));
// create unique consumer tag
const consumerTag = _uuid2.default.v4();
case 2:
if (!(this.subscriberChannel === null)) {
_context4.next = 4;
break;
}
try {
// subscribe to channel
yield _this4.incomingChannel.consume(queue, function (msg) {
listener(
_this4.decrypt(msg.content),
(0, _omitBy2.default)(msg.properties, _isUndefined2.default),
function (err) {return _this4.incomingChannel[err ? 'nack' : 'ack'](msg);});
throw new Error('Cannot subscribe to message bus; did you forget to call #connect()');
}, {
consumerTag,
noAck: false // explicitely ack messages when done
});
case 4:
if (!this.consumerTag) {
_context4.next = 6;
break;
}
// return unsubscribe() method
return _this4.unsubscribe.bind(_this4, consumerTag);
} finally {
// add consumerTag to consumers registry
_this4.consumers.set(consumerTag, queue);
}})();
}
throw new Error('Subscription already active; cannot open multiple subscriptions to the same message bus');
/**
* Unsubscribes the designated consumer.
* @param {string} consumerTag
* @returns {Promise}
*/
unsubscribe(consumerTag) {var _this5 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(consumerTag)) {
throw new TypeError(`Invalid consumerTag; expected string, received ${(0, _typeof2.default)(consumerTag)}`);
}
if (!_this5.consumers.has(consumerTag)) {
throw new Error(`Unknown consumer tag ${consumerTag}`);
}
case 6:
_context4.prev = 6;
yield _this5.incomingChannel.cancel(consumerTag);
_this5.consumers.delete(consumerTag);})();
}
// create unique consumer tag
this.consumerTag = _uuid2.default.v4();
/**
* Publishes the supplied message to the given exchange.
* @param {string} exchange
* @param {*} message can be any JSON serializable value, incl. Object and Array.
* @param {Object} [props]
* @property {number} [props.priority=1] message priority must be between 1 and 10.
* @property {string} [props.type]
* @property {string} [props.messageId=uuid.v4()]
* @property {number} [props.timestamp=Date.now()]
* @returns {Promise<boolean>}
*/
publish(exchange, routingKey, message, props = {}) {var _this6 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(exchange)) {
throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`);
}
if (!(0, _isString2.default)(routingKey)) {
throw new TypeError(`Invalid routingKey; expected string, received ${(0, _typeof2.default)(routingKey)}`);
}
if ((0, _isUndefined2.default)(message)) {
throw new TypeError('Invalid message; must be specified');
}
if (!(0, _isPlainObject2.default)(props)) {
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`);
}
// subscribe to channel
_context4.next = 10;
return this.subscriberChannel.consume(this.queue, function (msg) {
var done = function done(err) {
if (err) {
_this.subscriberChannel.nack(msg);
} else {
_this.subscriberChannel.ack(msg);
}
};
const {
type,
priority = 1,
messageId = _uuid2.default.v4(),
timestamp = Date.now() } =
props;
var content = _this.decrypt(msg.content);
var props = (0, _omitBy2.default)(msg.properties, _isUndefined2.default);
if (!(0, _isInteger2.default)(priority)) {
throw new TypeError(`Invalid "priority" property; expected integer, received ${(0, _typeof2.default)(priority)}`);
}
if (!(0, _inRange2.default)(priority, 1, 11)) {
throw new TypeError('Invalid "priority" property; must be between 1 and 10');
}
if (!(0, _isString2.default)(messageId)) {
throw new TypeError(`Invalid "messageId" property; expected string, received ${(0, _typeof2.default)(messageId)}`);
}
if (!((0, _isString2.default)(type) || (0, _isUndefined2.default)(type))) {
throw new TypeError(`Invalid "type" property; expected string, received ${(0, _typeof2.default)(type)}`);
}
if (!(0, _isInteger2.default)(timestamp)) {
throw new TypeError(`Invalid "timestamp" property; expected integer, received ${(0, _typeof2.default)(timestamp)}`);
}
listener(content, props, done);
}, {
consumerTag: this.consumerTag,
noAck: false // explicitely ack messages when done
});
// make sure connection is open
if (!_this6.conn) {
throw new Error('Cannot publish to exchange; did you forget to call #connect()');
}
case 10:
return _context4.abrupt('return', (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee3() {
return regeneratorRuntime.wrap(function _callee3$(_context3) {
while (1) {
switch (_context3.prev = _context3.next) {
case 0:
_context3.next = 2;
return _this.subscriberChannel.cancel(_this.consumerTag);
return new _bluebird2.default(function (resolve) {
_this6.outgoingChannel.publish(
exchange,
routingKey,
_this6.encrypt(message),
{
messageId,
type,
priority,
timestamp },
case 2:
_this.consumerTag = null;
function () {return resolve();});
case 3:
case 'end':
return _context3.stop();
}
}
}, _callee3, _this);
})));
});})();
}
case 13:
_context4.prev = 13;
_context4.t0 = _context4['catch'](6);
this.consumerTag = null;
throw _context4.t0;
case 17:
case 'end':
return _context4.stop();
}
}
}, _callee4, this, [[6, 13]]);
}));
function subscribe(_x) {
return _ref3.apply(this, arguments);
}
return subscribe;
}()
/**
* Published the supplied message to the given queue.
* @param {*} content can be any JSON serializable value, incl. Object and Array.
/**
* Sends the supplied message to the given queue.
* @param {string} queue
* @param {*} message can be any JSON serializable value, incl. Object and Array.
* @param {Object} [props]

@@ -364,106 +299,176 @@ * @property {number} [props.priority=1] message priority must be between 1 and 10.

*/
sendToQueue(queue, message, props = {}) {var _this7 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if ((0, _isUndefined2.default)(message)) {
throw new TypeError('Invalid message; must be specified');
}
if (!(0, _isPlainObject2.default)(props)) {
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`);
}
}, {
key: 'publish',
value: function () {
var _ref5 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee5(content) {
var props = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {};
const {
type,
priority = 1,
messageId = _uuid2.default.v4(),
timestamp = Date.now() } =
props;
var type, _props$priority, priority, _props$messageId, messageId, _props$timestamp, timestamp;
if (!(0, _isInteger2.default)(priority)) {
throw new TypeError(`Invalid "priority" property; expected integer, received ${(0, _typeof2.default)(priority)}`);
}
if (!(0, _inRange2.default)(priority, 1, 11)) {
throw new TypeError('Invalid "priority" property; must be between 1 and 10');
}
if (!(0, _isString2.default)(messageId)) {
throw new TypeError(`Invalid "messageId" property; expected string, received ${(0, _typeof2.default)(messageId)}`);
}
if (!((0, _isString2.default)(type) || (0, _isUndefined2.default)(type))) {
throw new TypeError(`Invalid "type" property; expected string, received ${(0, _typeof2.default)(type)}`);
}
if (!(0, _isInteger2.default)(timestamp)) {
throw new TypeError(`Invalid "timestamp" property; expected integer, received ${(0, _typeof2.default)(timestamp)}`);
}
return regeneratorRuntime.wrap(function _callee5$(_context5) {
while (1) {
switch (_context5.prev = _context5.next) {
case 0:
if (!(0, _isUndefined2.default)(content)) {
_context5.next = 2;
break;
}
// make sure connection is open
if (!_this7.conn) {
throw new Error('Cannot send to queue; did you forget to call #connect()');
}
throw new TypeError('Invalid "content" param; must be specified');
return new _bluebird2.default(function (resolve) {
_this7.outgoingChannel.sendToQueue(
queue,
_this7.encrypt(message),
{
messageId,
type,
priority,
timestamp },
case 2:
if ((0, _isPlainObject2.default)(props)) {
_context5.next = 4;
break;
}
function () {return resolve();});
throw new TypeError('Invalid "props" param; expected plain object, received ' + (0, _typeof2.default)(props));
});})();
}
case 4:
type = props.type, _props$priority = props.priority, priority = _props$priority === undefined ? 1 : _props$priority, _props$messageId = props.messageId, messageId = _props$messageId === undefined ? _uuid2.default.v4() : _props$messageId, _props$timestamp = props.timestamp, timestamp = _props$timestamp === undefined ? Date.now() : _props$timestamp;
assertExchange(exchange, type, options = {}) {var _this8 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(exchange)) {
throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`);
}
if (!(0, _isString2.default)(type)) {
throw new TypeError(`Invalid type; expected string, received ${(0, _typeof2.default)(type)}`);
}
if (!(0, _isPlainObject2.default)(options)) {
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`);
}
if ((0, _isInteger2.default)(priority)) {
_context5.next = 7;
break;
}
// make sure connection is open
if (!_this8.conn) {
throw new Error('Cannot assert exchange; did you forget to call #connect()');
}
throw new TypeError('Invalid "priority" property; expected integer, received ' + (0, _typeof2.default)(priority));
return _this8.incomingChannel.assertExchange(exchange, type, options);})();
}
case 7:
if ((0, _inRange2.default)(priority, 1, 11)) {
_context5.next = 9;
break;
}
assertQueue(queue, options = {}) {var _this9 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if (!(0, _isPlainObject2.default)(options)) {
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`);
}
throw new TypeError('Invalid "priority" property; must be between 1 and 10');
// make sure connection is open
if (!_this9.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
}
case 9:
if ((0, _isString2.default)(messageId)) {
_context5.next = 11;
break;
}
return _this9.incomingChannel.assertQueue(queue, options);})();
}
throw new TypeError('Invalid "messageId" property; expected string, received ' + (0, _typeof2.default)(messageId));
deleteQueue(queue, options = {}) {var _this10 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if (!(0, _isPlainObject2.default)(options)) {
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`);
}
case 11:
if ((0, _isString2.default)(type) || (0, _isUndefined2.default)(type)) {
_context5.next = 13;
break;
}
// make sure connection is open
if (!_this10.conn) {
throw new Error('Cannot delete queue; did you forget to call #connect()');
}
throw new TypeError('Invalid "type" property; expected string, received ' + (0, _typeof2.default)(type));
// unsubscribe any queue consumer
yield _bluebird2.default.all(
Array.from(_this10.consumers).
filter(function ([key, value]) {return value === queue;}).
map(function ([key]) {return _this10.unsubscribe(key);}));
case 13:
if ((0, _isInteger2.default)(timestamp)) {
_context5.next = 15;
break;
}
throw new TypeError('Invalid "timestamp" property; expected integer, received ' + (0, _typeof2.default)(timestamp));
return _this10.incomingChannel.deleteQueue(queue, options);})();
}
case 15:
if (!(this.publisherChannel === null)) {
_context5.next = 17;
break;
}
bindQueue(queue, source, pattern) {var _this11 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if (!(0, _isString2.default)(source)) {
throw new TypeError(`Invalid source; expected string, received ${(0, _typeof2.default)(source)}`);
}
if (!(0, _isString2.default)(pattern)) {
throw new TypeError(`Invalid pattern; expected string, received ${(0, _typeof2.default)(pattern)}`);
}
throw new Error('Cannot publish to message bus; did you forget to call #connect()');
// make sure connection is open
if (!_this11.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
}
case 17:
return _this11.incomingChannel.bindQueue(queue, source, pattern);})();
}
this.publisherChannel.sendToQueue(this.queue, this.encrypt(content), { messageId: messageId, type: type, priority: priority, timestamp: timestamp });
unbindQueue(queue, source, pattern) {var _this12 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);
}
if (!(0, _isString2.default)(source)) {
throw new TypeError(`Invalid source; expected string, received ${(0, _typeof2.default)(source)}`);
}
if (!(0, _isString2.default)(pattern)) {
throw new TypeError(`Invalid pattern; expected string, received ${(0, _typeof2.default)(pattern)}`);
}
return _context5.abrupt('return', this.publisherChannel.waitForConfirms());
// make sure connection is open
if (!_this12.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
}
case 19:
case 'end':
return _context5.stop();
}
}
}, _callee5, this);
}));
return _this12.incomingChannel.unbindQueue(queue, source, pattern);})();
}
function publish(_x2) {
return _ref5.apply(this, arguments);
}
// /**
// * Indicates whether the designated queue already exists.
// * @param {string} queue
// * @returns {Promise}
// */
// async existsQueue(queue) {
// if (!isString(queue)) {
// throw new TypeError(`Invalid queue; expected string, received ${typeOf(queue)}`);
// }
return publish;
}()
}]);
// // make sure connection is open
// if (!this.conn) {
// throw new Error('Unable to check queue existence; did you forget to call #connect()');
// }
return MessageBus;
}();
// try {
// const response = await this.incomingChannel.checkQueue(queue);
// return response.queue === queue;
// } catch (err) {
// return false; // TODO: Find out why this shit causes the connection to close
// }
// }
}exports.default =
exports.default = MessageBus;
module.exports = exports['default'];
MessageBus;module.exports = exports['default'];
{
"name": "amqp-message-bus",
"version": "1.1.1",
"version": "2.0.0",
"description": "Node.js message bus interface for AMQP servers, such as RabbitMQ.",

@@ -23,3 +23,3 @@ "keywords": [

"engines": {
"node": ">=6.0.0"
"node": ">=7.6"
},

@@ -36,24 +36,23 @@ "main": "dist/MessageBus.js",

"dependencies": {
"amqplib": "^0.5.1",
"bluebird": "^3.5.0",
"amqplib": "^0.5.2",
"bluebird": "^3.5.1",
"dotenv": "^4.0.0",
"lodash": "^4.17.2",
"typeof": "^1.0.0",
"uuid": "^3.0.1"
"uuid": "^3.1.0"
},
"devDependencies": {
"babel-cli": "^6.24.1",
"babel-jest": "^20.0.0",
"babel-cli": "^6.26.0",
"babel-jest": "^21.2.0",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-plugin-transform-async-to-module-method": "^6.24.1",
"babel-plugin-transform-es2015-modules-commonjs": "^6.24.1",
"babel-preset-es2015": "^6.24.1",
"eslint": "^3.19.0",
"eslint-config-airbnb": "^14.1.0",
"eslint-plugin-import": "^2.2.0",
"eslint-plugin-jsx-a11y": "^3.0.2",
"eslint-plugin-react": "^6.9.0",
"jest-cli": "^20.0.0",
"rimraf": "^2.6.1"
"babel-plugin-transform-es2015-modules-commonjs": "^6.26.0",
"babel-preset-es2017": "^6.24.1",
"eslint": "^4.11.0",
"eslint-config-airbnb": "^16.1.0",
"eslint-plugin-import": "^2.8.0",
"eslint-plugin-jsx-a11y": "^6.0.2",
"eslint-plugin-react": "^7.5.1",
"jest-cli": "^21.2.1",
"rimraf": "^2.6.2"
}
}

@@ -5,9 +5,9 @@ # AMQP Message Bus

[![Build Status](https://travis-ci.org/controlly/ampq-message-bus.svg?branch=master)](https://travis-ci.org/controlly/ampq-message-bus) [![npm version](https://badge.fury.io/js/ampq-message-bus.svg)](https://badge.fury.io/js/ampq-message-bus)
[![Build Status](https://travis-ci.org/yeepio/amqp-message-bus.svg?branch=master)](https://travis-ci.org/yeepio/amqp-message-bus) [![npm version](https://badge.fury.io/js/amqp-message-bus.svg)](https://badge.fury.io/js/amqp-message-bus)
#### Features
* Message bus API hides the complexity of AMQP connectors;
* Supports symmetric message encryption;
* Works pefectly fine with async/await.
* Hides the complexity of AMQP client;
* Comes with build-in symmetric message encryption;
* Supports promises + async/await.

@@ -22,12 +22,6 @@ ## Installation

* Node.js v.6+
* Node.js v.7+
## Quick start
Install `amqp-message-bus` from npm.
```
$ npm install amqp-message-bus --save
```
Create new message bus.

@@ -39,3 +33,2 @@

const bus = new MessageBus({
queue: 'tasks',
url: 'amqp://localhost',

@@ -46,24 +39,8 @@ encryptionKey: 'keep-it-safe'

Connect to AMQP server and subscribe for messages.
Connect to AMQP server and subscribe to queue for messages.
```javascript
bus.connect()
.then(() => bus.subscribe((msg, props, done) => {
// process msg + props
console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);
// call done when message is done processing to remove from rabbitmq
done();
}))
// call this when you want to unsubscribe...
.then((unsubscribe) => unsubscribe())
// always catch errors with promises :-)
.catch((err) => console.error(err));
```
The same looks much better using async/await.
```javascript
await bus.connect();
const unsubscribe = await bus.subscribe((msg, props, done) => {
const unsubscribe = await bus.subscribe('my_queue', (msg, props, done) => {
// process msg + props

@@ -75,15 +52,29 @@ console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`);

// call this when you want to unsubscribe...
// unsubscribe from queue
await unsubscribe();
// disconnect from bus
await bus.disconnect();
```
Connect to AMQP server, publish message and immediately disconnect.
Connect to AMQP server, create queue (if not exists) and send message.
```javascript
bus.connect()
.then(() => bus.publish({ foo: 1, bar: 2 }))
.catch((err) => console.error(err))
.finally(() => bus.disconnect);
await bus.connect();
await bus.assertQueue('my_queue');
await bus.sendToQueue('my_queue', { foo: 1, bar: 2 });
await bus.disconnect();
```
Connect to AMQP server, create topic exchange and publish message to route.
```javascript
await bus.connect();
await bus.assertExchange('my_exchange', 'topic'); // note the 2nd argument (i.e. "topic") used to create a topic exchange
await bus.assertQueue('my_queue');
await bus.bindQueue('my_queue', 'my_exchange', 'route.1'); // note the 3rd argument (i.e. route.1)
await bus.publish('my_exchange', 'route.1', { foo: 1, bar: 2 }); // note the 3rd argument (i.e. route.1)
await bus.disconnect();
```
## API Docs

@@ -95,14 +86,12 @@

##### Arguments
#### Arguments
1. `spec` _(Object)_ message bus properties (required).
* `spec.url` _(string)_ AMQP server URL (required).
* `spec.queue` _(string)_ the name of the queue to subscribe to (required).
* `spec.encryptionKey` _(string)_ encryption key to use with assymetric encryption (optional). Signifies no encryption if left unspecified.
- **props** _(Object)_ message bus properties (required).
- **props.url** _(string)_ AMQP server URL (required).
- **props.encryptionKey** _(string)_ encryption key to use with symmetric encryption (optional).
##### Example
#### Example
```javascript
const bus = new MessageBus({
queue: 'tasks',
url: 'amqp://localhost',

@@ -115,9 +104,9 @@ encryptionKey: 'keep-it-safe'

Connects to AMQP server using the connection properties specified at construction time.
Connects to AMQP server.
##### Returns
#### Returns
Returns a native Promise.
`Promise`
##### Example
#### Example

@@ -127,3 +116,3 @@ ```javascript

.then(() => {
console.log('Connected to amqp server');
console.log('Connected to rabbitmq');
})

@@ -139,7 +128,7 @@ .catch((err) => {

##### Returns
#### Returns
Returns a native Promise.
`Promise`
##### Example
#### Example

@@ -149,3 +138,3 @@ ```javascript

.then(() => {
console.log('Disconnected from amqp server');
console.log('Disconnected from rabbitmq');
})

@@ -157,24 +146,24 @@ .catch((err) => {

### <a name="subscribe" href="subscribe">#</a>subscribe(listener) -> Promise\<Function\>
### <a name="subscribe" href="subscribe">#</a>subscribe(queue, listener)
Subscribes to the message bus for incoming messages.
Subscribes to the designated queue for incoming messages.
##### Arguments
#### Arguments
1. `listener` _(Function\<Object, Object, Function\>)_ listener function (required).
- **queue** _(string)_ the name of the queue to subscribe to
- **listener** _(Function)_ listener function, i.e. `function(msg, props, done)` (required).
- **msg** _(Object)_ message body (required).
- **props** _(Object)_ message meta-data (required).
- **done** _(Function)_ call done to signal message proccessing is done (required).
###### Listener function arguments
Please visit [http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for further info on `props` meta-data.
1. `msg` _(Object)_ message body (required).
2. `props` _(Object)_ message meta-data (required).
3. `done` _(Function)_ call done to signal message proccessing is done (required).
#### Returns
Please visit [http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for further info on `props` meta-data.
`Promise<Function>`
##### Returns
The function returned is the `unsubscribe()` method.
Returns a native Promise resolving to an `unsubscribe()` method.
#### Example
##### Example
```javascript

@@ -191,3 +180,3 @@ const listener = (msg, props, done) => {

// unsubscribe when ready
unsubscribe();
return unsubscribe();
})

@@ -199,3 +188,3 @@ .catch((err) => {

##### Example using async/await
#### Example using async/await

@@ -214,23 +203,29 @@ ```javascript

### <a name="publish" href="publish">#</a>publish(msg, props) -> Promise\<boolean\>
### <a name="sendToQueue" href="sendToQueue">#</a>sendToQueue(queue, message, props)
Publishes the supplied message to the AMQP server.
Sends the supplied message to the designated queue.
##### Arguments
#### Arguments
1. `content` _(*)_ message body (required); can be any JSON serializable value, e.g. Object, Array.
2. `props` _(Object)_ message props (optional).
* `props.id` _(string)_ message ID (optional; defaults to `UUID v4`)
* `props.priority` _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1)
* `props.timestamp` _(number)_ message timestamp (optional; defaults to `Date.now()`)
* `props.type` _(string)_ message type (optional)
- **queue** _(string)_ the name of the queue to send message to (required)
- **message** _(*)_ message body; can be any JSON serializable value (required)
- **props** _(Object)_ message props (optional).
- **props.id** _(string)_ message ID (optional; defaults to `UUID v4`)
- **props.priority** _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1)
- **props.timestamp** _(number)_ message timestamp (optional; defaults to `Date.now()`)
- **props.type** _(string)_ message type (optional)
##### Returns
#### Returns
Returns a native Promise resolving to a boolean value.
`Promise`
##### Example
#### Example
```javascript
bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 })
bus.sendToQueue('my_queue', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
})
.catch((err) => {

@@ -241,8 +236,57 @@ console.error(err);

##### Example using async/await
#### Example using async/await
```javascript
await bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 });
await bus.sendToQueue({
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
});
```
### <a name="publish" href="publish">#</a>publish(exchange, routingKey, message, props)
Publishes the supplied message to the designated exchange.
#### Arguments
- **exchange** _(string)_ the name of the exchange to publish message to (required)
- **routingKey** _(string)_ the routing key to publish message to (required)
- **message** _(*)_ message body; can be any JSON serializable value (required)
- **props** _(Object)_ message props (optional).
- **props.id** _(string)_ message ID (optional; defaults to `UUID v4`)
- **props.priority** _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1)
- **props.timestamp** _(number)_ message timestamp (optional; defaults to `Date.now()`)
- **props.type** _(string)_ message type (optional)
#### Returns
`Promise`
#### Example
```javascript
bus.publish('my_exchange', 'route.1', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
})
.catch((err) => {
console.error(err);
});
```
#### Example using async/await
```javascript
await bus.publish('my_exchange', 'route.1', {
foo: 'bar'
}, {
type: 'nonsense',
priority: 10
});
```
## Contribute

@@ -249,0 +293,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc