# tree-stream

tree-stream is a small node module that pipes streams together and destroys all of them if one of them closes.

```
npm install tree-stream
```

This package is forked from mafintosh's `pump` and aims to be a superset of it: when the provided `pipe()` topology is a linked list, the two are functionally equivalent.
## What problem does it solve?

- It solves the original problems pump addressed: with standard `source.pipe(dest)`, `source` is not destroyed if `dest` emits `close` or an error, and there is no way to provide a callback that tells you when the pipe has finished.
- The object model (`ReadableStreamTree` and `WritableStreamTree`) is expressive: a representation for a sequence (or DAG) of stream transforms turns out to be really useful.
Sometimes you want to `pipeFrom()` a `WritableStreamTree`, e.g. (from `@wholebuzz/fs/src/local.ts`):

```typescript
import StreamTree, { ReadableStreamTree, WritableStreamTree } from 'tree-stream'

async function openWritableFile(url: string, _options?: OpenWritableFileOptions) {
  let stream = StreamTree.writable(fs.createWriteStream(url))
  if (url.endsWith('.gz')) stream = stream.pipeFrom(zlib.createGzip())
  return stream
}
```
And sometimes you want the typical `pipe()` case, for a `ReadableStreamTree`:

```typescript
async function openReadableFile(url: string, options?: OpenReadableFileOptions) {
  let stream = StreamTree.readable(fs.createReadStream(url))
  if (url.endsWith('.gz')) stream = stream.pipe(zlib.createGunzip())
  return stream
}
```
You can then apply a transform equivalently with either:

```typescript
const tf = new Transform({ objectMode: true, transform(x, _, cb) { this.push(x); cb(); } })
readable.pipe(tf)
```

or

```typescript
writable.pipeFrom(tf)
```

provided that the `ReadableStreamTree` and `WritableStreamTree` are connected later:

```typescript
const returnValue = await pumpWritable(writable, 'any return value', readable)
```
These APIs form the basis of `@wholebuzz/fs`, which together with `dbcp` powers `@wholebuzz/mapreduce`.
## Usage
```javascript
var streamTree = require('tree-stream')
var fs = require('fs')

var source = fs.createReadStream('/dev/random')
var dest = fs.createWriteStream('/dev/null')

var stream = streamTree.readable(source)
stream = stream.pipe(dest)
stream.finish(function(err) {
  console.log('pipe finished', err)
})

setTimeout(function() {
  dest.destroy() // when dest is closed, tree-stream will destroy source
}, 1000)
```
You can process an input stream and also hash it:
```javascript
var streamTree = require('tree-stream')
var fs = require('fs')
var hasha = require('hasha')

var readable = streamTree.readable(fs.createReadStream('/tmp/foo.txt'))
readable = readable.split(2) // returns two ReadableStreamTree branches
hasha.fromStream(readable[0].finish()).then((hash) => console.log(`Hash: ${hash}`))
readable[1].finish().on('data', function(data) { console.log(`Data: ${data}`) })
```
You can get pretty wild:

```javascript
var streamTree = require('tree-stream')
var fs = require('fs')
var hasha = require('hasha')

var writable = streamTree.writable(fs.createWriteStream('/tmp/foo.txt'))
var readable;
[writable, readable] = writable.joinReadable(1)
hasha.fromStream(readable[0].finish()).then((hash) => console.log(`Hash: ${hash}`))

writable = writable.joinWritable([fs.createWriteStream('/tmp/bar.txt')])
var stream = writable.finish()
stream.write('unicorn', function() { stream.end() })
```
## License

MIT

## Related

Derived from `pump`, part of the `mississippi` stream utility collection.