it-to-stream
Advanced tools
Comparing version 0.1.0 to 0.1.1
{ | ||
"name": "it-to-stream", | ||
"version": "0.1.0", | ||
"version": "0.1.1", | ||
"description": "Convert streaming iterables to Node.js streams", | ||
@@ -5,0 +5,0 @@ "main": "src/index.js", |
@@ -66,7 +66,7 @@ const { Readable, Writable, Duplex } = require('stream') | ||
async throw (err) { | ||
stream.emit('error', err) | ||
stream.destroy(err) | ||
return { done: true } | ||
}, | ||
async return () { | ||
stream.emit('close') | ||
stream.destroy() | ||
return { done: true } | ||
@@ -73,0 +73,0 @@ } |
@@ -1,62 +0,13 @@ | ||
const { Transform } = require('stream') | ||
const toDuplex = require('./duplex') | ||
const defer = require('p-defer') | ||
const CHUNK_TRANSFORMED = {} | ||
module.exports = function toTransform (transform) { | ||
let isFirstChunk = true | ||
let nextChunk = defer() | ||
let chunkTransformed = defer() | ||
module.exports = function toTransform (transform, options) { | ||
const { promise, resolve } = defer() | ||
const outputSource = transform({ | ||
[Symbol.asyncIterator] () { | ||
return this | ||
}, | ||
// When next is called, it means the transform has dealt with a chunk | ||
async next () { | ||
if (isFirstChunk) { | ||
isFirstChunk = false | ||
} else { | ||
chunkTransformed.resolve(CHUNK_TRANSFORMED) | ||
} | ||
const chunk = await nextChunk.promise | ||
nextChunk = defer() | ||
return { value: chunk } | ||
} | ||
}) | ||
const source = (async function * () { | ||
const it = await promise | ||
for await (const chunk of it) yield chunk | ||
})() | ||
let nextOutputChunkPromise | ||
return new Transform({ | ||
async transform (chunk, enc, cb) { | ||
nextChunk.resolve(chunk) | ||
try { | ||
while (true) { | ||
if (!nextOutputChunkPromise) { | ||
nextOutputChunkPromise = outputSource.next() | ||
} | ||
const res = await Promise.race([ | ||
chunkTransformed.promise, | ||
nextOutputChunkPromise | ||
]) | ||
if (res === CHUNK_TRANSFORMED) { | ||
chunkTransformed = defer() | ||
break // We completed transforming a chunk | ||
} | ||
nextOutputChunkPromise = null | ||
if (!this.push(res.value)) { | ||
// We pushed a value but we should not push more | ||
// TODO? does this happen in transform streams? | ||
} | ||
} | ||
} catch (err) { | ||
return cb(err) | ||
} | ||
cb() | ||
} | ||
}) | ||
return toDuplex({ sink: s => resolve(transform(s)), source }, options) | ||
} |
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
16336
372