taskcluster-client
Advanced tools
Comparing version
@@ -12,3 +12,2 @@ var events = require('events'); | ||
var Listener = function(options) { | ||
this._connected = false; | ||
this._bindings = []; | ||
@@ -60,3 +59,3 @@ this._options = _.defaults(options || {}, { | ||
Listener.prototype.connect = function() { | ||
assert(!this._connected, "Can't connect when already connected"); | ||
if (this._channel) return Promise.resolve(this._channel); | ||
assert(this._options.connectionString, "connectionString is required"); | ||
@@ -85,4 +84,4 @@ var that = this; | ||
// Find queue name and decide if this is an exclusive queue | ||
var exclusive = !this._options.queueName; | ||
this._queueName = this._options.queueName || slugid.v4(); | ||
var exclusive = this._options.queueName != undefined; | ||
@@ -116,12 +115,4 @@ // Create queue | ||
return bindingsCreated.then(function() { | ||
return that._channel.consume(that._queueName, function(msg) { | ||
debug("Received message from: %s", msg.fields.exchange); | ||
that._handle(msg); | ||
}); | ||
}).then(function(result) { | ||
that._consumerTag = result.consumerTag; | ||
that._connected = true; | ||
debug("Listening with consumer tag: '%s' on queue '%s'", | ||
that._consumerTag, that._queueName); | ||
}); | ||
return that._channel; | ||
}) | ||
}; | ||
@@ -131,7 +122,7 @@ | ||
Listener.prototype.pause = function() { | ||
if (!this._connected) { | ||
if (!this._channel) { | ||
debug("WARNING: Paused listener instance was wasn't connected yet"); | ||
return; | ||
} | ||
assert(this._connected, "Can't pause when not connected"); | ||
assert(this._channel, "Can't pause when not connected"); | ||
return this._channel.cancel(this._consumerTag); | ||
@@ -142,7 +133,5 @@ }; | ||
Listener.prototype.resume = function() { | ||
if(!this._connected) { | ||
return this.connect(); | ||
} else { | ||
var that = this; | ||
return that._channel.consume(that._queueName, function(msg) { | ||
var that = this; | ||
return this.connect().then(function(channel) { | ||
return channel.consume(that._queueName, function(msg) { | ||
that._handle(msg); | ||
@@ -152,3 +141,3 @@ }).then(function(result) { | ||
}); | ||
} | ||
}); | ||
}; | ||
@@ -155,0 +144,0 @@ |
{ | ||
"name": "taskcluster-client", | ||
"version": "0.7.4", | ||
"version": "0.8.0", | ||
"author": "Jonas Finnemann Jensen <jopsen@gmail.com>", | ||
@@ -5,0 +5,0 @@ "description": "Client for interfacing taskcluster components", |
@@ -76,4 +76,4 @@ # TaskCluster Client [](https://travis-ci.org/taskcluster/taskcluster-client) | ||
// Start listening for events | ||
listener.connect().then(function() { | ||
// Listen and consume events: | ||
listener.resume().then(function() { | ||
// Now listening | ||
@@ -83,2 +83,21 @@ }); | ||
For advanced queue usage the `connect` method can be used to | ||
create and bind the queue and return an associated [amqplib] channel: | ||
```js | ||
var taskcluster = require('taskcluster-client'); | ||
// Create a listener (this creates a queue on AMQP) | ||
var listener = new taskcluster.Listener({ | ||
connectionString: 'amqp://...' | ||
}); | ||
// See: http://www.squaremobius.net/amqp.node/doc/channel_api.html | ||
var channel = listener.connect().then(function(channel) { | ||
return channel.consume(function(msg) { | ||
channel.ack(msg); | ||
}); | ||
}); | ||
``` | ||
The listener creates a AMQP queue, on the server side and subscribes to messages | ||
@@ -291,5 +310,5 @@ on the queue. It's possible to use named queues, see details below. For details | ||
listener.connect().then(...); // Setup listener and start | ||
listener.connect().then(...); // Setup listener and bind queue | ||
listener.resume().then(...); // Start getting new messages | ||
listener.pause().then(...); // Pause retrieval of new messages | ||
listener.resume().then(...); // Start getting new messages | ||
listener.close(); // Disconnect from AMQP | ||
@@ -296,0 +315,0 @@ ``` |
@@ -64,3 +64,3 @@ suite('listener', function() { | ||
var published = listener.connect().then(function() { | ||
var published = listener.resume().then(function() { | ||
return _publisher.testExchange({ | ||
@@ -101,3 +101,3 @@ text: "my message" | ||
var published = listener.connect().then(function() { | ||
var published = listener.resume().then(function() { | ||
return _publisher.testExchange({ | ||
@@ -139,3 +139,3 @@ text: "my message" | ||
var published = listener.connect().then(function() { | ||
var published = listener.resume().then(function() { | ||
return _publisher.testExchange({ | ||
@@ -176,3 +176,3 @@ text: "my message" | ||
var published = listener.connect().then(function() { | ||
var published = listener.resume().then(function() { | ||
return _publisher.testExchange({ | ||
@@ -196,2 +196,3 @@ text: "my message" | ||
var listener = new taskcluster.Listener({ | ||
queueName: slugid.v4(), | ||
connectionString: mockEvents.connectionString | ||
@@ -218,3 +219,3 @@ }); | ||
var published = listener.connect().then(function() { | ||
var published = listener.resume().then(function() { | ||
return _publisher.testExchange({ | ||
@@ -267,2 +268,3 @@ text: "my message" | ||
var listener = new taskcluster.Listener({ | ||
queueName: slugid.v4(), | ||
connectionString: mockEvents.connectionString, | ||
@@ -289,3 +291,3 @@ maxLength: 3 | ||
return listener.connect().then(function() { | ||
return listener.resume().then(function() { | ||
return listener.pause().then(function() { | ||
@@ -292,0 +294,0 @@ return _publisher.testExchange({ |
111949
0.17%325
6.21%1974
-0.45%