Comparing version 1.0.1 to 1.0.2
33
index.js
var pump = require('pump') | ||
var duplexify = require('duplexify') | ||
var Foo = function(proxy) { | ||
this._proxy = proxy | ||
} | ||
Foo.prototype._e | ||
var pumpifier = function(duplex) { | ||
return function() { | ||
var streams = Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments) | ||
var first = streams[0] | ||
var last = streams[streams.length-1] | ||
var dup = duplex(null, null, {destroy:false}) | ||
var dup = duplex() | ||
var w = first.writable ? first : null | ||
var r = last.readable ? last : null | ||
var w = streams[0] | ||
var r = streams[streams.length-1] | ||
dup.setWritable(w) | ||
dup.setReadable(r) | ||
r = r.readable ? r : null | ||
w = w.writable ? w : null | ||
var onprefinish = function(cb) { | ||
if (r._writableState.ended) return cb() | ||
r.on('finish', cb) | ||
} | ||
var onclose = function() { | ||
for (var i = 0; i < streams.length; i++) { | ||
if (!streams[i].destroy) continue; | ||
// we only need to destroy one. pump will care of the others | ||
if (streams[i] !== r && streams[i] !== w) streams[i].destroy() | ||
} | ||
streams[0].emit('error', new Error('stream was destroyed')) | ||
} | ||
if (r && r._writableState) dup.on('prefinish', onprefinish) | ||
dup.on('close', onclose) | ||
@@ -37,2 +31,5 @@ pump(streams, function(err) { | ||
dup.setWritable(w) | ||
dup.setReadable(r) | ||
return dup | ||
@@ -39,0 +36,0 @@ } |
{ | ||
"name": "pumpify", | ||
"version": "1.0.1", | ||
"version": "1.0.2", | ||
"description": "Combine an array of streams into a single duplex stream using pump and duplexify", | ||
"main": "index.js", | ||
"dependencies": { | ||
"duplexify": "^1.1.0", | ||
"pump": "^0.3.2" | ||
"duplexify": "^1.2.1", | ||
"pump": "^0.3.3" | ||
}, | ||
@@ -10,0 +10,0 @@ "devDependencies": { |
43
test.js
var tape = require('tape') | ||
var through = require('through2') | ||
var pumpify = require('./') | ||
var stream = require('stream') | ||
@@ -73,2 +74,44 @@ tape('basic', function(t) { | ||
test.emit('error', new Error('lol')) | ||
}) | ||
tape('end waits for last one', function(t) { | ||
var ran = false | ||
var a = through() | ||
var b = through() | ||
var c = through(function(data, enc, cb) { | ||
setTimeout(function() { | ||
ran = true | ||
cb() | ||
}, 100) | ||
}) | ||
var pipeline = pumpify(a, b, c) | ||
pipeline.write('foo') | ||
pipeline.end(function() { | ||
t.ok(ran) | ||
t.end() | ||
}) | ||
t.ok(!ran) | ||
}) | ||
tape('always wait for finish', function(t) { | ||
var a = new stream.Readable() | ||
a._read = function() {} | ||
a.push('hello') | ||
var pipeline = pumpify(a, through(), through()) | ||
var ran = false | ||
pipeline.on('finish', function() { | ||
t.ok(ran) | ||
t.end() | ||
}) | ||
setTimeout(function() { | ||
ran = true | ||
a.push(null) | ||
}, 100) | ||
}) |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
5221
123
Updatedduplexify@^1.2.1
Updatedpump@^0.3.3