antenna-amqp
Advanced tools
Comparing version 0.1.1 to 0.1.2
@@ -119,2 +119,4 @@ /** | ||
q.removeListener('error', onQueueError); | ||
self.address = qname; | ||
self._subscribe(qopts, function(err) { | ||
@@ -197,2 +199,17 @@ if (err) { return self.emit('error', err); } | ||
Bus.prototype.direct = function(addr, msg, options, cb) { | ||
if (typeof options == 'function') { | ||
cb = options; | ||
options = undefined; | ||
} | ||
options = options || {}; | ||
// AMQP uses period ('.') separators rather than slash ('/') | ||
addr = addr.replace(/\//g, '.'); | ||
options.deliveryMode = (options.deliveryMode === undefined) ? NONPERSISTENT_MODE : options.deliveryMode; | ||
this._connection.publish(addr, msg, options); | ||
if (cb) { return process.nextTick(cb); } | ||
} | ||
/** | ||
@@ -277,2 +294,4 @@ * Subscribe to messages of `topic` broadcast on the bus. | ||
var m = new Message(message, headers, deliveryInfo); | ||
m.bus = self; | ||
self.emit('message', m); | ||
@@ -279,0 +298,0 @@ }).addCallback(function(ok) { |
@@ -11,4 +11,8 @@ /** | ||
if (deliveryInfo.contentType) { this.headers['content-type'] = deliveryInfo.contentType; } | ||
// TODO: only set body if it has been parsed, otherwise set `data` | ||
this.body = message; | ||
if (Buffer.isBuffer(message.data)) { | ||
this.data = message.data; | ||
} else { | ||
this.body = message; | ||
} | ||
} | ||
@@ -15,0 +19,0 @@ |
{ | ||
"name": "antenna-amqp", | ||
"version": "0.1.1", | ||
"version": "0.1.2", | ||
"description": "AMQP adapter for Antenna.", | ||
@@ -5,0 +5,0 @@ "keywords": [ |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
14202
342