pipe-iterators
Advanced tools
Comparing version 1.0.0 to 1.1.0
101
index.js
var through = require('through2'), | ||
cloneLib = require('clone'), | ||
Readable = require('readable-stream').Readable; | ||
Readable = require('readable-stream').Readable, | ||
xtend = require('xtend'); | ||
@@ -33,2 +34,3 @@ var isStream = require('./lib/is-stream.js'), | ||
acc = (!captureFirst ? initial : null); | ||
return through.obj(function(obj, enc, onDone) { | ||
@@ -133,3 +135,3 @@ if (captureFirst) { | ||
// based on https://github.com/deoxxa/duplexer2/pull/6 | ||
// based on https://github.com/deoxxa/duplexer2/pull/6 (with an additional bugfix) | ||
exports.combine = function(writable, readable) { | ||
@@ -152,4 +154,3 @@ if (!isStream.isWritable(writable)) { | ||
var shouldRead = false, | ||
stream = exports.duplex.obj(function(chunk, enc, done) { | ||
var stream = exports.duplex.obj(function(chunk, enc, done) { | ||
// Node 0.8.x writable streams do not accept the third parameter, done | ||
@@ -162,18 +163,13 @@ var ok = writable.write(chunk, enc); | ||
} | ||
}, function() { | ||
if (shouldRead) { return; } | ||
shouldRead = true; | ||
forwardRead(); | ||
}); | ||
}, forwardRead); | ||
writable.once('finish', function() { stream.end(); }); | ||
stream.once('finish', function() { writable.end(); }); | ||
writable.on('error', function(err) { return stream.emit('error', err); }); | ||
readable.on('readable', forwardRead); | ||
readable.once('end', function() { return stream.push(null); }); | ||
readable.on('error', function(err) { return stream.emit('error', err); }); | ||
readable.once('end', function() { stream.push(null); }); | ||
writable.on('error', function(err) { stream.emit('error', err); }); | ||
readable.on('error', function(err) { stream.emit('error', err); }); | ||
function forwardRead() { | ||
if (!shouldRead) { return; } | ||
var data, waitingToRead = true; | ||
@@ -184,5 +180,6 @@ while ((data = readable.read()) !== null) { | ||
} | ||
shouldRead = waitingToRead; | ||
if (waitingToRead) { | ||
readable.once('readable', forwardRead); | ||
} | ||
} | ||
return stream; | ||
@@ -236,5 +233,4 @@ }; | ||
exports.match = function() { | ||
var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)), | ||
conditions = [], | ||
function parseMatch(args) { | ||
var conditions = [], | ||
streams = [], | ||
@@ -255,10 +251,59 @@ i = 0; | ||
} | ||
return { conditions: conditions, streams: streams }; | ||
} | ||
return new Match({ | ||
objectMode: true, | ||
conditions: conditions, | ||
streams: streams | ||
}); | ||
exports.match = function() { | ||
var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)); | ||
return new Match(xtend({ objectMode: true }, parseMatch(args))); | ||
}; | ||
exports.merge = require('merge-stream'); | ||
exports.forkMerge = function() { | ||
var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)); | ||
return exports.combine(exports.fork(args), exports.merge(args)); | ||
}; | ||
exports.matchMerge = function() { | ||
var args = (Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments)), | ||
parsed = xtend({ objectMode: true }, parseMatch(args)); | ||
return exports.combine(new Match(parsed), exports.merge(parsed.streams)); | ||
}; | ||
var miniq = require('miniq'); | ||
exports.parallel = function(limit, execFn, endFn) { | ||
if (!execFn) { | ||
execFn = function(task, enc, done) { task.call(this, done); }; | ||
} | ||
var queue = miniq(limit), | ||
cleanup = function(done) { | ||
queue.removeAllListeners(); | ||
if (endFn) { endFn(done); } else { done(); } | ||
}, | ||
stream = exports.thru.obj(function(chunk, enc, chunkDone) { | ||
queue.exec(function(taskDone) { | ||
execFn.call(stream, chunk, enc, taskDone); | ||
}); | ||
if (!queue.isFull()) { | ||
chunkDone(); // ask for more tasks, queue still has space | ||
} else { | ||
queue.once('done', function() { chunkDone(); }); // wait until a task completes | ||
} | ||
}, function(done) { | ||
// once "_flush" occurs, wait for the queue to also become empty | ||
if (queue.isEmpty()) { | ||
cleanup(done); | ||
} else { | ||
queue.once('empty', cleanup.bind(this, done)); | ||
} | ||
}); | ||
queue.on('done', stream.emit.bind(stream, 'done')); | ||
queue.on('error', stream.emit.bind(stream, 'error')); | ||
queue.on('empty', stream.emit.bind(stream, 'empty')); | ||
return stream; | ||
}; | ||
// Constructing pipelines from individual elements | ||
@@ -269,7 +314,7 @@ | ||
if (!isStream.isReadable(args[0])) { | ||
throw new Error('First stream must be readable.'); | ||
throw new Error('pipe(): First stream must be readable.'); | ||
} | ||
if (!isStream.isWritable(args[0])) { | ||
throw new Error('Last stream must be writable.'); | ||
throw new Error('pipe(): Last stream must be writable.'); | ||
} | ||
@@ -279,3 +324,3 @@ | ||
if (!isStream.isDuplex(stream)) { | ||
throw new Error('Streams in the pipeline must be duplex.'); | ||
throw new Error('pipe(): Streams in the pipeline must be duplex.'); | ||
} | ||
@@ -299,3 +344,3 @@ }); | ||
exports.pipeline = function() { | ||
var streams = exports.pipe(Array.prototype.slice.call(arguments)), | ||
var streams = exports.pipe((Array.isArray(arguments[0]) ? arguments[0] : Array.prototype.slice.call(arguments))), | ||
last = streams[streams.length - 1], | ||
@@ -302,0 +347,0 @@ isDuplex = isStream.isDuplex(last), |
@@ -13,3 +13,3 @@ var Duplex = require('readable-stream').Duplex, | ||
if (typeof _write !== 'function') { | ||
throw new Error('You must implement an _read function'); | ||
throw new Error('You must implement a _write function'); | ||
} | ||
@@ -16,0 +16,0 @@ |
{ | ||
"name": "pipe-iterators", | ||
"version": "1.0.0", | ||
"description": "Map, reduce, filter, fork, pipeline and other utility functions for iterating over object mode streams.", | ||
"version": "1.1.0", | ||
"description": "Like underscore for Node streams. Map, reduce, filter, fork, pipeline and other utility functions for iterating over object mode streams.", | ||
"main": "index.js", | ||
@@ -27,2 +27,4 @@ "scripts": { | ||
"clone": "0.1.18", | ||
"merge-stream": "0.1.6", | ||
"miniq": "~1.0.0", | ||
"readable-stream": "*", | ||
@@ -29,0 +31,0 @@ "through2": "*", |
170
readme.md
# pipe-iterators | ||
Functions for iterating over object mode streams: `forEach`, `map`, `mapKey`, `reduce`, `filter`, `fromArray`, `toArray`, `devnull`, `pipe`, `head`, `tail`, `pipe`, `through`, `thru`, `writable`, `readable`, `duplex`, `pipeline`. | ||
Like underscore for Node streams (streams2 and up). | ||
Functions for iterating over object mode streams: | ||
- [Iteration functions](#iteration-functions): [`forEach`](#foreach), [`map`](#map), [`reduce`](#reduce), [`filter`](#filter), [`mapKey`](#mapkey) | ||
- [Input and output](#input-and-output): [`fromArray`](#fromarray), [`toArray`](#toarray) | ||
- [Constructing streams](#constructing-streams): [`through` / `thru`](#thru--through), [`writable`](#writable), [`readable`](#readable), [`duplex`](#duplex), [`combine`](#combine), [`devnull`](#devnull), [`cap`](#cap), [`clone`](#clone) | ||
- [Control flow](#control-flow): [`fork`](#fork), [`match`](#match), [`merge`](#merge), [`forkMerge`](#forkmerge), [`matchMerge`](#matchmerge), [`parallel`](#parallel) | ||
- [Constructing pipelines from individual elements](#constructing-pipelines-from-individual-elements): [`pipe`](#pipe), [`head`](#head), [`tail`](#tail), [`pipeline`](#pipeline) | ||
- [Checking stream instances](#checking-stream-instances): [`isStream`](#isstream), [`isReadable`](#isreadable), [`isWritable`](#iswritable), [`isDuplex`](#isduplex) | ||
# Installation | ||
@@ -15,2 +24,9 @@ | ||
## Changelog | ||
`v1.1.0`: | ||
- added the `merge`, `forkMerge`, `matchMerge` and `parallel` functions. | ||
- fixed a bug in `pipeline`. | ||
## Iteration functions | ||
@@ -26,3 +42,3 @@ | ||
Returns a duplex stream which calls a function for each element in the stream. `callback` is invoked with two arguments - `obj` (the element value) and `index` (the element index). The return value from the callback is ignored. | ||
Returns a duplex stream which calls a function for each element in the stream. `callback` is invoked with two arguments - `obj` (the element value) and `index` (the element index). The return value from the callback is ignored. | ||
@@ -42,3 +58,3 @@ If `thisArg` is provided, it is available as `this` within the callback. | ||
Returns a duplex stream which produces a new stream of values by mapping each value in the stream through a transformation callback. The callback is invoked with two arguments, `obj` (the element value) and `index` (the element index). The return value from the callback is written back to the stream. | ||
Returns a duplex stream which produces a new stream of values by mapping each value in the stream through a transformation callback. The callback is invoked with two arguments, `obj` (the element value) and `index` (the element index). The return value from the callback is written back to the stream. | ||
@@ -64,3 +80,3 @@ If `thisArg` is provided, it is available as `this` within the callback. | ||
If `initialValue` is not provided, then `prev` will be equal to the first value in the array and `curr` will be equal to the second on the first call. | ||
If `initialValue` is not provided, then `prev` will be equal to the first value in the array and `curr` will be equal to the second on the first call. | ||
@@ -80,3 +96,3 @@ ```js | ||
The callback is invoked with two arguments, `obj` (the element value) and `index` (the element index). If the callback returns `true`, the element is written to the next stream, otherwise the element is filtered out. | ||
The callback is invoked with two arguments, `obj` (the element value) and `index` (the element index). If the callback returns `true`, the element is written to the next stream, otherwise the element is filtered out. | ||
@@ -94,3 +110,3 @@ ```js | ||
Returns a duplex stream which produces a new stream of values by mapping a single key (when given `key` and `callback`) or multiple keys (when given `hash`) through a transformation callback. The callback is invoked with two arguments: `value` (the value `element[key]`), and `obj` (the element itself). The return value from the callback is set on the element, and the element itself is written back to the stream. | ||
Returns a duplex stream which produces a new stream of values by mapping a single key (when given `key` and `callback`) or multiple keys (when given `hash`) through a transformation callback. The callback is invoked with three arguments: `value` (the value `element[key]`), `obj` (the element itself) and `index` (the element index). The return value from the callback is set on the element, and the element itself is written back to the stream. | ||
@@ -156,5 +172,7 @@ If `thisArg` is provided, it is available as `this` within the callback. | ||
- The `flushFn` has the signature `function(onDone)`. See the [core docs](http://nodejs.org/api/stream.html#stream_transform_flush_callback) for details. | ||
- `thru.obj(fn)` is a convenience wrapper around `thru({ objectMode: true }, fn)`. | ||
- `thru.obj(fn)` is a convenience wrapper around `thru({ objectMode: true }, fn)`. | ||
- `thru.ctor()` returns a constructor for a custom Transform. This is useful when you want to use the same transform logic in multiple instances. | ||
BTW, if you need parallel execution but with the same API as a `thru` stream, check out [`parallel`](#parallel) in the control flow section. | ||
### writable | ||
@@ -175,3 +193,3 @@ | ||
- `writable.obj()` is a convenience wrapper for `writable({ objectMode: true })`. | ||
- `writable.ctor()` returns a constructor for the writable stream. | ||
- `writable.ctor()` returns a constructor for the writable stream. | ||
@@ -208,3 +226,3 @@ ### readable | ||
- The `options` hash is passed to `stream.Duplex` to construct the stream. See the [core docs](http://nodejs.org/api/stream.html#stream_new_stream_duplex_options). | ||
- The `writeFn` has the signature: `function(chunk, encoding, callback) {}`. See the [core docs](http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1) for details. | ||
- The `writeFn` has the signature: `function(chunk, encoding, callback) {}`. See the [core docs](http://nodejs.org/api/stream.html#stream_writable_write_chunk_encoding_callback_1) for details. | ||
- The `readFn` has the signature: `function(size) {}`. See the [core docs](http://nodejs.org/api/stream.html#stream_readable_read_size_1) for details. | ||
@@ -220,3 +238,3 @@ - `duplex.obj()` is a convenience wrapper for `duplex({ objectMode: true })`. | ||
Takes a readable stream and a writable stream and returns a duplex stream. | ||
Takes a readable stream and a writable stream and returns a duplex stream. | ||
@@ -273,4 +291,6 @@ Note: the two streams ARE NOT piped together. If you want to construct a pipeline with multiple streams, you can, but you need to perform the pipe operations yourself (or use the `.pipeline` function instead). This makes `.combine` work with streams where the connections is not via a pipe mechanism, like with `child_process.spawn`: | ||
Also accepts a single array of streams as the first parameter. | ||
Also accepts a single array of streams as the first parameter. | ||
Listeners for the `error` event on the stream returned from `fork` will receive errors that are emitted in any of the streams in passed to the function. | ||
### match | ||
@@ -285,3 +305,3 @@ | ||
Returns a writable stream given a series of `condition` function and `stream` pairs. When elements are written to the stream, they are matched against each condition function in order. | ||
Returns a writable stream given a series of `condition` function and `stream` pairs. When elements are written to the stream, they are matched against each condition function in order. | ||
@@ -292,5 +312,7 @@ The `condition` function is called with two arguments - `obj` (the element value) and `index` (the element index). If the condition returns `true`, the element is written to the associated stream and no further matches are performed. | ||
Listeners for the `error` event on the stream returned from `match` will receive errors that are emitted in any of the streams in passed to the function. | ||
```js | ||
pi.fromArray([ | ||
{ url: '/people' }, | ||
{ url: '/people' }, | ||
{ url: '/posts/1' }, { url: '/posts' }, | ||
@@ -301,3 +323,3 @@ { url: '/comments/2' }]) | ||
pi.pipeline( | ||
pi.forEach(function(obj) { console.log('person!', obj); }), | ||
pi.forEach(function(obj) { console.log('person!', obj); }), | ||
pi.devNull() | ||
@@ -307,7 +329,7 @@ ), | ||
pi.pipeline( | ||
pi.forEach(function(obj) { console.log('post!', obj); }), | ||
pi.forEach(function(obj) { console.log('post!', obj); }), | ||
pi.devNull() | ||
), | ||
pi.pipeline( | ||
pi.forEach(function(obj) { console.log('other:', obj); }), | ||
pi.forEach(function(obj) { console.log('other:', obj); }), | ||
pi.devNull() | ||
@@ -318,5 +340,104 @@ ) | ||
TODO: Listening for 'error' will recieve errors from all streams inside the pipe. | ||
### merge | ||
```js | ||
pi.merge(stream1, [stream2], [...]) | ||
pi.merge([ stream1, stream2, ... ]) | ||
``` | ||
Takes multiple readable streams and merges them into one stream. Accepts any number of readable streams and returns a duplex stream. | ||
### forkMerge | ||
```js | ||
pi.forkMerge(stream1, [stream2], [...]) | ||
pi.forkMerge([ stream1, stream2, ... ]) | ||
``` | ||
Fork followed by merge on a set of streams. Accepts any number of duplex streams; returns a duplex stream that: | ||
- `fork`s each input, writes each input into the streams, | ||
- reads and `merge`s the inputs from the streams and writes them out | ||
Useful if you need to concurrently apply different operations on a single input but want to produce a single merged output. | ||
``` | ||
/ to-html() \ | ||
read .md() - to-pdf() - write-to-disk() | ||
\ to-rtf() / | ||
``` | ||
For example, imagine converting a set of Markdown files into the HTML, PDF and RTF formats - the same file goes in, each of the processing operations are applied, but at the end there are three objects (binary files in the different formats) that go into the same "write to disk" pipeline. | ||
### matchMerge | ||
```js | ||
pi.matchMerge(condition1, stream1, [condition2], [stream2], [...], [rest]) | ||
pi.matchMerge([ condition1, stream1, condition2, stream2, ..., rest ]) | ||
``` | ||
Match followed by merge on a set of streams. Accepts any number of duplex streams; returns a duplex stream that: | ||
- `match`es conditions, selects the correct stream and writes to that stream | ||
- reads and `merge`s the inputs from each of the streams and writes them out | ||
Useful if you want to conditionally process some elements differently, while sharing the same downstream pipeline. | ||
For example, if you want to first check a cache and skip some processing for items that hit in the cache, you could do something like `pi.matchMerge(checkCache, getResultFromCache, performFullProcessing)` (where `checkCache` is a function and the other two are through streams). | ||
### parallel | ||
```js | ||
pi.parallel(limit, [transformFn], [flushFn]) | ||
``` | ||
Returns a object-mode Transform stream given a `limit`, a `transformFn` and `flushFn`. Works like a `through.obj` stream but: | ||
- the `transformFn` can be launched multiple times in parallel, with up to `limit` tasks running at the same time | ||
- the `flushFn` is only called after both 1) the thru-stream is instructed to end AND 2) all the tasks have been completed. | ||
- the stream emits the following events: | ||
- `"done"`: emitted after each `transformFn` execution completes | ||
- `"empty"`: emitted when the execution queue becomes empty | ||
The usual thru-stream conventions apply: | ||
- The `transformFn` has the signature: `function (chunk, encoding, onDone) {}`. See the [core docs](http://nodejs.org/api/stream.html#stream_transform_transform_chunk_encoding_callback) for details. | ||
- The `flushFn` has the signature `function(onDone)`. See the [core docs](http://nodejs.org/api/stream.html#stream_transform_flush_callback) for details. | ||
Both `transformFn` and `flushFn` are optional. If the transformFn is not provided, then it defaults to: | ||
```js | ||
function(task, enc, done) { task.call(this, done); } | ||
``` | ||
which works nicely if the items in your stream are something like: | ||
```js | ||
pi.fromArray([ | ||
function(done) { this.push(1); done(); }, | ||
function(done) { this.push(2); done(); } | ||
]) | ||
.pipe(pi.parallel(2)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [1, 2]); | ||
})); | ||
``` | ||
Note how each task runs with `this` set to the `parallel` stream, which means you can push results out. Similar to normal core streams, the `done` function can return one argument - `err`. If you need to process the other arguments, define your own `transformFn`. | ||
Of course, you don't have to use callback functions just to get parallel processing - any task, even a basic thru stream like: | ||
```js | ||
pi.parallel(16, function(filename, enc, done) { | ||
var self = this; | ||
fs.stat(filename, function(err, result) { | ||
self.push(result); done(); | ||
}) | ||
}); | ||
``` | ||
will execute up to 16 stat calls at a time with `parallel`. | ||
Note that you can safely call `this.write()` from within the transform function to add more tasks to run - this can be useful if your task processing causes more tasks to need to run. If you need the new payloads to go through some upstream processing, you can might consider writing to another stream that precedes `parallel`, provided you haven't ended that stream yet. | ||
## Constructing pipelines from individual elements | ||
@@ -335,3 +456,3 @@ | ||
Also accepts a single array of streams as the first parameter. | ||
Also accepts a single array of streams as the first parameter. | ||
@@ -347,5 +468,5 @@ ### head() | ||
Also accepts a single array of streams as the first parameter. | ||
Also accepts a single array of streams as the first parameter. | ||
Similar to `a.pipe(b).pipe(c)`, but `.head()` returns the first stream (`a`) rather than the last stream (`c`). | ||
Similar to `a.pipe(b).pipe(c)`, but `.head()` returns the first stream (`a`) rather than the last stream (`c`). | ||
@@ -361,3 +482,3 @@ ### tail() | ||
Also accepts a single array of streams as the first parameter. | ||
Also accepts a single array of streams as the first parameter. | ||
@@ -394,2 +515,4 @@ Just like calling `a.pipe(b).pipe(c)`. | ||
Listeners for the `error` event on the stream returned from `pipeline` will receive errors that are emitted in any of the streams in passed to the function. | ||
### Checking stream instances | ||
@@ -426,3 +549,3 @@ | ||
```js | ||
pi.isDuplext(obj) | ||
pi.isDuplex(obj) | ||
``` | ||
@@ -434,3 +557,3 @@ | ||
Meh, `through2` streams already make writing async iteration quite easy. | ||
Meh, `through2` streams already make writing async iteration quite easy. | ||
@@ -452,1 +575,2 @@ ## What about splitting strings? | ||
- [rvagg/isstream](https://github.com/rvagg/isstream) | ||
- [grncdr/merge-stream](https://github.com/grncdr/merge-stream) |
@@ -157,1 +157,316 @@ var assert = require('assert'), | ||
}); | ||
describe('merge', function() { | ||
it('returns a duplex stream', function() { | ||
assert.ok(isDuplex(pi.merge())); | ||
}); | ||
it('merges multiple streams', function(done) { | ||
pi.merge(pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6 ]); | ||
done(); | ||
})); | ||
}); | ||
it('merges multiple streams, arg is array', function(done) { | ||
pi.merge([pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6)]) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6 ]); | ||
done(); | ||
})); | ||
}); | ||
it('works with just one stream', function(done) { | ||
pi.merge(pi.fromArray(1)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [ 1 ]); | ||
done(); | ||
})); | ||
}); | ||
it('works with one empty stream', function(done) { | ||
pi.merge(pi.fromArray(1), pi.fromArray(), pi.fromArray(2)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [ 1, 2 ]); | ||
done(); | ||
})); | ||
}); | ||
it('works with just empty streams', function(done) { | ||
pi.merge(pi.fromArray(), pi.fromArray()) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), []); | ||
done(); | ||
})); | ||
}); | ||
it('works in flowing mode', function(done) { | ||
var result = []; | ||
pi.merge(pi.fromArray(1, 2), pi.fromArray(3, 4), pi.fromArray(5, 6)) | ||
.on('data', function(data) { result.push(data); }) | ||
.once('end', function() { | ||
assert.deepEqual(result.sort(), [ 1, 2, 3, 4, 5, 6]); | ||
done(); | ||
}); | ||
}); | ||
}); | ||
function logEvts(id, stream) { | ||
// readable (non-flowing) stream | ||
return stream.on('readable', function() { | ||
console.log('[' + id +'] "readable"'); | ||
}) | ||
.on('end', function() { | ||
console.log('[' + id +'] "end"'); | ||
}) | ||
.on('close', function() { | ||
console.log('[' + id +'] "close"'); | ||
}) | ||
.on('error', function(err) { | ||
console.log('[' + id +'] "error"', err); | ||
}) | ||
// writable (non-flowing) stream | ||
.on('drain', function() { | ||
console.log('[' + id +'] "drain"'); | ||
}) | ||
.on('finish', function() { | ||
console.log('[' + id +'] "finish"'); | ||
}) | ||
.on('pipe', function() { | ||
console.log('[' + id +'] "pipe"'); | ||
}) | ||
.on('unpipe', function() { | ||
console.log('[' + id +'] "unpipe"'); | ||
}); | ||
} | ||
function logStream(id) { | ||
return logEvts(id, pi.thru.obj(function(data, enc, done) { | ||
console.log('[' + id + '] _transform ' + data); | ||
this.push(data); | ||
done(); | ||
}, function(done) { | ||
console.log('[' + id +'] _flush'); | ||
done(); | ||
})); | ||
} | ||
describe('forkMerge', function() { | ||
function doubler(val) { return val * 2; } | ||
function add100(val) { return val + 100; } | ||
it('combines a fork stream and a merge stream', function(done) { | ||
pi.fromArray(1, 2, 3) | ||
.pipe( | ||
pi.forkMerge( | ||
pi.pipeline(pi.map(doubler), pi.map(doubler)), | ||
pi.pipeline(pi.map(add100), pi.map(add100)) | ||
) | ||
).pipe(pi.toArray(function(result) { | ||
assert.deepEqual( | ||
result.sort(function(a, b){ return a-b; }), | ||
[ 4, 8, 12, 201, 202, 203 ] | ||
); | ||
done(); | ||
})) | ||
}); | ||
}); | ||
describe('matchMerge', function() { | ||
function add10(val) { return val + 10; } | ||
function add100(val) { return val + 100; } | ||
it('combines a match stream and a merge stream', function(done) { | ||
pi.fromArray([ 1, 2, 3, 4, 5, 6 ]) | ||
.pipe(pi.matchMerge( | ||
function(obj) { return obj % 2 == 0; }, | ||
pi.map(add10), | ||
function(obj) { return obj % 3 == 0; }, | ||
pi.map(add100), | ||
pi.thru.obj() | ||
)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual( | ||
result.sort(function(a, b){ return a-b; }), | ||
[ | ||
1, // 1 -> 1 | ||
5, // 5 -> 5 | ||
12, // 2 -> + 10 -> 12 | ||
14, // 4 -> + 10 -> 14 | ||
16, // 6 -> + 10 -> 16 | ||
103 // 3 -> + 100 -> 103 | ||
] | ||
); | ||
done(); | ||
})); | ||
}); | ||
}); | ||
describe('parallel', function() { | ||
it('can execute a series of tasks in serial order', function(done) { | ||
var calls = []; | ||
pi.fromArray(1, 2, 3) | ||
.pipe(pi.map(function(val, i) { | ||
calls.push(i); | ||
return function (done) { | ||
this.push(val * 2); | ||
done(); | ||
}; | ||
})) | ||
.pipe(pi.parallel(1)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(calls, [ 0, 1, 2]); | ||
assert.deepEqual(result, [ 2, 4, 6 ]); | ||
done(); | ||
})); | ||
}); | ||
it('can run the example', function(done) { | ||
pi.fromArray([ | ||
function(done) { this.push(1); done(); }, | ||
function(done) { this.push(2); done(); } | ||
]) | ||
.pipe(pi.parallel(2)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result.sort(), [1, 2]); | ||
done(); | ||
})); | ||
}); | ||
it('can execute a series of tasks with parallelism 2', function(done) { | ||
pi.fromArray([ | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(1); | ||
done(); | ||
}, 50); | ||
}, | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(2); | ||
done(); | ||
}, 100); | ||
}, | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(3); | ||
done(); | ||
}, 25); | ||
} | ||
]) | ||
.pipe(pi.parallel(2)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result, [ 1, 3, 2 ]); // due to timeouts | ||
done(); | ||
})); | ||
}); | ||
it('can execute a series of tasks with infinite parallelism', function(done) { | ||
pi.fromArray([ | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(1); | ||
done(); | ||
}, 50); | ||
}, | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(2); | ||
done(); | ||
}, 100); | ||
}, | ||
function(done) { | ||
var self = this; | ||
setTimeout(function() { | ||
self.push(3); | ||
done(); | ||
}, 25); | ||
} | ||
]) | ||
.pipe(pi.parallel(Infinity)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result, [ 3, 1, 2 ]); // due to timeouts | ||
done(); | ||
})); | ||
}); | ||
it('works with empty', function(done) { | ||
pi.fromArray([]) | ||
.pipe(pi.parallel(Infinity)) | ||
.pipe(pi.toArray(function(result) { | ||
assert.deepEqual(result, [ ]); | ||
done(); | ||
})); | ||
}); | ||
it('add to parallel while executing', function(done) { | ||
var callOrder = []; | ||
// there are no guarantees that one "done" action runs | ||
// before another (unless you do parallelism = 1) | ||
function checkDone() { | ||
if (callOrder.length < 4) { | ||
return; | ||
} | ||
var expected = [ '1-1', '1-2', '2-2', '2-1' ]; | ||
assert.ok( | ||
expected.every(function(item) { return callOrder.indexOf(item) > -1; }), | ||
'every callback should have run'); | ||
done(); | ||
} | ||
pi.fromArray([ | ||
function a(done) { | ||
callOrder.push('1-1'); | ||
done(); | ||
// add more tasks | ||
this.write(function c(done) { | ||
setTimeout(function() { | ||
callOrder.push('2-1'); | ||
done(); | ||
checkDone(); | ||
}, 20); | ||
}); | ||
this.write(function d(done) { | ||
callOrder.push('2-2'); | ||
done(); | ||
checkDone(); | ||
}); | ||
}, | ||
function b(done) { | ||
setTimeout(function() { | ||
callOrder.push('1-2'); | ||
done(); | ||
checkDone(); | ||
}, 100); | ||
} | ||
]) | ||
.pipe(pi.parallel(1)) | ||
.pipe(pi.devnull()); | ||
}); | ||
}); |
@@ -130,2 +130,14 @@ var assert = require('assert'), | ||
}); | ||
// readable | ||
xit('emits the readable event when readable'); | ||
xit('emits the data event when in push stream mode'); | ||
xit('emits the end event when in push stream mode'); | ||
xit('emits the close event when the input stream emits close'); | ||
// writable | ||
xit('emits the finish event'); | ||
xit('emits the drain event when drained'); | ||
xit('emits the pipe event when piped to'); | ||
xit('emits the unpipe event when unpiped from'); | ||
}); |
Sorry, the diff of this file is not supported yet
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
554
2
69667
6
14
1433
+ Addedmerge-stream@0.1.6
+ Addedminiq@~1.0.0
+ Addedcore-util-is@1.0.3(transitive)
+ Addedisarray@0.0.1(transitive)
+ Addedmerge-stream@0.1.6(transitive)
+ Addedmicroee@0.0.6(transitive)
+ Addedminiq@1.0.1(transitive)
+ Addedondone@1.0.0(transitive)
+ Addedreadable-stream@1.0.34(transitive)
+ Addedstring_decoder@0.10.31(transitive)
+ Addedthrough2@0.6.5(transitive)