amqp-message-bus
Advanced tools
Comparing version 1.1.1 to 2.0.0
@@ -0,1 +1,6 @@ | ||
## 2.0.0 - 2017-11-20 | ||
* Complete rewrite to separate queues from exchanges. | ||
* Compatible with node v.7+. | ||
## 1.1.1 - 2017-05-11 | ||
@@ -2,0 +7,0 @@ |
@@ -1,357 +0,292 @@ | ||
'use strict'; | ||
'use strict';Object.defineProperty(exports, "__esModule", { value: true });var _crypto = require('crypto');var _crypto2 = _interopRequireDefault(_crypto); | ||
var _bluebird = require('bluebird');var _bluebird2 = _interopRequireDefault(_bluebird); | ||
var _isPlainObject = require('lodash/isPlainObject');var _isPlainObject2 = _interopRequireDefault(_isPlainObject); | ||
var _isString = require('lodash/isString');var _isString2 = _interopRequireDefault(_isString); | ||
var _isFunction = require('lodash/isFunction');var _isFunction2 = _interopRequireDefault(_isFunction); | ||
var _isNull = require('lodash/isNull');var _isNull2 = _interopRequireDefault(_isNull); | ||
var _isUndefined = require('lodash/isUndefined');var _isUndefined2 = _interopRequireDefault(_isUndefined); | ||
var _isInteger = require('lodash/isInteger');var _isInteger2 = _interopRequireDefault(_isInteger); | ||
var _inRange = require('lodash/inRange');var _inRange2 = _interopRequireDefault(_inRange); | ||
var _omitBy = require('lodash/omitBy');var _omitBy2 = _interopRequireDefault(_omitBy); | ||
var _typeof = require('typeof');var _typeof2 = _interopRequireDefault(_typeof); | ||
var _amqplib = require('amqplib');var _amqplib2 = _interopRequireDefault(_amqplib); | ||
var _uuid = require('uuid');var _uuid2 = _interopRequireDefault(_uuid);function _interopRequireDefault(obj) {return obj && obj.__esModule ? obj : { default: obj };}function _asyncToGenerator(fn) {return function () {var gen = fn.apply(this, arguments);return new _bluebird2.default(function (resolve, reject) {function step(key, arg) {try {var info = gen[key](arg);var value = info.value;} catch (error) {reject(error);return;}if (info.done) {resolve(value);} else {return _bluebird2.default.resolve(value).then(function (value) {step("next", value);}, function (err) {step("throw", err);});}}return step("next");});};} | ||
Object.defineProperty(exports, "__esModule", { | ||
value: true | ||
}); | ||
var _bluebird = require('bluebird'); | ||
var _createClass = function () { function defineProperties(target, props) { for (var i = 0; i < props.length; i++) { var descriptor = props[i]; descriptor.enumerable = descriptor.enumerable || false; descriptor.configurable = true; if ("value" in descriptor) descriptor.writable = true; Object.defineProperty(target, descriptor.key, descriptor); } } return function (Constructor, protoProps, staticProps) { if (protoProps) defineProperties(Constructor.prototype, protoProps); if (staticProps) defineProperties(Constructor, staticProps); return Constructor; }; }(); | ||
var _crypto = require('crypto'); | ||
var _crypto2 = _interopRequireDefault(_crypto); | ||
var _isPlainObject = require('lodash/isPlainObject'); | ||
var _isPlainObject2 = _interopRequireDefault(_isPlainObject); | ||
var _isString = require('lodash/isString'); | ||
var _isString2 = _interopRequireDefault(_isString); | ||
var _isFunction = require('lodash/isFunction'); | ||
var _isFunction2 = _interopRequireDefault(_isFunction); | ||
var _isUndefined = require('lodash/isUndefined'); | ||
var _isUndefined2 = _interopRequireDefault(_isUndefined); | ||
var _isInteger = require('lodash/isInteger'); | ||
var _isInteger2 = _interopRequireDefault(_isInteger); | ||
var _inRange = require('lodash/inRange'); | ||
var _inRange2 = _interopRequireDefault(_inRange); | ||
var _omitBy = require('lodash/omitBy'); | ||
var _omitBy2 = _interopRequireDefault(_omitBy); | ||
var _typeof = require('typeof'); | ||
var _typeof2 = _interopRequireDefault(_typeof); | ||
var _amqplib = require('amqplib'); | ||
var _amqplib2 = _interopRequireDefault(_amqplib); | ||
var _uuid = require('uuid'); | ||
var _uuid2 = _interopRequireDefault(_uuid); | ||
function _interopRequireDefault(obj) { return obj && obj.__esModule ? obj : { default: obj }; } | ||
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } | ||
var MessageBus = function () { | ||
class MessageBus { | ||
/** | ||
* Constructs new message bus with the supplied properties. | ||
* @param {Object} spec message bus properties | ||
* @property {string} spec.url AMQP server URL | ||
* @constructor | ||
*/ | ||
function MessageBus(spec) { | ||
_classCallCheck(this, MessageBus); | ||
* Constructs new message bus with the supplied properties. | ||
* @param {Object} props message bus properties | ||
* @property {string} props.url AMQP server URL | ||
* @property {string} [props.encryptionKey] | ||
* @constructor | ||
*/ | ||
constructor(props) { | ||
if (!(0, _isPlainObject2.default)(props)) { | ||
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`); | ||
} | ||
if (!(0, _isPlainObject2.default)(spec)) throw new TypeError('Invalid "spec" param; expected plain object, received ' + (0, _typeof2.default)(spec)); | ||
const { | ||
url, | ||
encryptionKey = null } = | ||
props; | ||
var url = spec.url, | ||
queue = spec.queue, | ||
encryptionKey = spec.encryptionKey; | ||
if (!(0, _isString2.default)(url)) { | ||
throw new TypeError(`Invalid url property; expected string, received ${(0, _typeof2.default)(url)}`); | ||
} | ||
if (!((0, _isString2.default)(encryptionKey) || (0, _isNull2.default)(encryptionKey))) { | ||
throw new TypeError(`Invalid encryptionKey property; expected string, received ${(0, _typeof2.default)(encryptionKey)}`); | ||
} | ||
if (!(0, _isString2.default)(url)) throw new TypeError('Invalid "url" property; expected string, received ' + (0, _typeof2.default)(url)); | ||
if (!(0, _isString2.default)(queue)) throw new TypeError('Invalid "queue" property; expected string, received ' + (0, _typeof2.default)(queue)); | ||
if (!((0, _isString2.default)(encryptionKey) || (0, _isUndefined2.default)(encryptionKey))) throw new TypeError('Invalid "encryptionKey" property; expected string, received ' + (0, _typeof2.default)(encryptionKey)); | ||
this.url = url; | ||
this.queue = queue; | ||
this.encryptionKey = encryptionKey; | ||
this.consumers = new Map(); | ||
this.conn = null; | ||
this.publisherChannel = null; | ||
this.subscriberChannel = null; | ||
this.consumerTag = null; | ||
this.incomingChannel = null; | ||
this.outgoingChannel = null; | ||
} | ||
/** | ||
* Encrypts the supplied JSON object and returns a new buffer. | ||
* @param {Object} obj | ||
* @returns {Buffer} | ||
*/ | ||
_createClass(MessageBus, [{ | ||
key: 'encrypt', | ||
value: function encrypt(json) { | ||
if (this.encryptionKey === undefined) { | ||
return Buffer.from(JSON.stringify(json), 'utf8'); | ||
} | ||
var cipher = _crypto2.default.createCipher('aes128', this.encryptionKey); | ||
var buf1 = cipher.update(JSON.stringify(json), 'utf8'); | ||
var buf2 = cipher.final(); | ||
return Buffer.concat([buf1, buf2], buf1.length + buf2.length); | ||
} | ||
/** | ||
* Decrypts the supplied buffer and returns a JSON object. | ||
* @param {Buffer} buf | ||
* @returns {Object} | ||
*/ | ||
}, { | ||
key: 'decrypt', | ||
value: function decrypt(buf) { | ||
if (this.encryptionKey === undefined) { | ||
return JSON.parse(buf.toString('utf8')); | ||
} | ||
var decipher = _crypto2.default.createDecipher('aes128', this.encryptionKey); | ||
var str = [decipher.update(buf, 'utf8'), decipher.final('utf8')].join(''); | ||
return JSON.parse(str); | ||
} | ||
/** | ||
* Connects to AMQP server. | ||
* @returns {Promise} | ||
*/ | ||
connect() {var _this = this;return _asyncToGenerator(function* () { | ||
// make sure not already connected | ||
if (_this.conn) { | ||
return; // exit | ||
} | ||
}, { | ||
key: 'connect', | ||
value: function () { | ||
var _ref = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee() { | ||
return regeneratorRuntime.wrap(function _callee$(_context) { | ||
while (1) { | ||
switch (_context.prev = _context.next) { | ||
case 0: | ||
if (!(this.conn !== null)) { | ||
_context.next = 2; | ||
break; | ||
} | ||
// create connection | ||
_this.conn = yield _amqplib2.default.connect(_this.url); | ||
_this.conn.on('error', function (err) { | ||
console.error(err); | ||
}); | ||
_this.conn.on('close', function () { | ||
_this.reconnect(); | ||
}); | ||
return _context.abrupt('return'); | ||
// create for incoming / outgoing messages | ||
_this.incomingChannel = yield _this.conn.createChannel(); | ||
_this.outgoingChannel = yield _this.conn.createConfirmChannel(); | ||
case 2: | ||
_context.next = 4; | ||
return _amqplib2.default.connect(this.url); | ||
yield _this.incomingChannel.prefetch(1);})(); | ||
} | ||
case 4: | ||
this.conn = _context.sent; | ||
_context.next = 7; | ||
return this.conn.createChannel(); | ||
case 7: | ||
this.subscriberChannel = _context.sent; | ||
_context.next = 10; | ||
return this.conn.createConfirmChannel(); | ||
case 10: | ||
this.publisherChannel = _context.sent; | ||
_context.next = 13; | ||
return this.subscriberChannel.assertQueue(this.queue, { | ||
durable: true, | ||
maxPriority: 10 | ||
}); | ||
case 13: | ||
_context.next = 15; | ||
return this.subscriberChannel.prefetch(1); | ||
case 15: | ||
case 'end': | ||
return _context.stop(); | ||
} | ||
} | ||
}, _callee, this); | ||
})); | ||
function connect() { | ||
return _ref.apply(this, arguments); | ||
} | ||
return connect; | ||
}() | ||
/** | ||
/** | ||
* Disconnects from AMQP server. | ||
* @returns {Promise} | ||
*/ | ||
disconnect() {var _this2 = this;return _asyncToGenerator(function* () { | ||
// make sure not disconnected | ||
if (!_this2.conn) { | ||
return; // exit | ||
} | ||
}, { | ||
key: 'disconnect', | ||
value: function () { | ||
var _ref2 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee2() { | ||
return regeneratorRuntime.wrap(function _callee2$(_context2) { | ||
while (1) { | ||
switch (_context2.prev = _context2.next) { | ||
case 0: | ||
if (!(this.conn === null)) { | ||
_context2.next = 2; | ||
break; | ||
} | ||
// unsubscribe any active consumer(s) | ||
yield _bluebird2.default.all( | ||
Array.from(_this2.consumers.keys()).map(function (consumerTag) { | ||
return _this2.unsubscribe(consumerTag); | ||
})); | ||
return _context2.abrupt('return'); | ||
case 2: | ||
if (!(this.consumerTag !== null)) { | ||
_context2.next = 6; | ||
break; | ||
} | ||
// close connection | ||
_this2.conn.removeAllListeners(); | ||
yield _this2.conn.close(); | ||
_context2.next = 5; | ||
return this.subscriberChannel.cancel(this.consumerTag); | ||
// reset local state | ||
_this2.conn = null; | ||
_this2.incomingChannel = null; | ||
_this2.outgoingChannel = null;})(); | ||
} | ||
case 5: | ||
this.consumerTag = null; | ||
/** | ||
* Reconnects to AMQP server. | ||
* @returns {Promise} | ||
*/ | ||
reconnect() {var _this3 = this;return _asyncToGenerator(function* () { | ||
_this3.conn.removeAllListeners(); | ||
_this3.conn = null; // you need this otherwise connect() will exit prematurily | ||
case 6: | ||
_context2.next = 8; | ||
return this.conn.close(); | ||
while (true) { | ||
yield _bluebird2.default.delay(1000); | ||
try { | ||
yield _this3.connect(); | ||
break; // exit loop | ||
} catch (err) { | ||
// do nothing | ||
} | ||
}})(); | ||
} | ||
case 8: | ||
/** | ||
* Encrypts the supplied payload and returns a new buffer. | ||
* @param {string} encryptionKey | ||
* @returns {Buffer} | ||
*/ | ||
encrypt(payload) { | ||
if (this.encryptionKey == null) { | ||
return Buffer.from(JSON.stringify(payload), 'utf8'); | ||
} | ||
this.conn = null; | ||
this.subscriberChannel = null; | ||
this.publisherChannel = null; | ||
const cipher = _crypto2.default.createCipher('aes128', this.encryptionKey); | ||
const buf1 = cipher.update(JSON.stringify(payload), 'utf8'); | ||
const buf2 = cipher.final(); | ||
return Buffer.concat([buf1, buf2], buf1.length + buf2.length); | ||
} | ||
case 11: | ||
case 'end': | ||
return _context2.stop(); | ||
} | ||
} | ||
}, _callee2, this); | ||
})); | ||
/** | ||
* Decrypts the supplied buffer and returns its payload. | ||
* @param {Buffer} buf | ||
* @param {string} encryptionKey | ||
* @returns {Object} | ||
*/ | ||
decrypt(buf) { | ||
if (this.encryptionKey == null) { | ||
return JSON.parse(buf.toString('utf8')); | ||
} | ||
function disconnect() { | ||
return _ref2.apply(this, arguments); | ||
} | ||
const decipher = _crypto2.default.createDecipher('aes128', this.encryptionKey); | ||
const str = [ | ||
decipher.update(buf, 'utf8'), | ||
decipher.final('utf8')]. | ||
join(''); | ||
return disconnect; | ||
}() | ||
return JSON.parse(str); | ||
} | ||
/** | ||
* Subscribes to the message bus for incoming messages. | ||
* @param {Function<Object, Object, Function>} listener a function with (msg, props, done) arguments | ||
/** | ||
* Subscribes to the designated queue for messages. | ||
* @param {string} queue | ||
* @param {Function<Object, Object, Function>} listener i.e. function(msg, props, done) {} | ||
* @returns {Promise<Function>} resolving to an unsubscribe method | ||
* @see {@link http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue} for further info on option properties. | ||
*/ | ||
subscribe(queue, listener) {var _this4 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if (!(0, _isFunction2.default)(listener)) { | ||
throw new TypeError(`Invalid listener; expected function, received ${(0, _typeof2.default)(listener)}`); | ||
} | ||
}, { | ||
key: 'subscribe', | ||
value: function () { | ||
var _ref3 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee4(listener) { | ||
var _this = this; | ||
if (!_this4.conn) { | ||
throw new Error('Cannot subscribe to queue; did you forget to call #connect()'); | ||
} | ||
return regeneratorRuntime.wrap(function _callee4$(_context4) { | ||
while (1) { | ||
switch (_context4.prev = _context4.next) { | ||
case 0: | ||
if ((0, _isFunction2.default)(listener)) { | ||
_context4.next = 2; | ||
break; | ||
} | ||
if (_this4.consumerTag) { | ||
throw new Error('Subscription already active; cannot open multiple subscriptions to the same message bus'); | ||
} | ||
throw new TypeError('Invalid "callback" param; expected function, received ' + (0, _typeof2.default)(listener)); | ||
// create unique consumer tag | ||
const consumerTag = _uuid2.default.v4(); | ||
case 2: | ||
if (!(this.subscriberChannel === null)) { | ||
_context4.next = 4; | ||
break; | ||
} | ||
try { | ||
// subscribe to channel | ||
yield _this4.incomingChannel.consume(queue, function (msg) { | ||
listener( | ||
_this4.decrypt(msg.content), | ||
(0, _omitBy2.default)(msg.properties, _isUndefined2.default), | ||
function (err) {return _this4.incomingChannel[err ? 'nack' : 'ack'](msg);}); | ||
throw new Error('Cannot subscribe to message bus; did you forget to call #connect()'); | ||
}, { | ||
consumerTag, | ||
noAck: false // explicitely ack messages when done | ||
}); | ||
case 4: | ||
if (!this.consumerTag) { | ||
_context4.next = 6; | ||
break; | ||
} | ||
// return unsubscribe() method | ||
return _this4.unsubscribe.bind(_this4, consumerTag); | ||
} finally { | ||
// add consumerTag to consumers registry | ||
_this4.consumers.set(consumerTag, queue); | ||
}})(); | ||
} | ||
throw new Error('Subscription already active; cannot open multiple subscriptions to the same message bus'); | ||
/** | ||
* Unsubscribes the designated consumer. | ||
* @param {string} consumerTag | ||
* @returns {Promise} | ||
*/ | ||
unsubscribe(consumerTag) {var _this5 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(consumerTag)) { | ||
throw new TypeError(`Invalid consumerTag; expected string, received ${(0, _typeof2.default)(consumerTag)}`); | ||
} | ||
if (!_this5.consumers.has(consumerTag)) { | ||
throw new Error(`Unknown consumer tag ${consumerTag}`); | ||
} | ||
case 6: | ||
_context4.prev = 6; | ||
yield _this5.incomingChannel.cancel(consumerTag); | ||
_this5.consumers.delete(consumerTag);})(); | ||
} | ||
// create unique consumer tag | ||
this.consumerTag = _uuid2.default.v4(); | ||
/** | ||
* Publishes the supplied message to the given exchange. | ||
* @param {string} exchange | ||
* @param {*} message can be any JSON serializable value, incl. Object and Array. | ||
* @param {Object} [props] | ||
* @property {number} [props.priority=1] message priority must be between 1 and 10. | ||
* @property {string} [props.type] | ||
* @property {string} [props.messageId=uuid.v4()] | ||
* @property {number} [props.timestamp=Date.now()] | ||
* @returns {Promise<boolean>} | ||
*/ | ||
publish(exchange, routingKey, message, props = {}) {var _this6 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(exchange)) { | ||
throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`); | ||
} | ||
if (!(0, _isString2.default)(routingKey)) { | ||
throw new TypeError(`Invalid routingKey; expected string, received ${(0, _typeof2.default)(routingKey)}`); | ||
} | ||
if ((0, _isUndefined2.default)(message)) { | ||
throw new TypeError('Invalid message; must be specified'); | ||
} | ||
if (!(0, _isPlainObject2.default)(props)) { | ||
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`); | ||
} | ||
// subscribe to channel | ||
_context4.next = 10; | ||
return this.subscriberChannel.consume(this.queue, function (msg) { | ||
var done = function done(err) { | ||
if (err) { | ||
_this.subscriberChannel.nack(msg); | ||
} else { | ||
_this.subscriberChannel.ack(msg); | ||
} | ||
}; | ||
const { | ||
type, | ||
priority = 1, | ||
messageId = _uuid2.default.v4(), | ||
timestamp = Date.now() } = | ||
props; | ||
var content = _this.decrypt(msg.content); | ||
var props = (0, _omitBy2.default)(msg.properties, _isUndefined2.default); | ||
if (!(0, _isInteger2.default)(priority)) { | ||
throw new TypeError(`Invalid "priority" property; expected integer, received ${(0, _typeof2.default)(priority)}`); | ||
} | ||
if (!(0, _inRange2.default)(priority, 1, 11)) { | ||
throw new TypeError('Invalid "priority" property; must be between 1 and 10'); | ||
} | ||
if (!(0, _isString2.default)(messageId)) { | ||
throw new TypeError(`Invalid "messageId" property; expected string, received ${(0, _typeof2.default)(messageId)}`); | ||
} | ||
if (!((0, _isString2.default)(type) || (0, _isUndefined2.default)(type))) { | ||
throw new TypeError(`Invalid "type" property; expected string, received ${(0, _typeof2.default)(type)}`); | ||
} | ||
if (!(0, _isInteger2.default)(timestamp)) { | ||
throw new TypeError(`Invalid "timestamp" property; expected integer, received ${(0, _typeof2.default)(timestamp)}`); | ||
} | ||
listener(content, props, done); | ||
}, { | ||
consumerTag: this.consumerTag, | ||
noAck: false // explicitely ack messages when done | ||
}); | ||
// make sure connection is open | ||
if (!_this6.conn) { | ||
throw new Error('Cannot publish to exchange; did you forget to call #connect()'); | ||
} | ||
case 10: | ||
return _context4.abrupt('return', (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee3() { | ||
return regeneratorRuntime.wrap(function _callee3$(_context3) { | ||
while (1) { | ||
switch (_context3.prev = _context3.next) { | ||
case 0: | ||
_context3.next = 2; | ||
return _this.subscriberChannel.cancel(_this.consumerTag); | ||
return new _bluebird2.default(function (resolve) { | ||
_this6.outgoingChannel.publish( | ||
exchange, | ||
routingKey, | ||
_this6.encrypt(message), | ||
{ | ||
messageId, | ||
type, | ||
priority, | ||
timestamp }, | ||
case 2: | ||
_this.consumerTag = null; | ||
function () {return resolve();}); | ||
case 3: | ||
case 'end': | ||
return _context3.stop(); | ||
} | ||
} | ||
}, _callee3, _this); | ||
}))); | ||
});})(); | ||
} | ||
case 13: | ||
_context4.prev = 13; | ||
_context4.t0 = _context4['catch'](6); | ||
this.consumerTag = null; | ||
throw _context4.t0; | ||
case 17: | ||
case 'end': | ||
return _context4.stop(); | ||
} | ||
} | ||
}, _callee4, this, [[6, 13]]); | ||
})); | ||
function subscribe(_x) { | ||
return _ref3.apply(this, arguments); | ||
} | ||
return subscribe; | ||
}() | ||
/** | ||
* Published the supplied message to the given queue. | ||
* @param {*} content can be any JSON serializable value, incl. Object and Array. | ||
/** | ||
* Sends the supplied message to the given queue. | ||
* @param {string} queue | ||
* @param {*} message can be any JSON serializable value, incl. Object and Array. | ||
* @param {Object} [props] | ||
@@ -364,106 +299,176 @@ * @property {number} [props.priority=1] message priority must be between 1 and 10. | ||
*/ | ||
sendToQueue(queue, message, props = {}) {var _this7 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if ((0, _isUndefined2.default)(message)) { | ||
throw new TypeError('Invalid message; must be specified'); | ||
} | ||
if (!(0, _isPlainObject2.default)(props)) { | ||
throw new TypeError(`Invalid props; expected plain object, received ${(0, _typeof2.default)(props)}`); | ||
} | ||
}, { | ||
key: 'publish', | ||
value: function () { | ||
var _ref5 = (0, _bluebird.coroutine)(regeneratorRuntime.mark(function _callee5(content) { | ||
var props = arguments.length > 1 && arguments[1] !== undefined ? arguments[1] : {}; | ||
const { | ||
type, | ||
priority = 1, | ||
messageId = _uuid2.default.v4(), | ||
timestamp = Date.now() } = | ||
props; | ||
var type, _props$priority, priority, _props$messageId, messageId, _props$timestamp, timestamp; | ||
if (!(0, _isInteger2.default)(priority)) { | ||
throw new TypeError(`Invalid "priority" property; expected integer, received ${(0, _typeof2.default)(priority)}`); | ||
} | ||
if (!(0, _inRange2.default)(priority, 1, 11)) { | ||
throw new TypeError('Invalid "priority" property; must be between 1 and 10'); | ||
} | ||
if (!(0, _isString2.default)(messageId)) { | ||
throw new TypeError(`Invalid "messageId" property; expected string, received ${(0, _typeof2.default)(messageId)}`); | ||
} | ||
if (!((0, _isString2.default)(type) || (0, _isUndefined2.default)(type))) { | ||
throw new TypeError(`Invalid "type" property; expected string, received ${(0, _typeof2.default)(type)}`); | ||
} | ||
if (!(0, _isInteger2.default)(timestamp)) { | ||
throw new TypeError(`Invalid "timestamp" property; expected integer, received ${(0, _typeof2.default)(timestamp)}`); | ||
} | ||
return regeneratorRuntime.wrap(function _callee5$(_context5) { | ||
while (1) { | ||
switch (_context5.prev = _context5.next) { | ||
case 0: | ||
if (!(0, _isUndefined2.default)(content)) { | ||
_context5.next = 2; | ||
break; | ||
} | ||
// make sure connection is open | ||
if (!_this7.conn) { | ||
throw new Error('Cannot send to queue; did you forget to call #connect()'); | ||
} | ||
throw new TypeError('Invalid "content" param; must be specified'); | ||
return new _bluebird2.default(function (resolve) { | ||
_this7.outgoingChannel.sendToQueue( | ||
queue, | ||
_this7.encrypt(message), | ||
{ | ||
messageId, | ||
type, | ||
priority, | ||
timestamp }, | ||
case 2: | ||
if ((0, _isPlainObject2.default)(props)) { | ||
_context5.next = 4; | ||
break; | ||
} | ||
function () {return resolve();}); | ||
throw new TypeError('Invalid "props" param; expected plain object, received ' + (0, _typeof2.default)(props)); | ||
});})(); | ||
} | ||
case 4: | ||
type = props.type, _props$priority = props.priority, priority = _props$priority === undefined ? 1 : _props$priority, _props$messageId = props.messageId, messageId = _props$messageId === undefined ? _uuid2.default.v4() : _props$messageId, _props$timestamp = props.timestamp, timestamp = _props$timestamp === undefined ? Date.now() : _props$timestamp; | ||
assertExchange(exchange, type, options = {}) {var _this8 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(exchange)) { | ||
throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`); | ||
} | ||
if (!(0, _isString2.default)(type)) { | ||
throw new TypeError(`Invalid type; expected string, received ${(0, _typeof2.default)(type)}`); | ||
} | ||
if (!(0, _isPlainObject2.default)(options)) { | ||
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`); | ||
} | ||
if ((0, _isInteger2.default)(priority)) { | ||
_context5.next = 7; | ||
break; | ||
} | ||
// make sure connection is open | ||
if (!_this8.conn) { | ||
throw new Error('Cannot assert exchange; did you forget to call #connect()'); | ||
} | ||
throw new TypeError('Invalid "priority" property; expected integer, received ' + (0, _typeof2.default)(priority)); | ||
return _this8.incomingChannel.assertExchange(exchange, type, options);})(); | ||
} | ||
case 7: | ||
if ((0, _inRange2.default)(priority, 1, 11)) { | ||
_context5.next = 9; | ||
break; | ||
} | ||
assertQueue(queue, options = {}) {var _this9 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if (!(0, _isPlainObject2.default)(options)) { | ||
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`); | ||
} | ||
throw new TypeError('Invalid "priority" property; must be between 1 and 10'); | ||
// make sure connection is open | ||
if (!_this9.conn) { | ||
throw new Error('Cannot assert queue; did you forget to call #connect()'); | ||
} | ||
case 9: | ||
if ((0, _isString2.default)(messageId)) { | ||
_context5.next = 11; | ||
break; | ||
} | ||
return _this9.incomingChannel.assertQueue(queue, options);})(); | ||
} | ||
throw new TypeError('Invalid "messageId" property; expected string, received ' + (0, _typeof2.default)(messageId)); | ||
deleteQueue(queue, options = {}) {var _this10 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if (!(0, _isPlainObject2.default)(options)) { | ||
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`); | ||
} | ||
case 11: | ||
if ((0, _isString2.default)(type) || (0, _isUndefined2.default)(type)) { | ||
_context5.next = 13; | ||
break; | ||
} | ||
// make sure connection is open | ||
if (!_this10.conn) { | ||
throw new Error('Cannot delete queue; did you forget to call #connect()'); | ||
} | ||
throw new TypeError('Invalid "type" property; expected string, received ' + (0, _typeof2.default)(type)); | ||
// unsubscribe any queue consumer | ||
yield _bluebird2.default.all( | ||
Array.from(_this10.consumers). | ||
filter(function ([key, value]) {return value === queue;}). | ||
map(function ([key]) {return _this10.unsubscribe(key);})); | ||
case 13: | ||
if ((0, _isInteger2.default)(timestamp)) { | ||
_context5.next = 15; | ||
break; | ||
} | ||
throw new TypeError('Invalid "timestamp" property; expected integer, received ' + (0, _typeof2.default)(timestamp)); | ||
return _this10.incomingChannel.deleteQueue(queue, options);})(); | ||
} | ||
case 15: | ||
if (!(this.publisherChannel === null)) { | ||
_context5.next = 17; | ||
break; | ||
} | ||
bindQueue(queue, source, pattern) {var _this11 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if (!(0, _isString2.default)(source)) { | ||
throw new TypeError(`Invalid source; expected string, received ${(0, _typeof2.default)(source)}`); | ||
} | ||
if (!(0, _isString2.default)(pattern)) { | ||
throw new TypeError(`Invalid pattern; expected string, received ${(0, _typeof2.default)(pattern)}`); | ||
} | ||
throw new Error('Cannot publish to message bus; did you forget to call #connect()'); | ||
// make sure connection is open | ||
if (!_this11.conn) { | ||
throw new Error('Cannot assert queue; did you forget to call #connect()'); | ||
} | ||
case 17: | ||
return _this11.incomingChannel.bindQueue(queue, source, pattern);})(); | ||
} | ||
this.publisherChannel.sendToQueue(this.queue, this.encrypt(content), { messageId: messageId, type: type, priority: priority, timestamp: timestamp }); | ||
unbindQueue(queue, source, pattern) {var _this12 = this;return _asyncToGenerator(function* () { | ||
if (!(0, _isString2.default)(queue)) { | ||
throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`); | ||
} | ||
if (!(0, _isString2.default)(source)) { | ||
throw new TypeError(`Invalid source; expected string, received ${(0, _typeof2.default)(source)}`); | ||
} | ||
if (!(0, _isString2.default)(pattern)) { | ||
throw new TypeError(`Invalid pattern; expected string, received ${(0, _typeof2.default)(pattern)}`); | ||
} | ||
return _context5.abrupt('return', this.publisherChannel.waitForConfirms()); | ||
// make sure connection is open | ||
if (!_this12.conn) { | ||
throw new Error('Cannot assert queue; did you forget to call #connect()'); | ||
} | ||
case 19: | ||
case 'end': | ||
return _context5.stop(); | ||
} | ||
} | ||
}, _callee5, this); | ||
})); | ||
return _this12.incomingChannel.unbindQueue(queue, source, pattern);})(); | ||
} | ||
function publish(_x2) { | ||
return _ref5.apply(this, arguments); | ||
} | ||
// /** | ||
// * Indicates whether the designated queue already exists. | ||
// * @param {string} queue | ||
// * @returns {Promise} | ||
// */ | ||
// async existsQueue(queue) { | ||
// if (!isString(queue)) { | ||
// throw new TypeError(`Invalid queue; expected string, received ${typeOf(queue)}`); | ||
// } | ||
return publish; | ||
}() | ||
}]); | ||
// // make sure connection is open | ||
// if (!this.conn) { | ||
// throw new Error('Unable to check queue existence; did you forget to call #connect()'); | ||
// } | ||
return MessageBus; | ||
}(); | ||
// try { | ||
// const response = await this.incomingChannel.checkQueue(queue); | ||
// return response.queue === queue; | ||
// } catch (err) { | ||
// return false; // TODO: Find out why this shit causes the connection to close | ||
// } | ||
// } | ||
}exports.default = | ||
exports.default = MessageBus; | ||
module.exports = exports['default']; | ||
MessageBus;module.exports = exports['default']; |
{ | ||
"name": "amqp-message-bus", | ||
"version": "1.1.1", | ||
"version": "2.0.0", | ||
"description": "Node.js message bus interface for AMQP servers, such as RabbitMQ.", | ||
@@ -23,3 +23,3 @@ "keywords": [ | ||
"engines": { | ||
"node": ">=6.0.0" | ||
"node": ">=7.6" | ||
}, | ||
@@ -36,24 +36,23 @@ "main": "dist/MessageBus.js", | ||
"dependencies": { | ||
"amqplib": "^0.5.1", | ||
"bluebird": "^3.5.0", | ||
"amqplib": "^0.5.2", | ||
"bluebird": "^3.5.1", | ||
"dotenv": "^4.0.0", | ||
"lodash": "^4.17.2", | ||
"typeof": "^1.0.0", | ||
"uuid": "^3.0.1" | ||
"uuid": "^3.1.0" | ||
}, | ||
"devDependencies": { | ||
"babel-cli": "^6.24.1", | ||
"babel-jest": "^20.0.0", | ||
"babel-cli": "^6.26.0", | ||
"babel-jest": "^21.2.0", | ||
"babel-plugin-add-module-exports": "^0.2.1", | ||
"babel-plugin-transform-async-to-module-method": "^6.24.1", | ||
"babel-plugin-transform-es2015-modules-commonjs": "^6.24.1", | ||
"babel-preset-es2015": "^6.24.1", | ||
"eslint": "^3.19.0", | ||
"eslint-config-airbnb": "^14.1.0", | ||
"eslint-plugin-import": "^2.2.0", | ||
"eslint-plugin-jsx-a11y": "^3.0.2", | ||
"eslint-plugin-react": "^6.9.0", | ||
"jest-cli": "^20.0.0", | ||
"rimraf": "^2.6.1" | ||
"babel-plugin-transform-es2015-modules-commonjs": "^6.26.0", | ||
"babel-preset-es2017": "^6.24.1", | ||
"eslint": "^4.11.0", | ||
"eslint-config-airbnb": "^16.1.0", | ||
"eslint-plugin-import": "^2.8.0", | ||
"eslint-plugin-jsx-a11y": "^6.0.2", | ||
"eslint-plugin-react": "^7.5.1", | ||
"jest-cli": "^21.2.1", | ||
"rimraf": "^2.6.2" | ||
} | ||
} |
208
README.md
@@ -5,9 +5,9 @@ # AMQP Message Bus | ||
[![Build Status](https://travis-ci.org/controlly/ampq-message-bus.svg?branch=master)](https://travis-ci.org/controlly/ampq-message-bus) [![npm version](https://badge.fury.io/js/ampq-message-bus.svg)](https://badge.fury.io/js/ampq-message-bus) | ||
[![Build Status](https://travis-ci.org/yeepio/amqp-message-bus.svg?branch=master)](https://travis-ci.org/yeepio/amqp-message-bus) [![npm version](https://badge.fury.io/js/amqp-message-bus.svg)](https://badge.fury.io/js/amqp-message-bus) | ||
#### Features | ||
* Message bus API hides the complexity of AMQP connectors; | ||
* Supports symmetric message encryption; | ||
* Works pefectly fine with async/await. | ||
* Hides the complexity of AMQP client; | ||
* Comes with build-in symmetric message encryption; | ||
* Supports promises + async/await. | ||
@@ -22,12 +22,6 @@ ## Installation | ||
* Node.js v.6+ | ||
* Node.js v.7+ | ||
## Quick start | ||
Install `amqp-message-bus` from npm. | ||
``` | ||
$ npm install amqp-message-bus --save | ||
``` | ||
Create new message bus. | ||
@@ -39,3 +33,2 @@ | ||
const bus = new MessageBus({ | ||
queue: 'tasks', | ||
url: 'amqp://localhost', | ||
@@ -46,24 +39,8 @@ encryptionKey: 'keep-it-safe' | ||
Connect to AMQP server and subscribe for messages. | ||
Connect to AMQP server and subscribe to queue for messages. | ||
```javascript | ||
bus.connect() | ||
.then(() => bus.subscribe((msg, props, done) => { | ||
// process msg + props | ||
console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`); | ||
// call done when message is done processing to remove from rabbitmq | ||
done(); | ||
})) | ||
// call this when you want to unsubscribe... | ||
.then((unsubscribe) => unsubscribe()) | ||
// always catch errors with promises :-) | ||
.catch((err) => console.error(err)); | ||
``` | ||
The same looks much better using async/await. | ||
```javascript | ||
await bus.connect(); | ||
const unsubscribe = await bus.subscribe((msg, props, done) => { | ||
const unsubscribe = await bus.subscribe('my_queue', (msg, props, done) => { | ||
// process msg + props | ||
@@ -75,15 +52,29 @@ console.log(`Received message ${props.messageId} with priority ${props.priority}, published on ${props.timestamp}`); | ||
// call this when you want to unsubscribe... | ||
// unsubscribe from queue | ||
await unsubscribe(); | ||
// disconnect from bus | ||
await bus.disconnect(); | ||
``` | ||
Connect to AMQP server, publish message and immediately disconnect. | ||
Connect to AMQP server, create queue (if not exists) and send message. | ||
```javascript | ||
bus.connect() | ||
.then(() => bus.publish({ foo: 1, bar: 2 })) | ||
.catch((err) => console.error(err)) | ||
.finally(() => bus.disconnect); | ||
await bus.connect(); | ||
await bus.assertQueue('my_queue'); | ||
await bus.sendToQueue('my_queue', { foo: 1, bar: 2 }); | ||
await bus.disconnect(); | ||
``` | ||
Connect to AMQP server, create topic exchange and publish message to route. | ||
```javascript | ||
await bus.connect(); | ||
await bus.assertExchange('my_exchange', 'topic'); // note the 2nd argument (i.e. "topic") used to create a topic exchange | ||
await bus.assertQueue('my_queue'); | ||
await bus.bindQueue('my_queue', 'my_exchange', 'route.1'); // note the 3rd argument (i.e. route.1) | ||
await bus.publish('my_exchange', 'route.1', { foo: 1, bar: 2 }); // note the 3rd argument (i.e. route.1) | ||
await bus.disconnect(); | ||
``` | ||
## API Docs | ||
@@ -95,14 +86,12 @@ | ||
##### Arguments | ||
#### Arguments | ||
1. `spec` _(Object)_ message bus properties (required). | ||
* `spec.url` _(string)_ AMQP server URL (required). | ||
* `spec.queue` _(string)_ the name of the queue to subscribe to (required). | ||
* `spec.encryptionKey` _(string)_ encryption key to use with assymetric encryption (optional). Signifies no encryption if left unspecified. | ||
- **props** _(Object)_ message bus properties (required). | ||
- **props.url** _(string)_ AMQP server URL (required). | ||
- **props.encryptionKey** _(string)_ encryption key to use with symmetric encryption (optional). | ||
##### Example | ||
#### Example | ||
```javascript | ||
const bus = new MessageBus({ | ||
queue: 'tasks', | ||
url: 'amqp://localhost', | ||
@@ -115,9 +104,9 @@ encryptionKey: 'keep-it-safe' | ||
Connects to AMQP server using the connection properties specified at construction time. | ||
Connects to AMQP server. | ||
##### Returns | ||
#### Returns | ||
Returns a native Promise. | ||
`Promise` | ||
##### Example | ||
#### Example | ||
@@ -127,3 +116,3 @@ ```javascript | ||
.then(() => { | ||
console.log('Connected to amqp server'); | ||
console.log('Connected to rabbitmq'); | ||
}) | ||
@@ -139,7 +128,7 @@ .catch((err) => { | ||
##### Returns | ||
#### Returns | ||
Returns a native Promise. | ||
`Promise` | ||
##### Example | ||
#### Example | ||
@@ -149,3 +138,3 @@ ```javascript | ||
.then(() => { | ||
console.log('Disconnected from amqp server'); | ||
console.log('Disconnected from rabbitmq'); | ||
}) | ||
@@ -157,24 +146,24 @@ .catch((err) => { | ||
### <a name="subscribe" href="subscribe">#</a>subscribe(listener) -> Promise\<Function\> | ||
### <a name="subscribe" href="subscribe">#</a>subscribe(queue, listener) | ||
Subscribes to the message bus for incoming messages. | ||
Subscribes to the designated queue for incoming messages. | ||
##### Arguments | ||
#### Arguments | ||
1. `listener` _(Function\<Object, Object, Function\>)_ listener function (required). | ||
- **queue** _(string)_ the name of the queue to subscribe to | ||
- **listener** _(Function)_ listener function, i.e. `function(msg, props, done)` (required). | ||
- **msg** _(Object)_ message body (required). | ||
- **props** _(Object)_ message meta-data (required). | ||
- **done** _(Function)_ call done to signal message proccessing is done (required). | ||
###### Listener function arguments | ||
Please visit [http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for further info on `props` meta-data. | ||
1. `msg` _(Object)_ message body (required). | ||
2. `props` _(Object)_ message meta-data (required). | ||
3. `done` _(Function)_ call done to signal message proccessing is done (required). | ||
#### Returns | ||
Please visit [http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue](http://www.squaremobius.net/amqp.node/channel_api.html#channel_assertQueue) for further info on `props` meta-data. | ||
`Promise<Function>` | ||
##### Returns | ||
The function returned is the `unsubscribe()` method. | ||
Returns a native Promise resolving to an `unsubscribe()` method. | ||
#### Example | ||
##### Example | ||
```javascript | ||
@@ -191,3 +180,3 @@ const listener = (msg, props, done) => { | ||
// unsubscribe when ready | ||
unsubscribe(); | ||
return unsubscribe(); | ||
}) | ||
@@ -199,3 +188,3 @@ .catch((err) => { | ||
##### Example using async/await | ||
#### Example using async/await | ||
@@ -214,23 +203,29 @@ ```javascript | ||
### <a name="publish" href="publish">#</a>publish(msg, props) -> Promise\<boolean\> | ||
### <a name="sendToQueue" href="sendToQueue">#</a>sendToQueue(queue, message, props) | ||
Publishes the supplied message to the AMQP server. | ||
Sends the supplied message to the designated queue. | ||
##### Arguments | ||
#### Arguments | ||
1. `content` _(*)_ message body (required); can be any JSON serializable value, e.g. Object, Array. | ||
2. `props` _(Object)_ message props (optional). | ||
* `props.id` _(string)_ message ID (optional; defaults to `UUID v4`) | ||
* `props.priority` _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1) | ||
* `props.timestamp` _(number)_ message timestamp (optional; defaults to `Date.now()`) | ||
* `props.type` _(string)_ message type (optional) | ||
- **queue** _(string)_ the name of the queue to send message to (required) | ||
- **message** _(*)_ message body; can be any JSON serializable value (required) | ||
- **props** _(Object)_ message props (optional). | ||
- **props.id** _(string)_ message ID (optional; defaults to `UUID v4`) | ||
- **props.priority** _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1) | ||
- **props.timestamp** _(number)_ message timestamp (optional; defaults to `Date.now()`) | ||
- **props.type** _(string)_ message type (optional) | ||
##### Returns | ||
#### Returns | ||
Returns a native Promise resolving to a boolean value. | ||
`Promise` | ||
##### Example | ||
#### Example | ||
```javascript | ||
bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 }) | ||
bus.sendToQueue('my_queue', { | ||
foo: 'bar' | ||
}, { | ||
type: 'nonsense', | ||
priority: 10 | ||
}) | ||
.catch((err) => { | ||
@@ -241,8 +236,57 @@ console.error(err); | ||
##### Example using async/await | ||
#### Example using async/await | ||
```javascript | ||
await bus.publish({ foo: 'bar' }, { type: 'nonsense', priority: 10 }); | ||
await bus.sendToQueue({ | ||
foo: 'bar' | ||
}, { | ||
type: 'nonsense', | ||
priority: 10 | ||
}); | ||
``` | ||
### <a name="publish" href="publish">#</a>publish(exchange, routingKey, message, props) | ||
Publishes the supplied message to the designated exchange. | ||
#### Arguments | ||
- **exchange** _(string)_ the name of the exchange to publish message to (required) | ||
- **routingKey** _(string)_ the routing key to publish message to (required) | ||
- **message** _(*)_ message body; can be any JSON serializable value (required) | ||
- **props** _(Object)_ message props (optional). | ||
- **props.id** _(string)_ message ID (optional; defaults to `UUID v4`) | ||
- **props.priority** _(integer)_ message priority, must be between 1 and 10 (optional; defaults to 1) | ||
- **props.timestamp** _(number)_ message timestamp (optional; defaults to `Date.now()`) | ||
- **props.type** _(string)_ message type (optional) | ||
#### Returns | ||
`Promise` | ||
#### Example | ||
```javascript | ||
bus.publish('my_exchange', 'route.1', { | ||
foo: 'bar' | ||
}, { | ||
type: 'nonsense', | ||
priority: 10 | ||
}) | ||
.catch((err) => { | ||
console.error(err); | ||
}); | ||
``` | ||
#### Example using async/await | ||
```javascript | ||
await bus.publish('my_exchange', 'route.1', { | ||
foo: 'bar' | ||
}, { | ||
type: 'nonsense', | ||
priority: 10 | ||
}); | ||
``` | ||
## Contribute | ||
@@ -249,0 +293,0 @@ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
28621
12
410
287
5
1
Updatedamqplib@^0.5.2
Updatedbluebird@^3.5.1
Updateduuid@^3.1.0