Socket
Socket
Sign inDemoInstall

pipe-io

Package Overview
Dependencies
Maintainers
1
Versions
78
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pipe-io - npm Package Compare versions

Comparing version 1.2.1 to 1.2.2

338

lib/pipe.js

@@ -1,206 +0,178 @@

(function () {
'use strict';
'use strict';
var fs = require('fs');
var zlib = require('zlib');
var assert = require('assert');
var pullout = require('pullout/legacy');
module.exports = all;
module.exports.getBody = function getBody(readStream, callback) {
return pullout(readStream, 'string', callback);
}
function all(streams, options, callback) {
if (!callback) {
callback = options;
options = {
end: true
};
} else if (typeof options.end === 'undefined')
options.end = true;
var fs = require('fs'),
zlib = require('zlib'),
assert = require('assert');
assert(streams, 'streams could not be empty!');
assert(callback, 'callback could not be empty!');
module.exports = all;
module.exports.getBody = getBody;
function all(streams, options, callback) {
if (!callback) {
callback = options;
options = {
end: true
};
} else if (typeof options.end === 'undefined')
options.end = true;
pipe(streams, options, callback);
}
function pipe(allStreams, options, callback) {
var error, finish, end, open,
readError, writeError,
assert(streams, 'streams could not be empty!');
assert(callback, 'callback could not be empty!');
streams = allStreams.slice(),
read = streams.shift(),
write = streams.pop(),
pipe(streams, options, callback);
}
function pipe(allStreams, options, callback) {
var error, finish, end, open,
readError, writeError,
streams = allStreams.slice(),
read = streams.shift(),
write = streams.pop(),
isFsRead = read instanceof fs.ReadStream,
isFsWrite = write instanceof fs.WriteStream,
isGunzip = write instanceof zlib.Gunzip,
rm = function(event, stream, fn) {
stream.removeListener(event, fn);
},
rmAll = function() {
rm('error', write, onWriteError);
rm('error', read, onReadError);
rm('end', read, onReadEnd);
rm('finish', write, onWriteFinish);
};
isFsRead = read instanceof fs.ReadStream,
isFsWrite = write instanceof fs.WriteStream,
isGunzip = write instanceof zlib.Gunzip,
read.on('end', onReadEnd);
read.on('error', onReadError);
rm = function(event, stream, fn) {
stream.removeListener(event, fn);
},
if (options.end)
write.on('finish', onWriteFinish);
rmAll = function() {
rm('error', write, onWriteError);
rm('error', read, onReadError);
rm('end', read, onReadEnd);
rm('finish', write, onWriteFinish);
};
read.on('end', onReadEnd);
read.on('error', onReadError);
if (options.end)
write.on('finish', onWriteFinish);
callWhenOpen(write, function(e) {
if (e)
onWriteError(e);
callWhenOpen(write, function(e) {
if (e)
onWriteError(e);
if (end && readError) {
onEnd();
} else {
open = true;
write.on('error', onWriteError);
if (end && readError) {
onEnd();
} else {
open = true;
write.on('error', onWriteError);
setListeners(streams, onError);
fullPipe(allStreams, options);
}
});
function onWriteError(error) {
writeError = true;
finish = true;
onError(error);
onResult();
setListeners(streams, onError);
fullPipe(allStreams, options);
}
function onReadError(error) {
readError = true;
end = true;
onError(error);
onResult();
}
function onReadEnd() {
end = true;
onResult();
}
function onWriteFinish() {
finish = true;
onResult();
}
});
function onWriteError(error) {
writeError = true;
finish = true;
onError(error);
onResult();
}
function onError(e) {
error = e;
}
function onReadError(error) {
readError = true;
end = true;
onError(error);
onResult();
}
function onResult() {
var justEnd = end && !isFsWrite,
justFinish = write && !isFsRead,
bothFinish = end && finish;
if (readError && finish) {
onEnd();
}else if (writeError && end) {
onEnd();
} else if (writeError && isGunzip) {
onEnd();
} else if (open && readError) {
onEnd();
} else if (bothFinish || justEnd || justFinish) {
onEnd();
}
}
function onReadEnd() {
end = true;
onResult();
}
function onEnd() {
rmAll();
unsetListeners(streams, onError);
callback(error);
}
function onWriteFinish() {
finish = true;
onResult();
}
/*
* when stream is fs.WriteStream
* finish event could be emitted before
* open and then everything crash
*/
function callWhenOpen(stream, fn) {
var isFsWrite = stream instanceof fs.WriteStream,
on = function(error) {
var isError = error instanceof Error;
stream.removeListener('open', on);
stream.removeListener('error', on);
if (isError)
fn(error);
else
fn();
};
function onError(e) {
error = e;
}
function onResult() {
var justEnd = end && !isFsWrite,
justFinish = write && !isFsRead,
bothFinish = end && finish;
if (!isFsWrite) {
fn();
} else {
stream.on('open', on);
stream.on('error', on);
if (readError && finish) {
onEnd();
}else if (writeError && end) {
onEnd();
} else if (writeError && isGunzip) {
onEnd();
} else if (open && readError) {
onEnd();
} else if (bothFinish || justEnd || justFinish) {
onEnd();
}
}
function fullPipe(streams, options) {
var main;
function onEnd() {
rmAll();
unsetListeners(streams, onError);
streams.forEach(function(stream) {
if (!main)
main = stream;
callback(error);
}
}
/*
* when stream is fs.WriteStream
* finish event could be emitted before
* open and then everything crash
*/
function callWhenOpen(stream, fn) {
var isFsWrite = stream instanceof fs.WriteStream,
on = function(error) {
var isError = error instanceof Error;
stream.removeListener('open', on);
stream.removeListener('error', on);
if (isError)
fn(error);
else
main = main.pipe(stream, {
end: options.end
});
});
}
fn();
};
function setListeners(streams, fn) {
streams.forEach(function(stream) {
stream.on('error', fn);
});
if (!isFsWrite) {
fn();
} else {
stream.on('open', on);
stream.on('error', on);
}
}
function fullPipe(streams, options) {
var main;
function unsetListeners(streams, fn) {
streams.forEach(function(stream) {
stream.removeListener('error', fn);
});
}
/**
* get body of readStream
*
* @param readStream
* @param callback
*/
function getBody(readStream, callback) {
var error,
body = '';
assert(readStream, 'could not be empty!');
assert(callback, 'could not be empty!');
readStream.on('data', onData);
readStream.on('error', onEnd);
readStream.on('end', onEnd);
function onData(chunk) {
body += chunk;
}
function onEnd(error) {
readStream.removeListener('data', onData);
readStream.removeListener('error', onEnd);
readStream.removeListener('end', onEnd);
callback(error, body);
}
}
})();
streams.forEach(function(stream) {
if (!main)
main = stream;
else
main = main.pipe(stream, {
end: options.end
});
});
}
function setListeners(streams, fn) {
streams.forEach(function(stream) {
stream.on('error', fn);
});
}
function unsetListeners(streams, fn) {
streams.forEach(function(stream) {
stream.removeListener('error', fn);
});
}
{
"name": "pipe-io",
"version": "1.2.1",
"version": "1.2.2",
"author": "coderaiser <mnemonic.enemy@gmail.com> (https://github.com/coderaiser)",

@@ -16,3 +16,7 @@ "description": "Pipe streams and handle events",

"scripts": {
"lint": "redrun lint:*",
"lint:lib": "eslint lib",
"lint:test": "eslint --rule 'no-console:0' test",
"test": "tape test/*.js",
"watch:test": "nodemon -w lib -w test -x \"npm test\"",
"coverage": "nyc npm test",

@@ -29,5 +33,9 @@ "report": "nyc report --reporter=text-lcov | coveralls"

"coveralls": "^2.11.6",
"nyc": "^5.0.1",
"eslint": "^3.8.1",
"nodemon": "^1.11.0",
"nyc": "^8.3.2",
"pullout": "^1.0.0",
"redrun": "^5.9.2",
"tape": "^4.2.0"
}
}

@@ -1,2 +0,2 @@

Pipe-io [![License][LicenseIMGURL]][LicenseURL] [![NPM version][NPMIMGURL]][NPMURL] [![Dependency Status][DependencyStatusIMGURL]][DependencyStatusURL] [![Build Status][BuildStatusIMGURL]][BuildStatusURL]
Pipe-io [![License][LicenseIMGURL]][LicenseURL] [![NPM version][NPMIMGURL]][NPMURL] [![Dependency Status][DependencyStatusIMGURL]][DependencyStatusURL] [![Build Status][BuildStatusIMGURL]][BuildStatusURL] [![Coverage Status][CoverageIMGURL]][CoverageURL]
=========

@@ -43,3 +43,6 @@ Pipe [streams](https://github.com/substack/stream-handbook) and handle events.

```
## Related
- [Pullout](https://github.com/coderaiser/pullout "Pullout") - pull out data from stream
## License

@@ -52,2 +55,3 @@ MIT

[LicenseIMGURL]: https://img.shields.io/badge/license-MIT-317BF9.svg?style=flat
[CoverageIMGURL]: https://coveralls.io/repos/coderaiser/pipe-io/badge.svg?branch=master&service=github
[NPMURL]: https://npmjs.org/package/pipe-io "npm"

@@ -57,2 +61,2 @@ [BuildStatusURL]: https://travis-ci.org/coderaiser/pipe-io "Build Status"

[LicenseURL]: https://tldrlegal.com/license/mit-license "MIT License"
[CoverageURL]: https://coveralls.io/github/coderaiser/pipe-io?branch=master

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc