Socket
Socket
Sign inDemoInstall

stream-to-it

Package Overview
Dependencies
Maintainers
1
Versions
9
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

stream-to-it - npm Package Compare versions

Comparing version 0.2.0 to 0.2.1

23

package.json
{
"name": "stream-to-it",
"version": "0.2.0",
"version": "0.2.1",
"description": "Convert Node.js streams to streaming iterables",
"main": "index.js",
"files": [
"duplex.js",
"sink.js",
"source.js",
"transform.js"
],
"scripts": {

@@ -21,18 +27,13 @@ "test": "ava test/*.test.js --verbose",

"dependencies": {
"get-iterator": "^1.0.2",
"p-defer": "^3.0.0"
"get-iterator": "^1.0.2"
},
"devDependencies": {
"ava": "^2.2.0",
"ava": "^3.10.1",
"bl": "^4.0.0",
"it-pipe": "^1.0.1",
"nyc": "^14.1.1",
"it-pipe": "^1.1.0",
"nyc": "^15.1.0",
"p-fifo": "^1.0.0",
"standard": "^14.3.1",
"streaming-iterables": "^4.1.0"
"streaming-iterables": "^5.0.2"
},
"ava": {
"babel": false,
"compileEnhancements": false
},
"directories": {

@@ -39,0 +40,0 @@ "test": "test"

@@ -33,3 +33,3 @@ # stream-to-it

Also works with browser [ReadableStream](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream):
Also works with browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream):

@@ -84,7 +84,7 @@ ```js

### `toIterable.source(stream): Function`
### `toIterable.source(readable): Function`
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable.
Convert a [`Readable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_readable_streams) stream or a browser [`ReadableStream`](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream) to a [source](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#source-it) iterable.
### `toIterable.sink(sink): Function`
### `toIterable.sink(writable): Function`

@@ -91,0 +91,0 @@ Convert a [`Writable`](https://nodejs.org/dist/latest/docs/api/stream.html#stream_writable_streams) stream to a [sink](https://gist.github.com/alanshaw/591dc7dd54e4f99338a347ef568d6ee9#sink-it) iterable.

const getIterator = require('get-iterator')
const defer = require('p-defer')

@@ -7,107 +6,91 @@ module.exports = writable => async source => {

const errPromise = defer()
const closePromise = defer()
const endingPromise = defer()
const finishPromise = defer()
let drainPromise
const endSource = (source) => {
if (typeof source.return === 'function') source.return()
}
const errorHandler = err => errPromise.reject(err)
const closeHandler = () => closePromise.resolve({ closed: true })
const finishHandler = () => finishPromise.resolve({ finished: true })
const drainHandler = () => {
if (drainPromise) drainPromise.resolve({ drained: true })
let error = null
let errCb = null
const errorHandler = (err) => {
error = err
if (errCb) errCb(err)
// When the writable errors, end the source to exit iteration early
endSource(source)
}
// There's no event to determine the start of a call to .end()
const _end = writable.end.bind(writable)
writable.end = (...args) => {
endingPromise.resolve({ ending: true })
return _end(...args)
let closeCb = null
let closed = false
const closeHandler = () => {
closed = true
if (closeCb) closeCb()
}
writable
.on('error', errorHandler)
.on('close', closeHandler)
.on('finish', finishHandler)
.on('drain', drainHandler)
let finishCb = null
let finished = false
const finishHandler = () => {
finished = true
if (finishCb) finishCb()
}
const getNext = async () => {
try {
return source.next()
} catch (err) {
writable.destroy(err)
return errPromise.promise
}
let drainCb = null
const drainHandler = () => {
if (drainCb) drainCb()
}
try {
while (true) {
// Race the iterator and the error, close and finish listener
const result = await Promise.race([
errPromise.promise,
closePromise.promise,
endingPromise.promise,
finishPromise.promise,
getNext()
])
const waitForDrainOrClose = () => {
return new Promise((resolve, reject) => {
closeCb = drainCb = resolve
errCb = reject
writable.once('drain', drainHandler)
})
}
if (result.closed || result.finished) {
break
}
const waitForDone = () => {
// Immediately end the source
endSource(source)
return new Promise((resolve, reject) => {
if (closed || finished) return resolve()
finishCb = closeCb = resolve
errCb = reject
})
}
// .end() was called, waiting on flush (finish event)
if (result.ending) {
await Promise.race([
errPromise.promise,
// TODO: do we need to wait on close? If slow end and destroy is
// called then what is emitted? close or finish?
closePromise.promise,
finishPromise.promise
])
break
}
const cleanup = () => {
writable.removeListener('error', errorHandler)
writable.removeListener('close', closeHandler)
writable.removeListener('finish', finishHandler)
writable.removeListener('drain', drainHandler)
}
// If destroyed, race err & close to determine reason & then throw/break
if (writable.destroyed) {
await Promise.race([
errPromise.promise,
closePromise.promise
])
break
}
writable.once('error', errorHandler)
writable.once('close', closeHandler)
writable.once('finish', finishHandler)
if (result.done) {
writable.end()
await Promise.race([
errPromise.promise,
// TODO: do we need to wait on close? If slow end and destroy is
// called then what is emitted? close or finish?
closePromise.promise,
finishPromise.promise
])
break
}
try {
for await (const value of source) {
if (!writable.writable || writable.destroyed) break
if (!writable.write(result.value)) {
drainPromise = defer()
await Promise.race([
errPromise.promise,
closePromise.promise,
finishPromise.promise,
drainPromise.promise
])
if (writable.write(value) === false) {
await waitForDrainOrClose()
}
}
} finally {
writable
.removeListener('error', errorHandler)
.removeListener('close', closeHandler)
.removeListener('finish', finishHandler)
.removeListener('drain', drainHandler)
} catch (err) {
// The writable did not error, give it the error
writable.destroy(err)
}
// End the iterator if it is a generator
if (typeof source.return === 'function') {
await source.return()
try {
// Everything is good and we're done writing, end everything
if (!error && writable.writable) {
writable.end()
}
// Wait until we close or finish. This supports halfClosed streams
await waitForDone()
} finally {
// Clean up listeners
cleanup()
}
// Notify the user an error occurred
if (error) throw error
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc