@yume-chan/stream-extra
Advanced tools
Comparing version 0.0.0-next-20240917062356 to 0.0.0-next-20241129144018
# Change Log - @yume-chan/stream-extra | ||
## 0.0.0-next-20240917062356 | ||
## 0.0.0-next-20241129144018 | ||
### Major Changes | ||
- 53688d3: Use PNPM workspace and Changesets to manage the monorepo. | ||
- 53688d3: Use PNPM workspace and Changesets to manage the monorepo. | ||
@@ -13,4 +13,7 @@ Because Changesets doesn't support alpha versions (`0.x.x`), this version is `1.0.0`. Future versions will follow SemVer rules, for example, breaking API changes will introduce a new major version. | ||
- Updated dependencies [53688d3] | ||
- @yume-chan/struct@0.0.0-next-20240917062356 | ||
- ea5002b: Polyfill `ReadableStream.from` and `ReadableStream.prototype.values` | ||
- Updated dependencies [53688d3] | ||
- Updated dependencies [db8466f] | ||
- Updated dependencies [db8466f] | ||
- @yume-chan/struct@0.0.0-next-20241129144018 | ||
@@ -25,5 +28,5 @@ This log was last generated on Tue, 18 Jun 2024 02:49:43 GMT and should not be manually modified. | ||
- Add `MaybeConsumable` type. It's also a namespace containing related types. | ||
- Re-export global `TextDecoderStream` to replace `DecodeUtf8Stream` which doesn't work correctly in stream mode | ||
- Move `Consumable` related types to the `Consumable` namespace. In future, more types will be moved to namespaces. | ||
- Add `MaybeConsumable` type. It's also a namespace containing related types. | ||
- Re-export global `TextDecoderStream` to replace `DecodeUtf8Stream` which doesn't work correctly in stream mode | ||
- Move `Consumable` related types to the `Consumable` namespace. In future, more types will be moved to namespaces. | ||
@@ -36,5 +39,5 @@ ## 0.0.23 | ||
- Fix `ConsumableWritableStream.write` calls `chunk.consume` twice. (doesn't cause any issue) | ||
- Fix `WrapWritableStream` might close the inner stream twice. (and throw an error) | ||
- Remove web-streams-polyfill dependency. The runtime must have global stream implementations (or you can add a polyfill yourself). | ||
- Fix `ConsumableWritableStream.write` calls `chunk.consume` twice. (doesn't cause any issue) | ||
- Fix `WrapWritableStream` might close the inner stream twice. (and throw an error) | ||
- Remove web-streams-polyfill dependency. The runtime must have global stream implementations (or you can add a polyfill yourself). | ||
@@ -53,3 +56,3 @@ ## 0.0.22 | ||
- Replace `GatherStringStream` with `ConcatStringStream` which can be treated as a Promise | ||
- Replace `GatherStringStream` with `ConcatStringStream` which can be treated as a Promise | ||
@@ -62,4 +65,4 @@ ## 0.0.20 | ||
- Fix a bug where `BufferedReadableStream#release` might output duplicate data. | ||
- Use ECMAScript private class fields syntax (supported by Chrome 74, Firefox 90, Safari 14.1 and Node.js 12.0.0). | ||
- Fix a bug where `BufferedReadableStream#release` might output duplicate data. | ||
- Use ECMAScript private class fields syntax (supported by Chrome 74, Firefox 90, Safari 14.1 and Node.js 12.0.0). | ||
@@ -72,3 +75,3 @@ ## 0.0.19 | ||
- Add an option to combine small chunks into target size in `ChunkStream`, and rename it to `DistributionStream` | ||
- Add an option to combine small chunks into target size in `ChunkStream`, and rename it to `DistributionStream` | ||
@@ -81,3 +84,3 @@ ## 0.0.18 | ||
- Change to load native Web Streams API implementation from `globalThis` if available | ||
- Change to load native Web Streams API implementation from `globalThis` if available | ||
@@ -84,0 +87,0 @@ ## 0.0.17 |
@@ -1,2 +0,2 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import { BufferedReadableStream } from "./buffered.js"; | ||
@@ -8,5 +8,5 @@ import type { ReadableWritablePair } from "./stream.js"; | ||
get readable(): ReadableStream<T>; | ||
get writable(): WritableStream<Uint8Array>; | ||
constructor(transform: (stream: BufferedReadableStream) => ValueOrPromise<T>); | ||
get writable(): WritableStream<Uint8Array<ArrayBufferLike>>; | ||
constructor(transform: (stream: BufferedReadableStream) => MaybePromiseLike<T>); | ||
} | ||
//# sourceMappingURL=buffered-transform.d.ts.map |
@@ -0,1 +1,2 @@ | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import type { AsyncExactReadable } from "@yume-chan/struct"; | ||
@@ -9,9 +10,5 @@ import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js"; | ||
constructor(stream: ReadableStream<Uint8Array>); | ||
iterateExactly(length: number): Iterator<MaybePromiseLike<Uint8Array>, void, void>; | ||
readExactly: (this: BufferedReadableStream, length: number) => MaybePromiseLike<Uint8Array<ArrayBufferLike>>; | ||
/** | ||
* | ||
* @param length | ||
* @returns | ||
*/ | ||
readExactly(length: number): Uint8Array | Promise<Uint8Array>; | ||
/** | ||
* Return a readable stream with unconsumed data (if any) and | ||
@@ -18,0 +15,0 @@ * all data from the wrapped stream. |
@@ -1,2 +0,2 @@ | ||
import { ExactReadableEndedError } from "@yume-chan/struct"; | ||
import { bipedal, ExactReadableEndedError } from "@yume-chan/struct"; | ||
import { PushReadableStream } from "./push-readable.js"; | ||
@@ -6,2 +6,4 @@ import { tryCancel } from "./try-close.js"; | ||
#buffered; | ||
// PERF: `subarray` is slow | ||
// don't use it until absolutely necessary | ||
#bufferedOffset = 0; | ||
@@ -19,3 +21,21 @@ #bufferedLength = 0; | ||
} | ||
async #readSource() { | ||
#readBuffered(length) { | ||
if (!this.#buffered) { | ||
return undefined; | ||
} | ||
const value = this.#buffered.subarray(this.#bufferedOffset, this.#bufferedOffset + length); | ||
// PERF: Synchronous path for reading from internal buffer | ||
if (this.#bufferedLength > length) { | ||
this.#position += length; | ||
this.#bufferedOffset += length; | ||
this.#bufferedLength -= length; | ||
return value; | ||
} | ||
this.#position += this.#bufferedLength; | ||
this.#buffered = undefined; | ||
this.#bufferedOffset = 0; | ||
this.#bufferedLength = 0; | ||
return value; | ||
} | ||
async #readSource(length) { | ||
const { done, value } = await this.reader.read(); | ||
@@ -25,81 +45,78 @@ if (done) { | ||
} | ||
if (value.length > length) { | ||
this.#buffered = value; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = value.length - length; | ||
this.#position += length; | ||
return value.subarray(0, length); | ||
} | ||
this.#position += value.length; | ||
return value; | ||
} | ||
async #readAsync(length, initial) { | ||
iterateExactly(length) { | ||
let state = this.#buffered ? 0 : 1; | ||
return { | ||
next: () => { | ||
switch (state) { | ||
case 0: { | ||
const value = this.#readBuffered(length); | ||
if (value.length === length) { | ||
state = 2; | ||
} | ||
else { | ||
length -= value.length; | ||
state = 1; | ||
} | ||
return { done: false, value }; | ||
} | ||
case 1: | ||
state = 3; | ||
return { | ||
done: false, | ||
value: this.#readSource(length).then((value) => { | ||
if (value.length === length) { | ||
state = 2; | ||
} | ||
else { | ||
length -= value.length; | ||
state = 1; | ||
} | ||
return value; | ||
}), | ||
}; | ||
case 2: | ||
return { done: true, value: undefined }; | ||
case 3: | ||
throw new Error("Can't call `next` before previous Promise resolves"); | ||
default: | ||
throw new Error("unreachable"); | ||
} | ||
}, | ||
}; | ||
} | ||
readExactly = bipedal(function* (then, length) { | ||
let result; | ||
let index; | ||
let index = 0; | ||
const initial = this.#readBuffered(length); | ||
if (initial) { | ||
if (initial.length === length) { | ||
return initial; | ||
} | ||
result = new Uint8Array(length); | ||
result.set(initial); | ||
index = initial.length; | ||
result.set(initial, index); | ||
index += initial.length; | ||
length -= initial.length; | ||
} | ||
else { | ||
const array = await this.#readSource(); | ||
if (array.length === length) { | ||
this.#position += length; | ||
return array; | ||
} | ||
if (array.length > length) { | ||
this.#buffered = array; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = array.length - length; | ||
this.#position += length; | ||
return array.subarray(0, length); | ||
} | ||
result = new Uint8Array(length); | ||
result.set(array); | ||
index = array.length; | ||
length -= array.length; | ||
this.#position += array.length; | ||
} | ||
while (length > 0) { | ||
const array = await this.#readSource(); | ||
if (array.length === length) { | ||
result.set(array, index); | ||
this.#position += length; | ||
return result; | ||
} | ||
if (array.length > length) { | ||
this.#buffered = array; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = array.length - length; | ||
result.set(array.subarray(0, length), index); | ||
this.#position += length; | ||
return result; | ||
} | ||
result.set(array, index); | ||
index += array.length; | ||
length -= array.length; | ||
this.#position += array.length; | ||
const value = yield* then(this.#readSource(length)); | ||
result.set(value, index); | ||
index += value.length; | ||
length -= value.length; | ||
} | ||
return result; | ||
} | ||
}); | ||
/** | ||
* | ||
* @param length | ||
* @returns | ||
*/ | ||
readExactly(length) { | ||
// PERF: Add a synchronous path for reading from internal buffer | ||
if (this.#buffered) { | ||
const array = this.#buffered; | ||
const offset = this.#bufferedOffset; | ||
if (this.#bufferedLength > length) { | ||
// PERF: `subarray` is slow | ||
// don't use it until absolutely necessary | ||
this.#bufferedOffset += length; | ||
this.#bufferedLength -= length; | ||
this.#position += length; | ||
return array.subarray(offset, offset + length); | ||
} | ||
this.#buffered = undefined; | ||
this.#bufferedLength = 0; | ||
this.#bufferedOffset = 0; | ||
this.#position += array.length - offset; | ||
return this.#readAsync(length, array.subarray(offset)); | ||
} | ||
return this.#readAsync(length); | ||
} | ||
/** | ||
* Return a readable stream with unconsumed data (if any) and | ||
@@ -106,0 +123,0 @@ * all data from the wrapped stream. |
import { PromiseResolver } from "@yume-chan/async"; | ||
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct"; | ||
import { EmptyUint8Array } from "@yume-chan/struct"; | ||
import { ReadableStream, WritableStream } from "./stream.js"; | ||
@@ -82,3 +82,3 @@ // `TransformStream` only calls its `source.flush` method when its `readable` is being read. | ||
case 0: | ||
result = EMPTY_UINT8_ARRAY; | ||
result = EmptyUint8Array; | ||
break; | ||
@@ -85,0 +85,0 @@ case 1: |
import type { QueuingStrategy, WritableStreamDefaultController, WritableStreamDefaultWriter } from "./stream.js"; | ||
import { ReadableStream as NativeReadableStream, WritableStream as NativeWritableStream } from "./stream.js"; | ||
declare class WritableStream<in T> extends NativeWritableStream<Consumable<T>> { | ||
static write<T>(writer: WritableStreamDefaultWriter<Consumable<T>>, value: T): Promise<void>; | ||
constructor(sink: Consumable.WritableStreamSink<T>, strategy?: QueuingStrategy<T>); | ||
} | ||
declare class ReadableStream<T> extends NativeReadableStream<Consumable<T>> { | ||
static enqueue<T>(controller: { | ||
enqueue: (chunk: Consumable<T>) => void; | ||
}, chunk: T): Promise<void>; | ||
constructor(source: Consumable.ReadableStreamSource<T>, strategy?: QueuingStrategy<T>); | ||
} | ||
export declare class Consumable<T> { | ||
#private; | ||
static readonly WritableStream: { | ||
new <in T_1>(sink: Consumable.WritableStreamSink<T_1>, strategy?: QueuingStrategy<T_1>): { | ||
readonly locked: boolean; | ||
abort(reason?: unknown): Promise<void>; | ||
close(): Promise<undefined>; | ||
getWriter(): WritableStreamDefaultWriter<Consumable<T_1>>; | ||
}; | ||
write<T_1>(writer: WritableStreamDefaultWriter<Consumable<T_1>>, value: T_1): Promise<void>; | ||
}; | ||
static readonly ReadableStream: { | ||
new <T_1>(source: Consumable.ReadableStreamSource<T_1>, strategy?: QueuingStrategy<T_1>): { | ||
readonly locked: boolean; | ||
cancel(reason?: unknown): Promise<void>; | ||
getReader({ mode }: { | ||
mode: "byob"; | ||
}): import("./types.js").ReadableStreamBYOBReader; | ||
getReader(): import("./types.js").ReadableStreamDefaultReader<Consumable<T_1>>; | ||
pipeThrough<RS extends import("./types.js").ReadableStream<unknown>>(transform: { | ||
readable: RS; | ||
writable: import("./types.js").WritableStream<Consumable<T_1>>; | ||
}, options?: import("./types.js").StreamPipeOptions): RS; | ||
pipeTo(destination: import("./types.js").WritableStream<Consumable<T_1>>, options?: import("./types.js").StreamPipeOptions): Promise<void>; | ||
tee(): [import("./types.js").ReadableStream<Consumable<T_1>>, import("./types.js").ReadableStream<Consumable<T_1>>]; | ||
values(options?: import("./types.js").ReadableStreamIteratorOptions): import("./types.js").ReadableStreamAsyncIterator<Consumable<T_1>>; | ||
[Symbol.asyncIterator](options?: import("./types.js").ReadableStreamIteratorOptions): import("./types.js").ReadableStreamAsyncIterator<Consumable<T_1>>; | ||
}; | ||
enqueue<T_1>(controller: { | ||
enqueue: (chunk: Consumable<T_1>) => void; | ||
}, chunk: T_1): Promise<void>; | ||
from<R>(asyncIterable: Iterable<R> | AsyncIterable<R> | import("./types.js").ReadableStreamLike<R>): import("./types.js").ReadableStream<R>; | ||
}; | ||
static readonly WritableStream: typeof WritableStream; | ||
static readonly ReadableStream: typeof ReadableStream; | ||
readonly value: T; | ||
@@ -62,2 +44,3 @@ readonly consumed: Promise<void>; | ||
} | ||
export {}; | ||
//# sourceMappingURL=consumable.d.ts.map |
@@ -1,87 +0,87 @@ | ||
import { PromiseResolver } from "@yume-chan/async"; | ||
import { PromiseResolver, isPromiseLike } from "@yume-chan/async"; | ||
import { ReadableStream as NativeReadableStream, WritableStream as NativeWritableStream, } from "./stream.js"; | ||
import { createTask } from "./task.js"; | ||
function isPromiseLike(value) { | ||
return typeof value === "object" && value !== null && "then" in value; | ||
} | ||
export class Consumable { | ||
static WritableStream = class WritableStream extends NativeWritableStream { | ||
static async write(writer, value) { | ||
const consumable = new Consumable(value); | ||
await writer.write(consumable); | ||
await consumable.consumed; | ||
} | ||
constructor(sink, strategy) { | ||
let wrappedStrategy; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk); | ||
}; | ||
} | ||
// Workaround https://github.com/evanw/esbuild/issues/3923 | ||
class WritableStream extends NativeWritableStream { | ||
static async write(writer, value) { | ||
const consumable = new Consumable(value); | ||
await writer.write(consumable); | ||
await consumable.consumed; | ||
} | ||
constructor(sink, strategy) { | ||
let wrappedStrategy; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
super({ | ||
start(controller) { | ||
return sink.start?.(controller); | ||
}, | ||
async write(chunk, controller) { | ||
await chunk.tryConsume((chunk) => sink.write?.(chunk, controller)); | ||
}, | ||
abort(reason) { | ||
return sink.abort?.(reason); | ||
}, | ||
close() { | ||
return sink.close?.(); | ||
}, | ||
}, wrappedStrategy); | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk); | ||
}; | ||
} | ||
} | ||
}; | ||
static ReadableStream = class ReadableStream extends NativeReadableStream { | ||
static async enqueue(controller, chunk) { | ||
const output = new Consumable(chunk); | ||
controller.enqueue(output); | ||
await output.consumed; | ||
} | ||
constructor(source, strategy) { | ||
let wrappedController; | ||
let wrappedStrategy; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size(chunk.value); | ||
}; | ||
} | ||
super({ | ||
start(controller) { | ||
return sink.start?.(controller); | ||
}, | ||
async write(chunk, controller) { | ||
await chunk.tryConsume((chunk) => sink.write?.(chunk, controller)); | ||
}, | ||
abort(reason) { | ||
return sink.abort?.(reason); | ||
}, | ||
close() { | ||
return sink.close?.(); | ||
}, | ||
}, wrappedStrategy); | ||
} | ||
} | ||
class ReadableStream extends NativeReadableStream { | ||
static async enqueue(controller, chunk) { | ||
const output = new Consumable(chunk); | ||
controller.enqueue(output); | ||
await output.consumed; | ||
} | ||
constructor(source, strategy) { | ||
let wrappedController; | ||
let wrappedStrategy; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
super({ | ||
async start(controller) { | ||
wrappedController = { | ||
async enqueue(chunk) { | ||
await ReadableStream.enqueue(controller, chunk); | ||
}, | ||
close() { | ||
controller.close(); | ||
}, | ||
error(reason) { | ||
controller.error(reason); | ||
}, | ||
}; | ||
await source.start?.(wrappedController); | ||
}, | ||
async pull() { | ||
await source.pull?.(wrappedController); | ||
}, | ||
async cancel(reason) { | ||
await source.cancel?.(reason); | ||
}, | ||
}, wrappedStrategy); | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size(chunk.value); | ||
}; | ||
} | ||
} | ||
}; | ||
super({ | ||
async start(controller) { | ||
wrappedController = { | ||
async enqueue(chunk) { | ||
await ReadableStream.enqueue(controller, chunk); | ||
}, | ||
close() { | ||
controller.close(); | ||
}, | ||
error(reason) { | ||
controller.error(reason); | ||
}, | ||
}; | ||
await source.start?.(wrappedController); | ||
}, | ||
async pull() { | ||
await source.pull?.(wrappedController); | ||
}, | ||
async cancel(reason) { | ||
await source.cancel?.(reason); | ||
}, | ||
}, wrappedStrategy); | ||
} | ||
} | ||
export class Consumable { | ||
static WritableStream = WritableStream; | ||
static ReadableStream = ReadableStream; | ||
#task; | ||
@@ -88,0 +88,0 @@ #resolver; |
@@ -1,2 +0,2 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import type { QueuingStrategy, ReadableStream } from "./stream.js"; | ||
@@ -18,3 +18,3 @@ import { WritableStream } from "./stream.js"; | ||
*/ | ||
close?: (() => ValueOrPromise<boolean | void>) | undefined; | ||
close?: (() => MaybePromiseLike<boolean | void>) | undefined; | ||
/** | ||
@@ -21,0 +21,0 @@ * Callback when any `ReadableStream` is closed (the other peer doesn't produce any more data), |
import { Consumable } from "./consumable.js"; | ||
import type { MaybeConsumable } from "./maybe-consumable.js"; | ||
import type { QueuingStrategy, WritableStreamDefaultController } from "./stream.js"; | ||
import { WritableStream as NativeWritableStream, TransformStream } from "./stream.js"; | ||
import { WritableStream as NativeWritableStream } from "./stream.js"; | ||
export declare function getValue<T>(value: MaybeConsumable<T>): T; | ||
export declare function tryConsume<T, R>(value: T, callback: (value: T extends Consumable<infer U> ? U : T) => R): R; | ||
export declare class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> { | ||
constructor(); | ||
} | ||
export interface WritableStreamSink<in T> { | ||
@@ -11,0 +8,0 @@ start?(controller: WritableStreamDefaultController): void | PromiseLike<void>; |
import { Consumable } from "./consumable.js"; | ||
import { WritableStream as NativeWritableStream, TransformStream, } from "./stream.js"; | ||
import { WritableStream as NativeWritableStream } from "./stream.js"; | ||
export function getValue(value) { | ||
@@ -14,13 +14,2 @@ return value instanceof Consumable ? value.value : value; | ||
} | ||
export class UnwrapStream extends TransformStream { | ||
constructor() { | ||
super({ | ||
transform(chunk, controller) { | ||
tryConsume(chunk, (chunk) => { | ||
controller.enqueue(chunk); | ||
}); | ||
}, | ||
}); | ||
} | ||
} | ||
export class WritableStream extends NativeWritableStream { | ||
@@ -27,0 +16,0 @@ constructor(sink, strategy) { |
@@ -33,3 +33,3 @@ import { PromiseResolver } from "@yume-chan/async"; | ||
// before calling `enqueue`, as it might change when waiting | ||
// for the backpressure to be resolved. | ||
// for the backpressure to be reduced. | ||
// | ||
@@ -44,3 +44,3 @@ // So IMO it's better to handle this for the producer | ||
// Obviously, the producer should listen to the `abortSignal` and | ||
// stop producing, but most pushing data sources can't be stopped. | ||
// stop producing, but most pushing data sources don't support that. | ||
logger?.({ | ||
@@ -47,0 +47,0 @@ source: "producer", |
import type { AbortSignal, ReadableStream as ReadableStreamType, TransformStream as TransformStreamType, WritableStream as WritableStreamType } from "./types.js"; | ||
export * from "./types.js"; | ||
export { ReadableStream }; | ||
/** A controller object that allows you to abort one or more DOM requests as and when desired. */ | ||
@@ -18,6 +19,8 @@ export interface AbortController { | ||
} | ||
export type ReadableStream<out T> = ReadableStreamType<T>; | ||
export type WritableStream<in T> = WritableStreamType<T>; | ||
export declare const AbortController: AbortControllerConstructor; | ||
export type ReadableStream<T> = ReadableStreamType<T>; | ||
export type WritableStream<T> = WritableStreamType<T>; | ||
export type TransformStream<I, O> = TransformStreamType<I, O>; | ||
export declare const AbortController: AbortControllerConstructor, ReadableStream: typeof ReadableStreamType, WritableStream: typeof WritableStreamType, TransformStream: typeof TransformStreamType; | ||
declare const ReadableStream: typeof ReadableStreamType; | ||
export declare const WritableStream: typeof WritableStreamType, TransformStream: typeof TransformStreamType; | ||
//# sourceMappingURL=stream.d.ts.map |
export * from "./types.js"; | ||
export const { AbortController, ReadableStream, WritableStream, TransformStream, } = globalThis; | ||
export { ReadableStream }; | ||
export const { AbortController } = globalThis; | ||
const ReadableStream = /* #__PURE__ */ (() => { | ||
const { ReadableStream } = globalThis; | ||
if (!ReadableStream.from) { | ||
ReadableStream.from = function (iterable) { | ||
const iterator = Symbol.asyncIterator in iterable | ||
? iterable[Symbol.asyncIterator]() | ||
: iterable[Symbol.iterator](); | ||
return new ReadableStream({ | ||
async pull(controller) { | ||
const result = await iterator.next(); | ||
if (result.done) { | ||
controller.close(); | ||
return; | ||
} | ||
controller.enqueue(result.value); | ||
}, | ||
async cancel(reason) { | ||
await iterator.return?.(reason); | ||
}, | ||
}); | ||
}; | ||
} | ||
if (!ReadableStream.prototype[Symbol.asyncIterator] || | ||
!ReadableStream.prototype.values) { | ||
ReadableStream.prototype.values = async function* (options) { | ||
const reader = this.getReader(); | ||
try { | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
return; | ||
} | ||
yield value; | ||
} | ||
} | ||
finally { | ||
// Calling `iterator.return` will enter this `finally` block. | ||
// We don't need to care about the parameter to `iterator.return`, | ||
// it will be returned as the final `result.value` automatically. | ||
if (!options?.preventCancel) { | ||
await reader.cancel(); | ||
} | ||
reader.releaseLock(); | ||
} | ||
}; | ||
ReadableStream.prototype[Symbol.asyncIterator] = | ||
// eslint-disable-next-line @typescript-eslint/unbound-method | ||
ReadableStream.prototype.values; | ||
} | ||
return ReadableStream; | ||
})(); | ||
export const { WritableStream, TransformStream } = globalThis; | ||
//# sourceMappingURL=stream.js.map |
@@ -1,7 +0,6 @@ | ||
import type Struct from "@yume-chan/struct"; | ||
import type { StructValueType } from "@yume-chan/struct"; | ||
import type { StructLike } from "@yume-chan/struct"; | ||
import { BufferedTransformStream } from "./buffered-transform.js"; | ||
export declare class StructDeserializeStream<T extends Struct<object, PropertyKey, object, unknown>> extends BufferedTransformStream<StructValueType<T>> { | ||
constructor(struct: T); | ||
export declare class StructDeserializeStream<T> extends BufferedTransformStream<T> { | ||
constructor(struct: StructLike<T>); | ||
} | ||
//# sourceMappingURL=struct-deserialize.d.ts.map |
@@ -1,6 +0,6 @@ | ||
import type Struct from "@yume-chan/struct"; | ||
import type { StructInit, StructLike } from "@yume-chan/struct"; | ||
import { TransformStream } from "./stream.js"; | ||
export declare class StructSerializeStream<T extends Struct<object, PropertyKey, object, unknown>> extends TransformStream<T["TInit"], Uint8Array> { | ||
export declare class StructSerializeStream<T extends StructLike<unknown>> extends TransformStream<StructInit<T>, Uint8Array> { | ||
constructor(struct: T); | ||
} | ||
//# sourceMappingURL=struct-serialize.d.ts.map |
// `createTask` allows browser DevTools to track the call stack across async boundaries. | ||
const { console } = globalThis; | ||
export const createTask = console?.createTask?.bind(console) ?? | ||
export const createTask = /* #__PURE__ */ (() => console?.createTask?.bind(console) ?? | ||
(() => ({ | ||
@@ -8,3 +8,3 @@ run(callback) { | ||
}, | ||
})); | ||
})))(); | ||
//# sourceMappingURL=task.js.map |
@@ -219,3 +219,3 @@ /** | ||
*/ | ||
static from<R>(asyncIterable: Iterable<R> | AsyncIterable<R> | ReadableStreamLike<R>): ReadableStream<R>; | ||
static from<R>(asyncIterable: Iterable<R> | AsyncIterable<R>): ReadableStream<R>; | ||
} | ||
@@ -228,3 +228,3 @@ /** | ||
export declare interface ReadableStreamAsyncIterator<R> extends AsyncIterableIterator<R> { | ||
next(): Promise<IteratorResult<R, undefined>>; | ||
next(): Promise<IteratorResult<R, void>>; | ||
return(value?: R): Promise<IteratorResult<R>>; | ||
@@ -231,0 +231,0 @@ } |
@@ -1,9 +0,10 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import type { QueuingStrategy, ReadableStreamDefaultController } from "./stream.js"; | ||
import { ReadableStream } from "./stream.js"; | ||
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>; | ||
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => MaybePromiseLike<ReadableStream<T>>; | ||
export interface ReadableStreamWrapper<T> { | ||
start: WrapReadableStreamStart<T>; | ||
cancel?(reason?: unknown): ValueOrPromise<void>; | ||
close?(): ValueOrPromise<void>; | ||
cancel?: (reason?: unknown) => MaybePromiseLike<void>; | ||
close?: () => MaybePromiseLike<void>; | ||
error?: (reason?: unknown) => MaybePromiseLike<void>; | ||
} | ||
@@ -10,0 +11,0 @@ /** |
@@ -28,13 +28,18 @@ import { ReadableStream } from "./stream.js"; | ||
start: async (controller) => { | ||
// `start` is invoked before `ReadableStream`'s constructor finish, | ||
// so using `this` synchronously causes | ||
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". | ||
// Queue a microtask to avoid this. | ||
await Promise.resolve(); | ||
this.readable = await getWrappedReadableStream(wrapper, controller); | ||
const readable = await getWrappedReadableStream(wrapper, controller); | ||
// `start` is called in `super()`, so can't use `this` synchronously. | ||
// but it's fine after the first `await` | ||
this.readable = readable; | ||
this.#reader = this.readable.getReader(); | ||
}, | ||
pull: async (controller) => { | ||
const result = await this.#reader.read(); | ||
if (result.done) { | ||
const { done, value } = await this.#reader | ||
.read() | ||
.catch((e) => { | ||
if ("error" in wrapper) { | ||
wrapper.error(e); | ||
} | ||
throw e; | ||
}); | ||
if (done) { | ||
controller.close(); | ||
@@ -46,3 +51,3 @@ if ("close" in wrapper) { | ||
else { | ||
controller.enqueue(result.value); | ||
controller.enqueue(value); | ||
} | ||
@@ -49,0 +54,0 @@ }, |
@@ -1,5 +0,5 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import type { TransformStream } from "./stream.js"; | ||
import { WritableStream } from "./stream.js"; | ||
export type WrapWritableStreamStart<T> = () => ValueOrPromise<WritableStream<T>>; | ||
export type WrapWritableStreamStart<T> = () => MaybePromiseLike<WritableStream<T>>; | ||
export interface WritableStreamWrapper<T> { | ||
@@ -6,0 +6,0 @@ start: WrapWritableStreamStart<T>; |
@@ -21,8 +21,6 @@ import { WritableStream } from "./stream.js"; | ||
start: async () => { | ||
// `start` is invoked before `ReadableStream`'s constructor finish, | ||
// so using `this` synchronously causes | ||
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". | ||
// Queue a microtask to avoid this. | ||
await Promise.resolve(); | ||
this.writable = await getWrappedWritableStream(start); | ||
const writable = await getWrappedWritableStream(start); | ||
// `start` is called in `super()`, so can't use `this` synchronously. | ||
// but it's fine after the first `await` | ||
this.writable = writable; | ||
this.#writer = this.writable.getWriter(); | ||
@@ -29,0 +27,0 @@ }, |
{ | ||
"name": "@yume-chan/stream-extra", | ||
"version": "0.0.0-next-20240917062356", | ||
"version": "0.0.0-next-20241129144018", | ||
"description": "Extensions to Web Streams API", | ||
@@ -29,12 +29,12 @@ "keywords": [ | ||
"dependencies": { | ||
"@yume-chan/async": "^2.2.0", | ||
"@yume-chan/struct": "^0.0.0-next-20240917062356" | ||
"@yume-chan/async": "^4.0.2", | ||
"@yume-chan/struct": "^0.0.0-next-20241129144018" | ||
}, | ||
"devDependencies": { | ||
"@types/node": "^22.5.5", | ||
"prettier": "^3.3.3", | ||
"typescript": "^5.6.2", | ||
"@types/node": "^22.10.0", | ||
"prettier": "^3.4.1", | ||
"typescript": "^5.7.2", | ||
"@yume-chan/tsconfig": "^1.0.0", | ||
"@yume-chan/test-runner": "^1.0.0", | ||
"@yume-chan/eslint-config": "^1.0.0", | ||
"@yume-chan/tsconfig": "^1.0.0" | ||
"@yume-chan/eslint-config": "^1.0.0" | ||
}, | ||
@@ -41,0 +41,0 @@ "scripts": { |
@@ -1,2 +0,2 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import { StructEmptyError } from "@yume-chan/struct"; | ||
@@ -25,3 +25,3 @@ | ||
constructor( | ||
transform: (stream: BufferedReadableStream) => ValueOrPromise<T>, | ||
transform: (stream: BufferedReadableStream) => MaybePromiseLike<T>, | ||
) { | ||
@@ -28,0 +28,0 @@ // Convert incoming chunks to a `BufferedReadableStream` |
@@ -0,3 +1,4 @@ | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import type { AsyncExactReadable } from "@yume-chan/struct"; | ||
import { ExactReadableEndedError } from "@yume-chan/struct"; | ||
import { bipedal, ExactReadableEndedError } from "@yume-chan/struct"; | ||
@@ -10,2 +11,4 @@ import { PushReadableStream } from "./push-readable.js"; | ||
#buffered: Uint8Array | undefined; | ||
// PERF: `subarray` is slow | ||
// don't use it until absolutely necessary | ||
#bufferedOffset = 0; | ||
@@ -27,3 +30,28 @@ #bufferedLength = 0; | ||
async #readSource() { | ||
#readBuffered(length: number) { | ||
if (!this.#buffered) { | ||
return undefined; | ||
} | ||
const value = this.#buffered.subarray( | ||
this.#bufferedOffset, | ||
this.#bufferedOffset + length, | ||
); | ||
// PERF: Synchronous path for reading from internal buffer | ||
if (this.#bufferedLength > length) { | ||
this.#position += length; | ||
this.#bufferedOffset += length; | ||
this.#bufferedLength -= length; | ||
return value; | ||
} | ||
this.#position += this.#bufferedLength; | ||
this.#buffered = undefined; | ||
this.#bufferedOffset = 0; | ||
this.#bufferedLength = 0; | ||
return value; | ||
} | ||
async #readSource(length: number): Promise<Uint8Array> { | ||
const { done, value } = await this.reader.read(); | ||
@@ -33,92 +61,92 @@ if (done) { | ||
} | ||
if (value.length > length) { | ||
this.#buffered = value; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = value.length - length; | ||
this.#position += length; | ||
return value.subarray(0, length); | ||
} | ||
this.#position += value.length; | ||
return value; | ||
} | ||
async #readAsync(length: number, initial?: Uint8Array) { | ||
let result: Uint8Array; | ||
let index: number; | ||
iterateExactly( | ||
length: number, | ||
): Iterator<MaybePromiseLike<Uint8Array>, void, void> { | ||
let state = this.#buffered ? 0 : 1; | ||
return { | ||
next: () => { | ||
switch (state) { | ||
case 0: { | ||
const value = this.#readBuffered(length)!; | ||
if (value.length === length) { | ||
state = 2; | ||
} else { | ||
length -= value.length; | ||
state = 1; | ||
} | ||
return { done: false, value }; | ||
} | ||
case 1: | ||
state = 3; | ||
return { | ||
done: false, | ||
value: this.#readSource(length).then((value) => { | ||
if (value.length === length) { | ||
state = 2; | ||
} else { | ||
length -= value.length; | ||
state = 1; | ||
} | ||
return value; | ||
}), | ||
}; | ||
case 2: | ||
return { done: true, value: undefined }; | ||
case 3: | ||
throw new Error( | ||
"Can't call `next` before previous Promise resolves", | ||
); | ||
default: | ||
throw new Error("unreachable"); | ||
} | ||
}, | ||
}; | ||
} | ||
readExactly = bipedal(function* ( | ||
this: BufferedReadableStream, | ||
then, | ||
length: number, | ||
) { | ||
let result: Uint8Array | undefined; | ||
let index = 0; | ||
const initial = this.#readBuffered(length); | ||
if (initial) { | ||
if (initial.length === length) { | ||
return initial; | ||
} | ||
result = new Uint8Array(length); | ||
result.set(initial); | ||
index = initial.length; | ||
result.set(initial, index); | ||
index += initial.length; | ||
length -= initial.length; | ||
} else { | ||
const array = await this.#readSource(); | ||
if (array.length === length) { | ||
this.#position += length; | ||
return array; | ||
} | ||
if (array.length > length) { | ||
this.#buffered = array; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = array.length - length; | ||
this.#position += length; | ||
return array.subarray(0, length); | ||
} | ||
result = new Uint8Array(length); | ||
result.set(array); | ||
index = array.length; | ||
length -= array.length; | ||
this.#position += array.length; | ||
} | ||
while (length > 0) { | ||
const array = await this.#readSource(); | ||
if (array.length === length) { | ||
result.set(array, index); | ||
this.#position += length; | ||
return result; | ||
} | ||
if (array.length > length) { | ||
this.#buffered = array; | ||
this.#bufferedOffset = length; | ||
this.#bufferedLength = array.length - length; | ||
result.set(array.subarray(0, length), index); | ||
this.#position += length; | ||
return result; | ||
} | ||
result.set(array, index); | ||
index += array.length; | ||
length -= array.length; | ||
this.#position += array.length; | ||
const value = yield* then(this.#readSource(length)); | ||
result.set(value, index); | ||
index += value.length; | ||
length -= value.length; | ||
} | ||
return result; | ||
} | ||
}); | ||
/** | ||
* | ||
* @param length | ||
* @returns | ||
*/ | ||
readExactly(length: number): Uint8Array | Promise<Uint8Array> { | ||
// PERF: Add a synchronous path for reading from internal buffer | ||
if (this.#buffered) { | ||
const array = this.#buffered; | ||
const offset = this.#bufferedOffset; | ||
if (this.#bufferedLength > length) { | ||
// PERF: `subarray` is slow | ||
// don't use it until absolutely necessary | ||
this.#bufferedOffset += length; | ||
this.#bufferedLength -= length; | ||
this.#position += length; | ||
return array.subarray(offset, offset + length); | ||
} | ||
this.#buffered = undefined; | ||
this.#bufferedLength = 0; | ||
this.#bufferedOffset = 0; | ||
this.#position += array.length - offset; | ||
return this.#readAsync(length, array.subarray(offset)); | ||
} | ||
return this.#readAsync(length); | ||
} | ||
/** | ||
* Return a readable stream with unconsumed data (if any) and | ||
@@ -125,0 +153,0 @@ * all data from the wrapped stream. |
import { PromiseResolver } from "@yume-chan/async"; | ||
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct"; | ||
import { EmptyUint8Array } from "@yume-chan/struct"; | ||
@@ -104,3 +104,3 @@ import type { ReadableStreamDefaultController } from "./stream.js"; | ||
case 0: | ||
result = EMPTY_UINT8_ARRAY; | ||
result = EmptyUint8Array; | ||
break; | ||
@@ -107,0 +107,0 @@ case 1: |
@@ -1,2 +0,2 @@ | ||
import { PromiseResolver } from "@yume-chan/async"; | ||
import { PromiseResolver, isPromiseLike } from "@yume-chan/async"; | ||
@@ -15,122 +15,119 @@ import type { | ||
function isPromiseLike(value: unknown): value is PromiseLike<unknown> { | ||
return typeof value === "object" && value !== null && "then" in value; | ||
} | ||
// Workaround https://github.com/evanw/esbuild/issues/3923 | ||
class WritableStream<in T> extends NativeWritableStream<Consumable<T>> { | ||
static async write<T>( | ||
writer: WritableStreamDefaultWriter<Consumable<T>>, | ||
value: T, | ||
) { | ||
const consumable = new Consumable(value); | ||
await writer.write(consumable); | ||
await consumable.consumed; | ||
} | ||
export class Consumable<T> { | ||
static readonly WritableStream = class WritableStream< | ||
in T, | ||
> extends NativeWritableStream<Consumable<T>> { | ||
static async write<T>( | ||
writer: WritableStreamDefaultWriter<Consumable<T>>, | ||
value: T, | ||
) { | ||
const consumable = new Consumable(value); | ||
await writer.write(consumable); | ||
await consumable.consumed; | ||
constructor( | ||
sink: Consumable.WritableStreamSink<T>, | ||
strategy?: QueuingStrategy<T>, | ||
) { | ||
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size!( | ||
chunk instanceof Consumable ? chunk.value : chunk, | ||
); | ||
}; | ||
} | ||
} | ||
constructor( | ||
sink: Consumable.WritableStreamSink<T>, | ||
strategy?: QueuingStrategy<T>, | ||
) { | ||
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size!( | ||
chunk instanceof Consumable ? chunk.value : chunk, | ||
); | ||
}; | ||
} | ||
} | ||
super( | ||
{ | ||
start(controller) { | ||
return sink.start?.(controller); | ||
}, | ||
async write(chunk, controller) { | ||
await chunk.tryConsume((chunk) => | ||
sink.write?.(chunk, controller), | ||
); | ||
}, | ||
abort(reason) { | ||
return sink.abort?.(reason); | ||
}, | ||
close() { | ||
return sink.close?.(); | ||
}, | ||
super( | ||
{ | ||
start(controller) { | ||
return sink.start?.(controller); | ||
}, | ||
wrappedStrategy, | ||
); | ||
} | ||
}; | ||
async write(chunk, controller) { | ||
await chunk.tryConsume((chunk) => | ||
sink.write?.(chunk, controller), | ||
); | ||
}, | ||
abort(reason) { | ||
return sink.abort?.(reason); | ||
}, | ||
close() { | ||
return sink.close?.(); | ||
}, | ||
}, | ||
wrappedStrategy, | ||
); | ||
} | ||
} | ||
static readonly ReadableStream = class ReadableStream< | ||
T, | ||
> extends NativeReadableStream<Consumable<T>> { | ||
static async enqueue<T>( | ||
controller: { enqueue: (chunk: Consumable<T>) => void }, | ||
chunk: T, | ||
) { | ||
const output = new Consumable(chunk); | ||
controller.enqueue(output); | ||
await output.consumed; | ||
} | ||
class ReadableStream<T> extends NativeReadableStream<Consumable<T>> { | ||
static async enqueue<T>( | ||
controller: { enqueue: (chunk: Consumable<T>) => void }, | ||
chunk: T, | ||
) { | ||
const output = new Consumable(chunk); | ||
controller.enqueue(output); | ||
await output.consumed; | ||
} | ||
constructor( | ||
source: Consumable.ReadableStreamSource<T>, | ||
strategy?: QueuingStrategy<T>, | ||
) { | ||
let wrappedController: | ||
| Consumable.ReadableStreamController<T> | ||
| undefined; | ||
constructor( | ||
source: Consumable.ReadableStreamSource<T>, | ||
strategy?: QueuingStrategy<T>, | ||
) { | ||
let wrappedController: | ||
| Consumable.ReadableStreamController<T> | ||
| undefined; | ||
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size!(chunk.value); | ||
}; | ||
} | ||
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined; | ||
if (strategy) { | ||
wrappedStrategy = {}; | ||
if ("highWaterMark" in strategy) { | ||
wrappedStrategy.highWaterMark = strategy.highWaterMark; | ||
} | ||
if ("size" in strategy) { | ||
wrappedStrategy.size = (chunk) => { | ||
return strategy.size!(chunk.value); | ||
}; | ||
} | ||
} | ||
super( | ||
{ | ||
async start(controller) { | ||
wrappedController = { | ||
async enqueue(chunk) { | ||
await ReadableStream.enqueue(controller, chunk); | ||
}, | ||
close() { | ||
controller.close(); | ||
}, | ||
error(reason) { | ||
controller.error(reason); | ||
}, | ||
}; | ||
super( | ||
{ | ||
async start(controller) { | ||
wrappedController = { | ||
async enqueue(chunk) { | ||
await ReadableStream.enqueue(controller, chunk); | ||
}, | ||
close() { | ||
controller.close(); | ||
}, | ||
error(reason) { | ||
controller.error(reason); | ||
}, | ||
}; | ||
await source.start?.(wrappedController); | ||
}, | ||
async pull() { | ||
await source.pull?.(wrappedController!); | ||
}, | ||
async cancel(reason) { | ||
await source.cancel?.(reason); | ||
}, | ||
await source.start?.(wrappedController); | ||
}, | ||
wrappedStrategy, | ||
); | ||
} | ||
}; | ||
async pull() { | ||
await source.pull?.(wrappedController!); | ||
}, | ||
async cancel(reason) { | ||
await source.cancel?.(reason); | ||
}, | ||
}, | ||
wrappedStrategy, | ||
); | ||
} | ||
} | ||
export class Consumable<T> { | ||
static readonly WritableStream = WritableStream; | ||
static readonly ReadableStream = ReadableStream; | ||
readonly #task: Task; | ||
@@ -137,0 +134,0 @@ readonly #resolver: PromiseResolver<void>; |
@@ -0,3 +1,3 @@ | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
import { PromiseResolver } from "@yume-chan/async"; | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
@@ -31,3 +31,3 @@ import type { | ||
*/ | ||
close?: (() => ValueOrPromise<boolean | void>) | undefined; | ||
close?: (() => MaybePromiseLike<boolean | void>) | undefined; | ||
@@ -34,0 +34,0 @@ /** |
@@ -7,6 +7,3 @@ import { Consumable } from "./consumable.js"; | ||
} from "./stream.js"; | ||
import { | ||
WritableStream as NativeWritableStream, | ||
TransformStream, | ||
} from "./stream.js"; | ||
import { WritableStream as NativeWritableStream } from "./stream.js"; | ||
@@ -28,14 +25,2 @@ export function getValue<T>(value: MaybeConsumable<T>): T { | ||
export class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> { | ||
constructor() { | ||
super({ | ||
transform(chunk, controller) { | ||
tryConsume(chunk, (chunk) => { | ||
controller.enqueue(chunk as T); | ||
}); | ||
}, | ||
}); | ||
} | ||
} | ||
export interface WritableStreamSink<in T> { | ||
@@ -42,0 +27,0 @@ start?( |
@@ -78,3 +78,3 @@ import { PromiseResolver } from "@yume-chan/async"; | ||
// before calling `enqueue`, as it might change when waiting | ||
// for the backpressure to be resolved. | ||
// for the backpressure to be reduced. | ||
// | ||
@@ -89,3 +89,3 @@ // So IMO it's better to handle this for the producer | ||
// Obviously, the producer should listen to the `abortSignal` and | ||
// stop producing, but most pushing data sources can't be stopped. | ||
// stop producing, but most pushing data sources don't support that. | ||
logger?.({ | ||
@@ -92,0 +92,0 @@ source: "producer", |
import type { | ||
AbortSignal, | ||
ReadableStreamIteratorOptions, | ||
ReadableStream as ReadableStreamType, | ||
@@ -9,2 +10,3 @@ TransformStream as TransformStreamType, | ||
export * from "./types.js"; | ||
export { ReadableStream }; | ||
@@ -36,11 +38,71 @@ /** A controller object that allows you to abort one or more DOM requests as and when desired. */ | ||
export type ReadableStream<out T> = ReadableStreamType<T>; | ||
export type WritableStream<in T> = WritableStreamType<T>; | ||
export const { AbortController } = globalThis as unknown as GlobalExtension; | ||
export type ReadableStream<T> = ReadableStreamType<T>; | ||
export type WritableStream<T> = WritableStreamType<T>; | ||
export type TransformStream<I, O> = TransformStreamType<I, O>; | ||
export const { | ||
AbortController, | ||
ReadableStream, | ||
WritableStream, | ||
TransformStream, | ||
} = globalThis as unknown as GlobalExtension; | ||
const ReadableStream = /* #__PURE__ */ (() => { | ||
const { ReadableStream } = globalThis as unknown as GlobalExtension; | ||
if (!ReadableStream.from) { | ||
ReadableStream.from = function (iterable) { | ||
const iterator = | ||
Symbol.asyncIterator in iterable | ||
? iterable[Symbol.asyncIterator]() | ||
: iterable[Symbol.iterator](); | ||
return new ReadableStream({ | ||
async pull(controller) { | ||
const result = await iterator.next(); | ||
if (result.done) { | ||
controller.close(); | ||
return; | ||
} | ||
controller.enqueue(result.value); | ||
}, | ||
async cancel(reason) { | ||
await iterator.return?.(reason); | ||
}, | ||
}); | ||
}; | ||
} | ||
if ( | ||
!ReadableStream.prototype[Symbol.asyncIterator] || | ||
!ReadableStream.prototype.values | ||
) { | ||
ReadableStream.prototype.values = async function* <R>( | ||
this: ReadableStream<R>, | ||
options?: ReadableStreamIteratorOptions, | ||
) { | ||
const reader = this.getReader(); | ||
try { | ||
while (true) { | ||
const { done, value } = await reader.read(); | ||
if (done) { | ||
return; | ||
} | ||
yield value; | ||
} | ||
} finally { | ||
// Calling `iterator.return` will enter this `finally` block. | ||
// We don't need to care about the parameter to `iterator.return`, | ||
// it will be returned as the final `result.value` automatically. | ||
if (!options?.preventCancel) { | ||
await reader.cancel(); | ||
} | ||
reader.releaseLock(); | ||
} | ||
}; | ||
ReadableStream.prototype[Symbol.asyncIterator] = | ||
// eslint-disable-next-line @typescript-eslint/unbound-method | ||
ReadableStream.prototype.values; | ||
} | ||
return ReadableStream; | ||
})(); | ||
export const { WritableStream, TransformStream } = | ||
globalThis as unknown as GlobalExtension; |
@@ -1,10 +0,7 @@ | ||
import type Struct from "@yume-chan/struct"; | ||
import type { StructValueType } from "@yume-chan/struct"; | ||
import type { StructLike } from "@yume-chan/struct"; | ||
import { BufferedTransformStream } from "./buffered-transform.js"; | ||
export class StructDeserializeStream< | ||
T extends Struct<object, PropertyKey, object, unknown>, | ||
> extends BufferedTransformStream<StructValueType<T>> { | ||
constructor(struct: T) { | ||
export class StructDeserializeStream<T> extends BufferedTransformStream<T> { | ||
constructor(struct: StructLike<T>) { | ||
super((stream) => { | ||
@@ -11,0 +8,0 @@ return struct.deserialize(stream) as never; |
@@ -1,2 +0,2 @@ | ||
import type Struct from "@yume-chan/struct"; | ||
import type { StructInit, StructLike } from "@yume-chan/struct"; | ||
@@ -6,4 +6,4 @@ import { TransformStream } from "./stream.js"; | ||
export class StructSerializeStream< | ||
T extends Struct<object, PropertyKey, object, unknown>, | ||
> extends TransformStream<T["TInit"], Uint8Array> { | ||
T extends StructLike<unknown>, | ||
> extends TransformStream<StructInit<T>, Uint8Array> { | ||
constructor(struct: T) { | ||
@@ -10,0 +10,0 @@ super({ |
@@ -15,3 +15,3 @@ export interface Task { | ||
const { console } = globalThis as unknown as GlobalExtension; | ||
export const createTask: (name: string) => Task = | ||
export const createTask: (name: string) => Task = /* #__PURE__ */ (() => | ||
console?.createTask?.bind(console) ?? | ||
@@ -22,2 +22,2 @@ (() => ({ | ||
}, | ||
})); | ||
})))(); |
@@ -245,3 +245,3 @@ /// <reference lib="es2018.asynciterable" /> | ||
static from<R>( | ||
asyncIterable: Iterable<R> | AsyncIterable<R> | ReadableStreamLike<R>, | ||
asyncIterable: Iterable<R> | AsyncIterable<R>, | ||
): ReadableStream<R>; | ||
@@ -257,3 +257,3 @@ } | ||
extends AsyncIterableIterator<R> { | ||
next(): Promise<IteratorResult<R, undefined>>; | ||
next(): Promise<IteratorResult<R, void>>; | ||
return(value?: R): Promise<IteratorResult<R>>; | ||
@@ -260,0 +260,0 @@ } |
@@ -1,2 +0,2 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
@@ -12,8 +12,9 @@ import type { | ||
controller: ReadableStreamDefaultController<T>, | ||
) => ValueOrPromise<ReadableStream<T>>; | ||
) => MaybePromiseLike<ReadableStream<T>>; | ||
export interface ReadableStreamWrapper<T> { | ||
start: WrapReadableStreamStart<T>; | ||
cancel?(reason?: unknown): ValueOrPromise<void>; | ||
close?(): ValueOrPromise<void>; | ||
cancel?: (reason?: unknown) => MaybePromiseLike<void>; | ||
close?: () => MaybePromiseLike<void>; | ||
error?: (reason?: unknown) => MaybePromiseLike<void>; | ||
} | ||
@@ -61,17 +62,22 @@ | ||
start: async (controller) => { | ||
// `start` is invoked before `ReadableStream`'s constructor finish, | ||
// so using `this` synchronously causes | ||
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". | ||
// Queue a microtask to avoid this. | ||
await Promise.resolve(); | ||
this.readable = await getWrappedReadableStream( | ||
const readable = await getWrappedReadableStream( | ||
wrapper, | ||
controller, | ||
); | ||
// `start` is called in `super()`, so can't use `this` synchronously. | ||
// but it's fine after the first `await` | ||
this.readable = readable; | ||
this.#reader = this.readable.getReader(); | ||
}, | ||
pull: async (controller) => { | ||
const result = await this.#reader.read(); | ||
if (result.done) { | ||
const { done, value } = await this.#reader | ||
.read() | ||
.catch((e) => { | ||
if ("error" in wrapper) { | ||
wrapper.error(e); | ||
} | ||
throw e; | ||
}); | ||
if (done) { | ||
controller.close(); | ||
@@ -82,3 +88,3 @@ if ("close" in wrapper) { | ||
} else { | ||
controller.enqueue(result.value); | ||
controller.enqueue(value); | ||
} | ||
@@ -85,0 +91,0 @@ }, |
@@ -1,2 +0,2 @@ | ||
import type { ValueOrPromise } from "@yume-chan/struct"; | ||
import type { MaybePromiseLike } from "@yume-chan/async"; | ||
@@ -6,3 +6,3 @@ import type { TransformStream, WritableStreamDefaultWriter } from "./stream.js"; | ||
export type WrapWritableStreamStart<T> = () => ValueOrPromise< | ||
export type WrapWritableStreamStart<T> = () => MaybePromiseLike< | ||
WritableStream<T> | ||
@@ -46,9 +46,6 @@ >; | ||
start: async () => { | ||
// `start` is invoked before `ReadableStream`'s constructor finish, | ||
// so using `this` synchronously causes | ||
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor". | ||
// Queue a microtask to avoid this. | ||
await Promise.resolve(); | ||
this.writable = await getWrappedWritableStream(start); | ||
const writable = await getWrappedWritableStream(start); | ||
// `start` is called in `super()`, so can't use `this` synchronously. | ||
// but it's fine after the first `await` | ||
this.writable = writable; | ||
this.#writer = this.writable.getWriter(); | ||
@@ -55,0 +52,0 @@ }, |
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
Sorry, the diff of this file is not supported yet
4827
280722
116
- Removed@yume-chan/async@2.2.0(transitive)
- Removedtslib@2.8.1(transitive)
Updated@yume-chan/async@^4.0.2