Comparing version 1.2.1 to 1.2.2
338
lib/pipe.js
@@ -1,206 +0,178 @@ | ||
(function () { | ||
'use strict'; | ||
'use strict'; | ||
var fs = require('fs'); | ||
var zlib = require('zlib'); | ||
var assert = require('assert'); | ||
var pullout = require('pullout/legacy'); | ||
module.exports = all; | ||
module.exports.getBody = function getBody(readStream, callback) { | ||
return pullout(readStream, 'string', callback); | ||
} | ||
function all(streams, options, callback) { | ||
if (!callback) { | ||
callback = options; | ||
options = { | ||
end: true | ||
}; | ||
} else if (typeof options.end === 'undefined') | ||
options.end = true; | ||
var fs = require('fs'), | ||
zlib = require('zlib'), | ||
assert = require('assert'); | ||
assert(streams, 'streams could not be empty!'); | ||
assert(callback, 'callback could not be empty!'); | ||
module.exports = all; | ||
module.exports.getBody = getBody; | ||
function all(streams, options, callback) { | ||
if (!callback) { | ||
callback = options; | ||
options = { | ||
end: true | ||
}; | ||
} else if (typeof options.end === 'undefined') | ||
options.end = true; | ||
pipe(streams, options, callback); | ||
} | ||
function pipe(allStreams, options, callback) { | ||
var error, finish, end, open, | ||
readError, writeError, | ||
assert(streams, 'streams could not be empty!'); | ||
assert(callback, 'callback could not be empty!'); | ||
streams = allStreams.slice(), | ||
read = streams.shift(), | ||
write = streams.pop(), | ||
pipe(streams, options, callback); | ||
} | ||
function pipe(allStreams, options, callback) { | ||
var error, finish, end, open, | ||
readError, writeError, | ||
streams = allStreams.slice(), | ||
read = streams.shift(), | ||
write = streams.pop(), | ||
isFsRead = read instanceof fs.ReadStream, | ||
isFsWrite = write instanceof fs.WriteStream, | ||
isGunzip = write instanceof zlib.Gunzip, | ||
rm = function(event, stream, fn) { | ||
stream.removeListener(event, fn); | ||
}, | ||
rmAll = function() { | ||
rm('error', write, onWriteError); | ||
rm('error', read, onReadError); | ||
rm('end', read, onReadEnd); | ||
rm('finish', write, onWriteFinish); | ||
}; | ||
isFsRead = read instanceof fs.ReadStream, | ||
isFsWrite = write instanceof fs.WriteStream, | ||
isGunzip = write instanceof zlib.Gunzip, | ||
read.on('end', onReadEnd); | ||
read.on('error', onReadError); | ||
rm = function(event, stream, fn) { | ||
stream.removeListener(event, fn); | ||
}, | ||
if (options.end) | ||
write.on('finish', onWriteFinish); | ||
rmAll = function() { | ||
rm('error', write, onWriteError); | ||
rm('error', read, onReadError); | ||
rm('end', read, onReadEnd); | ||
rm('finish', write, onWriteFinish); | ||
}; | ||
read.on('end', onReadEnd); | ||
read.on('error', onReadError); | ||
if (options.end) | ||
write.on('finish', onWriteFinish); | ||
callWhenOpen(write, function(e) { | ||
if (e) | ||
onWriteError(e); | ||
callWhenOpen(write, function(e) { | ||
if (e) | ||
onWriteError(e); | ||
if (end && readError) { | ||
onEnd(); | ||
} else { | ||
open = true; | ||
write.on('error', onWriteError); | ||
if (end && readError) { | ||
onEnd(); | ||
} else { | ||
open = true; | ||
write.on('error', onWriteError); | ||
setListeners(streams, onError); | ||
fullPipe(allStreams, options); | ||
} | ||
}); | ||
function onWriteError(error) { | ||
writeError = true; | ||
finish = true; | ||
onError(error); | ||
onResult(); | ||
setListeners(streams, onError); | ||
fullPipe(allStreams, options); | ||
} | ||
function onReadError(error) { | ||
readError = true; | ||
end = true; | ||
onError(error); | ||
onResult(); | ||
} | ||
function onReadEnd() { | ||
end = true; | ||
onResult(); | ||
} | ||
function onWriteFinish() { | ||
finish = true; | ||
onResult(); | ||
} | ||
}); | ||
function onWriteError(error) { | ||
writeError = true; | ||
finish = true; | ||
onError(error); | ||
onResult(); | ||
} | ||
function onError(e) { | ||
error = e; | ||
} | ||
function onReadError(error) { | ||
readError = true; | ||
end = true; | ||
onError(error); | ||
onResult(); | ||
} | ||
function onResult() { | ||
var justEnd = end && !isFsWrite, | ||
justFinish = write && !isFsRead, | ||
bothFinish = end && finish; | ||
if (readError && finish) { | ||
onEnd(); | ||
}else if (writeError && end) { | ||
onEnd(); | ||
} else if (writeError && isGunzip) { | ||
onEnd(); | ||
} else if (open && readError) { | ||
onEnd(); | ||
} else if (bothFinish || justEnd || justFinish) { | ||
onEnd(); | ||
} | ||
} | ||
function onReadEnd() { | ||
end = true; | ||
onResult(); | ||
} | ||
function onEnd() { | ||
rmAll(); | ||
unsetListeners(streams, onError); | ||
callback(error); | ||
} | ||
function onWriteFinish() { | ||
finish = true; | ||
onResult(); | ||
} | ||
/* | ||
* when stream is fs.WriteStream | ||
* finish event could be emitted before | ||
* open and then everything crash | ||
*/ | ||
function callWhenOpen(stream, fn) { | ||
var isFsWrite = stream instanceof fs.WriteStream, | ||
on = function(error) { | ||
var isError = error instanceof Error; | ||
stream.removeListener('open', on); | ||
stream.removeListener('error', on); | ||
if (isError) | ||
fn(error); | ||
else | ||
fn(); | ||
}; | ||
function onError(e) { | ||
error = e; | ||
} | ||
function onResult() { | ||
var justEnd = end && !isFsWrite, | ||
justFinish = write && !isFsRead, | ||
bothFinish = end && finish; | ||
if (!isFsWrite) { | ||
fn(); | ||
} else { | ||
stream.on('open', on); | ||
stream.on('error', on); | ||
if (readError && finish) { | ||
onEnd(); | ||
}else if (writeError && end) { | ||
onEnd(); | ||
} else if (writeError && isGunzip) { | ||
onEnd(); | ||
} else if (open && readError) { | ||
onEnd(); | ||
} else if (bothFinish || justEnd || justFinish) { | ||
onEnd(); | ||
} | ||
} | ||
function fullPipe(streams, options) { | ||
var main; | ||
function onEnd() { | ||
rmAll(); | ||
unsetListeners(streams, onError); | ||
streams.forEach(function(stream) { | ||
if (!main) | ||
main = stream; | ||
callback(error); | ||
} | ||
} | ||
/* | ||
* when stream is fs.WriteStream | ||
* finish event could be emitted before | ||
* open and then everything crash | ||
*/ | ||
function callWhenOpen(stream, fn) { | ||
var isFsWrite = stream instanceof fs.WriteStream, | ||
on = function(error) { | ||
var isError = error instanceof Error; | ||
stream.removeListener('open', on); | ||
stream.removeListener('error', on); | ||
if (isError) | ||
fn(error); | ||
else | ||
main = main.pipe(stream, { | ||
end: options.end | ||
}); | ||
}); | ||
} | ||
fn(); | ||
}; | ||
function setListeners(streams, fn) { | ||
streams.forEach(function(stream) { | ||
stream.on('error', fn); | ||
}); | ||
if (!isFsWrite) { | ||
fn(); | ||
} else { | ||
stream.on('open', on); | ||
stream.on('error', on); | ||
} | ||
} | ||
function fullPipe(streams, options) { | ||
var main; | ||
function unsetListeners(streams, fn) { | ||
streams.forEach(function(stream) { | ||
stream.removeListener('error', fn); | ||
}); | ||
} | ||
/** | ||
* get body of readStream | ||
* | ||
* @param readStream | ||
* @param callback | ||
*/ | ||
function getBody(readStream, callback) { | ||
var error, | ||
body = ''; | ||
assert(readStream, 'could not be empty!'); | ||
assert(callback, 'could not be empty!'); | ||
readStream.on('data', onData); | ||
readStream.on('error', onEnd); | ||
readStream.on('end', onEnd); | ||
function onData(chunk) { | ||
body += chunk; | ||
} | ||
function onEnd(error) { | ||
readStream.removeListener('data', onData); | ||
readStream.removeListener('error', onEnd); | ||
readStream.removeListener('end', onEnd); | ||
callback(error, body); | ||
} | ||
} | ||
})(); | ||
streams.forEach(function(stream) { | ||
if (!main) | ||
main = stream; | ||
else | ||
main = main.pipe(stream, { | ||
end: options.end | ||
}); | ||
}); | ||
} | ||
function setListeners(streams, fn) { | ||
streams.forEach(function(stream) { | ||
stream.on('error', fn); | ||
}); | ||
} | ||
function unsetListeners(streams, fn) { | ||
streams.forEach(function(stream) { | ||
stream.removeListener('error', fn); | ||
}); | ||
} |
{ | ||
"name": "pipe-io", | ||
"version": "1.2.1", | ||
"version": "1.2.2", | ||
"author": "coderaiser <mnemonic.enemy@gmail.com> (https://github.com/coderaiser)", | ||
@@ -16,3 +16,7 @@ "description": "Pipe streams and handle events", | ||
"scripts": { | ||
"lint": "redrun lint:*", | ||
"lint:lib": "eslint lib", | ||
"lint:test": "eslint --rule 'no-console:0' test", | ||
"test": "tape test/*.js", | ||
"watch:test": "nodemon -w lib -w test -x \"npm test\"", | ||
"coverage": "nyc npm test", | ||
@@ -29,5 +33,9 @@ "report": "nyc report --reporter=text-lcov | coveralls" | ||
"coveralls": "^2.11.6", | ||
"nyc": "^5.0.1", | ||
"eslint": "^3.8.1", | ||
"nodemon": "^1.11.0", | ||
"nyc": "^8.3.2", | ||
"pullout": "^1.0.0", | ||
"redrun": "^5.9.2", | ||
"tape": "^4.2.0" | ||
} | ||
} |
@@ -1,2 +0,2 @@ | ||
Pipe-io [![License][LicenseIMGURL]][LicenseURL] [![NPM version][NPMIMGURL]][NPMURL] [![Dependency Status][DependencyStatusIMGURL]][DependencyStatusURL] [![Build Status][BuildStatusIMGURL]][BuildStatusURL] | ||
Pipe-io [![License][LicenseIMGURL]][LicenseURL] [![NPM version][NPMIMGURL]][NPMURL] [![Dependency Status][DependencyStatusIMGURL]][DependencyStatusURL] [![Build Status][BuildStatusIMGURL]][BuildStatusURL] [![Coverage Status][CoverageIMGURL]][CoverageURL] | ||
========= | ||
@@ -43,3 +43,6 @@ Pipe [streams](https://github.com/substack/stream-handbook) and handle events. | ||
``` | ||
## Related | ||
- [Pullout](https://github.com/coderaiser/pullout "Pullout") - pull out data from stream | ||
## License | ||
@@ -52,2 +55,3 @@ MIT | ||
[LicenseIMGURL]: https://img.shields.io/badge/license-MIT-317BF9.svg?style=flat | ||
[CoverageIMGURL]: https://coveralls.io/repos/coderaiser/pipe-io/badge.svg?branch=master&service=github | ||
[NPMURL]: https://npmjs.org/package/pipe-io "npm" | ||
@@ -57,2 +61,2 @@ [BuildStatusURL]: https://travis-ci.org/coderaiser/pipe-io "Build Status" | ||
[LicenseURL]: https://tldrlegal.com/license/mit-license "MIT License" | ||
[CoverageURL]: https://coveralls.io/github/coderaiser/pipe-io?branch=master |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Network access
Supply chain riskThis module accesses the network.
Found 1 instance in 1 package
60
1
11712
7
5
144