async-readable
Advanced tools
Comparing version 0.2.0-1 to 0.2.0-2
/// <reference types="node" /> | ||
/// <reference lib="es2018.asynciterable" /> | ||
import { Readable } from 'stream'; | ||
export declare type ReadableStream = Pick<NodeJS.ReadableStream, 'on' | 'off' | 'read'>; | ||
export declare type ReadableStream = Pick<NodeJS.ReadableStream, 'on' | 'off' | 'once' | 'read'>; | ||
export declare type AsyncReadable = ReturnType<typeof asyncReadable>; | ||
export declare type Read = AsyncReadable['read']; | ||
export declare type Off = AsyncReadable['off']; | ||
export declare type Gen<T> = (readable: AsyncReadable) => AsyncIterableIterator<T>; | ||
export declare function toAsyncIterable<T>(gen: Gen<T>): (source: Pick<NodeJS.ReadableStream, "on" | "off" | "read">) => AsyncIterableIterator<T>; | ||
export declare function toReadableStream<T>(gen: Gen<T>): (source: Pick<NodeJS.ReadableStream, "on" | "off" | "read">) => Readable; | ||
export declare function reader<T>(source: AsyncIterable<T>, destroy?: () => void): (this: Readable) => Promise<void>; | ||
export declare type Gen<T> = (readable: AsyncReadable) => AsyncIterable<T>; | ||
export declare function toAsyncIterable<T>(gen: Gen<T>): (source: Pick<NodeJS.ReadableStream, "on" | "off" | "once" | "read">) => AsyncIterable<T>; | ||
export declare function toReadableStream<T>(gen: Gen<T>): (source: Pick<NodeJS.ReadableStream, "on" | "off" | "once" | "read">) => Readable; | ||
export declare function asyncReadable<T extends Buffer>(stream: ReadableStream): Readonly<{ | ||
read: (size: number) => Promise<T>; | ||
read: (size: number) => Promise<unknown>; | ||
off: () => void; | ||
}>; |
import { Readable } from 'stream'; | ||
import { reader, rejection } from './utils'; | ||
export function toAsyncIterable(gen) { | ||
@@ -15,29 +16,2 @@ return function (source) { | ||
} | ||
export function reader(source, destroy = () => { }) { | ||
const iterator = source[Symbol.asyncIterator](); | ||
let reading = false; | ||
return async function () { | ||
if (reading) { | ||
return; | ||
} | ||
reading = true; | ||
try { | ||
while (true) { | ||
const { value, done } = await iterator.next(); | ||
if (done) { | ||
break; | ||
} | ||
if (this.push(value) === false) { | ||
reading = false; | ||
return; | ||
} | ||
} | ||
this.push(null); | ||
} | ||
catch (error) { | ||
destroy(); | ||
this.destroy(error); | ||
} | ||
}; | ||
} | ||
export function asyncReadable(stream) { | ||
@@ -47,2 +21,3 @@ let next = 0; | ||
const iterator = gen(); | ||
const [error, reject] = rejection(); | ||
return Object.freeze({ | ||
@@ -54,3 +29,6 @@ read, | ||
iterator.next(); | ||
return iterator.next(size).then(({ value }) => value); | ||
return Promise.race([ | ||
iterator.next(size).then(({ value }) => value), | ||
error, | ||
]); | ||
} | ||
@@ -62,2 +40,3 @@ function off() { | ||
stream.on('readable', onReadable); | ||
stream.once('error', reject); | ||
while (true) { | ||
@@ -64,0 +43,0 @@ next = yield; |
export * from './async-readable'; | ||
export { reader } from './utils'; |
39
index.js
@@ -7,15 +7,2 @@ 'use strict'; | ||
function toAsyncIterable(gen) { | ||
return function (source) { | ||
return gen(asyncReadable(source)); | ||
}; | ||
} | ||
function toReadableStream(gen) { | ||
return function (source) { | ||
return new stream.Readable({ | ||
objectMode: true, | ||
read: reader(toAsyncIterable(gen)(source)), | ||
}); | ||
}; | ||
} | ||
function reader(source, destroy = () => { }) { | ||
@@ -48,2 +35,21 @@ const iterator = source[Symbol.asyncIterator](); | ||
} | ||
function rejection() { | ||
let reject = (error) => { }; | ||
const error = new Promise((_res, rej) => reject = rej); | ||
return [error, reject]; | ||
} | ||
function toAsyncIterable(gen) { | ||
return function (source) { | ||
return gen(asyncReadable(source)); | ||
}; | ||
} | ||
function toReadableStream(gen) { | ||
return function (source) { | ||
return new stream.Readable({ | ||
objectMode: true, | ||
read: reader(toAsyncIterable(gen)(source)), | ||
}); | ||
}; | ||
} | ||
function asyncReadable(stream) { | ||
@@ -53,2 +59,3 @@ let next = 0; | ||
const iterator = gen(); | ||
const [error, reject] = rejection(); | ||
return Object.freeze({ | ||
@@ -60,3 +67,6 @@ read, | ||
iterator.next(); | ||
return iterator.next(size).then(({ value }) => value); | ||
return Promise.race([ | ||
iterator.next(size).then(({ value }) => value), | ||
error, | ||
]); | ||
} | ||
@@ -68,2 +78,3 @@ function off() { | ||
stream.on('readable', onReadable); | ||
stream.once('error', reject); | ||
while (true) { | ||
@@ -70,0 +81,0 @@ next = yield; |
{ | ||
"name": "async-readable", | ||
"version": "0.2.0-1", | ||
"version": "0.2.0-2", | ||
"description": "Async read for NodeJS.ReadableStream", | ||
@@ -5,0 +5,0 @@ "main": "index.js", |
Sorry, the diff of this file is not supported yet
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
License Policy Violation
LicenseThis package is not allowed per your license policy. Review the package's license to ensure compliance.
Found 1 instance in 1 package
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
10808
10
292
0