New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

amqp-as-promised

Package Overview
Dependencies
Maintainers
2
Versions
75
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-as-promised - npm Package Compare versions

Comparing version 4.2.0 to 5.0.0

115

lib/amqp-client.js

@@ -22,7 +22,5 @@ // Generated by CoffeeScript 1.12.5

function AmqpClient(conf, compat) {
var opts, reconnect, ref, uri;
this.conf = conf;
this.compat = compat != null ? compat : require('./compat-node-amqp');
this.shutdown = bind(this.shutdown, this);
this._shutdown = bind(this._shutdown, this);
this.unbind = bind(this.unbind, this);

@@ -36,32 +34,21 @@ this._unbind = bind(this._unbind, this);

this._exchange = bind(this._exchange, this);
this.getChannel = bind(this.getChannel, this);
this.connect = bind(this.connect, this);
this.exchanges = {};
this.queues = {};
}
AmqpClient.prototype.connect = function() {
var opts, ref, uri;
ref = this.compat.connection(this.conf), uri = ref[0], opts = ref[1];
log.info("connecting to:", uri);
reconnect = (function(_this) {
return function() {
return amqp.connect(uri, opts).then(function(conn) {
conn.on('error', function(err) {
var ref1;
if (_this._shuttingDown) {
return;
}
log.warn('amqp error:', (err.message ? err.message : err));
if (typeof ((ref1 = _this.conf) != null ? ref1.errorHandler : void 0) === 'function') {
return _this.conf.errorHandler(err);
} else {
throw err;
}
});
return conn.createChannel().then(function(c) {
c.setMaxListeners(_this.conf.maxListeners || 1000);
return c;
});
})["catch"](function(err) {
var time;
if (_this.conf.waitForConnection) {
log.info("waiting for connection to:", uri);
time = _this.conf.waitForConnection;
if (typeof time !== 'number') {
time = 1000;
}
return wait(time).then(reconnect);
return amqp.connect(uri, opts).then((function(_this) {
return function(conn) {
_this.conn = conn;
log.info("connected");
_this.conn.on('error', function(err) {
var ref1;
log.warn('amqp error:', (err.message ? err.message : err));
if (typeof ((ref1 = _this.conf) != null ? ref1.errorHandler : void 0) === 'function') {
return _this.conf.errorHandler(err);
} else {

@@ -71,12 +58,37 @@ throw err;

});
return Promise.resolve("ok");
};
})(this);
this.channel = reconnect();
this.exchanges = {};
this.queues = {};
}
})(this))["catch"]((function(_this) {
return function(err) {
var time;
if (_this.conf.waitForConnection) {
time = _this.conf.waitForConnection;
if (typeof time !== 'number') {
time = 1000;
}
log.info("waiting for connection to:", uri);
return wait(time).then(function() {
return _this.connect(uri, opts);
});
} else {
return Promise.reject(err);
}
};
})(this));
};
AmqpClient.prototype.getChannel = function() {
return this.conn.createChannel().then((function(_this) {
return function(c) {
c.setMaxListeners(_this.conf.maxListeners || 1000);
return Promise.resolve(c);
};
})(this))["catch"](function(err) {
return Promise.reject(err);
});
};
AmqpClient.prototype._exchange = function(name, type, opts) {
if (name instanceof ExchangeWrapper) {
return Promise.resolve(name);
return name;
}

@@ -86,3 +98,3 @@ if (this.exchanges[name]) {

}
return this.exchanges[name] = this.channel.then((function(_this) {
return this.getChannel().then((function(_this) {
return function(c) {

@@ -97,6 +109,9 @@ return (!type ? name === '' ? Promise.resolve({

log.info('exchange ready:', e.exchange);
return new ExchangeWrapper(_this, e);
return _this.exchanges[name] = new ExchangeWrapper(_this, e, c);
});
};
})(this));
})(this))["catch"](function(err) {
log.error("_echange error", err);
return Promise.reject(err);
});
};

@@ -127,10 +142,13 @@

}
return this.queues[qname] = this.channel.then((function(_this) {
return this.getChannel().then((function(_this) {
return function(c) {
return (!opts ? c.checkQueue(qname) : c.assertQueue(qname, opts)).then(function(q) {
log.info('queue created:', q.queue);
return new QueueWrapper(_this, q);
return _this.queues[qname] = new QueueWrapper(_this, q, c);
});
};
})(this));
})(this))["catch"](function(err) {
log.error("_queue error", err);
return Promise.reject(err);
});
};

@@ -160,2 +178,5 @@

});
})["catch"](function(err) {
log.error("_bind error", err);
return Promise.reject(err);
});

@@ -182,11 +203,5 @@ };

AmqpClient.prototype._shutdown = function() {
return this.channel.then(function(c) {
return c.close();
});
};
AmqpClient.prototype.shutdown = function() {
this._shuttingDown = true;
return this.compat.promise(this._shutdown());
var ref;
return this.compat.promise((ref = this.conn) != null ? ref.close() : void 0);
};

@@ -193,0 +208,0 @@

@@ -72,3 +72,3 @@ // Generated by CoffeeScript 1.12.5

},
callback: function(client, cb) {
callback: function(channel, cb) {
return function(data) {

@@ -104,5 +104,3 @@ var ack, content, headers, info, ref, ref1, ref10, ref11, ref12, ref13, ref14, ref15, ref16, ref17, ref18, ref19, ref2, ref20, ref3, ref4, ref5, ref6, ref7, ref8, ref9;

acknowledge: function() {
return client.channel.then(function(c) {
return c.ack(data);
});
return channel.ack(data);
}

@@ -109,0 +107,0 @@ };

@@ -7,5 +7,6 @@ // Generated by CoffeeScript 1.12.5

module.exports = ExchangeWrapper = (function() {
function ExchangeWrapper(client, exchange) {
function ExchangeWrapper(client, exchange, channel) {
this.client = client;
this.exchange = exchange;
this.channel = channel;
this.publish = bind(this.publish, this);

@@ -17,15 +18,9 @@ this._publish = bind(this._publish, this);

ExchangeWrapper.prototype._publish = function(routingKey, message, options) {
return this.client.channel.then((function(_this) {
return function(c) {
return new Promise(function(rs, rj) {
if (c.publish(_this.name, routingKey, message, options)) {
return rs();
} else {
return c.once('drain', function() {
return rs();
});
}
});
};
})(this));
if (this.channel.publish(this.name, routingKey, message, options)) {
return {};
} else {
return this.channel.once('drain', function() {
return {};
});
}
};

@@ -32,0 +27,0 @@

@@ -14,26 +14,35 @@ // Generated by CoffeeScript 1.12.5

module.exports = function(conf) {
var client, rpc, rpcBackend;
if (conf == null) {
conf = {};
}
if (conf.logLevel) {
log.level(conf.logLevel);
}
if (!conf.connection) {
conf = {
connection: conf
};
}
client = new AmqpClient(conf);
rpc = new Rpc(client, conf.rpc);
rpcBackend = new RpcBackend(client);
return {
exchange: client.exchange,
queue: client.queue,
bind: client.bind,
rpc: rpc.rpc,
serve: rpcBackend.serve,
shutdown: client.shutdown,
local: client.local
};
return new Promise(function(resolve, reject) {
var client;
if (conf.logLevel) {
log.level(conf.logLevel);
}
if (!conf.connection) {
conf = {
connection: conf
};
}
client = new AmqpClient(conf);
return client.connect().then((function(_this) {
return function() {
var rpc, rpcBackend;
rpc = new Rpc(client, conf.rpc);
rpcBackend = new RpcBackend(client);
return resolve({
exchange: client.exchange,
queue: client.queue,
bind: client.bind,
rpc: rpc.rpc,
serve: rpcBackend.serve,
shutdown: client.shutdown,
local: client.local
});
};
})(this))["catch"](function(err) {
return reject(err);
});
});
};

@@ -40,0 +49,0 @@

@@ -9,5 +9,6 @@ // Generated by CoffeeScript 1.12.5

module.exports = QueueWrapper = (function() {
function QueueWrapper(client, queue) {
function QueueWrapper(client, queue, channel) {
this.client = client;
this.queue = queue;
this.channel = channel;
this.isAutoDelete = bind(this.isAutoDelete, this);

@@ -42,5 +43,3 @@ this.isDurable = bind(this.isDurable, this);

log.info('binding:', exchange.name, _this.name, topic);
return _this.client.channel.then(function(c) {
return c.bindQueue(_this.name, exchange.name, topic);
}).then(function() {
return _this.channel.bindQueue(_this.name, exchange.name, topic).then(function() {
_this._exchange = exchange.name;

@@ -52,3 +51,6 @@ _this._topic = topic;

};
})(this));
})(this))["catch"](function(err) {
log.error("qw _bind error", err);
return Promise.reject(err);
});
};

@@ -64,12 +66,13 @@

}
return this.client.channel.then((function(_this) {
return function(c) {
return c.unbindQueue(_this.name, _this._exchange, _this._topic).then(function() {
log.info('queue unbound:', _this.name, _this._topic);
delete _this._exchange;
delete _this._topic;
return _this;
});
return this.channel.unbindQueue(this.name, this._exchange, this._topic).then((function(_this) {
return function() {
log.info('queue unbound:', _this.name, _this._topic);
delete _this._exchange;
delete _this._topic;
return _this;
};
})(this));
})(this))["catch"](function(err) {
log.error("qw _unbind error", err);
return Promise.reject(err);
});
};

@@ -88,13 +91,14 @@

return function() {
return _this.client.channel.then(function(c) {
opts = _this.client.compat.subscribeOpts(opts);
return Promise.all([c.prefetch(opts.prefetch), c.consume(_this.queue.queue, _this.client.compat.callback(_this.client, callback), opts)]).then(function(arg) {
var consumerTag, ref, whatevs;
whatevs = arg[0], (ref = arg[1], consumerTag = ref.consumerTag);
_this._consumerTag = consumerTag;
return _this;
});
opts = _this.client.compat.subscribeOpts(opts);
return Promise.all([_this.channel.prefetch(opts.prefetch), _this.channel.consume(_this.queue.queue, _this.client.compat.callback(_this.channel, callback), opts)]).then(function(arg) {
var consumerTag, ref, whatevs;
whatevs = arg[0], (ref = arg[1], consumerTag = ref.consumerTag);
_this._consumerTag = consumerTag;
return _this;
});
};
})(this));
})(this))["catch"](function(err) {
log.error("qw _subscribe error", err);
return Promise.reject(err);
});
};

@@ -108,9 +112,5 @@

if (!this._consumerTag) {
return Promise.resolve(this);
return this;
}
return this.client.channel.then((function(_this) {
return function(c) {
return c.cancel(_this._consumerTag);
};
})(this)).then((function(_this) {
return this.channel.cancel(this._consumerTag).then((function(_this) {
return function() {

@@ -121,3 +121,5 @@ log.info('unsubscribed:', _this.name, _this._consumerTag);

};
})(this));
})(this))["catch"](function(err) {
return log.error("qw _unsub error", err);
});
};

@@ -124,0 +126,0 @@

@@ -116,5 +116,5 @@ // Generated by CoffeeScript 1.12.5

return p.then(function(payload) {
return ex.publish(routingKey, payload, opts);
}).then(function() {
return def.promise;
return ex.publish(routingKey, payload, opts).then(function() {
return def.promise;
});
});

@@ -121,0 +121,0 @@ };

{
"name": "amqp-as-promised",
"description": "A promise-based AMQP API build on node-amqp",
"version": "4.2.0",
"version": "5.0.0",
"repository": {

@@ -39,3 +39,3 @@ "type": "git",

"coffeelint": "^2.0.7",
"mocha": "^3.1.2",
"mocha": "^5.2.0",
"mocha-jenkins-reporter": "^0.3.10",

@@ -42,0 +42,0 @@ "proxyquire": "^1.7.3",

@@ -10,5 +10,13 @@ AMQP as Promised

A high-level [promise-based](https://github.com/kriskowal/q) API built on
[node-amqp](https://github.com/postwait/node-amqp), extended with
functions for AMQP-based RPC.
[`amqplib`](https://www.npmjs.com/package/amqplib)
extended with functions for AMQP-based RPC.
* [`amqplib` API docs][amqplib-api-docs]
* Old versions of this package were based on [node-amqp][npm-node-amqp].
[amqplib-api-docs]: (http://www.squaremobius.net/amqp.node/channel_api.html)
[npm-node-amqp]: https://github.com/postwait/node-amqp
## Table of contents
* [Configuration](#configuration)

@@ -23,5 +31,11 @@ * [Examples](#examples)

## A note on version 3.0
## Version Notes
As of version 3.0, the underlying amqp library has changed from
#### 5.0
Syntax to access the library has been changed in 5.0 to improve connection management. See the [Running](#running)-section for instructions.
#### 3.0
The underlying amqp library was changed from
`node-amqp` to `amqplib`. Efforts have been made to keep everything as

@@ -42,4 +56,13 @@ backwards compatible as possible, but some things have changed:

5.0+
```coffee
conf = require './myconf.json' # see example conf below
((require 'amqp-as-promised') conf.amqp).then (amqpc) ->
```
Earlier versions
```coffee
conf = require './myconf.json' # see example conf below
amqpc = (require 'amqp-as-promised') conf.amqp
```

@@ -266,3 +289,3 @@ ## Configuration

1. If `exchange` is a string, then look up the existing exchange with
that name.
that name.
2. If `queue` is a string, then look up the existing queue with that name.

@@ -278,5 +301,5 @@ 3. Bind queue to `exchange/topic`.

* `topic` - a string with the topic name.
* `callback` - a function that takes the arguments `(msg, headers,
deliveryinfo)`.
* `callback` - see `queue.subscribe` below.
### `amqpc.shutdown()`

@@ -313,2 +336,8 @@

The callback will be called with arguments `(msg, headers, deliveryinfo,
actions)`, where `actions` is an object that holds these methods:
* `acknowledge()`: returns a Promise to acknowledge the message. This is
only relevant if `opts.ack` is false (which is the default).
### `queue.unsubscribe()`

@@ -315,0 +344,0 @@

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc