amqp-as-promised
Advanced tools
Comparing version 0.0.5 to 0.0.6
// Generated by CoffeeScript 1.6.3 | ||
(function() { | ||
var Q, amqp, log; | ||
var Q, QueueWrapper, amqp, log, | ||
__bind = function(fn, me){ return function(){ return fn.apply(me, arguments); }; }; | ||
@@ -84,6 +85,17 @@ log = require('bog'); | ||
} | ||
if (qname !== null && typeof qname === 'object') { | ||
opts = qname; | ||
qname = ''; | ||
} | ||
if (!qname) { | ||
qname = ''; | ||
} | ||
opts = opts != null ? opts : { | ||
durable: true, | ||
autoDelete: qname === '' | ||
}; | ||
def = Q.defer(); | ||
conn.then(function(mq) { | ||
var prom; | ||
if (qname !== "") { | ||
if (qname !== '') { | ||
prom = mq._ttQueues[qname]; | ||
@@ -95,16 +107,10 @@ if (prom) { | ||
} | ||
if (qname) { | ||
mq._ttQueues[qname] = def.promise; | ||
} | ||
mq._ttQueues[qname] = def.promise; | ||
} | ||
opts = opts != null ? opts : { | ||
durable: true, | ||
autoDelete: qname === "" | ||
}; | ||
return mq.queue(qname, opts, function(queue) { | ||
log.info('queue created:', queue.name); | ||
if (qname === "") { | ||
if (qname === '') { | ||
mq._ttQueues[queue.name] = def.promise; | ||
} | ||
return def.resolve(queue); | ||
return def.resolve(new QueueWrapper(conn, queue)); | ||
}); | ||
@@ -125,34 +131,14 @@ }).done(); | ||
topic = qname; | ||
qname = ""; | ||
qname = ''; | ||
} | ||
if (!qname) { | ||
qname = ""; | ||
qname = ''; | ||
} | ||
def = Q.defer(); | ||
(unbind(qname)).then(function() { | ||
return Q.all([exchange(exname), queue(qname)]); | ||
}).spread(function(ex, q) { | ||
log.info('binding:', exname, q.name, topic); | ||
q.bind(ex, topic); | ||
return q.on('queueBindOk', function() { | ||
var wrap; | ||
wrap = function() { | ||
var err; | ||
try { | ||
return callback.apply(null, arguments); | ||
} catch (_error) { | ||
err = _error; | ||
return log.error(err); | ||
} | ||
}; | ||
return (q.subscribe(wrap)).addCallback(function(ok) { | ||
var ctag; | ||
ctag = ok.consumerTag; | ||
q._ttCtag = ctag; | ||
q._ttEx = ex; | ||
q._ttTopic = topic; | ||
log.info('consumer bound:', q.name, ctag); | ||
return def.resolve(ex); | ||
}); | ||
}); | ||
(Q.all([exchange(exname), queue(qname)])).spread(function(ex, q) { | ||
return q.bind(ex, topic); | ||
}).then(function(q) { | ||
return q.subscribe(callback); | ||
}).then(function() { | ||
return def.resolve(qname); | ||
}).done(); | ||
@@ -173,20 +159,9 @@ return def.promise; | ||
} | ||
return qp.then(function(q) { | ||
if (!q._ttEx) { | ||
return def.resolve(conn); | ||
} | ||
log.info('unbinding:', qname); | ||
q.unbind(q._ttEx, q._ttTopic); | ||
return q.on('queueUnbindOk', function() { | ||
var ctag; | ||
ctag = q._ttCtag; | ||
delete q._ttCtag; | ||
delete q._ttEx; | ||
delete q._ttTopic; | ||
q.unsubscribe(ctag); | ||
delete mq._ttQueues[qname]; | ||
log.info('consumer unbound:', qname, ctag); | ||
return def.resolve(mq); | ||
}); | ||
}); | ||
return qp; | ||
}).then(function(q) { | ||
return q.unbind(); | ||
}).then(function(q) { | ||
return q.unsubscribe(); | ||
}).then(function() { | ||
return def.resolve(qname); | ||
}).done(); | ||
@@ -207,3 +182,2 @@ return def.promise; | ||
} | ||
log.info('closing amqp connection'); | ||
return Q.all((function() { | ||
@@ -219,3 +193,4 @@ var _ref, _results; | ||
})()).then(function() { | ||
mq.backoff = mq.reconnect = function() { | ||
log.info('closing amqp connection'); | ||
mq.backoff = mq.reconnect = mq.connect = function() { | ||
return false; | ||
@@ -232,2 +207,3 @@ }; | ||
exchange: exchange, | ||
queue: queue, | ||
bind: bind, | ||
@@ -239,2 +215,120 @@ shutdown: shutdown, | ||
QueueWrapper = (function() { | ||
function QueueWrapper(conn, queue) { | ||
this.conn = conn; | ||
this.queue = queue; | ||
this.shift = __bind(this.shift, this); | ||
this.unsubscribe = __bind(this.unsubscribe, this); | ||
this.subscribe = __bind(this.subscribe, this); | ||
this.unbind = __bind(this.unbind, this); | ||
this.bind = __bind(this.bind, this); | ||
this.name = this.queue.name; | ||
} | ||
QueueWrapper.prototype.bind = function(ex, topic) { | ||
var def, | ||
_this = this; | ||
if (!(ex || typeof ex !== 'object')) { | ||
throw new Error('Exchange is not an object'); | ||
} | ||
if (!(topic || typeof topic !== 'string')) { | ||
throw new Error('Topic is not a string'); | ||
} | ||
def = Q.defer(); | ||
this.unbind().then(function() { | ||
log.info('binding:', ex.name, _this.name, topic); | ||
_this.queue.bind(ex, topic); | ||
return _this.queue.once('queueBindOk', function() { | ||
_this._ex = ex; | ||
_this._topic = topic; | ||
log.info('queue bound:', _this.name, _this._topic); | ||
return def.resolve(_this); | ||
}); | ||
}).done(); | ||
return def.promise; | ||
}; | ||
QueueWrapper.prototype.unbind = function() { | ||
var def, | ||
_this = this; | ||
def = Q.defer(); | ||
if (!this._ex) { | ||
def.resolve(this); | ||
return def.promise; | ||
} | ||
this.conn.then(function(mq) { | ||
_this.queue.unbind(_this._ex, _this._topic); | ||
return _this.queue.once('queueUnbindOk', function() { | ||
log.info('queue unbound:', _this.name, _this._topic); | ||
delete _this._ex; | ||
delete _this._topic; | ||
return def.resolve(_this); | ||
}); | ||
}).done(); | ||
return def.promise; | ||
}; | ||
QueueWrapper.prototype.subscribe = function(opts, callb) { | ||
var def, | ||
_this = this; | ||
def = Q.defer(); | ||
if (typeof opts === 'function') { | ||
callb = opts; | ||
opts = null; | ||
} | ||
opts = opts != null ? opts : { | ||
ack: false, | ||
prefetchCount: 1 | ||
}; | ||
if (!(opts || typeof opts !== 'object')) { | ||
throw new Error('Opts is not an object'); | ||
} | ||
if (!(callb || typeof callb !== 'function')) { | ||
throw new Error('Callback is not a function'); | ||
} | ||
this.unsubscribe().then(function() { | ||
var wrapper; | ||
wrapper = function() { | ||
var err; | ||
try { | ||
return callb.apply(null, arguments); | ||
} catch (_error) { | ||
err = _error; | ||
return log.error(err); | ||
} | ||
}; | ||
return (_this.queue.subscribe(opts, wrapper)).addCallback(function(ok) { | ||
var ctag; | ||
ctag = ok.consumerTag; | ||
_this._ctag = ctag; | ||
log.info('subscribed:', _this.name, ctag); | ||
return def.resolve(_this); | ||
}); | ||
}).done(); | ||
return def.promise; | ||
}; | ||
QueueWrapper.prototype.unsubscribe = function() { | ||
var ctag, def; | ||
def = Q.defer(); | ||
if (!this._ctag) { | ||
def.resolve(this); | ||
return def.promise; | ||
} | ||
ctag = this._ctag; | ||
delete this._ctag; | ||
this.queue.unsubscribe(ctag); | ||
log.info('unsubscribed:', this.name, ctag); | ||
def.resolve(this); | ||
return def.promise; | ||
}; | ||
QueueWrapper.prototype.shift = function() { | ||
return this.queue.shift.apply(this.queue, arguments); | ||
}; | ||
return QueueWrapper; | ||
})(); | ||
}).call(this); |
{ | ||
"name": "amqp-as-promised", | ||
"description": "Promise wrappers for node-amqp", | ||
"version": "0.0.5", | ||
"version": "0.0.6", | ||
"repository": { | ||
@@ -6,0 +6,0 @@ "type": "git", |
@@ -19,4 +19,3 @@ AMQP as Promised | ||
* Connection settings for RabbitMQ. `host`, `vhost`, `login`, | ||
`password` specifies how to connect. | ||
* Connection settings for RabbitMQ. `host`, `vhost`, `login`, `password` specifies how to connect. | ||
* `local`: means there will be no AMQP connection. | ||
@@ -44,7 +43,15 @@ | ||
This is shorthand for binding and subscribing. | ||
amqpc.bind 'myexchange', 'mytopic.#', (msg, headers, del) -> | ||
console.log 'received message', msg | ||
## Using `amqpc` to create an anonymous queue | ||
## Using `amqpc` to get an anomymous queue | ||
To create an anomymous queue. | ||
amqpc.queue().then (q) -> console.log 'my queue', q | ||
To create an anonymous queue shorthand. | ||
amqpc.bind('myexchange', '', 'mytopic.#').then (q) -> | ||
@@ -67,1 +74,60 @@ console.log 'queue created: ' + q.name | ||
process.on 'SIGTERM', graceful | ||
## The `amqpc` object | ||
### `amqpc.exchange(name, opts)` | ||
A promise for an exchange. If `opts` is omitted declares an exchange in `passive` mode. | ||
### `amqpc.queue(qname, opts)` | ||
A promise for a queue. If `qname` is omitted, `""` is used. If opts is | ||
omitted a default `durable:true` and `autoDelete:(qname=='')`. See | ||
`queue.*` below. | ||
### `amqpc.bind(exname, qname, topic, callback)` | ||
Shorthand for | ||
1. Looking up exchange for `exname`. Note that `passive:true` so | ||
exchange must be declared already. | ||
2. Looking up queue for `qname`. See `amqpc.queue` for queue default | ||
opts. | ||
3. Binding queue to `topic`. | ||
4. Subscribing `callback` to queue. | ||
### `amqpc.shutdown()` | ||
Will unbind all queues and unsubscribe all callbacks then gracefully | ||
shut down the socket connection. | ||
### `amqpc.local` | ||
Read only property that tells whether `conf.local` was true. | ||
### `queue.bind(ex, topic)` | ||
Binds the queue to the given exchange (object, not name). Will unbind | ||
if queue was already bound. | ||
### `queue.unbind()` | ||
Unbinds the queue (if currently bound). | ||
### `queue.subscribe(opts, callback)` | ||
Subscribes the callback to this queue. Will unsubscribe any previous | ||
callback. If opts is omitted, defaults to `ack: false, prefetchCount: 1` | ||
### `queue.unsubscribe()` | ||
Unsubscribes current callback (if any). | ||
### `queue.shift([reject[, requeue]])` | ||
To be used with `queue.subscribe({ack:true}, callback)`. `reject` | ||
rejects the previous message and will requeue it if `requeue` is true. | ||
### `queue.name` | ||
Read only property with the queue name. |
Sorry, the diff of this file is not supported yet
20509
313
131