Comparing version 0.1.3 to 0.1.4
exports.constants = [[1,"frameMethod"],[2,"frameHeader"],[3,"frameBody"],[8,"frameHeartbeat"],[200,"replySuccess"],[206,"frameEnd"],[311,"contentTooLarge"],[313,"noConsumers"],[320,"connectionForced"],[402,"invalidPath"],[403,"accessRefused"],[404,"notFound"],[405,"resourceLocked"],[406,"preconditionFailed"],[501,"frameError"],[502,"syntaxError"],[503,"commandInvalid"],[504,"channelError"],[505,"unexpectedFrame"],[506,"resourceError"],[530,"notAllowed"],[540,"notImplemented"],[541,"internalError"],[4096,"frameMinSize"]]; | ||
exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]}]; | ||
exports.classes = [{"name":"connection","index":10,"fields":[],"methods":[{"name":"start","index":10,"fields":[{"name":"versionMajor","domain":"octet"},{"name":"versionMinor","domain":"octet"},{"name":"serverProperties","domain":"table"},{"name":"mechanisms","domain":"longstr"},{"name":"locales","domain":"longstr"}]},{"name":"startOk","index":11,"fields":[{"name":"clientProperties","domain":"table"},{"name":"mechanism","domain":"shortstr"},{"name":"response","domain":"longstr"},{"name":"locale","domain":"shortstr"}]},{"name":"secure","index":20,"fields":[{"name":"challenge","domain":"longstr"}]},{"name":"secureOk","index":21,"fields":[{"name":"response","domain":"longstr"}]},{"name":"tune","index":30,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"tuneOk","index":31,"fields":[{"name":"channelMax","domain":"short"},{"name":"frameMax","domain":"long"},{"name":"heartbeat","domain":"short"}]},{"name":"open","index":40,"fields":[{"name":"virtualHost","domain":"shortstr"},{"name":"reserved1","domain":"shortstr"},{"name":"reserved2","domain":"bit"}]},{"name":"openOk","index":41,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"close","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":51,"fields":[]}]},{"name":"channel","index":20,"fields":[],"methods":[{"name":"open","index":10,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"openOk","index":11,"fields":[{"name":"reserved1","domain":"longstr"}]},{"name":"flow","index":20,"fields":[{"name":"active","domain":"bit"}]},{"name":"flowOk","index":21,"fields":[{"name":"active","domain":"bit"}]},{"name":"close","index":40,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"classId","domain":"short"},{"name":"methodId","domain":"short"}]},{"name":"closeOk","index":41,"fields":[]}]},{"name":"exchange","index":40,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"type","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"reserved2","domain":"bit"},{"name":"reserved3","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[]},{"name":"delete","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":21,"fields":[]}]},{"name":"queue","index":50,"fields":[],"methods":[{"name":"declare","index":10,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"passive","domain":"bit"},{"name":"durable","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"autoDelete","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"declareOk","index":11,"fields":[{"name":"queue","domain":"shortstr"},{"name":"messageCount","domain":"long"},{"name":"consumerCount","domain":"long"}]},{"name":"bind","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"bindOk","index":21,"fields":[]},{"name":"unbind","index":50,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"arguments","domain":"table"}]},{"name":"unbindOk","index":51,"fields":[]},{"name":"purge","index":30,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"purgeOk","index":31,"fields":[{"name":"messageCount","domain":"long"}]},{"name":"delete","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"ifUnused","domain":"bit"},{"name":"ifEmpty","domain":"bit"},{"name":"noWait","domain":"bit"}]},{"name":"deleteOk","index":41,"fields":[{"name":"messageCount","domain":"long"}]}]},{"name":"basic","index":60,"fields":[{"name":"contentType","domain":"shortstr"},{"name":"contentEncoding","domain":"shortstr"},{"name":"headers","domain":"table"},{"name":"deliveryMode","domain":"octet"},{"name":"priority","domain":"octet"},{"name":"correlationId","domain":"shortstr"},{"name":"replyTo","domain":"shortstr"},{"name":"expiration","domain":"shortstr"},{"name":"messageId","domain":"shortstr"},{"name":"timestamp","domain":"timestamp"},{"name":"type","domain":"shortstr"},{"name":"userId","domain":"shortstr"},{"name":"appId","domain":"shortstr"},{"name":"reserved","domain":"shortstr"}],"methods":[{"name":"qos","index":10,"fields":[{"name":"prefetchSize","domain":"long"},{"name":"prefetchCount","domain":"short"},{"name":"global","domain":"bit"}]},{"name":"qosOk","index":11,"fields":[]},{"name":"consume","index":20,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"consumerTag","domain":"shortstr"},{"name":"noLocal","domain":"bit"},{"name":"noAck","domain":"bit"},{"name":"exclusive","domain":"bit"},{"name":"noWait","domain":"bit"},{"name":"arguments","domain":"table"}]},{"name":"consumeOk","index":21,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"cancel","index":30,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"noWait","domain":"bit"}]},{"name":"cancelOk","index":31,"fields":[{"name":"consumerTag","domain":"shortstr"}]},{"name":"publish","index":40,"fields":[{"name":"reserved1","domain":"short"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"mandatory","domain":"bit"},{"name":"immediate","domain":"bit"}]},{"name":"return","index":50,"fields":[{"name":"replyCode","domain":"short"},{"name":"replyText","domain":"shortstr"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"deliver","index":60,"fields":[{"name":"consumerTag","domain":"shortstr"},{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"}]},{"name":"get","index":70,"fields":[{"name":"reserved1","domain":"short"},{"name":"queue","domain":"shortstr"},{"name":"noAck","domain":"bit"}]},{"name":"getOk","index":71,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"redelivered","domain":"bit"},{"name":"exchange","domain":"shortstr"},{"name":"routingKey","domain":"shortstr"},{"name":"messageCount","domain":"long"}]},{"name":"getEmpty","index":72,"fields":[{"name":"reserved1","domain":"shortstr"}]},{"name":"ack","index":80,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"multiple","domain":"bit"}]},{"name":"reject","index":90,"fields":[{"name":"deliveryTag","domain":"longlong"},{"name":"requeue","domain":"bit"}]},{"name":"recoverAsync","index":100,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recover","index":110,"fields":[{"name":"requeue","domain":"bit"}]},{"name":"recoverOk","index":111,"fields":[]}]},{"name":"tx","index":90,"fields":[],"methods":[{"name":"select","index":10,"fields":[]},{"name":"selectOk","index":11,"fields":[]},{"name":"commit","index":20,"fields":[]},{"name":"commitOk","index":21,"fields":[]},{"name":"rollback","index":30,"fields":[]},{"name":"rollbackOk","index":31,"fields":[]}]},{"name":"confirm","index":85,"fields":[],"methods":[{"name":"select","index":10,"fields":[{"name":"noWait","domain":"bit"}]},{"name":"selectOk","index":11,"fields":[]}]}]; |
647
amqp.js
@@ -8,4 +8,7 @@ var events = require('events'), | ||
Promise = require('./promise').Promise, | ||
URL = require('url'); | ||
URL = require('url'), | ||
AMQPTypes = require('./constants').AMQPTypes, | ||
Indicators = require('./constants').Indicators, | ||
FrameType = require('./constants').FrameType; | ||
function mixin () { | ||
@@ -185,11 +188,11 @@ // copy reference to target object | ||
if (data.length > 0) { | ||
if (data[0] === 206) { | ||
if (data[0] === Indicators.FRAME_END) { | ||
switch (frameType) { | ||
case 1: | ||
case FrameType.METHOD: | ||
self._parseMethodFrame(frameChannel, frameBuffer); | ||
break; | ||
case 2: | ||
case FrameType.HEADER: | ||
self._parseHeaderFrame(frameChannel, frameBuffer); | ||
break; | ||
case 3: | ||
case FrameType.BODY: | ||
if (self.onContent) { | ||
@@ -199,4 +202,4 @@ self.onContent(frameChannel, frameBuffer); | ||
break; | ||
case 8: | ||
debug("hearbeat"); | ||
case FrameType.HEARTBEAT: | ||
debug("heartbeat"); | ||
if (self.onHeartBeat) self.onHeartBeat(); | ||
@@ -289,69 +292,73 @@ break; | ||
function parseValue (buffer) { | ||
switch (buffer[buffer.read++]) { | ||
case AMQPTypes.STRING: | ||
return parseLongString(buffer); | ||
function parseTable (buffer) { | ||
var length = buffer.read + parseInt(buffer, 4); | ||
var table = {}; | ||
while (buffer.read < length) { | ||
var field = parseShortString(buffer); | ||
switch (buffer[buffer.read++]) { | ||
case 'S'.charCodeAt(0): | ||
table[field] = parseLongString(buffer); | ||
break; | ||
case AMQPTypes.INTEGER: | ||
return parseInt(buffer, 4); | ||
case 'I'.charCodeAt(0): | ||
table[field] = parseInt(buffer, 4); | ||
break; | ||
case AMQPTypes.DECIMAL: | ||
var dec = parseInt(buffer, 1); | ||
var num = parseInt(buffer, 4); | ||
return num / (dec * 10); | ||
case 'D'.charCodeAt(0): | ||
var dec = parseInt(buffer, 1); | ||
var num = parseInt(buffer, 4); | ||
table[field] = num / (dec * 10); | ||
break; | ||
case AMQPTypes._64BIT_FLOAT: | ||
var b = []; | ||
for (var i = 0; i < 8; ++i) | ||
b[i] = buffer[buffer.read++]; | ||
case 'd'.charCodeAt(0): | ||
var b = []; | ||
for (var i = 0; i < 8; ++i) | ||
b[i] = buffer[buffer.read++]; | ||
return (new jspack(true)).Unpack('d', b); | ||
table[field] = (new jspack(true)).Unpack('d', b); | ||
break; | ||
case AMQPTypes._32BIT_FLOAT: | ||
var b = []; | ||
for (var i = 0; i < 4; ++i) | ||
b[i] = buffer[buffer.read++]; | ||
case 'f'.charCodeAt(0): | ||
var b = []; | ||
for (var i = 0; i < 4; ++i) | ||
b[i] = buffer[buffer.read++]; | ||
return (new jspack(true)).Unpack('f', b); | ||
table[field] = (new jspack(true)).Unpack('f', b); | ||
break; | ||
case AMQPTypes.TIME: | ||
var int = parseInt(buffer, 8); | ||
return (new Date()).setTime(int * 1000); | ||
case 'T'.charCodeAt(0): | ||
var int = parseInt(buffer, 8); | ||
table[field] = new Date(); | ||
table[field].setTime(int * 1000); | ||
break; | ||
case AMQPTypes.HASH: | ||
return parseTable(buffer); | ||
case 'F'.charCodeAt(0): | ||
table[field] = parseTable(buffer); | ||
break; | ||
case AMQPTypes.SIGNED_64BIT: | ||
return parseInt(buffer, 8); | ||
case 'l'.charCodeAt(0): | ||
table[field] = parseInt(buffer, 8); | ||
break; | ||
case AMQPTypes.BOOLEAN: | ||
return (parseInt(buffer, 1) > 0); | ||
case 't'.charCodeAt(0): | ||
table[field] = (parseInt(buffer, 1) > 0); | ||
break; | ||
case AMQPTypes.BYTE_ARRAY: | ||
var len = parseInt(buffer, 4); | ||
var buf = new Buffer(len); | ||
buffer.copy(buf, 0, buffer.read, buffer.read + len); | ||
buffer.read += len; | ||
return buf; | ||
case 'x'.charCodeAt(0): | ||
var len = parseInt(buffer, 4); | ||
var buf = new Buffer(len); | ||
buffer.copy(buf, 0, buffer.read, buffer.read + len); | ||
buffer.read += len; | ||
table[field] = buf; | ||
break; | ||
case AMQPTypes.ARRAY: | ||
var len = parseInt(buffer, 4); | ||
var end = buffer.read + len; | ||
var arr = new Array(); | ||
default: | ||
throw new Error("Unknown field value type " + buffer[buffer.read-1]); | ||
} | ||
while (buffer.read < end) { | ||
arr.push(parseValue(buffer)); | ||
} | ||
return arr; | ||
default: | ||
throw new Error("Unknown field value type " + buffer[buffer.read-1]); | ||
} | ||
} | ||
function parseTable (buffer) { | ||
var length = buffer.read + parseInt(buffer, 4); | ||
var table = {}; | ||
while (buffer.read < length) { | ||
table[parseShortString(buffer)] = parseValue(buffer); | ||
} | ||
return table; | ||
@@ -614,2 +621,51 @@ } | ||
function serializeValue (b, value) { | ||
switch (typeof(value)) { | ||
case 'string': | ||
b[b.used++] = 'S'.charCodeAt(0); | ||
serializeLongString(b, value); | ||
break; | ||
case 'number': | ||
if (!isFloat(value)) { | ||
if (isBigInt(value)) { | ||
// 64-bit uint | ||
b[b.used++] = 'l'.charCodeAt(0); | ||
serializeInt(b, 8, value); | ||
} else { | ||
//32-bit uint | ||
b[b.used++] = 'I'.charCodeAt(0); | ||
serializeInt(b, 4, value); | ||
} | ||
} else { | ||
//64-bit float | ||
b[b.used++] = 'd'.charCodeAt(0); | ||
serializeFloat(b, 8, value); | ||
} | ||
break; | ||
case 'boolean': | ||
b[b.used++] = 't'.charCodeAt(0); | ||
b[b.used++] = value; | ||
break; | ||
default: | ||
if (value instanceof Date) { | ||
b[b.used++] = 'T'.charCodeAt(0); | ||
serializeDate(b, value); | ||
} else if (value instanceof Buffer) { | ||
b[b.used++] = 'x'.charCodeAt(0); | ||
serializeBuffer(b, value); | ||
} else if (util.isArray(value)) { | ||
b[b.used++] = 'A'.charCodeAt(0); | ||
serializeArray(b, value); | ||
} else if (typeof(value) === 'object') { | ||
b[b.used++] = 'F'.charCodeAt(0); | ||
serializeTable(b, value); | ||
} else { | ||
this.throwError("unsupported type in amqp table: " + typeof(value)); | ||
} | ||
} | ||
} | ||
function serializeTable (b, object) { | ||
@@ -624,3 +680,2 @@ if (typeof(object) != "object") { | ||
b.used += 4; // sizeof long | ||
var startIndex = b.used; | ||
@@ -630,56 +685,25 @@ | ||
if (!object.hasOwnProperty(key)) continue; | ||
serializeShortString(b, key); | ||
serializeValue(b, object[key]); | ||
} | ||
var value = object[key]; | ||
var endIndex = b.used; | ||
b.used = lengthIndex; | ||
serializeInt(b, 4, endIndex - startIndex); | ||
b.used = endIndex; | ||
} | ||
switch (typeof(value)) { | ||
case 'string': | ||
b[b.used++] = 'S'.charCodeAt(0); | ||
serializeLongString(b, value); | ||
break; | ||
function serializeArray (b, arr) { | ||
// Save our position so that we can go back and write the byte length of this array | ||
// at the beginning of the packet (once we have serialized all elements). | ||
var lengthIndex = b.used; | ||
b.used += 4; // sizeof long | ||
var startIndex = b.used; | ||
case 'number': | ||
if (!isFloat(value)) { | ||
if (isBigInt(value)) { | ||
// 64-bit uint | ||
b[b.used++] = 'l'.charCodeAt(0); | ||
serializeInt(b, 8, value); | ||
} else { | ||
//32-bit uint | ||
b[b.used++] = 'I'.charCodeAt(0); | ||
serializeInt(b, 4, value); | ||
} | ||
} else { | ||
//64-bit float | ||
b[b.used++] = 'd'.charCodeAt(0); | ||
serializeFloat(b, 8, value); | ||
} | ||
break; | ||
case 'boolean': | ||
b[b.used++] = 't'.charCodeAt(0); | ||
b[b.used++] = value; | ||
break; | ||
default: | ||
if(value instanceof Date) { | ||
b[b.used++] = 'T'.charCodeAt(0); | ||
serializeDate(b, value); | ||
} else if (value instanceof Buffer) { | ||
b[b.used++] = 'x'.charCodeAt(0); | ||
serializeBuffer(b, value); | ||
} else { | ||
if(typeof(value) === 'object') { | ||
b[b.used++] = 'F'.charCodeAt(0); | ||
serializeTable(b, value); | ||
} else { | ||
this.throwError("unsupported type in amqp table: " + typeof(value)); | ||
} | ||
} | ||
} | ||
len = arr.length; | ||
for (var i = 0; i < len; i++) { | ||
serializeValue(b, arr[i]); | ||
} | ||
var endIndex = b.used; | ||
b.used = lengthIndex; | ||
@@ -690,3 +714,2 @@ serializeInt(b, 4, endIndex - startIndex); | ||
function serializeFields (buffer, fields, args, strict) { | ||
@@ -789,5 +812,78 @@ var bitField = 0; | ||
var state = 'handshake'; | ||
var parser; | ||
var backoffTime = null; | ||
this.connectionAttemptScheduled = false; | ||
var backoff = function () { | ||
if (self._inboundHeartbeatTimer !== null) { | ||
clearTimeout(self._inboundHeartbeatTimer); | ||
self._inboundHeartbeatTimer = null; | ||
} | ||
if (self._outboundHeartbeatTimer !== null) { | ||
clearTimeout(self._outboundHeartbeatTimer); | ||
self._outboundHeartbeatTimer = null; | ||
} | ||
if (!self.connectionAttemptScheduled) { | ||
// Set to true, as we are presently in the process of scheduling one. | ||
self.connectionAttemptScheduled = true; | ||
// Kill the socket, if it hasn't been killed already. | ||
self.end(); | ||
// Reset parser state | ||
parser = null; | ||
// In order for our reconnection to be seamless, we have to notify the | ||
// channels that they are no longer connected so that nobody attempts | ||
// to send messages which would be doomed to fail. | ||
for (var channel in self.channels) { | ||
if (channel != 0) { | ||
self.channels[channel].state = 'closed'; | ||
} | ||
} | ||
// Queues are channels (so we have already marked them as closed), but | ||
// queues have special needs, since the subscriptions will no longer | ||
// be known to the server when we reconnect. Mark the subscriptions as | ||
// closed so that we can resubscribe them once we are reconnected. | ||
for (var queue in self.queues) { | ||
for (var index in self.queues[queue].consumerTagOptions) { | ||
self.queues[queue].consumerTagOptions[index]['state'] = 'closed'; | ||
} | ||
} | ||
// Begin reconnection attempts | ||
if (self.implOptions.reconnect) { | ||
// Don't thrash, use a backoff strategy. | ||
if (backoffTime === null) { | ||
// This is the first time we've failed since a successful connection, | ||
// so use the configured backoff time without any modification. | ||
backoffTime = self.implOptions.reconnectBackoffTime; | ||
} else if (self.implOptions.reconnectBackoffStrategy === 'exponential') { | ||
// If you've configured exponential backoff, we'll double the | ||
// backoff time each subsequent attempt until success. | ||
backoffTime *= 2; | ||
// limit the maxium timeout, to avoid potentially unlimited stalls | ||
if(backoffTime > self.implOptions.reconnectExponentialLimit){ | ||
backoffTime = self.implOptions.reconnectExponentialLimit; | ||
} | ||
} else if (self.implOptions.reconnectBackoffStrategy === 'linear') { | ||
// Linear strategy is the default. In this case, we will retry at a | ||
// constant interval, so there's no need to change the backoff time | ||
// between attempts. | ||
} else { | ||
// TODO should we warn people if they picked a nonexistent strategy? | ||
} | ||
setTimeout(function () { | ||
// Set to false, so that if we fail in the reconnect attempt, we can | ||
// schedule another one. | ||
self.connectionAttemptScheduled = false; | ||
self.reconnect(); | ||
}, backoffTime); | ||
} | ||
} | ||
}; | ||
this._defaultExchange = null; | ||
@@ -798,6 +894,8 @@ this.channelCounter = 0; | ||
self.addListener('connect', function () { | ||
// channel 0 is the control channel. | ||
self.channels = {0:self}; | ||
self.queues = {}; | ||
self.exchanges = {}; | ||
// In the case where this is a reconnection, do not trample on the existing | ||
// channels. | ||
// For your reference, channel 0 is the control channel. | ||
self.channels = (self.implOptions.reconnect ? self.channels : undefined) || {0:self}; | ||
self.queues = (self.implOptions.reconnect ? self.queues : undefined) || {}; | ||
self.exchanges = (self.implOptions.reconnect ? self.exchanges : undefined) || {}; | ||
@@ -833,7 +931,5 @@ parser = new AMQPParser('0-9-1', 'client'); | ||
parser.onError = function(e) { | ||
self.end(); | ||
parser.onError = function (e) { | ||
self.emit("error", e); | ||
self.emit("close"); | ||
parser = null; | ||
}; | ||
@@ -844,3 +940,2 @@ //debug("connected..."); | ||
self.write("AMQP" + String.fromCharCode(0,0,9,1)); | ||
state = 'handshake'; | ||
}); | ||
@@ -850,10 +945,25 @@ | ||
parser.execute(data); | ||
self._inboundHeartbeatTimerReset(); | ||
}); | ||
self.addListener('end', function () { | ||
self.end(); | ||
// in order to allow reconnects, have to clear the | ||
// state. | ||
parser = null; | ||
self.addListener('error', function () { | ||
backoff(); | ||
}); | ||
self.addListener('ready', function () { | ||
// Reset the backoff time since we have successfully connected. | ||
backoffTime = null; | ||
if (self.implOptions.reconnect) { | ||
// Reconnect any channels which were open. | ||
for (var channel in self.channels) { | ||
if (channel != 0) { | ||
self.channels[channel].reconnect(); | ||
} | ||
} | ||
} | ||
// Restart the heartbeat to the server | ||
self._outboundHeartbeatTimerReset(); | ||
}) | ||
} | ||
@@ -872,3 +982,16 @@ util.inherits(Connection, net.Stream); | ||
}; | ||
var defaultImplOptions = { defaultExchangeName: '' }; | ||
// If the "reconnect" option is true, then the driver will attempt to | ||
// reconnect using the configured strategy *any time* the connection | ||
// becomes unavailable. | ||
// If this is not appropriate for your application, do not set this option. | ||
// If you would like this option, you can set parameters controlling how | ||
// aggressively the reconnections will be attempted. | ||
// Valid strategies are "linear" and "exponential". | ||
// Backoff times are in milliseconds. Under the "linear" strategy, the driver | ||
// will pause <reconnectBackoffTime> ms before the first attempt, and between | ||
// each subsequent attempt. Under the "exponential" strategy, the driver will | ||
// pause <reconnectBackoffTime> ms before the first attempt, and will double | ||
// the previous pause between each subsequent attempt until a connection is | ||
// reestablished. | ||
var defaultImplOptions = { defaultExchangeName: '', reconnect: true , reconnectBackoffStrategy: 'linear' , reconnectExponentialLimit: 120000, reconnectBackoffTime: 1000 }; | ||
@@ -901,3 +1024,3 @@ function urlOptions(connectionString) { | ||
// c.setImplOptions(options); | ||
c.reconnect(); | ||
c.connect(); | ||
return c; | ||
@@ -913,16 +1036,27 @@ }; | ||
Connection.prototype.setImplOptions = function(options) { | ||
Connection.prototype.setImplOptions = function (options) { | ||
var o = {} | ||
mixin(o, defaultImplOptions, options || {}); | ||
this.implOptions = o; | ||
} | ||
}; | ||
Connection.prototype.reconnect = function () { | ||
this.connect(this.options.port, this.options.host); | ||
// Suspend activity on channels | ||
for (var channel in this.channels) { | ||
this.channels[channel].state = 'closed'; | ||
} | ||
// Terminate socket activity | ||
this.end(); | ||
this.connect(); | ||
}; | ||
Connection.prototype.connect = function () { | ||
// Connect socket | ||
net.Socket.prototype.connect.call(this, this.options.port, this.options.host); | ||
}; | ||
Connection.prototype._onMethod = function (channel, method, args) { | ||
debug(channel + " > " + method.name + " " + JSON.stringify(args)); | ||
// Channel 0 is the control channel. If not zero then deligate to | ||
// Channel 0 is the control channel. If not zero then delegate to | ||
// one of the channel objects. | ||
@@ -1015,6 +1149,34 @@ | ||
Connection.prototype.heartbeat = function() { | ||
Connection.prototype.heartbeat = function () { | ||
this.write(new Buffer([8,0,0,0,0,0,0,206])); | ||
}; | ||
Connection.prototype._outboundHeartbeatTimerReset = function () { | ||
if (this._outboundHeartbeatTimer !== null) { | ||
clearTimeout(this._outboundHeartbeatTimer); | ||
this._outboundHeartbeatTimer = null; | ||
} | ||
if (this.options.heartbeat) { | ||
var self = this; | ||
this._outboundHeartbeatTimer = setTimeout(function () { | ||
self.heartbeat(); | ||
self._outboundHeartbeatTimerReset(); | ||
}, 1000 * this.options.heartbeat); | ||
} | ||
}; | ||
Connection.prototype._inboundHeartbeatTimerReset = function () { | ||
if (this._inboundHeartbeatTimer !== null) { | ||
clearTimeout(this._inboundHeartbeatTimer); | ||
this._inboundHeartbeatTimer = null; | ||
} | ||
if (this.options.heartbeat) { | ||
var self = this; | ||
var gracePeriod = 2 * this.options.heartbeat; | ||
this._inboundHeartbeatTimer = setTimeout(function () { | ||
self.emit('error', new Error('no heartbeat or data in last ' + gracePeriod + ' seconds')); | ||
}, gracePeriod * 1000); | ||
} | ||
}; | ||
Connection.prototype._sendMethod = function (channel, method, args) { | ||
@@ -1055,2 +1217,4 @@ debug(channel + " < " + method.name + " " + JSON.stringify(args)); | ||
this.write(c); | ||
this._outboundHeartbeatTimerReset(); | ||
}; | ||
@@ -1314,6 +1478,6 @@ | ||
Message.prototype.reject = function (requeue){ | ||
this.queue.connection._sendMethod(this.queue.channel, methods.basicReject, | ||
{ deliveryTag: this.deliveryTag | ||
, requeue: requeue ? true : false | ||
}); | ||
this.queue.connection._sendMethod(this.queue.channel, methods.basicReject, | ||
{ deliveryTag: this.deliveryTag | ||
, requeue: requeue ? true : false | ||
}); | ||
} | ||
@@ -1330,7 +1494,11 @@ | ||
this.connection._sendMethod(channel, methods.channelOpen, {reserved1: ""}); | ||
this.reconnect(); | ||
} | ||
util.inherits(Channel, events.EventEmitter); | ||
Channel.prototype.reconnect = function () { | ||
this.connection._sendMethod(this.channel, methods.channelOpen, {reserved1: ""}); | ||
}; | ||
Channel.prototype._taskPush = function (reply, cb) { | ||
@@ -1403,3 +1571,3 @@ var promise = new Promise(); | ||
this.consumerTagListeners = {}; | ||
this.consumerTagOptions = {}; | ||
var self = this; | ||
@@ -1414,3 +1582,3 @@ | ||
this.options = { autoDelete: true }; | ||
this.options = { autoDelete: true, closeChannelOnUnsubscribe: false }; | ||
if (options) mixin(this.options, options); | ||
@@ -1433,2 +1601,4 @@ | ||
} | ||
options['state'] = 'opening'; | ||
this.consumerTagOptions[consumerTag] = options; | ||
@@ -1455,2 +1625,3 @@ if (options.prefetchCount) { | ||
}); | ||
self.consumerTagOptions[consumerTag]['state'] = 'open'; | ||
}); | ||
@@ -1468,3 +1639,7 @@ }; | ||
.addCallback(function () { | ||
if(self.options.closeChannelOnUnsubscribe){ | ||
self.close(); | ||
} | ||
delete self.consumerTagListeners[consumerTag]; | ||
delete self.consumerTagOptions[consumerTag]; | ||
}); | ||
@@ -1494,15 +1669,15 @@ }; | ||
// basic consume | ||
var rawOptions = { noAck: !options.ack }; | ||
if (options.ack) { | ||
self.connection._sendMethod(self.channel, methods.basicQos, | ||
{ reserved1: 0 | ||
, prefetchSize: 0 | ||
, prefetchCount: options.prefetchCount | ||
, global: false | ||
}); | ||
rawOptions['prefetchCount'] = options.prefetchCount; | ||
} | ||
// basic consume | ||
var rawOptions = { noAck: !options.ack }; | ||
return this.subscribeRaw(rawOptions, function (m) { | ||
var isJSON = (m.contentType == 'text/json') || (m.contentType == 'application/json'); | ||
var contentType = m.contentType; | ||
if (contentType == null && m.headers && m.headers.properties) { | ||
contentType = m.headers.properties.content_type; | ||
} | ||
var isJSON = (contentType == 'text/json') || (contentType == 'application/json'); | ||
@@ -1580,3 +1755,3 @@ var b; | ||
Queue.prototype.bind = function (/* [exchange,] routingKey */) { | ||
Queue.prototype.bind = function (/* [exchange,] routingKey [, bindCallback] */) { | ||
var self = this; | ||
@@ -1588,4 +1763,13 @@ | ||
var exchange, routingKey; | ||
var exchange, routingKey, callback; | ||
if(typeof(arguments[arguments.length-1]) == 'function'){ | ||
callback = arguments[arguments.length-1]; | ||
} | ||
// Remove callback from args so rest of bind functionality works as before | ||
// Also, defend against cases where a non function callback has been passed as 3rd param | ||
if (callback || arguments.length == 3) { | ||
delete arguments[arguments.length-1]; | ||
arguments.length--; | ||
} | ||
if (arguments.length == 2) { | ||
@@ -1598,2 +1782,3 @@ exchange = arguments[0]; | ||
} | ||
if(callback) this._bindCallback = callback; | ||
@@ -1686,3 +1871,2 @@ | ||
options = options || {}; | ||
return this._taskPush(methods.queueDeleteOk, function () { | ||
@@ -1705,3 +1889,13 @@ self.connection.queueClosed(self.name); | ||
Queue.prototype.purge = function() { | ||
var self = this; | ||
return this._taskPush(methods.queuePurgeOk, function () { | ||
self.connection._sendMethod(self.channel, methods.queuePurge, | ||
{ reserved1 : 0, | ||
queue: self.name, | ||
noWait: false}) | ||
}); | ||
}; | ||
Queue.prototype._onMethod = function (channel, method, args) { | ||
@@ -1713,13 +1907,24 @@ this.emit(method.name, args); | ||
case methods.channelOpenOk: | ||
this.connection._sendMethod(channel, methods.queueDeclare, | ||
{ reserved1: 0 | ||
, queue: this.name | ||
, passive: this.options.passive ? true : false | ||
, durable: this.options.durable ? true : false | ||
, exclusive: this.options.exclusive ? true : false | ||
, autoDelete: this.options.autoDelete ? true : false | ||
, noWait: false | ||
, "arguments": this.options.arguments || {} | ||
}); | ||
this.state = "declare queue"; | ||
if (this.options.noDeclare) { | ||
this.state = 'open'; | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.emit('open'); | ||
} else { | ||
this.connection._sendMethod(channel, methods.queueDeclare, | ||
{ reserved1: 0 | ||
, queue: this.name | ||
, passive: this.options.passive ? true : false | ||
, durable: this.options.durable ? true : false | ||
, exclusive: this.options.exclusive ? true : false | ||
, autoDelete: this.options.autoDelete ? true : false | ||
, noWait: false | ||
, "arguments": this.options.arguments || {} | ||
}); | ||
this.state = "declare queue"; | ||
} | ||
break; | ||
@@ -1737,2 +1942,13 @@ | ||
this.emit('open', args.queue, args.messageCount, args.consumerCount); | ||
// If this is a reconnect, we must re-subscribe our queue listeners. | ||
var consumerTags = Object.keys(this.consumerTagListeners); | ||
for (var index in consumerTags) { | ||
if (this.consumerTagOptions[consumerTags[index]]['state'] === 'closed') { | ||
this.subscribeRaw(this.consumerTagOptions[consumerTags[index]], this.consumerTagListeners[consumerTags[index]]); | ||
// Having called subscribeRaw, we are now a new consumer with a new consumerTag. | ||
delete this.consumerTagListeners[consumerTags[index]]; | ||
delete this.consumerTagOptions[consumerTags[index]]; | ||
} | ||
} | ||
break; | ||
@@ -1745,2 +1961,8 @@ | ||
case methods.queueBindOk: | ||
if (this._bindCallback) { | ||
// setting this._bindCallback to null before calling the callback allows for a subsequent bind within the callback | ||
var cb = this._bindCallback; | ||
this._bindCallback = null; | ||
cb(this); | ||
} | ||
break; | ||
@@ -1751,2 +1973,7 @@ | ||
case methods.confirmSelectOk: | ||
this._sequence = 1; | ||
this.confirm = true; | ||
break; | ||
case methods.channelClose: | ||
@@ -1798,2 +2025,8 @@ this.state = "closed"; | ||
Queue.prototype.flow = function(active) { | ||
var self = this; | ||
return this._taskPush(methods.channelFlowOk, function () { | ||
self.connection._sendMethod(self.channel, methods.channelFlow, {'active': active }); | ||
}) | ||
}; | ||
@@ -1808,2 +2041,5 @@ | ||
this._openCallback = openCallback; | ||
this._sequence = null; | ||
this._unAcked = {}; | ||
} | ||
@@ -1831,2 +2067,14 @@ util.inherits(Exchange, Channel); | ||
// For if we want to delete a exchange, | ||
// we dont care if all of the options match. | ||
} else if (this.options.noDeclare){ | ||
this.state = 'open'; | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
this.emit('open'); | ||
} else { | ||
@@ -1844,3 +2092,3 @@ this.connection._sendMethod(channel, methods.exchangeDeclare, | ||
, noWait: false | ||
, "arguments": {} | ||
, "arguments":this.options.arguments || {} | ||
}); | ||
@@ -1851,3 +2099,22 @@ this.state = 'declaring'; | ||
case methods.exchangeDeclareOk: | ||
case methods.exchangeDeclareOk: | ||
if (this.options.confirm){ | ||
this.connection._sendMethod(channel, methods.confirmSelect, | ||
{ noWait: false }); | ||
}else{ | ||
this.state = 'open'; | ||
this.emit('open'); | ||
if (this._openCallback) { | ||
this._openCallback(this); | ||
this._openCallback = null; | ||
} | ||
} | ||
break; | ||
case methods.confirmSelectOk: | ||
this._sequence = 1; | ||
this.state = 'open'; | ||
@@ -1875,2 +2142,28 @@ this.emit('open'); | ||
case methods.basicAck: | ||
this.emit('basic-ack', args); | ||
if(args.deliveryTag == 0 && args.multiple == true){ | ||
// we must ack everything | ||
for(var tag in this._unAcked){ | ||
this._unAcked[tag].emitAck() | ||
delete this._unAcked[tag] | ||
} | ||
}else if(args.deliveryTag != 0 && args.multiple == true){ | ||
// we must ack everything before the delivery tag | ||
for(var tag in this._unAcked){ | ||
if(tag <= args.deliveryTag){ | ||
this._unAcked[tag].emitAck() | ||
delete this._unAcked[tag] | ||
} | ||
} | ||
}else if(this._unAcked[args.deliveryTag] && args.multiple == false){ | ||
// simple single ack | ||
this._unAcked[args.deliveryTag].emitAck() | ||
delete this._unAcked[args.deliveryTag] | ||
} | ||
break; | ||
case methods.basicReturn: | ||
@@ -1907,3 +2200,6 @@ this.emit('basic-return', args); | ||
// - clusterId | ||
Exchange.prototype.publish = function (routingKey, data, options) { | ||
// | ||
// the callback is optional and is only used when confirm is turned on for the exchange | ||
Exchange.prototype.publish = function (routingKey, data, options, callback) { | ||
var self = this; | ||
@@ -1918,3 +2214,3 @@ | ||
return this._taskPush(null, function () { | ||
var task = this._taskPush(null, function () { | ||
self.connection._sendMethod(self.channel, methods.basicPublish, options); | ||
@@ -1930,2 +2226,15 @@ // This interface is probably not appropriate for streaming large files. | ||
}); | ||
if (self.options.confirm){ | ||
task.sequence = self._sequence | ||
self._unAcked[self._sequence] = task | ||
self._sequence++ | ||
if(callback != null){ | ||
task.once('ack', function(){task.removeAllListeners();callback(false)}); | ||
this.once('error', function(){task.removeAllListeners();callback(true)}); | ||
} | ||
} | ||
return task | ||
}; | ||
@@ -1935,4 +2244,4 @@ | ||
Exchange.prototype.cleanup = function() { | ||
if (this.binds == 0) // don't keep reference open if unused | ||
this.connection.exchangeClosed(this.name); | ||
if (this.binds == 0) // don't keep reference open if unused | ||
this.connection.exchangeClosed(this.name); | ||
}; | ||
@@ -1939,0 +2248,0 @@ |
{ "name" : "amqp" | ||
, "description" : "AMQP driver for node" | ||
, "keywords" : [ "amqp" ] | ||
, "version" : "0.1.3" | ||
, "version" : "0.1.4" | ||
, "preferGlobal" : true | ||
@@ -29,3 +29,3 @@ , "author" : { "name" : "Ryan Dahl" } | ||
, "main" : "./amqp" | ||
, "engines" : { "node" : "0.4 || 0.5 || 0.6" } | ||
, "engines" : { "node" : "0.4 || 0.5 || 0.6 || 0.8" } | ||
, "licenses" : | ||
@@ -32,0 +32,0 @@ [ { "type" : "MIT" |
@@ -8,2 +8,3 @@ var events = require('events'); | ||
this.hasFired = false; | ||
this.hasAcked = false; | ||
this._values = undefined; | ||
@@ -53,2 +54,10 @@ }; | ||
exports.Promise.prototype.emitAck = function() { | ||
if (this.hasAcked) return; | ||
this.hasAcked = 'true'; | ||
this._values = Array.prototype.slice.call(arguments); | ||
this.emit.apply(this, ['ack'].concat(this._values)); | ||
}; | ||
exports.Promise.prototype.emitError = function() { | ||
@@ -55,0 +64,0 @@ if (this.hasFired) return; |
@@ -23,3 +23,2 @@ # node-amqp | ||
connection.on('ready', function () { | ||
// Create a queue and bind to all messages. | ||
// Use the default 'amq.topic' exchange | ||
@@ -167,2 +166,8 @@ connection.queue('my-queue', function(q){ | ||
won't be deleted. | ||
- `noDeclare`: boolean, default false. | ||
If set, the queue will not be declared, this will allow a queue to be | ||
deleted if you dont know its previous options. | ||
- `arguments`: a map of additional arguments to pass in when creating a queue. | ||
- `closeChannelOnUnsubscribe` : a boolean when true the channel will close on | ||
unsubscrube, default false. | ||
@@ -249,4 +254,14 @@ ### queue.subscribe([options,] listener) | ||
This method will emit `'queueBindOk'` when ready. | ||
This method will emit `'queueBindOk'` when complete. | ||
### queue.unbind([exchange,] routing) | ||
This method unbinds a queue from an exchange. | ||
If the exchange argument is left out `'amq.topic'` will be used. | ||
Ths method will emit `'queueUnbindOk'` when complete. | ||
### queue.bind_headers([exchange,] routing) | ||
@@ -310,5 +325,15 @@ | ||
server restarts. | ||
- `comfirm`: boolean, default false. | ||
If set when connecting to a exchange the channel will send acks | ||
for publishes. Published tasks will emit 'ack' when it is acked. | ||
- `autoDelete`: boolean, default true. | ||
If set, the exchange is deleted when there are no longer queues | ||
bound to it. | ||
- `noDeclare`: boolean, default false. | ||
If set, the exchange will not be declared, this will allow the exchange | ||
to be deleted if you dont know its previous options. | ||
- `confirm`: boolean, default false. | ||
If set, the exchange will be in confirm mode, and you will get a | ||
'ack'|'error' event emitted on a publish, or the callback on the publish | ||
will be called. | ||
@@ -318,3 +343,3 @@ An exchange will emit the `'open'` event when it is finally declared. | ||
### exchange.publish(routingKey, message, options) | ||
### exchange.publish(routingKey, message, options, callback) | ||
@@ -346,2 +371,6 @@ Publishes a message to the exchange. The `routingKey` argument is a string | ||
`callback` is a function that will get called if the exchange is in confirm mode, | ||
the value sent will be true or false, this is the presense of a error so true, means | ||
an error occured and false, means the publish was successfull | ||
### exchange.destroy(ifUnused = true) | ||
@@ -348,0 +377,0 @@ |
@@ -19,3 +19,4 @@ require('./harness'); | ||
bar: 'foo', | ||
number: '123' | ||
number: '123', | ||
stuff: [{x:1}, {x:2}] | ||
} }); | ||
@@ -36,2 +37,4 @@ | ||
assert.equal('123', m.headers['number'].toString()); | ||
assert.equal(1, m.headers['stuff'][0].x); | ||
assert.equal(2, m.headers['stuff'][1].x); | ||
}) | ||
@@ -38,0 +41,0 @@ }) |
@@ -5,14 +5,11 @@ global.options = { heartbeat: 1 }; | ||
connects = 0; | ||
var closed = 0; | ||
var closed = false; | ||
var hb = setInterval(function() { | ||
puts(" -> heartbeat"); | ||
connection.heartbeat(); | ||
}, 1000); | ||
setTimeout(function() { | ||
assert.ok(!closed); | ||
clearInterval(hb); | ||
setTimeout(function() { assert.ok(closed); }, 3000); | ||
// Change the local heartbeat interval (without changing the negotiated | ||
// interval). This will cause the server to notice we've dropped off, | ||
// and close the connection. | ||
connection.options['heartbeat'] = 0; | ||
setTimeout(function() { assert.ok(closed); }, 3500); | ||
}, 5000); | ||
@@ -25,6 +22,5 @@ | ||
puts("closed"); | ||
closed = 1; | ||
closed = true; | ||
}); | ||
connection.addListener('ready', function () { | ||
connects++; | ||
puts("connected to " + connection.serverProperties.product); | ||
@@ -31,0 +27,0 @@ |
@@ -35,2 +35,4 @@ require('./harness'); | ||
// wait one second to receive the message, then quit | ||
// make sure to delete our queue on our way out. | ||
q.destroy(); | ||
connection.end(); | ||
@@ -37,0 +39,0 @@ }, 1000); |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Non-existent author
Supply chain riskThe package was published by an npm account that no longer exists.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
Long strings
Supply chain riskContains long string literals, which may be a sign of obfuscated or packed code.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
435670
55
4090
377
1
10
4