Launch Week Day 5: Introducing Reachability for PHP.Learn More
Socket
Book a DemoSign in
Socket

stream-to-it

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-to-it - npm Package Compare versions

Comparing version
0.2.0
to
0.2.1
+12
-11
package.json
{
"name": "stream-to-it",
"version": "0.2.0",
"version": "0.2.1",
"description": "Convert Node.js streams to streaming iterables",
"main": "index.js",
"files": [
"duplex.js",
"sink.js",
"source.js",
"transform.js"
],
"scripts": {

@@ -21,18 +27,13 @@ "test": "ava test/*.test.js --verbose",

"dependencies": {
"get-iterator": "^1.0.2",
"p-defer": "^3.0.0"
"get-iterator": "^1.0.2"
},
"devDependencies": {
"ava": "^2.2.0",
"ava": "^3.10.1",
"bl": "^4.0.0",
"it-pipe": "^1.0.1",
"nyc": "^14.1.1",
"it-pipe": "^1.1.0",
"nyc": "^15.1.0",
"p-fifo": "^1.0.0",
"standard": "^14.3.1",
"streaming-iterables": "^4.1.0"
"streaming-iterables": "^5.0.2"
},
"ava": {
"babel": false,
"compileEnhancements": false
},
"directories": {

@@ -39,0 +40,0 @@ "test": "test"

@@ -33,3 +33,3 @@ # stream-to-it

Also works with browser [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream):
Also works with browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream):

@@ -84,7 +84,7 @@ ```js

### `toIterable.source(stream): Function`
### `toIterable.source(readable): Function`
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable.
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable.
### `toIterable.sink(sink): Function`
### `toIterable.sink(writable): Function`

@@ -91,0 +91,0 @@ Convert a [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams) stream to a [sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it) iterable.

+71
-88
const getIterator = require('get-iterator')
const defer = require('p-defer')

@@ -7,107 +6,91 @@ module.exports = writable => async source => {

const errPromise = defer()
const closePromise = defer()
const endingPromise = defer()
const finishPromise = defer()
let drainPromise
const endSource = (source) => {
if (typeof source.return === 'function') source.return()
}
const errorHandler = err => errPromise.reject(err)
const closeHandler = () => closePromise.resolve({ closed: true })
const finishHandler = () => finishPromise.resolve({ finished: true })
const drainHandler = () => {
if (drainPromise) drainPromise.resolve({ drained: true })
let error = null
let errCb = null
const errorHandler = (err) => {
error = err
if (errCb) errCb(err)
// When the writable errors, end the source to exit iteration early
endSource(source)
}
// There's no event to determine the start of a call to .end()
const _end = writable.end.bind(writable)
writable.end = (...args) => {
endingPromise.resolve({ ending: true })
return _end(...args)
let closeCb = null
let closed = false
const closeHandler = () => {
closed = true
if (closeCb) closeCb()
}
writable
.on('error', errorHandler)
.on('close', closeHandler)
.on('finish', finishHandler)
.on('drain', drainHandler)
let finishCb = null
let finished = false
const finishHandler = () => {
finished = true
if (finishCb) finishCb()
}
const getNext = async () => {
try {
return source.next()
} catch (err) {
writable.destroy(err)
return errPromise.promise
}
let drainCb = null
const drainHandler = () => {
if (drainCb) drainCb()
}
try {
while (true) {
// Race the iterator and the error, close and finish listener
const result = await Promise.race([
errPromise.promise,
closePromise.promise,
endingPromise.promise,
finishPromise.promise,
getNext()
])
const waitForDrainOrClose = () => {
return new Promise((resolve, reject) => {
closeCb = drainCb = resolve
errCb = reject
writable.once('drain', drainHandler)
})
}
if (result.closed || result.finished) {
break
}
const waitForDone = () => {
// Immediately end the source
endSource(source)
return new Promise((resolve, reject) => {
if (closed || finished) return resolve()
finishCb = closeCb = resolve
errCb = reject
})
}
// .end() was called, waiting on flush (finish event)
if (result.ending) {
await Promise.race([
errPromise.promise,
// TODO: do we need to wait on close? If slow end and destroy is
// called then what is emitted? close or finish?
closePromise.promise,
finishPromise.promise
])
break
}
const cleanup = () => {
writable.removeListener('error', errorHandler)
writable.removeListener('close', closeHandler)
writable.removeListener('finish', finishHandler)
writable.removeListener('drain', drainHandler)
}
// If destroyed, race err & close to determine reason & then throw/break
if (writable.destroyed) {
await Promise.race([
errPromise.promise,
closePromise.promise
])
break
}
writable.once('error', errorHandler)
writable.once('close', closeHandler)
writable.once('finish', finishHandler)
if (result.done) {
writable.end()
await Promise.race([
errPromise.promise,
// TODO: do we need to wait on close? If slow end and destroy is
// called then what is emitted? close or finish?
closePromise.promise,
finishPromise.promise
])
break
}
try {
for await (const value of source) {
if (!writable.writable || writable.destroyed) break
if (!writable.write(result.value)) {
drainPromise = defer()
await Promise.race([
errPromise.promise,
closePromise.promise,
finishPromise.promise,
drainPromise.promise
])
if (writable.write(value) === false) {
await waitForDrainOrClose()
}
}
} finally {
writable
.removeListener('error', errorHandler)
.removeListener('close', closeHandler)
.removeListener('finish', finishHandler)
.removeListener('drain', drainHandler)
} catch (err) {
// The writable did not error, give it the error
writable.destroy(err)
}
// End the iterator if it is a generator
if (typeof source.return === 'function') {
await source.return()
try {
// Everything is good and we're done writing, end everything
if (!error && writable.writable) {
writable.end()
}
// Wait until we close or finish. This supports halfClosed streams
await waitForDone()
} finally {
// Clean up listeners
cleanup()
}
// Notify the user an error occurred
if (error) throw error
}
language: node_js
node_js:
- "stable"
const test = require('ava')
const { Duplex } = require('stream')
const pipe = require('it-pipe')
const { collect } = require('streaming-iterables')
const Fifo = require('p-fifo')
const toIterable = require('../')
const { randomInt, randomBytes } = require('./helpers/random')
test('should convert to duplex iterable', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const fifo = new Fifo()
const output = await pipe(
input,
toIterable.duplex(new Duplex({
objectMode: true,
write (chunk, enc, cb) {
fifo.push(chunk).then(cb)
},
final (cb) {
fifo.push(null).then(cb)
},
async read (size) {
while (true) {
const chunk = await fifo.shift()
if (!this.push(chunk)) break
}
}
})),
collect
)
t.deepEqual(output, input)
})
const Crypto = require('crypto')
// Maximum is exclusive and the minimum is inclusive
const randomInt = (min, max) => {
min = Math.ceil(min)
max = Math.floor(max)
return Math.floor(Math.random() * (max - min)) + min
}
exports.randomInt = randomInt
const randomBytes = (min, max) => Crypto.randomBytes(randomInt(min, max))
exports.randomBytes = randomBytes
const test = require('ava')
const { Writable } = require('stream')
const toIterable = require('../')
const pipe = require('it-pipe')
const { randomInt, randomBytes } = require('./helpers/random')
test('should convert to sink iterable', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
await pipe(
input,
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
}
}))
)
t.deepEqual(output, input)
})
test('should convert to sink iterable and call return on end', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
let i = 0
let returnCalled = false
const output = []
await pipe(
{
[Symbol.iterator] () {
return this
},
next () {
const value = input[i++]
return { done: !value, value }
},
return () {
returnCalled = true
}
},
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
}
}))
)
t.is(returnCalled, true)
t.deepEqual(output, input)
})
test('should end mid stream', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
await pipe(
input,
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
this.end()
}
}))
)
t.deepEqual(output, input.slice(0, 1))
})
test('should destroy mid stream', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
await pipe(
input,
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
this.destroy()
}
}))
)
t.deepEqual(output, input.slice(0, 1))
})
test('should destroy mid stream with error', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
const err = await t.throwsAsync(
pipe(
input,
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
this.destroy(new Error('boom'))
}
}))
)
)
t.is(err.message, 'boom')
})
test('should throw mid stream', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
const err = await t.throwsAsync(
pipe(
input,
toIterable.sink(new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
throw new Error('boom')
}
}))
)
)
t.is(err.message, 'boom')
})
test('should destroy writable stream if source throws', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = []
const source = {
[Symbol.iterator]: function * () {
yield * input[Symbol.iterator]()
throw new Error('boom')
}
}
const stream = new Writable({
write (chunk, enc, cb) {
output.push(chunk)
cb()
}
})
const err = await t.throwsAsync(pipe(source, toIterable.sink(stream)))
t.is(err.message, 'boom')
t.true(stream.destroyed)
})
const test = require('ava')
const { Readable } = require('stream')
const toIterable = require('../')
const { collect } = require('streaming-iterables')
const { randomInt, randomBytes } = require('./helpers/random')
test('should convert to source iterable', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
let i = 0
const readable = new Readable({
objectMode: true,
read () {
while (true) {
const data = input[i++] || null
if (!this.push(data)) break
}
}
})
const output = await collect(toIterable.source(readable))
t.deepEqual(input, output)
})
test('should convert browser ReadableStream to source iterable', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
let i = 0
// Like a Response object you get from a call to fetch() in the browser
const response = {
body: {
getReader () {
return {
read () {
const value = input[i++]
return value == null ? { done: true } : { value }
},
releaseLock () {}
}
}
}
}
const output = await collect(toIterable.source(response.body))
t.deepEqual(input, output)
})
const test = require('ava')
const { Transform } = require('stream')
const { collect } = require('streaming-iterables')
const pipe = require('it-pipe')
const bl = require('bl')
const toIterable = require('../')
const { randomInt, randomBytes } = require('./helpers/random')
test('should convert to transform iterable', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const suffix = Buffer.from(`${Date.now()}`)
const output = await pipe(
input,
// Transform every chunk to have a "suffix"
toIterable.transform(new Transform({
transform (chunk, enc, cb) {
cb(null, Buffer.concat([chunk, suffix]))
}
})),
collect
)
t.deepEqual(
bl(input.map(d => Buffer.concat([d, suffix]))).slice(),
bl(output).slice()
)
})
test('should transform single chunk into multiple chunks', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const separator = Buffer.from(`${Date.now()}`)
const output = await pipe(
input,
// Transform every chunk to have a "suffix"
toIterable.transform(new Transform({
transform (chunk, enc, cb) {
this.push(chunk)
setTimeout(() => {
this.push(separator)
cb()
})
}
})),
collect
)
t.deepEqual(
bl(input.map(d => Buffer.concat([d, separator]))).slice(),
bl(output).slice()
)
})
test('should transform single chunk into no chunks', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
const output = await pipe(
input,
toIterable.transform(new Transform({
transform (chunk, enc, cb) {
cb()
}
})),
collect
)
t.is(output.length, 0)
})
test('should error the iterator when transform stream errors', async t => {
const input = Array.from(Array(randomInt(5, 10)), () => randomBytes(1, 512))
let i = 0
const err = await t.throwsAsync(
pipe(
input,
toIterable.transform(new Transform({
transform (chunk, enc, cb) {
i++
if (i > 2) return cb(new Error('boom'))
cb(null, chunk)
}
})),
collect
)
)
t.is(err.message, 'boom')
})