object-stream-tools
This package brings goodies of functional programming (map, filter, reduce) to node streams.
Installation
npm install --save object-stream-tools
Usage
arrayToStream
Converts existing array to stream of objects. Useful if you want to inject/merge those object to the existing stream.
const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
.on('data', data => {
console.log(data)
})
.pipe(somewhereWritable)
Prints
[{foo: 'bar'}, {web: 'scale'}]
streamToSet
Its very useful if you want to get unique elements / set of values
const jsonStream = require('JSONStream')
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.streamToSet())
.on('data', uniqueSet => {
const uniqueArray = Array.from(uniqueSet.values()).sort()
})
filter
If you just want to remove some objects from stream, you probably want to use filter function.
fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.filter(e => e.value > 6))
.pipe(jsonStream.stringify())
.pipe(process.stdout)
map-reduce
Map is useful when you want to modify existing objects in the stream.
Reduce is useful if you want to get single object/value based on whole stream, but
you dont want to load whole stream to memory.
Example: sum / average value of huge stream
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue => {
})
Here is example with buffered/string input output:
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.on('data', reducedValue =>
})
.pipe(jsonStream.stringify())
.pipe(process.stdout)
Please note that if you do not pass initial value reduce function will start in (prev, curr, i) mode.
Objects/Array/Reduce
promise to stream
It is a useful helper if you dealing with a lot of smaller data that are wrapped in Promise API, ex:
ost.promiseToStream(myDbQueryThatReturnPromise())
.on('data', data => {
})
stream to promise
Very handy when you want to consume streams but rest of your application logic uses promises.
ost.streamToPromise(fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.filter(e => e.value > 6)))
.then(data => {
})
find
Find is super handy if we want to quickly check if vale/objects exists in the stream.
Think about it as a grep on the steroids.
ost.streamToPromise(fs.createReadStream('../test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.find(e => e.value > 6)))
.then(foundObj => {
})
Please look at the tests for more use cases.