pull-stream
Advanced tools
Comparing version 2.17.1 to 2.18.0
95
index.js
@@ -5,94 +5,23 @@ | ||
var throughs = require('./throughs') | ||
var u = require('./util') | ||
for(var k in sources) | ||
exports[k] = Source(sources[k]) | ||
exports[k] = u.Source(sources[k]) | ||
for(var k in throughs) | ||
exports[k] = Through(throughs[k]) | ||
exports[k] = u.Through(throughs[k]) | ||
for(var k in sinks) | ||
exports[k] = Sink(sinks[k]) | ||
exports[k] = u.Sink(sinks[k]) | ||
exports.Duplex = | ||
exports.Through = exports.pipeable = Through | ||
exports.Source = exports.pipeableSource = Source | ||
exports.Sink = exports.pipeableSink = Sink | ||
var maybe = require('./maybe')(exports) | ||
exports.addPipe = addPipe | ||
exports.addReaderPipe | ||
= addReaderPipe | ||
for(var k in maybe) | ||
exports[k] = maybe[k] | ||
function addPipe(read) { | ||
if('function' !== typeof read) | ||
return read | ||
exports.Duplex = | ||
exports.Through = exports.pipeable = u.Through | ||
exports.Source = exports.pipeableSource = u.Source | ||
exports.Sink = exports.pipeableSink = u.Sink | ||
read.pipe = read.pipe || function (reader) { | ||
if('function' != typeof reader) | ||
throw new Error('must pipe to reader') | ||
return addPipe(reader(read)) | ||
} | ||
read.type = 'Source' | ||
return read | ||
} | ||
function Source (createRead) { | ||
function s() { | ||
var args = [].slice.call(arguments) | ||
return addPipe(createRead.apply(null, args)) | ||
} | ||
s.type = 'Source' | ||
return s | ||
} | ||
function addReaderPipe(reader) { | ||
var piped = [] | ||
function _reader (read) { | ||
read = reader(read) | ||
while(piped.length) | ||
read = piped.shift()(read) | ||
return read | ||
//pipeing to from this reader should compose... | ||
} | ||
_reader.pipe = function (read) { | ||
piped.push(read) | ||
return reader | ||
} | ||
return _reader | ||
} | ||
function Through (createRead) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
var piped = [] | ||
function reader (read) { | ||
args.unshift(read) | ||
read = createRead.apply(null, args) | ||
while(piped.length) | ||
read = piped.shift()(read) | ||
return read | ||
//pipeing to from this reader should compose... | ||
} | ||
reader.pipe = function (read) { | ||
piped.push(read) | ||
if(read.type === 'Source') | ||
throw new Error('cannot pipe ' + reader.type + ' to Source') | ||
reader.type = read.type === 'Sink' ? 'Sink' : 'Through' | ||
return reader | ||
} | ||
reader.type = 'Through' | ||
return reader | ||
} | ||
} | ||
function Sink(createReader) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
function s (read) { | ||
args.unshift(read) | ||
return createReader.apply(null, args) | ||
} | ||
s.type = 'Sink' | ||
return s | ||
} | ||
} | ||
{ | ||
"name": "pull-stream", | ||
"description": "minimal pull stream", | ||
"version": "2.17.1", | ||
"version": "2.18.0", | ||
"homepage": "https://github.com/dominictarr/pull-stream", | ||
@@ -6,0 +6,0 @@ "repository": { |
47
sinks.js
@@ -1,5 +0,1 @@ | ||
var u = require('./util') | ||
var prop = u.prop | ||
var id = u.id | ||
var drain = exports.drain = function (read, op, done) { | ||
@@ -33,41 +29,2 @@ ;(function next() { | ||
var find = | ||
exports.find = function (read, test, cb) { | ||
var ended = false | ||
if(!cb) | ||
cb = test, test = id | ||
else | ||
test = prop(test) || id | ||
drain(read, function (data) { | ||
if(test(data)) { | ||
ended = true | ||
cb(null, data) | ||
return false | ||
} | ||
}, function (err) { | ||
if(ended) return //already called back | ||
cb(err === true ? null : err, null) | ||
}) | ||
} | ||
var reduce = exports.reduce = | ||
function (read, reduce, acc, cb) { | ||
drain(read, function (data) { | ||
acc = reduce(acc, data) | ||
}, function (err) { | ||
cb(err, acc) | ||
}) | ||
} | ||
var collect = exports.collect = exports.writeArray = | ||
function (read, cb) { | ||
return reduce(read, function (arr, item) { | ||
arr.push(item) | ||
return arr | ||
}, [], cb) | ||
} | ||
//if the source callsback sync, then loop | ||
//rather than recurse | ||
var onEnd = exports.onEnd = function (read, done) { | ||
@@ -78,4 +35,6 @@ return drain(read, null, done) | ||
var log = exports.log = function (read, done) { | ||
return drain(read, console.log.bind(console), done) | ||
return drain(read, function (data) { | ||
console.log(data) | ||
}, done) | ||
} | ||
@@ -90,3 +90,2 @@ | ||
reads.unshift(once(start)) | ||
// reads.unshift(createStream(start)) | ||
@@ -93,0 +92,0 @@ return function next (end, cb) { |
@@ -7,5 +7,5 @@ var tape = require('tape') | ||
var sinks = require('../sinks') | ||
var pull = require('../') | ||
var readArray = sources.readArray | ||
var writeArray = sinks.writeArray | ||
@@ -77,6 +77,4 @@ var pipeableSource = pstrm.pipeableSource | ||
var array = [1, 2, 3] | ||
console.log('readArray', readArray) | ||
var read = pipeableSource(readArray)(array) | ||
console.log('reader?', read) | ||
arrayWriter = pipeableSink(writeArray) | ||
arrayWriter = pull.writeArray | ||
@@ -83,0 +81,0 @@ t.equal('function', typeof read) |
@@ -59,2 +59,17 @@ var pull = require('../') | ||
test('maybe(cb) -> Sink', function (t) { | ||
var n = pull.collect(function (){}) | ||
console.log('Sink?', n) | ||
t.equal(n.type, 'Sink') | ||
t.end() | ||
}) | ||
test('maybe(cb) -> Through', function (t) { | ||
console.error('***********') | ||
var n = pull.collect() | ||
console.error('Through?', n) | ||
t.equal(n.type, 'Through') | ||
t.end() | ||
}) | ||
var u = require('./util') | ||
var sources = require('./sources') | ||
var sinks = require('./sinks') | ||
var prop = u.prop | ||
@@ -235,2 +237,22 @@ var id = u.id | ||
//var drainIf = exports.drainIf = function (op, done) { | ||
// sinks.drain( | ||
//} | ||
var _reduce = exports._reduce = function (read, reduce, initial) { | ||
return function (close, cb) { | ||
if(close) return read(close, cb) | ||
if(ended) return cb(ended) | ||
sinks.drain(function (item) { | ||
initial = reduce(initial, item) | ||
}, function (err, data) { | ||
ended = err || true | ||
if(!err) cb(null, initial) | ||
else cb(ended) | ||
}) | ||
(read) | ||
} | ||
} | ||
var nextTick = process.nextTick | ||
@@ -237,0 +259,0 @@ |
92
util.js
@@ -23,2 +23,94 @@ exports.id = | ||
exports.addPipe = addPipe | ||
function addPipe(read) { | ||
if('function' !== typeof read) | ||
return read | ||
read.pipe = read.pipe || function (reader) { | ||
if('function' != typeof reader) | ||
throw new Error('must pipe to reader') | ||
return addPipe(reader(read)) | ||
} | ||
read.type = 'Source' | ||
return read | ||
} | ||
var Source = | ||
exports.Source = | ||
function Source (createRead) { | ||
function s() { | ||
var args = [].slice.call(arguments) | ||
return addPipe(createRead.apply(null, args)) | ||
} | ||
s.type = 'Source' | ||
return s | ||
} | ||
var Through = | ||
exports.Through = | ||
function (createRead) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
var piped = [] | ||
function reader (read) { | ||
args.unshift(read) | ||
read = createRead.apply(null, args) | ||
while(piped.length) | ||
read = piped.shift()(read) | ||
return read | ||
//pipeing to from this reader should compose... | ||
} | ||
reader.pipe = function (read) { | ||
piped.push(read) | ||
if(read.type === 'Source') | ||
throw new Error('cannot pipe ' + reader.type + ' to Source') | ||
reader.type = read.type === 'Sink' ? 'Sink' : 'Through' | ||
return reader | ||
} | ||
reader.type = 'Through' | ||
return reader | ||
} | ||
} | ||
var Sink = | ||
exports.Sink = | ||
function Sink(createReader) { | ||
return function () { | ||
var args = [].slice.call(arguments) | ||
if(!createReader) | ||
throw new Error('must be createReader function') | ||
function s (read) { | ||
args.unshift(read) | ||
return createReader.apply(null, args) | ||
} | ||
s.type = 'Sink' | ||
return s | ||
} | ||
} | ||
exports.maybeSink = | ||
exports.maybeDrain = | ||
function (createSink, cb) { | ||
if(!cb) | ||
return Through(function (read) { | ||
var ended | ||
return function (close, cb) { | ||
if(close) return read(close, cb) | ||
if(ended) return cb(ended) | ||
createSink(function (err, data) { | ||
ended = err || true | ||
if(!err) cb(null, data) | ||
else cb(ended) | ||
}) (read) | ||
} | ||
})() | ||
return Sink(function (read) { | ||
return createSink(cb) (read) | ||
})() | ||
} | ||
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
42295
30
1147