Comparing version 1.0.2 to 2.0.0
107
index.js
@@ -1,2 +0,1 @@ | ||
/** | ||
@@ -6,29 +5,15 @@ * Module dependencies. | ||
var duplexer = require('duplexer2'); | ||
var PassThrough = require('stream').PassThrough; | ||
var Readable = require('stream').PassThrough; | ||
var objectAssign = require('object-assign'); | ||
const duplexer = require('duplexer2') | ||
const { PassThrough, Readable } = require('stream') | ||
/** | ||
* Slice reference. | ||
*/ | ||
var slice = [].slice; | ||
/** | ||
* Duplexer options. | ||
*/ | ||
var defaultOpts = { | ||
const defaultOpts = { | ||
bubbleErrors: false, | ||
objectMode: true | ||
}; | ||
} | ||
/** | ||
* Expose `pipe`. | ||
*/ | ||
module.exports = pipe; | ||
/** | ||
* Pipe. | ||
@@ -43,53 +28,59 @@ * | ||
function pipe(streams, opts, cb){ | ||
if (!Array.isArray(streams)) { | ||
streams = slice.call(arguments); | ||
opts = null; | ||
cb = null; | ||
} | ||
const pipe = (...streams) => { | ||
let opts, cb | ||
var lastArg = streams[streams.length - 1]; | ||
if ('function' == typeof lastArg) { | ||
cb = streams.splice(-1)[0]; | ||
lastArg = streams[streams.length - 1]; | ||
if (Array.isArray(streams[0])) { | ||
streams = streams[0] | ||
} | ||
if ('object' == typeof lastArg && typeof lastArg.pipe != 'function') { | ||
opts = streams.splice(-1)[0]; | ||
if (typeof streams[streams.length - 1] === 'function') { | ||
cb = streams.pop() | ||
} | ||
var first = streams[0]; | ||
var last = streams[streams.length - 1]; | ||
var ret; | ||
opts = objectAssign({}, defaultOpts, opts) | ||
if ( | ||
typeof streams[streams.length - 1] === 'object' && | ||
typeof streams[streams.length - 1].pipe !== 'function' | ||
) { | ||
opts = streams.pop() | ||
} | ||
const first = streams[0] | ||
const last = streams[streams.length - 1] | ||
let ret | ||
opts = Object.assign({}, defaultOpts, opts) | ||
if (!first) { | ||
if (cb) process.nextTick(cb); | ||
return new PassThrough(opts); | ||
if (cb) process.nextTick(cb) | ||
return new PassThrough(opts) | ||
} | ||
if (first.writable && last.readable) ret = duplexer(opts, first, last); | ||
else if (streams.length == 1) ret = new Readable(opts).wrap(streams[0]); | ||
else if (first.writable) ret = first; | ||
else if (last.readable) ret = last; | ||
else ret = new PassThrough(opts); | ||
streams.forEach(function(stream, i){ | ||
var next = streams[i+1]; | ||
if (next) stream.pipe(next); | ||
if (stream != ret) stream.on('error', ret.emit.bind(ret, 'error')); | ||
}); | ||
if (first.writable && last.readable) ret = duplexer(opts, first, last) | ||
else if (streams.length === 1) ret = new Readable(opts).wrap(streams[0]) | ||
else if (first.writable) ret = first | ||
else if (last.readable) ret = last | ||
else ret = new PassThrough(opts) | ||
for (const [i, stream] of streams.entries()) { | ||
const next = streams[i + 1] | ||
if (next) stream.pipe(next) | ||
if (stream !== ret) stream.on('error', err => ret.emit('error', err)) | ||
} | ||
if (cb) { | ||
var ended = false; | ||
ret.on('error', end); | ||
last.on('finish', function(){ end() }); | ||
last.on('close', function(){ end() }); | ||
function end(err){ | ||
if (ended) return; | ||
ended = true; | ||
cb(err); | ||
let ended = false | ||
const end = err => { | ||
if (ended) return | ||
ended = true | ||
cb(err) | ||
} | ||
ret.on('error', end) | ||
last.on('finish', () => end()) | ||
last.on('close', () => end()) | ||
} | ||
return ret; | ||
return ret | ||
} | ||
/** | ||
* Expose `pipe`. | ||
*/ | ||
module.exports = pipe |
{ | ||
"name": "multipipe", | ||
"version": "1.0.2", | ||
"version": "2.0.0", | ||
"description": "pipe streams with centralized error handling", | ||
@@ -13,7 +13,9 @@ "license": "MIT", | ||
"mocha": "^3.0.0", | ||
"prettier-standard": "^7.0.3", | ||
"standard": "^10.0.3", | ||
"through2": "^2.0.0" | ||
}, | ||
"scripts": { | ||
"test": "make test" | ||
"test": "prettier-standard '*.js' 'test/*.js' && standard && mocha --reporter spec --timeout 300" | ||
} | ||
} |
@@ -11,17 +11,17 @@ # multipipe | ||
```js | ||
var pipe = require('multipipe'); | ||
const pipe = require('multipipe') | ||
// pipe streams | ||
var stream = pipe(streamA, streamB, streamC); | ||
const stream = pipe(streamA, streamB, streamC) | ||
// centralized error handling | ||
stream.on('error', fn); | ||
stream.on('error', fn) | ||
// creates a new stream | ||
source.pipe(stream).pipe(dest); | ||
source.pipe(stream).pipe(dest) | ||
// optional callback on finish or error | ||
pipe(streamA, streamB, streamC, function(err){ | ||
pipe(streamA, streamB, streamC, err => { | ||
// ... | ||
}); | ||
}) | ||
@@ -39,7 +39,7 @@ // pass options | ||
```js | ||
var stream = pipe(a, b, c); | ||
const stream = pipe(a, b, c) | ||
source | ||
.pipe(stream) | ||
.pipe(destination); | ||
.pipe(destination) | ||
``` | ||
@@ -60,11 +60,11 @@ | ||
```js | ||
var stream = pipe(a, b, c); | ||
const stream = pipe(a, b, c) | ||
stream.on('error', function(err){ | ||
stream.on('error', err => { | ||
// called three times | ||
}); | ||
}) | ||
a.emit('error', new Error); | ||
b.emit('error', new Error); | ||
c.emit('error', new Error); | ||
a.emit('error', new Error) | ||
b.emit('error', new Error) | ||
c.emit('error', new Error) | ||
``` | ||
@@ -71,0 +71,0 @@ |
@@ -1,201 +0,214 @@ | ||
var assert = require('assert'); | ||
var pipe = require('..'); | ||
var Stream = require('stream'); | ||
var through = require('through2'); | ||
/* global describe, it */ | ||
describe('pipe()', function(){ | ||
it('should return a stream', function(done){ | ||
assert(pipe(done)); | ||
}); | ||
it('should accept options', function(){ | ||
const assert = require('assert') | ||
const pipe = require('..') | ||
const Stream = require('stream') | ||
const through = require('through2') | ||
const Readable = () => { | ||
const readable = new Stream.Readable({ objectMode: true }) | ||
readable._read = function () { | ||
this.push('a') | ||
this.push(null) | ||
} | ||
return readable | ||
} | ||
const Transform = () => { | ||
const transform = new Stream.Transform({ objectMode: true }) | ||
transform._transform = (chunk, _, done) => { | ||
done(null, chunk.toUpperCase()) | ||
} | ||
return transform | ||
} | ||
const Writable = cb => { | ||
const writable = new Stream.Writable({ objectMode: true }) | ||
writable._write = (chunk, _, done) => { | ||
assert.equal(chunk, 'A') | ||
done() | ||
cb && cb() | ||
} | ||
return writable | ||
} | ||
describe('pipe()', () => { | ||
it('should return a stream', done => { | ||
assert(pipe(done)) | ||
}) | ||
it('should accept options', () => { | ||
assert.equal(pipe({ objectMode: false })._readableState.objectMode, false) | ||
}); | ||
}); | ||
}) | ||
}) | ||
describe('pipe(a)', function(){ | ||
it('should pass through to a', function(done){ | ||
Readable().pipe(pipe(Transform())).pipe(Writable(done)) | ||
}); | ||
it('should accept options', function(){ | ||
var readable = Readable({ objectMode: true }); | ||
assert.equal(pipe(readable, { objectMode: false })._readableState.objectMode, false) | ||
}); | ||
}); | ||
describe('pipe(a)', () => { | ||
it('should pass through to a', done => { | ||
Readable() | ||
.pipe(pipe(Transform())) | ||
.pipe(Writable(done)) | ||
}) | ||
it('should accept options', () => { | ||
const readable = Readable({ objectMode: true }) | ||
assert.equal( | ||
pipe(readable, { objectMode: false })._readableState.objectMode, | ||
false | ||
) | ||
}) | ||
}) | ||
describe('pipe(a, b, c)', function(){ | ||
it('should pipe internally', function(done){ | ||
pipe(Readable(), Transform(), Writable(done)); | ||
}); | ||
it('should be writable', function(done){ | ||
var stream = pipe(Transform(), Writable(done)); | ||
assert(stream.writable); | ||
Readable().pipe(stream); | ||
}); | ||
describe('pipe(a, b, c)', () => { | ||
it('should pipe internally', done => { | ||
pipe(Readable(), Transform(), Writable(done)) | ||
}) | ||
it('should be readable', function(done){ | ||
var stream = pipe(Readable(), Transform()); | ||
assert(stream.readable); | ||
stream.pipe(Writable(done)); | ||
}); | ||
it('should be readable and writable', function(done){ | ||
var stream = pipe(Transform(), Transform()); | ||
assert(stream.readable); | ||
assert(stream.writable); | ||
it('should be writable', done => { | ||
const stream = pipe(Transform(), Writable(done)) | ||
assert(stream.writable) | ||
Readable().pipe(stream) | ||
}) | ||
it('should be readable', done => { | ||
const stream = pipe(Readable(), Transform()) | ||
assert(stream.readable) | ||
stream.pipe(Writable(done)) | ||
}) | ||
it('should be readable and writable', done => { | ||
const stream = pipe(Transform(), Transform()) | ||
assert(stream.readable) | ||
assert(stream.writable) | ||
Readable() | ||
.pipe(stream) | ||
.pipe(Writable(done)); | ||
}); | ||
describe('errors', function(){ | ||
it('should reemit', function(done){ | ||
var a = Transform(); | ||
var b = Transform(); | ||
var c = Transform(); | ||
var stream = pipe(a, b, c); | ||
var err = new Error; | ||
var i = 0; | ||
stream.on('error', function(_err){ | ||
i++; | ||
assert.equal(_err, err); | ||
assert(i <= 3); | ||
if (i == 3) done(); | ||
}); | ||
a.emit('error', err); | ||
b.emit('error', err); | ||
c.emit('error', err); | ||
}); | ||
.pipe(stream) | ||
.pipe(Writable(done)) | ||
}) | ||
it('should not reemit endlessly', function(done){ | ||
var a = Transform(); | ||
var b = Transform(); | ||
var c = Transform(); | ||
c.readable = false; | ||
var stream = pipe(a, b, c); | ||
var err = new Error; | ||
var i = 0; | ||
stream.on('error', function(_err){ | ||
i++; | ||
assert.equal(_err, err); | ||
assert(i <= 3); | ||
if (i == 3) done(); | ||
}); | ||
a.emit('error', err); | ||
b.emit('error', err); | ||
c.emit('error', err); | ||
}); | ||
}); | ||
it('should accept options', function(){ | ||
var a = Readable() | ||
var b = Transform() | ||
var c = Writable() | ||
assert.equal(pipe(a, b, c, { objectMode: false })._readableState.objectMode, false) | ||
}); | ||
}); | ||
describe('errors', () => { | ||
it('should reemit', done => { | ||
const a = Transform() | ||
const b = Transform() | ||
const c = Transform() | ||
const stream = pipe(a, b, c) | ||
const err = new Error() | ||
let i = 0 | ||
describe('pipe(a, b, c, fn)', function(){ | ||
it('should call on finish', function(done){ | ||
var finished = false; | ||
var a = Readable(); | ||
var b = Transform(); | ||
var c = Writable(function(){ | ||
finished = true; | ||
}); | ||
stream.on('error', _err => { | ||
i++ | ||
assert.equal(_err, err) | ||
assert(i <= 3) | ||
if (i === 3) done() | ||
}) | ||
pipe(a, b, c, function(err){ | ||
assert(!err); | ||
assert(finished); | ||
done(); | ||
}); | ||
}); | ||
a.emit('error', err) | ||
b.emit('error', err) | ||
c.emit('error', err) | ||
}) | ||
it('should call with error once', function(done){ | ||
var a = Readable(); | ||
var b = Transform(); | ||
var c = Writable(); | ||
var err = new Error; | ||
it('should not reemit endlessly', done => { | ||
const a = Transform() | ||
const b = Transform() | ||
const c = Transform() | ||
c.readable = false | ||
const stream = pipe(a, b, c) | ||
const err = new Error() | ||
let i = 0 | ||
pipe(a, b, c, function(err){ | ||
assert(err); | ||
done(); | ||
}); | ||
stream.on('error', function (_err) { | ||
i++ | ||
assert.equal(_err, err) | ||
assert(i <= 3) | ||
if (i === 3) done() | ||
}) | ||
a.emit('error', err); | ||
b.emit('error', err); | ||
c.emit('error', err); | ||
}); | ||
a.emit('error', err) | ||
b.emit('error', err) | ||
c.emit('error', err) | ||
}) | ||
}) | ||
it('should accept options', () => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = Writable() | ||
assert.equal( | ||
pipe(a, b, c, { objectMode: false })._readableState.objectMode, | ||
false | ||
) | ||
}) | ||
}) | ||
it('should call on destroy', function(done){ | ||
var a = Readable(); | ||
var b = Transform(); | ||
var c = through(); | ||
describe('pipe(a, b, c, fn)', () => { | ||
it('should call on finish', done => { | ||
let finished = false | ||
const a = Readable() | ||
const b = Transform() | ||
const c = Writable(function () { | ||
finished = true | ||
}) | ||
pipe(a, b, c, function(err){ | ||
assert(!err); | ||
done(); | ||
}); | ||
pipe(a, b, c, err => { | ||
assert(!err) | ||
assert(finished) | ||
done() | ||
}) | ||
}) | ||
c.destroy(); | ||
}); | ||
it('should call with error once', done => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = Writable() | ||
const err = new Error() | ||
it('should call on destroy with error', function(done){ | ||
var a = Readable(); | ||
var b = Transform(); | ||
var c = through(); | ||
var err = new Error; | ||
pipe(a, b, c, err => { | ||
assert(err) | ||
done() | ||
}) | ||
pipe(a, b, c, function(_err){ | ||
assert.equal(_err, err); | ||
done(); | ||
}); | ||
a.emit('error', err) | ||
b.emit('error', err) | ||
c.emit('error', err) | ||
}) | ||
c.destroy(err); | ||
}); | ||
it('should call on destroy', done => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = through() | ||
it('should accept options', function(done){ | ||
var a = Readable() | ||
var b = Transform() | ||
var c = Writable() | ||
assert.equal(pipe(a, b, c, { objectMode: false }, done)._readableState.objectMode, false) | ||
}); | ||
pipe(a, b, c, err => { | ||
assert(!err) | ||
done() | ||
}) | ||
it('should ignore parameters on non error events', function(done){ | ||
var a = Readable(); | ||
var b = Transform(); | ||
var c = Writable(); | ||
pipe(a, b, c, done); | ||
c.emit('finish', true); | ||
}); | ||
}); | ||
c.destroy() | ||
}) | ||
function Readable(){ | ||
var readable = new Stream.Readable({ objectMode: true }); | ||
readable._read = function(){ | ||
this.push('a'); | ||
this.push(null); | ||
}; | ||
return readable; | ||
} | ||
it('should call on destroy with error', done => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = through() | ||
const err = new Error() | ||
function Transform(){ | ||
var transform = new Stream.Transform({ objectMode: true }); | ||
transform._transform = function(chunk, _, done){ | ||
done(null, chunk.toUpperCase()); | ||
}; | ||
return transform; | ||
} | ||
pipe(a, b, c, _err => { | ||
assert.equal(_err, err) | ||
done() | ||
}) | ||
function Writable(cb){ | ||
var writable = new Stream.Writable({ objectMode: true }); | ||
writable._write = function(chunk, _, done){ | ||
assert.equal(chunk, 'A'); | ||
done(); | ||
cb && cb(); | ||
}; | ||
return writable; | ||
} | ||
c.destroy(err) | ||
}) | ||
it('should accept options', done => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = Writable() | ||
assert.equal( | ||
pipe(a, b, c, { objectMode: false }, done)._readableState.objectMode, | ||
false | ||
) | ||
}) | ||
it('should ignore parameters on non error events', done => { | ||
const a = Readable() | ||
const b = Transform() | ||
const c = Writable() | ||
pipe(a, b, c, done) | ||
c.emit('finish', true) | ||
}) | ||
}) |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
255
10339
4
6
1