stream-equal
Advanced tools
Comparing version 0.1.3 to 0.1.4
145
lib/index.js
@@ -0,13 +1,37 @@ | ||
var Readable = require('readable-stream').Readable; | ||
/** | ||
* Tests that two readable streams are equal. | ||
* | ||
* @param (ReadableStream) readStream2 | ||
* @param (ReadableStream) readStream2 | ||
* @param (Function(!Error, boolean)) callback | ||
* @param {Readable|Stream} readStream2 | ||
* @param {Readable|Stream} readStream2 | ||
* @param {Function(!Error, Boolean)} callback | ||
*/ | ||
module.exports = function streamEqual(readStream1, readStream2, callback) { | ||
var stream1 = { stream: readStream1, data: null, pos: 0, ended: false }; | ||
var stream2 = { stream: readStream2, data: null, pos: 0, ended: false }; | ||
var ondata1 = createOnData(stream1, stream2, cleanup); | ||
var ondata2 = createOnData(stream2, stream1, cleanup); | ||
readStream1 = getReadable(readStream1); | ||
readStream1.id = 1; | ||
readStream2 = getReadable(readStream2); | ||
readStream2.id = 2; | ||
var stream1 = { | ||
stream: readStream1, | ||
data: null, pos: 0, | ||
ended: false, | ||
readable: false, | ||
onreadable: function onreadable() { | ||
stream1.readable = true; | ||
} | ||
}; | ||
var stream2 = { | ||
stream: readStream2, | ||
data: null, | ||
pos: 0, | ||
ended: false, | ||
readable: false, | ||
onreadable: function onreadable() { | ||
stream2.readable = true; | ||
} | ||
}; | ||
var read1 = createRead(stream1, stream2, cleanup); | ||
var read2 = createRead(stream2, stream1, cleanup); | ||
var onend1 = createOnEnd(stream1, stream2, cleanup); | ||
@@ -17,7 +41,7 @@ var onend2 = createOnEnd(stream2, stream1, cleanup); | ||
function cleanup(err, equal) { | ||
readStream1.removeListener('data', ondata1); | ||
readStream1.removeListener('readable', stream1.onreadable); | ||
readStream1.removeListener('error', cleanup); | ||
readStream1.removeListener('end', onend1); | ||
readStream2.removeListener('data', ondata2); | ||
readStream2.removeListener('readable', stream2.onreadable); | ||
readStream2.removeListener('error', cleanup); | ||
@@ -29,9 +53,14 @@ readStream2.removeListener('end', onend2); | ||
readStream1.on('data', ondata1); | ||
stream1.read = read1; | ||
readStream1.on('readable', stream1.onreadable); | ||
readStream1.on('end', onend1); | ||
readStream1.on('error', cleanup); | ||
readStream2.on('data', ondata2); | ||
stream2.read = read2; | ||
readStream2.on('readable', stream2.onreadable); | ||
readStream2.on('end', onend2); | ||
readStream2.on('error', cleanup); | ||
// Start by reading from the first stream. | ||
read1(); | ||
}; | ||
@@ -41,13 +70,23 @@ | ||
/** | ||
* Returns a function that compares emitted `data` event with that of the | ||
* most recent `data` event from another stream. | ||
* Returns a function that compares emitted `read()` call with that of the | ||
* most recent `read` call from another stream. | ||
* | ||
* @param (Object) stream | ||
* @param (Object) otherStream | ||
* @param (Function(Error, boolean)) callback | ||
* @return (Function(Buffer|string)) | ||
* @param {Object} stream | ||
* @param {Object} otherStream | ||
* @param {Function(Error, Boolean)} callback | ||
* @return {Function(Buffer|String)} | ||
*/ | ||
function createOnData(stream, otherStream, callback) { | ||
return function ondata(data) { | ||
// make sure `data` is a buffer | ||
function createRead(stream, otherStream, callback) { | ||
return function read() { | ||
if (!stream.readable) { | ||
return stream.stream.once('readable', stream.read); | ||
} | ||
stream.readable = false; | ||
var data = stream.stream.read(); | ||
if (!data) { | ||
return stream.stream.once('readable', stream.read); | ||
} | ||
// Make sure `data` is a buffer. | ||
if (!Buffer.isBuffer(data)) { | ||
@@ -75,3 +114,3 @@ var type = typeof data; | ||
// compare | ||
// Compare. | ||
for (var i = 0, len = streamData.length; i < len; i++) { | ||
@@ -84,3 +123,3 @@ if (streamData[i] !== otherStreamData[i]) { | ||
} else if (stream.data && stream.data.length) { | ||
stream.data = bufferConcat(stream.data, data); | ||
stream.data = Buffer.concat([stream.data, data]); | ||
} else { | ||
@@ -90,16 +129,18 @@ stream.data = data; | ||
stream.pos = newPos; | ||
if (newPos > otherStream.pos) { | ||
if (otherStream.ended) { | ||
// if this stream is still emitting `data` events but the other has | ||
// ended, then this is longer than the other one | ||
// If this stream is still emitting `data` events but the other has | ||
// ended, then this is longer than the other one. | ||
return callback(null, false); | ||
} | ||
// if this stream has caught up to the other, | ||
// pause it and resume the other one | ||
stream.stream.pause(); | ||
otherStream.stream.resume(); | ||
// If this stream has caught up to the other, | ||
// read from other one. | ||
otherStream.read(); | ||
} else { | ||
stream.read(); | ||
} | ||
stream.pos = newPos; | ||
}; | ||
@@ -112,5 +153,5 @@ } | ||
* | ||
* @param (Object) stream | ||
* @param (Object) otherStream | ||
* @param (Function(Error, boolean)) callback | ||
* @param {Object} stream | ||
* @param {Object} otherStream | ||
* @param {Function(!Error, Boolean)} callback | ||
*/ | ||
@@ -123,3 +164,3 @@ function createOnEnd(stream, otherStream, callback) { | ||
} else { | ||
otherStream.stream.resume(); | ||
otherStream.read(); | ||
} | ||
@@ -131,13 +172,31 @@ }; | ||
/** | ||
* Concatenate two buffers. Because 0.6.x... | ||
* Returns a readable new stream API stream if the stream is using the | ||
* old API. Otherwise it returns the same stream. | ||
* | ||
* @param (Buffer) buf1 | ||
* @param (Buffer) buf2 | ||
* @return (Buffer) | ||
* @param {Readable|Stream} stream | ||
* @return {Readable} | ||
*/ | ||
function bufferConcat(buf1, buf2) { | ||
var newBuf = new Buffer(buf1.length + buf2.length); | ||
buf1.copy(newBuf); | ||
buf2.copy(newBuf, buf1.length); | ||
return newBuf; | ||
function getReadable(stream) { | ||
var readable; | ||
if (isOldStyleStream(stream)) { | ||
readable = new Readable(); | ||
readable.wrap(stream); | ||
stream = readable; | ||
} | ||
return stream; | ||
} | ||
/** | ||
* Returns true if a stream is an old style API stream. | ||
* | ||
* @param {Readable|Stream} stream | ||
* @return {Boolean} | ||
*/ | ||
function isOldStyleStream(stream) { | ||
return typeof stream.read !== 'function' || | ||
typeof stream._read !== 'function' || | ||
typeof stream.push !== 'function' || | ||
typeof stream.unshift !== 'function' || | ||
typeof stream.wrap !== 'function'; | ||
} |
@@ -5,3 +5,3 @@ { | ||
"keywords": ["stream", "input", "output", "io", "assert", "test"], | ||
"version": "0.1.3", | ||
"version": "0.1.4", | ||
"repository": { | ||
@@ -19,2 +19,5 @@ "type": "git", | ||
}, | ||
"dependencies": { | ||
"readable-stream": "~1.0.2" | ||
}, | ||
"devDependencies": { | ||
@@ -21,0 +24,0 @@ "mocha": "x", |
var streamEqual = require('..') | ||
, assert = require('assert') | ||
, from = require('from') | ||
, PassThrough = require('readable-stream').PassThrough | ||
, fs = require('fs') | ||
@@ -19,4 +19,4 @@ , path = require('path') | ||
* | ||
* @param (Object) options1 | ||
* @param (Object) options2 | ||
* @param {Object} options1 | ||
* @param {Object} options2 | ||
*/ | ||
@@ -71,5 +71,22 @@ function testEqual(options1, options2) { | ||
it('Streams should not be equal', function(done) { | ||
var stream1 = from('you\'re the man now'.split(' ')); | ||
var stream2 = from('you\'re the man now dawg'.split(' ')); | ||
var stream1 = new PassThrough(); | ||
var stream2 = new PassThrough(); | ||
function writeToStream(stream, str) { | ||
var pieces = str.split(' '); | ||
process.nextTick(function next() { | ||
var piece = pieces.shift(); | ||
if (piece) { | ||
stream.write(piece); | ||
process.nextTick(next); | ||
} else { | ||
stream.end(); | ||
} | ||
}); | ||
} | ||
writeToStream(stream1, 'you\'re the man now'); | ||
writeToStream(stream2, 'you\'re the man now dawg!'); | ||
streamEqual(stream1, stream2, function(err, equal) { | ||
@@ -76,0 +93,0 @@ if (err) return done(err); |
Sorry, the diff of this file is not supported yet
10482
240
1
+ Addedreadable-stream@~1.0.2
+ Addedcore-util-is@1.0.3(transitive)
+ Addedinherits@2.0.4(transitive)
+ Addedisarray@0.0.1(transitive)
+ Addedreadable-stream@1.0.34(transitive)
+ Addedstring_decoder@0.10.31(transitive)