New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

stream-equal

Package Overview
Dependencies
Maintainers
1
Versions
18
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-equal - npm Package Compare versions

Comparing version 0.1.3 to 0.1.4

145

lib/index.js

@@ -0,13 +1,37 @@

var Readable = require('readable-stream').Readable;
/**
* Tests that two readable streams are equal.
*
* @param (ReadableStream) readStream2
* @param (ReadableStream) readStream2
* @param (Function(!Error, boolean)) callback
* @param {Readable|Stream} readStream2
* @param {Readable|Stream} readStream2
* @param {Function(!Error, Boolean)} callback
*/
module.exports = function streamEqual(readStream1, readStream2, callback) {
var stream1 = { stream: readStream1, data: null, pos: 0, ended: false };
var stream2 = { stream: readStream2, data: null, pos: 0, ended: false };
var ondata1 = createOnData(stream1, stream2, cleanup);
var ondata2 = createOnData(stream2, stream1, cleanup);
readStream1 = getReadable(readStream1);
readStream1.id = 1;
readStream2 = getReadable(readStream2);
readStream2.id = 2;
var stream1 = {
stream: readStream1,
data: null, pos: 0,
ended: false,
readable: false,
onreadable: function onreadable() {
stream1.readable = true;
}
};
var stream2 = {
stream: readStream2,
data: null,
pos: 0,
ended: false,
readable: false,
onreadable: function onreadable() {
stream2.readable = true;
}
};
var read1 = createRead(stream1, stream2, cleanup);
var read2 = createRead(stream2, stream1, cleanup);
var onend1 = createOnEnd(stream1, stream2, cleanup);

@@ -17,7 +41,7 @@ var onend2 = createOnEnd(stream2, stream1, cleanup);

function cleanup(err, equal) {
readStream1.removeListener('data', ondata1);
readStream1.removeListener('readable', stream1.onreadable);
readStream1.removeListener('error', cleanup);
readStream1.removeListener('end', onend1);
readStream2.removeListener('data', ondata2);
readStream2.removeListener('readable', stream2.onreadable);
readStream2.removeListener('error', cleanup);

@@ -29,9 +53,14 @@ readStream2.removeListener('end', onend2);

readStream1.on('data', ondata1);
stream1.read = read1;
readStream1.on('readable', stream1.onreadable);
readStream1.on('end', onend1);
readStream1.on('error', cleanup);
readStream2.on('data', ondata2);
stream2.read = read2;
readStream2.on('readable', stream2.onreadable);
readStream2.on('end', onend2);
readStream2.on('error', cleanup);
// Start by reading from the first stream.
read1();
};

@@ -41,13 +70,23 @@

/**
* Returns a function that compares emitted `data` event with that of the
* most recent `data` event from another stream.
* Returns a function that compares emitted `read()` call with that of the
* most recent `read` call from another stream.
*
* @param (Object) stream
* @param (Object) otherStream
* @param (Function(Error, boolean)) callback
* @return (Function(Buffer|string))
* @param {Object} stream
* @param {Object} otherStream
* @param {Function(Error, Boolean)} callback
* @return {Function(Buffer|String)}
*/
function createOnData(stream, otherStream, callback) {
return function ondata(data) {
// make sure `data` is a buffer
function createRead(stream, otherStream, callback) {
return function read() {
if (!stream.readable) {
return stream.stream.once('readable', stream.read);
}
stream.readable = false;
var data = stream.stream.read();
if (!data) {
return stream.stream.once('readable', stream.read);
}
// Make sure `data` is a buffer.
if (!Buffer.isBuffer(data)) {

@@ -75,3 +114,3 @@ var type = typeof data;

// compare
// Compare.
for (var i = 0, len = streamData.length; i < len; i++) {

@@ -84,3 +123,3 @@ if (streamData[i] !== otherStreamData[i]) {

} else if (stream.data && stream.data.length) {
stream.data = bufferConcat(stream.data, data);
stream.data = Buffer.concat([stream.data, data]);
} else {

@@ -90,16 +129,18 @@ stream.data = data;

stream.pos = newPos;
if (newPos > otherStream.pos) {
if (otherStream.ended) {
// if this stream is still emitting `data` events but the other has
// ended, then this is longer than the other one
// If this stream is still emitting `data` events but the other has
// ended, then this is longer than the other one.
return callback(null, false);
}
// if this stream has caught up to the other,
// pause it and resume the other one
stream.stream.pause();
otherStream.stream.resume();
// If this stream has caught up to the other,
// read from other one.
otherStream.read();
} else {
stream.read();
}
stream.pos = newPos;
};

@@ -112,5 +153,5 @@ }

*
* @param (Object) stream
* @param (Object) otherStream
* @param (Function(Error, boolean)) callback
* @param {Object} stream
* @param {Object} otherStream
* @param {Function(!Error, Boolean)} callback
*/

@@ -123,3 +164,3 @@ function createOnEnd(stream, otherStream, callback) {

} else {
otherStream.stream.resume();
otherStream.read();
}

@@ -131,13 +172,31 @@ };

/**
* Concatenate two buffers. Because 0.6.x...
* Returns a readable new stream API stream if the stream is using the
* old API. Otherwise it returns the same stream.
*
* @param (Buffer) buf1
* @param (Buffer) buf2
* @return (Buffer)
* @param {Readable|Stream} stream
* @return {Readable}
*/
function bufferConcat(buf1, buf2) {
var newBuf = new Buffer(buf1.length + buf2.length);
buf1.copy(newBuf);
buf2.copy(newBuf, buf1.length);
return newBuf;
function getReadable(stream) {
var readable;
if (isOldStyleStream(stream)) {
readable = new Readable();
readable.wrap(stream);
stream = readable;
}
return stream;
}
/**
* Returns true if a stream is an old style API stream.
*
* @param {Readable|Stream} stream
* @return {Boolean}
*/
function isOldStyleStream(stream) {
return typeof stream.read !== 'function' ||
typeof stream._read !== 'function' ||
typeof stream.push !== 'function' ||
typeof stream.unshift !== 'function' ||
typeof stream.wrap !== 'function';
}

@@ -5,3 +5,3 @@ {

"keywords": ["stream", "input", "output", "io", "assert", "test"],
"version": "0.1.3",
"version": "0.1.4",
"repository": {

@@ -19,2 +19,5 @@ "type": "git",

},
"dependencies": {
"readable-stream": "~1.0.2"
},
"devDependencies": {

@@ -21,0 +24,0 @@ "mocha": "x",

var streamEqual = require('..')
, assert = require('assert')
, from = require('from')
, PassThrough = require('readable-stream').PassThrough
, fs = require('fs')

@@ -19,4 +19,4 @@ , path = require('path')

*
* @param (Object) options1
* @param (Object) options2
* @param {Object} options1
* @param {Object} options2
*/

@@ -71,5 +71,22 @@ function testEqual(options1, options2) {

it('Streams should not be equal', function(done) {
var stream1 = from('you\'re the man now'.split(' '));
var stream2 = from('you\'re the man now dawg'.split(' '));
var stream1 = new PassThrough();
var stream2 = new PassThrough();
function writeToStream(stream, str) {
var pieces = str.split(' ');
process.nextTick(function next() {
var piece = pieces.shift();
if (piece) {
stream.write(piece);
process.nextTick(next);
} else {
stream.end();
}
});
}
writeToStream(stream1, 'you\'re the man now');
writeToStream(stream2, 'you\'re the man now dawg!');
streamEqual(stream1, stream2, function(err, equal) {

@@ -76,0 +93,0 @@ if (err) return done(err);

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc