docker-stream-cleanser
Advanced tools
Comparing version 0.0.10 to 0.0.11
65
app.js
@@ -27,3 +27,4 @@ module.exports = function(data, encoding, fixCarriageReturns) { | ||
* This takes data from an input stream, strips out the payload, then writes that payload | ||
* to the output stream. Since this method is for real-time streaming | ||
* to the output stream. Since this method is for real-time streaming, I gave it a chance to buffer | ||
* the data in case the message was broken up by docker (which seems to happen sometimes) | ||
* @param buildStream input stream | ||
@@ -35,2 +36,3 @@ * @param clientStream output stream | ||
module.exports.cleanStreams = function (buildStream, clientStream, encoding, fixCarriageReturns) { | ||
var lastBuffer = null; | ||
buildStream.on('data', function(data) { | ||
@@ -40,21 +42,52 @@ if (!Buffer.isBuffer(data)) { | ||
} | ||
var header = null, pointer = 0; | ||
if (!data || data.length < 8 || data[1] !== 0) { | ||
// Sometimes messages get chopped up by docker, even in the header, so we need to handle this | ||
//We need to make sure that this isn't a continuation of the message before it | ||
if (lastBuffer) { | ||
data = Buffer.concat([lastBuffer, data], lastBuffer.length + data.length); | ||
lastBuffer = null; | ||
} | ||
// If the header got chopped up, then we probably don't have a length. But we should at least | ||
// have the first 4 bytes (hopefully). | ||
if (data[0] > 0 && data[0] < 4 && (data[1] - data[2] - data[3] === 0)) { | ||
// If true, we at least have a legit docker message' | ||
if (data.length < 8) { | ||
// If we don't even have enough message for the full header, save it to the buffer and leave | ||
lastBuffer = data; | ||
} else { | ||
// We can at least read the size now. | ||
var header = data.slice(0, 8); | ||
var pointer = 8; | ||
while(header && header.length && header[1] - header[2] - header[3] === 0) { | ||
var size = header.readUInt32BE(4); | ||
if (pointer + size > data.length) { | ||
// check if the size greater than the rest of the message. If it is, slice off the | ||
// whole message (-8 to grab the header as well), and store it in the buffer | ||
lastBuffer = data.slice(pointer - 8); | ||
header = null; | ||
} else { | ||
var payload = data.slice(pointer, pointer += size); | ||
if (payload === null) break; | ||
payload = (fixCarriageReturns) ? | ||
payload.toString().replace(/\r?\n/g, '\r\n') : payload.toString(); | ||
clientStream.write(payload); | ||
if (data.length <= pointer) { | ||
header = null; | ||
} else if (data.length > pointer+8) { | ||
header = data.slice(pointer, pointer += 8); | ||
} else { | ||
// Somehow only part of a header is on the end of the message, so save it to the | ||
// buffer. | ||
lastBuffer = data.slice(pointer); | ||
header = null; | ||
} | ||
} | ||
} | ||
} | ||
} else { | ||
clientStream.write((fixCarriageReturns) ? | ||
data.toString().replace(/\r?\n/g, '\r\n') : data.toString()); | ||
} else { | ||
while(pointer < data.length) { | ||
header = data.slice(pointer, pointer += 8); | ||
if (header[1] - header[2] - header[3] !== 0) { | ||
break; | ||
} | ||
var size = header.readUInt32BE(4); | ||
var payload = data.slice(pointer, pointer += size); | ||
if (payload === null) break; | ||
payload = (fixCarriageReturns) ? | ||
payload.toString().replace(/\r?\n/g, '\r\n') : payload.toString(); | ||
clientStream.write(payload); | ||
} | ||
} | ||
}); | ||
}; |
{ | ||
"name": "docker-stream-cleanser", | ||
"version": "0.0.10", | ||
"version": "0.0.11", | ||
"main": "app.js", | ||
@@ -5,0 +5,0 @@ "devDependencies": { |
174
tests.js
@@ -7,2 +7,3 @@ var Lab = require('lab'); | ||
var tester = require('./app'); | ||
var MockStream = require('./mockreadwritestream'); | ||
@@ -62,2 +63,175 @@ describe('Testing the cleanser', function() { | ||
describe('Testing the streaming cleanser', function() { | ||
// Things to test | ||
// normal output | ||
// 1 message broken in the middle of the message | ||
// 1 message broken in the middle of the header | ||
// 2 separate messages, 2 messages put together, but with the last message broken in the header | ||
var data = 'h'; | ||
for (var x = 0; x < 500; x ++) { | ||
data += 'f'; | ||
} | ||
var dataWithHeader = createStream(data, true); | ||
it('should clean the header from a huge message in the stream', function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
clientStream.on('data', function (result) { | ||
expect(data).to.equal(result); | ||
done(); | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex')); | ||
}); | ||
it('1 message broken in the middle of the message', function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var endData = false; | ||
clientStream.on('data', function (result) { | ||
if (endData) { | ||
expect(data2).to.equal(result); | ||
done(); | ||
} else { | ||
endData = true; | ||
expect(data).to.equal(result); | ||
} | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex') + dataWithHeader2.toString('hex')); | ||
}); | ||
it('1 message broken in the middle of the message', function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var endData = false; | ||
clientStream.on('data', function (result) { | ||
if (endData) { | ||
expect(data2).to.equal(result); | ||
done(); | ||
} else { | ||
endData = true; | ||
expect(data).to.equal(result); | ||
} | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex') + | ||
dataWithHeader2.slice(0, 50).toString('hex')); | ||
dockerStream.write(dataWithHeader2.slice(50).toString('hex')); | ||
}); | ||
it('1 message broken in the middle of the header', function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var endData = false; | ||
clientStream.on('data', function (result) { | ||
if (endData) { | ||
expect(data2).to.equal(result); | ||
done(); | ||
} else { | ||
endData = true; | ||
expect(data).to.equal(result); | ||
} | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex') + | ||
dataWithHeader2.slice(0, 4).toString('hex')); | ||
dockerStream.write(dataWithHeader2.slice(4).toString('hex')); | ||
}); | ||
it('3 separate messages, 2 messages put together, but with the last message broken in the header', | ||
function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var data3 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data3 += 'b'; | ||
} | ||
var dataWithHeader3 = createStream(data3, true); | ||
var endData = 2; | ||
clientStream.on('data', function (result) { | ||
if (! endData) { | ||
expect(data3).to.equal(result); | ||
done(); | ||
} else if (endData === 2) { | ||
endData--; | ||
expect(data).to.equal(result); | ||
} else { | ||
endData--; | ||
expect(data2).to.equal(result); | ||
} | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex') + dataWithHeader2.toString('hex') + | ||
dataWithHeader3.slice(0, 4).toString('hex')); | ||
dockerStream.write(dataWithHeader3.slice(4).toString('hex')); | ||
}); | ||
it('3 separate messages, 2 messages put together, but with the first message broken in the header', | ||
function(done) { | ||
var dockerStream = new MockStream(); | ||
var clientStream = new MockStream(); | ||
tester.cleanStreams(dockerStream, clientStream, 'hex', false); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var data3 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data3 += 'b'; | ||
} | ||
var dataWithHeader3 = createStream(data3, true); | ||
var endData = 2; | ||
clientStream.on('data', function (result) { | ||
if (! endData) { | ||
expect(data3).to.equal(result); | ||
done(); | ||
} else if (endData === 2) { | ||
endData--; | ||
expect(data).to.equal(result); | ||
} else { | ||
endData--; | ||
expect(data2).to.equal(result); | ||
} | ||
}); | ||
dockerStream.write(dataWithHeader.toString('hex') + | ||
dataWithHeader2.slice(0, 4).toString('hex')); | ||
dockerStream.write(dataWithHeader2.slice(4).toString('hex') + dataWithHeader3.toString('hex')); | ||
}); | ||
}); | ||
function createStream(data, includeHeader) { | ||
@@ -64,0 +238,0 @@ var length = data.length + ((includeHeader) ? 8 : 0); |
Sorry, the diff of this file is not supported yet
69940
20
337