pull-stream
Advanced tools
Comparing version 2.28.4 to 3.0.0
@@ -5,3 +5,3 @@ # Sinks | ||
You *must* have a sink at the end of a pipeline | ||
for data to move through. | ||
for data to move towards. | ||
@@ -11,5 +11,3 @@ You can only use _one_ sink per pipeline. | ||
``` js | ||
source() | ||
.pipe(through()) //optional | ||
.pipe(sink()) | ||
pull(source, through, sink) | ||
``` | ||
@@ -16,0 +14,0 @@ |
@@ -10,5 +10,3 @@ # Sources | ||
``` js | ||
source() | ||
.pipe(through()) //optional | ||
.pipe(sink()) | ||
pull(source, through, sink) | ||
``` | ||
@@ -41,23 +39,2 @@ | ||
## defer | ||
create a false source-stream that will be attached to a | ||
real source-stream later. Use when you must do an async | ||
operation before you can create the stream. | ||
``` js | ||
function ls (dir) { | ||
var ds = pull.defer() | ||
fs.readdir(dir, function (err, ls) { | ||
if(err) return ds.abort(err) | ||
return ds.resolve(readArray(ls) | ||
.pipe(pull.map(function (file) { | ||
return path.resolve(dir, file) | ||
}) | ||
}) | ||
return ds | ||
} | ||
``` | ||
## empty | ||
@@ -74,34 +51,1 @@ | ||
## pushable | ||
Create a false source stream with a `.push(data, cb?)` | ||
property. Use when you really need a push api, | ||
or need to adapt pull-stream to some other push api. | ||
``` js | ||
function ls (dir) { | ||
var ps = pull.pushable() | ||
fs.readdir(dir, function (err, ls) { | ||
if(err) return ps.end(err) | ||
ls.forEach(function (file) { | ||
ps.push(path.resolve(dir, file)) | ||
}) | ||
ps.end() | ||
}) | ||
return ps | ||
} | ||
``` | ||
## depthFirst, widthFirst, leafFirst (start, createStream) | ||
Traverse a tree structure. `start` is a value that represents | ||
a node. `createStream` is a function that returns | ||
a pull-stream of the children of a node. | ||
`start` must be the same type output by `createStream`. | ||
``` js | ||
//passing in the `ls` function from the `defer` example. | ||
pull.widthFirst(process.cwd(), ls) | ||
.pipe(pull.log()) | ||
``` | ||
@@ -12,5 +12,3 @@ # Throughs | ||
```js | ||
source() | ||
.pipe(through()) //optional | ||
.pipe(sink()) | ||
pull(source, through, sink) | ||
``` | ||
@@ -79,9 +77,2 @@ | ||
## group (length) | ||
Group incoming data into arrays of max length `length`, | ||
(the last item may be shorter than `length`) | ||
Useful for data you can handle in batches. | ||
## flatten () | ||
@@ -91,9 +82,1 @@ | ||
## highWaterMark (n) | ||
An async buffering stream. | ||
`highWaterMark` will eagerly read from the source stream, | ||
while there are less than `n` chunks in the buffer. | ||
67
index.js
var sources = require('./sources') | ||
var sinks = require('./sinks') | ||
var throughs = require('./throughs') | ||
var u = require('pull-core') | ||
function isFunction (fun) { | ||
return 'function' === typeof fun | ||
} | ||
exports = module.exports = require('./pull') | ||
function isReader (fun) { | ||
return fun && (fun.type === "Through" || fun.length === 1) | ||
} | ||
var exports = module.exports = function pull () { | ||
var args = [].slice.call(arguments) | ||
if(isReader(args[0])) | ||
return function (read) { | ||
args.unshift(read) | ||
return pull.apply(null, args) | ||
} | ||
var read = args.shift() | ||
//if the first function is a duplex stream, | ||
//pipe from the source. | ||
if(isFunction(read.source)) | ||
read = read.source | ||
function next () { | ||
var s = args.shift() | ||
if(null == s) | ||
return next() | ||
if(isFunction(s)) return s | ||
return function (read) { | ||
s.sink(read) | ||
//this supports pipeing through a duplex stream | ||
//pull(a, b, a) "telephone style". | ||
//if this stream is in the a (first & last position) | ||
//s.source will have already been used, but this should never be called | ||
//so that is okay. | ||
return s.source | ||
} | ||
} | ||
while(args.length) | ||
read = next() (read) | ||
return read | ||
} | ||
for(var k in sources) | ||
exports[k] = u.Source(sources[k]) | ||
exports[k] = sources[k] | ||
for(var k in throughs) | ||
exports[k] = u.Through(throughs[k]) | ||
exports[k] = throughs[k] | ||
for(var k in sinks) | ||
exports[k] = u.Sink(sinks[k]) | ||
exports[k] = sinks[k] | ||
var maybe = require('./maybe')(exports) | ||
for(var k in maybe) | ||
exports[k] = maybe[k] | ||
exports.Duplex = | ||
exports.Through = exports.pipeable = u.Through | ||
exports.Source = exports.pipeableSource = u.Source | ||
exports.Sink = exports.pipeableSink = u.Sink | ||
{ | ||
"name": "pull-stream", | ||
"description": "minimal pull stream", | ||
"version": "2.28.4", | ||
"version": "3.0.0", | ||
"homepage": "https://github.com/dominictarr/pull-stream", | ||
@@ -10,5 +10,2 @@ "repository": { | ||
}, | ||
"dependencies": { | ||
"pull-core": "~1.1.0" | ||
}, | ||
"devDependencies": { | ||
@@ -15,0 +12,0 @@ "tape": "~2.12.3", |
108
README.md
@@ -5,9 +5,10 @@ # pull-stream | ||
In [classic-streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown), | ||
In [classic-streams](1), | ||
streams _push_ data to the next stream in the pipeline. | ||
In [new-streams](https://github.com/joyent/node/blob/v0.10/doc/api/stream.markdown), | ||
data is pulled out of the source stream, into the destination. | ||
In [new-classic-streams]( | ||
`pull-stream` is a minimal take on streams, | ||
pull streams work great for "object" streams as well as streams of raw text or binary data. | ||
`pull-stream` is a minimal take on pull streams, | ||
optimized for "object" streams, but still supporting text streams. | ||
@@ -40,40 +41,7 @@ ## Quick Example | ||
## Examples | ||
What if implementing a stream was this simple: | ||
### Pipeable Streams | ||
`pull.{Source,Through,Sink}` wraps a function and added a `type` property to signify what type of pull-stream it is. | ||
```js | ||
var pull = require('pull-stream') | ||
var createSourceStream = pull.Source(function () { | ||
return function (end, cb) { | ||
return cb(end, Math.random()) | ||
} | ||
}) | ||
var createThroughStream = pull.Through(function (read) { | ||
return function (end, cb) { | ||
read(end, cb) | ||
} | ||
}) | ||
var createSinkStream = pull.Sink(function (read) { | ||
read(null, function next (end, data) { | ||
if(end) return | ||
console.log(data) | ||
read(null, next) | ||
}) | ||
}) | ||
pull(createSourceStream(), createThroughStream(), createSinkStream()) | ||
``` | ||
### Readable & Reader vs. Readable & Writable | ||
Instead of a readable stream, and a writable stream, there is a `readable` stream, | ||
and a `reader` stream. | ||
(aka "Source") and a `reader` stream (aka "Sink"). Through streams | ||
is a Sink that returns a Source. | ||
@@ -85,20 +53,18 @@ See also: | ||
### Readable | ||
### Source (aka, Readable) | ||
The readable stream is just a `function(end, cb)`, | ||
The readable stream is just a `function read(end, cb)`, | ||
that may be called many times, | ||
and will (asynchronously) `callback(null, data)` once for each call. | ||
and will (asynchronously) `cb(null, data)` once for each call. | ||
The readable stream eventually `callback(err)` if there was an error, or `callback(true)` | ||
if the stream has no more data. In both cases a second argument will be irgnored. That is, the readable stream either provides data _or_ indicates an error or end-of-data condition, never both. | ||
To signify an end state, the stream eventually returns `cb(err)` or `cb(true)`. | ||
When indicating a terminal state, `data` *must* be ignored. | ||
if the user passes in `end = true`, then stop getting data from wherever. | ||
The `read` function *must not* be called until the previous call has called back. | ||
Unless, it is a call to abort the stream (`read(truthy, cb)`). | ||
All [Sources](https://github.com/dominictarr/pull-stream/blob/master/docs/sources.md) | ||
and [Throughs](https://github.com/dominictarr/pull-stream/blob/master/docs/throughs.md) | ||
are readable streams. | ||
```js | ||
//a stream of 100 random numbers. | ||
var i = 100 | ||
var randomReadable = pull.Source(function () { | ||
var random = function () { | ||
return function (end, cb) { | ||
@@ -110,9 +76,10 @@ if(end) return cb(end) | ||
} | ||
}) | ||
} | ||
``` | ||
### Reader (aka, "writable") | ||
### Sink; (aka, Reader, "writable") | ||
A `reader`, is just a function that calls a readable, | ||
until it decideds to stop, or the readable `cb(err || true)` | ||
A sink is just a `reader` function that calls a Source (read function), | ||
until it decideds to stop, or the readable ends. `cb(err || true)` | ||
@@ -124,3 +91,4 @@ All [Throughs](https://github.com/dominictarr/pull-stream/blob/master/docs/throughs.md) | ||
```js | ||
var logger = pull.Sink(function (read) { | ||
//read source and log it. | ||
var logger = function (read) { | ||
read(null, function next(end, data) { | ||
@@ -133,26 +101,32 @@ if(end === true) return | ||
}) | ||
}) | ||
} | ||
``` | ||
These can be connected together by passing the `readable` to the `reader` | ||
Since these are just functions, you can pass them to each other! | ||
```js | ||
logger()(randomReadable()) | ||
var rand = random()) | ||
var log = logger() | ||
log(rand) //"pipe" the streams. | ||
``` | ||
Or, if you prefer to read things left-to-right | ||
but, it's easier to read if you use's pull-stream's `pull` method | ||
```js | ||
pull(randomReadable(), logger()) | ||
var pull = require('pull-stream') | ||
pull(random(), logger()) | ||
``` | ||
### Through / Duplex | ||
### Through | ||
A duplex/through stream is both a `reader` that is also `readable` | ||
A duplex/through stream is just a function that takes a `read` function, | ||
A through stream is a reader on one end and a readable on the other. | ||
It's Sink that returns a Source. | ||
That is, it's just a function that takes a `read` function, | ||
and returns another `read` function. | ||
```js | ||
var map = pull.Through(function (read, map) { | ||
var map = function (read, map) { | ||
//return a readable function! | ||
@@ -164,3 +138,3 @@ return function (end, cb) { | ||
} | ||
}) | ||
} | ||
``` | ||
@@ -189,2 +163,6 @@ | ||
pull detects if it's missing a Source by checking function arity, | ||
if the function takes only one argument it's either a sink or a through. | ||
Otherwise it's a Source. | ||
## Duplex Streams | ||
@@ -191,0 +169,0 @@ |
123
sinks.js
@@ -1,37 +0,59 @@ | ||
var drain = exports.drain = function (read, op, done) { | ||
'use strict' | ||
;(function next() { | ||
var loop = true, cbed = false | ||
while(loop) { | ||
cbed = false | ||
read(null, function (end, data) { | ||
cbed = true | ||
if(end) { | ||
function id (item) { return item } | ||
function prop (key) { | ||
return ( | ||
'string' == typeof key | ||
? function (data) { return data[key] } | ||
: 'object' === typeof key && 'function' === typeof key.exec //regexp | ||
? function (data) { var v = map.exec(data); return v && v[0] } | ||
: key || id | ||
) | ||
} | ||
var drain = exports.drain = function (op, done) { | ||
return function (read) { | ||
//this function is much simpler to write if you | ||
//just use recursion, but by using a while loop | ||
//we do not blow the stack if the stream happens to be sync. | ||
;(function next() { | ||
var loop = true, cbed = false | ||
while(loop) { | ||
cbed = false | ||
read(null, function (end, data) { | ||
cbed = true | ||
if(end) { | ||
loop = false | ||
if(done) done(end === true ? null : end) | ||
else if(end && end !== true) | ||
throw end | ||
} | ||
else if(op && false === op(data)) { | ||
loop = false | ||
read(true, done || function () {}) | ||
} | ||
else if(!loop){ | ||
next() | ||
} | ||
}) | ||
if(!cbed) { | ||
loop = false | ||
if(done) done(end === true ? null : end) | ||
else if(end && end !== true) | ||
throw end | ||
return | ||
} | ||
else if(op && false === op(data)) { | ||
loop = false | ||
read(true, done || function () {}) | ||
} | ||
else if(!loop){ | ||
next() | ||
} | ||
}) | ||
if(!cbed) { | ||
loop = false | ||
return | ||
} | ||
} | ||
})() | ||
})() | ||
} | ||
} | ||
var onEnd = exports.onEnd = function (read, done) { | ||
return drain(read, null, done) | ||
var onEnd = exports.onEnd = function (done) { | ||
return drain(null, done) | ||
} | ||
var log = exports.log = function (read, done) { | ||
return drain(read, function (data) { | ||
var log = exports.log = function (done) { | ||
return drain(function (data) { | ||
console.log(data) | ||
@@ -41,1 +63,46 @@ }, done) | ||
var find = | ||
exports.find = function (test, cb) { | ||
var ended = false | ||
if(!cb) | ||
cb = test, test = id | ||
else | ||
test = prop(test) || id | ||
return drain(function (data) { | ||
if(test(data)) { | ||
ended = true | ||
cb(null, data) | ||
return false | ||
} | ||
}, function (err) { | ||
if(ended) return //already called back | ||
cb(err === true ? null : err, null) | ||
}) | ||
} | ||
var reduce = exports.reduce = function (reduce, acc, cb) { | ||
return drain(function (data) { | ||
acc = reduce(acc, data) | ||
}, function (err) { | ||
cb(err, acc) | ||
}) | ||
} | ||
var collect = exports.collect = | ||
function (cb) { | ||
return reduce(function (arr, item) { | ||
arr.push(item) | ||
return arr | ||
}, [], cb) | ||
} | ||
var concat = exports.concat = | ||
function (cb) { | ||
return reduce(function (a, b) { | ||
return a + b | ||
}, '', cb) | ||
} | ||
@@ -66,27 +66,3 @@ | ||
var defer = exports.defer = function () { | ||
var _read, cbs = [], _end | ||
var read = function (end, cb) { | ||
if(!_read) { | ||
_end = end | ||
cbs.push(cb) | ||
} | ||
else _read(end, cb) | ||
} | ||
read.resolve = function (read) { | ||
if(_read) throw new Error('already resolved') | ||
_read = read | ||
if(!_read) throw new Error('no read cannot resolve!' + _read) | ||
while(cbs.length) | ||
_read(_end, cbs.shift()) | ||
} | ||
read.abort = function(err) { | ||
read.resolve(function (_, cb) { | ||
cb(err || true) | ||
}) | ||
} | ||
return read | ||
} | ||
//a stream that ends immediately. | ||
var empty = exports.empty = function () { | ||
@@ -98,2 +74,3 @@ return function (abort, cb) { | ||
//a stream that errors immediately. | ||
var error = exports.error = function (err) { | ||
@@ -105,67 +82,1 @@ return function (abort, cb) { | ||
var depthFirst = exports.depthFirst = | ||
function (start, createStream) { | ||
var reads = [] | ||
reads.unshift(once(start)) | ||
return function next (end, cb) { | ||
if(!reads.length) | ||
return cb(true) | ||
reads[0](end, function (end, data) { | ||
if(end) { | ||
//if this stream has ended, go to the next queue | ||
reads.shift() | ||
return next(null, cb) | ||
} | ||
reads.unshift(createStream(data)) | ||
cb(end, data) | ||
}) | ||
} | ||
} | ||
//width first is just like depth first, | ||
//but push each new stream onto the end of the queue | ||
var widthFirst = exports.widthFirst = | ||
function (start, createStream) { | ||
var reads = [] | ||
reads.push(once(start)) | ||
return function next (end, cb) { | ||
if(!reads.length) | ||
return cb(true) | ||
reads[0](end, function (end, data) { | ||
if(end) { | ||
reads.shift() | ||
return next(null, cb) | ||
} | ||
reads.push(createStream(data)) | ||
cb(end, data) | ||
}) | ||
} | ||
} | ||
//this came out different to the first (strm) | ||
//attempt at leafFirst, but it's still a valid | ||
//topological sort. | ||
var leafFirst = exports.leafFirst = | ||
function (start, createStream) { | ||
var reads = [] | ||
var output = [] | ||
reads.push(once(start)) | ||
return function next (end, cb) { | ||
reads[0](end, function (end, data) { | ||
if(end) { | ||
reads.shift() | ||
if(!output.length) | ||
return cb(true) | ||
return cb(null, output.shift()) | ||
} | ||
reads.unshift(createStream(data)) | ||
output.unshift(data) | ||
next(null, cb) | ||
}) | ||
} | ||
} | ||
@@ -5,13 +5,2 @@ var pull = require('../') | ||
// pull.count() | ||
// .pipe(pull.take(21)) | ||
// .pipe(pull.asyncMap(function (data, cb) { | ||
// return cb(null, data + 1) | ||
// })) | ||
// .pipe(pull.collect(function (err, ary) { | ||
// console.log(ary) | ||
// t.equal(ary.length, 21) | ||
// t.end() | ||
// })) | ||
pull( | ||
@@ -33,29 +22,2 @@ pull.count(), | ||
require('tape')('para-map', function (t) { | ||
var n = 0, m = 0, w = 6, i = 0 | ||
pull( | ||
pull.count(), | ||
pull.take(21), | ||
pull.paraMap(function (data, cb) { | ||
console.log('>', i++) | ||
n ++ | ||
m = Math.max(m, n) | ||
setTimeout(function () { | ||
n -- | ||
console.log('<') | ||
cb(null, data + 1) | ||
}, Math.random() * 20) | ||
}, w), | ||
pull.collect(function (err, ary) { | ||
console.log(ary) | ||
t.equal(ary.length, 21) | ||
t.equal(m, w) | ||
t.end() | ||
}) | ||
) | ||
}) | ||
@@ -5,7 +5,10 @@ var pull = require('../') | ||
test('collect empty', function (t) { | ||
pull.empty().pipe(pull.collect(function (err, ary) { | ||
t.notOk(err) | ||
t.deepEqual(ary, []) | ||
t.end() | ||
})) | ||
pull( | ||
pull.empty(), | ||
pull.collect(function (err, ary) { | ||
t.notOk(err) | ||
t.deepEqual(ary, []) | ||
t.end() | ||
}) | ||
) | ||
}) |
@@ -9,15 +9,17 @@ var pull = require('../') | ||
var pipeline = | ||
map(function (d) { | ||
//make exciting! | ||
return d + '!' | ||
}) | ||
.pipe(map(function (d) { | ||
//make loud | ||
return d.toUpperCase() | ||
})) | ||
.pipe(map(function (d) { | ||
//add sparkles | ||
return '*** ' + d + ' ***' | ||
})) | ||
var pipeline = | ||
pull( | ||
map(function (d) { | ||
//make exciting! | ||
return d + '!' | ||
}), | ||
map(function (d) { | ||
//make loud | ||
return d.toUpperCase() | ||
}), | ||
map(function (d) { | ||
//add sparkles | ||
return '*** ' + d + ' ***' | ||
}) | ||
) | ||
//the pipe line does not have a source stream. | ||
@@ -33,5 +35,7 @@ //so it should be a reader (function that accepts | ||
var read = | ||
pull.readArray(['billy', 'joe', 'zeke']) | ||
.pipe(pipeline) | ||
var read = | ||
pull( | ||
pull.readArray(['billy', 'joe', 'zeke']), | ||
pipeline | ||
) | ||
@@ -43,4 +47,5 @@ t.equal('function', typeof read) | ||
read | ||
.pipe(pull.writeArray(function (err, array) { | ||
pull( | ||
read, | ||
pull.collect(function (err, array) { | ||
console.log(array) | ||
@@ -52,4 +57,5 @@ t.deepEqual( | ||
t.end() | ||
})) | ||
}) | ||
) | ||
}) |
@@ -8,6 +8,4 @@ | ||
pull.values([1,2,3]), | ||
pull.reduce(function (a, b) {return a + b}, 0), | ||
pull.through(console.log), | ||
pull.collect(function (err, ary) { | ||
t.equal(ary[0], 6) | ||
pull.reduce(function (a, b) {return a + b}, 0, function (err, val) { | ||
t.equal(val, 6) | ||
t.end() | ||
@@ -14,0 +12,0 @@ }) |
@@ -13,3 +13,3 @@ | ||
pull.take(100), | ||
pull.writeArray(function (err, array) { | ||
pull.collect(function (err, array) { | ||
t.equal(array.length, 100) | ||
@@ -34,3 +34,3 @@ array.forEach(function (d) { | ||
pull.take(37), | ||
pull.writeArray(function (err, array) { | ||
pull.collect(function (err, array) { | ||
t.equal(array.length, 37) | ||
@@ -47,11 +47,11 @@ console.log(array) | ||
test('inverse filter with regexp', function (t) { | ||
pull.infinite() | ||
.pipe(pull.map(function (d) { | ||
pull( | ||
pull.infinite(), | ||
pull.map(function (d) { | ||
return Math.round(d * 1000).toString(16) | ||
})) | ||
.pipe(pull.filterNot(/^[^e]+$/i)) //no E | ||
.pipe(pull.take(37)) | ||
.pipe(pull.writeArray(function (err, array) { | ||
}), | ||
pull.filterNot(/^[^e]+$/i), //no E | ||
pull.take(37), | ||
pull.collect(function (err, array) { | ||
t.equal(array.length, 37) | ||
console.log(array) | ||
array.forEach(function (d) { | ||
@@ -61,4 +61,5 @@ t.notEqual(d.indexOf('e'), -1) | ||
t.end() | ||
})) | ||
}) | ||
) | ||
}) | ||
@@ -60,5 +60,3 @@ var pull = require('../') | ||
pull.values([ | ||
pull.Source(function read(abort, cb) { | ||
cb(_err) | ||
}) | ||
pull.error(_err) | ||
], function(err) { | ||
@@ -65,0 +63,0 @@ sosEnded = err; |
@@ -67,3 +67,3 @@ var pull = require('../') | ||
pull.values([1,2,3,4,5,6,7,8,9,10]), | ||
pull.Through(function (read) { | ||
function (read) { | ||
return function (end, cb) { | ||
@@ -74,3 +74,3 @@ if (end !== true) reads++ | ||
} | ||
})(), | ||
}, | ||
pull.take(5), | ||
@@ -77,0 +77,0 @@ pull.collect(function (err, five) { |
421
throughs.js
@@ -1,100 +0,77 @@ | ||
var u = require('pull-core') | ||
var sources = require('./sources') | ||
var sinks = require('./sinks') | ||
'use strict'; | ||
var prop = u.prop | ||
var id = u.id | ||
var tester = u.tester | ||
function id (item) { return item } | ||
var map = exports.map = | ||
function (read, map) { | ||
map = prop(map) || id | ||
return function (abort, cb) { | ||
read(abort, function (end, data) { | ||
try { | ||
data = !end ? map(data) : null | ||
} catch (err) { | ||
return read(err, function () { | ||
return cb(err) | ||
}) | ||
} | ||
cb(end, data) | ||
}) | ||
} | ||
function prop (key) { | ||
return ( | ||
'string' == typeof key | ||
? function (data) { return data[key] } | ||
: 'object' === typeof key && 'function' === typeof key.exec //regexp | ||
? function (data) { var v = map.exec(data); return v && v[0] } | ||
: key | ||
) | ||
} | ||
var asyncMap = exports.asyncMap = | ||
function (read, map) { | ||
if(!map) return read | ||
return function (end, cb) { | ||
if(end) return read(end, cb) //abort | ||
read(null, function (end, data) { | ||
if(end) return cb(end, data) | ||
map(data, cb) | ||
}) | ||
} | ||
function tester (test) { | ||
return ( | ||
'object' === typeof test && 'function' === typeof test.test //regexp | ||
? function (data) { return test.test(data) } | ||
: prop (test) || id | ||
) | ||
} | ||
var paraMap = exports.paraMap = | ||
function (read, map, width) { | ||
if(!map) return read | ||
var ended = false, queue = [], _cb | ||
var sources = require('./sources') | ||
var sinks = require('./sinks') | ||
function drain () { | ||
if(!_cb) return | ||
var cb = _cb | ||
_cb = null | ||
if(queue.length) | ||
return cb(null, queue.shift()) | ||
else if(ended && !n) | ||
return cb(ended) | ||
_cb = cb | ||
var map = exports.map = | ||
function (map) { | ||
if(!map) return id | ||
map = prop(map) | ||
return function (read) { | ||
return function (abort, cb) { | ||
read(abort, function (end, data) { | ||
try { | ||
data = !end ? map(data) : null | ||
} catch (err) { | ||
return read(err, function () { | ||
return cb(err) | ||
}) | ||
} | ||
cb(end, data) | ||
}) | ||
} | ||
} | ||
} | ||
function pull () { | ||
read(null, function (end, data) { | ||
if(end) { | ||
ended = end | ||
return drain() | ||
} | ||
n++ | ||
map(data, function (err, data) { | ||
n-- | ||
queue.push(data) | ||
drain() | ||
var asyncMap = exports.asyncMap = | ||
function (map) { | ||
if(!map) return id //when read is passed, pass it on. | ||
return function (read) { | ||
return function (end, cb) { | ||
if(end) return read(end, cb) //abort | ||
read(null, function (end, data) { | ||
if(end) return cb(end, data) | ||
map(data, cb) | ||
}) | ||
if(n < width && !ended) | ||
pull() | ||
}) | ||
} | ||
} | ||
var n = 0 | ||
return function (end, cb) { | ||
if(end) return read(end, cb) //abort | ||
//continue to read while there are less than 3 maps in flight | ||
_cb = cb | ||
if(queue.length || ended) | ||
pull(), drain() | ||
else pull() | ||
} | ||
return highWaterMark(asyncMap(read, map), width) | ||
} | ||
var filter = exports.filter = | ||
function (read, test) { | ||
function (test) { | ||
//regexp | ||
test = tester(test) | ||
return function next (end, cb) { | ||
var sync, loop = true | ||
while(loop) { | ||
loop = false | ||
sync = true | ||
read(end, function (end, data) { | ||
if(!end && !test(data)) | ||
return sync ? loop = true : next(end, cb) | ||
cb(end, data) | ||
}) | ||
sync = false | ||
return function (read) { | ||
return function next (end, cb) { | ||
var sync, loop = true | ||
while(loop) { | ||
loop = false | ||
sync = true | ||
read(end, function (end, data) { | ||
if(!end && !test(data)) | ||
return sync ? loop = true : next(end, cb) | ||
cb(end, data) | ||
}) | ||
sync = false | ||
} | ||
} | ||
@@ -105,12 +82,12 @@ } | ||
var filterNot = exports.filterNot = | ||
function (read, test) { | ||
function (test) { | ||
test = tester(test) | ||
return filter(read, function (e) { | ||
return !test(e) | ||
}) | ||
return filter(function (data) { return !test(data) }) | ||
} | ||
//a pass through stream that doesn't change the value. | ||
var through = exports.through = | ||
function (read, op, onEnd) { | ||
function (op, onEnd) { | ||
var a = false | ||
function once (abort) { | ||
@@ -122,14 +99,17 @@ if(a || !onEnd) return | ||
return function (end, cb) { | ||
if(end) once(end) | ||
return read(end, function (end, data) { | ||
if(!end) op && op(data) | ||
else once(end) | ||
cb(end, data) | ||
}) | ||
return function (read) { | ||
return function (end, cb) { | ||
if(end) once(end) | ||
return read(end, function (end, data) { | ||
if(!end) op && op(data) | ||
else once(end) | ||
cb(end, data) | ||
}) | ||
} | ||
} | ||
} | ||
//read a number of items and then stop. | ||
var take = exports.take = | ||
function (read, test, opts) { | ||
function (test, opts) { | ||
opts = opts || {} | ||
@@ -145,31 +125,35 @@ var last = opts.last || false // whether the first item for which !test(item) should still pass | ||
function terminate (cb) { | ||
read(true, function (err) { | ||
last = false; cb(err || true) | ||
}) | ||
} | ||
return function (read) { | ||
return function (end, cb) { | ||
if(ended) last ? terminate(cb) : cb(ended) | ||
else if(ended = end) read(ended, cb) | ||
else | ||
read(null, function (end, data) { | ||
if(ended = ended || end) { | ||
//last ? terminate(cb) : | ||
cb(ended) | ||
} | ||
else if(!test(data)) { | ||
ended = true | ||
last ? cb(null, data) : terminate(cb) | ||
} | ||
else | ||
cb(null, data) | ||
function terminate (cb) { | ||
read(true, function (err) { | ||
last = false; cb(err || true) | ||
}) | ||
} | ||
return function (end, cb) { | ||
if(ended) last ? terminate(cb) : cb(ended) | ||
else if(ended = end) read(ended, cb) | ||
else | ||
read(null, function (end, data) { | ||
if(ended = ended || end) { | ||
//last ? terminate(cb) : | ||
cb(ended) | ||
} | ||
else if(!test(data)) { | ||
ended = true | ||
last ? cb(null, data) : terminate(cb) | ||
} | ||
else | ||
cb(null, data) | ||
}) | ||
} | ||
} | ||
} | ||
var unique = exports.unique = function (read, field, invert) { | ||
//drop items you have already seen. | ||
var unique = exports.unique = function (field, invert) { | ||
field = prop(field) || id | ||
var seen = {} | ||
return filter(read, function (data) { | ||
return filter(function (data) { | ||
var key = field(data) | ||
@@ -182,171 +166,48 @@ if(seen[key]) return !!invert //false, by default | ||
var nonUnique = exports.nonUnique = function (read, field) { | ||
return unique(read, field, true) | ||
//passes an item through when you see it for the second time. | ||
var nonUnique = exports.nonUnique = function (field) { | ||
return unique(field, true) | ||
} | ||
var group = exports.group = | ||
function (read, size) { | ||
var ended; size = size || 5 | ||
var queue = [] | ||
//convert a stream of arrays or streams into just a stream. | ||
var flatten = exports.flatten = function () { | ||
return function (read) { | ||
var _read | ||
return function (abort, cb) { | ||
if (abort) { //abort the current stream, and then stream of streams. | ||
_read ? _read(abort, function(err) { | ||
read(err || abort, cb) | ||
}) : read(abort, cb) | ||
} | ||
else if(_read) nextChunk() | ||
else nextStream() | ||
return function (end, cb) { | ||
//this means that the upstream is sending an error. | ||
if(end) return read(ended = end, cb) | ||
//this means that we read an end before. | ||
if(ended) return cb(ended) | ||
read(null, function next(end, data) { | ||
if(ended = ended || end) { | ||
if(!queue.length) | ||
return cb(ended) | ||
var _queue = queue; queue = [] | ||
return cb(null, _queue) | ||
function nextChunk () { | ||
_read(null, function (err, data) { | ||
if (err === true) nextStream() | ||
else if (err) { | ||
read(true, function(abortErr) { | ||
// TODO: what do we do with the abortErr? | ||
cb(err) | ||
}) | ||
} | ||
else cb(null, data) | ||
}) | ||
} | ||
queue.push(data) | ||
if(queue.length < size) | ||
return read(null, next) | ||
var _queue = queue; queue = [] | ||
cb(null, _queue) | ||
}) | ||
} | ||
} | ||
var flatten = exports.flatten = function (read) { | ||
var _read | ||
return function (abort, cb) { | ||
if (abort) { | ||
_read ? _read(abort, function(err) { | ||
read(err || abort, cb) | ||
}) : read(abort, cb) | ||
function nextStream () { | ||
_read = null | ||
read(null, function (end, stream) { | ||
if(end) | ||
return cb(end) | ||
if(Array.isArray(stream) || stream && 'object' === typeof stream) | ||
stream = sources.values(stream) | ||
else if('function' != typeof stream) | ||
throw new Error('expected stream of streams') | ||
_read = stream | ||
nextChunk() | ||
}) | ||
} | ||
} | ||
else if(_read) nextChunk() | ||
else nextStream() | ||
function nextChunk () { | ||
_read(null, function (err, data) { | ||
if (err === true) nextStream() | ||
else if (err) { | ||
read(true, function(abortErr) { | ||
// TODO: what do we do with the abortErr? | ||
cb(err) | ||
}) | ||
} | ||
else cb(null, data) | ||
}) | ||
} | ||
function nextStream () { | ||
read(null, function (end, stream) { | ||
if(end) | ||
return cb(end) | ||
if(Array.isArray(stream) || stream && 'object' === typeof stream) | ||
stream = sources.values(stream) | ||
else if('function' != typeof stream) | ||
throw new Error('expected stream of streams') | ||
_read = stream | ||
nextChunk() | ||
}) | ||
} | ||
} | ||
} | ||
var prepend = | ||
exports.prepend = | ||
function (read, head) { | ||
return function (abort, cb) { | ||
if(head !== null) { | ||
if(abort) | ||
return read(abort, cb) | ||
var _head = head | ||
head = null | ||
cb(null, _head) | ||
} else { | ||
read(abort, cb) | ||
} | ||
} | ||
} | ||
//var drainIf = exports.drainIf = function (op, done) { | ||
// sinks.drain( | ||
//} | ||
var _reduce = exports._reduce = function (read, reduce, initial) { | ||
return function (close, cb) { | ||
if(close) return read(close, cb) | ||
if(ended) return cb(ended) | ||
sinks.drain(function (item) { | ||
initial = reduce(initial, item) | ||
}, function (err, data) { | ||
ended = err || true | ||
if(!err) cb(null, initial) | ||
else cb(ended) | ||
}) | ||
(read) | ||
} | ||
} | ||
var nextTick = process.nextTick | ||
var highWaterMark = exports.highWaterMark = | ||
function (read, highWaterMark) { | ||
var buffer = [], waiting = [], ended, ending, reading = false | ||
highWaterMark = highWaterMark || 10 | ||
function readAhead () { | ||
while(waiting.length && (buffer.length || ended)) | ||
waiting.shift()(ended, ended ? null : buffer.shift()) | ||
if (!buffer.length && ending) ended = ending; | ||
} | ||
function next () { | ||
if(ended || ending || reading || buffer.length >= highWaterMark) | ||
return | ||
reading = true | ||
return read(ended || ending, function (end, data) { | ||
reading = false | ||
ending = ending || end | ||
if(data != null) buffer.push(data) | ||
next(); readAhead() | ||
}) | ||
} | ||
process.nextTick(next) | ||
return function (end, cb) { | ||
ended = ended || end | ||
waiting.push(cb) | ||
next(); readAhead() | ||
} | ||
} | ||
var flatMap = exports.flatMap = | ||
function (read, mapper) { | ||
mapper = mapper || id | ||
var queue = [], ended | ||
return function (abort, cb) { | ||
if(queue.length) return cb(null, queue.shift()) | ||
else if(ended) return cb(ended) | ||
read(abort, function next (end, data) { | ||
if(end) ended = end | ||
else { | ||
var add = mapper(data) | ||
while(add && add.length) | ||
queue.push(add.shift()) | ||
} | ||
if(queue.length) cb(null, queue.shift()) | ||
else if(ended) cb(ended) | ||
else read(null, next) | ||
}) | ||
} | ||
} | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
0
0
49941
31
1078
291
- Removedpull-core@~1.1.0
- Removedpull-core@1.1.0(transitive)