object-stream-tools
This package brings functional programming helpers (map, filter, reduce) to Node.js streams.
Installation
npm install --save object-stream-tools
Usage
arrayToStream
Converts an existing array to a stream of objects. Useful if you want to inject or merge those objects into an existing stream.
const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
.on('data', data => {
console.log(data)
})
.pipe(somewhereWritable)
Prints each object as it is emitted:
{ foo: 'bar' }
{ web: 'scale' }
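For readers who want to see the idea without installing anything, here is a minimal sketch of the same behaviour built only on Node's core `stream` module. `arrayToStreamSketch` and `sink` are hypothetical names used for illustration; this is not the package's actual implementation, only an approximation of what the description above suggests.

```javascript
const { Readable, Writable } = require('stream')

// Hypothetical stand-in for ost.arrayToStream (an assumption, not the real code).
// Readable.from iterates the array and emits each element in object mode.
function arrayToStreamSketch (items) {
  return Readable.from(items)
}

// A writable that records every object it receives, in arrival order.
const received = []
const sink = new Writable({
  objectMode: true,
  write (chunk, _encoding, callback) {
    received.push(chunk)
    callback()
  }
})

// Resolves once the writable has consumed the whole stream.
const done = new Promise((resolve, reject) => {
  sink.on('finish', () => resolve(received))
  sink.on('error', reject)
})

arrayToStreamSketch([{ foo: 'bar' }, { web: 'scale' }]).pipe(sink)

done.then(items => console.log(items))
```

Each array element arrives at the writable as its own chunk, which is why a `data` handler sees the objects one at a time rather than the whole array.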
streamToSet
It is very useful when you want the unique elements of a stream as a set of values.
const fs = require('fs')
const jsonStream = require('JSONStream')
ost.streamToSet(fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty)))
.then(uniqueSet => {
const uniqueArray = Array.from(uniqueSet.values()).sort()
})
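The pattern above (consume a stream, resolve a promise with a Set) can be sketched with core modules only. `streamToSetSketch` is a hypothetical helper written for this illustration and may differ from how the package does it.

```javascript
const { Readable } = require('stream')

// Hypothetical stand-in for ost.streamToSet (an assumption, not the real code):
// resolves with a Set holding every chunk the stream emitted.
function streamToSetSketch (stream) {
  return new Promise((resolve, reject) => {
    const seen = new Set()
    stream.on('data', value => seen.add(value))
    stream.on('end', () => resolve(seen))
    stream.on('error', reject)
  })
}

// Duplicate values collapse into one entry each.
const uniqueSorted = streamToSetSketch(Readable.from(['b', 'a', 'b', 'c', 'a']))
  .then(set => Array.from(set).sort())

uniqueSorted.then(values => console.log(values)) // [ 'a', 'b', 'c' ]
```

A Set compares objects by reference, so two distinct objects with equal contents both stay in the set. Mapping each object to a primitive property first, as in the example above, is what makes the deduplication meaningful.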
filter
If you just want to remove some objects from the stream, use the filter function.
ost.streamToArray(dataStream()
.pipe(ost.filter(e => e.property > 6)))
.then(filteredObjects => {
// use filteredObjects here
})
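To make the filtering step concrete, here is a rough sketch of a filter written as an object-mode `Transform` from Node's core `stream` module. `filterSketch` and `collect` are hypothetical names for this example, and the package's own implementation may look different.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.filter (an assumption, not the real code):
// passes a chunk downstream only when the predicate returns a truthy value.
function filterSketch (predicate) {
  return new Transform({
    objectMode: true,
    transform (chunk, _encoding, callback) {
      if (predicate(chunk)) this.push(chunk)
      callback()
    }
  })
}

// Hypothetical helper that gathers a stream into an array.
function collect (stream) {
  return new Promise((resolve, reject) => {
    const items = []
    stream.on('data', item => items.push(item))
    stream.on('end', () => resolve(items))
    stream.on('error', reject)
  })
}

const filtered = collect(
  Readable.from([{ property: 3 }, { property: 7 }, { property: 9 }])
    .pipe(filterSketch(e => e.property > 6))
)

filtered.then(items => console.log(items)) // [ { property: 7 }, { property: 9 } ]
```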
map-reduce
Map is useful when you want to modify existing objects in the stream.
Reduce is useful if you want to get a single object or value based on the whole stream, but
you don't want to load the whole stream into memory.
Example: sum or average of the values in a huge stream
const jsonStream = require('JSONStream')
ost.streamToArray(fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.then(reducedValue => {
// use reducedValue here
})
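The memory benefit comes from keeping only the accumulator while chunks pass through. A minimal sketch of that idea, using only core modules, is below. `reduceSketch` is a hypothetical name, and the zero-based index passed as the third argument is an assumption modelled on `Array.prototype.reduce`; the package may count differently.

```javascript
const { Readable, Transform } = require('stream')

// Hypothetical stand-in for ost.reduce (an assumption, not the real code):
// keeps a single accumulator and emits it once, when the input ends.
function reduceSketch (reducer, initialValue) {
  let acc = initialValue
  let index = 0 // assumed to start at 0, like Array.prototype.reduce
  return new Transform({
    objectMode: true,
    transform (chunk, _encoding, callback) {
      acc = reducer(acc, chunk, index++)
      callback() // nothing is pushed per chunk, so no chunk is retained
    },
    flush (callback) {
      this.push(acc)
      callback()
    }
  })
}

// Same reducer shape as the example above: accumulator + value + index.
const result = new Promise((resolve, reject) => {
  Readable.from([10, 20, 30])
    .pipe(reduceSketch((acc, curr, i) => acc + curr + i, 0))
    .on('data', resolve)
    .on('error', reject)
})

result.then(value => console.log(value)) // 63 = (10 + 0) + (20 + 1) + (30 + 2)
```

One caveat of this sketch: in object mode, pushing `null` ends the stream, so an accumulator that ends up `null` would not be delivered as a value.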
Here is an example with buffered/string input and output:
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue => {
// use reducedValue here
})
.pipe(jsonStream.stringify())
.pipe(process.stdout)
Please look at the tests for more use cases.