Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

pull-stream

Package Overview
Dependencies
Maintainers
1
Versions
87
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

pull-stream - npm Package Compare versions

Comparing version 2.17.1 to 2.18.0

maybe.js

95

index.js

@@ -5,94 +5,23 @@

var throughs = require('./throughs')
var u = require('./util')
for(var k in sources)
exports[k] = Source(sources[k])
exports[k] = u.Source(sources[k])
for(var k in throughs)
exports[k] = Through(throughs[k])
exports[k] = u.Through(throughs[k])
for(var k in sinks)
exports[k] = Sink(sinks[k])
exports[k] = u.Sink(sinks[k])
exports.Duplex =
exports.Through = exports.pipeable = Through
exports.Source = exports.pipeableSource = Source
exports.Sink = exports.pipeableSink = Sink
var maybe = require('./maybe')(exports)
exports.addPipe = addPipe
exports.addReaderPipe
= addReaderPipe
for(var k in maybe)
exports[k] = maybe[k]
function addPipe(read) {
if('function' !== typeof read)
return read
exports.Duplex =
exports.Through = exports.pipeable = u.Through
exports.Source = exports.pipeableSource = u.Source
exports.Sink = exports.pipeableSink = u.Sink
read.pipe = read.pipe || function (reader) {
if('function' != typeof reader)
throw new Error('must pipe to reader')
return addPipe(reader(read))
}
read.type = 'Source'
return read
}
function Source (createRead) {
function s() {
var args = [].slice.call(arguments)
return addPipe(createRead.apply(null, args))
}
s.type = 'Source'
return s
}
function addReaderPipe(reader) {
var piped = []
function _reader (read) {
read = reader(read)
while(piped.length)
read = piped.shift()(read)
return read
//pipeing to from this reader should compose...
}
_reader.pipe = function (read) {
piped.push(read)
return reader
}
return _reader
}
function Through (createRead) {
return function () {
var args = [].slice.call(arguments)
var piped = []
function reader (read) {
args.unshift(read)
read = createRead.apply(null, args)
while(piped.length)
read = piped.shift()(read)
return read
//pipeing to from this reader should compose...
}
reader.pipe = function (read) {
piped.push(read)
if(read.type === 'Source')
throw new Error('cannot pipe ' + reader.type + ' to Source')
reader.type = read.type === 'Sink' ? 'Sink' : 'Through'
return reader
}
reader.type = 'Through'
return reader
}
}
function Sink(createReader) {
return function () {
var args = [].slice.call(arguments)
function s (read) {
args.unshift(read)
return createReader.apply(null, args)
}
s.type = 'Sink'
return s
}
}
{
"name": "pull-stream",
"description": "minimal pull stream",
"version": "2.17.1",
"version": "2.18.0",
"homepage": "https://github.com/dominictarr/pull-stream",

@@ -6,0 +6,0 @@ "repository": {

@@ -1,5 +0,1 @@

var u = require('./util')
var prop = u.prop
var id = u.id
var drain = exports.drain = function (read, op, done) {

@@ -33,41 +29,2 @@ ;(function next() {

var find =
exports.find = function (read, test, cb) {
var ended = false
if(!cb)
cb = test, test = id
else
test = prop(test) || id
drain(read, function (data) {
if(test(data)) {
ended = true
cb(null, data)
return false
}
}, function (err) {
if(ended) return //already called back
cb(err === true ? null : err, null)
})
}
var reduce = exports.reduce =
function (read, reduce, acc, cb) {
drain(read, function (data) {
acc = reduce(acc, data)
}, function (err) {
cb(err, acc)
})
}
var collect = exports.collect = exports.writeArray =
function (read, cb) {
return reduce(read, function (arr, item) {
arr.push(item)
return arr
}, [], cb)
}
//if the source callsback sync, then loop
//rather than recurse
var onEnd = exports.onEnd = function (read, done) {

@@ -78,4 +35,6 @@ return drain(read, null, done)

var log = exports.log = function (read, done) {
return drain(read, console.log.bind(console), done)
return drain(read, function (data) {
console.log(data)
}, done)
}

@@ -90,3 +90,2 @@

reads.unshift(once(start))
// reads.unshift(createStream(start))

@@ -93,0 +92,0 @@ return function next (end, cb) {

@@ -7,5 +7,5 @@ var tape = require('tape')

var sinks = require('../sinks')
var pull = require('../')
var readArray = sources.readArray
var writeArray = sinks.writeArray

@@ -77,6 +77,4 @@ var pipeableSource = pstrm.pipeableSource

var array = [1, 2, 3]
console.log('readArray', readArray)
var read = pipeableSource(readArray)(array)
console.log('reader?', read)
arrayWriter = pipeableSink(writeArray)
arrayWriter = pull.writeArray

@@ -83,0 +81,0 @@ t.equal('function', typeof read)

@@ -59,2 +59,17 @@ var pull = require('../')

test('maybe(cb) -> Sink', function (t) {
var n = pull.collect(function (){})
console.log('Sink?', n)
t.equal(n.type, 'Sink')
t.end()
})
test('maybe(cb) -> Through', function (t) {
console.error('***********')
var n = pull.collect()
console.error('Through?', n)
t.equal(n.type, 'Through')
t.end()
})
var u = require('./util')
var sources = require('./sources')
var sinks = require('./sinks')
var prop = u.prop

@@ -235,2 +237,22 @@ var id = u.id

//var drainIf = exports.drainIf = function (op, done) {
// sinks.drain(
//}
var _reduce = exports._reduce = function (read, reduce, initial) {
return function (close, cb) {
if(close) return read(close, cb)
if(ended) return cb(ended)
sinks.drain(function (item) {
initial = reduce(initial, item)
}, function (err, data) {
ended = err || true
if(!err) cb(null, initial)
else cb(ended)
})
(read)
}
}
var nextTick = process.nextTick

@@ -237,0 +259,0 @@

@@ -23,2 +23,94 @@ exports.id =

exports.addPipe = addPipe
function addPipe(read) {
if('function' !== typeof read)
return read
read.pipe = read.pipe || function (reader) {
if('function' != typeof reader)
throw new Error('must pipe to reader')
return addPipe(reader(read))
}
read.type = 'Source'
return read
}
var Source =
exports.Source =
function Source (createRead) {
function s() {
var args = [].slice.call(arguments)
return addPipe(createRead.apply(null, args))
}
s.type = 'Source'
return s
}
var Through =
exports.Through =
function (createRead) {
return function () {
var args = [].slice.call(arguments)
var piped = []
function reader (read) {
args.unshift(read)
read = createRead.apply(null, args)
while(piped.length)
read = piped.shift()(read)
return read
//pipeing to from this reader should compose...
}
reader.pipe = function (read) {
piped.push(read)
if(read.type === 'Source')
throw new Error('cannot pipe ' + reader.type + ' to Source')
reader.type = read.type === 'Sink' ? 'Sink' : 'Through'
return reader
}
reader.type = 'Through'
return reader
}
}
var Sink =
exports.Sink =
function Sink(createReader) {
return function () {
var args = [].slice.call(arguments)
if(!createReader)
throw new Error('must be createReader function')
function s (read) {
args.unshift(read)
return createReader.apply(null, args)
}
s.type = 'Sink'
return s
}
}
exports.maybeSink =
exports.maybeDrain =
function (createSink, cb) {
if(!cb)
return Through(function (read) {
var ended
return function (close, cb) {
if(close) return read(close, cb)
if(ended) return cb(ended)
createSink(function (err, data) {
ended = err || true
if(!err) cb(null, data)
else cb(ended)
}) (read)
}
})()
return Sink(function (read) {
return createSink(cb) (read)
})()
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc