Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

amqp

Package Overview
Dependencies
Maintainers
2
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

amqp - npm Package Compare versions

Comparing version 0.1.3 to 0.1.4

amqp-0-9-1-rabbit.xml

2

amqp-definitions-0-9-1.js
exports.constants = [[1,"frameMethod"],[2,"frameHeader"],[3,"frameBody"],[8,"frameHeartbeat"],[200,"replySuccess"],[206,"frameEnd"],[311,"contentTooLarge"],[313,"noConsumers"],[320,"connectionForced"],[402,"invalidPath"],[403,"accessRefused"],[404,"notFound"],[405,"resourceLocked"],[406,"preconditionFailed"],[501,"frameError"],[502,"syntaxError"],[503,"commandInvalid"],[504,"channelError"],[505,"unexpectedFrame"],[506,"resourceError"],[530,"notAllowed"],[540,"notImplemented"],[541,"internalError"],[4096,"frameMinSize"]];
exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]}];
exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]},{"name":"confirm","index":85,"fields":[],"methods":[{"name":"select","index":10,"fields":[{"name":"noWait","domain":"bit"}]},{"name":"selectOk","index":11,"fields":[]}]}];

@@ -8,4 +8,7 @@ var events = require('events'),

Promise = require('./promise').Promise,
URL = require('url');
URL = require('url'),
AMQPTypes = require('./constants').AMQPTypes,
Indicators = require('./constants').Indicators,
FrameType = require('./constants').FrameType;
function mixin () {

@@ -185,11 +188,11 @@ // copy reference to target object

if (data.length > 0) {
if (data[0] === 206) {
if (data[0] === Indicators.FRAME_END) {
switch (frameType) {
case 1:
case FrameType.METHOD:
self._parseMethodFrame(frameChannel, frameBuffer);
break;
case 2:
case FrameType.HEADER:
self._parseHeaderFrame(frameChannel, frameBuffer);
break;
case 3:
case FrameType.BODY:
if (self.onContent) {

@@ -199,4 +202,4 @@ self.onContent(frameChannel, frameBuffer);

break;
case 8:
debug("hearbeat");
case FrameType.HEARTBEAT:
debug("heartbeat");
if (self.onHeartBeat) self.onHeartBeat();

@@ -289,69 +292,73 @@ break;

function parseValue (buffer) {
switch (buffer[buffer.read++]) {
case AMQPTypes.STRING:
return parseLongString(buffer);
function parseTable (buffer) {
var length = buffer.read + parseInt(buffer, 4);
var table = {};
while (buffer.read < length) {
var field = parseShortString(buffer);
switch (buffer[buffer.read++]) {
case 'S'.charCodeAt(0):
table[field] = parseLongString(buffer);
break;
case AMQPTypes.INTEGER:
return parseInt(buffer, 4);
case 'I'.charCodeAt(0):
table[field] = parseInt(buffer, 4);
break;
case AMQPTypes.DECIMAL:
var dec = parseInt(buffer, 1);
var num = parseInt(buffer, 4);
return num / (dec * 10);
case 'D'.charCodeAt(0):
var dec = parseInt(buffer, 1);
var num = parseInt(buffer, 4);
table[field] = num / (dec * 10);
break;
case AMQPTypes._64BIT_FLOAT:
var b = [];
for (var i = 0; i < 8; ++i)
b[i] = buffer[buffer.read++];
case 'd'.charCodeAt(0):
var b = [];
for (var i = 0; i < 8; ++i)
b[i] = buffer[buffer.read++];
return (new jspack(true)).Unpack('d', b);
table[field] = (new jspack(true)).Unpack('d', b);
break;
case AMQPTypes._32BIT_FLOAT:
var b = [];
for (var i = 0; i < 4; ++i)
b[i] = buffer[buffer.read++];
case 'f'.charCodeAt(0):
var b = [];
for (var i = 0; i < 4; ++i)
b[i] = buffer[buffer.read++];
return (new jspack(true)).Unpack('f', b);
table[field] = (new jspack(true)).Unpack('f', b);
break;
case AMQPTypes.TIME:
var int = parseInt(buffer, 8);
return (new Date()).setTime(int * 1000);
case 'T'.charCodeAt(0):
var int = parseInt(buffer, 8);
table[field] = new Date();
table[field].setTime(int * 1000);
break;
case AMQPTypes.HASH:
return parseTable(buffer);
case 'F'.charCodeAt(0):
table[field] = parseTable(buffer);
break;
case AMQPTypes.SIGNED_64BIT:
return parseInt(buffer, 8);
case 'l'.charCodeAt(0):
table[field] = parseInt(buffer, 8);
break;
case AMQPTypes.BOOLEAN:
return (parseInt(buffer, 1) > 0);
case 't'.charCodeAt(0):
table[field] = (parseInt(buffer, 1) > 0);
break;
case AMQPTypes.BYTE_ARRAY:
var len = parseInt(buffer, 4);
var buf = new Buffer(len);
buffer.copy(buf, 0, buffer.read, buffer.read + len);
buffer.read += len;
return buf;
case 'x'.charCodeAt(0):
var len = parseInt(buffer, 4);
var buf = new Buffer(len);
buffer.copy(buf, 0, buffer.read, buffer.read + len);
buffer.read += len;
table[field] = buf;
break;
case AMQPTypes.ARRAY:
var len = parseInt(buffer, 4);
var end = buffer.read + len;
var arr = new Array();
default:
throw new Error("Unknown field value type " + buffer[buffer.read-1]);
}
while (buffer.read < end) {
arr.push(parseValue(buffer));
}
return arr;
default:
throw new Error("Unknown field value type " + buffer[buffer.read-1]);
}
}
function parseTable (buffer) {
var length = buffer.read + parseInt(buffer, 4);
var table = {};
while (buffer.read < length) {
table[parseShortString(buffer)] = parseValue(buffer);
}
return table;

@@ -614,2 +621,51 @@ }

function serializeValue (b, value) {
switch (typeof(value)) {
case 'string':
b[b.used++] = 'S'.charCodeAt(0);
serializeLongString(b, value);
break;
case 'number':
if (!isFloat(value)) {
if (isBigInt(value)) {
// 64-bit uint
b[b.used++] = 'l'.charCodeAt(0);
serializeInt(b, 8, value);
} else {
//32-bit uint
b[b.used++] = 'I'.charCodeAt(0);
serializeInt(b, 4, value);
}
} else {
//64-bit float
b[b.used++] = 'd'.charCodeAt(0);
serializeFloat(b, 8, value);
}
break;
case 'boolean':
b[b.used++] = 't'.charCodeAt(0);
b[b.used++] = value;
break;
default:
if (value instanceof Date) {
b[b.used++] = 'T'.charCodeAt(0);
serializeDate(b, value);
} else if (value instanceof Buffer) {
b[b.used++] = 'x'.charCodeAt(0);
serializeBuffer(b, value);
} else if (util.isArray(value)) {
b[b.used++] = 'A'.charCodeAt(0);
serializeArray(b, value);
} else if (typeof(value) === 'object') {
b[b.used++] = 'F'.charCodeAt(0);
serializeTable(b, value);
} else {
this.throwError("unsupported type in amqp table: " + typeof(value));
}
}
}
function serializeTable (b, object) {

@@ -624,3 +680,2 @@ if (typeof(object) != "object") {

b.used += 4; // sizeof long
var startIndex = b.used;

@@ -630,56 +685,25 @@

if (!object.hasOwnProperty(key)) continue;
serializeShortString(b, key);
serializeValue(b, object[key]);
}
var value = object[key];
var endIndex = b.used;
b.used = lengthIndex;
serializeInt(b, 4, endIndex - startIndex);
b.used = endIndex;
}
switch (typeof(value)) {
case 'string':
b[b.used++] = 'S'.charCodeAt(0);
serializeLongString(b, value);
break;
function serializeArray (b, arr) {
// Save our position so that we can go back and write the byte length of this array
// at the beginning of the packet (once we have serialized all elements).
var lengthIndex = b.used;
b.used += 4; // sizeof long
var startIndex = b.used;
case 'number':
if (!isFloat(value)) {
if (isBigInt(value)) {
// 64-bit uint
b[b.used++] = 'l'.charCodeAt(0);
serializeInt(b, 8, value);
} else {
//32-bit uint
b[b.used++] = 'I'.charCodeAt(0);
serializeInt(b, 4, value);
}
} else {
//64-bit float
b[b.used++] = 'd'.charCodeAt(0);
serializeFloat(b, 8, value);
}
break;
case 'boolean':
b[b.used++] = 't'.charCodeAt(0);
b[b.used++] = value;
break;
default:
if(value instanceof Date) {
b[b.used++] = 'T'.charCodeAt(0);
serializeDate(b, value);
} else if (value instanceof Buffer) {
b[b.used++] = 'x'.charCodeAt(0);
serializeBuffer(b, value);
} else {
if(typeof(value) === 'object') {
b[b.used++] = 'F'.charCodeAt(0);
serializeTable(b, value);
} else {
this.throwError("unsupported type in amqp table: " + typeof(value));
}
}
}
len = arr.length;
for (var i = 0; i < len; i++) {
serializeValue(b, arr[i]);
}
var endIndex = b.used;
b.used = lengthIndex;

@@ -690,3 +714,2 @@ serializeInt(b, 4, endIndex - startIndex);

function serializeFields (buffer, fields, args, strict) {

@@ -789,5 +812,78 @@ var bitField = 0;

var state = 'handshake';
var parser;
var backoffTime = null;
this.connectionAttemptScheduled = false;
var backoff = function () {
if (self._inboundHeartbeatTimer !== null) {
clearTimeout(self._inboundHeartbeatTimer);
self._inboundHeartbeatTimer = null;
}
if (self._outboundHeartbeatTimer !== null) {
clearTimeout(self._outboundHeartbeatTimer);
self._outboundHeartbeatTimer = null;
}
if (!self.connectionAttemptScheduled) {
// Set to true, as we are presently in the process of scheduling one.
self.connectionAttemptScheduled = true;
// Kill the socket, if it hasn't been killed already.
self.end();
// Reset parser state
parser = null;
// In order for our reconnection to be seamless, we have to notify the
// channels that they are no longer connected so that nobody attempts
// to send messages which would be doomed to fail.
for (var channel in self.channels) {
if (channel != 0) {
self.channels[channel].state = 'closed';
}
}
// Queues are channels (so we have already marked them as closed), but
// queues have special needs, since the subscriptions will no longer
// be known to the server when we reconnect. Mark the subscriptions as
// closed so that we can resubscribe them once we are reconnected.
for (var queue in self.queues) {
for (var index in self.queues[queue].consumerTagOptions) {
self.queues[queue].consumerTagOptions[index]['state'] = 'closed';
}
}
// Begin reconnection attempts
if (self.implOptions.reconnect) {
// Don't thrash, use a backoff strategy.
if (backoffTime === null) {
// This is the first time we've failed since a successful connection,
// so use the configured backoff time without any modification.
backoffTime = self.implOptions.reconnectBackoffTime;
} else if (self.implOptions.reconnectBackoffStrategy === 'exponential') {
// If you've configured exponential backoff, we'll double the
// backoff time each subsequent attempt until success.
backoffTime *= 2;
// limit the maxium timeout, to avoid potentially unlimited stalls
if(backoffTime > self.implOptions.reconnectExponentialLimit){
backoffTime = self.implOptions.reconnectExponentialLimit;
}
} else if (self.implOptions.reconnectBackoffStrategy === 'linear') {
// Linear strategy is the default. In this case, we will retry at a
// constant interval, so there's no need to change the backoff time
// between attempts.
} else {
// TODO should we warn people if they picked a nonexistent strategy?
}
setTimeout(function () {
// Set to false, so that if we fail in the reconnect attempt, we can
// schedule another one.
self.connectionAttemptScheduled = false;
self.reconnect();
}, backoffTime);
}
}
};
this._defaultExchange = null;

@@ -798,6 +894,8 @@ this.channelCounter = 0;

self.addListener('connect', function () {
// channel 0 is the control channel.
self.channels = {0:self};
self.queues = {};
self.exchanges = {};
// In the case where this is a reconnection, do not trample on the existing
// channels.
// For your reference, channel 0 is the control channel.
self.channels = (self.implOptions.reconnect ? self.channels : undefined) || {0:self};
self.queues = (self.implOptions.reconnect ? self.queues : undefined) || {};
self.exchanges = (self.implOptions.reconnect ? self.exchanges : undefined) || {};

@@ -833,7 +931,5 @@ parser = new AMQPParser('0-9-1', 'client');

parser.onError = function(e) {
self.end();
parser.onError = function (e) {
self.emit("error", e);
self.emit("close");
parser = null;
};

@@ -844,3 +940,2 @@ //debug("connected...");

self.write("AMQP" + String.fromCharCode(0,0,9,1));
state = 'handshake';
});

@@ -850,10 +945,25 @@

parser.execute(data);
self._inboundHeartbeatTimerReset();
});
self.addListener('end', function () {
self.end();
// in order to allow reconnects, have to clear the
// state.
parser = null;
self.addListener('error', function () {
backoff();
});
self.addListener('ready', function () {
// Reset the backoff time since we have successfully connected.
backoffTime = null;
if (self.implOptions.reconnect) {
// Reconnect any channels which were open.
for (var channel in self.channels) {
if (channel != 0) {
self.channels[channel].reconnect();
}
}
}
// Restart the heartbeat to the server
self._outboundHeartbeatTimerReset();
})
}

@@ -872,3 +982,16 @@ util.inherits(Connection, net.Stream);

};
var defaultImplOptions = { defaultExchangeName: '' };
// If the "reconnect" option is true, then the driver will attempt to
// reconnect using the configured strategy *any time* the connection
// becomes unavailable.
// If this is not appropriate for your application, do not set this option.
// If you would like this option, you can set parameters controlling how
// aggressively the reconnections will be attempted.
// Valid strategies are "linear" and "exponential".
// Backoff times are in milliseconds. Under the "linear" strategy, the driver
// will pause <reconnectBackoffTime> ms before the first attempt, and between
// each subsequent attempt. Under the "exponential" strategy, the driver will
// pause <reconnectBackoffTime> ms before the first attempt, and will double
// the previous pause between each subsequent attempt until a connection is
// reestablished.
var defaultImplOptions = { defaultExchangeName: '', reconnect: true , reconnectBackoffStrategy: 'linear' , reconnectExponentialLimit: 120000, reconnectBackoffTime: 1000 };

@@ -901,3 +1024,3 @@ function urlOptions(connectionString) {

// c.setImplOptions(options);
c.reconnect();
c.connect();
return c;

@@ -913,16 +1036,27 @@ };

Connection.prototype.setImplOptions = function(options) {
Connection.prototype.setImplOptions = function (options) {
var o = {}
mixin(o, defaultImplOptions, options || {});
this.implOptions = o;
}
};
Connection.prototype.reconnect = function () {
this.connect(this.options.port, this.options.host);
// Suspend activity on channels
for (var channel in this.channels) {
this.channels[channel].state = 'closed';
}
// Terminate socket activity
this.end();
this.connect();
};
Connection.prototype.connect = function () {
// Connect socket
net.Socket.prototype.connect.call(this, this.options.port, this.options.host);
};
Connection.prototype._onMethod = function (channel, method, args) {
debug(channel + " > " + method.name + " " + JSON.stringify(args));
// Channel 0 is the control channel. If not zero then deligate to
// Channel 0 is the control channel. If not zero then delegate to
// one of the channel objects.

@@ -1015,6 +1149,34 @@

Connection.prototype.heartbeat = function() {
Connection.prototype.heartbeat = function () {
this.write(new Buffer([8,0,0,0,0,0,0,206]));
};
Connection.prototype._outboundHeartbeatTimerReset = function () {
if (this._outboundHeartbeatTimer !== null) {
clearTimeout(this._outboundHeartbeatTimer);
this._outboundHeartbeatTimer = null;
}
if (this.options.heartbeat) {
var self = this;
this._outboundHeartbeatTimer = setTimeout(function () {
self.heartbeat();
self._outboundHeartbeatTimerReset();
}, 1000 * this.options.heartbeat);
}
};
Connection.prototype._inboundHeartbeatTimerReset = function () {
if (this._inboundHeartbeatTimer !== null) {
clearTimeout(this._inboundHeartbeatTimer);
this._inboundHeartbeatTimer = null;
}
if (this.options.heartbeat) {
var self = this;
var gracePeriod = 2 * this.options.heartbeat;
this._inboundHeartbeatTimer = setTimeout(function () {
self.emit('error', new Error('no heartbeat or data in last ' + gracePeriod + ' seconds'));
}, gracePeriod * 1000);
}
};
Connection.prototype._sendMethod = function (channel, method, args) {

@@ -1055,2 +1217,4 @@ debug(channel + " < " + method.name + " " + JSON.stringify(args));

this.write(c);
this._outboundHeartbeatTimerReset();
};

@@ -1314,6 +1478,6 @@

Message.prototype.reject = function (requeue){
this.queue.connection._sendMethod(this.queue.channel, methods.basicReject,
{ deliveryTag: this.deliveryTag
, requeue: requeue ? true : false
});
this.queue.connection._sendMethod(this.queue.channel, methods.basicReject,
{ deliveryTag: this.deliveryTag
, requeue: requeue ? true : false
});
}

@@ -1330,7 +1494,11 @@

this.connection._sendMethod(channel, methods.channelOpen, {reserved1: ""});
this.reconnect();
}
util.inherits(Channel, events.EventEmitter);
Channel.prototype.reconnect = function () {
this.connection._sendMethod(this.channel, methods.channelOpen, {reserved1: ""});
};
Channel.prototype._taskPush = function (reply, cb) {

@@ -1403,3 +1571,3 @@ var promise = new Promise();

this.consumerTagListeners = {};
this.consumerTagOptions = {};
var self = this;

@@ -1414,3 +1582,3 @@

this.options = { autoDelete: true };
this.options = { autoDelete: true, closeChannelOnUnsubscribe: false };
if (options) mixin(this.options, options);

@@ -1433,2 +1601,4 @@

}
options['state'] = 'opening';
this.consumerTagOptions[consumerTag] = options;

@@ -1455,2 +1625,3 @@ if (options.prefetchCount) {

});
self.consumerTagOptions[consumerTag]['state'] = 'open';
});

@@ -1468,3 +1639,7 @@ };

.addCallback(function () {
if(self.options.closeChannelOnUnsubscribe){
self.close();
}
delete self.consumerTagListeners[consumerTag];
delete self.consumerTagOptions[consumerTag];
});

@@ -1494,15 +1669,15 @@ };

// basic consume
var rawOptions = { noAck: !options.ack };
if (options.ack) {
self.connection._sendMethod(self.channel, methods.basicQos,
{ reserved1: 0
, prefetchSize: 0
, prefetchCount: options.prefetchCount
, global: false
});
rawOptions['prefetchCount'] = options.prefetchCount;
}
// basic consume
var rawOptions = { noAck: !options.ack };
return this.subscribeRaw(rawOptions, function (m) {
var isJSON = (m.contentType == 'text/json') || (m.contentType == 'application/json');
var contentType = m.contentType;
if (contentType == null && m.headers && m.headers.properties) {
contentType = m.headers.properties.content_type;
}
var isJSON = (contentType == 'text/json') || (contentType == 'application/json');

@@ -1580,3 +1755,3 @@ var b;

Queue.prototype.bind = function (/* [exchange,] routingKey */) {
Queue.prototype.bind = function (/* [exchange,] routingKey [, bindCallback] */) {
var self = this;

@@ -1588,4 +1763,13 @@

var exchange, routingKey;
var exchange, routingKey, callback;
if(typeof(arguments[arguments.length-1]) == 'function'){
callback = arguments[arguments.length-1];
}
// Remove callback from args so rest of bind functionality works as before
// Also, defend against cases where a non function callback has been passed as 3rd param
if (callback || arguments.length == 3) {
delete arguments[arguments.length-1];
arguments.length--;
}
if (arguments.length == 2) {

@@ -1598,2 +1782,3 @@ exchange = arguments[0];

}
if(callback) this._bindCallback = callback;

@@ -1686,3 +1871,2 @@

options = options || {};
return this._taskPush(methods.queueDeleteOk, function () {

@@ -1705,3 +1889,13 @@ self.connection.queueClosed(self.name);

Queue.prototype.purge = function() {
var self = this;
return this._taskPush(methods.queuePurgeOk, function () {
self.connection._sendMethod(self.channel, methods.queuePurge,
{ reserved1 : 0,
queue: self.name,
noWait: false})
});
};
Queue.prototype._onMethod = function (channel, method, args) {

@@ -1713,13 +1907,24 @@ this.emit(method.name, args);

case methods.channelOpenOk:
this.connection._sendMethod(channel, methods.queueDeclare,
{ reserved1: 0
, queue: this.name
, passive: this.options.passive ? true : false
, durable: this.options.durable ? true : false
, exclusive: this.options.exclusive ? true : false
, autoDelete: this.options.autoDelete ? true : false
, noWait: false
, "arguments": this.options.arguments || {}
});
this.state = "declare queue";
if (this.options.noDeclare) {
this.state = 'open';
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
this.emit('open');
} else {
this.connection._sendMethod(channel, methods.queueDeclare,
{ reserved1: 0
, queue: this.name
, passive: this.options.passive ? true : false
, durable: this.options.durable ? true : false
, exclusive: this.options.exclusive ? true : false
, autoDelete: this.options.autoDelete ? true : false
, noWait: false
, "arguments": this.options.arguments || {}
});
this.state = "declare queue";
}
break;

@@ -1737,2 +1942,13 @@

this.emit('open', args.queue, args.messageCount, args.consumerCount);
// If this is a reconnect, we must re-subscribe our queue listeners.
var consumerTags = Object.keys(this.consumerTagListeners);
for (var index in consumerTags) {
if (this.consumerTagOptions[consumerTags[index]]['state'] === 'closed') {
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]]);
// Having called subscribeRaw, we are now a new consumer with a new consumerTag.
delete this.consumerTagListeners[consumerTags[index]];
delete this.consumerTagOptions[consumerTags[index]];
}
}
break;

@@ -1745,2 +1961,8 @@

case methods.queueBindOk:
if (this._bindCallback) {
// setting this._bindCallback to null before calling the callback allows for a subsequent bind within the callback
var cb = this._bindCallback;
this._bindCallback = null;
cb(this);
}
break;

@@ -1751,2 +1973,7 @@

case methods.confirmSelectOk:
this._sequence = 1;
this.confirm = true;
break;
case methods.channelClose:

@@ -1798,2 +2025,8 @@ this.state = "closed";

Queue.prototype.flow = function(active) {
var self = this;
return this._taskPush(methods.channelFlowOk, function () {
self.connection._sendMethod(self.channel, methods.channelFlow, {'active': active });
})
};

@@ -1808,2 +2041,5 @@

this._openCallback = openCallback;
this._sequence = null;
this._unAcked = {};
}

@@ -1831,2 +2067,14 @@ util.inherits(Exchange, Channel);

// For if we want to delete a exchange,
// we dont care if all of the options match.
} else if (this.options.noDeclare){
this.state = 'open';
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
this.emit('open');
} else {

@@ -1844,3 +2092,3 @@ this.connection._sendMethod(channel, methods.exchangeDeclare,

, noWait: false
, "arguments": {}
, "arguments":this.options.arguments || {}
});

@@ -1851,3 +2099,22 @@ this.state = 'declaring';

case methods.exchangeDeclareOk:
case methods.exchangeDeclareOk:
if (this.options.confirm){
this.connection._sendMethod(channel, methods.confirmSelect,
{ noWait: false });
}else{
this.state = 'open';
this.emit('open');
if (this._openCallback) {
this._openCallback(this);
this._openCallback = null;
}
}
break;
case methods.confirmSelectOk:
this._sequence = 1;
this.state = 'open';

@@ -1875,2 +2142,28 @@ this.emit('open');

case methods.basicAck:
this.emit('basic-ack', args);
if(args.deliveryTag == 0 && args.multiple == true){
// we must ack everything
for(var tag in this._unAcked){
this._unAcked[tag].emitAck()
delete this._unAcked[tag]
}
}else if(args.deliveryTag != 0 && args.multiple == true){
// we must ack everything before the delivery tag
for(var tag in this._unAcked){
if(tag <= args.deliveryTag){
this._unAcked[tag].emitAck()
delete this._unAcked[tag]
}
}
}else if(this._unAcked[args.deliveryTag] && args.multiple == false){
// simple single ack
this._unAcked[args.deliveryTag].emitAck()
delete this._unAcked[args.deliveryTag]
}
break;
case methods.basicReturn:

@@ -1907,3 +2200,6 @@ this.emit('basic-return', args);

// - clusterId
Exchange.prototype.publish = function (routingKey, data, options) {
//
// the callback is optional and is only used when confirm is turned on for the exchange
Exchange.prototype.publish = function (routingKey, data, options, callback) {
var self = this;

@@ -1918,3 +2214,3 @@

return this._taskPush(null, function () {
var task = this._taskPush(null, function () {
self.connection._sendMethod(self.channel, methods.basicPublish, options);

@@ -1930,2 +2226,15 @@ // This interface is probably not appropriate for streaming large files.

});
if (self.options.confirm){
task.sequence = self._sequence
self._unAcked[self._sequence] = task
self._sequence++
if(callback != null){
task.once('ack', function(){task.removeAllListeners();callback(false)});
this.once('error', function(){task.removeAllListeners();callback(true)});
}
}
return task
};

@@ -1935,4 +2244,4 @@

Exchange.prototype.cleanup = function() {
if (this.binds == 0) // don't keep reference open if unused
this.connection.exchangeClosed(this.name);
if (this.binds == 0) // don't keep reference open if unused
this.connection.exchangeClosed(this.name);
};

@@ -1939,0 +2248,0 @@

{ "name" : "amqp"
, "description" : "AMQP driver for node"
, "keywords" : [ "amqp" ]
, "version" : "0.1.3"
, "version" : "0.1.4"
, "preferGlobal" : true

@@ -29,3 +29,3 @@ , "author" : { "name" : "Ryan Dahl" }

, "main" : "./amqp"
, "engines" : { "node" : "0.4 || 0.5 || 0.6" }
, "engines" : { "node" : "0.4 || 0.5 || 0.6 || 0.8" }
, "licenses" :

@@ -32,0 +32,0 @@ [ { "type" : "MIT"

@@ -8,2 +8,3 @@ var events = require('events');

this.hasFired = false;
this.hasAcked = false;
this._values = undefined;

@@ -53,2 +54,10 @@ };

exports.Promise.prototype.emitAck = function() {
if (this.hasAcked) return;
this.hasAcked = 'true';
this._values = Array.prototype.slice.call(arguments);
this.emit.apply(this, ['ack'].concat(this._values));
};
exports.Promise.prototype.emitError = function() {

@@ -55,0 +64,0 @@ if (this.hasFired) return;

@@ -23,3 +23,2 @@ # node-amqp

connection.on('ready', function () {
// Create a queue and bind to all messages.
// Use the default 'amq.topic' exchange

@@ -167,2 +166,8 @@ connection.queue('my-queue', function(q){

won't be deleted.
- `noDeclare`: boolean, default false.
If set, the queue will not be declared, this will allow a queue to be
deleted if you dont know its previous options.
- `arguments`: a map of additional arguments to pass in when creating a queue.
- `closeChannelOnUnsubscribe` : a boolean when true the channel will close on
unsubscrube, default false.

@@ -249,4 +254,14 @@ ### queue.subscribe([options,] listener)

This method will emit `'queueBindOk'` when ready.
This method will emit `'queueBindOk'` when complete.
### queue.unbind([exchange,] routing)
This method unbinds a queue from an exchange.
If the exchange argument is left out `'amq.topic'` will be used.
Ths method will emit `'queueUnbindOk'` when complete.
### queue.bind_headers([exchange,] routing)

@@ -310,5 +325,15 @@

server restarts.
- `comfirm`: boolean, default false.
If set when connecting to a exchange the channel will send acks
for publishes. Published tasks will emit 'ack' when it is acked.
- `autoDelete`: boolean, default true.
If set, the exchange is deleted when there are no longer queues
bound to it.
- `noDeclare`: boolean, default false.
If set, the exchange will not be declared, this will allow the exchange
to be deleted if you dont know its previous options.
- `confirm`: boolean, default false.
If set, the exchange will be in confirm mode, and you will get a
'ack'|'error' event emitted on a publish, or the callback on the publish
will be called.

@@ -318,3 +343,3 @@ An exchange will emit the `'open'` event when it is finally declared.

### exchange.publish(routingKey, message, options)
### exchange.publish(routingKey, message, options, callback)

@@ -346,2 +371,6 @@ Publishes a message to the exchange. The `routingKey` argument is a string

`callback` is a function that will get called if the exchange is in confirm mode,
the value sent will be true or false, this is the presense of a error so true, means
an error occured and false, means the publish was successfull
### exchange.destroy(ifUnused = true)

@@ -348,0 +377,0 @@

@@ -19,3 +19,4 @@ require('./harness');

bar: 'foo',
number: '123'
number: '123',
stuff: [{x:1}, {x:2}]
} });

@@ -36,2 +37,4 @@

assert.equal('123', m.headers['number'].toString());
assert.equal(1, m.headers['stuff'][0].x);
assert.equal(2, m.headers['stuff'][1].x);
})

@@ -38,0 +41,0 @@ })

@@ -5,14 +5,11 @@ global.options = { heartbeat: 1 };

connects = 0;
var closed = 0;
var closed = false;
var hb = setInterval(function() {
puts(" -> heartbeat");
connection.heartbeat();
}, 1000);
setTimeout(function() {
assert.ok(!closed);
clearInterval(hb);
setTimeout(function() { assert.ok(closed); }, 3000);
// Change the local heartbeat interval (without changing the negotiated
// interval). This will cause the server to notice we've dropped off,
// and close the connection.
connection.options['heartbeat'] = 0;
setTimeout(function() { assert.ok(closed); }, 3500);
}, 5000);

@@ -25,6 +22,5 @@

puts("closed");
closed = 1;
closed = true;
});
connection.addListener('ready', function () {
connects++;
puts("connected to " + connection.serverProperties.product);

@@ -31,0 +27,0 @@

@@ -35,2 +35,4 @@ require('./harness');

// wait one second to receive the message, then quit
// make sure to delete our queue on our way out.
q.destroy();
connection.end();

@@ -37,0 +39,0 @@ }, 1000);

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc