Comparing version 0.8.0 to 0.9.0
@@ -119,3 +119,2 @@ var FS = require('fs'); | ||
println("'use strict';"); nl(); | ||
println('var Buffer = require("safe-buffer").Buffer;'); | ||
nl() | ||
@@ -122,0 +121,0 @@ println('var codec = require("./codec");'); |
# Change log for amqplib | ||
## Changes in v0.9.0 | ||
git log v0.8.0..v0.9.0 | ||
* Update mocha and replace the deprecated istanbul with nyc ([PR | ||
681](https://github.com/squaremo/amqp.node/pull/681) | ||
* Update url-parse ([PR | ||
675](https://github.com/squaremo/amqp.node/pull/675), thank you | ||
@suhail-n and @kibertoad) | ||
* fix: done called twice on invalid options ([PR | ||
667](https://github.com/squaremo/amqp.node/pull/667), thank you | ||
@luddd3 and @kibertoad) | ||
* Close connection to server on connect errors ([PR | ||
647](https://github.com/squaremo/amqp.node/pull/647), thank you | ||
@luddd3 and @kibertoad) | ||
* Modernise channel_model.js ([PR | ||
635](https://github.com/squaremo/amqp.node/pull/635), thank you | ||
@kibertoad and @jimmywarting) | ||
* Bring package-lock.json up to date ([PR | ||
653](https://github.com/squaremo/amqp.node/pull/653) | ||
* Update url-parse ([PR | ||
652](https://github.com/squaremo/amqp.node/pull/652), thank you | ||
@giorgioatanasov and @buffolander) | ||
* Modernise channel_model.js ([PR | ||
651](https://github.com/squaremo/amqp.node/pull/651), thank you | ||
for the review @kibertoad) | ||
* Modernise bitset.js ([PR | ||
634](https://github.com/squaremo/amqp.node/pull/634), thank you | ||
@kibertoad and @jimmywarting) | ||
* :warning: Drop CI for node versions below 10 ([PR | ||
631](https://github.com/squaremo/amqp.node/pull/631), thank you | ||
for the review @kibertoad) | ||
* Replace safe-buffer dependency with native buffers ([PR | ||
628](https://github.com/squaremo/amqp.node/pull/628), thank you | ||
@kibertoad and @jimmywarting) | ||
## Changes in v0.8.0 | ||
@@ -4,0 +40,0 @@ |
@@ -88,7 +88,7 @@ # RabbitMQ tutorials | ||
[rabbitmq-tutes]: http://github.com/rabbitmq/rabbitmq-tutorials | ||
[tute-one]: http://www.rabbitmq.com/tutorials/tutorial-one-python.html | ||
[tute-two]: http://www.rabbitmq.com/tutorials/tutorial-two-python.html | ||
[tute-three]: http://www.rabbitmq.com/tutorials/tutorial-three-python.html | ||
[tute-four]: http://www.rabbitmq.com/tutorials/tutorial-four-python.html | ||
[tute-five]: http://www.rabbitmq.com/tutorials/tutorial-five-python.html | ||
[tute-six]: http://www.rabbitmq.com/tutorials/tutorial-six-python.html | ||
[tute-one]: http://www.rabbitmq.com/tutorials/tutorial-one-javascript.html | ||
[tute-two]: http://www.rabbitmq.com/tutorials/tutorial-two-javascript.html | ||
[tute-three]: http://www.rabbitmq.com/tutorials/tutorial-three-javascript.html | ||
[tute-four]: http://www.rabbitmq.com/tutorials/tutorial-four-javascript.html | ||
[tute-five]: http://www.rabbitmq.com/tutorials/tutorial-five-javascript.html | ||
[tute-six]: http://www.rabbitmq.com/tutorials/tutorial-six-javascript.html |
@@ -5,57 +5,114 @@ // | ||
// A bitset implementation, after that in java.util. Yes there | ||
// already exist such things, but none implement next{Clear|Set}Bit or | ||
// equivalent, and none involved me tooling about for an evening. | ||
'use strict'; | ||
function BitSet(size) { | ||
if (size) { | ||
var numWords = Math.ceil(size / 32); | ||
this.words = new Array(numWords); | ||
/** | ||
* A bitset implementation, after that in java.util. Yes there | ||
* already exist such things, but none implement next{Clear|Set}Bit or | ||
* equivalent, and none involved me tooling about for an evening. | ||
*/ | ||
class BitSet { | ||
/** | ||
* @param {number} [size] | ||
*/ | ||
constructor(size) { | ||
if (size) { | ||
const numWords = Math.ceil(size / 32); | ||
this.words = new Array(numWords); | ||
} | ||
else { | ||
this.words = []; | ||
} | ||
this.wordsInUse = 0; // = number, not index | ||
} | ||
else { | ||
this.words = []; | ||
/** | ||
* @param {number} numWords | ||
*/ | ||
ensureSize(numWords) { | ||
const wordsPresent = this.words.length; | ||
if (wordsPresent < numWords) { | ||
this.words = this.words.concat(new Array(numWords - wordsPresent)); | ||
} | ||
} | ||
this.wordsInUse = 0; // = number, not index | ||
} | ||
var P = BitSet.prototype; | ||
/** | ||
* @param {number} bitIndex | ||
*/ | ||
set(bitIndex) { | ||
const w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) { | ||
this.ensureSize(w + 1); | ||
this.wordsInUse = w + 1; | ||
} | ||
const bit = 1 << bitIndex; | ||
this.words[w] |= bit; | ||
} | ||
function wordIndex(bitIndex) { | ||
return Math.floor(bitIndex / 32); | ||
} | ||
/** | ||
* @param {number} bitIndex | ||
*/ | ||
clear(bitIndex) { | ||
const w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) return; | ||
const mask = ~(1 << bitIndex); | ||
this.words[w] &= mask; | ||
} | ||
// Make sure we have at least numWords | ||
P.ensureSize = function(numWords) { | ||
var wordsPresent = this.words.length; | ||
if (wordsPresent < numWords) { | ||
this.words = this.words.concat(new Array(numWords - wordsPresent)); | ||
/** | ||
* @param {number} bitIndex | ||
*/ | ||
get(bitIndex) { | ||
const w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) return false; // >= since index vs size | ||
const bit = 1 << bitIndex; | ||
return !!(this.words[w] & bit); | ||
} | ||
} | ||
P.set = function(bitIndex) { | ||
var w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) { | ||
this.ensureSize(w + 1); | ||
this.wordsInUse = w + 1; | ||
/** | ||
* Give the next bit that is set on or after fromIndex, or -1 if no such bit | ||
* | ||
* @param {number} fromIndex | ||
*/ | ||
nextSetBit(fromIndex) { | ||
let w = wordIndex(fromIndex); | ||
if (w >= this.wordsInUse) return -1; | ||
// the right-hand side is shifted to only test the bits of the first | ||
// word that are > fromIndex | ||
let word = this.words[w] & (0xffffffff << fromIndex); | ||
while (true) { | ||
if (word) return (w * 32) + trailingZeros(word); | ||
w++; | ||
if (w === this.wordsInUse) return -1; | ||
word = this.words[w]; | ||
} | ||
} | ||
var bit = 1 << bitIndex; | ||
this.words[w] |= bit; | ||
}; | ||
P.clear = function(bitIndex) { | ||
var w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) return; | ||
var mask = ~(1 << bitIndex); | ||
this.words[w] &= mask; | ||
}; | ||
/** | ||
* @param {number} fromIndex | ||
*/ | ||
nextClearBit(fromIndex) { | ||
let w = wordIndex(fromIndex); | ||
if (w >= this.wordsInUse) return fromIndex; | ||
P.get = function(bitIndex) { | ||
var w = wordIndex(bitIndex); | ||
if (w >= this.wordsInUse) return false; // >= since index vs size | ||
var bit = 1 << bitIndex; | ||
return !!(this.words[w] & bit); | ||
let word = ~(this.words[w]) & (0xffffffff << fromIndex); | ||
while (true) { | ||
if (word) return (w * 32) + trailingZeros(word); | ||
w++; | ||
if (w == this.wordsInUse) return w * 32; | ||
word = ~(this.words[w]); | ||
} | ||
} | ||
} | ||
/** | ||
* @param {number} bitIndex | ||
*/ | ||
function wordIndex(bitIndex) { | ||
return Math.floor(bitIndex / 32); | ||
} | ||
/** | ||
* @param {number} i | ||
*/ | ||
function trailingZeros(i) { | ||
@@ -66,3 +123,3 @@ // From Hacker's Delight, via JDK. Probably far less effective here, | ||
if (i === 0) return 32; | ||
var y, n = 31; | ||
let y, n = 31; | ||
y = i << 16; if (y != 0) { n = n -16; i = y; } | ||
@@ -75,32 +132,2 @@ y = i << 8; if (y != 0) { n = n - 8; i = y; } | ||
// Give the next bit that's set on or after fromIndex, or -1 if no such | ||
// bit | ||
P.nextSetBit = function(fromIndex) { | ||
var w = wordIndex(fromIndex); | ||
if (w >= this.wordsInUse) return -1; | ||
// the right-hand side is shifted to only test the bits of the first | ||
// word that are > fromIndex | ||
var word = this.words[w] & (0xffffffff << fromIndex); | ||
while (true) { | ||
if (word) return (w * 32) + trailingZeros(word); | ||
w++; | ||
if (w === this.wordsInUse) return -1; | ||
word = this.words[w]; | ||
} | ||
}; | ||
P.nextClearBit = function(fromIndex) { | ||
var w = wordIndex(fromIndex); | ||
if (w >= this.wordsInUse) return fromIndex; | ||
var word = ~(this.words[w]) & (0xffffffff << fromIndex); | ||
while (true) { | ||
if (word) return (w * 32) + trailingZeros(word); | ||
w++; | ||
if (w == this.wordsInUse) return w * 32; | ||
word = ~(this.words[w]); | ||
} | ||
}; | ||
module.exports.BitSet = BitSet; |
@@ -7,244 +7,249 @@ // | ||
var defs = require('./defs'); | ||
var Promise = require('bluebird'); | ||
var inherits = require('util').inherits; | ||
var EventEmitter = require('events').EventEmitter; | ||
var BaseChannel = require('./channel').BaseChannel; | ||
var acceptMessage = require('./channel').acceptMessage; | ||
var Args = require('./api_args'); | ||
const EventEmitter = require('events'); | ||
const Promise = require('bluebird'); | ||
const defs = require('./defs'); | ||
const {BaseChannel} = require('./channel'); | ||
const {acceptMessage} = require('./channel'); | ||
const Args = require('./api_args'); | ||
const {inspect} = require('./format'); | ||
function ChannelModel(connection) { | ||
if (!(this instanceof ChannelModel)) | ||
return new ChannelModel(connection); | ||
EventEmitter.call( this ); | ||
this.connection = connection; | ||
var self = this; | ||
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) { | ||
connection.on(ev, self.emit.bind(self, ev)); | ||
}); | ||
} | ||
inherits(ChannelModel, EventEmitter); | ||
class ChannelModel extends EventEmitter { | ||
constructor(connection) { | ||
super(); | ||
this.connection = connection; | ||
module.exports.ChannelModel = ChannelModel; | ||
['error', 'close', 'blocked', 'unblocked'].forEach(ev => { | ||
connection.on(ev, this.emit.bind(this, ev)); | ||
}); | ||
} | ||
var CM = ChannelModel.prototype; | ||
close() { | ||
return Promise.fromCallback(this.connection.close.bind(this.connection)); | ||
} | ||
CM.close = function() { | ||
return Promise.fromCallback(this.connection.close.bind(this.connection)); | ||
}; | ||
async createChannel() { | ||
const channel = new Channel(this.connection); | ||
await channel.open(); | ||
return channel; | ||
} | ||
// Channels | ||
function Channel(connection) { | ||
BaseChannel.call(this, connection); | ||
this.on('delivery', this.handleDelivery.bind(this)); | ||
this.on('cancel', this.handleCancel.bind(this)); | ||
async createConfirmChannel() { | ||
const channel = new ConfirmChannel(this.connection); | ||
await channel.open(); | ||
await channel.rpc(defs.ConfirmSelect, {nowait: false}, defs.ConfirmSelectOk); | ||
return channel; | ||
} | ||
} | ||
inherits(Channel, BaseChannel); | ||
module.exports.Channel = Channel; | ||
// Channels | ||
CM.createChannel = function() { | ||
var c = new Channel(this.connection); | ||
return c.open().then(function(openOk) { return c; }); | ||
}; | ||
class Channel extends BaseChannel { | ||
constructor(connection) { | ||
super(connection); | ||
this.on('delivery', this.handleDelivery.bind(this)); | ||
this.on('cancel', this.handleCancel.bind(this)); | ||
} | ||
var C = Channel.prototype; | ||
// An RPC that returns a 'proper' promise, which resolves to just the | ||
// response's fields; this is intended to be suitable for implementing | ||
// API procedures. | ||
async rpc(method, fields, expect) { | ||
const f = await Promise.fromCallback(cb => { | ||
return this._rpc(method, fields, expect, cb); | ||
}) | ||
// An RPC that returns a 'proper' promise, which resolves to just the | ||
// response's fields; this is intended to be suitable for implementing | ||
// API procedures. | ||
C.rpc = function(method, fields, expect) { | ||
var self = this; | ||
return Promise.fromCallback(function(cb) { | ||
return self._rpc(method, fields, expect, cb); | ||
}) | ||
.then(function(f) { | ||
return f.fields; | ||
}); | ||
}; | ||
} | ||
// Do the remarkably simple channel open handshake | ||
C.open = function() { | ||
return Promise.try(this.allocate.bind(this)).then( | ||
function(ch) { | ||
return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, | ||
defs.ChannelOpenOk); | ||
// Do the remarkably simple channel open handshake | ||
open() { | ||
return Promise.try(this.allocate.bind(this)).then( | ||
ch => { | ||
return ch.rpc(defs.ChannelOpen, {outOfBand: ""}, | ||
defs.ChannelOpenOk); | ||
}); | ||
} | ||
close() { | ||
return Promise.fromCallback(cb => { | ||
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, | ||
cb); | ||
}); | ||
}; | ||
} | ||
C.close = function() { | ||
var self = this; | ||
return Promise.fromCallback(function(cb) { | ||
return self.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS, | ||
cb); | ||
}); | ||
}; | ||
// === Public API, declaring queues and stuff === | ||
// === Public API, declaring queues and stuff === | ||
assertQueue(queue, options) { | ||
return this.rpc(defs.QueueDeclare, | ||
Args.assertQueue(queue, options), | ||
defs.QueueDeclareOk); | ||
} | ||
C.assertQueue = function(queue, options) { | ||
return this.rpc(defs.QueueDeclare, | ||
Args.assertQueue(queue, options), | ||
defs.QueueDeclareOk); | ||
}; | ||
checkQueue(queue) { | ||
return this.rpc(defs.QueueDeclare, | ||
Args.checkQueue(queue), | ||
defs.QueueDeclareOk); | ||
} | ||
C.checkQueue = function(queue) { | ||
return this.rpc(defs.QueueDeclare, | ||
Args.checkQueue(queue), | ||
defs.QueueDeclareOk); | ||
}; | ||
deleteQueue(queue, options) { | ||
return this.rpc(defs.QueueDelete, | ||
Args.deleteQueue(queue, options), | ||
defs.QueueDeleteOk); | ||
} | ||
C.deleteQueue = function(queue, options) { | ||
return this.rpc(defs.QueueDelete, | ||
Args.deleteQueue(queue, options), | ||
defs.QueueDeleteOk); | ||
}; | ||
purgeQueue(queue) { | ||
return this.rpc(defs.QueuePurge, | ||
Args.purgeQueue(queue), | ||
defs.QueuePurgeOk); | ||
} | ||
C.purgeQueue = function(queue) { | ||
return this.rpc(defs.QueuePurge, | ||
Args.purgeQueue(queue), | ||
defs.QueuePurgeOk); | ||
}; | ||
bindQueue(queue, source, pattern, argt) { | ||
return this.rpc(defs.QueueBind, | ||
Args.bindQueue(queue, source, pattern, argt), | ||
defs.QueueBindOk); | ||
} | ||
C.bindQueue = function(queue, source, pattern, argt) { | ||
return this.rpc(defs.QueueBind, | ||
Args.bindQueue(queue, source, pattern, argt), | ||
defs.QueueBindOk); | ||
}; | ||
unbindQueue(queue, source, pattern, argt) { | ||
return this.rpc(defs.QueueUnbind, | ||
Args.unbindQueue(queue, source, pattern, argt), | ||
defs.QueueUnbindOk); | ||
} | ||
C.unbindQueue = function(queue, source, pattern, argt) { | ||
return this.rpc(defs.QueueUnbind, | ||
Args.unbindQueue(queue, source, pattern, argt), | ||
defs.QueueUnbindOk); | ||
}; | ||
assertExchange(exchange, type, options) { | ||
// The server reply is an empty set of fields, but it's convenient | ||
// to have the exchange name handed to the continuation. | ||
return this.rpc(defs.ExchangeDeclare, | ||
Args.assertExchange(exchange, type, options), | ||
defs.ExchangeDeclareOk) | ||
.then(_ok => { return { exchange }; }); | ||
} | ||
C.assertExchange = function(exchange, type, options) { | ||
// The server reply is an empty set of fields, but it's convenient | ||
// to have the exchange name handed to the continuation. | ||
return this.rpc(defs.ExchangeDeclare, | ||
Args.assertExchange(exchange, type, options), | ||
defs.ExchangeDeclareOk) | ||
.then(function(_ok) { return { exchange: exchange }; }); | ||
}; | ||
checkExchange(exchange) { | ||
return this.rpc(defs.ExchangeDeclare, | ||
Args.checkExchange(exchange), | ||
defs.ExchangeDeclareOk); | ||
} | ||
C.checkExchange = function(exchange) { | ||
return this.rpc(defs.ExchangeDeclare, | ||
Args.checkExchange(exchange), | ||
defs.ExchangeDeclareOk); | ||
}; | ||
deleteExchange(name, options) { | ||
return this.rpc(defs.ExchangeDelete, | ||
Args.deleteExchange(name, options), | ||
defs.ExchangeDeleteOk); | ||
} | ||
C.deleteExchange = function(name, options) { | ||
return this.rpc(defs.ExchangeDelete, | ||
Args.deleteExchange(name, options), | ||
defs.ExchangeDeleteOk); | ||
}; | ||
bindExchange(dest, source, pattern, argt) { | ||
return this.rpc(defs.ExchangeBind, | ||
Args.bindExchange(dest, source, pattern, argt), | ||
defs.ExchangeBindOk); | ||
} | ||
C.bindExchange = function(dest, source, pattern, argt) { | ||
return this.rpc(defs.ExchangeBind, | ||
Args.bindExchange(dest, source, pattern, argt), | ||
defs.ExchangeBindOk); | ||
}; | ||
unbindExchange(dest, source, pattern, argt) { | ||
return this.rpc(defs.ExchangeUnbind, | ||
Args.unbindExchange(dest, source, pattern, argt), | ||
defs.ExchangeUnbindOk); | ||
} | ||
C.unbindExchange = function(dest, source, pattern, argt) { | ||
return this.rpc(defs.ExchangeUnbind, | ||
Args.unbindExchange(dest, source, pattern, argt), | ||
defs.ExchangeUnbindOk); | ||
}; | ||
// Working with messages | ||
// Working with messages | ||
publish(exchange, routingKey, content, options) { | ||
const fieldsAndProps = Args.publish(exchange, routingKey, options); | ||
return this.sendMessage(fieldsAndProps, fieldsAndProps, content); | ||
} | ||
C.publish = function(exchange, routingKey, content, options) { | ||
var fieldsAndProps = Args.publish(exchange, routingKey, options); | ||
return this.sendMessage(fieldsAndProps, fieldsAndProps, content); | ||
}; | ||
sendToQueue(queue, content, options) { | ||
return this.publish('', queue, content, options); | ||
} | ||
C.sendToQueue = function(queue, content, options) { | ||
return this.publish('', queue, content, options); | ||
}; | ||
consume(queue, callback, options) { | ||
// NB we want the callback to be run synchronously, so that we've | ||
// registered the consumerTag before any messages can arrive. | ||
const fields = Args.consume(queue, options); | ||
return Promise.fromCallback(cb => { | ||
this._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); | ||
}) | ||
.then(ok => { | ||
this.registerConsumer(ok.fields.consumerTag, callback); | ||
return ok.fields; | ||
}); | ||
} | ||
C.consume = function(queue, callback, options) { | ||
var self = this; | ||
// NB we want the callback to be run synchronously, so that we've | ||
// registered the consumerTag before any messages can arrive. | ||
var fields = Args.consume(queue, options); | ||
return Promise.fromCallback(function(cb) { | ||
self._rpc(defs.BasicConsume, fields, defs.BasicConsumeOk, cb); | ||
}) | ||
.then(function(ok) { | ||
self.registerConsumer(ok.fields.consumerTag, callback); | ||
return ok.fields; | ||
}); | ||
}; | ||
async cancel(consumerTag) { | ||
const ok = await Promise.fromCallback(cb => { | ||
this._rpc(defs.BasicCancel, Args.cancel(consumerTag), | ||
defs.BasicCancelOk, | ||
cb); | ||
}) | ||
.then(ok => { | ||
this.unregisterConsumer(consumerTag); | ||
return ok.fields; | ||
}); | ||
} | ||
C.cancel = function(consumerTag) { | ||
var self = this; | ||
return Promise.fromCallback(function(cb) { | ||
self._rpc(defs.BasicCancel, Args.cancel(consumerTag), | ||
defs.BasicCancelOk, | ||
cb); | ||
}) | ||
.then(function(ok) { | ||
self.unregisterConsumer(consumerTag); | ||
return ok.fields; | ||
}); | ||
}; | ||
C.get = function(queue, options) { | ||
var self = this; | ||
var fields = Args.get(queue, options); | ||
return Promise.fromCallback(function(cb) { | ||
return self.sendOrEnqueue(defs.BasicGet, fields, cb); | ||
}) | ||
.then(function(f) { | ||
if (f.id === defs.BasicGetEmpty) { | ||
return false; | ||
} | ||
else if (f.id === defs.BasicGetOk) { | ||
var fields = f.fields; | ||
return new Promise(function(resolve) { | ||
self.handleMessage = acceptMessage(function(m) { | ||
m.fields = fields; | ||
resolve(m); | ||
get(queue, options) { | ||
const fields = Args.get(queue, options); | ||
return Promise.fromCallback(cb => { | ||
return this.sendOrEnqueue(defs.BasicGet, fields, cb); | ||
}) | ||
.then(f => { | ||
if (f.id === defs.BasicGetEmpty) { | ||
return false; | ||
} | ||
else if (f.id === defs.BasicGetOk) { | ||
const fields = f.fields; | ||
return new Promise(resolve => { | ||
this.handleMessage = acceptMessage(m => { | ||
m.fields = fields; | ||
resolve(m); | ||
}); | ||
}); | ||
}); | ||
} | ||
else { | ||
throw new Error("Unexpected response to BasicGet: " + | ||
inspect(f)); | ||
} | ||
}) | ||
}; | ||
} | ||
else { | ||
throw new Error(`Unexpected response to BasicGet: ${inspect(f)}`); | ||
} | ||
}); | ||
} | ||
C.ack = function(message, allUpTo) { | ||
this.sendImmediately( | ||
defs.BasicAck, | ||
Args.ack(message.fields.deliveryTag, allUpTo)); | ||
}; | ||
ack(message, allUpTo) { | ||
this.sendImmediately( | ||
defs.BasicAck, | ||
Args.ack(message.fields.deliveryTag, allUpTo)); | ||
} | ||
C.ackAll = function() { | ||
this.sendImmediately(defs.BasicAck, Args.ack(0, true)); | ||
}; | ||
ackAll() { | ||
this.sendImmediately(defs.BasicAck, Args.ack(0, true)); | ||
} | ||
C.nack = function(message, allUpTo, requeue) { | ||
this.sendImmediately( | ||
defs.BasicNack, | ||
Args.nack(message.fields.deliveryTag, allUpTo, requeue)); | ||
}; | ||
nack(message, allUpTo, requeue) { | ||
this.sendImmediately( | ||
defs.BasicNack, | ||
Args.nack(message.fields.deliveryTag, allUpTo, requeue)); | ||
} | ||
C.nackAll = function(requeue) { | ||
this.sendImmediately(defs.BasicNack, | ||
Args.nack(0, true, requeue)); | ||
}; | ||
nackAll(requeue) { | ||
this.sendImmediately(defs.BasicNack, | ||
Args.nack(0, true, requeue)); | ||
} | ||
// `Basic.Nack` is not available in older RabbitMQ versions (or in the | ||
// AMQP specification), so you have to use the one-at-a-time | ||
// `Basic.Reject`. This is otherwise synonymous with | ||
// `#nack(message, false, requeue)`. | ||
C.reject = function(message, requeue) { | ||
this.sendImmediately( | ||
defs.BasicReject, | ||
Args.reject(message.fields.deliveryTag, requeue)); | ||
}; | ||
// `Basic.Nack` is not available in older RabbitMQ versions (or in the | ||
// AMQP specification), so you have to use the one-at-a-time | ||
// `Basic.Reject`. This is otherwise synonymous with | ||
// `#nack(message, false, requeue)`. | ||
reject(message, requeue) { | ||
this.sendImmediately( | ||
defs.BasicReject, | ||
Args.reject(message.fields.deliveryTag, requeue)); | ||
} | ||
recover() { | ||
return this.rpc(defs.BasicRecover, | ||
Args.recover(), | ||
defs.BasicRecoverOk); | ||
} | ||
qos(count, global) { | ||
return this.rpc(defs.BasicQos, | ||
Args.prefetch(count, global), | ||
defs.BasicQosOk); | ||
} | ||
} | ||
// There are more options in AMQP than exposed here; RabbitMQ only | ||
@@ -255,14 +260,4 @@ // implements prefetch based on message count, and only for individual | ||
// and prefetch with `global` set as per-channel. | ||
C.prefetch = C.qos = function(count, global) { | ||
return this.rpc(defs.BasicQos, | ||
Args.prefetch(count, global), | ||
defs.BasicQosOk); | ||
}; | ||
Channel.prototype.prefetch = Channel.prototype.qos | ||
C.recover = function() { | ||
return this.rpc(defs.BasicRecover, | ||
Args.recover(), | ||
defs.BasicRecoverOk); | ||
}; | ||
// Confirm channel. This is a channel with confirms 'switched on', | ||
@@ -275,47 +270,33 @@ // meaning sent messages will provoke a responding 'ack' or 'nack' | ||
function ConfirmChannel(connection) { | ||
Channel.call(this, connection); | ||
class ConfirmChannel extends Channel { | ||
publish(exchange, routingKey, content, options, cb) { | ||
this.pushConfirmCallback(cb); | ||
return Channel.prototype.publish.call(this, exchange, routingKey, content, options); | ||
} | ||
sendToQueue(queue, content, options, cb) { | ||
return this.publish('', queue, content, options, cb); | ||
} | ||
waitForConfirms() { | ||
const awaiting = []; | ||
const unconfirmed = this.unconfirmed; | ||
unconfirmed.forEach((val, index) => { | ||
if (val !== null) { | ||
const confirmed = new Promise((resolve, reject) => { | ||
unconfirmed[index] = err => { | ||
if (val) val(err); | ||
if (err === null) resolve(); | ||
else reject(err); | ||
}; | ||
}); | ||
awaiting.push(confirmed); | ||
} | ||
}); | ||
return Promise.all(awaiting); | ||
} | ||
} | ||
inherits(ConfirmChannel, Channel); | ||
module.exports.ConfirmChannel = ConfirmChannel; | ||
CM.createConfirmChannel = function() { | ||
var c = new ConfirmChannel(this.connection); | ||
return c.open() | ||
.then(function(openOk) { | ||
return c.rpc(defs.ConfirmSelect, {nowait: false}, | ||
defs.ConfirmSelectOk) | ||
}) | ||
.then(function() { return c; }); | ||
}; | ||
var CC = ConfirmChannel.prototype; | ||
CC.publish = function(exchange, routingKey, content, options, cb) { | ||
this.pushConfirmCallback(cb); | ||
return C.publish.call(this, exchange, routingKey, content, options); | ||
}; | ||
CC.sendToQueue = function(queue, content, options, cb) { | ||
return this.publish('', queue, content, options, cb); | ||
}; | ||
CC.waitForConfirms = function() { | ||
var awaiting = []; | ||
var unconfirmed = this.unconfirmed; | ||
unconfirmed.forEach(function(val, index) { | ||
if (val === null); // already confirmed | ||
else { | ||
var confirmed = new Promise(function(resolve, reject) { | ||
unconfirmed[index] = function(err) { | ||
if (val) val(err); | ||
if (err === null) resolve(); | ||
else reject(err); | ||
}; | ||
}); | ||
awaiting.push(confirmed); | ||
} | ||
}); | ||
return Promise.all(awaiting); | ||
}; | ||
module.exports.Channel = Channel; | ||
module.exports.ChannelModel = ChannelModel; |
@@ -19,3 +19,2 @@ // | ||
var stackCapture = require('./error').stackCapture; | ||
var Buffer = require('safe-buffer').Buffer | ||
function Channel(connection) { | ||
@@ -22,0 +21,0 @@ EventEmitter.call( this ); |
@@ -155,4 +155,8 @@ // | ||
openCallback(null, c); | ||
} else { | ||
// The connection isn't closed by the server on e.g. wrong password | ||
sock.end(); | ||
sock.destroy(); | ||
openCallback(err); | ||
} | ||
else openCallback(err); | ||
}); | ||
@@ -159,0 +163,0 @@ } |
@@ -12,3 +12,2 @@ // | ||
var Mux = require('./mux').Mux; | ||
var Buffer = require('safe-buffer').Buffer | ||
@@ -183,4 +182,3 @@ var Duplex = | ||
// options; e.g., something is a string instead of a number. | ||
try { self.sendMethod(0, Method, tunedOptions); } | ||
catch (err) { bail(err); } | ||
self.sendMethod(0, Method, tunedOptions); | ||
} | ||
@@ -211,3 +209,8 @@ | ||
self.serverProperties = start.fields.serverProperties; | ||
send(defs.ConnectionStartOk); | ||
try { | ||
send(defs.ConnectionStartOk); | ||
} catch (err) { | ||
bail(err); | ||
return; | ||
} | ||
wait(afterStartOk); | ||
@@ -234,4 +237,9 @@ } | ||
negotiate(fields.heartbeat, allFields.heartbeat); | ||
send(defs.ConnectionTuneOk); | ||
send(defs.ConnectionOpen); | ||
try { | ||
send(defs.ConnectionTuneOk); | ||
send(defs.ConnectionOpen); | ||
} catch (err) { | ||
bail(err); | ||
return; | ||
} | ||
expect(defs.ConnectionOpenOk, onOpenOk); | ||
@@ -238,0 +246,0 @@ break; |
@@ -11,3 +11,2 @@ // | ||
// context, i.e., your SSL certificate) | ||
var Buffer = require('safe-buffer').Buffer | ||
var codec = require('./codec') | ||
@@ -14,0 +13,0 @@ |
@@ -10,3 +10,2 @@ // The river sweeps through | ||
var decode = defs.decode; | ||
var Buffer = require('safe-buffer').Buffer | ||
@@ -13,0 +12,0 @@ var Bits = require('bitsyntax'); |
@@ -1,2 +0,1 @@ | ||
{ | ||
@@ -6,3 +5,3 @@ "name": "amqplib", | ||
"main": "./channel_api.js", | ||
"version": "0.8.0", | ||
"version": "0.9.0", | ||
"description": "An AMQP 0-9-1 (e.g., RabbitMQ) library and client.", | ||
@@ -21,9 +20,8 @@ "repository": { | ||
"readable-stream": "1.x >=1.1.9", | ||
"safe-buffer": "~5.2.1", | ||
"url-parse": "~1.5.1" | ||
"url-parse": "~1.5.10" | ||
}, | ||
"devDependencies": { | ||
"claire": "0.4.1", | ||
"istanbul": "0.1.x", | ||
"mocha": "^3.5.3", | ||
"mocha": "^9.2.2", | ||
"nyc": "^15.1.0", | ||
"uglify-js": "2.8.x" | ||
@@ -30,0 +28,0 @@ }, |
'use strict'; | ||
var claire = require('claire'); | ||
const claire = require('claire'); | ||
const {BitSet} = require('../lib/bitset'); | ||
var forAll = claire.forAll, | ||
arb = claire.data, | ||
label = claire.label, | ||
choice = claire.choice, | ||
transform = claire.transform; | ||
const { | ||
forAll, | ||
data: arb, | ||
label, | ||
choice, | ||
transform | ||
} = claire; | ||
var BitSet = require('../lib/bitset').BitSet; | ||
var PosInt = transform(Math.floor, arb.Positive); | ||
const PosInt = transform(Math.floor, arb.Positive); | ||
var EmptyBitSet = label('bitset', transform( | ||
function(size) { | ||
const EmptyBitSet = label('bitset', transform( | ||
size => { | ||
return new BitSet(size); | ||
@@ -20,12 +22,12 @@ }, | ||
suite('BitSet', function() { | ||
suite('BitSet', () => { | ||
test('get bit', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
return b.get(bit); | ||
}).asTest()); | ||
test('clear bit', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
@@ -37,3 +39,3 @@ b.clear(bit); | ||
test('next set of empty', forAll(EmptyBitSet) | ||
.satisfy(function(b) { | ||
.satisfy(b => { | ||
return b.nextSetBit(0) === -1; | ||
@@ -43,3 +45,3 @@ }).asTest()); | ||
test('next set of one bit', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
@@ -50,3 +52,3 @@ return b.nextSetBit(0) === bit; | ||
test('next set same bit', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
@@ -57,3 +59,3 @@ return b.nextSetBit(bit) === bit; | ||
test('next set following bit', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
@@ -64,11 +66,10 @@ return b.nextSetBit(bit+1) === -1; | ||
test('next clear of empty', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { return b.nextClearBit(bit) === bit; }) | ||
.satisfy((b, bit) => { return b.nextClearBit(bit) === bit; }) | ||
.asTest()); | ||
test('next clear of one set', forAll(EmptyBitSet, PosInt) | ||
.satisfy(function(b, bit) { | ||
.satisfy((b, bit) => { | ||
b.set(bit); | ||
return b.nextClearBit(bit) === bit + 1; | ||
}).asTest()); | ||
}); |
@@ -11,3 +11,2 @@ 'use strict'; | ||
var domain = require('domain'); | ||
var Buffer = require('safe-buffer').Buffer; | ||
@@ -14,0 +13,0 @@ var URL = process.env.URL || 'amqp://localhost'; |
@@ -10,3 +10,2 @@ 'use strict'; | ||
var Promise = require('bluebird'); | ||
var Buffer = require('safe-buffer').Buffer; | ||
@@ -13,0 +12,0 @@ var URL = process.env.URL || 'amqp://localhost'; |
@@ -15,3 +15,2 @@ // Test the channel machinery | ||
var OPEN_OPTS = require('./connection').OPEN_OPTS; | ||
var Buffer = require('safe-buffer').Buffer; | ||
@@ -18,0 +17,0 @@ var LOG_ERRORS = process.env.LOG_ERRORS; |
@@ -7,3 +7,2 @@ 'use strict'; | ||
var ints = require('buffer-more-ints'); | ||
var Buffer = require('safe-buffer').Buffer | ||
var C = require('claire'); | ||
@@ -10,0 +9,0 @@ var forAll = C.forAll; |
'use strict'; | ||
var connect = require('../lib/connect').connect; | ||
var Buffer = require('safe-buffer').Buffer | ||
var credentialsFromUrl = require('../lib/connect').credentialsFromUrl; | ||
var defs = require('../lib/defs'); | ||
var assert = require('assert'); | ||
var util = require('./util'); | ||
var net = require('net'); | ||
var fail = util.fail, succeed = util.succeed, | ||
var fail = util.fail, succeed = util.succeed, latch = util.latch, | ||
kCallback = util.kCallback, | ||
@@ -151,3 +151,48 @@ succeedIfAttributeEquals = util.succeedIfAttributeEquals; | ||
}); | ||
}); | ||
suite('Errors on connect', function() { | ||
var server | ||
teardown(function() { | ||
if (server) { | ||
server.close(); | ||
} | ||
}) | ||
test("closes underlying connection on authentication error", function(done) { | ||
var bothDone = latch(2, done); | ||
server = net.createServer(function(socket) { | ||
socket.once('data', function(protocolHeader) { | ||
assert.deepStrictEqual( | ||
protocolHeader, | ||
Buffer.from("AMQP" + String.fromCharCode(0,0,9,1)) | ||
); | ||
util.runServer(socket, function(send, wait) { | ||
send(defs.ConnectionStart, | ||
{versionMajor: 0, | ||
versionMinor: 9, | ||
serverProperties: {}, | ||
mechanisms: Buffer.from('PLAIN'), | ||
locales: Buffer.from('en_US')}); | ||
wait(defs.ConnectionStartOk)().then(function() { | ||
send(defs.ConnectionClose, | ||
{replyCode: 403, | ||
replyText: 'ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN', | ||
classId: 0, | ||
methodId: 0}); | ||
}); | ||
}); | ||
}); | ||
// Wait for the connection to be closed after the authentication error | ||
socket.once('end', function() { | ||
bothDone(); | ||
}); | ||
}).listen(0); | ||
connect('amqp://localhost:' + server.address().port, {}, function(err) { | ||
if (!err) bothDone(new Error('Expected authentication error')); | ||
bothDone(); | ||
}); | ||
}); | ||
}); |
@@ -5,3 +5,2 @@ 'use strict'; | ||
var defs = require('../lib/defs'); | ||
var Buffer = require('safe-buffer').Buffer; | ||
var Connection = require('../lib/connection').Connection; | ||
@@ -8,0 +7,0 @@ var HEARTBEAT = require('../lib/frame').HEARTBEAT; |
@@ -6,3 +6,2 @@ // Property-based testing representations of various things in AMQP | ||
var C = require('claire'); | ||
var Buffer = require('safe-buffer').Buffer; | ||
var forAll = C.forAll; | ||
@@ -9,0 +8,0 @@ var arb = C.data; |
@@ -6,3 +6,2 @@ 'use strict'; | ||
var fail = require('./util').fail; | ||
var Buffer = require('safe-buffer').Buffer; | ||
var connection = require('../lib/connection'); | ||
@@ -31,3 +30,3 @@ var Frames = connection.Connection; | ||
suite("Explicit parsing", function() { | ||
test('Parse heartbeat', function() { | ||
@@ -122,3 +121,3 @@ var input = inputs(); | ||
}; | ||
t.forEach(function(f) { | ||
@@ -125,0 +124,0 @@ f.channel = 0; |
Sorry, the diff of this file is too big to display
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
5
399915
67
11683
- Removedsafe-buffer@~5.2.1
- Removedsafe-buffer@5.2.1(transitive)
Updatedurl-parse@~1.5.10