Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

object-stream-tools

Package Overview
Dependencies
Maintainers
2
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

object-stream-tools - npm Package Compare versions

Comparing version 1.2.3 to 2.0.0

60

index.js

@@ -5,3 +5,2 @@ 'use strict'

const through2Concurrent = require('through2-concurrent')
const streamToArray = require('stream-to-array')

@@ -11,3 +10,5 @@ function thru(transform, flush) {

objectMode: true,
transform: function(obj, enc, cb) { transform.call(this, obj, cb) },
transform: function (obj, enc, cb) {
transform.call(this, obj, cb)
},
flush

@@ -18,4 +19,4 @@ })

function thruParallel(maxConcurrency, transform, flush) {
return through2Concurrent.obj({ maxConcurrency },
function(obj, enc, cb) {
return through2Concurrent.obj({maxConcurrency},
function (obj, enc, cb) {
transform.call(this, obj, cb)

@@ -28,3 +29,3 @@ },

function arrayToStream(data) {
const newStream = new stream.Readable({ objectMode: true })
const newStream = newReadable()
data.forEach(item => newStream.push(item))

@@ -35,15 +36,20 @@ newStream.push(null)

function streamToSet(stream) {
return new Promise((resolve, reject) => {
const set = new Set()
stream
.on('data', data => set.add(data))
.on('error', reject)
.on('end', () => resolve(set))
})
function streamToSet() {
return reduce((acc, curr) => {
acc.add(curr)
return acc
}, new Set())
}
function streamToArray() {
return reduce((acc, curr) => {
acc.push(curr)
return acc
}, [])
}
function newReadable() {
const rs = new stream.Readable({ objectMode: true })
rs._read = () => {}
const rs = new stream.Readable({objectMode: true})
rs._read = () => {
}
return rs

@@ -61,2 +67,13 @@ }

function promiseToStream(promise) {
const newStream = newReadable()
promise
.then(data => {
newStream.push(data)
newStream.push(null)
})
.catch(err => newStream.emit('error', err))
return newStream
}
function filter(func) {

@@ -75,3 +92,7 @@ return new stream.Transform({

function reduce(func, acc) {
function required() {
throw new Error('Initial value required')
}
function reduce(func, acc = required()) {
let i = 0

@@ -81,3 +102,3 @@ return thru((curr, cb) => {

cb()
}, function() {
}, function () {
this.emit('data', acc)

@@ -92,8 +113,9 @@ this.emit('end')

arrayToStream,
streamToSet,
streamToArray,
streamToSet,
newReadable,
map,
filter,
reduce
reduce,
promiseToStream
}
{
"name": "object-stream-tools",
"version": "1.2.3",
"version": "2.0.0",
"description": "Useful tools for manipulating object streams. Will be especially helpful to developers used to map - filter - reduce approach of nodejs arrays.",

@@ -26,3 +26,5 @@ "main": "index.js",

"object stream",
"object streams"
"object streams",
"promise to stream",
"promise to streams"
],

@@ -41,5 +43,4 @@ "author": "Krzysztof Burlinski and Lukasz Gintowt",

"dependencies": {
"stream-to-array": "^2.3.0",
"through2-concurrent": "^1.1.1"
}
}

@@ -75,10 +75,11 @@ # object-stream-tools

const jsonStream = require('JSONStream')
ost.streamToArray((fs.createReadStream('./test/data.json')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
// pick required property you want to reduce over
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.then(reducedValue) => {
// here you will get reduced value wrapped in array
}, 0))
.on('data', reducedValue => {
// here you will get reduced value
})

@@ -104,3 +105,13 @@ ```

#### promise to stream
It is a useful helper if you dealing with a lot of smaller data that are wrapped in Promise API, ex:
```js
ost.promiseToStream(myDbQueryThatReturnPromise())
.on('data', data => {
// here you will get a real stream that you can pipe
})
```
## Please look at the tests for more use cases.

@@ -11,49 +11,65 @@ 'use strict'

tap.test('Test thru', t =>
ost.streamToArray(dataStream()
.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length))))
.then(objs => t.same(objs, [3, 3, 2]), t.fail)
dataStream()
.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length)))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, [3, 3, 2]))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test streamToArray', t =>
ost.streamToArray(dataStream())
.then(objs => t.same(objs, data), t.fail)
dataStream()
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, data))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test streamToSet', t =>
ost.streamToSet(dataStream()
.pipe(ost.map(obj => obj.szop)))
.then(uniqueSet => {
const actual = Array.from(uniqueSet.values()).sort()
const expectedData = ["pracz", "niepracz"].sort()
t.same(actual, expectedData)
}, t.fail)
tap.test('Test streamToSet returns unique values', t =>
dataStream()
.pipe(ost.map(obj => obj.szop))
.pipe(ost.streamToSet())
.on('data', uniqueSet =>
t.same(Array.from(uniqueSet.values()), ['pracz', 'niepracz'])
)
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test arrayToStream', t =>
ost.streamToArray(
ost.arrayToStream(data)
.pipe(ost.map(obj => obj.szop)))
.then(objs => {
t.same(objs, ["pracz", "pracz", "niepracz"])
}, t.fail)
ost.arrayToStream(data)
.pipe(ost.map(obj => obj.szop))
.pipe(ost.streamToArray())
.on('data', objs => {
t.same(objs, ['pracz', 'pracz', 'niepracz'])
})
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test map', t =>
ost.streamToArray(dataStream()
.pipe(ost.map(obj => obj.foo)))
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail)
dataStream()
.pipe(ost.map(obj => obj.foo))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, ['bar', 'foo', 'rand']))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test filter', t =>
ost.streamToArray(dataStream()
.pipe(ost.filter(testFilter)))
.then(objs => t.same(objs, data.filter(testFilter)), t.fail)
dataStream()
.pipe(ost.filter(testFilter))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, data.filter(testFilter)))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test filter on numerical values', t =>
ost.streamToArray(dataStream()
.pipe(ost.filter(e => e.value > 6)))
.then(objs => t.same(objs, data.filter(e => e.value > 6)))
.catch(t.fail)
dataStream()
.pipe(ost.filter(e => e.value > 6))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, data.filter(e => e.value > 6)))
.on('error', t.fail)
.on('end', t.end)
)

@@ -63,11 +79,11 @@

dataStream()
.pipe(ost.map(obj => obj.value))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue =>
t.same(reducedValue, 42 + 1 + 7 + 3))
.on('error', err => t.fail(err.stack))
.pipe(jsonStream.stringify())
.on('end', () => t.end())
.pipe(ost.map(obj => obj.value))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue =>
t.same(reducedValue, 42 + 1 + 7 + 3))
.on('error', err => t.fail(err.stack))
.pipe(jsonStream.stringify())
.on('end', t.end)
)

@@ -78,45 +94,74 @@

let secondObjDone = false
return ost.streamToArray(dataStream()
.pipe(ost.thruParallel(2, (obj, cb) => {
if (obj.foo === "bar") {
const interval = setInterval(() => {
if (secondObjDone) {
cb(null, obj.cats.length)
clearInterval(interval)
}
}, 100)
return
}
if (obj.foo === "foo") {
secondObjDone = true
}
cb(null, obj.cats.length)
})))
.then(objs => {
dataStream()
.pipe(ost.thruParallel(2, (obj, cb) => {
if (obj.foo === 'bar') {
const interval = setInterval(() => {
if (secondObjDone) {
cb(null, obj.cats.length)
clearInterval(interval)
}
}, 100)
return
}
if (obj.foo === 'foo') {
secondObjDone = true
}
cb(null, obj.cats.length)
}))
.pipe(ost.streamToArray())
.on('data', objs => {
const actualData = objs.sort()
const expectedData = data.map(obj => obj.cats.length).sort()
t.same(actualData, expectedData)
}, t.fail)
})
.on('error', t.fail)
.on('end', t.end)
})
tap.test('Test thru not losing this (so it can use this.push)', t =>
ost.streamToArray(dataStream()
.pipe(ost.thru(function(obj, cb) {
dataStream()
.pipe(ost.thru(function (obj, cb) {
this.push(obj.cats.length)
this.push(obj.cats.length * 2)
cb()
})))
.then(objs => t.same(objs, [3, 6, 3, 6, 2, 4]), t.fail)
}))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs, [3, 6, 3, 6, 2, 4]))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test thruParallel not losing this (so it can use this.push)', t =>
ost.streamToArray(dataStream()
.pipe(ost.thruParallel(2, function(obj, cb) {
dataStream()
.pipe(ost.thruParallel(2, function (obj, cb) {
this.push(obj.cats.length)
this.push(obj.cats.length * 2)
cb()
})))
.then(objs => t.same(objs.sort(), [3, 6, 3, 6, 2, 4].sort()), t.fail)
}))
.pipe(ost.streamToArray())
.on('data', objs => t.same(objs.sort(), [3, 6, 3, 6, 2, 4].sort()))
.on('error', t.fail)
.on('end', t.end)
)
tap.test('Test promise to stream on successful resolution', t => {
ost.promiseToStream(new Promise((resolve, reject) => {
setTimeout(() => resolve([3, 6, 3, 6, 2, 4]), 10)
}))
.on('data', objs => t.same(objs, [3, 6, 3, 6, 2, 4]))
.on('error', t.fail)
.on('end', t.end)
})
tap.test('Test promise to stream on rejection', t => {
ost.promiseToStream(new Promise((resolve, reject) => {
setTimeout(() => reject([3, 6, 3, 6, 2, 4]), 10)
}))
.on('data', data => t.fail('this one should be rejected'))
.on('error', err => {
t.pass('this promise is rejected')
t.end()
})
})
function dataStream() {

@@ -128,3 +173,3 @@ return fs.createReadStream('./test/data.json')

function testFilter(obj) {
return obj.szop === "pracz"
return obj.szop === 'pracz'
}

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc