Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

multipipe

Package Overview
Dependencies
Maintainers
1
Versions
19
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

multipipe - npm Package Compare versions

Comparing version 1.0.2 to 2.0.0

107

index.js

@@ -1,2 +0,1 @@

/**

@@ -6,29 +5,15 @@ * Module dependencies.

var duplexer = require('duplexer2');
var PassThrough = require('stream').PassThrough;
var Readable = require('stream').PassThrough;
var objectAssign = require('object-assign');
const duplexer = require('duplexer2')
const { PassThrough, Readable } = require('stream')
/**
* Slice reference.
*/
var slice = [].slice;
/**
* Duplexer options.
*/
var defaultOpts = {
const defaultOpts = {
bubbleErrors: false,
objectMode: true
};
}
/**
* Expose `pipe`.
*/
module.exports = pipe;
/**
* Pipe.

@@ -43,53 +28,59 @@ *

function pipe(streams, opts, cb){
if (!Array.isArray(streams)) {
streams = slice.call(arguments);
opts = null;
cb = null;
}
const pipe = (...streams) => {
let opts, cb
var lastArg = streams[streams.length - 1];
if ('function' == typeof lastArg) {
cb = streams.splice(-1)[0];
lastArg = streams[streams.length - 1];
if (Array.isArray(streams[0])) {
streams = streams[0]
}
if ('object' == typeof lastArg && typeof lastArg.pipe != 'function') {
opts = streams.splice(-1)[0];
if (typeof streams[streams.length - 1] === 'function') {
cb = streams.pop()
}
var first = streams[0];
var last = streams[streams.length - 1];
var ret;
opts = objectAssign({}, defaultOpts, opts)
if (
typeof streams[streams.length - 1] === 'object' &&
typeof streams[streams.length - 1].pipe !== 'function'
) {
opts = streams.pop()
}
const first = streams[0]
const last = streams[streams.length - 1]
let ret
opts = Object.assign({}, defaultOpts, opts)
if (!first) {
if (cb) process.nextTick(cb);
return new PassThrough(opts);
if (cb) process.nextTick(cb)
return new PassThrough(opts)
}
if (first.writable && last.readable) ret = duplexer(opts, first, last);
else if (streams.length == 1) ret = new Readable(opts).wrap(streams[0]);
else if (first.writable) ret = first;
else if (last.readable) ret = last;
else ret = new PassThrough(opts);
streams.forEach(function(stream, i){
var next = streams[i+1];
if (next) stream.pipe(next);
if (stream != ret) stream.on('error', ret.emit.bind(ret, 'error'));
});
if (first.writable && last.readable) ret = duplexer(opts, first, last)
else if (streams.length === 1) ret = new Readable(opts).wrap(streams[0])
else if (first.writable) ret = first
else if (last.readable) ret = last
else ret = new PassThrough(opts)
for (const [i, stream] of streams.entries()) {
const next = streams[i + 1]
if (next) stream.pipe(next)
if (stream !== ret) stream.on('error', err => ret.emit('error', err))
}
if (cb) {
var ended = false;
ret.on('error', end);
last.on('finish', function(){ end() });
last.on('close', function(){ end() });
function end(err){
if (ended) return;
ended = true;
cb(err);
let ended = false
const end = err => {
if (ended) return
ended = true
cb(err)
}
ret.on('error', end)
last.on('finish', () => end())
last.on('close', () => end())
}
return ret;
return ret
}
/**
* Expose `pipe`.
*/
module.exports = pipe
{
"name": "multipipe",
"version": "1.0.2",
"version": "2.0.0",
"description": "pipe streams with centralized error handling",

@@ -13,7 +13,9 @@ "license": "MIT",

"mocha": "^3.0.0",
"prettier-standard": "^7.0.3",
"standard": "^10.0.3",
"through2": "^2.0.0"
},
"scripts": {
"test": "make test"
"test": "prettier-standard '*.js' 'test/*.js' && standard && mocha --reporter spec --timeout 300"
}
}

@@ -11,17 +11,17 @@ # multipipe

