Comparing version 0.2.0 to 0.2.1
@@ -37,6 +37,11 @@ 'use strict'; | ||
log('promise', 'resolve'); | ||
msg.body = JSON.parse(msg.content); | ||
return defer.resolve(msg); | ||
try { | ||
msg.body = JSON.parse(msg.content); | ||
return defer.resolve(msg); | ||
} catch(err) { | ||
log('reject json parse issue.'); | ||
return defer.reject(err); | ||
} | ||
} | ||
}; | ||
}; |
@@ -17,3 +17,6 @@ 'use strict'; | ||
queueOpts: {durable: true, autoDelete: false, messageTtl: 30000, expires: 3600000}, | ||
autoAck: false | ||
autoAck: false, | ||
errorHandler: function(err) { | ||
console.error(err); | ||
} | ||
}; | ||
@@ -33,3 +36,3 @@ | ||
if(options.middleware && !Array.isArray(options.middleware)){ | ||
if (options.middleware && !Array.isArray(options.middleware)) { | ||
throw new Error('Middleware must be supplied in an array.'); | ||
@@ -81,10 +84,7 @@ } | ||
debug('queue', self.options.queue); | ||
if(self.options.autoAck) { | ||
if (self.options.autoAck) { | ||
debug('autoAck', 'true'); | ||
ackMessage(); | ||
} | ||
}, function(err){ | ||
ch.nack(msg); | ||
onErr(err); | ||
}); | ||
}, self.options.errorHandler); | ||
} | ||
@@ -103,6 +103,2 @@ | ||
function onErr(err) { | ||
self.container.logger.error(err); | ||
} | ||
/** | ||
@@ -119,3 +115,3 @@ * Initialise the route. | ||
open.then(function (conn) { | ||
open.then(function(conn) { | ||
conn.on('error', self.cleanup); | ||
@@ -131,3 +127,3 @@ var ok = conn.createChannel(); | ||
this.close = function close() { | ||
this.connection.then(function (conn) { | ||
this.connection.then(function(conn) { | ||
conn.close(); | ||
@@ -134,0 +130,0 @@ }); |
{ | ||
"name": "svcs", | ||
"version": "0.2.0", | ||
"version": "0.2.1", | ||
"description": "AMQP driven micro services for node.", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -25,9 +25,15 @@ # svcs [![Build Status](https://drone.io/github.com/wolfeidau/svcs/status.png)](https://drone.io/github.com/wolfeidau/svcs/latest) | ||
// override the default amqpUrl | ||
container.set('amqpUrl', 'amqp://guest:guest@rabbitmq.example.com:5672'); | ||
var amqpUrl = process.env.AMQP_URL || config.amqp.url; | ||
container.set('amqpUrl', amqpUrl); | ||
function onErr(err) { | ||
console.warn('error processing message', err); | ||
} | ||
// add a route which will process messages for the given routing key | ||
// the attribute :gatewayId will be replaced with * when passed to bindQueue | ||
container.route('$gw.:gatewayId.events', {queue: 'gw_events'}, function handler(err, msg){ | ||
container.route('$gw.:gatewayId.events', {queue: 'gw_events', errorHandler: onErr}, function handler(msg){ | ||
var gatewayId = msg.params.gatewayId; | ||
} | ||
}); | ||
``` | ||
@@ -34,0 +40,0 @@ |
@@ -78,2 +78,28 @@ 'use strict'; | ||
it('should setup a route and call error on bad json', function (done) { | ||
container.use(jsonDecode); | ||
function onError(err) { | ||
expect(err).to.exist; | ||
done(); | ||
} | ||
var open = container.route('$jsontest.*.events.err', {queue: 'test_events_json_err', errorHandler: onError}, function (msg) { | ||
}); | ||
setTimeout( | ||
function () { | ||
open.then(function (conn) { | ||
var ok = conn.createChannel(); | ||
ok = ok.then(function (ch) { | ||
ch.publish('amq.topic', '$jsontest.123456.events.err', new Buffer('{msg: "some message"}'), {contentType: 'application/json'}); | ||
}); | ||
return ok; | ||
}).then(null, console.warn); | ||
}, 100); | ||
}); | ||
}); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
23892
604
70