object-stream-tools
Advanced tools
Comparing version 1.1.0 to 1.2.1
145
index.js
@@ -1,75 +0,88 @@ | ||
'use strict'; | ||
'use strict' | ||
const stream = require('stream'); | ||
const through2Concurrent = require('through2-concurrent'); | ||
const streamToArray = require('stream-to-array'); | ||
const stream = require('stream') | ||
const through2Concurrent = require('through2-concurrent') | ||
const streamToArray = require('stream-to-array') | ||
function thru(transform, flush) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (obj, enc, cb) => transform(obj, cb), | ||
flush | ||
}); | ||
} | ||
function thru(transform, flush) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (obj, enc, cb) => transform(obj, cb), | ||
flush | ||
}) | ||
} | ||
function thruParallel(maxConcurrency, transform, flush) { | ||
return through2Concurrent.obj({ maxConcurrency }, | ||
(obj, enc, cb) => transform(obj, cb), flush | ||
); | ||
} | ||
function thruParallel(maxConcurrency, transform, flush) { | ||
return through2Concurrent.obj({maxConcurrency}, | ||
(obj, enc, cb) => transform(obj, cb), flush | ||
) | ||
} | ||
function arrayToStream(data) { | ||
const newStream = new stream.Readable({ objectMode: true }); | ||
data.forEach(item => newStream.push(item)); | ||
newStream.push(null); | ||
return newStream | ||
}; | ||
function arrayToStream(data) { | ||
const newStream = new stream.Readable({objectMode: true}) | ||
data.forEach(item => newStream.push(item)) | ||
newStream.push(null) | ||
return newStream | ||
} | ||
function streamToSet(stream) { | ||
return new Promise((resolve, reject) => { | ||
const set = new Set(); | ||
stream | ||
.on('data', data => set.add(data)) | ||
.on('error', reject) | ||
.on('end', () => resolve(set)) | ||
}) | ||
} | ||
function streamToSet(stream) { | ||
return new Promise((resolve, reject) => { | ||
const set = new Set() | ||
stream | ||
.on('data', data => set.add(data)) | ||
.on('error', reject) | ||
.on('end', () => resolve(set)) | ||
}) | ||
} | ||
function newReadable() { | ||
const rs = new stream.Readable({ objectMode: true }); | ||
rs._read = () => {}; | ||
return rs | ||
} | ||
function newReadable() { | ||
const rs = new stream.Readable({objectMode: true}) | ||
rs._read = () => { | ||
} | ||
return rs | ||
} | ||
function map(func) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (data, enc, cb) => { | ||
cb(null, func(data)) | ||
} | ||
}) | ||
} | ||
function map(func) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (data, enc, cb) => { | ||
cb(null, func(data)) | ||
} | ||
}) | ||
} | ||
function filter(func) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (data, enc, cb) => { | ||
if (func(data)) { | ||
cb(null, data) | ||
} else { | ||
cb(); | ||
} | ||
} | ||
}) | ||
} | ||
function filter(func) { | ||
return new stream.Transform({ | ||
objectMode: true, | ||
transform: (data, enc, cb) => { | ||
if (func(data)) { | ||
cb(null, data) | ||
} else { | ||
cb() | ||
} | ||
} | ||
}) | ||
} | ||
module.exports = { | ||
thru, | ||
thruParallel, | ||
arrayToStream, | ||
streamToArray, | ||
streamToSet, | ||
newReadable, | ||
map, | ||
filter | ||
} | ||
function reduce(func, acc) { | ||
let i = 0 | ||
return thru((curr, cb) => { | ||
acc = func(acc, curr, i++) | ||
cb() | ||
}, function() { | ||
this.emit('data', acc) | ||
this.emit('end') | ||
}) | ||
} | ||
module.exports = { | ||
thru, | ||
thruParallel, | ||
arrayToStream, | ||
streamToArray, | ||
streamToSet, | ||
newReadable, | ||
map, | ||
filter, | ||
reduce | ||
} |
{ | ||
"name": "object-stream-tools", | ||
"version": "1.1.0", | ||
"version": "1.2.1", | ||
"description": "Useful tools for manipulating object streams. Will be especially helpful to developers used to map - filter - reduce approach of nodejs arrays.", | ||
@@ -16,3 +16,3 @@ "main": "index.js", | ||
"node", | ||
"javaascript", | ||
"javascript", | ||
"streams", | ||
@@ -24,5 +24,8 @@ "stream", | ||
"array", | ||
"set" | ||
"set", | ||
"tools", | ||
"object stream", | ||
"object streams" | ||
], | ||
"author": "Krzysztof Burlinski", | ||
"author": "Krzysztof Burlinski and Lukasz Gintowt", | ||
"license": "MIT", | ||
@@ -35,2 +38,3 @@ "bugs": { | ||
"JSONStream": "^1.1.1", | ||
"lodash": "^4.13.1", | ||
"tap": "^5.7.2" | ||
@@ -37,0 +41,0 @@ }, |
105
README.md
@@ -1,1 +0,104 @@ | ||
# object-stream-tools | ||
# object-stream-tools | ||
This package brings goodies of functional programming (map, filter, reduce) to node streams. | ||
# Installation | ||
```js | ||
npm install --save object-stream-tools | ||
``` | ||
# Usage | ||
#### arrayToStream | ||
Converts existing array to stream of objects. Useful if you want to inject/merge those object to the existing stream. | ||
```js | ||
const ost = require('object-stream-tools') | ||
ost.arrayToStream([{foo: 'bar'}, {web: 'scale'}]) | ||
.on('data', data => { | ||
console.log(data) | ||
}) | ||
.pipe(somewhereWritable) | ||
``` | ||
Prints | ||
```js | ||
[{foo: 'bar'}, {web: 'scale'}] | ||
``` | ||
#### streamToSet | ||
Its very useful if you want to get unique elements / set of values | ||
```js | ||
const jsonStream = require('JSONStream') | ||
ost.streamToSet(fs.createReadStream('./test/data.json') | ||
.pipe(jsonStream.parse('*')) | ||
.pipe(ost.map(obj => obj.requiredProperty))) | ||
.then(uniqueSet => { | ||
// here one get array of unique elements | ||
const uniqueArray = Array.from(uniqueSet.values()).sort() | ||
}) | ||
``` | ||
#### filter | ||
If you just want to remove some objects from stream, you probably want to use filter function. | ||
```js | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.filter(e => e.property > 6))) | ||
.then(filteredObjects => | ||
// here you will get filtered objects | ||
) | ||
``` | ||
#### map-reduce | ||
Map is useful when you want to modify existing objects in the stream. | ||
Reduce is useful if you want to get single object/value based on whole stream, but | ||
you dont want to load whole stream to memory. | ||
Example: sum / average value of huge stream | ||
```js | ||
const jsonStream = require('JSONStream') | ||
ost.streamToArray((fs.createReadStream('./test/data.json') | ||
.pipe(jsonStream.parse('*')) | ||
.pipe(ost.map(obj => obj.requiredProperty)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0))) | ||
.then(reducedValue) => { | ||
// here you will get reduced value wrapped in array | ||
}) | ||
``` | ||
Here is example with buffered/string input output: | ||
```js | ||
const jsonStream = require('JSONStream') | ||
fs.createReadStream('./test/data.json') | ||
.pipe(jsonStream.parse('*')) | ||
.pipe(ost.map(obj => obj.requiredProperty)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0))) | ||
.on('data', reducedValue => | ||
// here you will get reduced value | ||
}) | ||
.pipe(jsonStream.stringify()) | ||
.pipe(process.stdout) | ||
``` | ||
## Please look at the tests for more use cases. |
[{ | ||
"foo": "bar", | ||
"szop": "pracz", | ||
"cats": ["Cat 1", "Cat 2", "Cat 3"] | ||
"cats": ["Cat 1", "Cat 2", "Cat 3"], | ||
"value": 42 | ||
}, { | ||
"foo": "foo", | ||
"szop": "pracz", | ||
"cats": ["Cat 4", "Cat 5", "Cat 6"] | ||
"cats": ["Cat 4", "Cat 5", "Cat 6"], | ||
"value": 1 | ||
}, { | ||
"foo": "rand", | ||
"szop": "niepracz", | ||
"cats": ["Cat 5", "Cat 6"] | ||
"cats": ["Cat 5", "Cat 6"], | ||
"value": 7 | ||
}] |
@@ -13,3 +13,3 @@ 'use strict' | ||
.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length)))) | ||
.then(objs => t.same(objs, [3, 3, 2]), t.fail) | ||
.then(objs => t.same(objs, [3, 3, 2]), t.fail) | ||
) | ||
@@ -19,3 +19,3 @@ | ||
ost.streamToArray(dataStream()) | ||
.then(objs => t.same(objs, data), t.fail) | ||
.then(objs => t.same(objs, data), t.fail) | ||
) | ||
@@ -26,7 +26,7 @@ | ||
.pipe(ost.map(obj => obj.szop))) | ||
.then(set => { | ||
const actual = Array.from(set.values()).sort(); | ||
const expectedData = ["pracz", "niepracz"].sort(); | ||
t.same(actual, expectedData) | ||
}, t.fail) | ||
.then(uniqueSet => { | ||
const actual = Array.from(uniqueSet.values()).sort() | ||
const expectedData = ["pracz", "niepracz"].sort() | ||
t.same(actual, expectedData) | ||
}, t.fail) | ||
) | ||
@@ -37,6 +37,6 @@ | ||
ost.arrayToStream(data) | ||
.pipe(ost.map(obj => obj.szop))) | ||
.then(objs => { | ||
t.same(objs, ["pracz", "pracz", "niepracz"]) | ||
}, t.fail) | ||
.pipe(ost.map(obj => obj.szop))) | ||
.then(objs => { | ||
t.same(objs, ["pracz", "pracz", "niepracz"]) | ||
}, t.fail) | ||
) | ||
@@ -47,3 +47,3 @@ | ||
.pipe(ost.map(obj => obj.foo))) | ||
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail) | ||
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail) | ||
) | ||
@@ -55,24 +55,44 @@ | ||
.pipe(ost.filter(testFilter))) | ||
.then(objs => t.same(objs, data.filter(testFilter)), t.fail) | ||
.then(objs => t.same(objs, data.filter(testFilter)), t.fail) | ||
) | ||
tap.test('Test filter on numerical values', t=> | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.filter(e => e.value > 6))) | ||
.then(objs => t.same(objs, data.filter(e => e.value > 6))) | ||
.catch(t.fail) | ||
) | ||
tap.test('Test reduce', t => | ||
dataStream() | ||
.pipe(ost.map(obj => obj.value)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0)) | ||
.on('data', reducedValue => | ||
t.same(reducedValue, 42 + 1 + 7 + 3)) | ||
.on('error', err => t.fail(err.stack)) | ||
.pipe(jsonStream.stringify()) | ||
.on('end', () => t.end()) | ||
) | ||
// This test is a bit more complicated. We will only let the 1st object through when second has already been called. | ||
tap.test('Test thruParallel', t => { | ||
let secondObjDone = false; | ||
let secondObjDone = false | ||
return ost.streamToArray(dataStream() | ||
.pipe(ost.thruParallel(2, (obj, cb) => { | ||
if (obj.foo === "bar") { | ||
const interval = setInterval(() => { | ||
if (secondObjDone) { | ||
cb(null, obj.cats.length) | ||
clearInterval(interval) | ||
} | ||
}, 100) | ||
return | ||
} | ||
if (obj.foo === "foo") { | ||
secondObjDone = true | ||
} | ||
cb(null, obj.cats.length) | ||
}))) | ||
.pipe(ost.thruParallel(2, (obj, cb) => { | ||
if (obj.foo === "bar") { | ||
const interval = setInterval(() => { | ||
if (secondObjDone) { | ||
cb(null, obj.cats.length) | ||
clearInterval(interval) | ||
} | ||
}, 100) | ||
return | ||
} | ||
if (obj.foo === "foo") { | ||
secondObjDone = true | ||
} | ||
cb(null, obj.cats.length) | ||
}))) | ||
.then(objs => { | ||
@@ -79,0 +99,0 @@ const actualData = objs.sort() |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
10949
186
105
3