```js
var pipe = require('multipipe');
const pipe = require('multipipe')
// pipe streams
var stream = pipe(streamA, streamB, streamC);
const stream = pipe(streamA, streamB, streamC)
// centralized error handling
stream.on('error', fn);
stream.on('error', fn)
// creates a new stream
source.pipe(stream).pipe(dest);
source.pipe(stream).pipe(dest)
// optional callback on finish or error
pipe(streamA, streamB, streamC, function(err){
pipe(streamA, streamB, streamC, err => {
// ...
});
})

@@ -39,7 +39,7 @@ // pass options

```js
var stream = pipe(a, b, c);
const stream = pipe(a, b, c)
source
.pipe(stream)
.pipe(destination);
.pipe(destination)
```

@@ -60,11 +60,11 @@

```js
var stream = pipe(a, b, c);
const stream = pipe(a, b, c)
stream.on('error', function(err){
stream.on('error', err => {
// called three times
});
})
a.emit('error', new Error);
b.emit('error', new Error);
c.emit('error', new Error);
a.emit('error', new Error)
b.emit('error', new Error)
c.emit('error', new Error)
```

@@ -71,0 +71,0 @@

@@ -1,201 +0,214 @@

var assert = require('assert');
var pipe = require('..');
var Stream = require('stream');
var through = require('through2');
/* global describe, it */
describe('pipe()', function(){
it('should return a stream', function(done){
assert(pipe(done));
});
it('should accept options', function(){
const assert = require('assert')
const pipe = require('..')
const Stream = require('stream')
const through = require('through2')
const Readable = () => {
const readable = new Stream.Readable({ objectMode: true })
readable._read = function () {
this.push('a')
this.push(null)
}
return readable
}
const Transform = () => {
const transform = new Stream.Transform({ objectMode: true })
transform._transform = (chunk, _, done) => {
done(null, chunk.toUpperCase())
}
return transform
}
const Writable = cb => {
const writable = new Stream.Writable({ objectMode: true })
writable._write = (chunk, _, done) => {
assert.equal(chunk, 'A')
done()
cb && cb()
}
return writable
}
describe('pipe()', () => {
it('should return a stream', done => {
assert(pipe(done))
})
it('should accept options', () => {
assert.equal(pipe({ objectMode: false })._readableState.objectMode, false)
});
});
})
})
describe('pipe(a)', function(){
it('should pass through to a', function(done){
Readable().pipe(pipe(Transform())).pipe(Writable(done))
});
it('should accept options', function(){
var readable = Readable({ objectMode: true });
assert.equal(pipe(readable, { objectMode: false })._readableState.objectMode, false)
});
});
describe('pipe(a)', () => {
it('should pass through to a', done => {
Readable()
.pipe(pipe(Transform()))
.pipe(Writable(done))
})
it('should accept options', () => {
const readable = Readable({ objectMode: true })
assert.equal(
pipe(readable, { objectMode: false })._readableState.objectMode,
false
)
})
})
describe('pipe(a, b, c)', function(){
it('should pipe internally', function(done){
pipe(Readable(), Transform(), Writable(done));
});
it('should be writable', function(done){
var stream = pipe(Transform(), Writable(done));
assert(stream.writable);
Readable().pipe(stream);
});
describe('pipe(a, b, c)', () => {
it('should pipe internally', done => {
pipe(Readable(), Transform(), Writable(done))
})
it('should be readable', function(done){
var stream = pipe(Readable(), Transform());
assert(stream.readable);
stream.pipe(Writable(done));
});
it('should be readable and writable', function(done){
var stream = pipe(Transform(), Transform());
assert(stream.readable);
assert(stream.writable);
it('should be writable', done => {
const stream = pipe(Transform(), Writable(done))
assert(stream.writable)
Readable().pipe(stream)
})
it('should be readable', done => {
const stream = pipe(Readable(), Transform())
assert(stream.readable)
stream.pipe(Writable(done))
})
it('should be readable and writable', done => {
const stream = pipe(Transform(), Transform())
assert(stream.readable)
assert(stream.writable)
Readable()
.pipe(stream)
.pipe(Writable(done));
});
describe('errors', function(){
it('should reemit', function(done){
var a = Transform();
var b = Transform();
var c = Transform();
var stream = pipe(a, b, c);
var err = new Error;
var i = 0;
stream.on('error', function(_err){
i++;
assert.equal(_err, err);
assert(i <= 3);
if (i == 3) done();
});
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
.pipe(stream)
.pipe(Writable(done))
})
it('should not reemit endlessly', function(done){
var a = Transform();
var b = Transform();
var c = Transform();
c.readable = false;
var stream = pipe(a, b, c);
var err = new Error;
var i = 0;
stream.on('error', function(_err){
i++;
assert.equal(_err, err);
assert(i <= 3);
if (i == 3) done();
});
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
});
it('should accept options', function(){
var a = Readable()
var b = Transform()
var c = Writable()
assert.equal(pipe(a, b, c, { objectMode: false })._readableState.objectMode, false)
});
});
describe('errors', () => {
it('should reemit', done => {
const a = Transform()
const b = Transform()
const c = Transform()
const stream = pipe(a, b, c)
const err = new Error()
let i = 0
describe('pipe(a, b, c, fn)', function(){
it('should call on finish', function(done){
var finished = false;
var a = Readable();
var b = Transform();
var c = Writable(function(){
finished = true;
});
stream.on('error', _err => {
i++
assert.equal(_err, err)
assert(i <= 3)
if (i === 3) done()
})
pipe(a, b, c, function(err){
assert(!err);
assert(finished);
done();
});
});
a.emit('error', err)
b.emit('error', err)
c.emit('error', err)
})
it('should call with error once', function(done){
var a = Readable();
var b = Transform();
var c = Writable();
var err = new Error;
it('should not reemit endlessly', done => {
const a = Transform()
const b = Transform()
const c = Transform()
c.readable = false
const stream = pipe(a, b, c)
const err = new Error()
let i = 0
pipe(a, b, c, function(err){
assert(err);
done();
});
stream.on('error', function (_err) {
i++
assert.equal(_err, err)
assert(i <= 3)
if (i === 3) done()
})
a.emit('error', err);
b.emit('error', err);
c.emit('error', err);
});
a.emit('error', err)
b.emit('error', err)
c.emit('error', err)
})
})
it('should accept options', () => {
const a = Readable()
const b = Transform()
const c = Writable()
assert.equal(
pipe(a, b, c, { objectMode: false })._readableState.objectMode,
false
)
})
})
it('should call on destroy', function(done){
var a = Readable();
var b = Transform();
var c = through();
describe('pipe(a, b, c, fn)', () => {
it('should call on finish', done => {
let finished = false
const a = Readable()
const b = Transform()
const c = Writable(function () {
finished = true
})
pipe(a, b, c, function(err){
assert(!err);
done();
});
pipe(a, b, c, err => {
assert(!err)
assert(finished)
done()
})
})
c.destroy();
});
it('should call with error once', done => {
const a = Readable()
const b = Transform()
const c = Writable()
const err = new Error()
it('should call on destroy with error', function(done){
var a = Readable();
var b = Transform();
var c = through();
var err = new Error;
pipe(a, b, c, err => {
assert(err)
done()
})
pipe(a, b, c, function(_err){
assert.equal(_err, err);
done();
});
a.emit('error', err)
b.emit('error', err)
c.emit('error', err)
})
c.destroy(err);
});
it('should call on destroy', done => {
const a = Readable()
const b = Transform()
const c = through()
it('should accept options', function(done){
var a = Readable()
var b = Transform()
var c = Writable()
assert.equal(pipe(a, b, c, { objectMode: false }, done)._readableState.objectMode, false)
});
pipe(a, b, c, err => {
assert(!err)
done()
})
it('should ignore parameters on non error events', function(done){
var a = Readable();
var b = Transform();
var c = Writable();
pipe(a, b, c, done);
c.emit('finish', true);
});
});
c.destroy()
})
function Readable(){
var readable = new Stream.Readable({ objectMode: true });
readable._read = function(){
this.push('a');
this.push(null);
};
return readable;
}
it('should call on destroy with error', done => {
const a = Readable()
const b = Transform()
const c = through()
const err = new Error()
function Transform(){
var transform = new Stream.Transform({ objectMode: true });
transform._transform = function(chunk, _, done){
done(null, chunk.toUpperCase());
};
return transform;
}
pipe(a, b, c, _err => {
assert.equal(_err, err)
done()
})
function Writable(cb){
var writable = new Stream.Writable({ objectMode: true });
writable._write = function(chunk, _, done){
assert.equal(chunk, 'A');
done();
cb && cb();
};
return writable;
}
c.destroy(err)
})
it('should accept options', done => {
const a = Readable()
const b = Transform()
const c = Writable()
assert.equal(
pipe(a, b, c, { objectMode: false }, done)._readableState.objectMode,
false
)
})
it('should ignore parameters on non error events', done => {
const a = Readable()
const b = Transform()
const c = Writable()
pipe(a, b, c, done)
c.emit('finish', true)
})
})

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc