docker-stream-cleanser
Advanced tools
Comparing version 0.0.16 to 0.1.0
64
app.js
@@ -27,2 +27,66 @@ 'use strict'; | ||
module.exports.async = function(stream, encoding, cb) { | ||
if (typeof encoding === 'function') { | ||
cb = encoding; | ||
encoding = null; | ||
} | ||
var result = []; | ||
var array = []; | ||
var dataBuffer = null; | ||
function parseDataBuffer(ended) { | ||
// Sometimes messages get chopped up by docker, even in the header, so we need to handle this | ||
if (dataBuffer) { | ||
array.unshift(dataBuffer); | ||
} | ||
dataBuffer = Buffer.concat(array.splice(0, array.length)); | ||
//We need to make sure that this isn't a continuation of the message before it | ||
if (dataBuffer.length > 8 && dataBuffer[0] <= 3) { | ||
// We're going to be popping each message off of the top of dataBuffer | ||
var header = dataBuffer.slice(0, 8); | ||
var size = header.readUInt32BE(4); | ||
// check if the size greater than the rest of the message. If it is, slice off the | ||
// whole message (-8 to grab the header as well), and store it in the buffer | ||
if (8 + size <= dataBuffer.length) { | ||
// Basically do a splice here | ||
var payload = dataBuffer.slice(8, 8 + size); | ||
dataBuffer = dataBuffer.slice(8 + size); | ||
if (payload === null) { | ||
return; | ||
} | ||
result.push(payload); | ||
if (dataBuffer.length > 8) { | ||
// Since we've moved the data to a buffer outside of the processing, we can use a | ||
// timeout to schedule the next iteration. | ||
setImmediate(parseDataBuffer, ended); | ||
} | ||
} | ||
} | ||
if (!dataBuffer.length && ended) { | ||
cb(Buffer.concat(result)); | ||
} | ||
} | ||
function streamEnd() { | ||
// Write it to hex so we don't corrupt anything | ||
stream.removeListener('data', writeData); | ||
stream.removeListener('end', streamEnd); | ||
parseDataBuffer(true); | ||
} | ||
function writeData(data) { | ||
// First, make sure the data is a buffer | ||
if (!Buffer.isBuffer(data)) { | ||
data= new Buffer(data, encoding); | ||
} | ||
array.push(data); | ||
// Now send it off to the parseData function | ||
parseDataBuffer(); | ||
} | ||
// Subscribe to the data event here. | ||
stream.on('data', writeData); | ||
stream.on('end', streamEnd); | ||
}; | ||
/** | ||
@@ -29,0 +93,0 @@ * This takes data from an input stream, strips out the payload, then writes that payload |
{ | ||
"name": "docker-stream-cleanser", | ||
"version": "0.0.16", | ||
"version": "0.1.0", | ||
"main": "app.js", | ||
@@ -10,3 +10,3 @@ "devDependencies": { | ||
"scripts": { | ||
"test": "node ./node_modules/lab/bin/lab test.js" | ||
"test": "NODE_PATH=. node ./node_modules/lab/bin/lab tests.js" | ||
}, | ||
@@ -13,0 +13,0 @@ "repository": { |
@@ -0,1 +1,5 @@ | ||
[![NPM](https://nodei.co/npm/docker-stream-cleanser.png?downloads=true&downloadRank=true&stars=true)](https://nodei.co/npm/docker-stream-cleanser/) | ||
[![Build Status](https://travis-ci.org/Nathan219/docker-stream-cleanser.svg?branch=master)](https://travis-ci.org/Nathan219/docker-stream-cleanser) | ||
Docker Stream Cleanser | ||
@@ -16,2 +20,3 @@ ========= | ||
Usage | ||
@@ -37,4 +42,9 @@ ---- | ||
// cleanStreams(inputStream, outputStream, encoding, addCarraigeReturn) | ||
// addCarraigeReturn is a flag to replace all \n's in the stream with \r\n | ||
``` | ||
For more info, look at the header comments and the tests | ||
@@ -45,3 +55,3 @@ | ||
0.0.11 | ||
0.0.16 | ||
@@ -48,0 +58,0 @@ Installation |
89
tests.js
@@ -62,2 +62,91 @@ var Lab = require('lab'); | ||
describe('Testing the async', function() { | ||
it('1 message broken in the middle of the message', function(done) { | ||
var data = 'h'; | ||
for (var x = 0; x < 500; x ++) { | ||
data += 'f'; | ||
} | ||
var dataWithHeader = createStream(data, true); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var dockerStream = new MockStream(); | ||
tester.async(dockerStream, function(result) { | ||
result = result.toString(); | ||
expect(data + data2).to.equal(result); | ||
done(); | ||
}); | ||
dockerStream.write(dataWithHeader); | ||
dockerStream.write(dataWithHeader2); | ||
dockerStream.end(); | ||
}); | ||
it('3 separate messages, 2 messages put together, but with the first message broken in the header', | ||
function(done) { | ||
var dockerStream = new MockStream(); | ||
var data = 'h'; | ||
for (var x = 0; x < 100; x ++) { | ||
data += 'f'; | ||
} | ||
var dataWithHeader = createStream(data, true); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var data3 = 'e'; | ||
for (var x = 0; x < 100; x ++) { | ||
data3 += 's'; | ||
} | ||
var dataWithHeader3 = createStream(data3, true); | ||
tester.async(dockerStream, function(result) { | ||
result = result.toString(); | ||
expect(data + data2 + data3).to.equal(result); | ||
done(); | ||
}); | ||
dockerStream.write(Buffer.concat([dataWithHeader, | ||
dataWithHeader2.slice(0, 4)])); | ||
dockerStream.write(Buffer.concat([dataWithHeader2.slice(4), dataWithHeader3])); | ||
dockerStream.end(); | ||
}); | ||
it('3 separate messages, 2 messages put together, but with the last message broken in the header', | ||
function(done) { | ||
var dockerStream = new MockStream(); | ||
var data = 'h'; | ||
for (var x = 0; x < 100; x ++) { | ||
data += 'f'; | ||
} | ||
var dataWithHeader = createStream(data, true); | ||
var data2 = 'd'; | ||
for (var x = 0; x < 100; x ++) { | ||
data2 += 'b'; | ||
} | ||
var dataWithHeader2 = createStream(data2, true); | ||
var data3 = 'e'; | ||
for (var x = 0; x < 100; x ++) { | ||
data3 += 's'; | ||
} | ||
var dataWithHeader3 = createStream(data3, true); | ||
tester.async(dockerStream, function(result) { | ||
result = result.toString(); | ||
expect(data + data2 + data3).to.equal(result); | ||
done(); | ||
}); | ||
dockerStream.write(Buffer.concat([dataWithHeader, dataWithHeader2, | ||
dataWithHeader3.slice(0, 4)])); | ||
dockerStream.write(Buffer.concat([dataWithHeader3.slice(4)])); | ||
dockerStream.end(); | ||
}); | ||
}); | ||
describe('Testing the streaming cleanser', function() { | ||
@@ -64,0 +153,0 @@ // Things to test |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
83481
22
467
67