pull-stream
Advanced tools
Comparing version 0.0.4 to 2.0.0
262
index.js
var readArray = exports.readArray = function (array) { | ||
var i = 0 | ||
return function (reader) { | ||
reader(function (end, cb) { | ||
if(end) | ||
return cb && cb(end) | ||
cb(i >= array.length || null, array[i++]) | ||
}) | ||
} | ||
} | ||
var sources = require('./sources') | ||
var sinks = require('./sinks') | ||
var throughs = require('./throughs') | ||
for(var k in sources) | ||
exports[k] = pipeableSource(sources[k]) | ||
var count = function () { | ||
var i = 0 | ||
return function (reader) { | ||
return reader(function (end, cb) { | ||
if(end) return cb && cb(end) | ||
cb(null, i++) | ||
}) | ||
} | ||
} | ||
for(var k in throughs) | ||
exports[k] = pipeable(throughs[k]) | ||
var destack = function (n) { | ||
var i = 0; n = n || 10, waiting = [], queued = false, ended = false | ||
return function (readable) { | ||
return function (reader) { | ||
return reader(function (end, cb) { | ||
ended = ended || end | ||
if(i ++ < n) { | ||
return readable(end, cb) | ||
} else { | ||
process.nextTick(function () { | ||
i = 0 | ||
readable(end, cb) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
} | ||
for(var k in sinks) | ||
exports[k] = pipeableSink(sinks[k]) | ||
var map = function (map) { | ||
map = map || function (e) {return e} | ||
return function (readable) { | ||
return function (reader) { | ||
return reader(function (end, cb) { | ||
readable(end, function (end, data) { | ||
cb(end, map(data)) | ||
}) | ||
}) | ||
} | ||
} | ||
} | ||
exports.pipeableSource = pipeableSource | ||
exports.pipeable = pipeable | ||
exports.pipeableSink = pipeableSink | ||
var drain = exports.drain = function (op) { | ||
return function (readable) { | ||
readable(null, function read (err, data) { | ||
if(err) return | ||
op && op(data) | ||
readable(null, read) | ||
}) | ||
//write-only | ||
function addPipe(read) { | ||
if('function' !== typeof read) | ||
return read | ||
read.pipe = read.pipe || function (reader) { | ||
if('function' != typeof reader) | ||
throw new Error('must pipe to reader') | ||
return addPipe(reader(read)) | ||
} | ||
} | ||
var through = function () { | ||
return function (readable) { | ||
return function (reader) { | ||
return reader(function (end, cb) { | ||
//THIS IS A TEMPLATE. | ||
//PUT WHATEVER YOU LIKE IN HERE. | ||
return readable(end, cb) | ||
}) | ||
} | ||
} | ||
return read | ||
} | ||
function pipeable (_reader) { | ||
function pipeableSource (createRead) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
return function (readable) { | ||
args.unshift(readable) | ||
return function (reader) { | ||
return reader(_reader.apply(null, args)) | ||
} | ||
} | ||
return addPipe(createRead.apply(null, args)) | ||
} | ||
} | ||
var dumb = pipeable(function (readable, op) { | ||
return function (end, cb) { | ||
return readable(end, function (end, data) { | ||
op && op(data) | ||
cb(end, data) | ||
}) | ||
} | ||
}) | ||
//var smart = pipeable(dumb) | ||
var take = pipeable(function (readable, test) { | ||
var ended = false | ||
if('number' === typeof test) { | ||
var n = test; test = function () { | ||
return n-- > 0 | ||
function pipeable (createRead) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
var piped = [] | ||
function reader (read) { | ||
args.unshift(read) | ||
read = createRead.apply(null, args) | ||
while(piped.length) | ||
read = piped.shift()(read) | ||
return read | ||
//pipeing to from this reader should compose... | ||
} | ||
} | ||
return function (end, cb) { | ||
if(end) { | ||
if(!ended) return ended = end, readable(end, cb) | ||
cb(ended) | ||
reader.pipe = function (read) { | ||
piped.push(read) | ||
return reader | ||
} | ||
return readable(null, function (end, data) { | ||
if(end || !test(data)) return readable(end || true, cb) | ||
return cb(null, data) | ||
}) | ||
return reader | ||
} | ||
}) | ||
var nextTick = process.nextTick | ||
var highWaterMark = pipeable(function (readable, highWaterMark) { | ||
var buffer = [], waiting = [], ended, reading = false | ||
highWaterMark = highWaterMark || 10 | ||
function read () { | ||
while(waiting.length && (buffer.length || ended)) | ||
waiting.shift()(ended, ended ? null : buffer.shift()) | ||
} | ||
function next () { | ||
if(ended || reading || buffer.length >= highWaterMark) | ||
return | ||
reading = true | ||
return readable(ended, function (end, data) { | ||
reading = false | ||
ended = ended || end | ||
if(data != null) buffer.push(data) | ||
next(); read() | ||
}) | ||
} | ||
process.nextTick(next) | ||
return function (end, cb) { | ||
ended = ended || end | ||
waiting.push(cb) | ||
next(); read() | ||
} | ||
}) | ||
function writeArray(cb) { | ||
var array = [] | ||
return function (readable) { | ||
;(function next () { | ||
return readable(null, function (end, data) { | ||
if(end) return cb(end == true ? null : end, array) | ||
array.push(data) | ||
next() | ||
}) | ||
})() | ||
return function () { throw new Error('write-only') } | ||
} | ||
} | ||
var compose = function () { | ||
var streams = [].slice.call(arguments) | ||
return function (readable) { | ||
return function (reader) { | ||
while(streams.length) | ||
readable = streams.shift()(readable) | ||
return reader(readable) | ||
function pipeableSink(createReader) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
return function (read) { | ||
args.unshift(read) | ||
return createReader.apply(null, args) | ||
} | ||
@@ -180,54 +69,21 @@ } | ||
var duplex = function (_reader, _readable) { | ||
/* | ||
var destack = function (n) { | ||
var i = 0; n = n || 10, waiting = [], queued = false, ended = false | ||
return function (readable) { | ||
return function (reader) { | ||
_reader(readable); | ||
reader(_readable); | ||
return reader(function (end, cb) { | ||
ended = ended || end | ||
if(i ++ < n) { | ||
return readable(end, cb) | ||
} else { | ||
process.nextTick(function () { | ||
i = 0 | ||
readable(end, cb) | ||
}) | ||
} | ||
}) | ||
} | ||
} | ||
} | ||
/* | ||
var asyncMapSerial = pipeable(function (readable, map) { | ||
var reading = false | ||
return function (end, cb) { | ||
if(reading) throw new Error('one at a time, please!') | ||
reading == true | ||
return readable(end, function (end, data) { | ||
map(data, function (err, data) { | ||
reading = false | ||
}) | ||
}) | ||
} | ||
}) | ||
*/ | ||
if(!module.parent) | ||
count() | ||
(destack ()) | ||
(take(20)) | ||
(highWaterMark(2)) | ||
(compose(map(function (e) { | ||
return e * 1000 | ||
}), | ||
map(function (e) { | ||
return Math.round(e / 3) | ||
})) | ||
) | ||
(writeArray(console.log)) | ||
// (drain(console.log)) | ||
/* (function (readable) { | ||
return readable(null, function next (e, d) { | ||
if (e) return | ||
return readable(e, next) | ||
}) | ||
})*/ | ||
//*/ | ||
// drain (destack (count()), console.log) | ||
{ | ||
"name": "pull-stream", | ||
"description": "", | ||
"version": "0.0.4", | ||
"description": "minimal pull stream", | ||
"version": "2.0.0", | ||
"homepage": "https://github.com/dominictarr/pull-stream", | ||
@@ -11,5 +11,5 @@ "repository": { | ||
"dependencies": { | ||
"tape": "~0.3.0" | ||
}, | ||
"devDependencies": { | ||
}, | ||
"devDependencies": {}, | ||
"scripts": { | ||
@@ -19,3 +19,3 @@ "test": "set -e; for t in test/*.js; do node $t; done" | ||
"author": "Dominic Tarr <dominic.tarr@gmail.com> (http://dominictarr.com)", | ||
"license": "MIT" | ||
"license": "MIT" | ||
} |
153
README.md
@@ -21,5 +21,5 @@ # pull-stream | ||
var createStream = pipeable(function (readable) { | ||
return function read (end, cb) { | ||
readable(end, cb) | ||
var createStream = pipeable(function (read) { | ||
return function (end, cb) { | ||
read(end, cb) | ||
} | ||
@@ -41,7 +41,9 @@ }) | ||
var i = 100 | ||
var randomReadable = function (end, cb) { | ||
if(end) return cb(end) | ||
//only read 100 times | ||
if(i-- < 0) return cb(true) | ||
cb(null, Math.random()) | ||
var randomReadable = function () { | ||
return function (end, cb) { | ||
if(end) return cb(end) | ||
//only read 100 times | ||
if(i-- < 0) return cb(true) | ||
cb(null, Math.random()) | ||
} | ||
} | ||
@@ -54,10 +56,11 @@ ``` | ||
``` js | ||
var logger = function (readable) { | ||
readable(null, function next(end, data) { | ||
if(end === true) return | ||
if(end) throw err | ||
var logger = function (read) { | ||
read(null, function next(end, data) { | ||
if(end === true) return | ||
if(end) throw err | ||
console.log(data) | ||
readable(end, next) | ||
}) | ||
console.log(data) | ||
readable(end, next) | ||
}) | ||
} | ||
} | ||
@@ -69,3 +72,3 @@ ``` | ||
``` js | ||
logger(randomReadable) | ||
logger(randomReadable()) | ||
``` | ||
@@ -81,6 +84,6 @@ | ||
``` js | ||
var map = function (readable, map) { | ||
var map = function (read, map) { | ||
//return a readable function! | ||
return function (end, cb) { | ||
readable(end, function (end, data) { | ||
read(end, function (end, data) { | ||
cb(end, data != null ? map(data) : null) | ||
@@ -98,3 +101,3 @@ }) | ||
logger( | ||
map(randomReadable, function (e) { | ||
map(randomReadable(), function (e) { | ||
return Math.round(e * 1000) | ||
@@ -104,3 +107,4 @@ })) | ||
That is good -- but it's kinda weird, because we are used to left to right syntax | ||
That is good -- but it's kinda weird, | ||
because we are used to left to right syntax | ||
for streams... `ls | grep | wc -l` | ||
@@ -110,46 +114,71 @@ | ||
So, we want to pass in the `readable` and `reader` function! | ||
It needs to be that order, so that it reads left to right. | ||
Every pipeline must go from a `source` to a `sink`. | ||
Data will not start moving until the whole thing is connected. | ||
A basic duplex function would look like this: | ||
``` js | ||
source.pipe(through).pipe(sink) | ||
``` | ||
When setting up pipeability, you must use the right | ||
function, so `pipe` has the right behavior. | ||
Use `pipeable`, `pipeableSource` and `pipeableSink`, | ||
to add pipeability to your pull-streams. | ||
#### Sources | ||
``` js | ||
var i = 100 | ||
var multiply = function (readable) { | ||
return function (reader) { | ||
return reader(function (end, cb) { | ||
//insert your own code in here! | ||
readable(end, function (end, data) { | ||
cb(end, Math.round(data * 1000)) | ||
}) | ||
}) | ||
//infinite stream of random noise | ||
var pull = require('pull-stream') | ||
var infinite = pull.pipeableSource(function () { | ||
return function (end, cb) { | ||
if(end) return cb(end) | ||
cb(null, Math.random()) | ||
} | ||
} | ||
``` | ||
}) | ||
A stream that is only readable is simpler: | ||
``` js | ||
var randomReadable2 = function (reader) { | ||
return reader(function (end, cb) { | ||
cb(end, 'hello!') | ||
}) | ||
} | ||
//create an instace like this | ||
var infStream = infinite() | ||
``` | ||
and a "sink" stream, that can only read, is the same as before! | ||
#### Throughs/Transforms | ||
``` js | ||
var reader = function (readable) { | ||
readable(null, function (end, data) { | ||
if(end === true) return | ||
if(end) throw end | ||
readable(end, data) | ||
}) | ||
} | ||
//map! | ||
var pull = require('pull-stream') | ||
var map = pull.pipeable(function (read, map) { | ||
return function (end, cb) { | ||
read(end, function (end, data) { | ||
if(end) return cb(end) | ||
cb(null, map(data)) | ||
}) | ||
} | ||
}) | ||
//create an instance like this: | ||
var mapStream = map(function (d) { return d * 100 }) | ||
``` | ||
The `reader` stream is the same as before! | ||
### Sinks | ||
``` js | ||
var pull = require('pull-stream') | ||
### Left-to-Right pipe | ||
var log = pull.pipeableSink(function (read, done) { | ||
read(null, function next(end, data) { | ||
if(!end) { | ||
console.log(data) | ||
return setTimeout(function () { | ||
read(null, next) | ||
}, 200) | ||
} | ||
else //callback! | ||
done(end == true ? null : end) | ||
}) | ||
}) | ||
``` | ||
@@ -159,3 +188,5 @@ Now PIPE THEM TOGETHER! | ||
``` js | ||
randomReader2 (multiply) (logger) | ||
infinite() | ||
.pipe(map(function (d) { return d * 100 })) | ||
.pipe(log()) | ||
``` | ||
@@ -165,4 +196,22 @@ | ||
## More Cool Stuff | ||
What if you could do this? | ||
``` js | ||
var trippleThrough = | ||
through1() | ||
.pipe(through2()) | ||
.pipe(through3()) | ||
//THE THREE THROUGHS BECOME ONE | ||
source() | ||
.pipe(trippleThrough) | ||
.pipe(sink()) | ||
//and then pipe it later! | ||
``` | ||
## License | ||
MIT |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Filesystem access
Supply chain riskAccesses the file system, and could potentially read sensitive data.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
No v1
QualityPackage is not semver >=1. This means it is not stable and does not support ^ ranges.
Found 1 instance in 1 package
24542
17
561
0
208
1
1
1
+ Addedtape@~0.3.0
+ Addeddeep-equal@0.0.0(transitive)
+ Addeddefined@0.0.0(transitive)
+ Addedjsonify@0.0.1(transitive)
+ Addedtape@0.3.3(transitive)