New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

amqp-as-promised

Package Overview
Dependencies
Maintainers
2
Versions
75
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp-as-promised - npm Package Compare versions

Comparing version 0.0.5 to 0.0.6

212

lib/amqp-client.js
// Generated by CoffeeScript 1.6.3
(function() {
var Q, amqp, log;
var Q, QueueWrapper, amqp, log,
__bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; };

@@ -84,6 +85,17 @@ log = require('bog');

}
if (qname !== null && typeof qname === 'object') {
opts = qname;
qname = '';
}
if (!qname) {
qname = '';
}
opts = opts != null ? opts : {
durable: true,
autoDelete: qname === ''
};
def = Q.defer();
conn.then(function(mq) {
var prom;
if (qname !== "") {
if (qname !== '') {
prom = mq._ttQueues[qname];

@@ -95,16 +107,10 @@ if (prom) {

}
if (qname) {
mq._ttQueues[qname] = def.promise;
}
mq._ttQueues[qname] = def.promise;
}
opts = opts != null ? opts : {
durable: true,
autoDelete: qname === ""
};
return mq.queue(qname, opts, function(queue) {
log.info('queue created:', queue.name);
if (qname === "") {
if (qname === '') {
mq._ttQueues[queue.name] = def.promise;
}
return def.resolve(queue);
return def.resolve(new QueueWrapper(conn, queue));
});

@@ -125,34 +131,14 @@ }).done();

topic = qname;
qname = "";
qname = '';
}
if (!qname) {
qname = "";
qname = '';
}
def = Q.defer();
(unbind(qname)).then(function() {
return Q.all([exchange(exname), queue(qname)]);
}).spread(function(ex, q) {
log.info('binding:', exname, q.name, topic);
q.bind(ex, topic);
return q.on('queueBindOk', function() {
var wrap;
wrap = function() {
var err;
try {
return callback.apply(null, arguments);
} catch (_error) {
err = _error;
return log.error(err);
}
};
return (q.subscribe(wrap)).addCallback(function(ok) {
var ctag;
ctag = ok.consumerTag;
q._ttCtag = ctag;
q._ttEx = ex;
q._ttTopic = topic;
log.info('consumer bound:', q.name, ctag);
return def.resolve(ex);
});
});
(Q.all([exchange(exname), queue(qname)])).spread(function(ex, q) {
return q.bind(ex, topic);
}).then(function(q) {
return q.subscribe(callback);
}).then(function() {
return def.resolve(qname);
}).done();

@@ -173,20 +159,9 @@ return def.promise;

}
return qp.then(function(q) {
if (!q._ttEx) {
return def.resolve(conn);
}
log.info('unbinding:', qname);
q.unbind(q._ttEx, q._ttTopic);
return q.on('queueUnbindOk', function() {
var ctag;
ctag = q._ttCtag;
delete q._ttCtag;
delete q._ttEx;
delete q._ttTopic;
q.unsubscribe(ctag);
delete mq._ttQueues[qname];
log.info('consumer unbound:', qname, ctag);
return def.resolve(mq);
});
});
return qp;
}).then(function(q) {
return q.unbind();
}).then(function(q) {
return q.unsubscribe();
}).then(function() {
return def.resolve(qname);
}).done();

@@ -207,3 +182,2 @@ return def.promise;

}
log.info('closing amqp connection');
return Q.all((function() {

@@ -219,3 +193,4 @@ var _ref, _results;

})()).then(function() {
mq.backoff = mq.reconnect = function() {
log.info('closing amqp connection');
mq.backoff = mq.reconnect = mq.connect = function() {
return false;

@@ -232,2 +207,3 @@ };

exchange: exchange,
queue: queue,
bind: bind,

@@ -239,2 +215,120 @@ shutdown: shutdown,

QueueWrapper = (function() {
function QueueWrapper(conn, queue) {
this.conn = conn;
this.queue = queue;
this.shift = __bind(this.shift, this);
this.unsubscribe = __bind(this.unsubscribe, this);
this.subscribe = __bind(this.subscribe, this);
this.unbind = __bind(this.unbind, this);
this.bind = __bind(this.bind, this);
this.name = this.queue.name;
}
QueueWrapper.prototype.bind = function(ex, topic) {
var def,
_this = this;
if (!(ex || typeof ex !== 'object')) {
throw new Error('Exchange is not an object');
}
if (!(topic || typeof topic !== 'string')) {
throw new Error('Topic is not a string');
}
def = Q.defer();
this.unbind().then(function() {
log.info('binding:', ex.name, _this.name, topic);
_this.queue.bind(ex, topic);
return _this.queue.once('queueBindOk', function() {
_this._ex = ex;
_this._topic = topic;
log.info('queue bound:', _this.name, _this._topic);
return def.resolve(_this);
});
}).done();
return def.promise;
};
QueueWrapper.prototype.unbind = function() {
var def,
_this = this;
def = Q.defer();
if (!this._ex) {
def.resolve(this);
return def.promise;
}
this.conn.then(function(mq) {
_this.queue.unbind(_this._ex, _this._topic);
return _this.queue.once('queueUnbindOk', function() {
log.info('queue unbound:', _this.name, _this._topic);
delete _this._ex;
delete _this._topic;
return def.resolve(_this);
});
}).done();
return def.promise;
};
QueueWrapper.prototype.subscribe = function(opts, callb) {
var def,
_this = this;
def = Q.defer();
if (typeof opts === 'function') {
callb = opts;
opts = null;
}
opts = opts != null ? opts : {
ack: false,
prefetchCount: 1
};
if (!(opts || typeof opts !== 'object')) {
throw new Error('Opts is not an object');
}
if (!(callb || typeof callb !== 'function')) {
throw new Error('Callback is not a function');
}
this.unsubscribe().then(function() {
var wrapper;
wrapper = function() {
var err;
try {
return callb.apply(null, arguments);
} catch (_error) {
err = _error;
return log.error(err);
}
};
return (_this.queue.subscribe(opts, wrapper)).addCallback(function(ok) {
var ctag;
ctag = ok.consumerTag;
_this._ctag = ctag;
log.info('subscribed:', _this.name, ctag);
return def.resolve(_this);
});
}).done();
return def.promise;
};
QueueWrapper.prototype.unsubscribe = function() {
var ctag, def;
def = Q.defer();
if (!this._ctag) {
def.resolve(this);
return def.promise;
}
ctag = this._ctag;
delete this._ctag;
this.queue.unsubscribe(ctag);
log.info('unsubscribed:', this.name, ctag);
def.resolve(this);
return def.promise;
};
QueueWrapper.prototype.shift = function() {
return this.queue.shift.apply(this.queue, arguments);
};
return QueueWrapper;
})();
}).call(this);
{
"name": "amqp-as-promised",
"description": "Promise wrappers for node-amqp",
"version": "0.0.5",
"version": "0.0.6",
"repository": {

@@ -6,0 +6,0 @@ "type": "git",

@@ -19,4 +19,3 @@ AMQP as Promised

* Connection settings for RabbitMQ. `host`, `vhost`, `login`,
`password` specifies how to connect.
* Connection settings for RabbitMQ. `host`, `vhost`, `login`, `password` specifies how to connect.
* `local`: means there will be no AMQP connection.

@@ -44,7 +43,15 @@

This is shorthand for binding and subscribing.
amqpc.bind 'myexchange', 'mytopic.#', (msg, headers, del) ->
console.log 'received message', msg
## Using `amqpc` to create an anonymous queue
## Using `amqpc` to get an anomymous queue
To create an anomymous queue.
amqpc.queue().then (q) -> console.log 'my queue', q
To create an anonymous queue shorthand.
amqpc.bind('myexchange', '', 'mytopic.#').then (q) ->

@@ -67,1 +74,60 @@ console.log 'queue created: ' + q.name

process.on 'SIGTERM', graceful
## The `amqpc` object
### `amqpc.exchange(name, opts)`
A promise for an exchange. If `opts` is omitted declares an exchange in `passive` mode.
### `amqpc.queue(qname, opts)`
A promise for a queue. If `qname` is omitted, `""` is used. If opts is
omitted a default `durable:true` and `autoDelete:(qname=='')`. See
`queue.*` below.
### `amqpc.bind(exname, qname, topic, callback)`
Shorthand for
1. Looking up exchange for `exname`. Note that `passive:true` so
exchange must be declared already.
2. Looking up queue for `qname`. See `amqpc.queue` for queue default
opts.
3. Binding queue to `topic`.
4. Subscribing `callback` to queue.
### `amqpc.shutdown()`
Will unbind all queues and unsubscribe all callbacks then gracefully
shut down the socket connection.
### `amqpc.local`
Read only property that tells whether `conf.local` was true.
### `queue.bind(ex, topic)`
Binds the queue to the given exchange (object, not name). Will unbind
if queue was already bound.
### `queue.unbind()`
Unbinds the queue (if currently bound).
### `queue.subscribe(opts, callback)`
Subscribes the callback to this queue. Will unsubscribe any previous
callback. If opts is omitted, defaults to `ack: false, prefetchCount: 1`
### `queue.unsubscribe()`
Unsubscribes current callback (if any).
### `queue.shift([reject[, requeue]])`
To be used with `queue.subscribe({ack:true}, callback)`. `reject`
rejects the previous message and will requeue it if `requeue` is true.
### `queue.name`
Read only property with the queue name.

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc