Socket
Socket
Sign inDemoInstall

amqplib

Package Overview
Dependencies
Maintainers
2
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib - npm Package Compare versions

Comparing version 0.8.0 to 0.9.0

.github/workflows/test.yml

1

bin/generate-defs.js

@@ -119,3 +119,2 @@ var FS = require('fs');

println("'use strict';"); nl();
println('var Buffer = require("safe-buffer").Buffer;');
nl()

@@ -122,0 +121,0 @@ println('var codec = require("./codec");');

# Change log for amqplib
## Changes in v0.9.0
git log v0.8.0..v0.9.0
* Update mocha and replace the deprecated istanbul with nyc ([PR
681](https://github.com/squaremo/amqp.node/pull/681)
* Update url-parse ([PR
675](https://github.com/squaremo/amqp.node/pull/675), thank you
@suhail-n and @kibertoad)
* fix: done called twice on invalid options ([PR
667](https://github.com/squaremo/amqp.node/pull/667), thank you
@luddd3 and @kibertoad)
* Close connection to server on connect errors ([PR
647](https://github.com/squaremo/amqp.node/pull/647), thank you
@luddd3 and @kibertoad)
* Modernise channel_model.js ([PR
635](https://github.com/squaremo/amqp.node/pull/635), thank you
@kibertoad and @jimmywarting)
* Bring package-lock.json up to date ([PR
653](https://github.com/squaremo/amqp.node/pull/653)
* Update url-parse ([PR
652](https://github.com/squaremo/amqp.node/pull/652), thank you
@giorgioatanasov and @buffolander)
* Modernise channel_model.js ([PR
651](https://github.com/squaremo/amqp.node/pull/651), thank you
for the review @kibertoad)
* Modernise bitset.js ([PR
634](https://github.com/squaremo/amqp.node/pull/634), thank you
@kibertoad and @jimmywarting)
* :warning: Drop CI for node versions below 10 ([PR
631](https://github.com/squaremo/amqp.node/pull/631), thank you
for the review @kibertoad)
* Replace safe-buffer dependency with native buffers ([PR
628](https://github.com/squaremo/amqp.node/pull/628), thank you
@kibertoad and @jimmywarting)
## Changes in v0.8.0

@@ -4,0 +40,0 @@

@@ -88,7 +88,7 @@ # RabbitMQ tutorials

[rabbitmq-tutes]: http://github.com/rabbitmq/rabbitmq-tutorials
[tute-one]: http://www.rabbitmq.com/tutorials/tutorial-one-python.html
[tute-two]: http://www.rabbitmq.com/tutorials/tutorial-two-python.html
[tute-three]: http://www.rabbitmq.com/tutorials/tutorial-three-python.html
[tute-four]: http://www.rabbitmq.com/tutorials/tutorial-four-python.html
[tute-five]: http://www.rabbitmq.com/tutorials/tutorial-five-python.html
[tute-six]: http://www.rabbitmq.com/tutorials/tutorial-six-python.html
[tute-one]: http://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
[tute-two]: http://www.rabbitmq.com/tutorials/tutorial-two-javascript.html
[tute-three]: http://www.rabbitmq.com/tutorials/tutorial-three-javascript.html
[tute-four]: http://www.rabbitmq.com/tutorials/tutorial-four-javascript.html
[tute-five]: http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html
[tute-six]: http://www.rabbitmq.com/tutorials/tutorial-six-javascript.html

@@ -5,57 +5,114 @@ //

// A bitset implementation, after that in java.util. Yes there
// already exist such things, but none implement next{Clear|Set}Bit or
// equivalent, and none involved me tooling about for an evening.
'use strict';
function BitSet(size) {
if (size) {
var numWords = Math.ceil(size / 32);
this.words = new Array(numWords);
/**
* A bitset implementation, after that in java.util. Yes there
* already exist such things, but none implement next{Clear|Set}Bit or
* equivalent, and none involved me tooling about for an evening.
*/
class BitSet {
/**
* @param {number} [size]
*/
constructor(size) {
if (size) {
const numWords = Math.ceil(size / 32);
this.words = new Array(numWords);
}
else {
this.words = [];
}
this.wordsInUse = 0; // = number, not index
}
else {
this.words = [];
/**
* @param {number} numWords
*/
ensureSize(numWords) {
const wordsPresent = this.words.length;
if (wordsPresent < numWords) {
this.words = this.words.concat(new Array(numWords - wordsPresent));
}
}
this.wordsInUse = 0; // = number, not index
}
var P = BitSet.prototype;
/**
* @param {number} bitIndex
*/
set(bitIndex) {
const w = wordIndex(bitIndex);
if (w >= this.wordsInUse) {
this.ensureSize(w + 1);
this.wordsInUse = w + 1;
}
const bit = 1 << bitIndex;
this.words[w] |= bit;
}
function wordIndex(bitIndex) {
return Math.floor(bitIndex / 32);
}
/**
* @param {number} bitIndex
*/
clear(bitIndex) {
const w = wordIndex(bitIndex);
if (w >= this.wordsInUse) return;
const mask = ~(1 << bitIndex);
this.words[w] &= mask;
}
// Make sure we have at least numWords
P.ensureSize = function(numWords) {
var wordsPresent = this.words.length;
if (wordsPresent < numWords) {
this.words = this.words.concat(new Array(numWords - wordsPresent));
/**
* @param {number} bitIndex
*/
get(bitIndex) {
const w = wordIndex(bitIndex);
if (w >= this.wordsInUse) return false; // >= since index vs size
const bit = 1 << bitIndex;
return !!(this.words[w] & bit);
}
}
P.set = function(bitIndex) {
var w = wordIndex(bitIndex);
if (w >= this.wordsInUse) {
this.ensureSize(w + 1);
this.wordsInUse = w + 1;
/**
* Give the next bit that is set on or after fromIndex, or -1 if no such bit
*
* @param {number} fromIndex
*/
nextSetBit(fromIndex) {
let w = wordIndex(fromIndex);
if (w >= this.wordsInUse) return -1;
// the right-hand side is shifted to only test the bits of the first
// word that are > fromIndex
let word = this.words[w] & (0xffffffff << fromIndex);
while (true) {
if (word) return (w * 32) + trailingZeros(word);
w++;
if (w === this.wordsInUse) return -1;
word = this.words[w];
}
}
var bit = 1 << bitIndex;
this.words[w] |= bit;
};
P.clear = function(bitIndex) {
var w = wordIndex(bitIndex);
if (w >= this.wordsInUse) return;
var mask = ~(1 << bitIndex);
this.words[w] &= mask;
};
/**
* @param {number} fromIndex
*/
nextClearBit(fromIndex) {
let w = wordIndex(fromIndex);
if (w >= this.wordsInUse) return fromIndex;
P.get = function(bitIndex) {
var w = wordIndex(bitIndex);
if (w >= this.wordsInUse) return false; // >= since index vs size
var bit = 1 << bitIndex;
return !!(this.words[w] & bit);
let word = ~(this.words[w]) & (0xffffffff << fromIndex);
while (true) {
if (word) return (w * 32) + trailingZeros(word);
w++;
if (w == this.wordsInUse) return w * 32;
word = ~(this.words[w]);
}
}
}
/**
* @param {number} bitIndex
*/
function wordIndex(bitIndex) {
return Math.floor(bitIndex / 32);
}
/**
* @param {number} i
*/
function trailingZeros(i) {

@@ -66,3 +123,3 @@ // From Hacker's Delight, via JDK. Probably far less effective here,

if (i === 0) return 32;
var y, n = 31;
let y, n = 31;
y = i << 16; if (y != 0) { n = n -16; i = y; }

@@ -75,32 +132,2 @@ y = i << 8; if (y != 0) { n = n - 8; i = y; }

// Give the next bit that's set on or after fromIndex, or -1 if no such
// bit
P.nextSetBit = function(fromIndex) {
var w = wordIndex(fromIndex);
if (w >= this.wordsInUse) return -1;
// the right-hand side is shifted to only test the bits of the first
// word that are > fromIndex
var word = this.words[w] & (0xffffffff << fromIndex);
while (true) {
if (word) return (w * 32) + trailingZeros(word);
w++;
if (w === this.wordsInUse) return -1;
word = this.words[w];
}
};
P.nextClearBit = function(fromIndex) {
var w = wordIndex(fromIndex);
if (w >= this.wordsInUse) return fromIndex;
var word = ~(this.words[w]) & (0xffffffff << fromIndex);
while (true) {
if (word) return (w * 32) + trailingZeros(word);
w++;
if (w == this.wordsInUse) return w * 32;
word = ~(this.words[w]);
}
};
module.exports.BitSet = BitSet;

@@ -7,244 +7,249 @@ //

var defs = require('./defs');
var Promise = require('bluebird');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var BaseChannel = require('./channel').BaseChannel;
var acceptMessage = require('./channel').acceptMessage;
var Args = require('./api_args');
const EventEmitter = require('events');
const Promise = require('bluebird');
const defs = require('./defs');
const {BaseChannel} = require('./channel');
const {acceptMessage} = require('./channel');
const Args = require('./api_args');
const {inspect} = require('./format');
function ChannelModel(connection) {
if (!(this instanceof ChannelModel))
return new ChannelModel(connection);
EventEmitter.call( this );
this.connection = connection;
var self = this;
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
connection.on(ev, self.emit.bind(self, ev));
});
}
inherits(ChannelModel, EventEmitter);
class ChannelModel extends EventEmitter {
constructor(connection) {
super();
this.connection = connection;
module.exports.ChannelModel = ChannelModel;
['error', 'close', 'blocked', 'unblocked'].forEach(ev => {
connection.on(ev, this.emit.bind(this, ev));
});
}
var CM = ChannelModel.prototype;
close() {
return Promise.fromCallback(this.connection.close.bind(this.connection));
}
CM.close = function() {
return Promise.fromCallback(this.connection.close.bind(this.connection));
};
async createChannel() {
const channel = new Channel(this.connection);
await channel.open();
return channel;
}
// Channels
function Channel(connection) {
BaseChannel.call(this, connection);
this.on('delivery', this.handleDelivery.bind(this));
this.on('cancel', this.handleCancel.bind(this));
async createConfirmChannel() {
const channel = new ConfirmChannel(this.connection);
await channel.open();
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk);
return channel;
}
}
inherits(Channel, BaseChannel);
module.exports.Channel = Channel;
// Channels
CM.createChannel = function() {
var c = new Channel(this.connection);
return c.open().then(function(openOk) { return c; });
};
class Channel extends BaseChannel {
constructor(connection) {
super(connection);
this.on('delivery', this.handleDelivery.bind(this));
this.on('cancel', this.handleCancel.bind(this));
}
var C = Channel.prototype;
// An RPC that returns a 'proper' promise, which resolves to just the
// response's fields; this is intended to be suitable for implementing
// API procedures.
async rpc(method, fields, expect) {
const f = await Promise.fromCallback(cb => {
return this._rpc(method, fields, expect, cb);
})
// An RPC that returns a 'proper' promise, which resolves to just the
// response's fields; this is intended to be suitable for implementing
// API procedures.
C.rpc = function(method, fields, expect) {
var self = this;
return Promise.fromCallback(function(cb) {
return self._rpc(method, fields, expect, cb);
})
.then(function(f) {
return f.fields;
});
};
}
// Do the remarkably simple channel open handshake
C.open = function() {
return Promise.try(this.allocate.bind(this)).then(
function(ch) {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
// Do the remarkably simple channel open handshake
open() {
return Promise.try(this.allocate.bind(this)).then(
ch => {
return ch.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk);
});
}
close() {
return Promise.fromCallback(cb => {
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
cb);
});
};
}
C.close = function() {
var self = this;
return Promise.fromCallback(function(cb) {
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
cb);
});
};
// === Public API, declaring queues and stuff ===
// === Public API, declaring queues and stuff ===
assertQueue(queue, options) {
return this.rpc(defs.QueueDeclare,
Args.assertQueue(queue, options),
defs.QueueDeclareOk);
}
C.assertQueue = function(queue, options) {
return this.rpc(defs.QueueDeclare,
Args.assertQueue(queue, options),
defs.QueueDeclareOk);
};
checkQueue(queue) {
return this.rpc(defs.QueueDeclare,
Args.checkQueue(queue),
defs.QueueDeclareOk);
}
C.checkQueue = function(queue) {
return this.rpc(defs.QueueDeclare,
Args.checkQueue(queue),
defs.QueueDeclareOk);
};
deleteQueue(queue, options) {
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk);
}
C.deleteQueue = function(queue, options) {
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk);
};
purgeQueue(queue) {
return this.rpc(defs.QueuePurge,
Args.purgeQueue(queue),
defs.QueuePurgeOk);
}
C.purgeQueue = function(queue) {
return this.rpc(defs.QueuePurge,
Args.purgeQueue(queue),
defs.QueuePurgeOk);
};
bindQueue(queue, source, pattern, argt) {
return this.rpc(defs.QueueBind,
Args.bindQueue(queue, source, pattern, argt),
defs.QueueBindOk);
}
C.bindQueue = function(queue, source, pattern, argt) {
return this.rpc(defs.QueueBind,
Args.bindQueue(queue, source, pattern, argt),
defs.QueueBindOk);
};
unbindQueue(queue, source, pattern, argt) {
return this.rpc(defs.QueueUnbind,
Args.unbindQueue(queue, source, pattern, argt),
defs.QueueUnbindOk);
}
C.unbindQueue = function(queue, source, pattern, argt) {
return this.rpc(defs.QueueUnbind,
Args.unbindQueue(queue, source, pattern, argt),
defs.QueueUnbindOk);
};
assertExchange(exchange, type, options) {
// The server reply is an empty set of fields, but it's convenient
// to have the exchange name handed to the continuation.
return this.rpc(defs.ExchangeDeclare,
Args.assertExchange(exchange, type, options),
defs.ExchangeDeclareOk)
.then(_ok => { return { exchange }; });
}
C.assertExchange = function(exchange, type, options) {
// The server reply is an empty set of fields, but it's convenient
// to have the exchange name handed to the continuation.
return this.rpc(defs.ExchangeDeclare,
Args.assertExchange(exchange, type, options),
defs.ExchangeDeclareOk)
.then(function(_ok) { return { exchange: exchange }; });
};
checkExchange(exchange) {
return this.rpc(defs.ExchangeDeclare,
Args.checkExchange(exchange),
defs.ExchangeDeclareOk);
}
C.checkExchange = function(exchange) {
return this.rpc(defs.ExchangeDeclare,
Args.checkExchange(exchange),
defs.ExchangeDeclareOk);
};
deleteExchange(name, options) {
return this.rpc(defs.ExchangeDelete,
Args.deleteExchange(name, options),
defs.ExchangeDeleteOk);
}
C.deleteExchange = function(name, options) {
return this.rpc(defs.ExchangeDelete,
Args.deleteExchange(name, options),
defs.ExchangeDeleteOk);
};
bindExchange(dest, source, pattern, argt) {
return this.rpc(defs.ExchangeBind,
Args.bindExchange(dest, source, pattern, argt),
defs.ExchangeBindOk);
}
C.bindExchange = function(dest, source, pattern, argt) {
return this.rpc(defs.ExchangeBind,
Args.bindExchange(dest, source, pattern, argt),
defs.ExchangeBindOk);
};
unbindExchange(dest, source, pattern, argt) {
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk);
}
C.unbindExchange = function(dest, source, pattern, argt) {
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk);
};
// Working with messages
// Working with messages
publish(exchange, routingKey, content, options) {
const fieldsAndProps = Args.publish(exchange, routingKey, options);
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
}
C.publish = function(exchange, routingKey, content, options) {
var fieldsAndProps = Args.publish(exchange, routingKey, options);
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
};
sendToQueue(queue, content, options) {
return this.publish('', queue, content, options);
}
C.sendToQueue = function(queue, content, options) {
return this.publish('', queue, content, options);
};
consume(queue, callback, options) {
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
const fields = Args.consume(queue, options);
return Promise.fromCallback(cb => {
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
})
.then(ok => {
this.registerConsumer(ok.fields.consumerTag, callback);
return ok.fields;
});
}
C.consume = function(queue, callback, options) {
var self = this;
// NB we want the callback to be run synchronously, so that we've
// registered the consumerTag before any messages can arrive.
var fields = Args.consume(queue, options);
return Promise.fromCallback(function(cb) {
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb);
})
.then(function(ok) {
self.registerConsumer(ok.fields.consumerTag, callback);
return ok.fields;
});
};
async cancel(consumerTag) {
const ok = await Promise.fromCallback(cb => {
this._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
cb);
})
.then(ok => {
this.unregisterConsumer(consumerTag);
return ok.fields;
});
}
C.cancel = function(consumerTag) {
var self = this;
return Promise.fromCallback(function(cb) {
self._rpc(defs.BasicCancel, Args.cancel(consumerTag),
defs.BasicCancelOk,
cb);
})
.then(function(ok) {
self.unregisterConsumer(consumerTag);
return ok.fields;
});
};
C.get = function(queue, options) {
var self = this;
var fields = Args.get(queue, options);
return Promise.fromCallback(function(cb) {
return self.sendOrEnqueue(defs.BasicGet, fields, cb);
})
.then(function(f) {
if (f.id === defs.BasicGetEmpty) {
return false;
}
else if (f.id === defs.BasicGetOk) {
var fields = f.fields;
return new Promise(function(resolve) {
self.handleMessage = acceptMessage(function(m) {
m.fields = fields;
resolve(m);
get(queue, options) {
const fields = Args.get(queue, options);
return Promise.fromCallback(cb => {
return this.sendOrEnqueue(defs.BasicGet, fields, cb);
})
.then(f => {
if (f.id === defs.BasicGetEmpty) {
return false;
}
else if (f.id === defs.BasicGetOk) {
const fields = f.fields;
return new Promise(resolve => {
this.handleMessage = acceptMessage(m => {
m.fields = fields;
resolve(m);
});
});
});
}
else {
throw new Error("Unexpected response to BasicGet: " +
inspect(f));
}
})
};
}
else {
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`);
}
});
}
C.ack = function(message, allUpTo) {
this.sendImmediately(
defs.BasicAck,
Args.ack(message.fields.deliveryTag, allUpTo));
};
ack(message, allUpTo) {
this.sendImmediately(
defs.BasicAck,
Args.ack(message.fields.deliveryTag, allUpTo));
}
C.ackAll = function() {
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
};
ackAll() {
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
}
C.nack = function(message, allUpTo, requeue) {
this.sendImmediately(
defs.BasicNack,
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
};
nack(message, allUpTo, requeue) {
this.sendImmediately(
defs.BasicNack,
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
}
C.nackAll = function(requeue) {
this.sendImmediately(defs.BasicNack,
Args.nack(0, true, requeue));
};
nackAll(requeue) {
this.sendImmediately(defs.BasicNack,
Args.nack(0, true, requeue));
}
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
// AMQP specification), so you have to use the one-at-a-time
// `Basic.Reject`. This is otherwise synonymous with
// `#nack(message, false, requeue)`.
C.reject = function(message, requeue) {
this.sendImmediately(
defs.BasicReject,
Args.reject(message.fields.deliveryTag, requeue));
};
// `Basic.Nack` is not available in older RabbitMQ versions (or in the
// AMQP specification), so you have to use the one-at-a-time
// `Basic.Reject`. This is otherwise synonymous with
// `#nack(message, false, requeue)`.
reject(message, requeue) {
this.sendImmediately(
defs.BasicReject,
Args.reject(message.fields.deliveryTag, requeue));
}
recover() {
return this.rpc(defs.BasicRecover,
Args.recover(),
defs.BasicRecoverOk);
}
qos(count, global) {
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk);
}
}
// There are more options in AMQP than exposed here; RabbitMQ only

@@ -255,14 +260,4 @@ // implements prefetch based on message count, and only for individual

// and prefetch with `global` set as per-channel.
C.prefetch = C.qos = function(count, global) {
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk);
};
Channel.prototype.prefetch = Channel.prototype.qos
C.recover = function() {
return this.rpc(defs.BasicRecover,
Args.recover(),
defs.BasicRecoverOk);
};
// Confirm channel. This is a channel with confirms 'switched on',

@@ -275,47 +270,33 @@ // meaning sent messages will provoke a responding 'ack' or 'nack'

function ConfirmChannel(connection) {
Channel.call(this, connection);
class ConfirmChannel extends Channel {
publish(exchange, routingKey, content, options, cb) {
this.pushConfirmCallback(cb);
return Channel.prototype.publish.call(this, exchange, routingKey, content, options);
}
sendToQueue(queue, content, options, cb) {
return this.publish('', queue, content, options, cb);
}
waitForConfirms() {
const awaiting = [];
const unconfirmed = this.unconfirmed;
unconfirmed.forEach((val, index) => {
if (val !== null) {
const confirmed = new Promise((resolve, reject) => {
unconfirmed[index] = err => {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return Promise.all(awaiting);
}
}
inherits(ConfirmChannel, Channel);
module.exports.ConfirmChannel = ConfirmChannel;
CM.createConfirmChannel = function() {
var c = new ConfirmChannel(this.connection);
return c.open()
.then(function(openOk) {
return c.rpc(defs.ConfirmSelect, {nowait: false},
defs.ConfirmSelectOk)
})
.then(function() { return c; });
};
var CC = ConfirmChannel.prototype;
CC.publish = function(exchange, routingKey, content, options, cb) {
this.pushConfirmCallback(cb);
return C.publish.call(this, exchange, routingKey, content, options);
};
CC.sendToQueue = function(queue, content, options, cb) {
return this.publish('', queue, content, options, cb);
};
CC.waitForConfirms = function() {
var awaiting = [];
var unconfirmed = this.unconfirmed;
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = new Promise(function(resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return Promise.all(awaiting);
};
module.exports.Channel = Channel;
module.exports.ChannelModel = ChannelModel;

@@ -19,3 +19,2 @@ //

var stackCapture = require('./error').stackCapture;
var Buffer = require('safe-buffer').Buffer
function Channel(connection) {

@@ -22,0 +21,0 @@ EventEmitter.call( this );

@@ -155,4 +155,8 @@ //

openCallback(null, c);
} else {
// The connection isn't closed by the server on e.g. wrong password
sock.end();
sock.destroy();
openCallback(err);
}
else openCallback(err);
});

@@ -159,0 +163,0 @@ }

@@ -12,3 +12,2 @@ //

var Mux = require('./mux').Mux;
var Buffer = require('safe-buffer').Buffer

@@ -183,4 +182,3 @@ var Duplex =

// options; e.g., something is a string instead of a number.
try { self.sendMethod(0, Method, tunedOptions); }
catch (err) { bail(err); }
self.sendMethod(0, Method, tunedOptions);
}

@@ -211,3 +209,8 @@

self.serverProperties = start.fields.serverProperties;
send(defs.ConnectionStartOk);
try {
send(defs.ConnectionStartOk);
} catch (err) {
bail(err);
return;
}
wait(afterStartOk);

@@ -234,4 +237,9 @@ }

negotiate(fields.heartbeat, allFields.heartbeat);
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
try {
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
} catch (err) {
bail(err);
return;
}
expect(defs.ConnectionOpenOk, onOpenOk);

@@ -238,0 +246,0 @@ break;

@@ -11,3 +11,2 @@ //

// context, i.e., your SSL certificate)
var Buffer = require('safe-buffer').Buffer
var codec = require('./codec')

@@ -14,0 +13,0 @@

@@ -10,3 +10,2 @@ // The river sweeps through

var decode = defs.decode;
var Buffer = require('safe-buffer').Buffer

@@ -13,0 +12,0 @@ var Bits = require('bitsyntax');

@@ -1,2 +0,1 @@

{

@@ -6,3 +5,3 @@ "name": "amqplib",

"main": "./channel_api.js",
"version": "0.8.0",
"version": "0.9.0",
"description": "An AMQP 0-9-1 (e.g., RabbitMQ) library and client.",

@@ -21,9 +20,8 @@ "repository": {

"readable-stream": "1.x >=1.1.9",
"safe-buffer": "~5.2.1",
"url-parse": "~1.5.1"
"url-parse": "~1.5.10"
},
"devDependencies": {
"claire": "0.4.1",
"istanbul": "0.1.x",
"mocha": "^3.5.3",
"mocha": "^9.2.2",
"nyc": "^15.1.0",
"uglify-js": "2.8.x"

@@ -30,0 +28,0 @@ },

'use strict';
var claire = require('claire');
const claire = require('claire');
const {BitSet} = require('../lib/bitset');
var forAll = claire.forAll,
arb = claire.data,
label = claire.label,
choice = claire.choice,
transform = claire.transform;
const {
forAll,
data: arb,
label,
choice,
transform
} = claire;
var BitSet = require('../lib/bitset').BitSet;
var PosInt = transform(Math.floor, arb.Positive);
const PosInt = transform(Math.floor, arb.Positive);
var EmptyBitSet = label('bitset', transform(
function(size) {
const EmptyBitSet = label('bitset', transform(
size => {
return new BitSet(size);

@@ -20,12 +22,12 @@ },

suite('BitSet', function() {
suite('BitSet', () => {
test('get bit', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);
return b.get(bit);
}).asTest());
test('clear bit', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);

@@ -37,3 +39,3 @@ b.clear(bit);

test('next set of empty', forAll(EmptyBitSet)
.satisfy(function(b) {
.satisfy(b => {
return b.nextSetBit(0) === -1;

@@ -43,3 +45,3 @@ }).asTest());

test('next set of one bit', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);

@@ -50,3 +52,3 @@ return b.nextSetBit(0) === bit;

test('next set same bit', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);

@@ -57,3 +59,3 @@ return b.nextSetBit(bit) === bit;

test('next set following bit', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);

@@ -64,11 +66,10 @@ return b.nextSetBit(bit+1) === -1;

test('next clear of empty', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) { return b.nextClearBit(bit) === bit; })
.satisfy((b, bit) => { return b.nextClearBit(bit) === bit; })
.asTest());
test('next clear of one set', forAll(EmptyBitSet, PosInt)
.satisfy(function(b, bit) {
.satisfy((b, bit) => {
b.set(bit);
return b.nextClearBit(bit) === bit + 1;
}).asTest());
});

@@ -11,3 +11,2 @@ 'use strict';

var domain = require('domain');
var Buffer = require('safe-buffer').Buffer;

@@ -14,0 +13,0 @@ var URL = process.env.URL || 'amqp://localhost';

@@ -10,3 +10,2 @@ 'use strict';

var Promise = require('bluebird');
var Buffer = require('safe-buffer').Buffer;

@@ -13,0 +12,0 @@ var URL = process.env.URL || 'amqp://localhost';

@@ -15,3 +15,2 @@ // Test the channel machinery

var OPEN_OPTS = require('./connection').OPEN_OPTS;
var Buffer = require('safe-buffer').Buffer;

@@ -18,0 +17,0 @@ var LOG_ERRORS = process.env.LOG_ERRORS;

@@ -7,3 +7,2 @@ 'use strict';

var ints = require('buffer-more-ints');
var Buffer = require('safe-buffer').Buffer
var C = require('claire');

@@ -10,0 +9,0 @@ var forAll = C.forAll;

'use strict';
var connect = require('../lib/connect').connect;
var Buffer = require('safe-buffer').Buffer
var credentialsFromUrl = require('../lib/connect').credentialsFromUrl;
var defs = require('../lib/defs');
var assert = require('assert');
var util = require('./util');
var net = require('net');
var fail = util.fail, succeed = util.succeed,
var fail = util.fail, succeed = util.succeed, latch = util.latch,
kCallback = util.kCallback,

@@ -151,3 +151,48 @@ succeedIfAttributeEquals = util.succeedIfAttributeEquals;

});
});
suite('Errors on connect', function() {
var server
teardown(function() {
if (server) {
server.close();
}
})
test("closes underlying connection on authentication error", function(done) {
var bothDone = latch(2, done);
server = net.createServer(function(socket) {
socket.once('data', function(protocolHeader) {
assert.deepStrictEqual(
protocolHeader,
Buffer.from("AMQP" + String.fromCharCode(0,0,9,1))
);
util.runServer(socket, function(send, wait) {
send(defs.ConnectionStart,
{versionMajor: 0,
versionMinor: 9,
serverProperties: {},
mechanisms: Buffer.from('PLAIN'),
locales: Buffer.from('en_US')});
wait(defs.ConnectionStartOk)().then(function() {
send(defs.ConnectionClose,
{replyCode: 403,
replyText: 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN',
classId: 0,
methodId: 0});
});
});
});
// Wait for the connection to be closed after the authentication error
socket.once('end', function() {
bothDone();
});
}).listen(0);
connect('amqp://localhost:' + server.address().port, {}, function(err) {
if (!err) bothDone(new Error('Expected authentication error'));
bothDone();
});
});
});

@@ -5,3 +5,2 @@ 'use strict';

var defs = require('../lib/defs');
var Buffer = require('safe-buffer').Buffer;
var Connection = require('../lib/connection').Connection;

@@ -8,0 +7,0 @@ var HEARTBEAT = require('../lib/frame').HEARTBEAT;

@@ -6,3 +6,2 @@ // Property-based testing representations of various things in AMQP

var C = require('claire');
var Buffer = require('safe-buffer').Buffer;
var forAll = C.forAll;

@@ -9,0 +8,0 @@ var arb = C.data;

@@ -6,3 +6,2 @@ 'use strict';

var fail = require('./util').fail;
var Buffer = require('safe-buffer').Buffer;
var connection = require('../lib/connection');

@@ -31,3 +30,3 @@ var Frames = connection.Connection;

suite("Explicit parsing", function() {
test('Parse heartbeat', function() {

@@ -122,3 +121,3 @@ var input = inputs();

};
t.forEach(function(f) {

@@ -125,0 +124,0 @@ f.channel = 0;

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc