Comparing version
# Change log for amqplib | ||
## Unreleased | ||
- Remove bitsyntax dependency - See https://github.com/amqp-node/amqplib/pull/785. Thanks @ikenfin | ||
- Stop checking if frame max is exceeded when parsing frames - See https://github.com/amqp-node/amqplib/pull/784. Thanks @ohroy | ||
## v0.10.6 | ||
@@ -4,0 +8,0 @@ - Replace references to the old squaremo/amqp.name repo with ones to amqp-node/amqplib |
@@ -556,3 +556,3 @@ // | ||
// %%% identifying invariants might help here? | ||
var frame = parseFrame(this.rest, this.frameMax); | ||
var frame = parseFrame(this.rest); | ||
@@ -559,0 +559,0 @@ if (!frame) { |
174
lib/frame.js
@@ -7,2 +7,3 @@ // The river sweeps through | ||
const ints = require('buffer-more-ints') | ||
var defs = require('./defs'); | ||
@@ -12,4 +13,2 @@ var constants = defs.constants; | ||
var Bits = require('@acuminous/bitsyntax'); | ||
module.exports.PROTOCOL_HEADER = "AMQP" + String.fromCharCode(0, 0, 9, 1); | ||
@@ -38,37 +37,97 @@ | ||
var bodyCons = | ||
Bits.builder(FRAME_BODY, | ||
'channel:16, size:32, payload:size/binary', | ||
FRAME_END); | ||
// expected byte sizes for frame parts | ||
const TYPE_BYTES = 1 | ||
const CHANNEL_BYTES = 2 | ||
const SIZE_BYTES = 4 | ||
const FRAME_HEADER_BYTES = TYPE_BYTES + CHANNEL_BYTES + SIZE_BYTES | ||
const FRAME_END_BYTES = 1 | ||
/** | ||
* @typedef {{ | ||
* type: number, | ||
* channel: number, | ||
* size: number, | ||
* payload: Buffer, | ||
* rest: Buffer | ||
* }} FrameStructure | ||
*/ | ||
/** | ||
* This is a polyfill which will read a big int 64 bit as a number. | ||
* @arg { Buffer } buffer | ||
* @arg { number } offset | ||
* @returns { number } | ||
*/ | ||
function readInt64BE(buffer, offset) { | ||
/** | ||
* We try to use native implementation if available here because | ||
* buffer-more-ints does not | ||
*/ | ||
if (typeof Buffer.prototype.readBigInt64BE === 'function') { | ||
return Number(buffer.readBigInt64BE(offset)) | ||
} | ||
return ints.readInt64BE(buffer, offset) | ||
} | ||
// %%% TESTME possibly better to cons the first bit and write the | ||
// second directly, in the absence of IO lists | ||
module.exports.makeBodyFrame = function(channel, payload) { | ||
return bodyCons({channel: channel, size: payload.length, payload: payload}); | ||
/** | ||
* Make a frame header | ||
* @arg { number } channel | ||
* @arg { Buffer } payload | ||
*/ | ||
module.exports.makeBodyFrame = function (channel, payload) { | ||
const frameSize = FRAME_HEADER_BYTES + payload.length + FRAME_END_BYTES | ||
const frame = Buffer.alloc(frameSize) | ||
let offset = 0 | ||
offset = frame.writeUInt8(FRAME_BODY, offset) | ||
offset = frame.writeUInt16BE(channel, offset) | ||
offset = frame.writeInt32BE(payload.length, offset) | ||
payload.copy(frame, offset) | ||
offset += payload.length | ||
frame.writeUInt8(FRAME_END, offset) | ||
return frame | ||
}; | ||
var frameHeaderPattern = Bits.matcher('type:8', 'channel:16', | ||
'size:32', 'rest/binary'); | ||
/** | ||
* Parse an AMQP frame | ||
* @arg { Buffer } bin | ||
* @arg { number } max | ||
* @returns { FrameStructure | boolean } | ||
*/ | ||
function parseFrame(bin) { | ||
if (bin.length < FRAME_HEADER_BYTES) { | ||
return false | ||
} | ||
function parseFrame(bin, max) { | ||
var fh = frameHeaderPattern(bin); | ||
if (fh) { | ||
var size = fh.size, rest = fh.rest; | ||
if (size > max) { | ||
throw new Error('Frame size exceeds frame max'); | ||
} | ||
else if (rest.length > size) { | ||
if (rest[size] !== FRAME_END) | ||
throw new Error('Invalid frame'); | ||
const type = bin.readUInt8(0) | ||
const channel = bin.readUInt16BE(1) | ||
const size = bin.readUInt32BE(3) | ||
return { | ||
type: fh.type, | ||
channel: fh.channel, | ||
size: size, | ||
payload: rest.slice(0, size), | ||
rest: rest.slice(size + 1) | ||
}; | ||
} | ||
const totalSize = FRAME_HEADER_BYTES + size + FRAME_END_BYTES | ||
if (bin.length < totalSize) { | ||
return false | ||
} | ||
return false; | ||
const frameEnd = bin.readUInt8(FRAME_HEADER_BYTES + size) | ||
if (frameEnd !== FRAME_END) { | ||
throw new Error('Invalid frame') | ||
} | ||
return { | ||
type, | ||
channel, | ||
size, | ||
payload: bin.subarray(FRAME_HEADER_BYTES, FRAME_HEADER_BYTES + size), | ||
rest: bin.subarray(totalSize) | ||
} | ||
} | ||
@@ -78,31 +137,34 @@ | ||
var headerPattern = Bits.matcher('class:16', | ||
'_weight:16', | ||
'size:64', | ||
'flagsAndfields/binary'); | ||
var HEARTBEAT = {channel: 0}; | ||
var methodPattern = Bits.matcher('id:32, args/binary'); | ||
/** | ||
* Decode AMQP frame into JS object | ||
* @param { FrameStructure } frame | ||
* @returns | ||
*/ | ||
module.exports.decodeFrame = (frame) => { | ||
const payload = frame.payload | ||
const channel = frame.channel | ||
var HEARTBEAT = {channel: 0}; | ||
module.exports.decodeFrame = function(frame) { | ||
var payload = frame.payload; | ||
switch (frame.type) { | ||
case FRAME_METHOD: | ||
var idAndArgs = methodPattern(payload); | ||
var id = idAndArgs.id; | ||
var fields = decode(id, idAndArgs.args); | ||
return {id: id, channel: frame.channel, fields: fields}; | ||
case FRAME_HEADER: | ||
var parts = headerPattern(payload); | ||
var id = parts['class']; | ||
var fields = decode(id, parts.flagsAndfields); | ||
return {id: id, channel: frame.channel, | ||
size: parts.size, fields: fields}; | ||
case FRAME_BODY: | ||
return {channel: frame.channel, content: frame.payload}; | ||
case FRAME_HEARTBEAT: | ||
return HEARTBEAT; | ||
default: | ||
throw new Error('Unknown frame type ' + frame.type); | ||
case FRAME_METHOD: { | ||
const id = payload.readUInt32BE(0) | ||
const args = payload.subarray(4) | ||
const fields = decode(id, args) | ||
return { id, channel, fields } | ||
} | ||
case FRAME_HEADER: { | ||
const id = payload.readUInt16BE(0) | ||
// const weight = payload.readUInt16BE(2) | ||
const size = readInt64BE(payload, 4) | ||
const flagsAndfields = payload.subarray(12) | ||
const fields = decode(id, flagsAndfields) | ||
return { id, channel, size, fields } | ||
} | ||
case FRAME_BODY: | ||
return { channel, content: payload } | ||
case FRAME_HEARTBEAT: | ||
return HEARTBEAT | ||
default: | ||
throw new Error('Unknown frame type ' + frame.type) | ||
} | ||
@@ -117,2 +179,2 @@ } | ||
module.exports.HEARTBEAT = HEARTBEAT; | ||
module.exports.HEARTBEAT = HEARTBEAT; |
@@ -5,3 +5,3 @@ { | ||
"main": "./channel_api.js", | ||
"version": "0.10.6", | ||
"version": "0.10.7", | ||
"description": "An AMQP 0-9-1 (e.g., RabbitMQ) library and client.", | ||
@@ -16,3 +16,2 @@ "repository": { | ||
"dependencies": { | ||
"@acuminous/bitsyntax": "^0.1.2", | ||
"buffer-more-ints": "~1.0.0", | ||
@@ -19,0 +18,0 @@ "url-parse": "~1.5.10" |
@@ -72,8 +72,2 @@ 'use strict'; | ||
testBogusFrame('> max frame', | ||
[defs.constants.FRAME_BODY, | ||
0,0, 0,0,0,6, // too big! | ||
65,66,67,68,69,70, | ||
defs.constants.FRAME_END]); | ||
}); | ||
@@ -80,0 +74,0 @@ |
Sorry, the diff of this file is not supported yet
451871
0.24%2
-33.33%12657
0.36%- Removed
- Removed
- Removed
- Removed
- Removed