promisepipe
Advanced tools
Comparing version 2.1.3 to 3.0.0
46
index.js
@@ -12,11 +12,9 @@ 'use strict'; | ||
const allEvents = ['error', 'end', 'close', 'finish']; | ||
const writableEvents = ['error', 'close', 'finish']; | ||
const readableEvents = ['error', 'end', 'close']; | ||
const events = ['error', 'end', 'close', 'finish']; | ||
function cleanupEventHandlers(stream, listener) { | ||
allEvents.map(e => stream.removeListener(e, listener)); | ||
events.map(e => stream.removeListener(e, listener)); | ||
} | ||
function streamPromise(stream) { | ||
function streamPromise(stream, state) { | ||
if (stream === process.stdout || stream === process.stderr) { | ||
@@ -28,3 +26,3 @@ return Promise.resolve(stream); | ||
// and https://github.com/epeli/node-promisepipe/issues/15 | ||
const events = stream.readable || typeof stream._read === 'function' ? readableEvents : writableEvents; | ||
const isReadable = stream.readable || typeof stream._read === 'function'; | ||
@@ -36,2 +34,9 @@ function on(evt) { | ||
() => { | ||
// For readable streams, we ignore the "finish" event. However, if there | ||
// already was an error on another stream, the "end" event may never come, | ||
// so in that case we accept "finish" too. | ||
if (isReadable && evt === 'finish' && !state.error) { | ||
return; | ||
} | ||
cleanupEventHandlers(stream, fn); | ||
@@ -61,10 +66,35 @@ resolve(stream); | ||
allStreams.reduce((current, next) => current.pipe(next)); | ||
return Promise.all(allStreams.map(streamPromise)); | ||
return allStreamsDone(streams); | ||
} | ||
function allStreamsDone(allStreams) { | ||
let state = {}; | ||
let firstRejection; | ||
return Promise.all(allStreams.map(stream => streamPromise(stream, state).catch((e) => { | ||
if (!firstRejection) { | ||
firstRejection = e; | ||
state.error = true; | ||
// Close all streams as they are not closed automatically on error. | ||
allStreams.forEach(stream => { | ||
if (stream !== process.stdout && stream !== process.stderr) { | ||
stream.destroy(); | ||
} | ||
}); | ||
} | ||
}))).then((allResults) => { | ||
if (firstRejection) { | ||
throw firstRejection; | ||
} | ||
return allResults; | ||
}); | ||
} | ||
module.exports = Object.assign(promisePipe, { | ||
__esModule: true, | ||
default: promisePipe, | ||
justPromise: streams => Promise.all(streams.map(streamPromise)), | ||
justPromise: streams => allStreamsDone(streams), | ||
StreamError, | ||
}); |
{ | ||
"name": "promisepipe", | ||
"version": "2.1.3", | ||
"version": "3.0.0", | ||
"description": "Pipe node.js streams safely with Promises", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
@@ -22,3 +22,7 @@ # promisePipe | ||
- `message`: The error message from original error | ||
Note: the last stream in the chain needs to be a writable stream, not a duplex/transform stream. If you use a 3rd party library which returns deplux streams instead of writable streams, you'll need to add something like [`.pipe(devnull())`](https://www.npmjs.com/package/dev-null) to the end, otherwise the promise will never resolve ([#16](https://github.com/epeli/node-promisepipe/issues/16)). | ||
Starting with v3, all streams are destroyed if there's an error to prevent memory leaks. | ||
## Example | ||
@@ -25,0 +29,0 @@ |
25
test.js
@@ -118,3 +118,3 @@ /*global it, describe, beforeEach */ | ||
assert(err); | ||
assert.equal(err.originalError.code, "EACCES"); | ||
assert.ok([ "EACCES", "EPERM" ].includes(err.originalError.code)); | ||
}); | ||
@@ -169,2 +169,25 @@ }); | ||
}); | ||
it("closes streams on errors", function() { | ||
var input = fs.createReadStream(INPUT + ".x"); | ||
var output = fs.createWriteStream(OUTPUT); | ||
return promisePipe(input, new Upcase(), output) | ||
.catch(err => err) | ||
.then(function() { | ||
return fs.unlink(OUTPUT); | ||
}); | ||
}); | ||
it("resolves chains with transform streams on error", function() { | ||
var input = fs.createReadStream(INPUT); | ||
var output = fs.createWriteStream("/bad"); | ||
return promisePipe(input, new Upcase(), output) | ||
.catch(err => err) | ||
.then(function(err) { | ||
assert(err); | ||
assert.ok([ "EACCES", "EPERM" ].includes(err.originalError.code)); | ||
}); | ||
}); | ||
}); | ||
@@ -171,0 +194,0 @@ |
Sorry, the diff of this file is not supported yet
13064
249
93