Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pull-stream

Package Overview
Dependencies
Maintainers
1
Versions
87
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pull-stream - npm Package Compare versions

Comparing version 0.0.4 to 2.0.0

docs/glossary.md

262

index.js
var readArray = exports.readArray = function (array) {
var i = 0
return function (reader) {
reader(function (end, cb) {
if(end)
return cb && cb(end)
cb(i >= array.length || null, array[i++])
})
}
}
var sources = require('./sources')
var sinks = require('./sinks')
var throughs = require('./throughs')
for(var k in sources)
exports[k] = pipeableSource(sources[k])
var count = function () {
var i = 0
return function (reader) {
return reader(function (end, cb) {
if(end) return cb && cb(end)
cb(null, i++)
})
}
}
for(var k in throughs)
exports[k] = pipeable(throughs[k])
var destack = function (n) {
var i = 0; n = n || 10, waiting = [], queued = false, ended = false
return function (readable) {
return function (reader) {
return reader(function (end, cb) {
ended = ended || end
if(i ++ < n) {
return readable(end, cb)
} else {
process.nextTick(function () {
i = 0
readable(end, cb)
})
}
})
}
}
}
for(var k in sinks)
exports[k] = pipeableSink(sinks[k])
var map = function (map) {
map = map || function (e) {return e}
return function (readable) {
return function (reader) {
return reader(function (end, cb) {
readable(end, function (end, data) {
cb(end, map(data))
})
})
}
}
}
exports.pipeableSource = pipeableSource
exports.pipeable = pipeable
exports.pipeableSink = pipeableSink
var drain = exports.drain = function (op) {
return function (readable) {
readable(null, function read (err, data) {
if(err) return
op && op(data)
readable(null, read)
})
//write-only
function addPipe(read) {
if('function' !== typeof read)
return read
read.pipe = read.pipe || function (reader) {
if('function' != typeof reader)
throw new Error('must pipe to reader')
return addPipe(reader(read))
}
}
var through = function () {
return function (readable) {
return function (reader) {
return reader(function (end, cb) {
//THIS IS A TEMPLATE.
//PUT WHATEVER YOU LIKE IN HERE.
return readable(end, cb)
})
}
}
return read
}
function pipeable (_reader) {
function pipeableSource (createRead) {
return function () {
var args = [].slice.call(arguments)
return function (readable) {
args.unshift(readable)
return function (reader) {
return reader(_reader.apply(null, args))
}
}
return addPipe(createRead.apply(null, args))
}
}
var dumb = pipeable(function (readable, op) {
return function (end, cb) {
return readable(end, function (end, data) {
op && op(data)
cb(end, data)
})
}
})
//var smart = pipeable(dumb)
var take = pipeable(function (readable, test) {
var ended = false
if('number' === typeof test) {
var n = test; test = function () {
return n-- > 0
function pipeable (createRead) {
return function () {
var args = [].slice.call(arguments)
var piped = []
function reader (read) {
args.unshift(read)
read = createRead.apply(null, args)
while(piped.length)
read = piped.shift()(read)
return read
//pipeing to from this reader should compose...
}
}
return function (end, cb) {
if(end) {
if(!ended) return ended = end, readable(end, cb)
cb(ended)
reader.pipe = function (read) {
piped.push(read)
return reader
}
return readable(null, function (end, data) {
if(end || !test(data)) return readable(end || true, cb)
return cb(null, data)
})
return reader
}
})
var nextTick = process.nextTick
var highWaterMark = pipeable(function (readable, highWaterMark) {
var buffer = [], waiting = [], ended, reading = false
highWaterMark = highWaterMark || 10
function read () {
while(waiting.length && (buffer.length || ended))
waiting.shift()(ended, ended ? null : buffer.shift())
}
function next () {
if(ended || reading || buffer.length >= highWaterMark)
return
reading = true
return readable(ended, function (end, data) {
reading = false
ended = ended || end
if(data != null) buffer.push(data)
next(); read()
})
}
process.nextTick(next)
return function (end, cb) {
ended = ended || end
waiting.push(cb)
next(); read()
}
})
function writeArray(cb) {
var array = []
return function (readable) {
;(function next () {
return readable(null, function (end, data) {
if(end) return cb(end == true ? null : end, array)
array.push(data)
next()
})
})()
return function () { throw new Error('write-only') }
}
}
var compose = function () {
var streams = [].slice.call(arguments)
return function (readable) {
return function (reader) {
while(streams.length)
readable = streams.shift()(readable)
return reader(readable)
function pipeableSink(createReader) {
return function () {
var args = [].slice.call(arguments)
return function (read) {
args.unshift(read)
return createReader.apply(null, args)
}

@@ -180,54 +69,21 @@ }

var duplex = function (_reader, _readable) {
/*
var destack = function (n) {
var i = 0; n = n || 10, waiting = [], queued = false, ended = false
return function (readable) {
return function (reader) {
_reader(readable);
reader(_readable);
return reader(function (end, cb) {
ended = ended || end
if(i ++ < n) {
return readable(end, cb)
} else {
process.nextTick(function () {
i = 0
readable(end, cb)
})
}
})
}
}
}
/*
var asyncMapSerial = pipeable(function (readable, map) {
var reading = false
return function (end, cb) {
if(reading) throw new Error('one at a time, please!')
reading == true
return readable(end, function (end, data) {
map(data, function (err, data) {
reading = false
})
})
}
})
*/
if(!module.parent)
count()
(destack ())
(take(20))
(highWaterMark(2))
(compose(map(function (e) {
return e * 1000
}),
map(function (e) {
return Math.round(e / 3)
}))
)
(writeArray(console.log))
// (drain(console.log))
/* (function (readable) {
return readable(null, function next (e, d) {
if (e) return
return readable(e, next)
})
})*/
//*/
// drain (destack (count()), console.log)
{
"name": "pull-stream",
"description": "",
"version": "0.0.4",
"description": "minimal pull stream",
"version": "2.0.0",
"homepage": "https://github.com/dominictarr/pull-stream",

@@ -11,5 +11,5 @@ "repository": {

"dependencies": {
"tape": "~0.3.0"
},
"devDependencies": {
},
"devDependencies": {},
"scripts": {

@@ -19,3 +19,3 @@ "test": "set -e; for t in test/*.js; do node $t; done"

"author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)",
"license": "MIT"
"license": "MIT"
}

@@ -21,5 +21,5 @@ # pull-stream

var createStream = pipeable(function (readable) {
return function read (end, cb) {
readable(end, cb)
var createStream = pipeable(function (read) {
return function (end, cb) {
read(end, cb)
}

@@ -41,7 +41,9 @@ })

var i = 100
var randomReadable = function (end, cb) {
if(end) return cb(end)
//only read 100 times
if(i-- < 0) return cb(true)
cb(null, Math.random())
var randomReadable = function () {
return function (end, cb) {
if(end) return cb(end)
//only read 100 times
if(i-- < 0) return cb(true)
cb(null, Math.random())
}
}

@@ -54,10 +56,11 @@ ```

``` js
var logger = function (readable) {
readable(null, function next(end, data) {
if(end === true) return
if(end) throw err
var logger = function (read) {
read(null, function next(end, data) {
if(end === true) return
if(end) throw err
console.log(data)
readable(end, next)
})
console.log(data)
readable(end, next)
})
}
}

@@ -69,3 +72,3 @@ ```

``` js
logger(randomReadable)
logger(randomReadable())
```

@@ -81,6 +84,6 @@

``` js
var map = function (readable, map) {
var map = function (read, map) {
//return a readable function!
return function (end, cb) {
readable(end, function (end, data) {
read(end, function (end, data) {
cb(end, data != null ? map(data) : null)

@@ -98,3 +101,3 @@ })

logger(
map(randomReadable, function (e) {
map(randomReadable(), function (e) {
return Math.round(e * 1000)

@@ -104,3 +107,4 @@ }))

That is good -- but it's kinda weird, because we are used to left to right syntax
That is good -- but it's kinda weird,
because we are used to left to right syntax
for streams... `ls | grep | wc -l`

@@ -110,46 +114,71 @@

So, we want to pass in the `readable` and `reader` function!
It needs to be that order, so that it reads left to right.
Every pipeline must go from a `source` to a `sink`.
Data will not start moving until the whole thing is connected.
A basic duplex function would look like this:
``` js
source.pipe(through).pipe(sink)
```
When setting up pipeability, you must use the right
function, so `pipe` has the right behavior.
Use `pipeable`, `pipeableSource` and `pipeableSink`,
to add pipeability to your pull-streams.
#### Sources
``` js
var i = 100
var multiply = function (readable) {
return function (reader) {
return reader(function (end, cb) {
//insert your own code in here!
readable(end, function (end, data) {
cb(end, Math.round(data * 1000))
})
})
//infinite stream of random noise
var pull = require('pull-stream')
var infinite = pull.pipeableSource(function () {
return function (end, cb) {
if(end) return cb(end)
cb(null, Math.random())
}
}
```
})
A stream that is only readable is simpler:
``` js
var randomReadable2 = function (reader) {
return reader(function (end, cb) {
cb(end, 'hello!')
})
}
//create an instace like this
var infStream = infinite()
```
and a "sink" stream, that can only read, is the same as before!
#### Throughs/Transforms
``` js
var reader = function (readable) {
readable(null, function (end, data) {
if(end === true) return
if(end) throw end
readable(end, data)
})
}
//map!
var pull = require('pull-stream')
var map = pull.pipeable(function (read, map) {
return function (end, cb) {
read(end, function (end, data) {
if(end) return cb(end)
cb(null, map(data))
})
}
})
//create an instance like this:
var mapStream = map(function (d) { return d * 100 })
```
The `reader` stream is the same as before!
### Sinks
``` js
var pull = require('pull-stream')
### Left-to-Right pipe
var log = pull.pipeableSink(function (read, done) {
read(null, function next(end, data) {
if(!end) {
console.log(data)
return setTimeout(function () {
read(null, next)
}, 200)
}
else //callback!
done(end == true ? null : end)
})
})
```

@@ -159,3 +188,5 @@ Now PIPE THEM TOGETHER!

``` js
randomReader2 (multiply) (logger)
infinite()
.pipe(map(function (d) { return d * 100 }))
.pipe(log())
```

@@ -165,4 +196,22 @@

## More Cool Stuff
What if you could do this?
``` js
var trippleThrough =
through1()
.pipe(through2())
.pipe(through3())
//THE THREE THROUGHS BECOME ONE
source()
.pipe(trippleThrough)
.pipe(sink())
//and then pipe it later!
```
## License
MIT
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc