object-stream-tools
Advanced tools
Comparing version 1.2.3 to 2.0.0
60
index.js
@@ -5,3 +5,2 @@ 'use strict' | ||
const through2Concurrent = require('through2-concurrent') | ||
const streamToArray = require('stream-to-array') | ||
@@ -11,3 +10,5 @@ function thru(transform, flush) { | ||
objectMode: true, | ||
transform: function(obj, enc, cb) { transform.call(this, obj, cb) }, | ||
transform: function (obj, enc, cb) { | ||
transform.call(this, obj, cb) | ||
}, | ||
flush | ||
@@ -18,4 +19,4 @@ }) | ||
function thruParallel(maxConcurrency, transform, flush) { | ||
return through2Concurrent.obj({ maxConcurrency }, | ||
function(obj, enc, cb) { | ||
return through2Concurrent.obj({maxConcurrency}, | ||
function (obj, enc, cb) { | ||
transform.call(this, obj, cb) | ||
@@ -28,3 +29,3 @@ }, | ||
function arrayToStream(data) { | ||
const newStream = new stream.Readable({ objectMode: true }) | ||
const newStream = newReadable() | ||
data.forEach(item => newStream.push(item)) | ||
@@ -35,15 +36,20 @@ newStream.push(null) | ||
function streamToSet(stream) { | ||
return new Promise((resolve, reject) => { | ||
const set = new Set() | ||
stream | ||
.on('data', data => set.add(data)) | ||
.on('error', reject) | ||
.on('end', () => resolve(set)) | ||
}) | ||
function streamToSet() { | ||
return reduce((acc, curr) => { | ||
acc.add(curr) | ||
return acc | ||
}, new Set()) | ||
} | ||
function streamToArray() { | ||
return reduce((acc, curr) => { | ||
acc.push(curr) | ||
return acc | ||
}, []) | ||
} | ||
function newReadable() { | ||
const rs = new stream.Readable({ objectMode: true }) | ||
rs._read = () => {} | ||
const rs = new stream.Readable({objectMode: true}) | ||
rs._read = () => { | ||
} | ||
return rs | ||
@@ -61,2 +67,13 @@ } | ||
function promiseToStream(promise) { | ||
const newStream = newReadable() | ||
promise | ||
.then(data => { | ||
newStream.push(data) | ||
newStream.push(null) | ||
}) | ||
.catch(err => newStream.emit('error', err)) | ||
return newStream | ||
} | ||
function filter(func) { | ||
@@ -75,3 +92,7 @@ return new stream.Transform({ | ||
function reduce(func, acc) { | ||
function required() { | ||
throw new Error('Initial value required') | ||
} | ||
function reduce(func, acc = required()) { | ||
let i = 0 | ||
@@ -81,3 +102,3 @@ return thru((curr, cb) => { | ||
cb() | ||
}, function() { | ||
}, function () { | ||
this.emit('data', acc) | ||
@@ -92,8 +113,9 @@ this.emit('end') | ||
arrayToStream, | ||
streamToSet, | ||
streamToArray, | ||
streamToSet, | ||
newReadable, | ||
map, | ||
filter, | ||
reduce | ||
reduce, | ||
promiseToStream | ||
} |
{ | ||
"name": "object-stream-tools", | ||
"version": "1.2.3", | ||
"version": "2.0.0", | ||
"description": "Useful tools for manipulating object streams. Will be especially helpful to developers used to map - filter - reduce approach of nodejs arrays.", | ||
@@ -26,3 +26,5 @@ "main": "index.js", | ||
"object stream", | ||
"object streams" | ||
"object streams", | ||
"promise to stream", | ||
"promise to streams" | ||
], | ||
@@ -41,5 +43,4 @@ "author": "Krzysztof Burlinski and Lukasz Gintowt", | ||
"dependencies": { | ||
"stream-to-array": "^2.3.0", | ||
"through2-concurrent": "^1.1.1" | ||
} | ||
} |
@@ -75,10 +75,11 @@ # object-stream-tools | ||
const jsonStream = require('JSONStream') | ||
ost.streamToArray((fs.createReadStream('./test/data.json') | ||
fs.createReadStream('./test/data.json') | ||
.pipe(jsonStream.parse('*')) | ||
// pick required property you want to reduce over | ||
.pipe(ost.map(obj => obj.requiredProperty)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0))) | ||
.then(reducedValue) => { | ||
// here you will get reduced value wrapped in array | ||
}, 0)) | ||
.on('data', reducedValue => { | ||
// here you will get reduced value | ||
}) | ||
@@ -104,3 +105,13 @@ ``` | ||
#### promise to stream | ||
It is a useful helper if you dealing with a lot of smaller data that are wrapped in Promise API, ex: | ||
```js | ||
ost.promiseToStream(myDbQueryThatReturnPromise()) | ||
.on('data', data => { | ||
// here you will get a real stream that you can pipe | ||
}) | ||
``` | ||
## Please look at the tests for more use cases. |
177
test/test.js
@@ -11,49 +11,65 @@ 'use strict' | ||
tap.test('Test thru', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length)))) | ||
.then(objs => t.same(objs, [3, 3, 2]), t.fail) | ||
dataStream() | ||
.pipe(ost.thru((obj, cb) => cb(null, obj.cats.length))) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, [3, 3, 2])) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test streamToArray', t => | ||
ost.streamToArray(dataStream()) | ||
.then(objs => t.same(objs, data), t.fail) | ||
dataStream() | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, data)) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test streamToSet', t => | ||
ost.streamToSet(dataStream() | ||
.pipe(ost.map(obj => obj.szop))) | ||
.then(uniqueSet => { | ||
const actual = Array.from(uniqueSet.values()).sort() | ||
const expectedData = ["pracz", "niepracz"].sort() | ||
t.same(actual, expectedData) | ||
}, t.fail) | ||
tap.test('Test streamToSet returns unique values', t => | ||
dataStream() | ||
.pipe(ost.map(obj => obj.szop)) | ||
.pipe(ost.streamToSet()) | ||
.on('data', uniqueSet => | ||
t.same(Array.from(uniqueSet.values()), ['pracz', 'niepracz']) | ||
) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test arrayToStream', t => | ||
ost.streamToArray( | ||
ost.arrayToStream(data) | ||
.pipe(ost.map(obj => obj.szop))) | ||
.then(objs => { | ||
t.same(objs, ["pracz", "pracz", "niepracz"]) | ||
}, t.fail) | ||
ost.arrayToStream(data) | ||
.pipe(ost.map(obj => obj.szop)) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => { | ||
t.same(objs, ['pracz', 'pracz', 'niepracz']) | ||
}) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test map', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.map(obj => obj.foo))) | ||
.then(objs => t.same(objs, ["bar", "foo", "rand"]), t.fail) | ||
dataStream() | ||
.pipe(ost.map(obj => obj.foo)) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, ['bar', 'foo', 'rand'])) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test filter', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.filter(testFilter))) | ||
.then(objs => t.same(objs, data.filter(testFilter)), t.fail) | ||
dataStream() | ||
.pipe(ost.filter(testFilter)) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, data.filter(testFilter))) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test filter on numerical values', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.filter(e => e.value > 6))) | ||
.then(objs => t.same(objs, data.filter(e => e.value > 6))) | ||
.catch(t.fail) | ||
dataStream() | ||
.pipe(ost.filter(e => e.value > 6)) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, data.filter(e => e.value > 6))) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
@@ -63,11 +79,11 @@ | ||
dataStream() | ||
.pipe(ost.map(obj => obj.value)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0)) | ||
.on('data', reducedValue => | ||
t.same(reducedValue, 42 + 1 + 7 + 3)) | ||
.on('error', err => t.fail(err.stack)) | ||
.pipe(jsonStream.stringify()) | ||
.on('end', () => t.end()) | ||
.pipe(ost.map(obj => obj.value)) | ||
.pipe(ost.reduce((acc, curr, i) => { | ||
return acc + curr + i | ||
}, 0)) | ||
.on('data', reducedValue => | ||
t.same(reducedValue, 42 + 1 + 7 + 3)) | ||
.on('error', err => t.fail(err.stack)) | ||
.pipe(jsonStream.stringify()) | ||
.on('end', t.end) | ||
) | ||
@@ -78,45 +94,74 @@ | ||
let secondObjDone = false | ||
return ost.streamToArray(dataStream() | ||
.pipe(ost.thruParallel(2, (obj, cb) => { | ||
if (obj.foo === "bar") { | ||
const interval = setInterval(() => { | ||
if (secondObjDone) { | ||
cb(null, obj.cats.length) | ||
clearInterval(interval) | ||
} | ||
}, 100) | ||
return | ||
} | ||
if (obj.foo === "foo") { | ||
secondObjDone = true | ||
} | ||
cb(null, obj.cats.length) | ||
}))) | ||
.then(objs => { | ||
dataStream() | ||
.pipe(ost.thruParallel(2, (obj, cb) => { | ||
if (obj.foo === 'bar') { | ||
const interval = setInterval(() => { | ||
if (secondObjDone) { | ||
cb(null, obj.cats.length) | ||
clearInterval(interval) | ||
} | ||
}, 100) | ||
return | ||
} | ||
if (obj.foo === 'foo') { | ||
secondObjDone = true | ||
} | ||
cb(null, obj.cats.length) | ||
})) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => { | ||
const actualData = objs.sort() | ||
const expectedData = data.map(obj => obj.cats.length).sort() | ||
t.same(actualData, expectedData) | ||
}, t.fail) | ||
}) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
}) | ||
tap.test('Test thru not losing this (so it can use this.push)', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.thru(function(obj, cb) { | ||
dataStream() | ||
.pipe(ost.thru(function (obj, cb) { | ||
this.push(obj.cats.length) | ||
this.push(obj.cats.length * 2) | ||
cb() | ||
}))) | ||
.then(objs => t.same(objs, [3, 6, 3, 6, 2, 4]), t.fail) | ||
})) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs, [3, 6, 3, 6, 2, 4])) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test thruParallel not losing this (so it can use this.push)', t => | ||
ost.streamToArray(dataStream() | ||
.pipe(ost.thruParallel(2, function(obj, cb) { | ||
dataStream() | ||
.pipe(ost.thruParallel(2, function (obj, cb) { | ||
this.push(obj.cats.length) | ||
this.push(obj.cats.length * 2) | ||
cb() | ||
}))) | ||
.then(objs => t.same(objs.sort(), [3, 6, 3, 6, 2, 4].sort()), t.fail) | ||
})) | ||
.pipe(ost.streamToArray()) | ||
.on('data', objs => t.same(objs.sort(), [3, 6, 3, 6, 2, 4].sort())) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
) | ||
tap.test('Test promise to stream on successful resolution', t => { | ||
ost.promiseToStream(new Promise((resolve, reject) => { | ||
setTimeout(() => resolve([3, 6, 3, 6, 2, 4]), 10) | ||
})) | ||
.on('data', objs => t.same(objs, [3, 6, 3, 6, 2, 4])) | ||
.on('error', t.fail) | ||
.on('end', t.end) | ||
}) | ||
tap.test('Test promise to stream on rejection', t => { | ||
ost.promiseToStream(new Promise((resolve, reject) => { | ||
setTimeout(() => reject([3, 6, 3, 6, 2, 4]), 10) | ||
})) | ||
.on('data', data => t.fail('this one should be rejected')) | ||
.on('error', err => { | ||
t.pass('this promise is rejected') | ||
t.end() | ||
}) | ||
}) | ||
function dataStream() { | ||
@@ -128,3 +173,3 @@ return fs.createReadStream('./test/data.json') | ||
function testFilter(obj) { | ||
return obj.szop === "pracz" | ||
return obj.szop === 'pracz' | ||
} |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
New author
Supply chain riskA new npm collaborator published a version of the package for the first time. New collaborators are usually benign additions to a project, but do indicate a change to the security surface area of a package.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
13573
1
269
116
1
- Removedstream-to-array@^2.3.0
- Removedany-promise@1.3.0(transitive)
- Removedstream-to-array@2.3.0(transitive)