Comparing version 0.1.6 to 0.1.7
@@ -36,3 +36,3 @@ "use strict"; | ||
this.connected = false; | ||
this.routes = []; | ||
this.routers = []; | ||
this.$promise = null; | ||
@@ -78,3 +78,3 @@ } | ||
* @param {Function} [handler] | ||
* @returns {*} | ||
* @returns {Promise} | ||
*/ | ||
@@ -87,3 +87,3 @@ Client.prototype.route = function (route, options, handler) { | ||
var codec = this.codec; | ||
var open = this.svcs_container.route(route, options, function (message) { | ||
var router = this.context.route(route, options, function (message) { | ||
when.try(function () { | ||
@@ -93,3 +93,3 @@ message.payload = codec.decode(message.content); | ||
}).done(function () { | ||
message.ack(); | ||
return message.ack(); | ||
}, function (err) { | ||
@@ -99,4 +99,4 @@ debug('error', 'Error thrown in routing handler, not acking message. Error: ', err.stack); | ||
}); | ||
this.routes.push(this.svcs_container.routes[this.svcs_container.routes.length - 1]); | ||
return open; | ||
this.routers.push(router); | ||
return router.$promise; | ||
}; | ||
@@ -114,10 +114,10 @@ | ||
return close_connection(this.conn).then(function () { | ||
return when.all(when.map(that.routes, function (route) { | ||
return route.connection.then(close_connection); | ||
return when.all(when.map(that.routers, function (router) { | ||
return router.connection.then(function (conn) { | ||
if (conn === that.conn) return; | ||
return close_connection(conn); | ||
}); | ||
})); | ||
}).then(function () { | ||
_.remove(that.svcs_container.routes, function (item) { | ||
return _.indexOf(that.routes, item) >= 0; | ||
}); | ||
that.routes = []; | ||
that.routers = []; | ||
that.closed = true; | ||
@@ -130,3 +130,3 @@ that.closing = false; | ||
function close_connection(conn) { | ||
if (!conn) return when.resolve(); | ||
if (!conn || conn.cloing || conn.closed) return when.resolve(); | ||
return when.try(function () { | ||
@@ -133,0 +133,0 @@ var d = when.defer(); |
@@ -6,4 +6,4 @@ "use strict"; | ||
var when = require('when'); | ||
var amqp = require('amqplib'); | ||
var svcs = require('svcs'); | ||
var amqp = require('./amqp'); | ||
var routify = require('./routify'); | ||
var Client = require('./client'); | ||
@@ -31,6 +31,9 @@ | ||
client.svcs_container = svcs(); | ||
var open = when(amqp.connect(url)); | ||
client.context = routify.createContext({ | ||
connection: open, | ||
errorHandler: client.emit.bind(client, 'error', routerErr) | ||
}); | ||
var setup = open.then(function (conn) { | ||
@@ -41,3 +44,2 @@ debug('connected'); | ||
conn.on('error', client.emit.bind(client, 'error')); | ||
conn.on('close', function () { | ||
@@ -48,4 +50,3 @@ client.connected = false; | ||
client.svcs_container.set('amqpConnection', conn); | ||
client.svcs_container.set('errorHandler', client.emit.bind(client, 'error', routerErr)); | ||
conn.on('error', client.emit.bind(client, 'error')); | ||
@@ -52,0 +53,0 @@ return conn; |
{ | ||
"name": "amqper", | ||
"version": "0.1.6", | ||
"version": "0.1.7", | ||
"description": "A simple and elegant AMQP client for node based on amqplib.", | ||
@@ -20,5 +20,5 @@ "homepage": "https://github.com/taoyuan/amqper", | ||
"debug": "^2.2.0", | ||
"houkou": "^0.2.2", | ||
"lodash": "^3.10.0", | ||
"msgpack": "^0.2.6", | ||
"svcs": "^0.2.1", | ||
"when": "^3.7.3" | ||
@@ -25,0 +25,0 @@ }, |
@@ -7,3 +7,10 @@ 'use strict'; | ||
function delayCloseClient(client, done) { | ||
setTimeout(function () { | ||
client.close(done); | ||
}, 100); | ||
} | ||
describe('amqper', function () { | ||
this.timeout(8000); | ||
@@ -31,11 +38,9 @@ describe('connect', function () { | ||
t.deepEqual(message.payload, data); | ||
client.close(function () { | ||
t.lengthOf(client.svcs_container.routes, 0); | ||
done(); | ||
}); | ||
delayCloseClient(client, done); | ||
}).then(function () { | ||
client.publish('amq.topic', 'test1.a', data); | ||
}); | ||
setTimeout(function () { | ||
client.publish('amq.topic', 'test1.a', data); | ||
}, 500); | ||
}); | ||
}); | ||
@@ -54,10 +59,6 @@ | ||
t.deepEqual(message.payload, data); | ||
client.close(function () { | ||
t.lengthOf(client.svcs_container.routes, 0); | ||
done(); | ||
}); | ||
delayCloseClient(client, done); | ||
}).then(function () { | ||
client.publish('amq.topic', 'test2.a', data); | ||
}); | ||
setTimeout(function () { | ||
client.publish('amq.topic', 'test2.a', data); | ||
}, 500); | ||
}); | ||
@@ -64,0 +65,0 @@ }); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
19355
22
486
+ Addedhoukou@^0.2.2
- Removedsvcs@^0.2.1
- Removedamqplib@0.1.3(transitive)
- Removedbitsyntax@0.0.3(transitive)
- Removeddebug@0.7.4(transitive)
- Removedhoek@1.4.1(transitive)
- Removedlog4js@0.6.38(transitive)
- Removedobject-keys@0.4.0(transitive)
- Removedreadable-stream@1.0.34(transitive)
- Removedsemver@4.3.6(transitive)
- Removedstatsd-client@0.0.15(transitive)
- Removedsvcs@0.2.1(transitive)
- Removedwhen@2.1.12.7.1(transitive)
- Removedxtend@2.1.2(transitive)