pumpify
Advanced tools
Comparing version 1.0.2 to 1.1.0
45
index.js
var pump = require('pump') | ||
var util = require('util') | ||
var duplexify = require('duplexify') | ||
var pumpifier = function(duplex) { | ||
return function() { | ||
var streams = Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments) | ||
var dup = duplex(null, null, {destroy:false}) | ||
var toArray = function(arguments) { | ||
if (!arguments.length) return [] | ||
return Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments) | ||
} | ||
var define = function(opts) { | ||
var Pumpify = function() { | ||
var streams = toArray(arguments) | ||
if (!(this instanceof Pumpify)) return new Pumpify(streams) | ||
duplexify.call(this, null, null, opts) | ||
if (streams.length) this.setPipeline(streams) | ||
} | ||
util.inherits(Pumpify, duplexify) | ||
Pumpify.prototype.setPipeline = function() { | ||
var streams = toArray(arguments) | ||
var self = this | ||
var w = streams[0] | ||
@@ -24,17 +41,17 @@ var r = streams[streams.length-1] | ||
if (r && r._writableState) dup.on('prefinish', onprefinish) | ||
dup.on('close', onclose) | ||
if (r && r._writableState) this.on('prefinish', onprefinish) | ||
this.on('close', onclose) | ||
pump(streams, function(err) { | ||
dup.removeListener('close', onclose) | ||
dup.destroy(err) | ||
self.removeListener('close', onclose) | ||
self.destroy(err) | ||
}) | ||
dup.setWritable(w) | ||
dup.setReadable(r) | ||
this.setWritable(w) | ||
this.setReadable(r) | ||
} | ||
return dup | ||
} | ||
return Pumpify | ||
} | ||
module.exports = pumpifier(duplexify) | ||
module.exports.obj = pumpifier(duplexify.obj) | ||
module.exports = define({destroy:false}) | ||
module.exports.obj = define({destroy:false, objectMode:true, highWaterMark:16}) |
{ | ||
"name": "pumpify", | ||
"version": "1.0.2", | ||
"version": "1.1.0", | ||
"description": "Combine an array of streams into a single duplex stream using pump and duplexify", | ||
"main": "index.js", | ||
"dependencies": { | ||
"duplexify": "^1.2.1", | ||
"duplexify": "^1.3.0", | ||
"pump": "^0.3.3" | ||
@@ -9,0 +9,0 @@ }, |
@@ -33,4 +33,19 @@ # pumpify | ||
### Using `setPipeline(s1, s2, ...)` | ||
Similar to [duplexify](https://github.com/mafintosh/duplexify) you can also define the pipeline asynchronously using `setPipeline(s1, s2, ...)` | ||
``` js | ||
var untar = pumpify() | ||
setTimeout(function() { | ||
// will start draining the input now | ||
untar.setPipeline(zlib.createGunzip(), tar.extract('output-folder')) | ||
}, 1000) | ||
fs.createReadStream('some-gzipped-tarball.tgz').pipe(untar) | ||
``` | ||
## License | ||
MIT |
29
test.js
@@ -116,2 +116,31 @@ var tape = require('tape') | ||
}, 100) | ||
}) | ||
tape('async', function(t) { | ||
var pipeline = pumpify() | ||
t.plan(4) | ||
pipeline.write('hello') | ||
pipeline.on('data', function(data) { | ||
t.same(data.toString(), 'HELLO') | ||
t.end() | ||
}) | ||
setTimeout(function() { | ||
pipeline.setPipeline( | ||
through(function(data, enc, cb) { | ||
t.same(data.toString(), 'hello') | ||
cb(null, data.toString().toUpperCase()) | ||
}), | ||
through(function(data, enc, cb) { | ||
t.same(data.toString(), 'HELLO') | ||
cb(null, data.toString().toLowerCase()) | ||
}), | ||
through(function(data, enc, cb) { | ||
t.same(data.toString(), 'hello') | ||
cb(null, data.toString().toUpperCase()) | ||
}) | ||
) | ||
}, 100) | ||
}) |
6751
160
50
Updatedduplexify@^1.3.0