Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp

Package Overview
Dependencies
Maintainers
2
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp - npm Package Compare versions

Comparing version 0.2.0 to 0.2.3

test/test-connection-blocked.js

13

lib/amqp-definitions-0-9-1.js

@@ -146,2 +146,13 @@ exports.constants = [

"fields": []
}, {
"name": "blocked",
"index": 60,
"fields": [{
"name": "reason",
"domain": "shortstr"
}]
}, {
"name": "unblocked",
"index": 61,
"fields": []
}]

@@ -749,2 +760,2 @@ }, {

}]
}];
}];

@@ -19,6 +19,6 @@ 'use strict';

var nodeAMQPVersion = require('../package').version;
var maxFrameBuffer = 131072; // 128k, same as rabbitmq (which was
// copying qpid)
var channelMax = 65535;
var defaultPorts = { 'amqp': 5672, 'amqps': 5671 };

@@ -64,7 +64,7 @@

this.setImplOptions(options);
if (typeof readyCallback === 'function') {
this._readyCallback = readyCallback;
}
this.connectionAttemptScheduled = false;

@@ -74,2 +74,5 @@ this._defaultExchange = null;

this._sendBuffer = new Buffer(maxFrameBuffer);
this._blocked = false;
this._blockedReason = null;
};

@@ -208,3 +211,3 @@ util.inherits(Connection, EventEmitter);

for (var channel in self.channels) {
if (channel !== 0) {
if (channel !== '0') {
self.channels[channel].state = 'closed';

@@ -266,4 +269,3 @@ }

_.each(self.channels, function(channel, index) {
// FIXME why is the index "0" instead of 0?
if (index !== "0") channel.reconnect();
if (index !== '0') channel.reconnect();
});

@@ -413,2 +415,21 @@ }

Connection.prototype._saslResponse = function () {
var response;
if (this.options.authMechanism == 'AMQPLAIN')
response = {
LOGIN: this.options.login,
PASSWORD: this.options.password
};
else if (this.options.authMechanism == 'PLAIN')
response = "\0" + this.options.login + "\0" + this.options.password;
else if (this.options.authMechanism == 'EXTERNAL')
response = "\0";
else if (this.options.authMechanism == 'ANONYMOUS')
response = "\0";
else
response = this.options.response;
return response;
}
Connection.prototype._onMethod = function (channel, method, args) {

@@ -449,6 +470,3 @@ debug && debug(channel + " > " + method.name + " " + JSON.stringify(args));

mechanism: this.options.authMechanism,
response: {
LOGIN: this.options.login,
PASSWORD: this.options.password
},
response: this._saslResponse(),
locale: 'en_US'

@@ -464,5 +482,9 @@ });

}
if (args.channelMax) {
debug && debug("tweaking channelMax to " + args.channelMax);
channelMax = args.channelMax;
}
// 5. We respond with connectionTuneOk
this._sendMethod(0, methods.connectionTuneOk, {
channelMax: 0,
channelMax: channelMax,
frameMax: maxFrameBuffer,

@@ -507,2 +529,16 @@ heartbeat: this.options.heartbeat || 0

case methods.connectionBlocked:
debug && debug('Received connection.blocked from server with reason: ' + args.reason);
this._blocked = true;
this._blockedReason = args.reason;
this.emit('blocked');
break;
case methods.connectionUnblocked:
debug && debug('Received connection.unblocked from server');
this._blocked = false;
this._blockedReason = null;
this.emit('unblocked');
break;
default:

@@ -541,3 +577,3 @@ throw new Error("Uncaught method '" + method.name + "' with args " +

* Connect helpers
*
*
*/

@@ -550,9 +586,9 @@

if(typeof this.options.hostPreference == 'number') {
this.hosti = (this.options.hostPreference < this.options.host.length) ?
this.options.hostPreference : this.options.host.length-1;
} else {
this.hosti = (this.options.hostPreference < this.options.host.length) ?
this.options.hostPreference : this.options.host.length-1;
} else {
this.hosti = parseInt(Math.random() * this.options.host.length, 10);
}
} else {
// If this is already set, it looks like we want to choose another one.
// If this is already set, it looks like we want to choose another one.
// Add one to hosti but don't overflow it.

@@ -568,9 +604,18 @@ this.hosti = (this.hosti + 1) % this.options.host.length;

Connection.prototype._createSocket = function() {
var hostName = this._chooseHost(), self = this;
var hostName = this._chooseHost(), self = this, port = this.options.port;
var parsedHost = URL.parse(hostName);
if(parsedHost.port){
hostName = parsedHost.hostname;
port = parsedHost.port;
}
var options = {
port: this.options.port,
port: port,
host: hostName
};
// Disable tcp nagle's algo
// Default: true, makes small messages faster
var noDelay = this.options.noDelay || true;
var resetConnectionTimeout = function () {

@@ -602,2 +647,4 @@ debug && debug('connected so resetting connection timeout');

this.socket.setNoDelay(noDelay);
// Proxy events.

@@ -611,3 +658,3 @@ // Note that if we don't attach a 'data' event, no data will flow.

// Proxy a few methods that we use / previously used.
var methods = ['end', 'destroy', 'write', 'pause', 'resume', 'setEncoding', 'ref', 'unref', 'address'];
var methods = ['destroy', 'write', 'pause', 'resume', 'setEncoding', 'ref', 'unref', 'address'];
_.each(methods, function(method){

@@ -618,3 +665,20 @@ self[method] = function(){

});
};
Connection.prototype.end = function() {
if (this.socket) {
this.socket.end();
}
this.options.heartbeat = false;
if (this._inboundHeartbeatTimer !== null) {
clearTimeout(this._inboundHeartbeatTimer);
this._inboundHeartbeatTimer = null;
}
if (this._outboundHeartbeatTimer !== null) {
clearTimeout(this._outboundHeartbeatTimer);
this._outboundHeartbeatTimer = null;
}
};

@@ -625,2 +689,6 @@

this.sslConnectionOptions = {};
if (this.options.ssl.pfxFile) {
this.sslConnectionOptions.pfx = fs.readFileSync(this.options.ssl.pfxFile);
}
if (this.options.ssl.keyFile) {

@@ -635,3 +703,6 @@ this.sslConnectionOptions.key = fs.readFileSync(this.options.ssl.keyFile);

}
this.sslConnectionOptions.rejectUnauthorized = this.options.ssl.rejectUnauthorized;
this.sslConnectionOptions.passphrase = this.options.ssl.passphrase;
return this.sslConnectionOptions;

@@ -650,3 +721,3 @@ };

* Parse helpers
*
*
*/

@@ -805,3 +876,3 @@

// use values in range of 1..65535
channelId = channelId % 65535 + 1;
channelId = channelId % channelMax + 1;
if(!this.channels[channelId]){

@@ -808,0 +879,0 @@ break;

@@ -51,2 +51,8 @@ 'use strict';

if (/^$|(amq\.)/.test(this.name)) {
//If confirm mode is specified we have to set it no matter the exchange.
if (this.options.confirm) {
this.connection._sendMethod(channel, methods.confirmSelect, { noWait: false });
return;
}
this.state = 'open';

@@ -64,10 +70,29 @@ // - issue #33 fix

} else if (this.options.noDeclare) {
this.state = 'open';
if (this.options.confirm) {
this.connection._sendMethod(channel, methods.confirmSelect,
{ noWait: false });
// We should wait for the confirmSelectOk message to come back
// before we consider the exchange open and fire the open callback.
// the handler for confirmSelectOk initializes this._sequence.
// whose values are used to match published messages with their
// confirmations. If we fired the open callback here, publishes
// could happen before that intialization. If this happens,
// the first message gets a null value set as its "sequence" value
// but the ack comes back with a 1. The 1 doesn't match anything
// in the array of messages awaiting confirmation, and so the
// confirmation doesn't do anything and the callback passed into
// the publish method never fires. I've not full investigated, but
// presumably any message publishes that are performed before
// confirmSelectOk is received would have their confirmation
// behave abnormally.
} else {
this.state = 'open';
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
this.emit('open');
}
this.emit('open');
} else {

@@ -213,2 +238,10 @@ this.connection._sendMethod(channel, methods.exchangeDeclare,

if (this.connection._blocked) {
if (callback) {
return callback(true, new Error('Connection is blocked, server reason: ' + this.connection._blockedReason));
} else {
return;
}
}
options = _.extend({}, options || {});

@@ -215,0 +248,0 @@ options.routingKey = routingKey;

2

lib/queue.js

@@ -205,2 +205,3 @@ 'use strict';

}
this._lastMessage = null;
}

@@ -358,3 +359,2 @@ };

self.exchange.binds--;
self.exchange.cleanup();
}

@@ -361,0 +361,0 @@ self.connection._sendMethod(self.channel, methods.queueDelete,

{ "name" : "amqp"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
, "version" : "0.2.0"
, "version" : "0.2.3"
, "author" : { "name" : "Ryan Dahl" }

@@ -6,0 +6,0 @@ , "contributors" :

@@ -17,3 +17,3 @@ [![build status](https://secure.travis-ci.org/postwait/node-amqp.png)](http://travis-ci.org/postwait/node-amqp)

- [Queue](#queue)
- [connection.queue(name, options, openCallback)](#connectionqueuename-options-opencallback)
- [connection.queue(name[, options][, openCallback])](#connectionqueuename-options-opencallback)
- [queue.subscribe([options,] listener)](#queuesubscribeoptions-listener)

@@ -28,5 +28,5 @@ - [queue.subscribeRaw([options,] listener)](#queuesubscriberawoptions-listener)

- [Exchange](#exchange)
- [exchange.on('open', callback)](#exchangeon%27open%27-callback)
- [exchange.on('open', callback)](#exchangeonopen-callback)
- [connection.exchange()](#connectionexchange)
- [connection.exchange(name, options={}, openCallback)](#connectionexchangename-options=%7B%7D-opencallback)
- [connection.exchange(name, options={}, openCallback)](#connectionexchangename-options-opencallback)
- [exchange.publish(routingKey, message, options, callback)](#exchangepublishroutingkey-message-options-callback)

@@ -56,3 +56,3 @@ - [exchange.destroy(ifUnused = true)](#exchangedestroyifunused--true)

// Use the default 'amq.topic' exchange
connection.queue('my-queue', function(q){
connection.queue('my-queue', function (q) {
// Catch all messages

@@ -89,5 +89,6 @@ q.bind('#');

, password: 'guest'
, connectionTimeout: 0,
, connectionTimeout: 10000,
, authMechanism: 'AMQPLAIN'
, vhost: '/'
, noDelay: true
, ssl: { enabled : false

@@ -114,3 +115,4 @@ }

The key, certificate, and certificate authority files must be in pem format.
If `port` is not specified, the default AMQPS port 5671 is used.
Alternatively, `pfxFile` can be used to read key and certificate from a single
file. If `port` is not specified, the default AMQPS port 5671 is used.
If `rejectUnauthorized` is not specified, it defaults to true.

@@ -248,3 +250,3 @@

### connection.queue(name, options, openCallback)
### connection.queue(name[, options][, openCallback])

@@ -251,0 +253,0 @@ Returns a reference to a queue. The name parameter is required, unlike pika which defaults the name to `''`. The options are

@@ -5,3 +5,3 @@ var harness = require('./harness');

// If given one, without an index (`this.hosti`), it will randomly pick one.
options.host = [options.host,"nohost"];
options.host = [options.host,"nohost:5673"];
options.reconnect = true;

@@ -8,0 +8,0 @@

@@ -21,3 +21,3 @@ require('./harness').run();

timeout = setTimeout(function() {
puts("ERROR: Timeout occured!!!!!!!");
puts("ERROR: Timeout occurred!!!!!!!");
connection.end();

@@ -24,0 +24,0 @@ assert.ok(1, 2);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc