New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

amqp-as-promised

Package Overview
Dependencies
Maintainers
2
Versions
75
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-as-promised - npm Package Compare versions

Comparing version 2.1.0 to 3.0.0

lib/compat-amqplib.js

11

CHANGELOG.md
Changelog
=========
## Unreleased
## 3.0.0 - 2016-10-19
* The underlying amqp library has changed from `node-amqp` to
`amqplib`. Efforts have been made to keep everything as backwards
compatible as possible.
* Local mode is no longer supported.
* `queue.shift()` is no longer supported.
* `Q` has been dropped in favor of native promises. As a result,
support for promise progress notifications over RPC is no longer
supported.
## 0.3.0 - 2015-01-29

@@ -7,0 +16,0 @@ * TTL is now set on RPC messages.

311

lib/amqp-client.js
// Generated by CoffeeScript 1.11.1
(function() {
var ExchangeWrapper, Q, QueueWrapper, amqp, log;
var AmqpClient, ExchangeWrapper, QueueWrapper, amqp, log,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };
log = require('bog');
Q = require('q');
amqp = require('amqplib');
amqp = require('amqp');
ExchangeWrapper = require('./exchange-wrapper');

@@ -15,141 +14,109 @@

module.exports = function(conf) {
var _self, bind, conn, exchange, isShutdown, local, queue, shutdown, unbind;
local = conf.local || process.env.LOCAL;
if (local) {
log.info("local means no amqp connection");
module.exports = AmqpClient = (function() {
function AmqpClient(conf, compat) {
var opts, ref, uri;
this.conf = conf;
this.compat = compat != null ? compat : require('./compat-node-amqp');
this.shutdown = bind(this.shutdown, this);
this._shutdown = bind(this._shutdown, this);
this.unbind = bind(this.unbind, this);
this._unbind = bind(this._unbind, this);
this.bind = bind(this.bind, this);
this._bind = bind(this._bind, this);
this.queue = bind(this.queue, this);
this._queue = bind(this._queue, this);
this.exchange = bind(this.exchange, this);
this._exchange = bind(this._exchange, this);
ref = this.compat.connection(this.conf), uri = ref[0], opts = ref[1];
log.info("connecting to:", uri);
this.channel = amqp.connect(uri, opts).then((function(_this) {
return function(conn) {
conn.on('error', function(err) {
var ref1;
log.warn('amqp error:', (err.message ? err.message : err));
if (typeof ((ref1 = _this.conf) != null ? ref1.errorHandler : void 0) === 'function') {
return _this.conf.errorHandler(err);
} else {
throw err;
}
});
return conn.createChannel();
};
})(this));
this.exchanges = {};
this.queues = {};
}
isShutdown = null;
conn = (function() {
var def, mq, ref;
if (local) {
return Q({
local: true
});
}
log.info("Connecting", conf.connection);
def = Q.defer();
mq = amqp.createConnection(conf.connection);
mq._ttQueues = (ref = mq._ttQueues) != null ? ref : {};
mq.on('ready', function(ev) {
log.info('amqp connection ready');
return def.resolve(mq);
});
mq.on('error', function(err) {
if (!isShutdown) {
log.warn('amqp error:', (err.message ? err.message : err));
if (typeof (conf != null ? conf.errorHandler : void 0) === 'function') {
return conf.errorHandler(err);
} else {
throw err;
}
}
});
return def.promise;
})();
exchange = function(name, opts) {
var def;
AmqpClient.prototype._exchange = function(name, type, opts) {
if (name instanceof ExchangeWrapper) {
return Q(name);
return Promise.resolve(name);
}
if (local) {
throw new Error('Unable connect exchange when local');
if (this.exchanges[name]) {
return this.exchanges[name];
}
if (isShutdown) {
throw new Error('Unable connect exchange when shutdown');
}
def = Q.defer();
conn.then(function(mq) {
var prom, ref;
mq._ttExchanges = (ref = mq._ttExchanges) != null ? ref : {};
prom = mq._ttExchanges[name];
if (prom) {
return prom.then(function(ex) {
return def.resolve(ex);
return this.exchanges[name] = this.channel.then((function(_this) {
return function(c) {
return (!type ? name === '' ? Promise.resolve({
exchange: ''
}) : c.checkExchange(name).then(function() {
return {
exchange: name
};
}) : c.assertExchange(name, type, opts)).then(function(e) {
log.info('exchange ready:', e.exchange);
return new ExchangeWrapper(_this, e);
});
}
mq._ttExchanges[name] = def.promise;
opts = opts != null ? opts : {
passive: true
};
if (name !== '') {
opts.confirm = true;
}
return mq.exchange(name, opts, function(ex) {
log.info('exchange ready:', ex.name);
return def.resolve(new ExchangeWrapper(ex));
}).on('error', function(err) {
return def.reject(err);
});
}).done();
return def.promise;
})(this));
};
queue = (function(_this) {
return function(qname, opts) {
var def;
if (qname instanceof QueueWrapper) {
return Q(qname);
}
if (local) {
throw new Error('Unable to connect queue when local');
}
if (isShutdown) {
throw new Error('Unable to connect queue shutdown');
}
if (qname !== null && typeof qname === 'object') {
opts = qname;
qname = '';
}
if (!qname) {
qname = '';
}
opts = opts != null ? opts : qname === '' ? {
exclusive: true
} : {
passive: true
};
def = Q.defer();
conn.then(function(mq) {
var prom;
if (qname !== '') {
prom = mq._ttQueues[qname];
if (prom) {
return prom.then(function(q) {
return def.resolve(q);
});
}
mq._ttQueues[qname] = def.promise;
}
return mq.queue(qname, opts, function(queue) {
log.info('queue created:', queue.name);
if (qname === '') {
mq._ttQueues[queue.name] = def.promise;
}
return def.resolve(new QueueWrapper(_self, queue));
}).on('error', function(err) {
return def.reject(err);
});
}).done();
return def.promise;
};
})(this);
bind = function(ex, q, topic, callback) {
if (local) {
throw new Error('Unable to bind when local');
AmqpClient.prototype.exchange = function() {
var name, opts, ref, ref1, type;
ref1 = (ref = this.compat).exchangeArgs.apply(ref, arguments), name = ref1[0], type = ref1[1], opts = ref1[2];
return this.compat.promise(this._exchange(name, type, opts));
};
AmqpClient.prototype._queue = function(qname, opts) {
if (qname instanceof QueueWrapper) {
return Promise.resolve(qname);
}
if (isShutdown) {
throw new Error('Unable to bind when shutdown');
if (qname !== null && typeof qname === 'object') {
opts = qname;
qname = '';
}
if (!qname) {
qname = '';
}
opts = opts != null ? opts : qname === '' ? {
exclusive: true
} : void 0;
if (this.queues[qname] && qname !== '') {
return this.queues[qname];
}
return this.queues[qname] = this.channel.then((function(_this) {
return function(c) {
return (!opts ? c.checkQueue(qname) : c.assertQueue(qname, opts)).then(function(q) {
log.info('queue created:', q.queue);
return new QueueWrapper(_this, q);
});
};
})(this));
};
AmqpClient.prototype.queue = function() {
var opts, qname, ref;
ref = this.compat.queueArgs.apply(void 0, arguments), qname = ref[0], opts = ref[1];
return this.compat.promise(this._queue(qname, opts));
};
AmqpClient.prototype._bind = function(exchange, queue, topic, callback) {
if (typeof topic === 'function') {
callback = topic;
topic = q;
q = '';
topic = queue;
queue = '';
}
if (!q) {
q = '';
}
return (Q.all([exchange(ex), queue(q)])).spread(function(ex, q) {
return Q.fcall(function() {
return q.bind(ex, topic);
}).then(function(q) {
return (Promise.all([this._exchange(exchange), this._queue(queue)])).then(function(arg) {
var ex, q;
ex = arg[0], q = arg[1];
return q.bind(ex, topic).then(function(q) {
if (callback != null) {

@@ -163,14 +130,9 @@ return q.subscribe(callback);

};
unbind = function(qname) {
return conn != null ? typeof conn.then === "function" ? conn.then(function(mq) {
var qp;
if (mq.local) {
return def.resolve(true);
}
qp = mq._ttQueues[qname];
if (!qp) {
return def.resolve(mq);
}
return qp;
}).then(function(q) {
AmqpClient.prototype.bind = function(exchange, queue, topic, callback) {
return this.compat.promise(this._bind(exchange, queue, topic, callback));
};
AmqpClient.prototype._unbind = function(queue) {
return this.queue(queue).then(function(q) {
return q.unbind();

@@ -180,51 +142,24 @@ }).then(function(q) {

}).then(function() {
return qname;
}) : void 0 : void 0;
return queue;
});
};
shutdown = function() {
var def;
if (isShutdown) {
return isShutdown.promise;
}
def = isShutdown = Q.defer();
conn.then(function(mq) {
var qname, qp, todo;
if (mq.local) {
return def.resolve(true);
}
todo = (function() {
var ref, results;
ref = mq._ttQueues;
results = [];
for (qname in ref) {
qp = ref[qname];
results.push(qp.then(function(queue) {
if (queue.isAutoDelete()) {
return unbind(qname);
}
}));
}
return results;
})();
return Q.all(todo).then(function() {
log.info('closing amqp connection');
mq.end();
log.info('amqp closed');
return def.resolve(true);
})["catch"](function(err) {
return def.reject(err);
});
}).done();
return def.promise;
AmqpClient.prototype.unbind = function(queue) {
return this.compat.promise(this._unbind(queue));
};
return _self = {
exchange: exchange,
queue: queue,
bind: bind,
unbind: unbind,
shutdown: shutdown,
local: local
AmqpClient.prototype._shutdown = function() {
return this.channel.then(function(c) {
return c.close();
});
};
};
AmqpClient.prototype.shutdown = function() {
return this.compat.promise(this._shutdown());
};
return AmqpClient;
})();
}).call(this);
// Generated by CoffeeScript 1.11.1
(function() {
var I, Q, comp, compress, decomp, decompress, jsonp, plug, zlib;
var I, comp, compress, decomp, decompress, jsonp, plug, zlib;
Q = require('q');
zlib = require('zlib');

@@ -20,3 +18,3 @@

comp = function(buf) {
return Q.Promise(function(rs, rj) {
return new Promise(function(rs, rj) {
return zlib.gzip(buf, plug(rs, rj));

@@ -27,3 +25,3 @@ });

decomp = function(buf) {
return Q.Promise(function(rs, rj) {
return new Promise(function(rs, rj) {
return zlib.gunzip(buf, plug(rs, rj));

@@ -34,3 +32,3 @@ });

jsonp = function(str) {
return Q.Promise(function(rs, rj) {
return new Promise(function(rs, rj) {
var err;

@@ -66,3 +64,3 @@ try {

} else {
return [null, Q(msg)];
return [null, Promise.resolve(msg)];
}

@@ -84,6 +82,6 @@ };

} else {
return [null, Q(msg)];
return [null, Promise.resolve(msg)];
}
} else {
return [null, Q(msg)];
return [null, Promise.resolve(msg)];
}

@@ -90,0 +88,0 @@ };

// Generated by CoffeeScript 1.11.1
(function() {
var ExchangeWrapper, Q;
var ExchangeWrapper,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };
Q = require('q');
module.exports = ExchangeWrapper = (function() {
function ExchangeWrapper(exchange) {
function ExchangeWrapper(client, exchange) {
this.client = client;
this.exchange = exchange;
this.name = this.exchange.name;
this.publish = bind(this.publish, this);
this._publish = bind(this._publish, this);
this.name = this.exchange.exchange;
}
ExchangeWrapper.prototype.publish = function(routingKey, message, options) {
var def, ref;
def = Q.defer();
if (!((ref = this.exchange.options) != null ? ref.confirm : void 0)) {
this.exchange.publish(routingKey, message, options);
def.resolve();
} else {
this.exchange.publish(routingKey, message, options, function(err) {
if (err) {
return def.reject(err);
} else {
return def.resolve();
}
});
ExchangeWrapper.prototype._publish = function(routingKey, message, options) {
if (options == null) {
options = {};
}
return def.promise;
options = this.client.compat.publishOpts(options);
if (!options.contentType) {
options.contentType = 'application/octet-stream';
}
if (typeof message === 'object' && !(message instanceof Buffer)) {
message = new Buffer(JSON.stringify(message));
options.contentType = 'application/json';
}
return this.client.channel.then((function(_this) {
return function(c) {
return c.publish(_this.name, routingKey, message, options);
};
})(this));
};
ExchangeWrapper.prototype.publish = function(routingKey, message, options) {
return this.client.compat.promise(this._publish(routingKey, message, options));
};
return ExchangeWrapper;

@@ -32,0 +39,0 @@

// Generated by CoffeeScript 1.11.1
(function() {
var Rpc, RpcBackend, amqpClient, log;
var AmqpClient, Rpc, RpcBackend, log;
log = require('bog');
amqpClient = require('./amqp-client');
AmqpClient = require('./amqp-client');

@@ -14,3 +14,3 @@ Rpc = require('./rpc');

module.exports = function(conf) {
var amqpc, rpc, rpcBackend;
var client, rpc, rpcBackend;
if (conf == null) {

@@ -24,17 +24,16 @@ conf = {};

conf = {
connection: conf,
local: conf.local
connection: conf
};
}
amqpc = amqpClient(conf);
rpc = new Rpc(amqpc, conf.rpc);
rpcBackend = new RpcBackend(amqpc);
client = new AmqpClient(conf);
rpc = new Rpc(client, conf.rpc);
rpcBackend = new RpcBackend(client);
return {
exchange: amqpc.exchange,
queue: amqpc.queue,
bind: amqpc.bind,
exchange: client.exchange,
queue: client.queue,
bind: client.bind,
rpc: rpc.rpc,
serve: rpcBackend.serve,
shutdown: amqpc.shutdown,
local: amqpc.local
shutdown: client.shutdown,
local: client.local
};

@@ -41,0 +40,0 @@ };

// Generated by CoffeeScript 1.11.1
(function() {
var Q, QueueWrapper, log,
var QueueWrapper, log,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };

@@ -8,47 +8,43 @@

Q = require('q');
module.exports = QueueWrapper = (function() {
function QueueWrapper(amqpc, queue) {
this.amqpc = amqpc;
function QueueWrapper(client, queue) {
this.client = client;
this.queue = queue;
this.shift = bind(this.shift, this);
this.isAutoDelete = bind(this.isAutoDelete, this);
this.isDurable = bind(this.isDurable, this);
this.unsubscribe = bind(this.unsubscribe, this);
this._unsubscribe = bind(this._unsubscribe, this);
this.subscribe = bind(this.subscribe, this);
this._subscribe = bind(this._subscribe, this);
this.unbind = bind(this.unbind, this);
this._unbind = bind(this._unbind, this);
this.bind = bind(this.bind, this);
this.name = this.queue.name;
this.queue.on('open', (function(_this) {
return function(name) {
return _this.name = name;
};
})(this));
this._bind = bind(this._bind, this);
this.name = this.queue.queue;
}
QueueWrapper.prototype.bind = function(exchange, topic) {
return Q().then((function(_this) {
QueueWrapper.prototype._bind = function(exchange, topic) {
if (!topic) {
topic = '';
}
return Promise.resolve().then((function(_this) {
return function() {
if (!topic || typeof topic !== 'string') {
if (typeof topic !== 'string') {
throw new Error('Topic is not a string');
}
return _this.amqpc.exchange(exchange);
return Promise.all([_this.unbind(), _this.client.exchange(exchange)]);
};
})(this)).then((function(_this) {
return function(exwrapper) {
var def, ex;
ex = exwrapper.exchange;
def = Q.defer();
_this.unbind().then(function() {
log.info('binding:', ex.name, _this.name, topic);
_this.queue.once('queueBindOk', function() {
_this._ex = ex;
_this._topic = topic;
log.info('queue bound:', _this.name, _this._topic);
return def.resolve(_this);
});
return _this.queue.bind(ex, topic);
}).done();
return def.promise;
return function(arg) {
var exchange, whatevs;
whatevs = arg[0], exchange = arg[1];
log.info('binding:', exchange.name, _this.name, topic);
return _this.client.channel.then(function(c) {
return c.bindQueue(_this.name, exchange.name, topic);
}).then(function() {
_this._exchange = exchange.name;
_this._topic = topic;
log.info('queue bound:', _this.name, _this._topic);
return _this;
});
};

@@ -58,89 +54,71 @@ })(this));

QueueWrapper.prototype.unbind = function() {
var def;
def = Q.defer();
if (!this._ex || !this.amqpc.conn) {
def.resolve(this);
return def.promise;
QueueWrapper.prototype.bind = function(exchange, topic) {
return this.client.compat.promise(this._bind(exchange, topic));
};
QueueWrapper.prototype._unbind = function() {
if (!(this._exchange && this._topic)) {
return Promise.resolve(this);
}
this.amqpc.conn.then((function(_this) {
return function(mq) {
_this.queue.unbind(_this._ex, _this._topic);
return _this.queue.once('queueUnbindOk', function() {
return this.client.channel.then((function(_this) {
return function(c) {
return c.unbindQueue(_this.name, _this._exchange, _this._topic).then(function() {
log.info('queue unbound:', _this.name, _this._topic);
delete _this._ex;
delete _this._exchange;
delete _this._topic;
return def.resolve(_this);
return _this;
});
};
})(this)).done();
return def.promise;
})(this));
};
QueueWrapper.prototype.subscribe = function(opts, callb) {
var def;
def = Q.defer();
QueueWrapper.prototype.unbind = function() {
return this.client.compat.promise(this._unbind());
};
QueueWrapper.prototype._subscribe = function(opts, callback) {
if (typeof opts === 'function') {
callb = opts;
callback = opts;
opts = null;
}
opts = opts != null ? opts : {
ack: false,
prefetchCount: 1
};
if (!(opts || typeof opts !== 'object')) {
throw new Error('Opts is not an object');
}
if (!(callb || typeof callb !== 'function')) {
throw new Error('Callback is not a function');
}
if (!!opts.ack && opts.prefetchCount > 1) {
this.noshifting = true;
}
this.unsubscribe().then((function(_this) {
return this.unsubscribe().then((function(_this) {
return function() {
var wrapper;
wrapper = function() {
var err;
try {
return callb.apply(null, arguments);
} catch (error) {
err = error;
return log.error(err);
}
};
return (_this.queue.subscribe(opts, wrapper)).addCallback(function(ok) {
var ctag;
ctag = ok.consumerTag;
_this._ctag = ctag;
log.info('subscribed:', _this.name, ctag);
return def.resolve(_this);
}).addErrback(function(err) {
return def.reject(err);
return _this.client.channel.then(function(c) {
opts = _this.client.compat.subscribeOpts(opts);
return Promise.all([c.prefetch(opts.prefetch), c.consume(_this.queue.queue, _this.client.compat.callback(_this.client, callback), opts)]).then(function(arg) {
var consumerTag, ref, whatevs;
whatevs = arg[0], (ref = arg[1], consumerTag = ref.consumerTag);
_this._consumerTag = consumerTag;
return _this;
});
});
};
})(this)).done();
return def.promise;
})(this));
};
QueueWrapper.prototype.unsubscribe = function() {
var ctag, def;
def = Q.defer();
if (!this._ctag) {
def.resolve(this);
return def.promise;
QueueWrapper.prototype.subscribe = function(opts, callback) {
return this.client.compat.promise(this._subscribe(opts, callback));
};
QueueWrapper.prototype._unsubscribe = function() {
if (!this._consumerTag) {
return Promise.resolve(this);
}
ctag = this._ctag;
delete this._ctag;
(this.queue.unsubscribe(ctag)).addCallback((function(_this) {
return this.client.channel.then((function(_this) {
return function(c) {
return c.cancel(_this._consumerTag);
};
})(this)).then((function(_this) {
return function() {
log.info('unsubscribed:', _this.name, ctag);
return def.resolve(_this);
log.info('unsubscribed:', _this.name, _this._consumerTag);
delete _this._consumerTag;
return _this;
};
})(this)).addErrback(function(err) {
return def.reject(err);
});
return def.promise;
})(this));
};
QueueWrapper.prototype.unsubscribe = function() {
return this.client.compat.promise(this._unsubscribe());
};
QueueWrapper.prototype.isDurable = function() {

@@ -154,9 +132,2 @@ return this.queue.options.durable;

QueueWrapper.prototype.shift = function() {
if (this.noshifting) {
throw new Error("ack:true and prefetchCount > 1 does not work with queue.shift(). use (msg, info, del, ack) => ack.acknowledge()");
}
return this.queue.shift.apply(this.queue, arguments);
};
return QueueWrapper;

@@ -163,0 +134,0 @@

// Generated by CoffeeScript 1.11.1
(function() {
var Q, RpcBackend, compress, decompress, doack, log, merge, ref,
var RpcBackend, compress, decompress, doack, log, merge, ref,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };

@@ -8,4 +8,2 @@

Q = require('q');
merge = require('./merge');

@@ -18,3 +16,3 @@

if (opts != null ? opts.ack : void 0) {
return Q().then(function() {
return Promise.resolve().then(function() {
return cb();

@@ -33,4 +31,4 @@ }).then(function() {

module.exports = RpcBackend = (function() {
function RpcBackend(amqpc) {
this.amqpc = amqpc;
function RpcBackend(client) {
this.client = client;
this.serve = bind(this.serve, this);

@@ -45,13 +43,15 @@ }

opts = opts != null ? opts : {};
return Q.all([
this.amqpc.exchange(exname, {
return Promise.all([
this.client.exchange(exname, {
type: 'topic',
durable: true,
autoDelete: false
}), this.amqpc.exchange(''), this.amqpc.queue(exname + "." + topic, {
}), this.client.exchange(''), this.client.queue(exname + "." + topic, {
durable: true,
autoDelete: false
})
]).spread((function(_this) {
return function(ex, defaultex, queue) {
]).then((function(_this) {
return function(arg) {
var defaultex, ex, queue;
ex = arg[0], defaultex = arg[1], queue = arg[2];
queue.bind(ex, topic);

@@ -66,3 +66,3 @@ return queue.subscribe(opts, _this._mkcallback(defaultex, callback, opts));

return doack(subopts, ack)(function() {
var ct, opts, p, progress, ref1, ref2, timeout, timestamp;
var ct, opts, p, ref1, ref2, timeout, timestamp;
if (info.replyTo == null) {

@@ -86,23 +86,6 @@ return;

}
progress = [];
ref2 = decompress(msg, headers), ct = ref2[0], p = ref2[1];
merge(info, ct);
return p.then(function(payload) {
return Q.when(handler(payload, headers, info, function(prgs) {
var h, popts, prev, ref3, ref4;
if (!info.correlationId) {
return;
}
popts = {
correlationId: info.correlationId + "#x-progress:" + progress.length
};
ref3 = compress(prgs, headers), h = ref3[0], p = ref3[1];
if (h) {
popts.headers = h;
}
prev = (ref4 = progress[progress.length - 1]) != null ? ref4 : Q();
return progress.push(Q.all([prev, p]).spread(function(_, payload) {
return exchange.publish(info.replyTo, payload, popts);
}));
}));
return Promise.resolve(handler(payload, headers, info));
}).then(function(res) {

@@ -116,8 +99,6 @@ var h, ref3;

}).then(function(res) {
return Q.all(progress).then(function() {
return exchange.publish(info.replyTo, res, opts);
});
return exchange.publish(info.replyTo, res, opts);
})["catch"](function(err) {
log.error(err);
return Q().then(function() {
return Promise.resolve().then(function() {
var ref3;

@@ -124,0 +105,0 @@ return exchange.publish(info.replyTo, {

// Generated by CoffeeScript 1.11.1
(function() {
var Cache, DEFAULT_TIMEOUT, Q, Rpc, compress, decompress, merge, ref, uuid,
var Cache, DEFAULT_TIMEOUT, Rpc, compress, decompress, merge, ref, uuid,
bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };
Q = require('q');
uuid = require('uuid');

@@ -19,5 +17,5 @@

module.exports = Rpc = (function() {
function Rpc(amqpc, options1) {
function Rpc(client, options1) {
var ref1;
this.amqpc = amqpc;
this.client = client;
this.options = options1;

@@ -42,3 +40,3 @@ this.rpc = bind(this.rpc, this);

if (!this._returnChannel) {
this._returnChannel = this.amqpc.queue('', {
this._returnChannel = this.client.queue('', {
autoDelete: true,

@@ -60,3 +58,7 @@ exclusive: true

var def, value;
def = Q.defer();
def = {};
def.promise = new Promise(function(resolve, reject) {
def.resolve = resolve;
return def.reject = reject;
});
options = options || {};

@@ -72,14 +74,9 @@ value = {

Rpc.prototype.resolveResponse = function(corrId, msg, headers) {
var ct, p, prgsSeq, ref1, ref2, response;
ref1 = (corrId != null ? corrId : '').split('#x-progress:'), corrId = ref1[0], prgsSeq = ref1[1];
var ct, p, ref1, response;
if (response = this.responses.get(corrId)) {
ref2 = decompress(msg, headers), ct = ref2[0], p = ref2[1];
ref1 = decompress(msg, headers), ct = ref1[0], p = ref1[1];
return p.then((function(_this) {
return function(payload) {
if (prgsSeq) {
return response.def.notify(payload);
} else {
_this.responses.remove(corrId);
return response.def.resolve(payload);
}
_this.responses.remove(corrId);
return response.def.resolve(payload);
};

@@ -96,5 +93,6 @@ })(this))["catch"](function(err) {

}
return Q.all([this.amqpc.exchange(exchange), this.returnChannel()]).spread((function(_this) {
return function(ex, q) {
var corrId, def, h, opts, p, ref1, timeout;
return Promise.all([this.client.exchange(exchange), this.returnChannel()]).then((function(_this) {
return function(arg) {
var corrId, def, ex, h, opts, p, q, ref1, timeout;
ex = arg[0], q = arg[1];
corrId = uuid.v4();

@@ -112,3 +110,3 @@ options = options || {};

opts.headers = headers || {};
opts.headers.timeout = timeout;
opts.headers.timeout = "" + timeout;
ref1 = compress(msg, options), h = ref1[0], p = ref1[1];

@@ -115,0 +113,0 @@ merge(opts.headers, h);

{
"name": "amqp-as-promised",
"description": "A promise-based AMQP API build on node-amqp",
"version": "2.1.0",
"version": "3.0.0",
"repository": {

@@ -25,7 +25,7 @@ "type": "git",

"dependencies": {
"q": "~1.4.1",
"amqp": "0.2.6",
"uuid": "~2.0.1",
"amqplib": "^0.4.2",
"bog": "1",
"mem-cache": "0.0.4"
"lodash.clone": "^4.5.0",
"mem-cache": "0.0.4",
"uuid": "~2.0.1"
},

@@ -32,0 +32,0 @@ "devDependencies": {

@@ -22,2 +22,14 @@ AMQP as Promised

## A note on version 3.0
As of version 3.0, the underlying amqp library has changed from
`node-amqp` to `amqplib`. Efforts have been made to keep everything as
backwards compatible as possible, but some things have changed:
* Local mode is no longer supported.
* `queue.shift()` is no longer supported.
* `Q` has been dropped in favor of native promises. As a result,
support for promise progress notifications over RPC is no longer
supported.
## Installing

@@ -286,7 +298,2 @@

### `queue.shift([reject[, requeue]])`
To be used with `queue.subscribe({ack:true}, callback)`. `reject`
rejects the previous message and will requeue it if `requeue` is true.
### `queue.name`

@@ -337,28 +344,1 @@

```
### Progress
*Since 0.4.0*
The RPC supports
[Q style progress](https://github.com/kriskowal/q#progress-notification)
which can be used to send partial responses.
Example
```coffee
amqpc.serve 'myexchange', 'routing.key', (msg, headers, del, progress) ->
... do some stuff
progress "it's almost done!!!"
... do more stuff
return "here's the result"
```
Client side
```coffee
amqpc.rpc('myexchange', 'routing.key', msg).progress (partial) ->
console.log 'the server tries to tell me', partial
.then (response) ->
console.log 'received message', response
```

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc