Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

it-to-stream

Package Overview
Dependencies
Maintainers
1
Versions
4
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

it-to-stream - npm Package Compare versions

Comparing version 0.1.0 to 0.1.1

2

package.json
{
"name": "it-to-stream",
"version": "0.1.0",
"version": "0.1.1",
"description": "Convert streaming iterables to Node.js streams",

@@ -5,0 +5,0 @@ "main": "src/index.js",

@@ -66,7 +66,7 @@ const { Readable, Writable, Duplex } = require('stream')

async throw (err) {
stream.emit('error', err)
stream.destroy(err)
return { done: true }
},
async return () {
stream.emit('close')
stream.destroy()
return { done: true }

@@ -73,0 +73,0 @@ }

@@ -1,62 +0,13 @@

const { Transform } = require('stream')
const toDuplex = require('./duplex')
const defer = require('p-defer')
const CHUNK_TRANSFORMED = {}
module.exports = function toTransform (transform) {
let isFirstChunk = true
let nextChunk = defer()
let chunkTransformed = defer()
module.exports = function toTransform (transform, options) {
const { promise, resolve } = defer()
const outputSource = transform({
[Symbol.asyncIterator] () {
return this
},
// When next is called, it means the transform has dealt with a chunk
async next () {
if (isFirstChunk) {
isFirstChunk = false
} else {
chunkTransformed.resolve(CHUNK_TRANSFORMED)
}
const chunk = await nextChunk.promise
nextChunk = defer()
return { value: chunk }
}
})
const source = (async function * () {
const it = await promise
for await (const chunk of it) yield chunk
})()
let nextOutputChunkPromise
return new Transform({
async transform (chunk, enc, cb) {
nextChunk.resolve(chunk)
try {
while (true) {
if (!nextOutputChunkPromise) {
nextOutputChunkPromise = outputSource.next()
}
const res = await Promise.race([
chunkTransformed.promise,
nextOutputChunkPromise
])
if (res === CHUNK_TRANSFORMED) {
chunkTransformed = defer()
break // We completed transforming a chunk
}
nextOutputChunkPromise = null
if (!this.push(res.value)) {
// We pushed a value but we should not push more
// TODO? does this happen in transform streams?
}
}
} catch (err) {
return cb(err)
}
cb()
}
})
return toDuplex({ sink: s => resolve(transform(s)), source }, options)
}
SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc