Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pull-stream

Package Overview
Dependencies
Maintainers
1
Versions
87
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pull-stream - npm Package Compare versions

Comparing version 2.28.4 to 3.0.0

examples.md

6

docs/sinks.md

@@ -5,3 +5,3 @@ # Sinks

You *must* have a sink at the end of a pipeline
for data to move through.
for data to move towards.

@@ -11,5 +11,3 @@ You can only use _one_ sink per pipeline.

``` js
source()
.pipe(through()) //optional
.pipe(sink())
pull(source, through, sink)
```

@@ -16,0 +14,0 @@

@@ -10,5 +10,3 @@ # Sources

``` js
source()
.pipe(through()) //optional
.pipe(sink())
pull(source, through, sink)
```

@@ -41,23 +39,2 @@

## defer
create a false source-stream that will be attached to a
real source-stream later. Use when you must do an async
operation before you can create the stream.
``` js
function ls (dir) {
var ds = pull.defer()
fs.readdir(dir, function (err, ls) {
if(err) return ds.abort(err)
return ds.resolve(readArray(ls)
.pipe(pull.map(function (file) {
return path.resolve(dir, file)
})
})
return ds
}
```
## empty

@@ -74,34 +51,1 @@

## pushable
Create a false source stream with a `.push(data, cb?)`
property. Use when you really need a push api,
or need to adapt pull-stream to some other push api.
``` js
function ls (dir) {
var ps = pull.pushable()
fs.readdir(dir, function (err, ls) {
if(err) return ps.end(err)
ls.forEach(function (file) {
ps.push(path.resolve(dir, file))
})
ps.end()
})
return ps
}
```
## depthFirst, widthFirst, leafFirst (start, createStream)
Traverse a tree structure. `start` is a value that represents
a node. `createStream` is a function that returns
a pull-stream of the children of a node.
`start` must be the same type output by `createStream`.
``` js
//passing in the `ls` function from the `defer` example.
pull.widthFirst(process.cwd(), ls)
.pipe(pull.log())
```

@@ -12,5 +12,3 @@ # Throughs

```js
source()
.pipe(through()) //optional
.pipe(sink())
pull(source, through, sink)
```

@@ -79,9 +77,2 @@

## group (length)
Group incoming data into arrays of max length `length`,
(the last item may be shorter than `length`)
Useful for data you can handle in batches.
## flatten ()

@@ -91,9 +82,1 @@

## highWaterMark (n)
An async buffering stream.
`highWaterMark` will eagerly read from the source stream,
while there are less than `n` chunks in the buffer.
var sources = require('./sources')
var sinks = require('./sinks')
var throughs = require('./throughs')
var u = require('pull-core')
//True when `fun` is callable.
function isFunction (fun) {
  return typeof fun === 'function'
}
exports = module.exports = require('./pull')
//A "reader" is a through or a sink: either explicitly tagged with
//type === "Through", or a unary function (takes only `read`).
//Falsy input is passed straight back (preserves null/undefined).
function isReader (fun) {
  if (!fun) return fun
  return fun.type === "Through" || fun.length === 1
}
//pull(...streams): compose pull-streams left to right.
//If called without a source first (the leading argument is a reader,
//i.e. a sink or through), returns a partial pipeline: a function that
//accepts the eventual `read` (source) and re-runs pull with it prepended.
var exports = module.exports = function pull () {
var args = [].slice.call(arguments)
if(isReader(args[0]))
return function (read) {
args.unshift(read)
return pull.apply(null, args)
}
var read = args.shift()
//if the first function is a duplex stream,
//pipe from the source.
if(isFunction(read.source))
read = read.source
//next() pops the next stage and normalizes it to a plain
//through function (read -> read).
function next () {
var s = args.shift()
//skip null/undefined stages (null == s matches both)
if(null == s)
return next()
if(isFunction(s)) return s
//duplex object: feed `read` into its sink, continue from its source
return function (read) {
s.sink(read)
//this supports piping through a duplex stream
//pull(a, b, a) "telephone style".
//if this stream is in the a (first & last position)
//s.source will have already been used, but this should never be called
//so that is okay.
return s.source
}
}
//thread `read` through each remaining stage in order
while(args.length)
read = next() (read)
return read
}
for(var k in sources)
exports[k] = u.Source(sources[k])
exports[k] = sources[k]
for(var k in throughs)
exports[k] = u.Through(throughs[k])
exports[k] = throughs[k]
for(var k in sinks)
exports[k] = u.Sink(sinks[k])
exports[k] = sinks[k]
var maybe = require('./maybe')(exports)
for(var k in maybe)
exports[k] = maybe[k]
exports.Duplex =
exports.Through = exports.pipeable = u.Through
exports.Source = exports.pipeableSource = u.Source
exports.Sink = exports.pipeableSink = u.Sink
{
"name": "pull-stream",
"description": "minimal pull stream",
"version": "2.28.4",
"version": "3.0.0",
"homepage": "https://github.com/dominictarr/pull-stream",

@@ -10,5 +10,2 @@ "repository": {

},
"dependencies": {
"pull-core": "~1.1.0"
},
"devDependencies": {

@@ -15,0 +12,0 @@ "tape": "~2.12.3",

@@ -5,9 +5,10 @@ # pull-stream

In [classic-streams](https://github.com/joyent/node/blob/v0.8/doc/api/stream.markdown),
In [classic-streams](1),
streams _push_ data to the next stream in the pipeline.
In [new-streams](https://github.com/joyent/node/blob/v0.10/doc/api/stream.markdown),
data is pulled out of the source stream, into the destination.
In [new-classic-streams](
`pull-stream` is a minimal take on streams,
pull streams work great for "object" streams as well as streams of raw text or binary data.
`pull-stream` is a minimal take on pull streams,
optimized for "object" streams, but still supporting text streams.

@@ -40,40 +41,7 @@ ## Quick Example

## Examples
What if implementing a stream was this simple:
### Pipeable Streams
`pull.{Source,Through,Sink}` wraps a function and adds a `type` property to signify what type of pull-stream it is.
```js
var pull = require('pull-stream')
var createSourceStream = pull.Source(function () {
return function (end, cb) {
return cb(end, Math.random())
}
})
var createThroughStream = pull.Through(function (read) {
return function (end, cb) {
read(end, cb)
}
})
var createSinkStream = pull.Sink(function (read) {
read(null, function next (end, data) {
if(end) return
console.log(data)
read(null, next)
})
})
pull(createSourceStream(), createThroughStream(), createSinkStream())
```
### Readable & Reader vs. Readable & Writable
Instead of a readable stream, and a writable stream, there is a `readable` stream,
and a `reader` stream.
(aka "Source") and a `reader` stream (aka "Sink"). A Through stream
is a Sink that returns a Source.

@@ -85,20 +53,18 @@ See also:

### Readable
### Source (aka, Readable)
The readable stream is just a `function(end, cb)`,
The readable stream is just a `function read(end, cb)`,
that may be called many times,
and will (asynchronously) `callback(null, data)` once for each call.
and will (asynchronously) `cb(null, data)` once for each call.
The readable stream eventually `callback(err)` if there was an error, or `callback(true)`
if the stream has no more data. In both cases a second argument will be ignored. That is, the readable stream either provides data _or_ indicates an error or end-of-data condition, never both.
To signify an end state, the stream eventually returns `cb(err)` or `cb(true)`.
When indicating a terminal state, `data` *must* be ignored.
if the user passes in `end = true`, then stop getting data from wherever.
The `read` function *must not* be called until the previous call has called back.
Unless, it is a call to abort the stream (`read(truthy, cb)`).
All [Sources](https://github.com/dominictarr/pull-stream/blob/master/docs/sources.md)
and [Throughs](https://github.com/dominictarr/pull-stream/blob/master/docs/throughs.md)
are readable streams.
```js
//a stream of 100 random numbers.
var i = 100
var randomReadable = pull.Source(function () {
var random = function () {
return function (end, cb) {

@@ -110,9 +76,10 @@ if(end) return cb(end)

}
})
}
```
### Reader (aka, "writable")
### Sink; (aka, Reader, "writable")
A `reader`, is just a function that calls a readable,
until it decides to stop, or the readable `cb(err || true)`
A sink is just a `reader` function that calls a Source (read function),
until it decides to stop, or the readable ends. `cb(err || true)`

@@ -124,3 +91,4 @@ All [Throughs](https://github.com/dominictarr/pull-stream/blob/master/docs/throughs.md)

```js
var logger = pull.Sink(function (read) {
//read source and log it.
var logger = function (read) {
read(null, function next(end, data) {

@@ -133,26 +101,32 @@ if(end === true) return

})
})
}
```
These can be connected together by passing the `readable` to the `reader`
Since these are just functions, you can pass them to each other!
```js
logger()(randomReadable())
var rand = random())
var log = logger()
log(rand) //"pipe" the streams.
```
Or, if you prefer to read things left-to-right
but it's easier to read if you use pull-stream's `pull` method
```js
pull(randomReadable(), logger())
var pull = require('pull-stream')
pull(random(), logger())
```
### Through / Duplex
### Through
A duplex/through stream is both a `reader` that is also `readable`
A duplex/through stream is just a function that takes a `read` function,
A through stream is a reader on one end and a readable on the other.
It's a Sink that returns a Source.
That is, it's just a function that takes a `read` function,
and returns another `read` function.
```js
var map = pull.Through(function (read, map) {
var map = function (read, map) {
//return a readable function!

@@ -164,3 +138,3 @@ return function (end, cb) {

}
})
}
```

@@ -189,2 +163,6 @@

pull detects if it's missing a Source by checking function arity,
if the function takes only one argument it's either a sink or a through.
Otherwise it's a Source.
## Duplex Streams

@@ -191,0 +169,0 @@

@@ -1,37 +0,59 @@

var drain = exports.drain = function (read, op, done) {
'use strict'
;(function next() {
var loop = true, cbed = false
while(loop) {
cbed = false
read(null, function (end, data) {
cbed = true
if(end) {
function id (item) { return item }
//Normalize `key` into an accessor function:
//  string   -> plucks that property from data
//  regexp   -> returns the first match in data (or null when no match)
//  function -> used as-is
//  falsy    -> identity (id)
function prop (key) {
  return (
    'string' == typeof key
    ? function (data) { return data[key] }
    : 'object' === typeof key && 'function' === typeof key.exec //regexp
    //fix: was `map.exec(data)` — `map` is not in scope here and would
    //throw a ReferenceError; the regexp being applied is `key`.
    ? function (data) { var v = key.exec(data); return v && v[0] }
    : key || id
  )
}
var drain = exports.drain = function (op, done) {
return function (read) {
//this function is much simpler to write if you
//just use recursion, but by using a while loop
//we do not blow the stack if the stream happens to be sync.
;(function next() {
var loop = true, cbed = false
while(loop) {
cbed = false
read(null, function (end, data) {
cbed = true
if(end) {
loop = false
if(done) done(end === true ? null : end)
else if(end && end !== true)
throw end
}
else if(op && false === op(data)) {
loop = false
read(true, done || function () {})
}
else if(!loop){
next()
}
})
if(!cbed) {
loop = false
if(done) done(end === true ? null : end)
else if(end && end !== true)
throw end
return
}
else if(op && false === op(data)) {
loop = false
read(true, done || function () {})
}
else if(!loop){
next()
}
})
if(!cbed) {
loop = false
return
}
}
})()
})()
}
}
var onEnd = exports.onEnd = function (read, done) {
return drain(read, null, done)
var onEnd = exports.onEnd = function (done) {
return drain(null, done)
}
var log = exports.log = function (read, done) {
return drain(read, function (data) {
var log = exports.log = function (done) {
return drain(function (data) {
console.log(data)

@@ -41,1 +63,46 @@ }, done)

var find =
exports.find = function (test, cb) {
var ended = false
if(!cb)
cb = test, test = id
else
test = prop(test) || id
return drain(function (data) {
if(test(data)) {
ended = true
cb(null, data)
return false
}
}, function (err) {
if(ended) return //already called back
cb(err === true ? null : err, null)
})
}
var reduce = exports.reduce = function (reduce, acc, cb) {
return drain(function (data) {
acc = reduce(acc, data)
}, function (err) {
cb(err, acc)
})
}
var collect = exports.collect =
function (cb) {
return reduce(function (arr, item) {
arr.push(item)
return arr
}, [], cb)
}
var concat = exports.concat =
function (cb) {
return reduce(function (a, b) {
return a + b
}, '', cb)
}

@@ -66,27 +66,3 @@

var defer = exports.defer = function () {
var _read, cbs = [], _end
var read = function (end, cb) {
if(!_read) {
_end = end
cbs.push(cb)
}
else _read(end, cb)
}
read.resolve = function (read) {
if(_read) throw new Error('already resolved')
_read = read
if(!_read) throw new Error('no read cannot resolve!' + _read)
while(cbs.length)
_read(_end, cbs.shift())
}
read.abort = function(err) {
read.resolve(function (_, cb) {
cb(err || true)
})
}
return read
}
//a stream that ends immediately.
var empty = exports.empty = function () {

@@ -98,2 +74,3 @@ return function (abort, cb) {

//a stream that errors immediately.
var error = exports.error = function (err) {

@@ -105,67 +82,1 @@ return function (abort, cb) {

var depthFirst = exports.depthFirst =
function (start, createStream) {
var reads = []
reads.unshift(once(start))
return function next (end, cb) {
if(!reads.length)
return cb(true)
reads[0](end, function (end, data) {
if(end) {
//if this stream has ended, go to the next queue
reads.shift()
return next(null, cb)
}
reads.unshift(createStream(data))
cb(end, data)
})
}
}
//width first is just like depth first,
//but push each new stream onto the end of the queue
var widthFirst = exports.widthFirst =
function (start, createStream) {
var reads = []
reads.push(once(start))
return function next (end, cb) {
if(!reads.length)
return cb(true)
reads[0](end, function (end, data) {
if(end) {
reads.shift()
return next(null, cb)
}
reads.push(createStream(data))
cb(end, data)
})
}
}
//this came out different to the first (strm)
//attempt at leafFirst, but it's still a valid
//topological sort.
var leafFirst = exports.leafFirst =
function (start, createStream) {
var reads = []
var output = []
reads.push(once(start))
return function next (end, cb) {
reads[0](end, function (end, data) {
if(end) {
reads.shift()
if(!output.length)
return cb(true)
return cb(null, output.shift())
}
reads.unshift(createStream(data))
output.unshift(data)
next(null, cb)
})
}
}

@@ -5,13 +5,2 @@ var pull = require('../')

// pull.count()
// .pipe(pull.take(21))
// .pipe(pull.asyncMap(function (data, cb) {
// return cb(null, data + 1)
// }))
// .pipe(pull.collect(function (err, ary) {
// console.log(ary)
// t.equal(ary.length, 21)
// t.end()
// }))
pull(

@@ -33,29 +22,2 @@ pull.count(),

require('tape')('para-map', function (t) {
var n = 0, m = 0, w = 6, i = 0
pull(
pull.count(),
pull.take(21),
pull.paraMap(function (data, cb) {
console.log('>', i++)
n ++
m = Math.max(m, n)
setTimeout(function () {
n --
console.log('<')
cb(null, data + 1)
}, Math.random() * 20)
}, w),
pull.collect(function (err, ary) {
console.log(ary)
t.equal(ary.length, 21)
t.equal(m, w)
t.end()
})
)
})

@@ -5,7 +5,10 @@ var pull = require('../')

test('collect empty', function (t) {
pull.empty().pipe(pull.collect(function (err, ary) {
t.notOk(err)
t.deepEqual(ary, [])
t.end()
}))
pull(
pull.empty(),
pull.collect(function (err, ary) {
t.notOk(err)
t.deepEqual(ary, [])
t.end()
})
)
})

@@ -9,15 +9,17 @@ var pull = require('../')

var pipeline =
map(function (d) {
//make exciting!
return d + '!'
})
.pipe(map(function (d) {
//make loud
return d.toUpperCase()
}))
.pipe(map(function (d) {
//add sparkles
return '*** ' + d + ' ***'
}))
var pipeline =
pull(
map(function (d) {
//make exciting!
return d + '!'
}),
map(function (d) {
//make loud
return d.toUpperCase()
}),
map(function (d) {
//add sparkles
return '*** ' + d + ' ***'
})
)
//the pipe line does not have a source stream.

@@ -33,5 +35,7 @@ //so it should be a reader (function that accepts

var read =
pull.readArray(['billy', 'joe', 'zeke'])
.pipe(pipeline)
var read =
pull(
pull.readArray(['billy', 'joe', 'zeke']),
pipeline
)

@@ -43,4 +47,5 @@ t.equal('function', typeof read)

read
.pipe(pull.writeArray(function (err, array) {
pull(
read,
pull.collect(function (err, array) {
console.log(array)

@@ -52,4 +57,5 @@ t.deepEqual(

t.end()
}))
})
)
})

@@ -8,6 +8,4 @@

pull.values([1,2,3]),
pull.reduce(function (a, b) {return a + b}, 0),
pull.through(console.log),
pull.collect(function (err, ary) {
t.equal(ary[0], 6)
pull.reduce(function (a, b) {return a + b}, 0, function (err, val) {
t.equal(val, 6)
t.end()

@@ -14,0 +12,0 @@ })

@@ -13,3 +13,3 @@

pull.take(100),
pull.writeArray(function (err, array) {
pull.collect(function (err, array) {
t.equal(array.length, 100)

@@ -34,3 +34,3 @@ array.forEach(function (d) {

pull.take(37),
pull.writeArray(function (err, array) {
pull.collect(function (err, array) {
t.equal(array.length, 37)

@@ -47,11 +47,11 @@ console.log(array)

test('inverse filter with regexp', function (t) {
pull.infinite()
.pipe(pull.map(function (d) {
pull(
pull.infinite(),
pull.map(function (d) {
return Math.round(d * 1000).toString(16)
}))
.pipe(pull.filterNot(/^[^e]+$/i)) //no E
.pipe(pull.take(37))
.pipe(pull.writeArray(function (err, array) {
}),
pull.filterNot(/^[^e]+$/i), //no E
pull.take(37),
pull.collect(function (err, array) {
t.equal(array.length, 37)
console.log(array)
array.forEach(function (d) {

@@ -61,4 +61,5 @@ t.notEqual(d.indexOf('e'), -1)

t.end()
}))
})
)
})

@@ -60,5 +60,3 @@ var pull = require('../')

pull.values([
pull.Source(function read(abort, cb) {
cb(_err)
})
pull.error(_err)
], function(err) {

@@ -65,0 +63,0 @@ sosEnded = err;

@@ -67,3 +67,3 @@ var pull = require('../')

pull.values([1,2,3,4,5,6,7,8,9,10]),
pull.Through(function (read) {
function (read) {
return function (end, cb) {

@@ -74,3 +74,3 @@ if (end !== true) reads++

}
})(),
},
pull.take(5),

@@ -77,0 +77,0 @@ pull.collect(function (err, five) {

@@ -1,100 +0,77 @@

var u = require('pull-core')
var sources = require('./sources')
var sinks = require('./sinks')
'use strict';
var prop = u.prop
var id = u.id
var tester = u.tester
function id (item) { return item }
var map = exports.map =
function (read, map) {
map = prop(map) || id
return function (abort, cb) {
read(abort, function (end, data) {
try {
data = !end ? map(data) : null
} catch (err) {
return read(err, function () {
return cb(err)
})
}
cb(end, data)
})
}
function prop (key) {
return (
'string' == typeof key
? function (data) { return data[key] }
: 'object' === typeof key && 'function' === typeof key.exec //regexp
? function (data) { var v = map.exec(data); return v && v[0] }
: key
)
}
var asyncMap = exports.asyncMap =
function (read, map) {
if(!map) return read
return function (end, cb) {
if(end) return read(end, cb) //abort
read(null, function (end, data) {
if(end) return cb(end, data)
map(data, cb)
})
}
function tester (test) {
return (
'object' === typeof test && 'function' === typeof test.test //regexp
? function (data) { return test.test(data) }
: prop (test) || id
)
}
var paraMap = exports.paraMap =
function (read, map, width) {
if(!map) return read
var ended = false, queue = [], _cb
var sources = require('./sources')
var sinks = require('./sinks')
function drain () {
if(!_cb) return
var cb = _cb
_cb = null
if(queue.length)
return cb(null, queue.shift())
else if(ended && !n)
return cb(ended)
_cb = cb
var map = exports.map =
function (map) {
if(!map) return id
map = prop(map)
return function (read) {
return function (abort, cb) {
read(abort, function (end, data) {
try {
data = !end ? map(data) : null
} catch (err) {
return read(err, function () {
return cb(err)
})
}
cb(end, data)
})
}
}
}
function pull () {
read(null, function (end, data) {
if(end) {
ended = end
return drain()
}
n++
map(data, function (err, data) {
n--
queue.push(data)
drain()
var asyncMap = exports.asyncMap =
function (map) {
if(!map) return id //when read is passed, pass it on.
return function (read) {
return function (end, cb) {
if(end) return read(end, cb) //abort
read(null, function (end, data) {
if(end) return cb(end, data)
map(data, cb)
})
if(n < width && !ended)
pull()
})
}
}
var n = 0
return function (end, cb) {
if(end) return read(end, cb) //abort
//continue to read while there are less than 3 maps in flight
_cb = cb
if(queue.length || ended)
pull(), drain()
else pull()
}
return highWaterMark(asyncMap(read, map), width)
}
var filter = exports.filter =
function (read, test) {
function (test) {
//regexp
test = tester(test)
return function next (end, cb) {
var sync, loop = true
while(loop) {
loop = false
sync = true
read(end, function (end, data) {
if(!end && !test(data))
return sync ? loop = true : next(end, cb)
cb(end, data)
})
sync = false
return function (read) {
return function next (end, cb) {
var sync, loop = true
while(loop) {
loop = false
sync = true
read(end, function (end, data) {
if(!end && !test(data))
return sync ? loop = true : next(end, cb)
cb(end, data)
})
sync = false
}
}

@@ -105,12 +82,12 @@ }

var filterNot = exports.filterNot =
function (read, test) {
function (test) {
test = tester(test)
return filter(read, function (e) {
return !test(e)
})
return filter(function (data) { return !test(data) })
}
//a pass through stream that doesn't change the value.
var through = exports.through =
function (read, op, onEnd) {
function (op, onEnd) {
var a = false
function once (abort) {

@@ -122,14 +99,17 @@ if(a || !onEnd) return

return function (end, cb) {
if(end) once(end)
return read(end, function (end, data) {
if(!end) op && op(data)
else once(end)
cb(end, data)
})
return function (read) {
return function (end, cb) {
if(end) once(end)
return read(end, function (end, data) {
if(!end) op && op(data)
else once(end)
cb(end, data)
})
}
}
}
//read a number of items and then stop.
var take = exports.take =
function (read, test, opts) {
function (test, opts) {
opts = opts || {}

@@ -145,31 +125,35 @@ var last = opts.last || false // whether the first item for which !test(item) should still pass

function terminate (cb) {
read(true, function (err) {
last = false; cb(err || true)
})
}
return function (read) {
return function (end, cb) {
if(ended) last ? terminate(cb) : cb(ended)
else if(ended = end) read(ended, cb)
else
read(null, function (end, data) {
if(ended = ended || end) {
//last ? terminate(cb) :
cb(ended)
}
else if(!test(data)) {
ended = true
last ? cb(null, data) : terminate(cb)
}
else
cb(null, data)
function terminate (cb) {
read(true, function (err) {
last = false; cb(err || true)
})
}
return function (end, cb) {
if(ended) last ? terminate(cb) : cb(ended)
else if(ended = end) read(ended, cb)
else
read(null, function (end, data) {
if(ended = ended || end) {
//last ? terminate(cb) :
cb(ended)
}
else if(!test(data)) {
ended = true
last ? cb(null, data) : terminate(cb)
}
else
cb(null, data)
})
}
}
}
var unique = exports.unique = function (read, field, invert) {
//drop items you have already seen.
var unique = exports.unique = function (field, invert) {
field = prop(field) || id
var seen = {}
return filter(read, function (data) {
return filter(function (data) {
var key = field(data)

@@ -182,171 +166,48 @@ if(seen[key]) return !!invert //false, by default

var nonUnique = exports.nonUnique = function (read, field) {
return unique(read, field, true)
//passes an item through when you see it for the second time.
var nonUnique = exports.nonUnique = function (field) {
return unique(field, true)
}
var group = exports.group =
function (read, size) {
var ended; size = size || 5
var queue = []
//convert a stream of arrays or streams into just a stream.
var flatten = exports.flatten = function () {
return function (read) {
var _read
return function (abort, cb) {
if (abort) { //abort the current stream, and then stream of streams.
_read ? _read(abort, function(err) {
read(err || abort, cb)
}) : read(abort, cb)
}
else if(_read) nextChunk()
else nextStream()
return function (end, cb) {
//this means that the upstream is sending an error.
if(end) return read(ended = end, cb)
//this means that we read an end before.
if(ended) return cb(ended)
read(null, function next(end, data) {
if(ended = ended || end) {
if(!queue.length)
return cb(ended)
var _queue = queue; queue = []
return cb(null, _queue)
function nextChunk () {
_read(null, function (err, data) {
if (err === true) nextStream()
else if (err) {
read(true, function(abortErr) {
// TODO: what do we do with the abortErr?
cb(err)
})
}
else cb(null, data)
})
}
queue.push(data)
if(queue.length < size)
return read(null, next)
var _queue = queue; queue = []
cb(null, _queue)
})
}
}
var flatten = exports.flatten = function (read) {
var _read
return function (abort, cb) {
if (abort) {
_read ? _read(abort, function(err) {
read(err || abort, cb)
}) : read(abort, cb)
function nextStream () {
_read = null
read(null, function (end, stream) {
if(end)
return cb(end)
if(Array.isArray(stream) || stream && 'object' === typeof stream)
stream = sources.values(stream)
else if('function' != typeof stream)
throw new Error('expected stream of streams')
_read = stream
nextChunk()
})
}
}
else if(_read) nextChunk()
else nextStream()
function nextChunk () {
_read(null, function (err, data) {
if (err === true) nextStream()
else if (err) {
read(true, function(abortErr) {
// TODO: what do we do with the abortErr?
cb(err)
})
}
else cb(null, data)
})
}
function nextStream () {
read(null, function (end, stream) {
if(end)
return cb(end)
if(Array.isArray(stream) || stream && 'object' === typeof stream)
stream = sources.values(stream)
else if('function' != typeof stream)
throw new Error('expected stream of streams')
_read = stream
nextChunk()
})
}
}
}
var prepend =
exports.prepend =
function (read, head) {
return function (abort, cb) {
if(head !== null) {
if(abort)
return read(abort, cb)
var _head = head
head = null
cb(null, _head)
} else {
read(abort, cb)
}
}
}
//var drainIf = exports.drainIf = function (op, done) {
// sinks.drain(
//}
var _reduce = exports._reduce = function (read, reduce, initial) {
return function (close, cb) {
if(close) return read(close, cb)
if(ended) return cb(ended)
sinks.drain(function (item) {
initial = reduce(initial, item)
}, function (err, data) {
ended = err || true
if(!err) cb(null, initial)
else cb(ended)
})
(read)
}
}
var nextTick = process.nextTick
var highWaterMark = exports.highWaterMark =
function (read, highWaterMark) {
var buffer = [], waiting = [], ended, ending, reading = false
highWaterMark = highWaterMark || 10
function readAhead () {
while(waiting.length && (buffer.length || ended))
waiting.shift()(ended, ended ? null : buffer.shift())
if (!buffer.length && ending) ended = ending;
}
function next () {
if(ended || ending || reading || buffer.length >= highWaterMark)
return
reading = true
return read(ended || ending, function (end, data) {
reading = false
ending = ending || end
if(data != null) buffer.push(data)
next(); readAhead()
})
}
process.nextTick(next)
return function (end, cb) {
ended = ended || end
waiting.push(cb)
next(); readAhead()
}
}
var flatMap = exports.flatMap =
function (read, mapper) {
mapper = mapper || id
var queue = [], ended
return function (abort, cb) {
if(queue.length) return cb(null, queue.shift())
else if(ended) return cb(ended)
read(abort, function next (end, data) {
if(end) ended = end
else {
var add = mapper(data)
while(add && add.length)
queue.push(add.shift())
}
if(queue.length) cb(null, queue.shift())
else if(ended) cb(ended)
else read(null, next)
})
}
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc