Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

object-stream-tools

Package Overview
Dependencies
Maintainers
1
Versions
10
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

object-stream-tools - npm Package Compare versions

Comparing version 1.1.0 to 1.2.1

145

index.js

@@ -1,75 +0,88 @@

'use strict';
'use strict'
const stream = require('stream');
const through2Concurrent = require('through2-concurrent');
const streamToArray = require('stream-to-array');
const stream = require('stream')
const through2Concurrent = require('through2-concurrent')
const streamToArray = require('stream-to-array')
function thru(transform, flush) {
return new stream.Transform({
objectMode: true,
transform: (obj, enc, cb) => transform(obj, cb),
flush
});
}
function thru(transform, flush) {
return new stream.Transform({
objectMode: true,
transform: (obj, enc, cb) => transform(obj, cb),
flush
})
}
function thruParallel(maxConcurrency, transform, flush) {
return through2Concurrent.obj({ maxConcurrency },
(obj, enc, cb) => transform(obj, cb), flush
);
}
function thruParallel(maxConcurrency, transform, flush) {
return through2Concurrent.obj({maxConcurrency},
(obj, enc, cb) => transform(obj, cb), flush
)
}
function arrayToStream(data) {
const newStream = new stream.Readable({ objectMode: true });
data.forEach(item => newStream.push(item));
newStream.push(null);
return newStream
};
function arrayToStream(data) {
const newStream = new stream.Readable({objectMode: true})
data.forEach(item => newStream.push(item))
newStream.push(null)
return newStream
}
function streamToSet(stream) {
return new Promise((resolve, reject) => {
const set = new Set();
stream
.on('data', data => set.add(data))
.on('error', reject)
.on('end', () => resolve(set))
})
}
function streamToSet(stream) {
return new Promise((resolve, reject) => {
const set = new Set()
stream
.on('data', data => set.add(data))
.on('error', reject)
.on('end', () => resolve(set))
})
}
function newReadable() {
const rs = new stream.Readable({ objectMode: true });
rs._read = () => {};
return rs
}
function newReadable() {
const rs = new stream.Readable({objectMode: true})
rs._read = () => {
}
return rs
}
function map(func) {
return new stream.Transform({
objectMode: true,
transform: (data, enc, cb) => {
cb(null, func(data))
}
})
}
function map(func) {
return new stream.Transform({
objectMode: true,
transform: (data, enc, cb) => {
cb(null, func(data))
}
})
}
function filter(func) {
return new stream.Transform({
objectMode: true,
transform: (data, enc, cb) => {
if (func(data)) {
cb(null, data)
} else {
cb();
}
}
})
}
function filter(func) {
return new stream.Transform({
objectMode: true,
transform: (data, enc, cb) => {
if (func(data)) {
cb(null, data)
} else {
cb()
}
}
})
}
module.exports = {
thru,
thruParallel,
arrayToStream,
streamToArray,
streamToSet,
newReadable,
map,
filter
}
function reduce(func, acc) {
let i = 0
return thru((curr, cb) => {
acc = func(acc, curr, i++)
cb()
}, function() {
this.emit('data', acc)
this.emit('end')
})
}
module.exports = {
thru,
thruParallel,
arrayToStream,
streamToArray,
streamToSet,
newReadable,
map,
filter,
reduce
}
{
"name": "object-stream-tools",
"version": "1.1.0",
"version": "1.2.1",
"description": "Useful tools for manipulating object streams. Will be especially helpful to developers used to map - filter - reduce approach of nodejs arrays.",

@@ -16,3 +16,3 @@ "main": "index.js",

"node",
"javaascript",
"javascript",
"streams",

@@ -24,5 +24,8 @@ "stream",

"array",
"set"
"set",
"tools",
"object stream",
"object streams"
],
"author": "Krzysztof Burlinski",
"author": "Krzysztof Burlinski and Lukasz Gintowt",
"license": "MIT",

@@ -35,2 +38,3 @@ "bugs": {

"JSONStream": "^1.1.1",
"lodash": "^4.13.1",
"tap": "^5.7.2"

@@ -37,0 +41,0 @@ },

@@ -1,1 +0,104 @@

# object-stream-tools
# object-stream-tools
This package brings goodies of functional programming (map, filter, reduce) to node streams.
# Installation
```js
npm install --save object-stream-tools
```
# Usage
#### arrayToStream
Converts existing array to stream of objects. Useful if you want to inject/merge those object to the existing stream.
```js
const ost = require('object-stream-tools')
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}])
.on('data', data => {
console.log(data)
})
.pipe(somewhereWritable)
```
Prints
```js
[{foo: 'bar'}, {web: 'scale'}]
```
#### streamToSet
Its very useful if you want to get unique elements / set of values
```js
const jsonStream = require('JSONStream')
ost.streamToSet(fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty)))
.then(uniqueSet => {
// here one get array of unique elements
const uniqueArray = Array.from(uniqueSet.values()).sort()
})
```
#### filter
If you just want to remove some objects from stream, you probably want to use filter function.
```js
ost.streamToArray(dataStream()
.pipe(ost.filter(e => e.property > 6)))
.then(filteredObjects =>
// here you will get filtered objects
)
```
#### map-reduce
Map is useful when you want to modify existing objects in the stream.
Reduce is useful if you want to get single object/value based on whole stream, but
you dont want to load whole stream to memory.
Example: sum / average value of huge stream
```js
const jsonStream = require('JSONStream')
ost.streamToArray((fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.then(reducedValue) => {
// here you will get reduced value wrapped in array
})
```
Here is example with buffered/string input output:
```js
const jsonStream = require('JSONStream')
fs.createReadStream('./test/data.json')
.pipe(jsonStream.parse('*'))
.pipe(ost.map(obj => obj.requiredProperty))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0)))
.on('data', reducedValue =>
// here you will get reduced value
})
.pipe(jsonStream.stringify())
.pipe(process.stdout)
```
## Please look at the tests for more use cases.
[{
"foo": "bar",
"szop": "pracz",
"cats": ["Cat 1", "Cat 2", "Cat 3"]
"cats": ["Cat 1", "Cat 2", "Cat 3"],
"value": 42
}, {
"foo": "foo",
"szop": "pracz",
"cats": ["Cat 4", "Cat 5", "Cat 6"]
"cats": ["Cat 4", "Cat 5", "Cat 6"],
"value": 1
}, {
"foo": "rand",
"szop": "niepracz",
"cats": ["Cat 5", "Cat 6"]
"cats": ["Cat 5", "Cat 6"],
"value": 7
}]

@@ -13,3 +13,3 @@ 'use strict'

.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length))))
.then(objs => t.same(objs, [3, 3, 2]), t.fail)
.then(objs => t.same(objs, [3, 3, 2]), t.fail)
)

@@ -19,3 +19,3 @@

ost.streamToArray(dataStream())
.then(objs => t.same(objs, data), t.fail)
.then(objs => t.same(objs, data), t.fail)
)

@@ -26,7 +26,7 @@

.pipe(ost.map(obj => obj.szop)))
.then(set => {
const actual = Array.from(set.values()).sort();
const expectedData = ["pracz", "niepracz"].sort();
t.same(actual, expectedData)
}, t.fail)
.then(uniqueSet => {
const actual = Array.from(uniqueSet.values()).sort()
const expectedData = ["pracz", "niepracz"].sort()
t.same(actual, expectedData)
}, t.fail)
)

@@ -37,6 +37,6 @@

ost.arrayToStream(data)
.pipe(ost.map(obj => obj.szop)))
.then(objs => {
t.same(objs, ["pracz", "pracz", "niepracz"])
}, t.fail)
.pipe(ost.map(obj => obj.szop)))
.then(objs => {
t.same(objs, ["pracz", "pracz", "niepracz"])
}, t.fail)
)

@@ -47,3 +47,3 @@

.pipe(ost.map(obj => obj.foo)))
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail)
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail)
)

@@ -55,24 +55,44 @@

.pipe(ost.filter(testFilter)))
.then(objs => t.same(objs, data.filter(testFilter)), t.fail)
.then(objs => t.same(objs, data.filter(testFilter)), t.fail)
)
tap.test('Test filter on numerical values', t=>
ost.streamToArray(dataStream()
.pipe(ost.filter(e => e.value > 6)))
.then(objs => t.same(objs, data.filter(e => e.value > 6)))
.catch(t.fail)
)
tap.test('Test reduce', t =>
dataStream()
.pipe(ost.map(obj => obj.value))
.pipe(ost.reduce((acc, curr, i) => {
return acc + curr + i
}, 0))
.on('data', reducedValue =>
t.same(reducedValue, 42 + 1 + 7 + 3))
.on('error', err => t.fail(err.stack))
.pipe(jsonStream.stringify())
.on('end', () => t.end())
)
// This test is a bit more complicated. We will only let the 1st object through when second has already been called.
tap.test('Test thruParallel', t => {
let secondObjDone = false;
let secondObjDone = false
return ost.streamToArray(dataStream()
.pipe(ost.thruParallel(2, (obj, cb) => {
if (obj.foo === "bar") {
const interval = setInterval(() => {
if (secondObjDone) {
cb(null, obj.cats.length)
clearInterval(interval)
}
}, 100)
return
}
if (obj.foo === "foo") {
secondObjDone = true
}
cb(null, obj.cats.length)
})))
.pipe(ost.thruParallel(2, (obj, cb) => {
if (obj.foo === "bar") {
const interval = setInterval(() => {
if (secondObjDone) {
cb(null, obj.cats.length)
clearInterval(interval)
}
}, 100)
return
}
if (obj.foo === "foo") {
secondObjDone = true
}
cb(null, obj.cats.length)
})))
.then(objs => {

@@ -79,0 +99,0 @@ const actualData = objs.sort()

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc