Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqplib

Package Overview
Dependencies
Maintainers
2
Versions
32
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqplib - npm Package Compare versions

Comparing version 0.10.3 to 0.10.4

examples/direct_reply_to_client.js

11

.github/ISSUE_TEMPLATE/bug_report.md

@@ -26,11 +26,14 @@ ---

If the above does not help, please provide the following information...
If the above does not help, please provide as much of the following information as is relevant...
- A clear and concise description of what the bug is.
- A clear and concise description of the problem
- RabbitMQ version
- amqplib version
- Node.js version
- The simplest possible code snippet that demonstrates the problem
- A stack trace
If you include code snippets, please format them so they are easier to read
Please format code snippets and/or stack traces using GitHub Markdown
- https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-and-highlighting-code-blocks
- https://docs.github.com/en/get-started/writing-on-github/working-with-advanced-formatting/creating-and-highlighting-code-blocks).

@@ -37,0 +40,0 @@ Thank you

@@ -498,3 +498,3 @@ var FS = require('fs');

println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = buffer.slice(offset, offset + len);');
println('val = buffer.subarray(offset, offset + len);');
println('offset += len;');

@@ -509,3 +509,3 @@ break;

println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = decodeFields(buffer.slice(offset, offset + len));');
println('val = decodeFields(buffer.subarray(offset, offset + len));');
println('offset += len;');

@@ -662,3 +662,3 @@ break;

println('buffer.writeUInt16BE(flags, 19);');
println('return buffer.slice(0, offset + 1);');
println('return buffer.subarray(0, offset + 1);');
println('}');

@@ -703,3 +703,3 @@ }

println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = buffer.slice(offset, offset + len);');
println('val = buffer.subarray(offset, offset + len);');
println('offset += len;');

@@ -714,3 +714,3 @@ break;

println('len = buffer.readUInt32BE(offset); offset += 4;');
println('val = decodeFields(buffer.slice(offset, offset + len));');
println('val = decodeFields(buffer.subarray(offset, offset + len));');
println('offset += len;');

@@ -717,0 +717,0 @@ break;

# Change log for amqplib
## Changes in v0.10.4
- Improve stream example as per https://github.com/amqp-node/amqplib/issues/722
- Added support for RabbitMQ's connection update-secret operation. See https://github.com/amqp-node/amqplib/issues/755
## Changes in v0.10.3

@@ -21,3 +26,3 @@

* Allow servername to be specified via socket options as discussed in as discussed in
* Allow servername to be specified via socket options as discussed in
[issue 697](https://github.com/squaremo/amqp.node/issues/697)

@@ -24,0 +29,0 @@

#!/usr/bin/env node
// Example of using a headers exchange
const amqp = require('../');
var amqp = require('../')
(async () => {
amqp.connect().then(function(conn) {
return conn.createChannel().then(withChannel);
}, console.error);
const connection = await amqp.connect();
const channel = await connection.createChannel();
function withChannel(ch) {
// NB the type of the exchange is 'headers'
ch.assertExchange('matching exchange', 'headers').then(function(ex) {
ch.assertQueue().then(function(q) {
bindAndConsume(ch, ex, q).then(function() {
send(ch, ex);
});
});
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
}
function bindAndConsume(ch, ex, q) {
const { exchange } = await channel.assertExchange('matching exchange', 'headers');
const { queue } = await channel.assertQueue();
// When using a headers exchange, the headers to be matched go in

@@ -30,14 +25,17 @@ // the binding arguments. The routing key is ignore, so best left

// matched go in subsequent fields.
ch.bindQueue(q.queue, ex.exchange, '', {'x-match': 'any',
'foo': 'bar',
'baz': 'boo'});
return ch.consume(q.queue, function(msg) {
console.log(msg.content.toString());
}, {noAck: true});
}
await channel.bindQueue(queue, exchange, '', {
'x-match': 'any',
'foo': 'bar',
'baz': 'boo'
});
function send(ch, ex) {
// The headers for a message are given as an option to `publish`:
ch.publish(ex.exchange, '', Buffer.from('hello'), {headers: {baz: 'boo'}});
ch.publish(ex.exchange, '', Buffer.from('world'), {headers: {foo: 'bar'}});
}
await channel.consume(queue, (message) => {
console.log(message.content.toString());
}, { noAck: true });
channel.publish(exchange, '', Buffer.from('hello'), { headers: { baz: 'boo' }});
channel.publish(exchange, '', Buffer.from('hello'), { headers: { foo: 'bar' }});
channel.publish(exchange, '', Buffer.from('lost'), { headers: { meh: 'nah' }});
console.log(' [x] To exit press CTRL+C.');
})();

@@ -21,4 +21,4 @@ // Example of using a TLS/SSL connection. Note that the server must be

var amqp = require('../');
var fs = require('fs');
const amqp = require('../');
const fs = require('fs');

@@ -38,3 +38,3 @@ // Assemble the SSL options; for verification we need at least

// Options for full client and server verification:
var opts = {
const opts = {
cert: fs.readFileSync('../etc/client/cert.pem'),

@@ -54,3 +54,3 @@ key: fs.readFileSync('../etc/client/key.pem'),

//
// var opts = { ca: [fs.readFileSync('../etc/testca/cacert.pem')] };
// const opts = { ca: [fs.readFileSync('../etc/testca/cacert.pem')] };

@@ -60,9 +60,15 @@ // Option to use the SSL client certificate for authentication

var open = amqp.connect('amqps://localhost', opts);
(async () => {
const connection = await amqp.connect('amqp://localhost', opts);
const channel = await connection.createChannel();
open.then(function(conn) {
process.on('SIGINT', conn.close.bind(conn));
return conn.createChannel().then(function(ch) {
ch.sendToQueue('foo', Buffer.from('Hello World!'));
process.on('SIGINT', async () => {
await channel.close();
await connection.close();
});
}).then(null, console.warn);
channel.sendToQueue('foo', Buffer.from('Hello World!'));
console.log(' [x] To exit press CTRL+C.');
})();
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
var severity = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';
const exchange = 'direct_logs';
const args = process.argv.slice(2);
const routingKey = (args.length > 0) ? args[0] : 'info';
const text = args.slice(1).join(' ') || 'Hello World!';
function bail(err, conn) {
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertExchange(exchange, 'direct', { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.publish(exchange, routingKey, Buffer.from(text));
console.log(" [x] Sent '%s'", text);
channel.close(() => {
connection.close();
});
});
});
});
function bail(err, connection) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
if (connection) connection.close(() => {
process.exit(1);
});
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
var ex = 'direct_logs';
var exopts = {durable: false};
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertExchange(ex, 'direct', exopts, function(err, ok) {
ch.publish(ex, severity, Buffer.from(message));
ch.close(function() { conn.close(); });
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';
const exchange = 'topic_logs';
const args = process.argv.slice(2);
const routingKey = (args.length > 0) ? args[0] : 'info';
const text = args.slice(1).join(' ') || 'Hello World!';
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
var ex = 'topic_logs', exopts = {durable: false};
conn.createChannel(function(err, ch) {
ch.assertExchange(ex, 'topic', exopts, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.publish(ex, key, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", key, message);
ch.close(function() { conn.close(); });
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertExchange(exchange, 'topic', { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.publish(exchange, routingKey, Buffer.from(text));
console.log(" [x] Sent '%s'", text);
channel.close(() => {
connection.close();
});
});
});
});
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const exchange = 'logs';
const text = process.argv.slice(2).join(' ') || 'info: Hello World!';
function on_connect(err, conn) {
if (err !== null) return bail(err);
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertExchange(exchange, 'fanout', { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.publish(exchange, '', Buffer.from(text));
console.log(" [x] Sent '%s'", text);
channel.close(() => {
connection.close();
});
});
});
});
var ex = 'logs';
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertExchange(ex, 'fanout', {durable: false});
var msg = process.argv.slice(2).join(' ') ||
'info: Hello World!';
ch.publish(ex, '', Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
}
conn.createChannel(on_channel_open);
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const queue = 'task_queue';
const text = process.argv.slice(2).join(' ') || "Hello World!";
function on_connect(err, conn) {
if (err !== null) return bail(err);
var q = 'task_queue';
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: true}, function(err, _ok) {
if (err !== null) return bail(err, conn);
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertQueue(queue, { durable: true }, (err) => {
if (err) return bails(err, connection);
channel.sendToQueue(queue, Buffer.from(text), { persistent: true });
console.log(" [x] Sent '%s'", text);
channel.close(() => {
connection.close();
});
});
});
});
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
const { basename } = require('path');
var basename = require('path').basename;
var severities = process.argv.slice(2);
const exchange = 'direct_logs';
const severities = process.argv.slice(2);
if (severities.length < 1) {
console.log('Usage %s [info] [warning] [error]',
basename(process.argv[1]));
console.log('Usage %s [info] [warning] [error]', basename(process.argv[1]));
process.exit(1);
}
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
var ex = 'direct_logs', exopts = {durable: false};
ch.assertExchange(ex, 'direct', exopts);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return bail(err, conn);
var queue = ok.queue, i = 0;
function sub(err) {
if (err !== null) return bail(err, conn);
else if (i < severities.length) {
ch.bindQueue(queue, ex, severities[i], {}, sub);
i++;
}
}
ch.consume(queue, logMessage, {noAck: true}, function(err) {
if (err !== null) return bail(err, conn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
sub(null);
channel.assertExchange(exchange, 'direct', { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.assertQueue('', { exclusive: true }, (err, { queue }) => {
if (err) return bail(err, connection);
channel.consume(queue, (message) => {
if (message) console.log(" [x] %s:'%s'", message.fields.routingKey, message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, {noAck: true}, function(err) {
if (err) return bail(err, connection);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
subscribeAll(channel, queue, severities, (err) => {
if (err) return bail(err, connection);
});
});
});
});
});
});
function subscribeAll(channel, queue, bindingKeys, cb) {
if (bindingKeys.length === 0) return cb();
const bindingKey = bindingKeys.shift();
channel.bindQueue(queue, exchange, bindingKey, {}, (err) => {
if (err) return cb(err);
subscribeAll(channel, queue, bindingKeys, cb);
});
}
function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var basename = require('path').basename;
const amqp = require('amqplib/callback_api');
const { basename } = require('path');
var keys = process.argv.slice(2);
if (keys.length < 1) {
console.log('Usage %s pattern [pattern...]',
basename(process.argv[1]));
const exchange = 'topic_logs';
const severities = process.argv.slice(2);
if (severities.length < 1) {
console.log('Usage %s [info] [warning] [error]', basename(process.argv[1]));
process.exit(1);
}
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
var ex = 'topic_logs', exopts = {durable: false};
ch.assertExchange(ex, 'topic', exopts);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return bail(err, conn);
var queue = ok.queue, i = 0;
function sub(err) {
if (err !== null) return bail(err, conn);
else if (i < keys.length) {
ch.bindQueue(queue, ex, keys[i], {}, sub);
i++;
}
}
ch.consume(queue, logMessage, {noAck: true}, function(err) {
if (err !== null) return bail(err, conn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
sub(null);
channel.assertExchange(exchange, 'topic', { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.assertQueue('', { exclusive: true }, (err, { queue }) => {
if (err) return bail(err, connection);
channel.consume(queue, (message) => {
if (message) console.log(" [x] %s:'%s'", message.fields.routingKey, message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, {noAck: true}, function(err) {
if (err) return bail(err, connection);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
subscribeAll(channel, queue, severities, (err) => {
if (err) return bail(err, connection);
});
});
});
});
});
});
function subscribeAll(channel, queue, bindingKeys, cb) {
if (bindingKeys.length === 0) return cb();
const bindingKey = bindingKeys.shift();
channel.bindQueue(queue, exchange, bindingKey, {}, (err) => {
if (err) return cb(err);
subscribeAll(channel, queue, bindingKeys, cb);
});
}
function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const exchange = 'logs';
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
var ex = 'logs';
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue('', {exclusive: true}, function(err, ok) {
var q = ok.queue;
ch.bindQueue(q, ex, '');
ch.consume(q, logMessage, {noAck: true}, function(err, ok) {
if (err !== null) return bail(err, conn);
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
}
function logMessage(msg) {
if (msg)
console.log(" [x] '%s'", msg.content.toString());
}
channel.assertExchange(exchange, 'fanout', { durable: false }, (err, { queue }) => {
if (err) return bail(err, connection);
channel.assertQueue('', { exclusive: true }, (err, { queue }) => {
if (err) return bail(err, connection);
channel.bindQueue(queue, exchange, '', {}, (err) => {
if (err) return bail(err, connection);
channel.consume(queue, (message) => {
if (message) console.log(" [x] '%s'", message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, { noAck: true }, (err) => {
if (err) return bail(err, connection);
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
});
});
});
});
});
});
conn.createChannel(on_channel_open);
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const queue = 'hello';
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
var q = 'hello';
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
console.log(' [*] Waiting for messages. To exit press CTRL+C');
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
}
conn.createChannel(on_channel_open);
channel.assertQueue(queue, { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.consume(queue, (message) => {
if (message) console.log(" [x] Received '%s'", message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, { noAck: true }, (err) => {
if (err) return bail(err, connection);
console.log(" [*] Waiting for logs. To exit press CTRL+C.");
});
});
});
});
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
var basename = require('path').basename;
var uuid = require('node-uuid');
const amqp = require('amqplib/callback_api');
const { basename } = require('path');
const { v4: uuid } = require('uuid');
var n;
try {
if (process.argv.length < 3) throw Error('Too few args');
n = parseInt(process.argv[2]);
}
catch (e) {
console.error(e);
const queue = 'rpc_queue';
const n = parseInt(process.argv[2], 10);
if (isNaN(n)) {
console.warn('Usage: %s number', basename(process.argv[1]));

@@ -18,33 +15,38 @@ process.exit(1);

function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertQueue('', { exclusive: true }, (err, { queue: replyTo }) => {
if (err) return bail(err, connection);
function on_connect(err, conn) {
if (err !== null) return bail(err);
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
const correlationId = uuid();
channel.consume(replyTo, (message) => {
if (!message) console.warn(' [x] Consumer cancelled');
else if (message.properties.correlationId === correlationId) {
console.log(' [.] Got %d', message.content.toString());
channel.close(() => {
connection.close();
})
}
}, { noAck: true });
var correlationId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === correlationId) {
console.log(' [.] Got %d', msg.content.toString());
}
else return bail(new Error('Unexpected message'), conn);
ch.close(function() { conn.close(); });
}
ch.assertQueue('', {exclusive: true}, function(err, ok) {
if (err !== null) return bail(err, conn);
var queue = ok.queue;
ch.consume(queue, maybeAnswer, {noAck:true});
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
replyTo: queue, correlationId: correlationId
channel.assertQueue(queue, { durable: false }, (err) => {
if (err) return bail(err, connection);
console.log(' [x] Requesting fib(%d)', n);
channel.sendToQueue(queue, Buffer.from(n.toString()), {
correlationId,
replyTo
});
});
});
});
});
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
const queue = 'rpc_queue';
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
channel.assertQueue(queue, { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.prefetch(1);
channel.consume(queue, (message) => {
const n = parseInt(message.content.toString(), 10);
console.log(' [.] fib(%d)', n);
const response = fib(n);
channel.sendToQueue(message.properties.replyTo, Buffer.from(response.toString()), {
correlationId: message.properties.correlationId
});
channel.ack(message);
}, { noAck: false }, function(err) {
if (err) return bail(err, conn);
console.log(' [x] Awaiting RPC requests. To exit press CTRL+C.');
});
});
});
});
function fib(n) {
var a = 0, b = 1;
for (var i=0; i < n; i++) {
var c = a + b;
// Do it the ridiculous, but not most ridiculous, way. For better,
// see http://nayuki.eigenstate.org/page/fast-fibonacci-algorithms
let a = 0, b = 1;
for (let i=0; i < n; i++) {
let c = a + b;
a = b; b = c;

@@ -14,33 +48,9 @@ }

function bail(err, conn) {
function bail(err, connection) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
var q = 'rpc_queue';
conn.createChannel(function(err, ch) {
ch.assertQueue(q, {durable: false});
ch.prefetch(1);
ch.consume(q, reply, {noAck:false}, function(err) {
if (err !== null) return bail(err, conn);
console.log(' [x] Awaiting RPC requests');
});
function reply(msg) {
var n = parseInt(msg.content.toString());
console.log(' [.] fib(%d)', n);
ch.sendToQueue(msg.properties.replyTo,
Buffer.from(fib(n).toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const queue = 'hello';
const text = 'Hello World!';
function on_connect(err, conn) {
if (err !== null) return bail(err);
var q = 'hello';
var msg = 'Hello World!';
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
ch.close(function() { conn.close(); });
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
channel.assertQueue(queue, { durable: false }, (err) => {
if (err) return bail(err, connection);
channel.sendToQueue(queue, Buffer.from(text));
console.log(" [x] Sent '%s'", text);
channel.close(() => {
connection.close();
});
});
}
});
});
conn.createChannel(on_channel_open);
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
const amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
const queue = 'task_queue';
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINT', function() { conn.close(); });
var q = 'task_queue';
amqp.connect((err, connection) => {
if (err) return bail(err);
connection.createChannel((err, channel) => {
if (err) return bail(err, connection);
conn.createChannel(function(err, ch) {
if (err !== null) return bail(err, conn);
ch.assertQueue(q, {durable: true}, function(err, _ok) {
ch.consume(q, doWork, {noAck: false});
process.once('SIGINT', () => {
channel.close(() => {
connection.close();
});
});
channel.assertQueue(queue, { durable: true }, (err, { queue }) => {
if (err) return bail(err, connection);
channel.consume(queue, (message) => {
const text = message.content.toString();
console.log(" [x] Received '%s'", text);
const seconds = text.split('.').length - 1;
setTimeout(() => {
console.log(" [x] Done");
channel.ack(message);
}, seconds * 1000);
}, { noAck: false });
console.log(" [*] Waiting for messages. To exit press CTRL+C");
});
});
});
function doWork(msg) {
var body = msg.content.toString();
console.log(" [x] Received '%s'", body);
var secs = body.split('.').length - 1;
setTimeout(function() {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}
function bail(err, connection) {
console.error(err);
if (connection) connection.close(() => {
process.exit(1);
});
}
amqp.connect(on_connect);
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
var args = process.argv.slice(2);
var severity = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';
const exchange = 'direct_logs';
const args = process.argv.slice(2);
const routingKey = (args.length > 0) ? args[0] : 'info';
const text = args.slice(1).join(' ') || 'Hello World!';
amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var ex = 'direct_logs';
var ok = ch.assertExchange(ex, 'direct', {durable: false});
return ok.then(function() {
ch.publish(ex, severity, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", severity, message);
return ch.close();
});
}).finally(function() { conn.close(); });
}).catch(console.warn);
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange(exchange, 'direct', { durable: false });
channel.publish(exchange, routingKey, Buffer.from(text));
console.log(" [x] Sent %s:'%s'", routingKey, text);
await channel.close();
}
catch (err) {
console.warn(err);
}
finally {
if (connection) await connection.close();
};
})();
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
var args = process.argv.slice(2);
var key = (args.length > 0) ? args[0] : 'info';
var message = args.slice(1).join(' ') || 'Hello World!';
const exchange = 'topic_logs';
const args = process.argv.slice(2);
const routingKeys = (args.length > 0) ? args[0] : 'info';
const text = args.slice(1).join(' ') || 'Hello World!';
amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});
return ok.then(function() {
ch.publish(ex, key, Buffer.from(message));
console.log(" [x] Sent %s:'%s'", key, message);
return ch.close();
});
}).finally(function() { conn.close(); })
}).catch(console.log);
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange(exchange, 'topic', { durable: false });
channel.publish(exchange, routingKeys, Buffer.from(text));
console.log(" [x] Sent %s:'%s'", routingKeys, text);
await channel.close();
}
catch (err) {
console.warn(err);
}
finally {
if (connection) await connection.close();
};
})();
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var ex = 'logs';
var ok = ch.assertExchange(ex, 'fanout', {durable: false})
const exchange = 'logs';
const text = process.argv.slice(2).join(' ') || 'info: Hello World!';
var message = process.argv.slice(2).join(' ') ||
'info: Hello World!';
return ok.then(function() {
ch.publish(ex, '', Buffer.from(message));
console.log(" [x] Sent '%s'", message);
return ch.close();
});
}).finally(function() { conn.close(); });
}).catch(console.warn);
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertExchange(exchange, 'fanout', { durable: false });
channel.publish(exchange, '', Buffer.from(text));
console.log(" [x] Sent '%s'", text);
await channel.close();
}
catch (err) {
console.warn(err);
}
finally {
if (connection) await connection.close();
};
})();
#!/usr/bin/env node
// Post a new task to the work queue
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var q = 'task_queue';
var ok = ch.assertQueue(q, {durable: true});
const queue = 'task_queue';
const text = process.argv.slice(2).join(' ') || "Hello World!";
return ok.then(function() {
var msg = process.argv.slice(2).join(' ') || "Hello World!";
ch.sendToQueue(q, Buffer.from(msg), {deliveryMode: true});
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
}).finally(function() { conn.close(); });
}).catch(console.warn);
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(text), { persistent: true });
console.log(" [x] Sent '%s'", text);
await channel.close();
}
catch (err) {
console.warn(err);
}
finally {
await connection.close();
};
})();
#!/usr/bin/env node
var amqp = require('amqplib');
var basename = require('path').basename;
const amqp = require('../..');
const { basename } = require('path');
var severities = process.argv.slice(2);
if (severities.length < 1) {
console.warn('Usage: %s [info] [warning] [error]',
basename(process.argv[1]));
const exchange = 'direct_logs';
const bindingKeys = process.argv.slice(2);
if (bindingKeys.length < 1) {
console.warn('Usage: %s [info] [warning] [error]', basename(process.argv[1]));
process.exit(1);
}
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ex = 'direct_logs';
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
var ok = ch.assertExchange(ex, 'direct', {durable: false});
ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
ok = ok.then(function(qok) {
var queue = qok.queue;
return Promise.all(severities.map(function(sev) {
ch.bindQueue(queue, ex, sev);
})).then(function() { return queue; });
});
await channel.assertExchange(exchange, 'direct', { durable: false });
const { queue } = await channel.assertQueue('', { exclusive: true });
await Promise.all(bindingKeys.map(async (bindingKey) => {
await channel.bindQueue(queue, exchange, bindingKey);
}));
ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
await channel.consume(queue, (message) => {
if (message) console.log(" [x] %s:'%s'", message.fields.routingKey, message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, { noAck: true });
function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
}
});
}).catch(console.warn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
} catch(err) {
console.warn(err);
}
})();
#!/usr/bin/env node
var amqp = require('amqplib');
var basename = require('path').basename;
const amqp = require('../..');
const { basename } = require('path');
var keys = process.argv.slice(2);
if (keys.length < 1) {
console.log('Usage: %s pattern [pattern...]',
basename(process.argv[1]));
const exchange = 'topic_logs';
const bindingKeys = process.argv.slice(2);
if (bindingKeys.length < 1) {
console.log('Usage: %s pattern [pattern...]', basename(process.argv[1]));
process.exit(1);
}
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ex = 'topic_logs';
var ok = ch.assertExchange(ex, 'topic', {durable: false});
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
ok = ok.then(function(qok) {
var queue = qok.queue;
return Promise.all(keys.map(function(rk) {
ch.bindQueue(queue, ex, rk);
})).then(function() { return queue; });
});
await channel.assertExchange(exchange, 'topic', { durable: false });
const { queue } = await channel.assertQueue('', { exclusive: true });
await Promise.all(bindingKeys.map(async (bindingKey) => {
await channel.bindQueue(queue, exchange, bindingKey);
}));
ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
});
await channel.consume(queue, (message) => {
if (message) console.log(" [x] %s:'%s'", message.fields.routingKey, message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, { noAck: true });
function logMessage(msg) {
console.log(" [x] %s:'%s'",
msg.fields.routingKey,
msg.content.toString());
}
});
}).catch(console.warn);
console.log(' [*] Waiting for logs. To exit press CTRL+C.');
}
catch (err) {
console.warn(err);
}
})();
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ok = ch.assertExchange('logs', 'fanout', {durable: false});
ok = ok.then(function() {
return ch.assertQueue('', {exclusive: true});
const exchange = 'logs';
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
ok = ok.then(function(qok) {
return ch.bindQueue(qok.queue, 'logs', '').then(function() {
return qok.queue;
});
});
ok = ok.then(function(queue) {
return ch.consume(queue, logMessage, {noAck: true});
});
return ok.then(function() {
console.log(' [*] Waiting for logs. To exit press CTRL+C');
});
function logMessage(msg) {
console.log(" [x] '%s'", msg.content.toString());
}
});
}).catch(console.warn);
await channel.assertExchange(exchange, 'fanout', { durable: false });
const { queue } = await channel.assertQueue('', { exclusive: true });
await channel.bindQueue(queue, exchange, '')
await channel.consume(queue, (message) => {
if (message) console.log(" [x] '%s'", message.content.toString());
else console.warn(' [x] Consumer cancelled');
}, { noAck: true });
console.log(' [*] Waiting for logs. To exit press CTRL+C');
} catch (err) {
console.warn(err);
}
})();
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
const queue = 'hello';
var ok = ch.assertQueue('hello', {durable: false});
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
ok = ok.then(function(_qok) {
return ch.consume('hello', function(msg) {
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true});
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
return ok.then(function(_consumeOk) {
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}).catch(console.warn);
await channel.assertQueue(queue, { durable: false });
await channel.consume(queue, (message) => {
console.log(" [x] Received '%s'", message.content.toString());
}, { noAck: true });
console.log(' [*] Waiting for messages. To exit press CTRL+C');
} catch (err) {
console.warn(err);
}
})();
#!/usr/bin/env node
var amqp = require('amqplib');
var basename = require('path').basename;
var uuid = require('node-uuid');
const amqp = require('amqplib');
const { basename } = require('path');
const { v4: uuid } = require('uuid');
// I've departed from the form of the original RPC tutorial, which
// needlessly introduces a class definition, and doesn't even
// parameterise the request.
const queue = 'rpc_queue';
var n;
try {
if (process.argv.length < 3) throw Error('Too few args');
n = parseInt(process.argv[2]);
}
catch (e) {
console.error(e);
const n = parseInt(process.argv[2], 10);
if (isNaN(n)) {
console.warn('Usage: %s number', basename(process.argv[1]));

@@ -22,32 +15,36 @@ process.exit(1);

amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
return new Promise(function(resolve) {
var corrId = uuid();
function maybeAnswer(msg) {
if (msg.properties.correlationId === corrId) {
resolve(msg.content.toString());
}
}
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
const correlationId = uuid();
var ok = ch.assertQueue('', {exclusive: true})
.then(function(qok) { return qok.queue; });
const requestFib = new Promise(async (resolve) => {
const { queue: replyTo } = await channel.assertQueue('', { exclusive: true });
ok = ok.then(function(queue) {
return ch.consume(queue, maybeAnswer, {noAck: true})
.then(function() { return queue; });
});
await channel.consume(replyTo, (message) => {
if (!message) console.warn(' [x] Consumer cancelled');
else if (message.properties.correlationId === correlationId) {
resolve(message.content.toString());
}
}, { noAck: true });
ok = ok.then(function(queue) {
console.log(' [x] Requesting fib(%d)', n);
ch.sendToQueue('rpc_queue', Buffer.from(n.toString()), {
correlationId: corrId, replyTo: queue
});
await channel.assertQueue(queue, { durable: false });
console.log(' [x] Requesting fib(%d)', n);
channel.sendToQueue(queue, Buffer.from(n.toString()), {
correlationId,
replyTo,
});
});
})
.then(function(fibN) {
const fibN = await requestFib;
console.log(' [.] Got %d', fibN);
})
.finally(function() { conn.close(); });
}).catch(console.warn);
}
catch (err) {
console.warn(err);
}
finally {
if (connection) await connection.close();
};
})();
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
const queue = 'rpc_queue';
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
process.once('SIGINT', async () => {
await channel.close();
await connection.close();
});
await channel.assertQueue(queue, { durable: false });
channel.prefetch(1);
await channel.consume(queue, (message) => {
const n = parseInt(message.content.toString(), 10);
console.log(' [.] fib(%d)', n);
const response = fib(n);
channel.sendToQueue(message.properties.replyTo, Buffer.from(response.toString()), {
correlationId: message.properties.correlationId
});
channel.ack(message);
});
console.log(' [x] Awaiting RPC requests. To exit press CTRL+C.');
}
catch (err) {
console.warn(err);
}
})();
function fib(n) {
// Do it the ridiculous, but not most ridiculous, way. For better,
// see http://nayuki.eigenstate.org/page/fast-fibonacci-algorithms
var a = 0, b = 1;
for (var i=0; i < n; i++) {
var c = a + b;
let a = 0, b = 1;
for (let i=0; i < n; i++) {
let c = a + b;
a = b; b = c;

@@ -15,26 +47,1 @@ }

}
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var q = 'rpc_queue';
var ok = ch.assertQueue(q, {durable: false});
var ok = ok.then(function() {
ch.prefetch(1);
return ch.consume(q, reply);
});
return ok.then(function() {
console.log(' [x] Awaiting RPC requests');
});
function reply(msg) {
var n = parseInt(msg.content.toString());
console.log(' [.] fib(%d)', n);
var response = fib(n);
ch.sendToQueue(msg.properties.replyTo,
Buffer.from(response.toString()),
{correlationId: msg.properties.correlationId});
ch.ack(msg);
}
});
}).catch(console.warn);
#!/usr/bin/env node
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
return conn.createChannel().then(function(ch) {
var q = 'hello';
var msg = 'Hello World!';
const queue = 'hello';
const text = 'Hello World!';
var ok = ch.assertQueue(q, {durable: false});
(async () => {
let connection;
try {
connection = await amqp.connect('amqp://localhost');
const channel = await connection.createChannel();
return ok.then(function(_qok) {
// NB: `sentToQueue` and `publish` both return a boolean
// indicating whether it's OK to send again straight away, or
// (when `false`) that you should wait for the event `'drain'`
// to fire before writing again. We're just doing the one write,
// so we'll ignore it.
ch.sendToQueue(q, Buffer.from(msg));
console.log(" [x] Sent '%s'", msg);
return ch.close();
});
}).finally(function() { conn.close(); });
}).catch(console.warn);
await channel.assertQueue(queue, { durable: false });
// NB: `sentToQueue` and `publish` both return a boolean
// indicating whether it's OK to send again straight away, or
// (when `false`) that you should wait for the event `'drain'`
// to fire before writing again. We're just doing the one write,
// so we'll ignore it.
channel.sendToQueue(queue, Buffer.from(text));
console.log(" [x] Sent '%s'", text);
await channel.close();
}
catch (err) {
console.warn(err);
}
finally {
if (connection) await connection.close();
};
})();
#!/usr/bin/env node
// Process tasks from the work queue
var amqp = require('amqplib');
const amqp = require('amqplib');
amqp.connect('amqp://localhost').then(function(conn) {
process.once('SIGINT', function() { conn.close(); });
return conn.createChannel().then(function(ch) {
var ok = ch.assertQueue('task_queue', {durable: true});
ok = ok.then(function() { ch.prefetch(1); });
ok = ok.then(function() {
ch.consume('task_queue', doWork, {noAck: false});
console.log(" [*] Waiting for messages. To exit press CTRL+C");
const queue = 'task_queue';
(async () => {
try {
const connection = await amqp.connect('amqp://localhost');
process.once('SIGINT', async () => {
await connection.close();
});
return ok;
function doWork(msg) {
var body = msg.content.toString();
console.log(" [x] Received '%s'", body);
var secs = body.split('.').length - 1;
//console.log(" [x] Task takes %d seconds", secs);
setTimeout(function() {
const channel = await connection.createChannel();
await channel.assertQueue(queue, { durable: true });
channel.prefetch(1);
await channel.consume(queue, (message) => {
const text = message.content.toString();
console.log(" [x] Received '%s'", text);
const seconds = text.split('.').length - 1;
setTimeout(() => {
console.log(" [x] Done");
ch.ack(msg);
}, secs * 1000);
}
});
}).catch(console.warn);
channel.ack(message);
}, seconds * 1000);
}, { noAck: false });
console.log(" [*] Waiting for messages. To exit press CTRL+C");
}
catch (err) {
console.warn(err);
}
})();

@@ -1,22 +0,21 @@

var amqp = require('../');
const amqp = require('../');
var NUM_MSGS = 20;
(async () => {
let connection;
try {
connection = await amqp.connect();
const channel = await connection.createConfirmChannel();
function mkCallback(i) {
return (i % 2) === 0 ? function(err) {
if (err !== null) { console.error('Message %d failed!', i); }
else { console.log('Message %d confirmed', i); }
} : null;
}
for (var i=0; i < 20; i++) {
channel.publish('amq.topic', 'whatever', Buffer.from('blah'));
};
amqp.connect().then(function(c) {
c.createConfirmChannel().then(function(ch) {
for (var i=0; i < NUM_MSGS; i++) {
ch.publish('amq.topic', 'whatever', Buffer.from('blah'), {}, mkCallback(i));
}
ch.waitForConfirms().then(function() {
console.log('All messages done');
c.close();
}).catch(console.error);
});
});
await channel.waitForConfirms();
console.log('All messages done');
await channel.close();
} catch (err) {
console.warn(err);
} finally {
if (connection) await connection.close();
}
})();

@@ -8,4 +8,3 @@ //

var defs = require('./defs');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');
var BaseChannel = require('./channel').BaseChannel;

@@ -15,315 +14,321 @@ var acceptMessage = require('./channel').acceptMessage;

function CallbackModel(connection) {
if (!(this instanceof CallbackModel))
return new CallbackModel(connection);
EventEmitter.call( this );
this.connection = connection;
var self = this;
['error', 'close', 'blocked', 'unblocked'].forEach(function(ev) {
connection.on(ev, self.emit.bind(self, ev));
});
}
inherits(CallbackModel, EventEmitter);
class CallbackModel extends EventEmitter {
constructor (connection) {
super();
this.connection = connection;
var self = this;
['error', 'close', 'blocked', 'unblocked'].forEach(function (ev) {
connection.on(ev, self.emit.bind(self, ev));
});
}
module.exports.CallbackModel = CallbackModel;
close (cb) {
this.connection.close(cb);
}
CallbackModel.prototype.close = function(cb) {
this.connection.close(cb);
};
updateSecret(newSecret, reason, cb) {
this.connection._updateSecret(newSecret, reason, cb);
}
function Channel(connection) {
BaseChannel.call(this, connection);
this.on('delivery', this.handleDelivery.bind(this));
this.on('cancel', this.handleCancel.bind(this));
createChannel (cb) {
var ch = new Channel(this.connection);
ch.open(function (err, ok) {
if (err === null)
cb && cb(null, ch);
else
cb && cb(err);
});
return ch;
}
createConfirmChannel (cb) {
var ch = new ConfirmChannel(this.connection);
ch.open(function (err) {
if (err !== null)
return cb && cb(err);
else {
ch.rpc(defs.ConfirmSelect, { nowait: false },
defs.ConfirmSelectOk, function (err, _ok) {
if (err !== null)
return cb && cb(err);
else
cb && cb(null, ch);
});
}
});
return ch;
}
}
inherits(Channel, BaseChannel);
module.exports.Channel = Channel;
class Channel extends BaseChannel {
constructor (connection) {
super(connection);
this.on('delivery', this.handleDelivery.bind(this));
this.on('cancel', this.handleCancel.bind(this));
}
CallbackModel.prototype.createChannel = function(cb) {
var ch = new Channel(this.connection);
ch.open(function(err, ok) {
if (err === null) cb && cb(null, ch);
else cb && cb(err);
});
return ch;
};
// This encodes straight-forward RPC: no side-effects and return the
// fields from the server response. It wraps the callback given it, so
// the calling method argument can be passed as-is. For anything that
// needs to have side-effects, or needs to change the server response,
// use `#_rpc(...)` and remember to dereference `.fields` of the
// server response.
rpc (method, fields, expect, cb0) {
var cb = callbackWrapper(this, cb0);
this._rpc(method, fields, expect, function (err, ok) {
cb(err, ok && ok.fields); // in case of an error, ok will be
// Wrap an RPC callback to make sure the callback is invoked with
// either `(null, value)` or `(error)`, i.e., never two non-null
// values. Also substitutes a stub if the callback is `undefined` or
// otherwise falsey, for convenience in methods for which the callback
// is optional (that is, most of them).
function callbackWrapper(ch, cb) {
return (cb) ? function(err, ok) {
if (err === null) {
cb(null, ok);
}
else cb(err);
} : function() {};
}
// undefined
});
return this;
}
// This encodes straight-forward RPC: no side-effects and return the
// fields from the server response. It wraps the callback given it, so
// the calling method argument can be passed as-is. For anything that
// needs to have side-effects, or needs to change the server response,
// use `#_rpc(...)` and remember to dereference `.fields` of the
// server response.
Channel.prototype.rpc = function(method, fields, expect, cb0) {
var cb = callbackWrapper(this, cb0);
this._rpc(method, fields, expect, function(err, ok) {
cb(err, ok && ok.fields); // in case of an error, ok will be
// undefined
});
return this;
};
// === Public API ===
open (cb) {
try { this.allocate(); }
catch (e) { return cb(e); }
// === Public API ===
return this.rpc(defs.ChannelOpen, { outOfBand: "" },
defs.ChannelOpenOk, cb);
}
Channel.prototype.open = function(cb) {
try { this.allocate(); }
catch (e) { return cb(e); }
close (cb) {
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
function () { cb && cb(null); });
}
return this.rpc(defs.ChannelOpen, {outOfBand: ""},
defs.ChannelOpenOk, cb);
};
assertQueue (queue, options, cb) {
return this.rpc(defs.QueueDeclare,
Args.assertQueue(queue, options),
defs.QueueDeclareOk, cb);
}
Channel.prototype.close = function(cb) {
return this.closeBecause("Goodbye", defs.constants.REPLY_SUCCESS,
function() { cb && cb(null); });
};
checkQueue (queue, cb) {
return this.rpc(defs.QueueDeclare,
Args.checkQueue(queue),
defs.QueueDeclareOk, cb);
}
Channel.prototype.assertQueue = function(queue, options, cb) {
return this.rpc(defs.QueueDeclare,
Args.assertQueue(queue, options),
defs.QueueDeclareOk, cb);
};
deleteQueue (queue, options, cb) {
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk, cb);
}
Channel.prototype.checkQueue = function(queue, cb) {
return this.rpc(defs.QueueDeclare,
Args.checkQueue(queue),
defs.QueueDeclareOk, cb);
};
purgeQueue (queue, cb) {
return this.rpc(defs.QueuePurge,
Args.purgeQueue(queue),
defs.QueuePurgeOk, cb);
}
Channel.prototype.deleteQueue = function(queue, options, cb) {
return this.rpc(defs.QueueDelete,
Args.deleteQueue(queue, options),
defs.QueueDeleteOk, cb);
};
Channel.prototype.purgeQueue = function(queue, cb) {
return this.rpc(defs.QueuePurge,
Args.purgeQueue(queue),
defs.QueuePurgeOk, cb);
};
Channel.prototype.bindQueue =
function(queue, source, pattern, argt, cb) {
bindQueue (queue, source, pattern, argt, cb) {
return this.rpc(defs.QueueBind,
Args.bindQueue(queue, source, pattern, argt),
defs.QueueBindOk, cb);
};
Args.bindQueue(queue, source, pattern, argt),
defs.QueueBindOk, cb);
}
Channel.prototype.unbindQueue =
function(queue, source, pattern, argt, cb) {
unbindQueue (queue, source, pattern, argt, cb) {
return this.rpc(defs.QueueUnbind,
Args.unbindQueue(queue, source, pattern, argt),
defs.QueueUnbindOk, cb);
};
Args.unbindQueue(queue, source, pattern, argt),
defs.QueueUnbindOk, cb);
}
Channel.prototype.assertExchange = function(ex, type, options, cb0) {
var cb = callbackWrapper(this, cb0);
this._rpc(defs.ExchangeDeclare,
Args.assertExchange(ex, type, options),
defs.ExchangeDeclareOk,
function(e, _) { cb(e, {exchange: ex}); });
return this;
};
assertExchange (ex, type, options, cb0) {
var cb = callbackWrapper(this, cb0);
this._rpc(defs.ExchangeDeclare,
Args.assertExchange(ex, type, options),
defs.ExchangeDeclareOk,
function (e, _) { cb(e, { exchange: ex }); });
return this;
}
Channel.prototype.checkExchange = function(exchange, cb) {
return this.rpc(defs.ExchangeDeclare,
Args.checkExchange(exchange),
defs.ExchangeDeclareOk, cb);
};
checkExchange (exchange, cb) {
return this.rpc(defs.ExchangeDeclare,
Args.checkExchange(exchange),
defs.ExchangeDeclareOk, cb);
}
Channel.prototype.deleteExchange = function(exchange, options, cb) {
return this.rpc(defs.ExchangeDelete,
Args.deleteExchange(exchange, options),
defs.ExchangeDeleteOk, cb);
};
deleteExchange (exchange, options, cb) {
return this.rpc(defs.ExchangeDelete,
Args.deleteExchange(exchange, options),
defs.ExchangeDeleteOk, cb);
}
Channel.prototype.bindExchange =
function(dest, source, pattern, argt, cb) {
bindExchange (dest, source, pattern, argt, cb) {
return this.rpc(defs.ExchangeBind,
Args.bindExchange(dest, source, pattern, argt),
defs.ExchangeBindOk, cb);
};
Args.bindExchange(dest, source, pattern, argt),
defs.ExchangeBindOk, cb);
}
Channel.prototype.unbindExchange =
function(dest, source, pattern, argt, cb) {
unbindExchange (dest, source, pattern, argt, cb) {
return this.rpc(defs.ExchangeUnbind,
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk, cb);
};
Args.unbindExchange(dest, source, pattern, argt),
defs.ExchangeUnbindOk, cb);
}
Channel.prototype.publish =
function(exchange, routingKey, content, options) {
publish (exchange, routingKey, content, options) {
var fieldsAndProps = Args.publish(exchange, routingKey, options);
return this.sendMessage(fieldsAndProps, fieldsAndProps, content);
};
}
Channel.prototype.sendToQueue = function(queue, content, options) {
return this.publish('', queue, content, options);
};
sendToQueue (queue, content, options) {
return this.publish('', queue, content, options);
}
Channel.prototype.consume = function(queue, callback, options, cb0) {
var cb = callbackWrapper(this, cb0);
var fields = Args.consume(queue, options);
var self = this;
this._rpc(
defs.BasicConsume, fields, defs.BasicConsumeOk,
function(err, ok) {
if (err === null) {
self.registerConsumer(ok.fields.consumerTag, callback);
cb(null, ok.fields);
}
else cb(err);
});
return this;
};
consume (queue, callback, options, cb0) {
var cb = callbackWrapper(this, cb0);
var fields = Args.consume(queue, options);
var self = this;
this._rpc(
defs.BasicConsume, fields, defs.BasicConsumeOk,
function (err, ok) {
if (err === null) {
self.registerConsumer(ok.fields.consumerTag, callback);
cb(null, ok.fields);
}
else
cb(err);
});
return this;
}
Channel.prototype.cancel = function(consumerTag, cb0) {
var cb = callbackWrapper(this, cb0);
var self = this;
this._rpc(
defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
function(err, ok) {
cancel (consumerTag, cb0) {
var cb = callbackWrapper(this, cb0);
var self = this;
this._rpc(
defs.BasicCancel, Args.cancel(consumerTag), defs.BasicCancelOk,
function (err, ok) {
if (err === null) {
self.unregisterConsumer(consumerTag);
cb(null, ok.fields);
}
else
cb(err);
});
return this;
}
get (queue, options, cb0) {
var self = this;
var fields = Args.get(queue, options);
var cb = callbackWrapper(this, cb0);
this.sendOrEnqueue(defs.BasicGet, fields, function (err, f) {
if (err === null) {
self.unregisterConsumer(consumerTag);
cb(null, ok.fields);
if (f.id === defs.BasicGetEmpty) {
cb(null, false);
}
else if (f.id === defs.BasicGetOk) {
self.handleMessage = acceptMessage(function (m) {
m.fields = f.fields;
cb(null, m);
});
}
else {
cb(new Error("Unexpected response to BasicGet: " +
inspect(f)));
}
}
else cb(err);
});
return this;
};
return this;
}
Channel.prototype.get = function(queue, options, cb0) {
var self = this;
var fields = Args.get(queue, options);
var cb = callbackWrapper(this, cb0);
this.sendOrEnqueue(defs.BasicGet, fields, function(err, f) {
if (err === null) {
if (f.id === defs.BasicGetEmpty) {
cb(null, false);
}
else if (f.id === defs.BasicGetOk) {
self.handleMessage = acceptMessage(function(m) {
m.fields = f.fields;
cb(null, m);
});
}
else {
cb(new Error("Unexpected response to BasicGet: " +
inspect(f)));
}
}
});
return this;
};
ack (message, allUpTo) {
this.sendImmediately(
defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
return this;
}
Channel.prototype.ack = function(message, allUpTo) {
this.sendImmediately(
defs.BasicAck, Args.ack(message.fields.deliveryTag, allUpTo));
return this;
};
ackAll () {
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
return this;
}
Channel.prototype.ackAll = function() {
this.sendImmediately(defs.BasicAck, Args.ack(0, true));
return this;
};
nack (message, allUpTo, requeue) {
this.sendImmediately(
defs.BasicNack,
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
return this;
}
Channel.prototype.nack = function(message, allUpTo, requeue) {
this.sendImmediately(
defs.BasicNack,
Args.nack(message.fields.deliveryTag, allUpTo, requeue));
return this;
};
nackAll (requeue) {
this.sendImmediately(
defs.BasicNack, Args.nack(0, true, requeue));
return this;
}
Channel.prototype.nackAll = function(requeue) {
this.sendImmediately(
defs.BasicNack, Args.nack(0, true, requeue))
return this;
};
reject (message, requeue) {
this.sendImmediately(
defs.BasicReject,
Args.reject(message.fields.deliveryTag, requeue));
return this;
}
Channel.prototype.reject = function(message, requeue) {
this.sendImmediately(
defs.BasicReject,
Args.reject(message.fields.deliveryTag, requeue));
return this;
};
prefetch (count, global, cb) {
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk, cb);
}
Channel.prototype.prefetch = function(count, global, cb) {
return this.rpc(defs.BasicQos,
Args.prefetch(count, global),
defs.BasicQosOk, cb);
};
recover (cb) {
return this.rpc(defs.BasicRecover,
Args.recover(),
defs.BasicRecoverOk, cb);
}
}
Channel.prototype.recover = function(cb) {
return this.rpc(defs.BasicRecover,
Args.recover(),
defs.BasicRecoverOk, cb);
};
function ConfirmChannel(connection) {
Channel.call(this, connection);
// Wrap an RPC callback to make sure the callback is invoked with
// either `(null, value)` or `(error)`, i.e., never two non-null
// values. Also substitutes a stub if the callback is `undefined` or
// otherwise falsey, for convenience in methods for which the callback
// is optional (that is, most of them).
function callbackWrapper(ch, cb) {
return (cb) ? function(err, ok) {
if (err === null) {
cb(null, ok);
}
else cb(err);
} : function() {};
}
inherits(ConfirmChannel, Channel);
module.exports.ConfirmChannel = ConfirmChannel;
class ConfirmChannel extends Channel {
publish (exchange, routingKey,
content, options, cb) {
this.pushConfirmCallback(cb);
return Channel.prototype.publish.call(
this, exchange, routingKey, content, options);
}
CallbackModel.prototype.createConfirmChannel = function(cb) {
var ch = new ConfirmChannel(this.connection);
ch.open(function(err) {
if (err !== null) return cb && cb(err);
else {
ch.rpc(defs.ConfirmSelect, {nowait: false},
defs.ConfirmSelectOk, function(err, _ok) {
if (err !== null) return cb && cb(err);
else cb && cb(null, ch);
});
}
});
return ch;
};
sendToQueue (queue, content,
options, cb) {
return this.publish('', queue, content, options, cb);
}
ConfirmChannel.prototype.publish = function(exchange, routingKey,
content, options, cb) {
this.pushConfirmCallback(cb);
return Channel.prototype.publish.call(
this, exchange, routingKey, content, options);
};
waitForConfirms (k) {
var awaiting = [];
var unconfirmed = this.unconfirmed;
unconfirmed.forEach(function (val, index) {
if (val === null)
; // already confirmed
else {
var confirmed = new Promise(function (resolve, reject) {
unconfirmed[index] = function (err) {
if (val)
val(err);
if (err === null)
resolve();
else
reject(err);
};
});
awaiting.push(confirmed);
}
});
return Promise.all(awaiting).then(function () { k(); },
function (err) { k(err); });
}
}
ConfirmChannel.prototype.sendToQueue = function(queue, content,
options, cb) {
return this.publish('', queue, content, options, cb);
};
ConfirmChannel.prototype.waitForConfirms = function(k) {
var awaiting = [];
var unconfirmed = this.unconfirmed;
unconfirmed.forEach(function(val, index) {
if (val === null); // already confirmed
else {
var confirmed = new Promise(function(resolve, reject) {
unconfirmed[index] = function(err) {
if (val) val(err);
if (err === null) resolve();
else reject(err);
};
});
awaiting.push(confirmed);
}
});
return Promise.all(awaiting).then(function() { k(); },
function(err) { k(err); });
};
module.exports.CallbackModel = CallbackModel;
module.exports.Channel = Channel;
module.exports.ConfirmChannel = ConfirmChannel;

@@ -29,2 +29,6 @@ //

updateSecret(newSecret, reason) {
return promisify(this.connection._updateSecret.bind(this.connection))(newSecret, reason);
}
async createChannel() {

@@ -268,3 +272,3 @@ const channel = new Channel(this.connection);

this.pushConfirmCallback(cb);
return Channel.prototype.publish.call(this, exchange, routingKey, content, options);
return super.publish(exchange, routingKey, content, options);
}

@@ -271,0 +275,0 @@

@@ -14,132 +14,342 @@ //

var assert = require('assert');
var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');
var fmt = require('util').format;
var IllegalOperationError = require('./error').IllegalOperationError;
var stackCapture = require('./error').stackCapture;
function Channel(connection) {
EventEmitter.call( this );
this.connection = connection;
// for the presently outstanding RPC
this.reply = null;
// for the RPCs awaiting action
this.pending = [];
// for unconfirmed messages
this.lwm = 1; // the least, unconfirmed deliveryTag
this.unconfirmed = []; // rolling window of delivery callbacks
this.on('ack', this.handleConfirm.bind(this, function(cb) {
if (cb) cb(null);
}));
this.on('nack', this.handleConfirm.bind(this, function(cb) {
if (cb) cb(new Error('message nacked'));
}));
this.on('close', function () {
var cb;
while (cb = this.unconfirmed.shift()) {
if (cb) cb(new Error('channel closed'));
class Channel extends EventEmitter {
constructor (connection) {
super();
this.connection = connection;
// for the presently outstanding RPC
this.reply = null;
// for the RPCs awaiting action
this.pending = [];
// for unconfirmed messages
this.lwm = 1; // the least, unconfirmed deliveryTag
this.unconfirmed = []; // rolling window of delivery callbacks
this.on('ack', this.handleConfirm.bind(this, function (cb) {
if (cb)
cb(null);
}));
this.on('nack', this.handleConfirm.bind(this, function (cb) {
if (cb)
cb(new Error('message nacked'));
}));
this.on('close', function () {
var cb;
while (cb = this.unconfirmed.shift()) {
if (cb)
cb(new Error('channel closed'));
}
});
// message frame state machine
this.handleMessage = acceptDeliveryOrReturn;
}
allocate () {
this.ch = this.connection.freshChannel(this);
return this;
}
// Incoming frames are either notifications of e.g., message delivery,
// or replies to something we've sent. In general I deal with the
// former by emitting an event, and with the latter by keeping a track
// of what's expecting a reply.
//
// The AMQP specification implies that RPCs can't be pipelined; that
// is, you can have only one outstanding RPC on a channel at a
// time. Certainly that's what RabbitMQ and its clients assume. For
// this reason, I buffer RPCs if the channel is already waiting for a
// reply.
// Just send the damn frame.
sendImmediately (method, fields) {
return this.connection.sendMethod(this.ch, method, fields);
}
// Invariant: !this.reply -> pending.length == 0. That is, whenever we
// clear a reply, we must send another RPC (and thereby fill
// this.reply) if there is one waiting. The invariant relevant here
// and in `accept`.
sendOrEnqueue (method, fields, reply) {
if (!this.reply) { // if no reply waiting, we can go
assert(this.pending.length === 0);
this.reply = reply;
this.sendImmediately(method, fields);
}
})
// message frame state machine
this.handleMessage = acceptDeliveryOrReturn;
}
inherits(Channel, EventEmitter);
else {
this.pending.push({
method: method,
fields: fields,
reply: reply
});
}
}
module.exports.Channel = Channel;
module.exports.acceptMessage = acceptMessage;
sendMessage (fields, properties, content) {
return this.connection.sendMessage(
this.ch,
defs.BasicPublish, fields,
defs.BasicProperties, properties,
content);
}
var C = Channel.prototype;
// Internal, synchronously resolved RPC; the return value is resolved
// with the whole frame.
_rpc (method, fields, expect, cb) {
var self = this;
C.allocate = function() {
this.ch = this.connection.freshChannel(this);
return this;
}
function reply (err, f) {
if (err === null) {
if (f.id === expect) {
return cb(null, f);
}
else {
// We have detected a problem, so it's up to us to close the
// channel
var expectedName = methodName(expect);
// Incoming frames are either notifications of e.g., message delivery,
// or replies to something we've sent. In general I deal with the
// former by emitting an event, and with the latter by keeping a track
// of what's expecting a reply.
//
// The AMQP specification implies that RPCs can't be pipelined; that
// is, you can have only one outstanding RPC on a channel at a
// time. Certainly that's what RabbitMQ and its clients assume. For
// this reason, I buffer RPCs if the channel is already waiting for a
// reply.
var e = new Error(fmt("Expected %s; got %s",
expectedName, inspect(f, false)));
self.closeWithError(f.id, fmt('Expected %s; got %s',
expectedName, methodName(f.id)),
defs.constants.UNEXPECTED_FRAME, e);
return cb(e);
}
}
// Just send the damn frame.
C.sendImmediately = function(method, fields) {
return this.connection.sendMethod(this.ch, method, fields);
};
// Invariant: !this.reply -> pending.length == 0. That is, whenever we
// clear a reply, we must send another RPC (and thereby fill
// this.reply) if there is one waiting. The invariant relevant here
// and in `accept`.
C.sendOrEnqueue = function(method, fields, reply) {
if (!this.reply) { // if no reply waiting, we can go
assert(this.pending.length === 0);
this.reply = reply;
this.sendImmediately(method, fields);
// An error will be given if, for example, this is waiting to be
// sent and the connection closes
else if (err instanceof Error)
return cb(err);
// A close frame will be given if this is the RPC awaiting reply
// and the channel is closed by the server
else {
// otherwise, it's a close frame
var closeReason = (err.fields.classId << 16) + err.fields.methodId;
var e = (method === closeReason)
? fmt("Operation failed: %s; %s",
methodName(method), closeMsg(err))
: fmt("Channel closed by server: %s", closeMsg(err));
var closeFrameError = new Error(e);
closeFrameError.code = err.fields.replyCode;
closeFrameError.classId = err.fields.classId;
closeFrameError.methodId = err.fields.methodId;
return cb(closeFrameError);
}
}
this.sendOrEnqueue(method, fields, reply);
}
else {
this.pending.push({method: method,
fields: fields,
reply: reply});
// Move to entirely closed state.
toClosed (capturedStack) {
this._rejectPending();
invalidateSend(this, 'Channel closed', capturedStack);
this.accept = invalidOp('Channel closed', capturedStack);
this.connection.releaseChannel(this.ch);
this.emit('close');
}
};
C.sendMessage = function(fields, properties, content) {
return this.connection.sendMessage(
this.ch,
defs.BasicPublish, fields,
defs.BasicProperties, properties,
content);
};
// Stop being able to send and receive methods and content. Used when
// we close the channel. Invokes the continuation once the server has
// acknowledged the close, but before the channel is moved to the
// closed state.
toClosing (capturedStack, k) {
var send = this.sendImmediately.bind(this);
invalidateSend(this, 'Channel closing', capturedStack);
// Internal, synchronously resolved RPC; the return value is resolved
// with the whole frame.
C._rpc = function(method, fields, expect, cb) {
var self = this;
this.accept = function (f) {
if (f.id === defs.ChannelCloseOk) {
if (k)
k();
var s = stackCapture('ChannelCloseOk frame received');
this.toClosed(s);
}
else if (f.id === defs.ChannelClose) {
send(defs.ChannelCloseOk, {});
}
// else ignore frame
};
}
function reply(err, f) {
if (err === null) {
if (f.id === expect) {
return cb(null, f);
_rejectPending () {
function rej (r) {
r(new Error("Channel ended, no reply will be forthcoming"));
}
if (this.reply !== null)
rej(this.reply);
this.reply = null;
var discard;
while (discard = this.pending.shift())
rej(discard.reply);
this.pending = null; // so pushes will break
}
closeBecause (reason, code, k) {
this.sendImmediately(defs.ChannelClose, {
replyText: reason,
replyCode: code,
methodId: 0, classId: 0
});
var s = stackCapture('closeBecause called: ' + reason);
this.toClosing(s, k);
}
// If we close because there's been an error, we need to distinguish
// between what we tell the server (`reason`) and what we report as
// the cause in the client (`error`).
closeWithError (id, reason, code, error) {
var self = this;
this.closeBecause(reason, code, function () {
error.code = code;
// content frames and consumer errors do not provide a method a class/method ID
if (id) {
error.classId = defs.info(id).classId;
error.methodId = defs.info(id).methodId;
}
else {
// We have detected a problem, so it's up to us to close the
// channel
var expectedName = methodName(expect);
self.emit('error', error);
});
}
var e = new Error(fmt("Expected %s; got %s",
expectedName, inspect(f, false)));
self.closeWithError(f.id, fmt('Expected %s; got %s',
expectedName, methodName(f.id)),
defs.constants.UNEXPECTED_FRAME, e);
return cb(e);
// A trampolining state machine for message frames on a channel. A
// message arrives in at least two frames: first, a method announcing
// the message (either a BasicDeliver or BasicGetOk); then, a message
// header with the message properties; then, zero or more content
// frames.
// Keep the try/catch localised, in an attempt to avoid disabling
// optimisation
acceptMessageFrame (f) {
try {
this.handleMessage = this.handleMessage(f);
}
catch (msg) {
if (typeof msg === 'string') {
this.closeWithError(f.id, msg, defs.constants.UNEXPECTED_FRAME,
new Error(msg));
}
else if (msg instanceof Error) {
this.closeWithError(f.id, 'Error while processing message',
defs.constants.INTERNAL_ERROR, msg);
}
else {
this.closeWithError(f.id, 'Internal error while processing message',
defs.constants.INTERNAL_ERROR,
new Error(msg.toString()));
}
}
// An error will be given if, for example, this is waiting to be
// sent and the connection closes
else if (err instanceof Error) return cb(err);
// A close frame will be given if this is the RPC awaiting reply
// and the channel is closed by the server
}
handleConfirm (handle, f) {
var tag = f.deliveryTag;
var multi = f.multiple;
if (multi) {
var confirmed = this.unconfirmed.splice(0, tag - this.lwm + 1);
this.lwm = tag + 1;
confirmed.forEach(handle);
}
else {
// otherwise, it's a close frame
var closeReason =
(err.fields.classId << 16) + err.fields.methodId;
var e = (method === closeReason)
? fmt("Operation failed: %s; %s",
methodName(method), closeMsg(err))
: fmt("Channel closed by server: %s", closeMsg(err));
var closeFrameError = new Error(e);
closeFrameError.code = err.fields.replyCode;
closeFrameError.classId = err.fields.classId;
closeFrameError.methodId = err.fields.methodId;
return cb(closeFrameError);
var c;
if (tag === this.lwm) {
c = this.unconfirmed.shift();
this.lwm++;
// Advance the LWM and the window to the next non-gap, or
// possibly to the end
while (this.unconfirmed[0] === null) {
this.unconfirmed.shift();
this.lwm++;
}
}
else {
c = this.unconfirmed[tag - this.lwm];
this.unconfirmed[tag - this.lwm] = null;
}
// Technically, in the single-deliveryTag case, I should report a
// protocol breach if it's already been confirmed.
handle(c);
}
}
this.sendOrEnqueue(method, fields, reply);
};
pushConfirmCallback (cb) {
// `null` is used specifically for marking already confirmed slots,
// so I coerce `undefined` and `null` to false; functions are never
// falsey.
this.unconfirmed.push(cb || false);
}
onBufferDrain () {
this.emit('drain');
}
accept(f) {
switch (f.id) {
// Message frames
case undefined: // content frame!
case defs.BasicDeliver:
case defs.BasicReturn:
case defs.BasicProperties:
return this.acceptMessageFrame(f);
// confirmations, need to do confirm.select first
case defs.BasicAck:
return this.emit('ack', f.fields);
case defs.BasicNack:
return this.emit('nack', f.fields);
case defs.BasicCancel:
// The broker can send this if e.g., the queue is deleted.
return this.emit('cancel', f.fields);
case defs.ChannelClose:
// Any remote closure is an error to us. Reject the pending reply
// with the close frame, so it can see whether it was that
// operation that caused it to close.
if (this.reply) {
var reply = this.reply; this.reply = null;
reply(f);
}
var emsg = "Channel closed by server: " + closeMsg(f);
this.sendImmediately(defs.ChannelCloseOk, {});
var error = new Error(emsg);
error.code = f.fields.replyCode;
error.classId = f.fields.classId;
error.methodId = f.fields.methodId;
this.emit('error', error);
var s = stackCapture(emsg);
this.toClosed(s);
return;
case defs.BasicFlow:
// RabbitMQ doesn't send this, it just blocks the TCP socket
return this.closeWithError(f.id, "Flow not implemented",
defs.constants.NOT_IMPLEMENTED,
new Error('Flow not implemented'));
default: // assume all other things are replies
// Resolving the reply may lead to another RPC; to make sure we
// don't hold that up, clear this.reply
var reply = this.reply; this.reply = null;
// however, maybe there's an RPC waiting to go? If so, that'll
// fill this.reply again, restoring the invariant. This does rely
// on any response being recv'ed after resolving the promise,
// below; hence, I use synchronous defer.
if (this.pending.length > 0) {
var send = this.pending.shift();
this.reply = send.reply;
this.sendImmediately(send.method, send.fields);
}
return reply(null, f);
}
}
}
// Shutdown protocol. There's three scenarios:

@@ -169,99 +379,2 @@ //

// Move to entirely closed state.
C.toClosed = function(capturedStack) {
this._rejectPending();
invalidateSend(this, 'Channel closed', capturedStack);
this.accept = invalidOp('Channel closed', capturedStack);
this.connection.releaseChannel(this.ch);
this.emit('close');
};
// Stop being able to send and receive methods and content. Used when
// we close the channel. Invokes the continuation once the server has
// acknowledged the close, but before the channel is moved to the
// closed state.
C.toClosing = function(capturedStack, k) {
var send = this.sendImmediately.bind(this);
invalidateSend(this, 'Channel closing', capturedStack);
this.accept = function(f) {
if (f.id === defs.ChannelCloseOk) {
if (k) k();
var s = stackCapture('ChannelCloseOk frame received');
this.toClosed(s);
}
else if (f.id === defs.ChannelClose) {
send(defs.ChannelCloseOk, {});
}
// else ignore frame
};
};
C._rejectPending = function() {
function rej(r) {
r(new Error("Channel ended, no reply will be forthcoming"));
}
if (this.reply !== null) rej(this.reply);
this.reply = null;
var discard;
while (discard = this.pending.shift()) rej(discard.reply);
this.pending = null; // so pushes will break
};
C.closeBecause = function(reason, code, k) {
this.sendImmediately(defs.ChannelClose, {
replyText: reason,
replyCode: code,
methodId:0, classId: 0
});
var s = stackCapture('closeBecause called: ' + reason);
this.toClosing(s, k);
};
// If we close because there's been an error, we need to distinguish
// between what we tell the server (`reason`) and what we report as
// the cause in the client (`error`).
C.closeWithError = function(id, reason, code, error) {
var self = this;
this.closeBecause(reason, code, function() {
error.code = code;
// content frames and consumer errors do not provide a method a class/method ID
if (id) {
error.classId = defs.info(id).classId;
error.methodId = defs.info(id).methodId;
}
self.emit('error', error);
});
};
// A trampolining state machine for message frames on a channel. A
// message arrives in at least two frames: first, a method announcing
// the message (either a BasicDeliver or BasicGetOk); then, a message
// header with the message properties; then, zero or more content
// frames.
// Keep the try/catch localised, in an attempt to avoid disabling
// optimisation
C.acceptMessageFrame = function(f) {
try {
this.handleMessage = this.handleMessage(f);
}
catch (msg) {
if (typeof msg === 'string') {
this.closeWithError(f.id, msg, defs.constants.UNEXPECTED_FRAME,
new Error(msg));
}
else if (msg instanceof Error) {
this.closeWithError(f.id, 'Error while processing message',
defs.constants.INTERNAL_ERROR, msg);
}
else {
this.closeWithError(f.id, 'Internal error while processing message',
defs.constants.INTERNAL_ERROR,
new Error(msg.toString()));
}
}
};
// Kick off a message delivery given a BasicDeliver or BasicReturn

@@ -352,151 +465,45 @@ // frame (BasicGet uses the RPC mechanism)

C.handleConfirm = function(handle, f) {
var tag = f.deliveryTag;
var multi = f.multiple;
// This adds just a bit more stuff useful for the APIs, but not
// low-level machinery.
class BaseChannel extends Channel {
constructor (connection) {
super(connection);
this.consumers = new Map();
}
if (multi) {
var confirmed = this.unconfirmed.splice(0, tag - this.lwm + 1);
this.lwm = tag + 1;
confirmed.forEach(handle);
// Not sure I like the ff, it's going to be changing hidden classes
// all over the place. On the other hand, whaddya do.
registerConsumer (tag, callback) {
this.consumers.set(tag, callback);
}
else {
var c;
if (tag === this.lwm) {
c = this.unconfirmed.shift();
this.lwm++;
// Advance the LWM and the window to the next non-gap, or
// possibly to the end
while (this.unconfirmed[0] === null) {
this.unconfirmed.shift();
this.lwm++;
}
unregisterConsumer (tag) {
this.consumers.delete(tag);
}
dispatchMessage (fields, message) {
var consumerTag = fields.consumerTag;
var consumer = this.consumers.get(consumerTag);
if (consumer) {
return consumer(message);
}
else {
c = this.unconfirmed[tag - this.lwm];
this.unconfirmed[tag - this.lwm] = null;
// %%% Surely a race here
throw new Error("Unknown consumer: " + consumerTag);
}
// Technically, in the single-deliveryTag case, I should report a
// protocol breach if it's already been confirmed.
handle(c);
}
};
C.pushConfirmCallback = function(cb) {
// `null` is used specifically for marking already confirmed slots,
// so I coerce `undefined` and `null` to false; functions are never
// falsey.
this.unconfirmed.push(cb || false);
};
handleDelivery (message) {
return this.dispatchMessage(message.fields, message);
}
// Interface for connection to use
C.accept = function(f) {
switch (f.id) {
// Message frames
case undefined: // content frame!
case defs.BasicDeliver:
case defs.BasicReturn:
case defs.BasicProperties:
return this.acceptMessageFrame(f);
// confirmations, need to do confirm.select first
case defs.BasicAck:
return this.emit('ack', f.fields);
case defs.BasicNack:
return this.emit('nack', f.fields);
case defs.BasicCancel:
// The broker can send this if e.g., the queue is deleted.
return this.emit('cancel', f.fields);
case defs.ChannelClose:
// Any remote closure is an error to us. Reject the pending reply
// with the close frame, so it can see whether it was that
// operation that caused it to close.
if (this.reply) {
var reply = this.reply; this.reply = null;
reply(f);
}
var emsg = "Channel closed by server: " + closeMsg(f);
this.sendImmediately(defs.ChannelCloseOk, {});
var error = new Error(emsg);
error.code = f.fields.replyCode;
error.classId = f.fields.classId;
error.methodId = f.fields.methodId;
this.emit('error', error);
var s = stackCapture(emsg);
this.toClosed(s);
return;
case defs.BasicFlow:
// RabbitMQ doesn't send this, it just blocks the TCP socket
return this.closeWithError(f.id, "Flow not implemented",
defs.constants.NOT_IMPLEMENTED,
new Error('Flow not implemented'));
default: // assume all other things are replies
// Resolving the reply may lead to another RPC; to make sure we
// don't hold that up, clear this.reply
var reply = this.reply; this.reply = null;
// however, maybe there's an RPC waiting to go? If so, that'll
// fill this.reply again, restoring the invariant. This does rely
// on any response being recv'ed after resolving the promise,
// below; hence, I use synchronous defer.
if (this.pending.length > 0) {
var send = this.pending.shift();
this.reply = send.reply;
this.sendImmediately(send.method, send.fields);
}
return reply(null, f);
handleCancel (fields) {
var result = this.dispatchMessage(fields, null);
this.unregisterConsumer(fields.consumerTag);
return result;
}
};
C.onBufferDrain = function() {
this.emit('drain');
};
// This adds just a bit more stuff useful for the APIs, but not
// low-level machinery.
function BaseChannel(connection) {
Channel.call(this, connection);
this.consumers = new Map();
}
inherits(BaseChannel, Channel);
module.exports.acceptMessage = acceptMessage;
module.exports.BaseChannel = BaseChannel;
// Not sure I like the ff, it's going to be changing hidden classes
// all over the place. On the other hand, whaddya do.
BaseChannel.prototype.registerConsumer = function(tag, callback) {
this.consumers.set(tag, callback);
};
BaseChannel.prototype.unregisterConsumer = function(tag) {
this.consumers.delete(tag);
};
BaseChannel.prototype.dispatchMessage = function(fields, message) {
var consumerTag = fields.consumerTag;
var consumer = this.consumers.get(consumerTag);
if (consumer) {
return consumer(message);
}
else {
// %%% Surely a race here
throw new Error("Unknown consumer: " + consumerTag);
}
};
BaseChannel.prototype.handleDelivery = function(message) {
return this.dispatchMessage(message.fields, message);
};
BaseChannel.prototype.handleCancel = function(fields) {
var result = this.dispatchMessage(fields, null);
this.unregisterConsumer(fields.consumerTag);
return result;
};
module.exports.Channel = Channel;

@@ -265,3 +265,3 @@ //

len = slice.readUInt32BE(offset); offset += 4;
val = decodeFields(slice.slice(offset, offset + len));
val = decodeFields(slice.subarray(offset, offset + len));
offset += len;

@@ -294,3 +294,3 @@ break;

len = slice.readUInt32BE(offset); offset += 4;
val = slice.slice(offset, offset + len);
val = slice.subarray(offset, offset + len);
offset += len;

@@ -297,0 +297,0 @@ break;

@@ -16,3 +16,3 @@ //

require('readable-stream/duplex');
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');
var Heart = require('./heartbeat').Heart;

@@ -25,3 +25,2 @@

var BitSet = require('./bitset').BitSet;
var inherits = require('util').inherits;
var fmt = require('util').format;

@@ -42,199 +41,121 @@ var PassThrough = require('stream').PassThrough ||

function Connection(underlying) {
EventEmitter.call( this );
var stream = this.stream = wrapStream(underlying);
this.muxer = new Mux(stream);
class Connection extends EventEmitter {
constructor (underlying) {
super();
// frames
this.rest = Buffer.alloc(0);
this.frameMax = constants.FRAME_MIN_SIZE;
this.sentSinceLastCheck = false;
this.recvSinceLastCheck = false;
var stream = this.stream = wrapStream(underlying);
this.muxer = new Mux(stream);
this.expectSocketClose = false;
this.freeChannels = new BitSet();
this.channels = [{channel: {accept: channel0(this)},
buffer: underlying}];
}
inherits(Connection, EventEmitter);
// frames
this.rest = Buffer.alloc(0);
this.frameMax = constants.FRAME_MIN_SIZE;
this.sentSinceLastCheck = false;
this.recvSinceLastCheck = false;
var C = Connection.prototype;
this.expectSocketClose = false;
this.freeChannels = new BitSet();
this.channels = [{
channel: { accept: channel0(this) },
buffer: underlying
}];
}
// Usual frame accept mode
function mainAccept(frame) {
var rec = this.channels[frame.channel];
if (rec) { return rec.channel.accept(frame); }
// NB CHANNEL_ERROR may not be right, but I don't know what is ..
else
this.closeWithError(
fmt('Frame on unknown channel %d', frame.channel),
constants.CHANNEL_ERROR,
new Error(fmt("Frame on unknown channel: %s",
inspect(frame, false))));
}
// This changed between versions, as did the codec, methods, etc. AMQP
// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
// much sense to generalise here.
sendProtocolHeader () {
this.sendBytes(frame.PROTOCOL_HEADER);
}
// Handle anything that comes through on channel 0, that's the
// connection control channel. This is only used once mainAccept is
// installed as the frame handler, after the opening handshake.
function channel0(connection) {
return function(f) {
// Once we get a 'close', we know 1. we'll get no more frames, and
// 2. anything we send except close, or close-ok, will be
// ignored. If we already sent 'close', this won't be invoked since
// we're already in closing mode; if we didn't well we're not going
// to send it now are we.
if (f === HEARTBEAT); // ignore; it's already counted as activity
// on the socket, which is its purpose
else if (f.id === defs.ConnectionClose) {
// Oh. OK. I guess we're done here then.
connection.sendMethod(0, defs.ConnectionCloseOk, {});
var emsg = fmt('Connection closed: %s', closeMsg(f));
var s = stackCapture(emsg);
var e = new Error(emsg);
e.code = f.fields.replyCode;
if (isFatalError(e)) {
connection.emit('error', e);
}
connection.toClosed(s, e);
}
else if (f.id === defs.ConnectionBlocked) {
connection.emit('blocked', f.fields.reason);
}
else if (f.id === defs.ConnectionUnblocked) {
connection.emit('unblocked');
}
else {
connection.closeWithError(
fmt("Unexpected frame on channel 0"),
constants.UNEXPECTED_FRAME,
new Error(fmt("Unexpected frame on channel 0: %s",
inspect(f, false))));
}
};
}
/*
The frighteningly complicated opening protocol (spec section 2.2.4):
// This changed between versions, as did the codec, methods, etc. AMQP
// 0-9-1 is fairly similar to 0.8, but better, and nothing implements
// 0.8 that doesn't implement 0-9-1. In other words, it doesn't make
// much sense to generalise here.
C.sendProtocolHeader = function() {
this.sendBytes(frame.PROTOCOL_HEADER);
};
Client -> Server
/*
The frighteningly complicated opening protocol (spec section 2.2.4):
protocol header ->
<- start
start-ok ->
.. next two zero or more times ..
<- secure
secure-ok ->
<- tune
tune-ok ->
open ->
<- open-ok
Client -> Server
If I'm only supporting SASL's PLAIN mechanism (which I am for the time
being), it gets a bit easier since the server won't in general send
back a `secure`, it'll just send `tune` after the `start-ok`.
(SASL PLAIN: http://tools.ietf.org/html/rfc4616)
protocol header ->
<- start
start-ok ->
.. next two zero or more times ..
<- secure
secure-ok ->
<- tune
tune-ok ->
open ->
<- open-ok
*/
open (allFields, openCallback0) {
var self = this;
var openCallback = openCallback0 || function () { };
If I'm only supporting SASL's PLAIN mechanism (which I am for the time
being), it gets a bit easier since the server won't in general send
back a `secure`, it'll just send `tune` after the `start-ok`.
(SASL PLAIN: http://tools.ietf.org/html/rfc4616)
// This is where we'll put our negotiated values
var tunedOptions = Object.create(allFields);
*/
C.open = function(allFields, openCallback0) {
var self = this;
var openCallback = openCallback0 || function() {};
// This is where we'll put our negotiated values
var tunedOptions = Object.create(allFields);
function wait(k) {
self.step(function(err, frame) {
if (err !== null) bail(err);
else if (frame.channel !== 0) {
bail(new Error(
fmt("Frame on channel != 0 during handshake: %s",
function wait (k) {
self.step(function (err, frame) {
if (err !== null)
bail(err);
else if (frame.channel !== 0) {
bail(new Error(
fmt("Frame on channel != 0 during handshake: %s",
inspect(frame, false))));
}
else k(frame);
});
}
}
else
k(frame);
});
}
function expect(Method, k) {
wait(function(frame) {
if (frame.id === Method) k(frame);
else {
bail(new Error(
fmt("Expected %s; got %s",
function expect (Method, k) {
wait(function (frame) {
if (frame.id === Method)
k(frame);
else {
bail(new Error(
fmt("Expected %s; got %s",
methodName(Method), inspect(frame, false))));
}
});
}
}
});
}
function bail(err) {
openCallback(err);
}
function bail (err) {
openCallback(err);
}
function send(Method) {
// This can throw an exception if there's some problem with the
// options; e.g., something is a string instead of a number.
self.sendMethod(0, Method, tunedOptions);
}
function negotiate(server, desired) {
// We get sent values for channelMax, frameMax and heartbeat,
// which we may accept or lower (subject to a minimum for
// frameMax, but we'll leave that to the server to enforce). In
// all cases, `0` really means "no limit", or rather the highest
// value in the encoding, e.g., unsigned short for channelMax.
if (server === 0 || desired === 0) {
// i.e., whichever places a limit, if either
return Math.max(server, desired);
function send (Method) {
// This can throw an exception if there's some problem with the
// options; e.g., something is a string instead of a number.
self.sendMethod(0, Method, tunedOptions);
}
else {
return Math.min(server, desired);
}
}
function onStart(start) {
var mechanisms = start.fields.mechanisms.toString().split(' ');
if (mechanisms.indexOf(allFields.mechanism) < 0) {
bail(new Error(fmt('SASL mechanism %s is not provided by the server',
allFields.mechanism)));
return;
function negotiate (server, desired) {
// We get sent values for channelMax, frameMax and heartbeat,
// which we may accept or lower (subject to a minimum for
// frameMax, but we'll leave that to the server to enforce). In
// all cases, `0` really means "no limit", or rather the highest
// value in the encoding, e.g., unsigned short for channelMax.
if (server === 0 || desired === 0) {
// i.e., whichever places a limit, if either
return Math.max(server, desired);
}
else {
return Math.min(server, desired);
}
}
self.serverProperties = start.fields.serverProperties;
try {
send(defs.ConnectionStartOk);
} catch (err) {
bail(err);
return;
}
wait(afterStartOk);
}
function afterStartOk(reply) {
switch (reply.id) {
case defs.ConnectionSecure:
bail(new Error(
"Wasn't expecting to have to go through secure"));
break;
case defs.ConnectionClose:
bail(new Error(fmt("Handshake terminated by server: %s",
closeMsg(reply))));
break;
case defs.ConnectionTune:
var fields = reply.fields;
tunedOptions.frameMax =
negotiate(fields.frameMax, allFields.frameMax);
tunedOptions.channelMax =
negotiate(fields.channelMax, allFields.channelMax);
tunedOptions.heartbeat =
negotiate(fields.heartbeat, allFields.heartbeat);
function onStart (start) {
var mechanisms = start.fields.mechanisms.toString().split(' ');
if (mechanisms.indexOf(allFields.mechanism) < 0) {
bail(new Error(fmt('SASL mechanism %s is not provided by the server',
allFields.mechanism)));
return;
}
self.serverProperties = start.fields.serverProperties;
try {
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
send(defs.ConnectionStartOk);
} catch (err) {

@@ -244,338 +165,347 @@ bail(err);

}
expect(defs.ConnectionOpenOk, onOpenOk);
break;
default:
bail(new Error(
fmt("Expected connection.secure, connection.close, " +
"or connection.tune during handshake; got %s",
inspect(reply, false))));
break;
wait(afterStartOk);
}
function afterStartOk (reply) {
switch (reply.id) {
case defs.ConnectionSecure:
bail(new Error(
"Wasn't expecting to have to go through secure"));
break;
case defs.ConnectionClose:
bail(new Error(fmt("Handshake terminated by server: %s",
closeMsg(reply))));
break;
case defs.ConnectionTune:
var fields = reply.fields;
tunedOptions.frameMax =
negotiate(fields.frameMax, allFields.frameMax);
tunedOptions.channelMax =
negotiate(fields.channelMax, allFields.channelMax);
tunedOptions.heartbeat =
negotiate(fields.heartbeat, allFields.heartbeat);
try {
send(defs.ConnectionTuneOk);
send(defs.ConnectionOpen);
} catch (err) {
bail(err);
return;
}
expect(defs.ConnectionOpenOk, onOpenOk);
break;
default:
bail(new Error(
fmt("Expected connection.secure, connection.close, " +
"or connection.tune during handshake; got %s",
inspect(reply, false))));
break;
}
}
function onOpenOk (openOk) {
// Impose the maximum of the encoded value, if the negotiated
// value is zero, meaning "no, no limits"
self.channelMax = tunedOptions.channelMax || 0xffff;
self.frameMax = tunedOptions.frameMax || 0xffffffff;
// 0 means "no heartbeat", rather than "maximum period of
// heartbeating"
self.heartbeat = tunedOptions.heartbeat;
self.heartbeater = self.startHeartbeater();
self.accept = mainAccept;
succeed(openOk);
}
// If the server closes the connection, it's probably because of
// something we did
function endWhileOpening (err) {
bail(err || new Error('Socket closed abruptly ' +
'during opening handshake'));
}
this.stream.on('end', endWhileOpening);
this.stream.on('error', endWhileOpening);
function succeed (ok) {
self.stream.removeListener('end', endWhileOpening);
self.stream.removeListener('error', endWhileOpening);
self.stream.on('error', self.onSocketError.bind(self));
self.stream.on('end', self.onSocketError.bind(
self, new Error('Unexpected close')));
self.on('frameError', self.onSocketError.bind(self));
self.acceptLoop();
openCallback(null, ok);
}
// Now kick off the handshake by prompting the server
this.sendProtocolHeader();
expect(defs.ConnectionStart, onStart);
}
function onOpenOk(openOk) {
// Impose the maximum of the encoded value, if the negotiated
// value is zero, meaning "no, no limits"
self.channelMax = tunedOptions.channelMax || 0xffff;
self.frameMax = tunedOptions.frameMax || 0xffffffff;
// 0 means "no heartbeat", rather than "maximum period of
// heartbeating"
self.heartbeat = tunedOptions.heartbeat;
self.heartbeater = self.startHeartbeater();
self.accept = mainAccept;
succeed(openOk);
// Closing things: AMQP has a closing handshake that applies to
// closing both connects and channels. As the initiating party, I send
// Close, then ignore all frames until I see either CloseOK --
// which signifies that the other party has seen the Close and shut
// the connection or channel down, so it's fine to free resources; or
// Close, which means the other party also wanted to close the
// whatever, and I should send CloseOk so it can free resources,
// then go back to waiting for the CloseOk. If I receive a Close
// out of the blue, I should throw away any unsent frames (they will
// be ignored anyway) and send CloseOk, then clean up resources. In
// general, Close out of the blue signals an error (or a forced
// closure, which may as well be an error).
//
// RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
// | | [3]
// | +------ send CloseOk ------+
// recv Close recv CloseOk
// | |
// V V
// Ended [4] ---- send CloseOk ---> Closed [5]
//
// [1] All frames accepted; getting a Close frame from the server
// moves to Ended; client may initiate a close by sending Close
// itself.
// [2] Client has initiated a close; only CloseOk or (simulataneously
// sent) Close is accepted.
// [3] Simultaneous close
// [4] Server won't send any more frames; accept no more frames, send
// CloseOk.
// [5] Fully closed, client will send no more, server will send no
// more. Signal 'close' or 'error'.
//
// There are two signalling mechanisms used in the API. The first is
// that calling `close` will return a promise, that will either
// resolve once the connection or channel is cleanly shut down, or
// will reject if the shutdown times out.
//
// The second is the 'close' and 'error' events. These are
// emitted as above. The events will fire *before* promises are
// resolved.
// Close the connection without even giving a reason. Typical.
close (closeCallback) {
var k = closeCallback && function () { closeCallback(null); };
this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
}
// If the server closes the connection, it's probably because of
// something we did
function endWhileOpening(err) {
bail(err || new Error('Socket closed abruptly ' +
'during opening handshake'));
// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
// ignores these; maybe it logs them. The continuation will be invoked
// when the CloseOk has been received, and before the 'close' event.
closeBecause (reason, code, k) {
this.sendMethod(0, defs.ConnectionClose, {
replyText: reason,
replyCode: code,
methodId: 0, classId: 0
});
var s = stackCapture('closeBecause called: ' + reason);
this.toClosing(s, k);
}
this.stream.on('end', endWhileOpening);
this.stream.on('error', endWhileOpening);
closeWithError (reason, code, error) {
this.emit('error', error);
this.closeBecause(reason, code);
}
function succeed(ok) {
self.stream.removeListener('end', endWhileOpening);
self.stream.removeListener('error', endWhileOpening);
self.stream.on('error', self.onSocketError.bind(self));
self.stream.on('end', self.onSocketError.bind(
self, new Error('Unexpected close')));
self.on('frameError', self.onSocketError.bind(self));
self.acceptLoop();
openCallback(null, ok);
onSocketError (err) {
if (!this.expectSocketClose) {
// forestall any more calls to onSocketError, since we're signed
// up for `'error'` *and* `'end'`
this.expectSocketClose = true;
this.emit('error', err);
var s = stackCapture('Socket error');
this.toClosed(s, err);
}
}
// Now kick off the handshake by prompting the server
this.sendProtocolHeader();
expect(defs.ConnectionStart, onStart);
};
// A close has been initiated. Repeat: a close has been initiated.
// This means we should not send more frames, anyway they will be
// ignored. We also have to shut down all the channels.
toClosing (capturedStack, k) {
var send = this.sendMethod.bind(this);
// Closing things: AMQP has a closing handshake that applies to
// closing both connects and channels. As the initiating party, I send
// Close, then ignore all frames until I see either CloseOK --
// which signifies that the other party has seen the Close and shut
// the connection or channel down, so it's fine to free resources; or
// Close, which means the other party also wanted to close the
// whatever, and I should send CloseOk so it can free resources,
// then go back to waiting for the CloseOk. If I receive a Close
// out of the blue, I should throw away any unsent frames (they will
// be ignored anyway) and send CloseOk, then clean up resources. In
// general, Close out of the blue signals an error (or a forced
// closure, which may as well be an error).
//
// RUNNING [1] --- send Close ---> Closing [2] ---> recv Close --+
// | | [3]
// | +------ send CloseOk ------+
// recv Close recv CloseOk
// | |
// V V
// Ended [4] ---- send CloseOk ---> Closed [5]
//
// [1] All frames accepted; getting a Close frame from the server
// moves to Ended; client may initiate a close by sending Close
// itself.
// [2] Client has initiated a close; only CloseOk or (simulataneously
// sent) Close is accepted.
// [3] Simultaneous close
// [4] Server won't send any more frames; accept no more frames, send
// CloseOk.
// [5] Fully closed, client will send no more, server will send no
// more. Signal 'close' or 'error'.
//
// There are two signalling mechanisms used in the API. The first is
// that calling `close` will return a promise, that will either
// resolve once the connection or channel is cleanly shut down, or
// will reject if the shutdown times out.
//
// The second is the 'close' and 'error' events. These are
// emitted as above. The events will fire *before* promises are
// resolved.
this.accept = function (f) {
if (f.id === defs.ConnectionCloseOk) {
if (k)
k();
var s = stackCapture('ConnectionCloseOk received');
this.toClosed(s, undefined);
}
else if (f.id === defs.ConnectionClose) {
send(0, defs.ConnectionCloseOk, {});
}
// else ignore frame
};
invalidateSend(this, 'Connection closing', capturedStack);
}
// Close the connection without even giving a reason. Typical.
C.close = function(closeCallback) {
var k = closeCallback && function() { closeCallback(null); };
this.closeBecause("Cheers, thanks", constants.REPLY_SUCCESS, k);
};
_closeChannels (capturedStack) {
for (var i = 1; i < this.channels.length; i++) {
var ch = this.channels[i];
if (ch !== null) {
ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
}
}
}
// Close with a reason and a 'code'. I'm pretty sure RabbitMQ totally
// ignores these; maybe it logs them. The continuation will be invoked
// when the CloseOk has been received, and before the 'close' event.
C.closeBecause = function(reason, code, k) {
this.sendMethod(0, defs.ConnectionClose, {
replyText: reason,
replyCode: code,
methodId: 0, classId: 0
});
var s = stackCapture('closeBecause called: ' + reason);
this.toClosing(s, k);
};
C.closeWithError = function(reason, code, error) {
this.emit('error', error);
this.closeBecause(reason, code);
};
C.onSocketError = function(err) {
if (!this.expectSocketClose) {
// forestall any more calls to onSocketError, since we're signed
// up for `'error'` *and* `'end'`
// A close has been confirmed. Cease all communication.
toClosed (capturedStack, maybeErr) {
this._closeChannels(capturedStack);
var info = fmt('Connection closed (%s)',
(maybeErr) ? maybeErr.toString() : 'by client');
// Tidy up, invalidate enverything, dynamite the bridges.
invalidateSend(this, info, capturedStack);
this.accept = invalidOp(info, capturedStack);
this.close = function (cb) {
cb && cb(new IllegalOperationError(info, capturedStack));
};
if (this.heartbeater)
this.heartbeater.clear();
// This is certainly true now, if it wasn't before
this.expectSocketClose = true;
this.emit('error', err);
var s = stackCapture('Socket error');
this.toClosed(s, err);
this.stream.end();
this.emit('close', maybeErr);
}
};
function invalidOp(msg, stack) {
return function() {
throw new IllegalOperationError(msg, stack);
};
}
_updateSecret(newSecret, reason, cb) {
this.sendMethod(0, defs.ConnectionUpdateSecret, {
newSecret,
reason
});
this.once('update-secret-ok', cb);
}
function invalidateSend(conn, msg, stack) {
conn.sendMethod = conn.sendContent = conn.sendMessage =
invalidOp(msg, stack);
}
// A close has been initiated. Repeat: a close has been initiated.
// This means we should not send more frames, anyway they will be
// ignored. We also have to shut down all the channels.
C.toClosing = function(capturedStack, k) {
var send = this.sendMethod.bind(this);
this.accept = function(f) {
if (f.id === defs.ConnectionCloseOk) {
if (k) k();
var s = stackCapture('ConnectionCloseOk received');
this.toClosed(s, undefined);
// ===
startHeartbeater () {
if (this.heartbeat === 0)
return null;
else {
var self = this;
var hb = new Heart(this.heartbeat,
this.checkSend.bind(this),
this.checkRecv.bind(this));
hb.on('timeout', function () {
var hberr = new Error("Heartbeat timeout");
self.emit('error', hberr);
var s = stackCapture('Heartbeat timeout');
self.toClosed(s, hberr);
});
hb.on('beat', function () {
self.sendHeartbeat();
});
return hb;
}
else if (f.id === defs.ConnectionClose) {
send(0, defs.ConnectionCloseOk, {});
}
// else ignore frame
};
invalidateSend(this, 'Connection closing', capturedStack);
};
C._closeChannels = function(capturedStack) {
for (var i = 1; i < this.channels.length; i++) {
var ch = this.channels[i];
if (ch !== null) {
ch.channel.toClosed(capturedStack); // %%% or with an error? not clear
}
}
};
// A close has been confirmed. Cease all communication.
C.toClosed = function(capturedStack, maybeErr) {
this._closeChannels(capturedStack);
var info = fmt('Connection closed (%s)',
(maybeErr) ? maybeErr.toString() : 'by client');
// Tidy up, invalidate enverything, dynamite the bridges.
invalidateSend(this, info, capturedStack);
this.accept = invalidOp(info, capturedStack);
this.close = function(cb) {
cb && cb(new IllegalOperationError(info, capturedStack));
};
if (this.heartbeater) this.heartbeater.clear();
// This is certainly true now, if it wasn't before
this.expectSocketClose = true;
this.stream.end();
this.emit('close', maybeErr);
};
// I use an array to keep track of the channels, rather than an
// object. The channel identifiers are numbers, and allocated by the
// connection. If I try to allocate low numbers when they are
// available (which I do, by looking from the start of the bitset),
// this ought to keep the array small, and out of 'sparse array
// storage'. I also set entries to null, rather than deleting them, in
// the expectation that the next channel allocation will fill the slot
// again rather than growing the array. See
// http://www.html5rocks.com/en/tutorials/speed/v8/
freshChannel (channel, options) {
var next = this.freeChannels.nextClearBit(1);
if (next < 0 || next > this.channelMax)
throw new Error("No channels left to allocate");
this.freeChannels.set(next);
// ===
C.startHeartbeater = function() {
if (this.heartbeat === 0) return null;
else {
var self = this;
var hb = new Heart(this.heartbeat,
this.checkSend.bind(this),
this.checkRecv.bind(this));
hb.on('timeout', function() {
var hberr = new Error("Heartbeat timeout");
self.emit('error', hberr);
var s = stackCapture('Heartbeat timeout');
self.toClosed(s, hberr);
var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
var writeBuffer = new PassThrough({
objectMode: true, highWaterMark: hwm
});
hb.on('beat', function() {
self.sendHeartbeat();
this.channels[next] = { channel: channel, buffer: writeBuffer };
writeBuffer.on('drain', function () {
channel.onBufferDrain();
});
return hb;
this.muxer.pipeFrom(writeBuffer);
return next;
}
};
// I use an array to keep track of the channels, rather than an
// object. The channel identifiers are numbers, and allocated by the
// connection. If I try to allocate low numbers when they are
// available (which I do, by looking from the start of the bitset),
// this ought to keep the array small, and out of 'sparse array
// storage'. I also set entries to null, rather than deleting them, in
// the expectation that the next channel allocation will fill the slot
// again rather than growing the array. See
// http://www.html5rocks.com/en/tutorials/speed/v8/
C.freshChannel = function(channel, options) {
var next = this.freeChannels.nextClearBit(1);
if (next < 0 || next > this.channelMax)
throw new Error("No channels left to allocate");
this.freeChannels.set(next);
releaseChannel (channel) {
this.freeChannels.clear(channel);
var buffer = this.channels[channel].buffer;
buffer.end(); // will also cause it to be unpiped
this.channels[channel] = null;
}
var hwm = (options && options.highWaterMark) || DEFAULT_WRITE_HWM;
var writeBuffer = new PassThrough({
objectMode: true, highWaterMark: hwm
});
this.channels[next] = {channel: channel, buffer: writeBuffer};
writeBuffer.on('drain', function() {
channel.onBufferDrain();
});
this.muxer.pipeFrom(writeBuffer);
return next;
};
acceptLoop () {
var self = this;
C.releaseChannel = function(channel) {
this.freeChannels.clear(channel);
var buffer = this.channels[channel].buffer;
buffer.end(); // will also cause it to be unpiped
this.channels[channel] = null;
};
C.acceptLoop = function() {
var self = this;
function go() {
try {
var f; while (f = self.recvFrame()) self.accept(f);
function go () {
try {
var f; while (f = self.recvFrame())
self.accept(f);
}
catch (e) {
self.emit('frameError', e);
}
}
catch (e) {
self.emit('frameError', e);
}
self.stream.on('readable', go);
go();
}
self.stream.on('readable', go);
go();
};
C.step = function(cb) {
var self = this;
function recv() {
var f;
try {
f = self.recvFrame();
step (cb) {
var self = this;
function recv () {
var f;
try {
f = self.recvFrame();
}
catch (e) {
cb(e, null);
return;
}
if (f)
cb(null, f);
else
self.stream.once('readable', recv);
}
catch (e) {
cb(e, null);
return;
}
if (f) cb(null, f);
else self.stream.once('readable', recv);
recv();
}
recv();
};
C.checkSend = function() {
var check = this.sentSinceLastCheck;
this.sentSinceLastCheck = false;
return check;
}
checkSend () {
var check = this.sentSinceLastCheck;
this.sentSinceLastCheck = false;
return check;
}
C.checkRecv = function() {
var check = this.recvSinceLastCheck;
this.recvSinceLastCheck = false;
return check;
}
checkRecv () {
var check = this.recvSinceLastCheck;
this.recvSinceLastCheck = false;
return check;
}
C.sendBytes = function(bytes) {
this.sentSinceLastCheck = true;
this.stream.write(bytes);
};
sendBytes (bytes) {
this.sentSinceLastCheck = true;
this.stream.write(bytes);
}
C.sendHeartbeat = function() {
return this.sendBytes(frame.HEARTBEAT_BUF);
};
sendHeartbeat () {
return this.sendBytes(frame.HEARTBEAT_BUF);
}
var encodeMethod = defs.encodeMethod;
var encodeProperties = defs.encodeProperties;
sendMethod (channel, Method, fields) {
var frame = encodeMethod(Method, channel, fields);
this.sentSinceLastCheck = true;
var buffer = this.channels[channel].buffer;
return buffer.write(frame);
}
C.sendMethod = function(channel, Method, fields) {
var frame = encodeMethod(Method, channel, fields);
this.sentSinceLastCheck = true;
var buffer = this.channels[channel].buffer;
return buffer.write(frame);
};
sendMessage (channel, Method, fields, Properties, props, content) {
if (!Buffer.isBuffer(content))
throw new TypeError('content is not a buffer');
C.sendMessage = function(channel,
Method, fields,
Properties, props,
content) {
if (!Buffer.isBuffer(content))
throw new TypeError('content is not a buffer');
var mframe = encodeMethod(Method, channel, fields);
var pframe = encodeProperties(Properties, channel,
content.length, props);
var buffer = this.channels[channel].buffer;
this.sentSinceLastCheck = true;
var mframe = encodeMethod(Method, channel, fields);
var pframe = encodeProperties(Properties, channel,
content.length, props);
var buffer = this.channels[channel].buffer;
this.sentSinceLastCheck = true;
var methodHeaderLen = mframe.length + pframe.length;
var bodyLen = (content.length > 0) ?
content.length + FRAME_OVERHEAD : 0;
var allLen = methodHeaderLen + bodyLen;
var methodHeaderLen = mframe.length + pframe.length;
var bodyLen = (content.length > 0) ?
content.length + FRAME_OVERHEAD : 0;
var allLen = methodHeaderLen + bodyLen;
if (allLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var all = Buffer.allocUnsafe(allLen);
var offset = mframe.copy(all, 0);
offset += pframe.copy(all, offset);
if (bodyLen > 0)
makeBodyFrame(channel, content).copy(all, offset);
return buffer.write(all);
}
else {
if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
if (allLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage

@@ -585,61 +515,146 @@ // from zeroing. The returned Buffer is not zeroed and so must be

// See https://github.com/amqp-node/amqplib/pull/695
var both = Buffer.allocUnsafe(methodHeaderLen);
var offset = mframe.copy(both, 0);
pframe.copy(both, offset);
buffer.write(both);
var all = Buffer.allocUnsafe(allLen);
var offset = mframe.copy(all, 0);
offset += pframe.copy(all, offset);
if (bodyLen > 0)
makeBodyFrame(channel, content).copy(all, offset);
return buffer.write(all);
}
else {
buffer.write(mframe);
buffer.write(pframe);
if (methodHeaderLen < SINGLE_CHUNK_THRESHOLD) {
// Use `allocUnsafe` to avoid excessive allocations and CPU usage
// from zeroing. The returned Buffer is not zeroed and so must be
// completely filled to be used safely.
// See https://github.com/amqp-node/amqplib/pull/695
var both = Buffer.allocUnsafe(methodHeaderLen);
var offset = mframe.copy(both, 0);
pframe.copy(both, offset);
buffer.write(both);
}
else {
buffer.write(mframe);
buffer.write(pframe);
}
return this.sendContent(channel, content);
}
return this.sendContent(channel, content);
}
};
var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;
sendContent (channel, body) {
if (!Buffer.isBuffer(body)) {
throw new TypeError(fmt("Expected buffer; got %s", body));
}
var writeResult = true;
var buffer = this.channels[channel].buffer;
C.sendContent = function(channel, body) {
if (!Buffer.isBuffer(body)) {
throw new TypeError(fmt("Expected buffer; got %s", body));
var maxBody = this.frameMax - FRAME_OVERHEAD;
for (var offset = 0; offset < body.length; offset += maxBody) {
var end = offset + maxBody;
var slice = (end > body.length) ? body.subarray(offset) : body.subarray(offset, end);
var bodyFrame = makeBodyFrame(channel, slice);
writeResult = buffer.write(bodyFrame);
}
this.sentSinceLastCheck = true;
return writeResult;
}
var writeResult = true;
var buffer = this.channels[channel].buffer;
var maxBody = this.frameMax - FRAME_OVERHEAD;
recvFrame () {
// %%% identifying invariants might help here?
var frame = parseFrame(this.rest, this.frameMax);
for (var offset = 0; offset < body.length; offset += maxBody) {
var end = offset + maxBody;
var slice = (end > body.length) ? body.slice(offset) : body.slice(offset, end);
var bodyFrame = makeBodyFrame(channel, slice);
writeResult = buffer.write(bodyFrame);
if (!frame) {
var incoming = this.stream.read();
if (incoming === null) {
return false;
}
else {
this.recvSinceLastCheck = true;
this.rest = Buffer.concat([this.rest, incoming]);
return this.recvFrame();
}
}
else {
this.rest = frame.rest;
return decodeFrame(frame);
}
}
this.sentSinceLastCheck = true;
return writeResult;
};
}
var parseFrame = frame.parseFrame;
var decodeFrame = frame.decodeFrame;
// Usual frame accept mode
function mainAccept(frame) {
var rec = this.channels[frame.channel];
if (rec) { return rec.channel.accept(frame); }
// NB CHANNEL_ERROR may not be right, but I don't know what is ..
else
this.closeWithError(
fmt('Frame on unknown channel %d', frame.channel),
constants.CHANNEL_ERROR,
new Error(fmt("Frame on unknown channel: %s",
inspect(frame, false))));
}
C.recvFrame = function() {
// %%% identifying invariants might help here?
var frame = parseFrame(this.rest, this.frameMax);
if (!frame) {
var incoming = this.stream.read();
if (incoming === null) {
return false;
// Handle anything that comes through on channel 0, that's the
// connection control channel. This is only used once mainAccept is
// installed as the frame handler, after the opening handshake.
function channel0(connection) {
return function(f) {
// Once we get a 'close', we know 1. we'll get no more frames, and
// 2. anything we send except close, or close-ok, will be
// ignored. If we already sent 'close', this won't be invoked since
// we're already in closing mode; if we didn't well we're not going
// to send it now are we.
if (f === HEARTBEAT); // ignore; it's already counted as activity
// on the socket, which is its purpose
else if (f.id === defs.ConnectionClose) {
// Oh. OK. I guess we're done here then.
connection.sendMethod(0, defs.ConnectionCloseOk, {});
var emsg = fmt('Connection closed: %s', closeMsg(f));
var s = stackCapture(emsg);
var e = new Error(emsg);
e.code = f.fields.replyCode;
if (isFatalError(e)) {
connection.emit('error', e);
}
connection.toClosed(s, e);
}
else if (f.id === defs.ConnectionBlocked) {
connection.emit('blocked', f.fields.reason);
}
else if (f.id === defs.ConnectionUnblocked) {
connection.emit('unblocked');
}
else if (f.id === defs.ConnectionUpdateSecretOk) {
connection.emit('update-secret-ok');
}
else {
this.recvSinceLastCheck = true;
this.rest = Buffer.concat([this.rest, incoming]);
return this.recvFrame();
connection.closeWithError(
fmt("Unexpected frame on channel 0"),
constants.UNEXPECTED_FRAME,
new Error(fmt("Unexpected frame on channel 0: %s",
inspect(f, false))));
}
}
else {
this.rest = frame.rest;
return decodeFrame(frame);
}
};
};
}
function invalidOp(msg, stack) {
return function() {
throw new IllegalOperationError(msg, stack);
};
}
function invalidateSend(conn, msg, stack) {
conn.sendMethod = conn.sendContent = conn.sendMessage =
invalidOp(msg, stack);
}
var encodeMethod = defs.encodeMethod;
var encodeProperties = defs.encodeProperties;
var FRAME_OVERHEAD = defs.FRAME_OVERHEAD;
var makeBodyFrame = frame.makeBodyFrame;
var parseFrame = frame.parseFrame;
var decodeFrame = frame.decodeFrame;
function wrapStream(s) {

@@ -646,0 +661,0 @@ if (s instanceof Duplex) return s;

@@ -30,3 +30,3 @@ //

const size = codec.encodeTable(buffer, { LOGIN: user, PASSWORD: passwd}, 0);
return buffer.slice(4, size);
return buffer.subarray(4, size);
},

@@ -33,0 +33,0 @@ username: user,

@@ -11,3 +11,2 @@ //

var format = require('util').format;
var inherits = require('util').inherits;
var HEARTBEAT = require('./frame').HEARTBEAT;

@@ -14,0 +13,0 @@

@@ -50,4 +50,3 @@ //

var inherits = require('util').inherits;
var EventEmitter = require('events').EventEmitter;
var EventEmitter = require('events');

@@ -57,35 +56,39 @@ // Exported so that we can mess with it in tests

function Heart(interval, checkSend, checkRecv) {
EventEmitter.call(this);
this.interval = interval;
class Heart extends EventEmitter {
constructor (interval, checkSend, checkRecv) {
super();
var intervalMs = interval * module.exports.UNITS_TO_MS;
// Function#bind is my new best friend
var beat = this.emit.bind(this, 'beat');
var timeout = this.emit.bind(this, 'timeout');
this.interval = interval;
this.sendTimer = setInterval(
this.runHeartbeat.bind(this, checkSend, beat), intervalMs / 2);
var intervalMs = interval * module.exports.UNITS_TO_MS;
// Function#bind is my new best friend
var beat = this.emit.bind(this, 'beat');
var timeout = this.emit.bind(this, 'timeout');
// A timeout occurs if I see nothing for *two consecutive* intervals
var recvMissed = 0;
function missedTwo() {
if (!checkRecv()) return (++recvMissed < 2);
else { recvMissed = 0; return true; }
this.sendTimer = setInterval(
this.runHeartbeat.bind(this, checkSend, beat), intervalMs / 2);
// A timeout occurs if I see nothing for *two consecutive* intervals
var recvMissed = 0;
function missedTwo () {
if (!checkRecv())
return (++recvMissed < 2);
else { recvMissed = 0; return true; }
}
this.recvTimer = setInterval(
this.runHeartbeat.bind(this, missedTwo, timeout), intervalMs);
}
this.recvTimer = setInterval(
this.runHeartbeat.bind(this, missedTwo, timeout), intervalMs);
clear () {
clearInterval(this.sendTimer);
clearInterval(this.recvTimer);
}
runHeartbeat (check, fail) {
// Have we seen activity?
if (!check())
fail();
}
}
inherits(Heart, EventEmitter);
module.exports.Heart = Heart;
Heart.prototype.clear = function() {
clearInterval(this.sendTimer);
clearInterval(this.recvTimer);
};
Heart.prototype.runHeartbeat = function(check, fail) {
// Have we seen activity?
if (!check()) fail();
};

@@ -16,115 +16,112 @@ //

function Mux(downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;
class Mux {
constructor (downstream) {
this.newStreams = [];
this.oldStreams = [];
this.blocked = false;
this.scheduledRead = false;
this.out = downstream;
var self = this;
downstream.on('drain', function() {
self.blocked = false;
self._readIncoming();
});
}
this.out = downstream;
var self = this;
downstream.on('drain', function () {
self.blocked = false;
self._readIncoming();
});
}
// There are 2 states we can be in:
// There are 2 states we can be in:
// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,
// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
_readIncoming () {
// - waiting for outbound capacity, which will be signalled by a
// - 'drain' event on the downstream; or,
// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;
// - no packets to send, waiting for an inbound buffer to have
// packets, which will be signalled by a 'readable' event
var accepting = true;
var out = this.out;
// If we write all packets available whenever there is outbound
// capacity, we will either run out of outbound capacity (`#write`
// returns false), or run out of packets (all calls to an
// `inbound.read()` have returned null).
// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin (streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
}
}
Mux.prototype._readIncoming = function() {
roundrobin(this.newStreams);
// We may be sent here speculatively, if an incoming stream has
// become readable
if (this.blocked) return;
var accepting = true;
var out = this.out;
// Try to read a chunk from each stream in turn, until all streams
// are empty, or we exhaust our ability to accept chunks.
function roundrobin(streams) {
var s;
while (accepting && (s = streams.shift())) {
var chunk = s.read();
if (chunk !== null) {
accepting = out.write(chunk);
streams.push(s);
}
// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.
if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
}
roundrobin(this.newStreams);
_scheduleRead () {
var self = this;
// Either we exhausted the new queues, or we ran out of capacity. If
// we ran out of capacity, all the remaining new streams (i.e.,
// those with packets left) become old streams. This effectively
// prioritises streams that keep their buffers close to empty over
// those that are constantly near full.
if (accepting) { // all new queues are exhausted, write as many as
// we can from the old streams
assert.equal(0, this.newStreams.length);
roundrobin(this.oldStreams);
if (!self.scheduledRead) {
schedule(function () {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
}
else { // ran out of room
assert(this.newStreams.length > 0, "Expect some new streams to remain");
Array.prototype.push.apply(this.oldStreams, this.newStreams);
this.newStreams = [];
}
// We may have exhausted all the old queues, or run out of room;
// either way, all we need to do is record whether we have capacity
// or not, so any speculative reads will know
this.blocked = !accepting;
};
Mux.prototype._scheduleRead = function() {
var self = this;
pipeFrom (readable) {
var self = this;
if (!self.scheduledRead) {
schedule(function() {
self.scheduledRead = false;
self._readIncoming();
});
self.scheduledRead = true;
}
};
function enqueue () {
self.newStreams.push(readable);
self._scheduleRead();
}
Mux.prototype.pipeFrom = function(readable) {
var self = this;
function cleanup () {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
}
function cleanupIfMe (dest) {
if (dest === self) cleanup();
}
function enqueue() {
self.newStreams.push(readable);
self._scheduleRead();
readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
}
function cleanup() {
readable.removeListener('readable', enqueue);
readable.removeListener('error', cleanup);
readable.removeListener('end', cleanup);
readable.removeListener('unpipeFrom', cleanupIfMe);
unpipeFrom (readable) {
readable.emit('unpipeFrom', this);
}
function cleanupIfMe(dest) {
if (dest === self) cleanup();
}
}
readable.on('unpipeFrom', cleanupIfMe);
readable.on('end', cleanup);
readable.on('error', cleanup);
readable.on('readable', enqueue);
};
Mux.prototype.unpipeFrom = function(readable) {
readable.emit('unpipeFrom', this);
};
module.exports.Mux = Mux;

@@ -5,3 +5,3 @@ {

"main": "./channel_api.js",
"version": "0.10.3",
"version": "0.10.4",
"description": "An AMQP 0-9-1 (e.g., RabbitMQ) library and client.",

@@ -8,0 +8,0 @@ "repository": {

@@ -90,3 +90,3 @@ # AMQP 0-9-1 library and client for Node.JS

if (msg !== null) {
console.log('Recieved:', msg.content.toString());
console.log('Received:', msg.content.toString());
ch1.ack(msg);

@@ -93,0 +93,0 @@ } else {

@@ -60,6 +60,14 @@ 'use strict';

test('at all', function(done) {
connect(doneCallback(done));
test('at all', function(done) {
connect(doneCallback(done));
});
});
suite('updateSecret', function() {
test('updateSecret', function(done) {
connect(kCallback(function(c) {
c.updateSecret(Buffer.from('new secret'), 'no reason', doneCallback(done));
}));
});
});

@@ -83,9 +91,9 @@

channel_test('at all', function(ch, done) {
done();
});
channel_test('at all', function(ch, done) {
done();
});
channel_test('open and close', function(ch, done) {
ch.close(doneCallback(done));
});
channel_test('open and close', function(ch, done) {
ch.close(doneCallback(done));
});

@@ -96,30 +104,30 @@ });

channel_test('assert, check, delete queue', function(ch, done) {
ch.assertQueue('test.cb.queue', {}, kCallback(function(q) {
ch.checkQueue('test.cb.queue', kCallback(function(ok) {
ch.deleteQueue('test.cb.queue', {}, doneCallback(done));
channel_test('assert, check, delete queue', function(ch, done) {
ch.assertQueue('test.cb.queue', {}, kCallback(function(q) {
ch.checkQueue('test.cb.queue', kCallback(function(ok) {
ch.deleteQueue('test.cb.queue', {}, doneCallback(done));
}, done));
}, done));
}, done));
});
});
channel_test('assert, check, delete exchange', function(ch, done) {
ch.assertExchange(
'test.cb.exchange', 'topic', {}, kCallback(function(ex) {
ch.checkExchange('test.cb.exchange', kCallback(function(ok) {
ch.deleteExchange('test.cb.exchange', {}, doneCallback(done));
channel_test('assert, check, delete exchange', function(ch, done) {
ch.assertExchange(
'test.cb.exchange', 'topic', {}, kCallback(function(ex) {
ch.checkExchange('test.cb.exchange', kCallback(function(ok) {
ch.deleteExchange('test.cb.exchange', {}, doneCallback(done));
}, done));
}, done));
}, done));
});
});
channel_test('fail on check non-queue', function(ch, done) {
var both = twice(done);
ch.on('error', failCallback(both.first));
ch.checkQueue('test.cb.nothere', failCallback(both.second));
});
channel_test('fail on check non-queue', function(ch, done) {
var both = twice(done);
ch.on('error', failCallback(both.first));
ch.checkQueue('test.cb.nothere', failCallback(both.second));
});
channel_test('fail on check non-exchange', function(ch, done) {
var both = twice(done);
ch.on('error', failCallback(both.first));
ch.checkExchange('test.cb.nothere', failCallback(both.second));
});
channel_test('fail on check non-exchange', function(ch, done) {
var both = twice(done);
ch.on('error', failCallback(both.first));
ch.checkExchange('test.cb.nothere', failCallback(both.second));
});

@@ -130,24 +138,24 @@ });

channel_test('bind queue', function(ch, done) {
ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) {
ch.assertExchange(
'test.cb.bindex', 'fanout', {}, kCallback(function(ex) {
ch.bindQueue(q.queue, ex.exchange, '', {},
doneCallback(done));
}, done));
}, done));
});
channel_test('bind exchange', function(ch, done) {
ch.assertExchange(
'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) {
channel_test('bind queue', function(ch, done) {
ch.assertQueue('test.cb.bindq', {}, kCallback(function(q) {
ch.assertExchange(
'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) {
ch.bindExchange(ex1.exchange,
ex2.exchange, '', {},
doneCallback(done));
'test.cb.bindex', 'fanout', {}, kCallback(function(ex) {
ch.bindQueue(q.queue, ex.exchange, '', {},
doneCallback(done));
}, done));
}, done));
});
});
channel_test('bind exchange', function(ch, done) {
ch.assertExchange(
'test.cb.bindex1', 'fanout', {}, kCallback(function(ex1) {
ch.assertExchange(
'test.cb.bindex2', 'fanout', {}, kCallback(function(ex2) {
ch.bindExchange(ex1.exchange,
ex2.exchange, '', {},
doneCallback(done));
}, done));
}, done));
});
});

@@ -157,53 +165,53 @@

channel_test('send to queue and consume noAck', function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
ch.consume(q.queue, function(m) {
if (m.content.toString() == msg) done();
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: true, exclusive: true});
ch.sendToQueue(q.queue, Buffer.from(msg));
channel_test('send to queue and consume noAck', function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
ch.consume(q.queue, function(m) {
if (m.content.toString() == msg) done();
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: true, exclusive: true});
ch.sendToQueue(q.queue, Buffer.from(msg));
});
});
});
channel_test('send to queue and consume ack', function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
ch.consume(q.queue, function(m) {
if (m.content.toString() == msg) {
ch.ack(m);
done();
}
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: false, exclusive: true});
ch.sendToQueue(q.queue, Buffer.from(msg));
channel_test('send to queue and consume ack', function(ch, done) {
var msg = randomString();
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e !== null) return done(e);
ch.consume(q.queue, function(m) {
if (m.content.toString() == msg) {
ch.ack(m);
done();
}
else done(new Error("message content doesn't match:" +
msg + " =/= " + m.content.toString()));
}, {noAck: false, exclusive: true});
ch.sendToQueue(q.queue, Buffer.from(msg));
});
});
});
channel_test('send to and get from queue', function(ch, done) {
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e != null) return done(e);
var msg = randomString();
ch.sendToQueue(q.queue, Buffer.from(msg));
waitForMessages(ch, q.queue, function(e, _) {
channel_test('send to and get from queue', function(ch, done) {
ch.assertQueue('', {exclusive: true}, function(e, q) {
if (e != null) return done(e);
ch.get(q.queue, {noAck: true}, function(e, m) {
if (e != null)
return done(e);
else if (!m)
return done(new Error('Empty (false) not expected'));
else if (m.content.toString() == msg)
return done();
else
return done(
new Error('Messages do not match: ' +
msg + ' =/= ' + m.content.toString()));
var msg = randomString();
ch.sendToQueue(q.queue, Buffer.from(msg));
waitForMessages(ch, q.queue, function(e, _) {
if (e != null) return done(e);
ch.get(q.queue, {noAck: true}, function(e, m) {
if (e != null)
return done(e);
else if (!m)
return done(new Error('Empty (false) not expected'));
else if (m.content.toString() == msg)
return done();
else
return done(
new Error('Messages do not match: ' +
msg + ' =/= ' + m.content.toString()));
});
});
});
});
});

@@ -214,15 +222,15 @@ });

confirm_channel_test('Receive confirmation', function(ch, done) {
// An unroutable message, on the basis that you're not allowed a
// queue with an empty name, and you can't make bindings to the
// default exchange. Tricky eh?
ch.publish('', '', Buffer.from('foo'), {}, done);
});
confirm_channel_test('Receive confirmation', function(ch, done) {
// An unroutable message, on the basis that you're not allowed a
// queue with an empty name, and you can't make bindings to the
// default exchange. Tricky eh?
ch.publish('', '', Buffer.from('foo'), {}, done);
});
confirm_channel_test('Wait for confirms', function(ch, done) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foo'), {});
}
ch.waitForConfirms(done);
});
confirm_channel_test('Wait for confirms', function(ch, done) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foo'), {});
}
ch.waitForConfirms(done);
});

@@ -233,87 +241,87 @@ });

/*
I don't like having to do this, but there appears to be something
broken about domains in Node.JS v0.8 and mocha. Apparently it has to
do with how mocha and domains hook into error propogation:
https://github.com/visionmedia/mocha/issues/513 (summary: domains in
Node.JS v0.8 don't prevent uncaughtException from firing, and that's
what mocha uses to detect .. an uncaught exception).
/*
I don't like having to do this, but there appears to be something
broken about domains in Node.JS v0.8 and mocha. Apparently it has to
do with how mocha and domains hook into error propogation:
https://github.com/visionmedia/mocha/issues/513 (summary: domains in
Node.JS v0.8 don't prevent uncaughtException from firing, and that's
what mocha uses to detect .. an uncaught exception).
Using domains with amqplib *does* work in practice in Node.JS v0.8:
that is, it's possible to throw an exception in a callback and deal
with it in the active domain, and thereby avoid it crashing the
program.
*/
if (util.versionGreaterThan(process.versions.node, '0.8')) {
test('Throw error in connection open callback', function(done) {
var dom = domain.createDomain();
dom.on('error', failCallback(done));
connect(dom.bind(function(err, conn) {
throw new Error('Spurious connection open callback error');
}));
});
}
Using domains with amqplib *does* work in practice in Node.JS v0.8:
that is, it's possible to throw an exception in a callback and deal
with it in the active domain, and thereby avoid it crashing the
program.
*/
if (util.versionGreaterThan(process.versions.node, '0.8')) {
test('Throw error in connection open callback', function(done) {
var dom = domain.createDomain();
dom.on('error', failCallback(done));
connect(dom.bind(function(err, conn) {
throw new Error('Spurious connection open callback error');
}));
});
}
// TODO: refactor {error_test, channel_test}
function error_test(name, fun) {
test(name, function(done) {
var dom = domain.createDomain();
dom.run(function() {
connect(kCallback(function(c) {
// Seems like there were some unironed wrinkles in 0.8's
// implementation of domains; explicitly adding the connection
// to the domain makes sure any exception thrown in the course
// of processing frames is handled by the domain. For other
// versions of Node.JS, this ends up being belt-and-braces.
dom.add(c);
c.createChannel(kCallback(function(ch) {
fun(ch, done, dom);
// TODO: refactor {error_test, channel_test}
function error_test(name, fun) {
test(name, function(done) {
var dom = domain.createDomain();
dom.run(function() {
connect(kCallback(function(c) {
// Seems like there were some unironed wrinkles in 0.8's
// implementation of domains; explicitly adding the connection
// to the domain makes sure any exception thrown in the course
// of processing frames is handled by the domain. For other
// versions of Node.JS, this ends up being belt-and-braces.
dom.add(c);
c.createChannel(kCallback(function(ch) {
fun(ch, done, dom);
}, done));
}, done));
}, done));
});
});
});
}
}
error_test('Channel open callback throws an error', function(ch, done, dom) {
dom.on('error', failCallback(done));
throw new Error('Error in open callback');
});
error_test('RPC callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.prefetch(0, false, function(err, ok) {
throw new Error('Spurious callback error');
error_test('Channel open callback throws an error', function(ch, done, dom) {
dom.on('error', failCallback(done));
throw new Error('Error in open callback');
});
});
error_test('Get callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
ch.get('test.cb.get-with-error', {noAck: true}, function() {
error_test('RPC callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.prefetch(0, false, function(err, ok) {
throw new Error('Spurious callback error');
});
});
});
error_test('Consume callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
throw new Error('Spurious callback error');
error_test('Get callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.get-with-error', {}, function(err, ok) {
ch.get('test.cb.get-with-error', {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});
});
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.get('', {}, both.second);
});
error_test('Consume callback throws error', function(ch, done, dom) {
dom.on('error', failCallback(done));
ch.assertQueue('test.cb.consume-with-error', {}, function(err, ok) {
ch.consume('test.cb.consume-with-error', ignore, {noAck: true}, function() {
throw new Error('Spurious callback error');
});
});
});
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.consume('', both.second);
});
error_test('Get from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.get('', {}, both.second);
});
error_test('Consume from non-queue invokes error k', function(ch, done, dom) {
var both = twice(failCallback(done));
dom.on('error', both.first);
ch.consume('', both.second);
});
});

@@ -52,12 +52,22 @@ 'use strict';

test("at all", function(done) {
connect(URL).then(function(c) {
return c.close()
;}).then(succeed(done), fail(done));
});
test("at all", function(done) {
connect(URL).then(function(c) {
return c.close()
;}).then(succeed(done), fail(done));
});
chtest("create channel", ignore); // i.e., just don't bork
chtest("create channel", ignore); // i.e., just don't bork
});
suite("updateSecret", function() {
test("updateSecret", function(done) {
connect().then(function(c) {
c.updateSecret(Buffer.from("new secret"), "no reason")
.then(succeed(done), fail(done))
.finally(function() { c.close(); });
});
});
});
var QUEUE_OPTS = {durable: false};

@@ -68,75 +78,75 @@ var EX_OPTS = {durable: false};

chtest("assert and check queue", function(ch) {
return ch.assertQueue('test.check-queue', QUEUE_OPTS)
.then(function(qok) {
return ch.checkQueue('test.check-queue');
});
});
chtest("assert and check queue", function(ch) {
return ch.assertQueue('test.check-queue', QUEUE_OPTS)
.then(function(qok) {
return ch.checkQueue('test.check-queue');
});
});
chtest("assert and check exchange", function(ch) {
return ch.assertExchange('test.check-exchange', 'direct', EX_OPTS)
.then(function(eok) {
assert.equal('test.check-exchange', eok.exchange);
return ch.checkExchange('test.check-exchange');
});
});
chtest("assert and check exchange", function(ch) {
return ch.assertExchange('test.check-exchange', 'direct', EX_OPTS)
.then(function(eok) {
assert.equal('test.check-exchange', eok.exchange);
return ch.checkExchange('test.check-exchange');
});
});
chtest("fail on reasserting queue with different options",
function(ch) {
var q = 'test.reassert-queue';
return ch.assertQueue(
q, {durable: false, autoDelete: true})
.then(function() {
return expectFail(
ch.assertQueue(q, {durable: false,
autoDelete: false}));
});
});
chtest("fail on reasserting queue with different options",
function(ch) {
var q = 'test.reassert-queue';
return ch.assertQueue(
q, {durable: false, autoDelete: true})
.then(function() {
return expectFail(
ch.assertQueue(q, {durable: false,
autoDelete: false}));
});
});
chtest("fail on checking a queue that's not there", function(ch) {
return expectFail(ch.checkQueue('test.random-' + randomString()));
});
chtest("fail on checking a queue that's not there", function(ch) {
return expectFail(ch.checkQueue('test.random-' + randomString()));
});
chtest("fail on checking an exchange that's not there", function(ch) {
return expectFail(ch.checkExchange('test.random-' + randomString()));
});
chtest("fail on checking an exchange that's not there", function(ch) {
return expectFail(ch.checkExchange('test.random-' + randomString()));
});
chtest("fail on reasserting exchange with different type",
function(ch) {
var ex = 'test.reassert-ex';
return ch.assertExchange(ex, 'fanout', EX_OPTS)
.then(function() {
return expectFail(
ch.assertExchange(ex, 'direct', EX_OPTS));
});
});
chtest("fail on reasserting exchange with different type",
function(ch) {
var ex = 'test.reassert-ex';
return ch.assertExchange(ex, 'fanout', EX_OPTS)
.then(function() {
return expectFail(
ch.assertExchange(ex, 'direct', EX_OPTS));
});
});
chtest("channel break on publishing to non-exchange", function(ch) {
return new Promise(function(resolve) {
ch.on('error', resolve);
ch.publish(randomString(), '', Buffer.from('foobar'));
chtest("channel break on publishing to non-exchange", function(ch) {
return new Promise(function(resolve) {
ch.on('error', resolve);
ch.publish(randomString(), '', Buffer.from('foobar'));
});
});
});
chtest("delete queue", function(ch) {
var q = 'test.delete-queue';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.checkQueue(q)])
.then(function() {
return ch.deleteQueue(q);})
.then(function() {
return expectFail(ch.checkQueue(q));});
});
chtest("delete queue", function(ch) {
var q = 'test.delete-queue';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.checkQueue(q)])
.then(function() {
return ch.deleteQueue(q);})
.then(function() {
return expectFail(ch.checkQueue(q));});
});
chtest("delete exchange", function(ch) {
var ex = 'test.delete-exchange';
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.checkExchange(ex)])
.then(function() {
return ch.deleteExchange(ex);})
.then(function() {
return expectFail(ch.checkExchange(ex));});
});
chtest("delete exchange", function(ch) {
var ex = 'test.delete-exchange';
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.checkExchange(ex)])
.then(function() {
return ch.deleteExchange(ex);})
.then(function() {
return expectFail(ch.checkExchange(ex));});
});

@@ -178,36 +188,36 @@ });

// publish different size messages
chtest("send to queue and get from queue", function(ch) {
var q = 'test.send-to-q';
var msg = randomString();
return Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg));
return waitForMessages(q);
})
.then(function() {
return ch.get(q, {noAck: true});
})
.then(function(m) {
assert(m);
assert.equal(msg, m.content.toString());
});
});
// publish different size messages
chtest("send to queue and get from queue", function(ch) {
var q = 'test.send-to-q';
var msg = randomString();
return Promise.all([ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg));
return waitForMessages(q);
})
.then(function() {
return ch.get(q, {noAck: true});
})
.then(function(m) {
assert(m);
assert.equal(msg, m.content.toString());
});
});
chtest("send (and get) zero content to queue", function(ch) {
var q = 'test.send-to-q';
var msg = Buffer.alloc(0);
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, msg);
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(m);
assert.deepEqual(msg, m.content);
});
});
chtest("send (and get) zero content to queue", function(ch) {
var q = 'test.send-to-q';
var msg = Buffer.alloc(0);
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, msg);
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(m);
assert.deepEqual(msg, m.content);
});
});

@@ -218,304 +228,304 @@ });

// bind, publish, get
chtest("route message", function(ch) {
var ex = 'test.route-message';
var q = 'test.route-message-q';
var msg = randomString();
// bind, publish, get
chtest("route message", function(ch) {
var ex = 'test.route-message';
var q = 'test.route-message-q';
var msg = randomString();
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindQueue(q, ex, '', {})])
.then(function() {
ch.publish(ex, '', Buffer.from(msg));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(m);
assert.equal(msg, m.content.toString());
});
});
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindQueue(q, ex, '', {})])
.then(function() {
ch.publish(ex, '', Buffer.from(msg));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(m);
assert.equal(msg, m.content.toString());
});
});
// send to queue, purge, get-empty
chtest("purge queue", function(ch) {
var q = 'test.purge-queue';
return ch.assertQueue(q, {durable: false})
.then(function() {
ch.sendToQueue(q, Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() {
ch.purgeQueue(q);
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(!m); // get-empty
});
});
// send to queue, purge, get-empty
chtest("purge queue", function(ch) {
var q = 'test.purge-queue';
return ch.assertQueue(q, {durable: false})
.then(function() {
ch.sendToQueue(q, Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() {
ch.purgeQueue(q);
return ch.get(q, {noAck: true});})
.then(function(m) {
assert(!m); // get-empty
});
});
// bind again, unbind, publish, get-empty
chtest("unbind queue", function(ch) {
var ex = 'test.unbind-queue-ex';
var q = 'test.unbind-queue';
var viabinding = randomString();
var direct = randomString();
// bind again, unbind, publish, get-empty
chtest("unbind queue", function(ch) {
var ex = 'test.unbind-queue-ex';
var q = 'test.unbind-queue';
var viabinding = randomString();
var direct = randomString();
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindQueue(q, ex, '', {})])
.then(function() {
ch.publish(ex, '', Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() { // message got through!
return ch.get(q, {noAck:true})
.then(function(m) {assert(m);});})
.then(function() {
return ch.unbindQueue(q, ex, '', {});})
.then(function() {
// via the no-longer-existing binding
ch.publish(ex, '', Buffer.from(viabinding));
// direct to the queue
ch.sendToQueue(q, Buffer.from(direct));
return waitForMessages(q);})
.then(function() {return ch.get(q)})
.then(function(m) {
// the direct to queue message got through, the via-binding
// message (sent first) did not
assert.equal(direct, m.content.toString());
});
});
return Promise.all([
ch.assertExchange(ex, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindQueue(q, ex, '', {})])
.then(function() {
ch.publish(ex, '', Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() { // message got through!
return ch.get(q, {noAck:true})
.then(function(m) {assert(m);});})
.then(function() {
return ch.unbindQueue(q, ex, '', {});})
.then(function() {
// via the no-longer-existing binding
ch.publish(ex, '', Buffer.from(viabinding));
// direct to the queue
ch.sendToQueue(q, Buffer.from(direct));
return waitForMessages(q);})
.then(function() {return ch.get(q)})
.then(function(m) {
// the direct to queue message got through, the via-binding
// message (sent first) did not
assert.equal(direct, m.content.toString());
});
});
// To some extent this is now just testing semantics of the server,
// but we can at least try out a few settings, and consume.
chtest("consume via exchange-exchange binding", function(ch) {
var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';
var q = 'test.ex-ex-binding-q';
var rk = 'test.routing.key', msg = randomString();
return Promise.all([
ch.assertExchange(ex1, 'direct', EX_OPTS),
ch.assertExchange(ex2, 'fanout',
{durable: false, internal: true}),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindExchange(ex2, ex1, rk, {}),
ch.bindQueue(q, ex2, '', {})])
.then(function() {
return new Promise(function(resolve, reject) {
function delivery(m) {
if (m.content.toString() === msg) resolve();
else reject(new Error("Wrong message"));
}
ch.consume(q, delivery, {noAck: true})
.then(function() {
ch.publish(ex1, rk, Buffer.from(msg));
});
// To some extent this is now just testing semantics of the server,
// but we can at least try out a few settings, and consume.
chtest("consume via exchange-exchange binding", function(ch) {
var ex1 = 'test.ex-ex-binding1', ex2 = 'test.ex-ex-binding2';
var q = 'test.ex-ex-binding-q';
var rk = 'test.routing.key', msg = randomString();
return Promise.all([
ch.assertExchange(ex1, 'direct', EX_OPTS),
ch.assertExchange(ex2, 'fanout',
{durable: false, internal: true}),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindExchange(ex2, ex1, rk, {}),
ch.bindQueue(q, ex2, '', {})])
.then(function() {
return new Promise(function(resolve, reject) {
function delivery(m) {
if (m.content.toString() === msg) resolve();
else reject(new Error("Wrong message"));
}
ch.consume(q, delivery, {noAck: true})
.then(function() {
ch.publish(ex1, rk, Buffer.from(msg));
});
});
});
});
// bind again, unbind, publish, get-empty
chtest("unbind exchange", function(ch) {
var source = 'test.unbind-ex-source';
var dest = 'test.unbind-ex-dest';
var q = 'test.unbind-ex-queue';
var viabinding = randomString();
var direct = randomString();
return Promise.all([
ch.assertExchange(source, 'fanout', EX_OPTS),
ch.assertExchange(dest, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindExchange(dest, source, '', {}),
ch.bindQueue(q, dest, '', {})])
.then(function() {
ch.publish(source, '', Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() { // message got through!
return ch.get(q, {noAck:true})
.then(function(m) {assert(m);});})
.then(function() {
return ch.unbindExchange(dest, source, '', {});})
.then(function() {
// via the no-longer-existing binding
ch.publish(source, '', Buffer.from(viabinding));
// direct to the queue
ch.sendToQueue(q, Buffer.from(direct));
return waitForMessages(q);})
.then(function() {return ch.get(q)})
.then(function(m) {
// the direct to queue message got through, the via-binding
// message (sent first) did not
assert.equal(direct, m.content.toString());
});
});
// This is a bit convoluted. Sorry.
chtest("cancel consumer", function(ch) {
var q = 'test.consumer-cancel';
var ctag;
var recv1 = new Promise(function (resolve, reject) {
Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
// My callback is 'resolve the promise in `arrived`'
ch.consume(q, resolve, {noAck:true})
.then(function(ok) {
ctag = ok.consumerTag;
ch.sendToQueue(q, Buffer.from('foo'));
})]);
});
});
// bind again, unbind, publish, get-empty
chtest("unbind exchange", function(ch) {
var source = 'test.unbind-ex-source';
var dest = 'test.unbind-ex-dest';
var q = 'test.unbind-ex-queue';
var viabinding = randomString();
var direct = randomString();
// A message should arrive because of the consume
return recv1.then(function() {
var recv2 = Promise.all([
ch.cancel(ctag).then(function() {
return ch.sendToQueue(q, Buffer.from('bar'));
}),
// but check a message did arrive in the queue
waitForMessages(q)])
.then(function() {
return ch.get(q, {noAck:true});
})
.then(function(m) {
// I'm going to reject it, because I flip succeed/fail
// just below
if (m.content.toString() === 'bar') {
throw new Error();
}
});
return Promise.all([
ch.assertExchange(source, 'fanout', EX_OPTS),
ch.assertExchange(dest, 'fanout', EX_OPTS),
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
ch.bindExchange(dest, source, '', {}),
ch.bindQueue(q, dest, '', {})])
.then(function() {
ch.publish(source, '', Buffer.from('foobar'));
return waitForMessages(q);})
.then(function() { // message got through!
return ch.get(q, {noAck:true})
.then(function(m) {assert(m);});})
.then(function() {
return ch.unbindExchange(dest, source, '', {});})
.then(function() {
// via the no-longer-existing binding
ch.publish(source, '', Buffer.from(viabinding));
// direct to the queue
ch.sendToQueue(q, Buffer.from(direct));
return waitForMessages(q);})
.then(function() {return ch.get(q)})
.then(function(m) {
// the direct to queue message got through, the via-binding
// message (sent first) did not
assert.equal(direct, m.content.toString());
return expectFail(recv2);
});
});
});
// This is a bit convoluted. Sorry.
chtest("cancel consumer", function(ch) {
var q = 'test.consumer-cancel';
var ctag;
var recv1 = new Promise(function (resolve, reject) {
Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q),
// My callback is 'resolve the promise in `arrived`'
ch.consume(q, resolve, {noAck:true})
.then(function(ok) {
ctag = ok.consumerTag;
ch.sendToQueue(q, Buffer.from('foo'));
})]);
chtest("cancelled consumer", function(ch) {
var q = 'test.cancelled-consumer';
return new Promise(function(resolve, reject) {
return Promise.all([
ch.assertQueue(q),
ch.purgeQueue(q),
ch.consume(q, function(msg) {
if (msg === null) resolve();
else reject(new Error('Message not expected'));
})])
.then(function() {
return ch.deleteQueue(q);
});
});
});
// A message should arrive because of the consume
return recv1.then(function() {
var recv2 = Promise.all([
ch.cancel(ctag).then(function() {
return ch.sendToQueue(q, Buffer.from('bar'));
}),
// but check a message did arrive in the queue
waitForMessages(q)])
// ack, by default, removes a single message from the queue
chtest("ack", function(ch) {
var q = 'test.ack';
var msg1 = randomString(), msg2 = randomString();
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q)])
.then(function() {
return ch.get(q, {noAck:true});
ch.sendToQueue(q, Buffer.from(msg1));
ch.sendToQueue(q, Buffer.from(msg2));
return waitForMessages(q, 2);
})
.then(function() {
return ch.get(q, {noAck: false})
})
.then(function(m) {
// I'm going to reject it, because I flip succeed/fail
// just below
if (m.content.toString() === 'bar') {
throw new Error();
}
assert.equal(msg1, m.content.toString());
ch.ack(m);
// %%% is there a race here? may depend on
// rabbitmq-sepcific semantics
return ch.get(q);
})
.then(function(m) {
assert(m);
assert.equal(msg2, m.content.toString());
});
return expectFail(recv2);
});
});
chtest("cancelled consumer", function(ch) {
var q = 'test.cancelled-consumer';
return new Promise(function(resolve, reject) {
// Nack, by default, puts a message back on the queue (where in the
// queue is up to the server)
chtest("nack", function(ch) {
var q = 'test.nack';
var msg1 = randomString();
return Promise.all([
ch.assertQueue(q),
ch.purgeQueue(q),
ch.consume(q, function(msg) {
if (msg === null) resolve();
else reject(new Error('Message not expected'));
})])
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
return ch.deleteQueue(q);
ch.sendToQueue(q, Buffer.from(msg1));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: false})})
.then(function(m) {
assert.equal(msg1, m.content.toString());
ch.nack(m);
return waitForMessages(q);})
.then(function() {
return ch.get(q);})
.then(function(m) {
assert(m);
assert.equal(msg1, m.content.toString());
});
});
});
// ack, by default, removes a single message from the queue
chtest("ack", function(ch) {
var q = 'test.ack';
var msg1 = randomString(), msg2 = randomString();
// reject is a near-synonym for nack, the latter of which is not
// available in earlier RabbitMQ (or in AMQP proper).
chtest("reject", function(ch) {
var q = 'test.reject';
var msg1 = randomString();
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS),
ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg1));
ch.sendToQueue(q, Buffer.from(msg2));
return waitForMessages(q, 2);
})
.then(function() {
return ch.get(q, {noAck: false})
})
.then(function(m) {
assert.equal(msg1, m.content.toString());
ch.ack(m);
// %%% is there a race here? may depend on
// rabbitmq-sepcific semantics
return ch.get(q);
})
.then(function(m) {
assert(m);
assert.equal(msg2, m.content.toString());
});
});
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg1));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: false})})
.then(function(m) {
assert.equal(msg1, m.content.toString());
ch.reject(m);
return waitForMessages(q);})
.then(function() {
return ch.get(q);})
.then(function(m) {
assert(m);
assert.equal(msg1, m.content.toString());
});
});
// Nack, by default, puts a message back on the queue (where in the
// queue is up to the server)
chtest("nack", function(ch) {
var q = 'test.nack';
var msg1 = randomString();
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg1));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: false})})
.then(function(m) {
assert.equal(msg1, m.content.toString());
ch.nack(m);
return waitForMessages(q);})
.then(function() {
return ch.get(q);})
.then(function(m) {
assert(m);
assert.equal(msg1, m.content.toString());
});
});
// reject is a near-synonym for nack, the latter of which is not
// available in earlier RabbitMQ (or in AMQP proper).
chtest("reject", function(ch) {
var q = 'test.reject';
var msg1 = randomString();
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
ch.sendToQueue(q, Buffer.from(msg1));
return waitForMessages(q);})
.then(function() {
return ch.get(q, {noAck: false})})
.then(function(m) {
assert.equal(msg1, m.content.toString());
ch.reject(m);
return waitForMessages(q);})
.then(function() {
return ch.get(q);})
.then(function(m) {
assert(m);
assert.equal(msg1, m.content.toString());
});
});
chtest("prefetch", function(ch) {
var q = 'test.prefetch';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),
ch.prefetch(1)])
.then(function() {
ch.sendToQueue(q, Buffer.from('foobar'));
ch.sendToQueue(q, Buffer.from('foobar'));
return waitForMessages(q, 2);
})
.then(function() {
return new Promise(function(resolve) {
var messageCount = 0;
function receive(msg) {
ch.ack(msg);
if (++messageCount > 1) {
resolve(messageCount);
chtest("prefetch", function(ch) {
var q = 'test.prefetch';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q),
ch.prefetch(1)])
.then(function() {
ch.sendToQueue(q, Buffer.from('foobar'));
ch.sendToQueue(q, Buffer.from('foobar'));
return waitForMessages(q, 2);
})
.then(function() {
return new Promise(function(resolve) {
var messageCount = 0;
function receive(msg) {
ch.ack(msg);
if (++messageCount > 1) {
resolve(messageCount);
}
}
}
return ch.consume(q, receive, {noAck: false})
return ch.consume(q, receive, {noAck: false})
});
})
.then(function(c) {
return assert.equal(2, c);
});
})
.then(function(c) {
return assert.equal(2, c);
});
});
});
chtest('close', function(ch) {
// Resolving promise guarantees
// channel is closed
return ch.close();
});
chtest('close', function(ch) {
// Resolving promise guarantees
// channel is closed
return ch.close();
});

@@ -528,74 +538,74 @@ });

confirmtest('message is confirmed', function(ch) {
var q = 'test.confirm-message';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
return ch.sendToQueue(q, Buffer.from('bleep'));
});
});
// Usually one can provoke the server into confirming more than one
// message in an ack by simply sending a few messages in quick
// succession; a bit unscientific I know. Luckily we can eavesdrop on
// the acknowledgements coming through to see if we really did get a
// multi-ack.
confirmtest('multiple confirms', function(ch) {
var q = 'test.multiple-confirms';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
var multipleRainbows = false;
ch.on('ack', function(a) {
if (a.multiple) multipleRainbows = true;
confirmtest('message is confirmed', function(ch) {
var q = 'test.confirm-message';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
return ch.sendToQueue(q, Buffer.from('bleep'));
});
});
function prod(num) {
var cs = [];
// Usually one can provoke the server into confirming more than one
// message in an ack by simply sending a few messages in quick
// succession; a bit unscientific I know. Luckily we can eavesdrop on
// the acknowledgements coming through to see if we really did get a
// multi-ack.
confirmtest('multiple confirms', function(ch) {
var q = 'test.multiple-confirms';
return Promise.all([
ch.assertQueue(q, QUEUE_OPTS), ch.purgeQueue(q)])
.then(function() {
var multipleRainbows = false;
ch.on('ack', function(a) {
if (a.multiple) multipleRainbows = true;
});
function sendAndPushPromise() {
var conf = promisify(function(cb) {
return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb);
})();
cs.push(conf);
}
function prod(num) {
var cs = [];
for (var i=0; i < num; i++) sendAndPushPromise();
return Promise.all(cs).then(function() {
if (multipleRainbows) return true;
else if (num > 500) throw new Error(
"Couldn't provoke the server" +
" into multi-acking with " + num +
" messages; giving up");
else {
//console.warn("Failed with " + num + "; trying " + num * 2);
return prod(num * 2);
function sendAndPushPromise() {
var conf = promisify(function(cb) {
return ch.sendToQueue(q, Buffer.from('bleep'), {}, cb);
})();
cs.push(conf);
}
});
}
return prod(5);
});
});
confirmtest('wait for confirms', function(ch) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foobar'), {});
}
return ch.waitForConfirms();
})
for (var i=0; i < num; i++) sendAndPushPromise();
confirmtest('works when channel is closed', function(ch) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foobar'), {});
}
return ch.close().then(function () {
return ch.waitForConfirms()
}).then(function () {
assert.strictEqual(true, false, 'Wait should have failed.')
}, function (e) {
assert.strictEqual(e.message, 'channel closed')
return Promise.all(cs).then(function() {
if (multipleRainbows) return true;
else if (num > 500) throw new Error(
"Couldn't provoke the server" +
" into multi-acking with " + num +
" messages; giving up");
else {
//console.warn("Failed with " + num + "; trying " + num * 2);
return prod(num * 2);
}
});
}
return prod(5);
});
});
});
confirmtest('wait for confirms', function(ch) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foobar'), {});
}
return ch.waitForConfirms();
})
confirmtest('works when channel is closed', function(ch) {
for (var i=0; i < 1000; i++) {
ch.publish('', '', Buffer.from('foobar'), {});
}
return ch.close().then(function () {
return ch.waitForConfirms()
}).then(function () {
assert.strictEqual(true, false, 'Wait should have failed.')
}, function (e) {
assert.strictEqual(e.message, 'channel closed')
});
});
});

@@ -73,3 +73,3 @@ 'use strict';

var size = codec.encodeTable(buffer, val, 0);
var result = buffer.slice(4, size);
var result = buffer.subarray(4, size);
assert.deepEqual(expect, bufferToArray(result));

@@ -87,3 +87,3 @@ });

var size = codec.encodeTable(buf, t, 0);
var decoded = codec.decodeFields(buf.slice(4, size)); // ignore the length-prefix
var decoded = codec.decodeFields(buf.subarray(4, size)); // ignore the length-prefix
try {

@@ -209,3 +209,3 @@ assert.deepEqual(removeExplicitTypes(t), decoded);

// FIXME depends on framing, ugh
var fs1 = defs.decode(method.id, buf.slice(11, buf.length));
var fs1 = defs.decode(method.id, buf.subarray(11, buf.length));
assertEqualModuloDefaults(method, fs1);

@@ -221,3 +221,3 @@ return true;

// FIXME depends on framing, ugh
var fs1 = defs.decode(properties.id, buf.slice(19, buf.length));
var fs1 = defs.decode(properties.id, buf.subarray(19, buf.length));
assert.equal(properties.size, ints.readUInt64BE(buf, 11));

@@ -224,0 +224,0 @@ assertEqualModuloDefaults(properties, fs1);

@@ -41,5 +41,5 @@ 'use strict';

var frames = new Frames(input);
input.write(HB.slice(0, 3));
input.write(HB.subarray(0, 3));
assert(!frames.recvFrame());
input.write(HB.slice(3));
input.write(HB.subarray(3));
assert(frames.recvFrame() === HEARTBEAT);

@@ -147,5 +147,5 @@ assert(!frames.recvFrame());

return [
full.slice(0, onethird),
full.slice(onethird, twothirds),
full.slice(twothirds)
full.subarray(0, onethird),
full.subarray(onethird, twothirds),
full.subarray(twothirds)
];

@@ -152,0 +152,0 @@ }));

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc