amqp-as-promised
Advanced tools
Comparing version 4.2.0 to 5.0.0
@@ -22,7 +22,5 @@ // Generated by CoffeeScript 1.12.5 | ||
function AmqpClient(conf, compat) { | ||
var opts, reconnect, ref, uri; | ||
this.conf = conf; | ||
this.compat = compat != null ? compat : require('./compat-node-amqp'); | ||
this.shutdown = bind(this.shutdown, this); | ||
this._shutdown = bind(this._shutdown, this); | ||
this.unbind = bind(this.unbind, this); | ||
@@ -36,32 +34,21 @@ this._unbind = bind(this._unbind, this); | ||
this._exchange = bind(this._exchange, this); | ||
this.getChannel = bind(this.getChannel, this); | ||
this.connect = bind(this.connect, this); | ||
this.exchanges = {}; | ||
this.queues = {}; | ||
} | ||
AmqpClient.prototype.connect = function() { | ||
var opts, ref, uri; | ||
ref = this.compat.connection(this.conf), uri = ref[0], opts = ref[1]; | ||
log.info("connecting to:", uri); | ||
reconnect = (function(_this) { | ||
return function() { | ||
return amqp.connect(uri, opts).then(function(conn) { | ||
conn.on('error', function(err) { | ||
var ref1; | ||
if (_this._shuttingDown) { | ||
return; | ||
} | ||
log.warn('amqp error:', (err.message ? err.message : err)); | ||
if (typeof ((ref1 = _this.conf) != null ? ref1.errorHandler : void 0) === 'function') { | ||
return _this.conf.errorHandler(err); | ||
} else { | ||
throw err; | ||
} | ||
}); | ||
return conn.createChannel().then(function(c) { | ||
c.setMaxListeners(_this.conf.maxListeners || 1000); | ||
return c; | ||
}); | ||
})["catch"](function(err) { | ||
var time; | ||
if (_this.conf.waitForConnection) { | ||
log.info("waiting for connection to:", uri); | ||
time = _this.conf.waitForConnection; | ||
if (typeof time !== 'number') { | ||
time = 1000; | ||
} | ||
return wait(time).then(reconnect); | ||
return amqp.connect(uri, opts).then((function(_this) { | ||
return function(conn) { | ||
_this.conn = conn; | ||
log.info("connected"); | ||
_this.conn.on('error', function(err) { | ||
var ref1; | ||
log.warn('amqp error:', (err.message ? err.message : err)); | ||
if (typeof ((ref1 = _this.conf) != null ? ref1.errorHandler : void 0) === 'function') { | ||
return _this.conf.errorHandler(err); | ||
} else { | ||
@@ -71,12 +58,37 @@ throw err; | ||
}); | ||
return Promise.resolve("ok"); | ||
}; | ||
})(this); | ||
this.channel = reconnect(); | ||
this.exchanges = {}; | ||
this.queues = {}; | ||
} | ||
})(this))["catch"]((function(_this) { | ||
return function(err) { | ||
var time; | ||
if (_this.conf.waitForConnection) { | ||
time = _this.conf.waitForConnection; | ||
if (typeof time !== 'number') { | ||
time = 1000; | ||
} | ||
log.info("waiting for connection to:", uri); | ||
return wait(time).then(function() { | ||
return _this.connect(uri, opts); | ||
}); | ||
} else { | ||
return Promise.reject(err); | ||
} | ||
}; | ||
})(this)); | ||
}; | ||
AmqpClient.prototype.getChannel = function() { | ||
return this.conn.createChannel().then((function(_this) { | ||
return function(c) { | ||
c.setMaxListeners(_this.conf.maxListeners || 1000); | ||
return Promise.resolve(c); | ||
}; | ||
})(this))["catch"](function(err) { | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
AmqpClient.prototype._exchange = function(name, type, opts) { | ||
if (name instanceof ExchangeWrapper) { | ||
return Promise.resolve(name); | ||
return name; | ||
} | ||
@@ -86,3 +98,3 @@ if (this.exchanges[name]) { | ||
} | ||
return this.exchanges[name] = this.channel.then((function(_this) { | ||
return this.getChannel().then((function(_this) { | ||
return function(c) { | ||
@@ -97,6 +109,9 @@ return (!type ? name === '' ? Promise.resolve({ | ||
log.info('exchange ready:', e.exchange); | ||
return new ExchangeWrapper(_this, e); | ||
return _this.exchanges[name] = new ExchangeWrapper(_this, e, c); | ||
}); | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
log.error("_echange error", err); | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
@@ -127,10 +142,13 @@ | ||
} | ||
return this.queues[qname] = this.channel.then((function(_this) { | ||
return this.getChannel().then((function(_this) { | ||
return function(c) { | ||
return (!opts ? c.checkQueue(qname) : c.assertQueue(qname, opts)).then(function(q) { | ||
log.info('queue created:', q.queue); | ||
return new QueueWrapper(_this, q); | ||
return _this.queues[qname] = new QueueWrapper(_this, q, c); | ||
}); | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
log.error("_queue error", err); | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
@@ -160,2 +178,5 @@ | ||
}); | ||
})["catch"](function(err) { | ||
log.error("_bind error", err); | ||
return Promise.reject(err); | ||
}); | ||
@@ -182,11 +203,5 @@ }; | ||
AmqpClient.prototype._shutdown = function() { | ||
return this.channel.then(function(c) { | ||
return c.close(); | ||
}); | ||
}; | ||
AmqpClient.prototype.shutdown = function() { | ||
this._shuttingDown = true; | ||
return this.compat.promise(this._shutdown()); | ||
var ref; | ||
return this.compat.promise((ref = this.conn) != null ? ref.close() : void 0); | ||
}; | ||
@@ -193,0 +208,0 @@ |
@@ -72,3 +72,3 @@ // Generated by CoffeeScript 1.12.5 | ||
}, | ||
callback: function(client, cb) { | ||
callback: function(channel, cb) { | ||
return function(data) { | ||
@@ -104,5 +104,3 @@ var ack, content, headers, info, ref, ref1, ref10, ref11, ref12, ref13, ref14, ref15, ref16, ref17, ref18, ref19, ref2, ref20, ref3, ref4, ref5, ref6, ref7, ref8, ref9; | ||
acknowledge: function() { | ||
return client.channel.then(function(c) { | ||
return c.ack(data); | ||
}); | ||
return channel.ack(data); | ||
} | ||
@@ -109,0 +107,0 @@ }; |
@@ -7,5 +7,6 @@ // Generated by CoffeeScript 1.12.5 | ||
module.exports = ExchangeWrapper = (function() { | ||
function ExchangeWrapper(client, exchange) { | ||
function ExchangeWrapper(client, exchange, channel) { | ||
this.client = client; | ||
this.exchange = exchange; | ||
this.channel = channel; | ||
this.publish = bind(this.publish, this); | ||
@@ -17,15 +18,9 @@ this._publish = bind(this._publish, this); | ||
ExchangeWrapper.prototype._publish = function(routingKey, message, options) { | ||
return this.client.channel.then((function(_this) { | ||
return function(c) { | ||
return new Promise(function(rs, rj) { | ||
if (c.publish(_this.name, routingKey, message, options)) { | ||
return rs(); | ||
} else { | ||
return c.once('drain', function() { | ||
return rs(); | ||
}); | ||
} | ||
}); | ||
}; | ||
})(this)); | ||
if (this.channel.publish(this.name, routingKey, message, options)) { | ||
return {}; | ||
} else { | ||
return this.channel.once('drain', function() { | ||
return {}; | ||
}); | ||
} | ||
}; | ||
@@ -32,0 +27,0 @@ |
@@ -14,26 +14,35 @@ // Generated by CoffeeScript 1.12.5 | ||
module.exports = function(conf) { | ||
var client, rpc, rpcBackend; | ||
if (conf == null) { | ||
conf = {}; | ||
} | ||
if (conf.logLevel) { | ||
log.level(conf.logLevel); | ||
} | ||
if (!conf.connection) { | ||
conf = { | ||
connection: conf | ||
}; | ||
} | ||
client = new AmqpClient(conf); | ||
rpc = new Rpc(client, conf.rpc); | ||
rpcBackend = new RpcBackend(client); | ||
return { | ||
exchange: client.exchange, | ||
queue: client.queue, | ||
bind: client.bind, | ||
rpc: rpc.rpc, | ||
serve: rpcBackend.serve, | ||
shutdown: client.shutdown, | ||
local: client.local | ||
}; | ||
return new Promise(function(resolve, reject) { | ||
var client; | ||
if (conf.logLevel) { | ||
log.level(conf.logLevel); | ||
} | ||
if (!conf.connection) { | ||
conf = { | ||
connection: conf | ||
}; | ||
} | ||
client = new AmqpClient(conf); | ||
return client.connect().then((function(_this) { | ||
return function() { | ||
var rpc, rpcBackend; | ||
rpc = new Rpc(client, conf.rpc); | ||
rpcBackend = new RpcBackend(client); | ||
return resolve({ | ||
exchange: client.exchange, | ||
queue: client.queue, | ||
bind: client.bind, | ||
rpc: rpc.rpc, | ||
serve: rpcBackend.serve, | ||
shutdown: client.shutdown, | ||
local: client.local | ||
}); | ||
}; | ||
})(this))["catch"](function(err) { | ||
return reject(err); | ||
}); | ||
}); | ||
}; | ||
@@ -40,0 +49,0 @@ |
@@ -9,5 +9,6 @@ // Generated by CoffeeScript 1.12.5 | ||
module.exports = QueueWrapper = (function() { | ||
function QueueWrapper(client, queue) { | ||
function QueueWrapper(client, queue, channel) { | ||
this.client = client; | ||
this.queue = queue; | ||
this.channel = channel; | ||
this.isAutoDelete = bind(this.isAutoDelete, this); | ||
@@ -42,5 +43,3 @@ this.isDurable = bind(this.isDurable, this); | ||
log.info('binding:', exchange.name, _this.name, topic); | ||
return _this.client.channel.then(function(c) { | ||
return c.bindQueue(_this.name, exchange.name, topic); | ||
}).then(function() { | ||
return _this.channel.bindQueue(_this.name, exchange.name, topic).then(function() { | ||
_this._exchange = exchange.name; | ||
@@ -52,3 +51,6 @@ _this._topic = topic; | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
log.error("qw _bind error", err); | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
@@ -64,12 +66,13 @@ | ||
} | ||
return this.client.channel.then((function(_this) { | ||
return function(c) { | ||
return c.unbindQueue(_this.name, _this._exchange, _this._topic).then(function() { | ||
log.info('queue unbound:', _this.name, _this._topic); | ||
delete _this._exchange; | ||
delete _this._topic; | ||
return _this; | ||
}); | ||
return this.channel.unbindQueue(this.name, this._exchange, this._topic).then((function(_this) { | ||
return function() { | ||
log.info('queue unbound:', _this.name, _this._topic); | ||
delete _this._exchange; | ||
delete _this._topic; | ||
return _this; | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
log.error("qw _unbind error", err); | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
@@ -88,13 +91,14 @@ | ||
return function() { | ||
return _this.client.channel.then(function(c) { | ||
opts = _this.client.compat.subscribeOpts(opts); | ||
return Promise.all([c.prefetch(opts.prefetch), c.consume(_this.queue.queue, _this.client.compat.callback(_this.client, callback), opts)]).then(function(arg) { | ||
var consumerTag, ref, whatevs; | ||
whatevs = arg[0], (ref = arg[1], consumerTag = ref.consumerTag); | ||
_this._consumerTag = consumerTag; | ||
return _this; | ||
}); | ||
opts = _this.client.compat.subscribeOpts(opts); | ||
return Promise.all([_this.channel.prefetch(opts.prefetch), _this.channel.consume(_this.queue.queue, _this.client.compat.callback(_this.channel, callback), opts)]).then(function(arg) { | ||
var consumerTag, ref, whatevs; | ||
whatevs = arg[0], (ref = arg[1], consumerTag = ref.consumerTag); | ||
_this._consumerTag = consumerTag; | ||
return _this; | ||
}); | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
log.error("qw _subscribe error", err); | ||
return Promise.reject(err); | ||
}); | ||
}; | ||
@@ -108,9 +112,5 @@ | ||
if (!this._consumerTag) { | ||
return Promise.resolve(this); | ||
return this; | ||
} | ||
return this.client.channel.then((function(_this) { | ||
return function(c) { | ||
return c.cancel(_this._consumerTag); | ||
}; | ||
})(this)).then((function(_this) { | ||
return this.channel.cancel(this._consumerTag).then((function(_this) { | ||
return function() { | ||
@@ -121,3 +121,5 @@ log.info('unsubscribed:', _this.name, _this._consumerTag); | ||
}; | ||
})(this)); | ||
})(this))["catch"](function(err) { | ||
return log.error("qw _unsub error", err); | ||
}); | ||
}; | ||
@@ -124,0 +126,0 @@ |
@@ -116,5 +116,5 @@ // Generated by CoffeeScript 1.12.5 | ||
return p.then(function(payload) { | ||
return ex.publish(routingKey, payload, opts); | ||
}).then(function() { | ||
return def.promise; | ||
return ex.publish(routingKey, payload, opts).then(function() { | ||
return def.promise; | ||
}); | ||
}); | ||
@@ -121,0 +121,0 @@ }; |
{ | ||
"name": "amqp-as-promised", | ||
"description": "A promise-based AMQP API build on node-amqp", | ||
"version": "4.2.0", | ||
"version": "5.0.0", | ||
"repository": { | ||
@@ -39,3 +39,3 @@ "type": "git", | ||
"coffeelint": "^2.0.7", | ||
"mocha": "^3.1.2", | ||
"mocha": "^5.2.0", | ||
"mocha-jenkins-reporter": "^0.3.10", | ||
@@ -42,0 +42,0 @@ "proxyquire": "^1.7.3", |
@@ -10,5 +10,13 @@ AMQP as Promised | ||
A high-level [promise-based](https://github.com/kriskowal/q) API built on | ||
[node-amqp](https://github.com/postwait/node-amqp), extended with | ||
functions for AMQP-based RPC. | ||
[`amqplib`](https://www.npmjs.com/package/amqplib) | ||
extended with functions for AMQP-based RPC. | ||
* [`amqplib` API docs][amqplib-api-docs] | ||
* Old versions of this package were based on [node-amqp][npm-node-amqp]. | ||
[amqplib-api-docs]: (http://www.squaremobius.net/amqp.node/channel_api.html) | ||
[npm-node-amqp]: https://github.com/postwait/node-amqp | ||
## Table of contents | ||
* [Configuration](#configuration) | ||
@@ -23,5 +31,11 @@ * [Examples](#examples) | ||
## A note on version 3.0 | ||
## Version Notes | ||
As of version 3.0, the underlying amqp library has changed from | ||
#### 5.0 | ||
Syntax to access the library has been changed in 5.0 to improve connection management. See the [Running](#running)-section for instructions. | ||
#### 3.0 | ||
The underlying amqp library was changed from | ||
`node-amqp` to `amqplib`. Efforts have been made to keep everything as | ||
@@ -42,4 +56,13 @@ backwards compatible as possible, but some things have changed: | ||
5.0+ | ||
```coffee | ||
conf = require './myconf.json' # see example conf below | ||
((require 'amqp-as-promised') conf.amqp).then (amqpc) -> | ||
``` | ||
Earlier versions | ||
```coffee | ||
conf = require './myconf.json' # see example conf below | ||
amqpc = (require 'amqp-as-promised') conf.amqp | ||
``` | ||
@@ -266,3 +289,3 @@ ## Configuration | ||
1. If `exchange` is a string, then look up the existing exchange with | ||
that name. | ||
that name. | ||
2. If `queue` is a string, then look up the existing queue with that name. | ||
@@ -278,5 +301,5 @@ 3. Bind queue to `exchange/topic`. | ||
* `topic` - a string with the topic name. | ||
* `callback` - a function that takes the arguments `(msg, headers, | ||
deliveryinfo)`. | ||
* `callback` - see `queue.subscribe` below. | ||
### `amqpc.shutdown()` | ||
@@ -313,2 +336,8 @@ | ||
The callback will be called with arguments `(msg, headers, deliveryinfo, | ||
actions)`, where `actions` is an object that holds these methods: | ||
* `acknowledge()`: returns a Promise to acknowledge the message. This is | ||
only relevant if `opts.ack` is false (which is the default). | ||
### `queue.unsubscribe()` | ||
@@ -315,0 +344,0 @@ |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
115376
894
388
1