Socket
Socket
Sign inDemoInstall

amqp-message-bus

Package Overview
Dependencies
Maintainers
1
Versions
3
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-message-bus - npm Package Compare versions

Comparing version 2.0.0 to 2.1.0

6

CHANGELOG.md

@@ -0,1 +1,7 @@

## 2.1.0 - 2017-11-21
* Expose `unsubscribeAll()` and `deleteExchange()` methods.
* Minor change in error messages copy.
* Document `unsubscribe()` and `unsubscribeAll()` in README.md.
## 2.0.0 - 2017-11-20

@@ -2,0 +8,0 @@

100

dist/MessageBus.js

@@ -85,8 +85,4 @@ 'use strict';Object.defineProperty(exports, "__esModule", { value: true });var _crypto = require('crypto');var _crypto2 = _interopRequireDefault(_crypto);

// unsubscribe any active consumer(s)
yield _bluebird2.default.all(
Array.from(_this2.consumers.keys()).map(function (consumerTag) {
return _this2.unsubscribe(consumerTag);
}));
yield _this2.unsubscribeAll();
// close connection

@@ -172,3 +168,3 @@ _this2.conn.removeAllListeners();

if (!_this4.conn) {
throw new Error('Cannot subscribe to queue; did you forget to call #connect()');
throw new Error('Unable to subscribe to queue; did you forget to call #connect()');
}

@@ -222,2 +218,14 @@

/**
* Unsubscribes any active consumer.
* @returns {Promise}
*/
unsubscribeAll() {var _this6 = this;return _asyncToGenerator(function* () {
return yield _bluebird2.default.all(
Array.from(_this6.consumers.keys()).map(function (consumerTag) {
return _this6.unsubscribe(consumerTag);
}));})();
}
/**
* Publishes the supplied message to the given exchange.

@@ -233,3 +241,3 @@ * @param {string} exchange

*/
publish(exchange, routingKey, message, props = {}) {var _this6 = this;return _asyncToGenerator(function* () {
publish(exchange, routingKey, message, props = {}) {var _this7 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(exchange)) {

@@ -272,11 +280,11 @@ throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`);

// make sure connection is open
if (!_this6.conn) {
throw new Error('Cannot publish to exchange; did you forget to call #connect()');
if (!_this7.conn) {
throw new Error('Unable to publish to exchange; did you forget to call #connect()');
}
return new _bluebird2.default(function (resolve) {
_this6.outgoingChannel.publish(
_this7.outgoingChannel.publish(
exchange,
routingKey,
_this6.encrypt(message),
_this7.encrypt(message),
{

@@ -304,3 +312,3 @@ messageId,

*/
sendToQueue(queue, message, props = {}) {var _this7 = this;return _asyncToGenerator(function* () {
sendToQueue(queue, message, props = {}) {var _this8 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {

@@ -340,10 +348,10 @@ throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);

// make sure connection is open
if (!_this7.conn) {
throw new Error('Cannot send to queue; did you forget to call #connect()');
if (!_this8.conn) {
throw new Error('Unable to send to queue; did you forget to call #connect()');
}
return new _bluebird2.default(function (resolve) {
_this7.outgoingChannel.sendToQueue(
_this8.outgoingChannel.sendToQueue(
queue,
_this7.encrypt(message),
_this8.encrypt(message),
{

@@ -360,3 +368,3 @@ messageId,

assertExchange(exchange, type, options = {}) {var _this8 = this;return _asyncToGenerator(function* () {
assertExchange(exchange, type, options = {}) {var _this9 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(exchange)) {

@@ -373,10 +381,26 @@ throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`);

// make sure connection is open
if (!_this8.conn) {
throw new Error('Cannot assert exchange; did you forget to call #connect()');
if (!_this9.conn) {
throw new Error('Unable to assert exchange; did you forget to call #connect()');
}
return _this8.incomingChannel.assertExchange(exchange, type, options);})();
return _this9.incomingChannel.assertExchange(exchange, type, options);})();
}
assertQueue(queue, options = {}) {var _this9 = this;return _asyncToGenerator(function* () {
deleteExchange(exchange, options = {}) {var _this10 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(exchange)) {
throw new TypeError(`Invalid exchange; expected string, received ${(0, _typeof2.default)(exchange)}`);
}
if (!(0, _isPlainObject2.default)(options)) {
throw new TypeError(`Invalid options; expected plain object, received ${(0, _typeof2.default)(options)}`);
}
// make sure connection is open
if (!_this10.conn) {
throw new Error('Unable to delete exchange; did you forget to call #connect()');
}
return _this10.incomingChannel.deleteExchange(exchange, options);})();
}
assertQueue(queue, options = {}) {var _this11 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {

@@ -390,10 +414,10 @@ throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);

// make sure connection is open
if (!_this9.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
if (!_this11.conn) {
throw new Error('Unable to assert queue; did you forget to call #connect()');
}
return _this9.incomingChannel.assertQueue(queue, options);})();
return _this11.incomingChannel.assertQueue(queue, options);})();
}
deleteQueue(queue, options = {}) {var _this10 = this;return _asyncToGenerator(function* () {
deleteQueue(queue, options = {}) {var _this12 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {

@@ -407,4 +431,4 @@ throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);

// make sure connection is open
if (!_this10.conn) {
throw new Error('Cannot delete queue; did you forget to call #connect()');
if (!_this12.conn) {
throw new Error('Unable to delete queue; did you forget to call #connect()');
}

@@ -414,11 +438,11 @@

yield _bluebird2.default.all(
Array.from(_this10.consumers).
Array.from(_this12.consumers).
filter(function ([key, value]) {return value === queue;}).
map(function ([key]) {return _this10.unsubscribe(key);}));
map(function ([key]) {return _this12.unsubscribe(key);}));
return _this10.incomingChannel.deleteQueue(queue, options);})();
return _this12.incomingChannel.deleteQueue(queue, options);})();
}
bindQueue(queue, source, pattern) {var _this11 = this;return _asyncToGenerator(function* () {
bindQueue(queue, source, pattern) {var _this13 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {

@@ -435,10 +459,10 @@ throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);

// make sure connection is open
if (!_this11.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
if (!_this13.conn) {
throw new Error('Unable to assert queue; did you forget to call #connect()');
}
return _this11.incomingChannel.bindQueue(queue, source, pattern);})();
return _this13.incomingChannel.bindQueue(queue, source, pattern);})();
}
unbindQueue(queue, source, pattern) {var _this12 = this;return _asyncToGenerator(function* () {
unbindQueue(queue, source, pattern) {var _this14 = this;return _asyncToGenerator(function* () {
if (!(0, _isString2.default)(queue)) {

@@ -455,7 +479,7 @@ throw new TypeError(`Invalid queue; expected string, received ${(0, _typeof2.default)(queue)}`);

// make sure connection is open
if (!_this12.conn) {
throw new Error('Cannot assert queue; did you forget to call #connect()');
if (!_this14.conn) {
throw new Error('Unable to assert queue; did you forget to call #connect()');
}
return _this12.incomingChannel.unbindQueue(queue, source, pattern);})();
return _this14.incomingChannel.unbindQueue(queue, source, pattern);})();
}

@@ -462,0 +486,0 @@

{
"name": "amqp-message-bus",
"version": "2.0.0",
"version": "2.1.0",
"description": "Node.js message bus interface for AMQP servers, such as RabbitMQ.",

@@ -5,0 +5,0 @@ "keywords": [

@@ -277,2 +277,40 @@ # AMQP Message Bus

### <a name="unsubscribe" href="unsubscribe">#</a>unsubscribe(consumerTag)
Unsubscribes the designated consumer.
#### Arguments
- **consumerTag** _(string)_ the ID of the consumer to unsubscribe from queue (required)
#### Returns
`Promise`
#### Example
```javascript
bus.unsubscribe('consumer-123')
.catch((err) => {
console.error(err);
});
```
### <a name="unsubscribeAll" href="unsubscribeAll">#</a>unsubscribeAll()
Unsubscribes all message bus consumers.
#### Returns
`Promise`
#### Example
```javascript
bus.unsubscribeAll()
.catch((err) => {
console.error(err);
});
```
## Contribute

@@ -279,0 +317,0 @@

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc