Comparing version 0.2.0 to 0.2.3
@@ -146,2 +146,13 @@ exports.constants = [ | ||
"fields": [] | ||
}, { | ||
"name": "blocked", | ||
"index": 60, | ||
"fields": [{ | ||
"name": "reason", | ||
"domain": "shortstr" | ||
}] | ||
}, { | ||
"name": "unblocked", | ||
"index": 61, | ||
"fields": [] | ||
}] | ||
@@ -749,2 +760,2 @@ }, { | ||
}] | ||
}]; | ||
}]; |
@@ -19,6 +19,6 @@ 'use strict'; | ||
var nodeAMQPVersion = require('../package').version; | ||
var maxFrameBuffer = 131072; // 128k, same as rabbitmq (which was | ||
// copying qpid) | ||
var channelMax = 65535; | ||
var defaultPorts = { 'amqp': 5672, 'amqps': 5671 }; | ||
@@ -64,7 +64,7 @@ | ||
this.setImplOptions(options); | ||
if (typeof readyCallback === 'function') { | ||
this._readyCallback = readyCallback; | ||
} | ||
this.connectionAttemptScheduled = false; | ||
@@ -74,2 +74,5 @@ this._defaultExchange = null; | ||
this._sendBuffer = new Buffer(maxFrameBuffer); | ||
this._blocked = false; | ||
this._blockedReason = null; | ||
}; | ||
@@ -208,3 +211,3 @@ util.inherits(Connection, EventEmitter); | ||
for (var channel in self.channels) { | ||
if (channel !== 0) { | ||
if (channel !== '0') { | ||
self.channels[channel].state = 'closed'; | ||
@@ -266,4 +269,3 @@ } | ||
_.each(self.channels, function(channel, index) { | ||
// FIXME why is the index "0" instead of 0? | ||
if (index !== "0") channel.reconnect(); | ||
if (index !== '0') channel.reconnect(); | ||
}); | ||
@@ -413,2 +415,21 @@ } | ||
Connection.prototype._saslResponse = function () { | ||
var response; | ||
if (this.options.authMechanism == 'AMQPLAIN') | ||
response = { | ||
LOGIN: this.options.login, | ||
PASSWORD: this.options.password | ||
}; | ||
else if (this.options.authMechanism == 'PLAIN') | ||
response = "\0" + this.options.login + "\0" + this.options.password; | ||
else if (this.options.authMechanism == 'EXTERNAL') | ||
response = "\0"; | ||
else if (this.options.authMechanism == 'ANONYMOUS') | ||
response = "\0"; | ||
else | ||
response = this.options.response; | ||
return response; | ||
} | ||
Connection.prototype._onMethod = function (channel, method, args) { | ||
@@ -449,6 +470,3 @@ debug && debug(channel + " > " + method.name + " " + JSON.stringify(args)); | ||
mechanism: this.options.authMechanism, | ||
response: { | ||
LOGIN: this.options.login, | ||
PASSWORD: this.options.password | ||
}, | ||
response: this._saslResponse(), | ||
locale: 'en_US' | ||
@@ -464,5 +482,9 @@ }); | ||
} | ||
if (args.channelMax) { | ||
debug && debug("tweaking channelMax to " + args.channelMax); | ||
channelMax = args.channelMax; | ||
} | ||
// 5. We respond with connectionTuneOk | ||
this._sendMethod(0, methods.connectionTuneOk, { | ||
channelMax: 0, | ||
channelMax: channelMax, | ||
frameMax: maxFrameBuffer, | ||
@@ -507,2 +529,16 @@ heartbeat: this.options.heartbeat || 0 | ||
case methods.connectionBlocked: | ||
debug && debug('Received connection.blocked from server with reason: ' + args.reason); | ||
this._blocked = true; | ||
this._blockedReason = args.reason; | ||
this.emit('blocked'); | ||
break; | ||
case methods.connectionUnblocked: | ||
debug && debug('Received connection.unblocked from server'); | ||
this._blocked = false; | ||
this._blockedReason = null; | ||
this.emit('unblocked'); | ||
break; | ||
default: | ||
@@ -541,3 +577,3 @@ throw new Error("Uncaught method '" + method.name + "' with args " + | ||
* Connect helpers | ||
* | ||
* | ||
*/ | ||
@@ -550,9 +586,9 @@ | ||
if(typeof this.options.hostPreference == 'number') { | ||
this.hosti = (this.options.hostPreference < this.options.host.length) ? | ||
this.options.hostPreference : this.options.host.length-1; | ||
} else { | ||
this.hosti = (this.options.hostPreference < this.options.host.length) ? | ||
this.options.hostPreference : this.options.host.length-1; | ||
} else { | ||
this.hosti = parseInt(Math.random() * this.options.host.length, 10); | ||
} | ||
} else { | ||
// If this is already set, it looks like we want to choose another one. | ||
// If this is already set, it looks like we want to choose another one. | ||
// Add one to hosti but don't overflow it. | ||
@@ -568,9 +604,18 @@ this.hosti = (this.hosti + 1) % this.options.host.length; | ||
Connection.prototype._createSocket = function() { | ||
var hostName = this._chooseHost(), self = this; | ||
var hostName = this._chooseHost(), self = this, port = this.options.port; | ||
var parsedHost = URL.parse(hostName); | ||
if(parsedHost.port){ | ||
hostName = parsedHost.hostname; | ||
port = parsedHost.port; | ||
} | ||
var options = { | ||
port: this.options.port, | ||
port: port, | ||
host: hostName | ||
}; | ||
// Disable tcp nagle's algo | ||
// Default: true, makes small messages faster | ||
var noDelay = this.options.noDelay || true; | ||
var resetConnectionTimeout = function () { | ||
@@ -602,2 +647,4 @@ debug && debug('connected so resetting connection timeout'); | ||
this.socket.setNoDelay(noDelay); | ||
// Proxy events. | ||
@@ -611,3 +658,3 @@ // Note that if we don't attach a 'data' event, no data will flow. | ||
// Proxy a few methods that we use / previously used. | ||
var methods = ['end', 'destroy', 'write', 'pause', 'resume', 'setEncoding', 'ref', 'unref', 'address']; | ||
var methods = ['destroy', 'write', 'pause', 'resume', 'setEncoding', 'ref', 'unref', 'address']; | ||
_.each(methods, function(method){ | ||
@@ -618,3 +665,20 @@ self[method] = function(){ | ||
}); | ||
}; | ||
Connection.prototype.end = function() { | ||
if (this.socket) { | ||
this.socket.end(); | ||
} | ||
this.options.heartbeat = false; | ||
if (this._inboundHeartbeatTimer !== null) { | ||
clearTimeout(this._inboundHeartbeatTimer); | ||
this._inboundHeartbeatTimer = null; | ||
} | ||
if (this._outboundHeartbeatTimer !== null) { | ||
clearTimeout(this._outboundHeartbeatTimer); | ||
this._outboundHeartbeatTimer = null; | ||
} | ||
}; | ||
@@ -625,2 +689,6 @@ | ||
this.sslConnectionOptions = {}; | ||
if (this.options.ssl.pfxFile) { | ||
this.sslConnectionOptions.pfx = fs.readFileSync(this.options.ssl.pfxFile); | ||
} | ||
if (this.options.ssl.keyFile) { | ||
@@ -635,3 +703,6 @@ this.sslConnectionOptions.key = fs.readFileSync(this.options.ssl.keyFile); | ||
} | ||
this.sslConnectionOptions.rejectUnauthorized = this.options.ssl.rejectUnauthorized; | ||
this.sslConnectionOptions.passphrase = this.options.ssl.passphrase; | ||
return this.sslConnectionOptions; | ||
@@ -650,3 +721,3 @@ }; | ||
* Parse helpers | ||
* | ||
* | ||
*/ | ||
@@ -805,3 +876,3 @@ | ||
// use values in range of 1..65535 | ||
channelId = channelId % 65535 + 1; | ||
channelId = channelId % channelMax + 1; | ||
if(!this.channels[channelId]){ | ||
@@ -808,0 +879,0 @@ break; |
@@ -51,2 +51,8 @@ 'use strict'; | ||
if (/^$|(amq\.)/.test(this.name)) { | ||
//If confirm mode is specified we have to set it no matter the exchange. | ||
if (this.options.confirm) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false }); | ||
return; | ||
} | ||
this.state = 'open'; | ||
@@ -64,10 +70,29 @@ // - issue #33 fix | ||
} else if (this.options.noDeclare) { | ||
this.state = 'open'; | ||
if (this.options.confirm) { | ||
this.connection._sendMethod(channel, methods.confirmSelect, | ||
{ noWait: false }); | ||
// We should wait for the confirmSelectOk message to come back | ||
// before we consider the exchange open and fire the open callback. | ||
// the handler for confirmSelectOk initializes this._sequence. | ||
// whose values are used to match published messages with their | ||
// confirmations. If we fired the open callback here, publishes | ||
// could happen before that intialization. If this happens, | ||
// the first message gets a null value set as its "sequence" value | ||
// but the ack comes back with a 1. The 1 doesn't match anything | ||
// in the array of messages awaiting confirmation, and so the | ||
// confirmation doesn't do anything and the callback passed into | ||
// the publish method never fires. I've not full investigated, but | ||
// presumably any message publishes that are performed before | ||
// confirmSelectOk is received would have their confirmation | ||
// behave abnormally. | ||
} else { | ||
this.state = 'open'; | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.emit('open'); | ||
} | ||
this.emit('open'); | ||
} else { | ||
@@ -213,2 +238,10 @@ this.connection._sendMethod(channel, methods.exchangeDeclare, | ||
if (this.connection._blocked) { | ||
if (callback) { | ||
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason)); | ||
} else { | ||
return; | ||
} | ||
} | ||
options = _.extend({}, options || {}); | ||
@@ -215,0 +248,0 @@ options.routingKey = routingKey; |
@@ -205,2 +205,3 @@ 'use strict'; | ||
} | ||
this._lastMessage = null; | ||
} | ||
@@ -358,3 +359,2 @@ }; | ||
self.exchange.binds--; | ||
self.exchange.cleanup(); | ||
} | ||
@@ -361,0 +361,0 @@ self.connection._sendMethod(self.channel, methods.queueDelete, |
{ "name" : "amqp" | ||
, "description" : "AMQP driver for node" | ||
, "keywords" : [ "amqp" ] | ||
, "version" : "0.2.0" | ||
, "version" : "0.2.3" | ||
, "author" : { "name" : "Ryan Dahl" } | ||
@@ -6,0 +6,0 @@ , "contributors" : |
@@ -17,3 +17,3 @@ [![build status](https://secure.travis-ci.org/postwait/node-amqp.png)](http://travis-ci.org/postwait/node-amqp) | ||
- [Queue](#queue) | ||
- [connection.queue(name, options, openCallback)](#connectionqueuename-options-opencallback) | ||
- [connection.queue(name[, options][, openCallback])](#connectionqueuename-options-opencallback) | ||
- [queue.subscribe([options,] listener)](#queuesubscribeoptions-listener) | ||
@@ -28,5 +28,5 @@ - [queue.subscribeRaw([options,] listener)](#queuesubscriberawoptions-listener) | ||
- [Exchange](#exchange) | ||
- [exchange.on('open', callback)](#exchangeon%27open%27-callback) | ||
- [exchange.on('open', callback)](#exchangeonopen-callback) | ||
- [connection.exchange()](#connectionexchange) | ||
- [connection.exchange(name, options={}, openCallback)](#connectionexchangename-options=%7B%7D-opencallback) | ||
- [connection.exchange(name, options={}, openCallback)](#connectionexchangename-options-opencallback) | ||
- [exchange.publish(routingKey, message, options, callback)](#exchangepublishroutingkey-message-options-callback) | ||
@@ -56,3 +56,3 @@ - [exchange.destroy(ifUnused = true)](#exchangedestroyifunused--true) | ||
// Use the default 'amq.topic' exchange | ||
connection.queue('my-queue', function(q){ | ||
connection.queue('my-queue', function (q) { | ||
// Catch all messages | ||
@@ -89,5 +89,6 @@ q.bind('#'); | ||
, password: 'guest' | ||
, connectionTimeout: 0, | ||
, connectionTimeout: 10000, | ||
, authMechanism: 'AMQPLAIN' | ||
, vhost: '/' | ||
, noDelay: true | ||
, ssl: { enabled : false | ||
@@ -114,3 +115,4 @@ } | ||
The key, certificate, and certificate authority files must be in pem format. | ||
If `port` is not specified, the default AMQPS port 5671 is used. | ||
Alternatively, `pfxFile` can be used to read key and certificate from a single | ||
file. If `port` is not specified, the default AMQPS port 5671 is used. | ||
If `rejectUnauthorized` is not specified, it defaults to true. | ||
@@ -248,3 +250,3 @@ | ||
### connection.queue(name, options, openCallback) | ||
### connection.queue(name[, options][, openCallback]) | ||
@@ -251,0 +253,0 @@ Returns a reference to a queue. The name parameter is required, unlike pika which defaults the name to `''`. The options are |
@@ -5,3 +5,3 @@ var harness = require('./harness'); | ||
// If given one, without an index (`this.hosti`), it will randomly pick one. | ||
options.host = [options.host,"nohost"]; | ||
options.host = [options.host,"nohost:5673"]; | ||
options.reconnect = true; | ||
@@ -8,0 +8,0 @@ |
@@ -21,3 +21,3 @@ require('./harness').run(); | ||
timeout = setTimeout(function() { | ||
puts("ERROR: Timeout occured!!!!!!!"); | ||
puts("ERROR: Timeout occurred!!!!!!!"); | ||
connection.end(); | ||
@@ -24,0 +24,0 @@ assert.ok(1, 2); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
496402
87
6001
551
10240
12