New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@yume-chan/stream-extra

Package Overview
Dependencies
Maintainers
0
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@yume-chan/stream-extra - npm Package Compare versions

Comparing version 0.0.24 to 1.0.0

esm/maybe-consumable-ns.d.ts

23

CHANGELOG.md
# Change Log - @yume-chan/stream-extra
## 1.0.1
### Patch Changes
- 53688d3: Use PNPM workspace and Changesets to manage the monorepo.
Because Changesets doesn't support alpha versions (`0.x.x`), this version is `1.0.0`. Future versions will follow SemVer rules, for example, breaking API changes will introduce a new major version.
- ea5002b: Polyfill `ReadableStream.from` and `ReadableStream.prototype.values`
- Updated dependencies [53688d3]
- Updated dependencies [db8466f]
- Updated dependencies [db8466f]
- @yume-chan/struct@1.0.1
This log was last generated on Tue, 18 Jun 2024 02:49:43 GMT and should not be manually modified.
## 0.0.24
Tue, 18 Jun 2024 02:49:43 GMT

@@ -15,2 +30,3 @@

## 0.0.23
Thu, 21 Mar 2024 03:15:10 GMT

@@ -25,2 +41,3 @@

## 0.0.22
Wed, 13 Dec 2023 05:57:27 GMT

@@ -31,2 +48,3 @@

## 0.0.21
Fri, 25 Aug 2023 14:05:18 GMT

@@ -39,2 +57,3 @@

## 0.0.20
Mon, 05 Jun 2023 02:51:41 GMT

@@ -48,2 +67,3 @@

## 0.0.19
Sun, 09 Apr 2023 05:55:33 GMT

@@ -56,2 +76,3 @@

## 0.0.18
Wed, 25 Jan 2023 21:33:49 GMT

@@ -64,5 +85,5 @@

## 0.0.17
Tue, 18 Oct 2022 09:32:30 GMT
_Initial release_

6

esm/buffered-transform.d.ts

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";
import { BufferedReadableStream } from "./buffered.js";

@@ -8,5 +8,5 @@ import type { ReadableWritablePair } from "./stream.js";

get readable(): ReadableStream<T>;
get writable(): WritableStream<Uint8Array>;
constructor(transform: (stream: BufferedReadableStream) => ValueOrPromise<T>);
get writable(): WritableStream<Uint8Array<ArrayBufferLike>>;
constructor(transform: (stream: BufferedReadableStream) => MaybePromiseLike<T>);
}
//# sourceMappingURL=buffered-transform.d.ts.map

@@ -0,1 +1,2 @@

import type { MaybePromiseLike } from "@yume-chan/async";
import type { AsyncExactReadable } from "@yume-chan/struct";

@@ -9,9 +10,5 @@ import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";

constructor(stream: ReadableStream<Uint8Array>);
iterateExactly(length: number): Iterator<MaybePromiseLike<Uint8Array>, void, void>;
readExactly: (this: BufferedReadableStream, length: number) => MaybePromiseLike<Uint8Array<ArrayBufferLike>>;
/**
*
* @param length
* @returns
*/
readExactly(length: number): Uint8Array | Promise<Uint8Array>;
/**
* Return a readable stream with unconsumed data (if any) and

@@ -18,0 +15,0 @@ * all data from the wrapped stream.

@@ -1,8 +0,8 @@

import { ExactReadableEndedError } from "@yume-chan/struct";
import { bipedal, ExactReadableEndedError } from "@yume-chan/struct";
import { PushReadableStream } from "./push-readable.js";
const NOOP = () => {
// no-op
};
import { tryCancel } from "./try-close.js";
export class BufferedReadableStream {
#buffered;
// PERF: `subarray` is slow
// don't use it until absolutely necessary
#bufferedOffset = 0;

@@ -20,3 +20,21 @@ #bufferedLength = 0;

}
async #readSource() {
#readBuffered(length) {
if (!this.#buffered) {
return undefined;
}
const value = this.#buffered.subarray(this.#bufferedOffset, this.#bufferedOffset + length);
// PERF: Synchronous path for reading from internal buffer
if (this.#bufferedLength > length) {
this.#position += length;
this.#bufferedOffset += length;
this.#bufferedLength -= length;
return value;
}
this.#position += this.#bufferedLength;
this.#buffered = undefined;
this.#bufferedOffset = 0;
this.#bufferedLength = 0;
return value;
}
async #readSource(length) {
const { done, value } = await this.reader.read();

@@ -26,81 +44,78 @@ if (done) {

}
if (value.length > length) {
this.#buffered = value;
this.#bufferedOffset = length;
this.#bufferedLength = value.length - length;
this.#position += length;
return value.subarray(0, length);
}
this.#position += value.length;
return value;
}
async #readAsync(length, initial) {
iterateExactly(length) {
let state = this.#buffered ? 0 : 1;
return {
next: () => {
switch (state) {
case 0: {
const value = this.#readBuffered(length);
if (value.length === length) {
state = 2;
}
else {
length -= value.length;
state = 1;
}
return { done: false, value };
}
case 1:
state = 3;
return {
done: false,
value: this.#readSource(length).then((value) => {
if (value.length === length) {
state = 2;
}
else {
length -= value.length;
state = 1;
}
return value;
}),
};
case 2:
return { done: true, value: undefined };
case 3:
throw new Error("Can't call `next` before previous Promise resolves");
default:
throw new Error("unreachable");
}
},
};
}
readExactly = bipedal(function* (then, length) {
let result;
let index;
let index = 0;
const initial = this.#readBuffered(length);
if (initial) {
if (initial.length === length) {
return initial;
}
result = new Uint8Array(length);
result.set(initial);
index = initial.length;
result.set(initial, index);
index += initial.length;
length -= initial.length;
}
else {
const array = await this.#readSource();
if (array.length === length) {
this.#position += length;
return array;
}
if (array.length > length) {
this.#buffered = array;
this.#bufferedOffset = length;
this.#bufferedLength = array.length - length;
this.#position += length;
return array.subarray(0, length);
}
result = new Uint8Array(length);
result.set(array);
index = array.length;
length -= array.length;
this.#position += array.length;
}
while (length > 0) {
const array = await this.#readSource();
if (array.length === length) {
result.set(array, index);
this.#position += length;
return result;
}
if (array.length > length) {
this.#buffered = array;
this.#bufferedOffset = length;
this.#bufferedLength = array.length - length;
result.set(array.subarray(0, length), index);
this.#position += length;
return result;
}
result.set(array, index);
index += array.length;
length -= array.length;
this.#position += array.length;
const value = yield* then(this.#readSource(length));
result.set(value, index);
index += value.length;
length -= value.length;
}
return result;
}
});
/**
*
* @param length
* @returns
*/
readExactly(length) {
// PERF: Add a synchronous path for reading from internal buffer
if (this.#buffered) {
const array = this.#buffered;
const offset = this.#bufferedOffset;
if (this.#bufferedLength > length) {
// PERF: `subarray` is slow
// don't use it until absolutely necessary
this.#bufferedOffset += length;
this.#bufferedLength -= length;
this.#position += length;
return array.subarray(offset, offset + length);
}
this.#buffered = undefined;
this.#bufferedLength = 0;
this.#bufferedOffset = 0;
this.#position += array.length - offset;
return this.#readAsync(length, array.subarray(offset));
}
return this.#readAsync(length);
}
/**
* Return a readable stream with unconsumed data (if any) and

@@ -117,4 +132,3 @@ * all data from the wrapped stream.

controller.abortSignal.addEventListener("abort", () => {
// NOOP: the reader might already be released
this.reader.cancel().catch(NOOP);
void tryCancel(this.reader);
});

@@ -127,5 +141,3 @@ // Manually pipe the stream

}
else {
await controller.enqueue(value);
}
await controller.enqueue(value);
}

@@ -132,0 +144,0 @@ });

import { PromiseResolver } from "@yume-chan/async";
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";
import { EmptyUint8Array } from "@yume-chan/struct";
import { ReadableStream, WritableStream } from "./stream.js";

@@ -82,3 +82,3 @@ // `TransformStream` only calls its `source.flush` method when its `readable` is being read.

case 0:
result = EMPTY_UINT8_ARRAY;
result = EmptyUint8Array;
break;

@@ -85,0 +85,0 @@ case 1:

import type { QueuingStrategy, WritableStreamDefaultController, WritableStreamDefaultWriter } from "./stream.js";
import { ReadableStream as NativeReadableStream, WritableStream as NativeWritableStream } from "./stream.js";
declare class WritableStream<in T> extends NativeWritableStream<Consumable<T>> {
static write<T>(writer: WritableStreamDefaultWriter<Consumable<T>>, value: T): Promise<void>;
constructor(sink: Consumable.WritableStreamSink<T>, strategy?: QueuingStrategy<T>);
}
declare class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static enqueue<T>(controller: {
enqueue: (chunk: Consumable<T>) => void;
}, chunk: T): Promise<void>;
constructor(source: Consumable.ReadableStreamSource<T>, strategy?: QueuingStrategy<T>);
}
export declare class Consumable<T> {
#private;
static readonly WritableStream: typeof WritableStream;
static readonly ReadableStream: typeof ReadableStream;
readonly value: T;

@@ -19,6 +31,3 @@ readonly consumed: Promise<void>;

}
class WritableStream<in T> extends NativeWritableStream<Consumable<T>> {
static write<T>(writer: WritableStreamDefaultWriter<Consumable<T>>, value: T): Promise<void>;
constructor(sink: WritableStreamSink<T>, strategy?: QueuingStrategy<T>);
}
type WritableStream<in T> = typeof Consumable.WritableStream<T>;
interface ReadableStreamController<T> {

@@ -34,9 +43,5 @@ enqueue(chunk: T): Promise<void>;

}
class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static enqueue<T>(controller: {
enqueue: (chunk: Consumable<T>) => void;
}, chunk: T): Promise<void>;
constructor(source: ReadableStreamSource<T>, strategy?: QueuingStrategy<T>);
}
type ReadableStream<T> = typeof Consumable.ReadableStream<T>;
}
export {};
//# sourceMappingURL=consumable.d.ts.map

@@ -1,8 +0,87 @@

import { PromiseResolver } from "@yume-chan/async";
import { PromiseResolver, isPromiseLike } from "@yume-chan/async";
import { ReadableStream as NativeReadableStream, WritableStream as NativeWritableStream, } from "./stream.js";
import { createTask } from "./task.js";
function isPromiseLike(value) {
return typeof value === "object" && value !== null && "then" in value;
// Workaround https://github.com/evanw/esbuild/issues/3923
class WritableStream extends NativeWritableStream {
static async write(writer, value) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(sink, strategy) {
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk);
};
}
}
super({
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((chunk) => sink.write?.(chunk, controller));
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
}, wrappedStrategy);
}
}
class ReadableStream extends NativeReadableStream {
static async enqueue(controller, chunk) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(source, strategy) {
let wrappedController;
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk.value);
};
}
}
super({
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await ReadableStream.enqueue(controller, chunk);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
await source.start?.(wrappedController);
},
async pull() {
await source.pull?.(wrappedController);
},
async cancel(reason) {
await source.cancel?.(reason);
},
}, wrappedStrategy);
}
}
export class Consumable {
static WritableStream = WritableStream;
static ReadableStream = ReadableStream;
#task;

@@ -47,85 +126,2 @@ #resolver;

}
(function (Consumable) {
class WritableStream extends NativeWritableStream {
static async write(writer, value) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(sink, strategy) {
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk);
};
}
}
super({
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((chunk) => sink.write?.(chunk, controller));
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
}, wrappedStrategy);
}
}
Consumable.WritableStream = WritableStream;
class ReadableStream extends NativeReadableStream {
static async enqueue(controller, chunk) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(source, strategy) {
let wrappedController;
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk.value);
};
}
}
super({
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await ReadableStream.enqueue(controller, chunk);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
await source.start?.(wrappedController);
},
async pull() {
await source.pull?.(wrappedController);
},
async cancel(reason) {
await source.cancel?.(reason);
},
}, wrappedStrategy);
}
}
Consumable.ReadableStream = ReadableStream;
})(Consumable || (Consumable = {}));
//# sourceMappingURL=consumable.js.map

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";
import type { QueuingStrategy, ReadableStream } from "./stream.js";

@@ -18,3 +18,3 @@ import { WritableStream } from "./stream.js";

*/
close?: (() => ValueOrPromise<boolean | void>) | undefined;
close?: (() => MaybePromiseLike<boolean | void>) | undefined;
/**

@@ -21,0 +21,0 @@ * Callback when any `ReadableStream` is closed (the other peer doesn't produce any more data),

import { PromiseResolver } from "@yume-chan/async";
import { WritableStream } from "./stream.js";
import { tryClose } from "./try-close.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -83,8 +84,3 @@ const NOOP = () => {

for (const controller of this.#readableControllers) {
try {
controller.close();
}
catch {
// ignore
}
tryClose(controller);
}

@@ -91,0 +87,0 @@ await this.#options.dispose?.();

@@ -17,4 +17,5 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";
//# sourceMappingURL=index.d.ts.map

@@ -17,4 +17,5 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";
//# sourceMappingURL=index.js.map

@@ -1,21 +0,4 @@

import { Consumable } from "./consumable.js";
import type { QueuingStrategy, WritableStreamDefaultController } from "./stream.js";
import { WritableStream as NativeWritableStream, TransformStream } from "./stream.js";
import type { Consumable } from "./consumable.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export declare namespace MaybeConsumable {
function getValue<T>(value: MaybeConsumable<T>): T;
function tryConsume<T, R>(value: T, callback: (value: T extends Consumable<infer U> ? U : T) => R): R;
class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> {
constructor();
}
interface WritableStreamSink<in T> {
start?(controller: WritableStreamDefaultController): void | PromiseLike<void>;
write?(chunk: T, controller: WritableStreamDefaultController): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
class WritableStream<in T> extends NativeWritableStream<MaybeConsumable<T>> {
constructor(sink: WritableStreamSink<T>, strategy?: QueuingStrategy<T>);
}
}
export * as MaybeConsumable from "./maybe-consumable-ns.js";
//# sourceMappingURL=maybe-consumable.d.ts.map

@@ -1,62 +0,2 @@

import { Consumable } from "./consumable.js";
import { WritableStream as NativeWritableStream, TransformStream, } from "./stream.js";
export var MaybeConsumable;
(function (MaybeConsumable) {
function getValue(value) {
return value instanceof Consumable ? value.value : value;
}
MaybeConsumable.getValue = getValue;
function tryConsume(value, callback) {
if (value instanceof Consumable) {
return value.tryConsume(callback);
}
else {
return callback(value);
}
}
MaybeConsumable.tryConsume = tryConsume;
class UnwrapStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
MaybeConsumable.tryConsume(chunk, (chunk) => {
controller.enqueue(chunk);
});
},
});
}
}
MaybeConsumable.UnwrapStream = UnwrapStream;
class WritableStream extends NativeWritableStream {
constructor(sink, strategy) {
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk);
};
}
}
super({
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, (chunk) => sink.write?.(chunk, controller));
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
}, wrappedStrategy);
}
}
MaybeConsumable.WritableStream = WritableStream;
})(MaybeConsumable || (MaybeConsumable = {}));
export * as MaybeConsumable from "./maybe-consumable-ns.js";
//# sourceMappingURL=maybe-consumable.js.map

@@ -10,4 +10,18 @@ import type { AbortSignal, QueuingStrategy } from "./stream.js";

export type PushReadableStreamSource<T> = (controller: PushReadableStreamController<T>) => void | Promise<void>;
export type PushReadableLogger<T> = (event: {
source: "producer";
operation: "enqueue";
value: T;
phase: "start" | "waiting" | "ignored" | "complete";
} | {
source: "producer";
operation: "close" | "error";
explicit: boolean;
phase: "start" | "ignored" | "complete";
} | {
source: "consumer";
operation: "pull" | "cancel";
phase: "start" | "complete";
}) => void;
export declare class PushReadableStream<T> extends ReadableStream<T> {
#private;
/**

@@ -20,4 +34,4 @@ * Create a new `PushReadableStream` from a source.

*/
constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>);
constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>, logger?: PushReadableLogger<T>);
}
//# sourceMappingURL=push-readable.d.ts.map
import { PromiseResolver } from "@yume-chan/async";
import { AbortController, ReadableStream } from "./stream.js";
export class PushReadableStream extends ReadableStream {
#zeroHighWaterMarkAllowEnqueue = false;
/**

@@ -12,15 +11,42 @@ * Create a new `PushReadableStream` from a source.

*/
constructor(source, strategy) {
constructor(source, strategy, logger) {
let waterMarkLow;
let zeroHighWaterMarkAllowEnqueue = false;
const abortController = new AbortController();
super({
start: async (controller) => {
await Promise.resolve();
start: (controller) => {
const result = source({
abortSignal: abortController.signal,
enqueue: async (chunk) => {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "start",
});
if (abortController.signal.aborted) {
// If the stream is already cancelled,
// throw immediately.
throw abortController.signal.reason;
// In original `ReadableStream`, calling `enqueue` or `close`
// on an cancelled stream will throw an error,
//
// But in `PushReadableStream`, `enqueue` is an async function,
// the producer can't just check `abortSignal.aborted`
// before calling `enqueue`, as it might change when waiting
// for the backpressure to be reduced.
//
// So IMO it's better to handle this for the producer
// by simply ignoring the `enqueue` call.
//
// Note that we check `abortSignal.aborted` instead of `stopped`,
// as it's not allowed for the producer to call `enqueue` after
// they called `close` or `error`.
//
// Obviously, the producer should listen to the `abortSignal` and
// stop producing, but most pushing data sources don't support that.
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "ignored",
});
return;
}

@@ -31,34 +57,147 @@ if (controller.desiredSize === null) {

controller.enqueue(chunk);
// istanbul ignore next
return;
}
if (this.#zeroHighWaterMarkAllowEnqueue) {
this.#zeroHighWaterMarkAllowEnqueue = false;
if (zeroHighWaterMarkAllowEnqueue) {
// When `highWaterMark` is set to `0`,
// `controller.desiredSize` will always be `0`,
// even if the consumer has called `reader.read()`.
// (in this case, each `reader.read()`/`pull`
// should allow one `enqueue` of any size)
//
// If the consumer has already called `reader.read()`,
// before the producer tries to `enqueue`,
// `controller.desiredSize` is `0` and normal `waterMarkLow` signal
// will never trigger,
// (because `ReadableStream` prevents reentrance of `pull`)
// The stream will stuck.
//
// So we need a special signal for this case.
zeroHighWaterMarkAllowEnqueue = false;
controller.enqueue(chunk);
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "complete",
});
return;
}
if (controller.desiredSize <= 0) {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "waiting",
});
waterMarkLow = new PromiseResolver();
await waterMarkLow.promise;
// Recheck consumer cancellation after async operations.
if (abortController.signal.aborted) {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "ignored",
});
return;
}
}
// `controller.enqueue` will throw error for us
// if the stream is already errored.
controller.enqueue(chunk);
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "complete",
});
},
close() {
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "start",
});
// Since `enqueue` on an cancelled stream won't throw an error,
// so does `close`.
if (abortController.signal.aborted) {
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "ignored",
});
return;
}
controller.close();
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "complete",
});
},
error(e) {
logger?.({
source: "producer",
operation: "error",
explicit: true,
phase: "start",
});
// Calling `error` on an already closed or errored stream is a no-op.
controller.error(e);
logger?.({
source: "producer",
operation: "error",
explicit: true,
phase: "complete",
});
},
});
if (result && "then" in result) {
// If `source` returns a `Promise`,
// close the stream when the `Promise` is resolved,
// and error the stream when the `Promise` is rejected.
// The producer can return a never-settling `Promise`
// to disable this behavior.
result.then(() => {
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "start",
});
try {
controller.close();
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "complete",
});
}
catch (e) {
// controller already closed
catch {
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "ignored",
});
// The stream is already closed by the producer,
// Or cancelled by the consumer.
}
}, (e) => {
logger?.({
source: "producer",
operation: "error",
explicit: false,
phase: "start",
});
controller.error(e);
logger?.({
source: "producer",
operation: "error",
explicit: false,
phase: "complete",
});
});

@@ -68,13 +207,33 @@ }

pull: () => {
logger?.({
source: "consumer",
operation: "pull",
phase: "start",
});
if (waterMarkLow) {
waterMarkLow.resolve();
return;
}
if (strategy?.highWaterMark === 0) {
this.#zeroHighWaterMarkAllowEnqueue = true;
else if (strategy?.highWaterMark === 0) {
zeroHighWaterMarkAllowEnqueue = true;
}
logger?.({
source: "consumer",
operation: "pull",
phase: "complete",
});
},
cancel: (reason) => {
logger?.({
source: "consumer",
operation: "cancel",
phase: "start",
});
abortController.abort(reason);
waterMarkLow?.reject(reason);
// Resolve it on cancellation. `pull` will check `abortSignal.aborted` again.
waterMarkLow?.resolve();
logger?.({
source: "consumer",
operation: "cancel",
phase: "complete",
});
},

@@ -81,0 +240,0 @@ }, strategy);

import type { AbortSignal, ReadableStream as ReadableStreamType, TransformStream as TransformStreamType, WritableStream as WritableStreamType } from "./types.js";
export * from "./types.js";
export { ReadableStream };
/** A controller object that allows you to abort one or more DOM requests as and when desired. */

@@ -19,8 +20,7 @@ export interface AbortController {

export declare const AbortController: AbortControllerConstructor;
export type ReadableStream<out T> = ReadableStreamType<T>;
export declare const ReadableStream: typeof ReadableStreamType;
export type WritableStream<in T> = WritableStreamType<T>;
export declare const WritableStream: typeof WritableStreamType;
export type ReadableStream<T> = ReadableStreamType<T>;
export type WritableStream<T> = WritableStreamType<T>;
export type TransformStream<I, O> = TransformStreamType<I, O>;
export declare const TransformStream: typeof TransformStreamType;
declare const ReadableStream: typeof ReadableStreamType;
export declare const WritableStream: typeof WritableStreamType, TransformStream: typeof TransformStreamType;
//# sourceMappingURL=stream.d.ts.map
export * from "./types.js";
const Global = globalThis;
export const AbortController = Global.AbortController;
export const ReadableStream = Global.ReadableStream;
export const WritableStream = Global.WritableStream;
export const TransformStream = Global.TransformStream;
export { ReadableStream };
export const { AbortController } = globalThis;
const ReadableStream = /* #__PURE__ */ (() => {
const { ReadableStream } = globalThis;
if (!ReadableStream.from) {
ReadableStream.from = function (iterable) {
const iterator = Symbol.asyncIterator in iterable
? iterable[Symbol.asyncIterator]()
: iterable[Symbol.iterator]();
return new ReadableStream({
async pull(controller) {
const result = await iterator.next();
if (result.done) {
controller.close();
return;
}
controller.enqueue(result.value);
},
async cancel(reason) {
await iterator.return?.(reason);
},
});
};
}
if (!ReadableStream.prototype[Symbol.asyncIterator] ||
!ReadableStream.prototype.values) {
ReadableStream.prototype.values = async function* (options) {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
// Calling `iterator.return` will enter this `finally` block.
// We don't need to care about the parameter to `iterator.return`,
// it will be returned as the final `result.value` automatically.
if (!options?.preventCancel) {
await reader.cancel();
}
reader.releaseLock();
}
};
ReadableStream.prototype[Symbol.asyncIterator] =
// eslint-disable-next-line @typescript-eslint/unbound-method
ReadableStream.prototype.values;
}
return ReadableStream;
})();
export const { WritableStream, TransformStream } = globalThis;
//# sourceMappingURL=stream.js.map

@@ -1,7 +0,6 @@

import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct";
import type { StructLike } from "@yume-chan/struct";
import { BufferedTransformStream } from "./buffered-transform.js";
export declare class StructDeserializeStream<T extends Struct<object, PropertyKey, object, unknown>> extends BufferedTransformStream<StructValueType<T>> {
constructor(struct: T);
export declare class StructDeserializeStream<T> extends BufferedTransformStream<T> {
constructor(struct: StructLike<T>);
}
//# sourceMappingURL=struct-deserialize.d.ts.map

@@ -1,6 +0,6 @@

import type Struct from "@yume-chan/struct";
import type { StructInit, StructLike } from "@yume-chan/struct";
import { TransformStream } from "./stream.js";
export declare class StructSerializeStream<T extends Struct<object, PropertyKey, object, unknown>> extends TransformStream<T["TInit"], Uint8Array> {
export declare class StructSerializeStream<T extends StructLike<unknown>> extends TransformStream<StructInit<T>, Uint8Array> {
constructor(struct: T);
}
//# sourceMappingURL=struct-serialize.d.ts.map
// `createTask` allows browser DevTools to track the call stack across async boundaries.
const { console } = globalThis;
export const createTask = console?.createTask?.bind(console) ??
export const createTask = /* #__PURE__ */ (() => console?.createTask?.bind(console) ??
(() => ({

@@ -8,3 +8,3 @@ run(callback) {

},
}));
})))();
//# sourceMappingURL=task.js.map

@@ -1,2 +0,1 @@

/// <reference lib="es2018.asynciterable" />
/**

@@ -220,3 +219,3 @@ * A signal object that allows you to communicate with a request and abort it if required

*/
static from<R>(asyncIterable: Iterable<R> | AsyncIterable<R> | ReadableStreamLike<R>): ReadableStream<R>;
static from<R>(asyncIterable: Iterable<R> | AsyncIterable<R>): ReadableStream<R>;
}

@@ -229,3 +228,3 @@ /**

export declare interface ReadableStreamAsyncIterator<R> extends AsyncIterableIterator<R> {
next(): Promise<IteratorResult<R, undefined>>;
next(): Promise<IteratorResult<R, void>>;
return(value?: R): Promise<IteratorResult<R>>;

@@ -232,0 +231,0 @@ }

@@ -1,9 +0,10 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";
import type { QueuingStrategy, ReadableStreamDefaultController } from "./stream.js";
import { ReadableStream } from "./stream.js";
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>;
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => MaybePromiseLike<ReadableStream<T>>;
export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>;
cancel?(reason?: unknown): ValueOrPromise<void>;
close?(): ValueOrPromise<void>;
cancel?: (reason?: unknown) => MaybePromiseLike<void>;
close?: () => MaybePromiseLike<void>;
error?: (reason?: unknown) => MaybePromiseLike<void>;
}

@@ -10,0 +11,0 @@ /**

@@ -28,13 +28,18 @@ import { ReadableStream } from "./stream.js";

start: async (controller) => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();
this.readable = await getWrappedReadableStream(wrapper, controller);
const readable = await getWrappedReadableStream(wrapper, controller);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.readable = readable;
this.#reader = this.readable.getReader();
},
pull: async (controller) => {
const result = await this.#reader.read();
if (result.done) {
const { done, value } = await this.#reader
.read()
.catch((e) => {
if ("error" in wrapper) {
wrapper.error(e);
}
throw e;
});
if (done) {
controller.close();

@@ -46,3 +51,3 @@ if ("close" in wrapper) {

else {
controller.enqueue(result.value);
controller.enqueue(value);
}

@@ -49,0 +54,0 @@ },

@@ -1,5 +0,5 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";
import type { TransformStream } from "./stream.js";
import { WritableStream } from "./stream.js";
export type WrapWritableStreamStart<T> = () => ValueOrPromise<WritableStream<T>>;
export type WrapWritableStreamStart<T> = () => MaybePromiseLike<WritableStream<T>>;
export interface WritableStreamWrapper<T> {

@@ -6,0 +6,0 @@ start: WrapWritableStreamStart<T>;

@@ -21,8 +21,6 @@ import { WritableStream } from "./stream.js";

start: async () => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();
this.writable = await getWrappedWritableStream(start);
const writable = await getWrappedWritableStream(start);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.writable = writable;
this.#writer = this.writable.getWriter();

@@ -29,0 +27,0 @@ },

{
"name": "@yume-chan/stream-extra",
"version": "0.0.24",
"version": "1.0.0",
"description": "Extensions to Web Streams API",

@@ -27,22 +27,20 @@ "keywords": [

"types": "esm/index.d.ts",
"sideEffects": false,
"dependencies": {
"@yume-chan/async": "^2.2.0",
"@yume-chan/struct": "^0.0.24"
"@yume-chan/async": "^4.0.2",
"@yume-chan/struct": "^1.0.0"
},
"devDependencies": {
"@jest/globals": "^30.0.0-alpha.4",
"@types/node": "^22.10.0",
"prettier": "^3.4.1",
"typescript": "^5.7.2",
"@yume-chan/test-runner": "^1.0.0",
"@yume-chan/eslint-config": "^1.0.0",
"@yume-chan/tsconfig": "^1.0.0",
"cross-env": "^7.0.3",
"jest": "^30.0.0-alpha.4",
"prettier": "^3.3.2",
"ts-jest": "^29.1.4",
"typescript": "^5.4.5"
"@yume-chan/tsconfig": "^1.0.0"
},
"scripts": {
"build": "tsc -b tsconfig.build.json",
"build:watch": "tsc -b tsconfig.build.json",
"test": "cross-env NODE_OPTIONS=\"--experimental-vm-modules --no-warnings\" TS_JEST_DISABLE_VER_CHECKER=true jest --coverage",
"lint": "run-eslint && prettier src/**/*.ts --write --tab-width 4"
"lint": "run-eslint && prettier src/**/*.ts --write --tab-width 4",
"test": "run-test"
}
}

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";
import { StructEmptyError } from "@yume-chan/struct";

@@ -25,3 +25,3 @@

constructor(
transform: (stream: BufferedReadableStream) => ValueOrPromise<T>,
transform: (stream: BufferedReadableStream) => MaybePromiseLike<T>,
) {

@@ -28,0 +28,0 @@ // Convert incoming chunks to a `BufferedReadableStream`

@@ -0,13 +1,13 @@

import type { MaybePromiseLike } from "@yume-chan/async";
import type { AsyncExactReadable } from "@yume-chan/struct";
import { ExactReadableEndedError } from "@yume-chan/struct";
import { bipedal, ExactReadableEndedError } from "@yume-chan/struct";
import { PushReadableStream } from "./push-readable.js";
import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";
import { tryCancel } from "./try-close.js";
const NOOP = () => {
// no-op
};
export class BufferedReadableStream implements AsyncExactReadable {
#buffered: Uint8Array | undefined;
// PERF: `subarray` is slow
// don't use it until absolutely necessary
#bufferedOffset = 0;

@@ -29,3 +29,28 @@ #bufferedLength = 0;

async #readSource() {
#readBuffered(length: number) {
if (!this.#buffered) {
return undefined;
}
const value = this.#buffered.subarray(
this.#bufferedOffset,
this.#bufferedOffset + length,
);
// PERF: Synchronous path for reading from internal buffer
if (this.#bufferedLength > length) {
this.#position += length;
this.#bufferedOffset += length;
this.#bufferedLength -= length;
return value;
}
this.#position += this.#bufferedLength;
this.#buffered = undefined;
this.#bufferedOffset = 0;
this.#bufferedLength = 0;
return value;
}
async #readSource(length: number): Promise<Uint8Array> {
const { done, value } = await this.reader.read();

@@ -35,92 +60,92 @@ if (done) {

}
if (value.length > length) {
this.#buffered = value;
this.#bufferedOffset = length;
this.#bufferedLength = value.length - length;
this.#position += length;
return value.subarray(0, length);
}
this.#position += value.length;
return value;
}
async #readAsync(length: number, initial?: Uint8Array) {
let result: Uint8Array;
let index: number;
iterateExactly(
length: number,
): Iterator<MaybePromiseLike<Uint8Array>, void, void> {
let state = this.#buffered ? 0 : 1;
return {
next: () => {
switch (state) {
case 0: {
const value = this.#readBuffered(length)!;
if (value.length === length) {
state = 2;
} else {
length -= value.length;
state = 1;
}
return { done: false, value };
}
case 1:
state = 3;
return {
done: false,
value: this.#readSource(length).then((value) => {
if (value.length === length) {
state = 2;
} else {
length -= value.length;
state = 1;
}
return value;
}),
};
case 2:
return { done: true, value: undefined };
case 3:
throw new Error(
"Can't call `next` before previous Promise resolves",
);
default:
throw new Error("unreachable");
}
},
};
}
readExactly = bipedal(function* (
this: BufferedReadableStream,
then,
length: number,
) {
let result: Uint8Array | undefined;
let index = 0;
const initial = this.#readBuffered(length);
if (initial) {
if (initial.length === length) {
return initial;
}
result = new Uint8Array(length);
result.set(initial);
index = initial.length;
result.set(initial, index);
index += initial.length;
length -= initial.length;
} else {
const array = await this.#readSource();
if (array.length === length) {
this.#position += length;
return array;
}
if (array.length > length) {
this.#buffered = array;
this.#bufferedOffset = length;
this.#bufferedLength = array.length - length;
this.#position += length;
return array.subarray(0, length);
}
result = new Uint8Array(length);
result.set(array);
index = array.length;
length -= array.length;
this.#position += array.length;
}
while (length > 0) {
const array = await this.#readSource();
if (array.length === length) {
result.set(array, index);
this.#position += length;
return result;
}
if (array.length > length) {
this.#buffered = array;
this.#bufferedOffset = length;
this.#bufferedLength = array.length - length;
result.set(array.subarray(0, length), index);
this.#position += length;
return result;
}
result.set(array, index);
index += array.length;
length -= array.length;
this.#position += array.length;
const value = yield* then(this.#readSource(length));
result.set(value, index);
index += value.length;
length -= value.length;
}
return result;
}
});
/**
*
* @param length
* @returns
*/
readExactly(length: number): Uint8Array | Promise<Uint8Array> {
// PERF: Add a synchronous path for reading from internal buffer
if (this.#buffered) {
const array = this.#buffered;
const offset = this.#bufferedOffset;
if (this.#bufferedLength > length) {
// PERF: `subarray` is slow
// don't use it until absolutely necessary
this.#bufferedOffset += length;
this.#bufferedLength -= length;
this.#position += length;
return array.subarray(offset, offset + length);
}
this.#buffered = undefined;
this.#bufferedLength = 0;
this.#bufferedOffset = 0;
this.#position += array.length - offset;
return this.#readAsync(length, array.subarray(offset));
}
return this.#readAsync(length);
}
/**
* Return a readable stream with unconsumed data (if any) and

@@ -138,4 +163,3 @@ * all data from the wrapped stream.

controller.abortSignal.addEventListener("abort", () => {
// NOOP: the reader might already be released
this.reader.cancel().catch(NOOP);
void tryCancel(this.reader);
});

@@ -148,5 +172,5 @@

return;
} else {
await controller.enqueue(value);
}
await controller.enqueue(value);
}

@@ -153,0 +177,0 @@ });

import { PromiseResolver } from "@yume-chan/async";
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";
import { EmptyUint8Array } from "@yume-chan/struct";

@@ -104,3 +104,3 @@ import type { ReadableStreamDefaultController } from "./stream.js";

case 0:
result = EMPTY_UINT8_ARRAY;
result = EmptyUint8Array;
break;

@@ -107,0 +107,0 @@ case 1:

@@ -1,2 +0,2 @@

import { PromiseResolver } from "@yume-chan/async";
import { PromiseResolver, isPromiseLike } from "@yume-chan/async";

@@ -15,7 +15,119 @@ import type {

function isPromiseLike(value: unknown): value is PromiseLike<unknown> {
return typeof value === "object" && value !== null && "then" in value;
// Workaround https://github.com/evanw/esbuild/issues/3923
class WritableStream<in T> extends NativeWritableStream<Consumable<T>> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
constructor(
sink: Consumable.WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((chunk) =>
sink.write?.(chunk, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static async enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(
source: Consumable.ReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedController:
| Consumable.ReadableStreamController<T>
| undefined;
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(chunk.value);
};
}
}
super(
{
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await ReadableStream.enqueue(controller, chunk);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
await source.start?.(wrappedController);
},
async pull() {
await source.pull?.(wrappedController!);
},
async cancel(reason) {
await source.cancel?.(reason);
},
},
wrappedStrategy,
);
}
}
export class Consumable<T> {
static readonly WritableStream = WritableStream;
static readonly ReadableStream = ReadableStream;
readonly #task: Task;

@@ -80,55 +192,4 @@ readonly #resolver: PromiseResolver<void>;

export class WritableStream<in T> extends NativeWritableStream<
Consumable<T>
> {
static async write<T>(
writer: WritableStreamDefaultWriter<Consumable<T>>,
value: T,
) {
const consumable = new Consumable(value);
await writer.write(consumable);
await consumable.consumed;
}
export type WritableStream<in T> = typeof Consumable.WritableStream<T>;
constructor(
sink: WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await chunk.tryConsume((chunk) =>
sink.write?.(chunk, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
export interface ReadableStreamController<T> {

@@ -150,59 +211,3 @@ enqueue(chunk: T): Promise<void>;

export class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static async enqueue<T>(
controller: { enqueue: (chunk: Consumable<T>) => void },
chunk: T,
) {
const output = new Consumable(chunk);
controller.enqueue(output);
await output.consumed;
}
constructor(
source: ReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedController: ReadableStreamController<T> | undefined;
let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(chunk.value);
};
}
}
super(
{
async start(controller) {
wrappedController = {
async enqueue(chunk) {
await ReadableStream.enqueue(controller, chunk);
},
close() {
controller.close();
},
error(reason) {
controller.error(reason);
},
};
await source.start?.(wrappedController);
},
async pull() {
await source.pull?.(wrappedController!);
},
async cancel(reason) {
await source.cancel?.(reason);
},
},
wrappedStrategy,
);
}
}
export type ReadableStream<T> = typeof Consumable.ReadableStream<T>;
}

@@ -0,3 +1,3 @@

import type { MaybePromiseLike } from "@yume-chan/async";
import { PromiseResolver } from "@yume-chan/async";
import type { ValueOrPromise } from "@yume-chan/struct";

@@ -11,2 +11,3 @@ import type {

import { WritableStream } from "./stream.js";
import { tryClose } from "./try-close.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -31,3 +32,3 @@

*/
close?: (() => ValueOrPromise<boolean | void>) | undefined;
close?: (() => MaybePromiseLike<boolean | void>) | undefined;

@@ -139,7 +140,3 @@ /**

for (const controller of this.#readableControllers) {
try {
controller.close();
} catch {
// ignore
}
tryClose(controller);
}

@@ -146,0 +143,0 @@

@@ -17,3 +17,4 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";

@@ -1,101 +0,5 @@

import { Consumable } from "./consumable.js";
import type {
QueuingStrategy,
WritableStreamDefaultController,
} from "./stream.js";
import {
WritableStream as NativeWritableStream,
TransformStream,
} from "./stream.js";
import type { Consumable } from "./consumable.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export namespace MaybeConsumable {
export function getValue<T>(value: MaybeConsumable<T>): T {
return value instanceof Consumable ? value.value : value;
}
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R,
): R {
if (value instanceof Consumable) {
return value.tryConsume(callback);
} else {
return callback(value as never);
}
}
export class UnwrapStream<T> extends TransformStream<
MaybeConsumable<T>,
T
> {
constructor() {
super({
transform(chunk, controller) {
MaybeConsumable.tryConsume(chunk, (chunk) => {
controller.enqueue(chunk as T);
});
},
});
}
}
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class WritableStream<in T> extends NativeWritableStream<
MaybeConsumable<T>
> {
constructor(
sink: WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy:
| QueuingStrategy<MaybeConsumable<T>>
| undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, (chunk) =>
sink.write?.(chunk as T, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
}
export * as MaybeConsumable from "./maybe-consumable-ns.js";

@@ -20,5 +20,24 @@ import { PromiseResolver } from "@yume-chan/async";

export type PushReadableLogger<T> = (
event:
| {
source: "producer";
operation: "enqueue";
value: T;
phase: "start" | "waiting" | "ignored" | "complete";
}
| {
source: "producer";
operation: "close" | "error";
explicit: boolean;
phase: "start" | "ignored" | "complete";
}
| {
source: "consumer";
operation: "pull" | "cancel";
phase: "start" | "complete";
},
) => void;
export class PushReadableStream<T> extends ReadableStream<T> {
#zeroHighWaterMarkAllowEnqueue = false;
/**

@@ -34,4 +53,6 @@ * Create a new `PushReadableStream` from a source.

strategy?: QueuingStrategy<T>,
logger?: PushReadableLogger<T>,
) {
let waterMarkLow: PromiseResolver<void> | undefined;
let zeroHighWaterMarkAllowEnqueue = false;
const abortController = new AbortController();

@@ -41,12 +62,38 @@

{
start: async (controller) => {
await Promise.resolve();
start: (controller) => {
const result = source({
abortSignal: abortController.signal,
enqueue: async (chunk) => {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "start",
});
if (abortController.signal.aborted) {
// If the stream is already cancelled,
// throw immediately.
throw abortController.signal.reason;
// In original `ReadableStream`, calling `enqueue` or `close`
// on an cancelled stream will throw an error,
//
// But in `PushReadableStream`, `enqueue` is an async function,
// the producer can't just check `abortSignal.aborted`
// before calling `enqueue`, as it might change when waiting
// for the backpressure to be reduced.
//
// So IMO it's better to handle this for the producer
// by simply ignoring the `enqueue` call.
//
// Note that we check `abortSignal.aborted` instead of `stopped`,
// as it's not allowed for the producer to call `enqueue` after
// they called `close` or `error`.
//
// Obviously, the producer should listen to the `abortSignal` and
// stop producing, but most pushing data sources don't support that.
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "ignored",
});
return;
}

@@ -58,8 +105,29 @@

controller.enqueue(chunk);
// istanbul ignore next
return;
}
if (this.#zeroHighWaterMarkAllowEnqueue) {
this.#zeroHighWaterMarkAllowEnqueue = false;
if (zeroHighWaterMarkAllowEnqueue) {
// When `highWaterMark` is set to `0`,
// `controller.desiredSize` will always be `0`,
// even if the consumer has called `reader.read()`.
// (in this case, each `reader.read()`/`pull`
// should allow one `enqueue` of any size)
//
// If the consumer has already called `reader.read()`,
// before the producer tries to `enqueue`,
// `controller.desiredSize` is `0` and normal `waterMarkLow` signal
// will never trigger,
// (because `ReadableStream` prevents reentrance of `pull`)
// The stream will stuck.
//
// So we need a special signal for this case.
zeroHighWaterMarkAllowEnqueue = false;
controller.enqueue(chunk);
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "complete",
});
return;

@@ -69,15 +137,77 @@ }

if (controller.desiredSize <= 0) {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "waiting",
});
waterMarkLow = new PromiseResolver<void>();
await waterMarkLow.promise;
// Recheck consumer cancellation after async operations.
if (abortController.signal.aborted) {
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "ignored",
});
return;
}
}
// `controller.enqueue` will throw error for us
// if the stream is already errored.
controller.enqueue(chunk);
logger?.({
source: "producer",
operation: "enqueue",
value: chunk,
phase: "complete",
});
},
close() {
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "start",
});
// Since `enqueue` on an cancelled stream won't throw an error,
// so does `close`.
if (abortController.signal.aborted) {
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "ignored",
});
return;
}
controller.close();
logger?.({
source: "producer",
operation: "close",
explicit: true,
phase: "complete",
});
},
error(e) {
logger?.({
source: "producer",
operation: "error",
explicit: true,
phase: "start",
});
// Calling `error` on an already closed or errored stream is a no-op.
controller.error(e);
logger?.({
source: "producer",
operation: "error",
explicit: true,
phase: "complete",
});
},

@@ -87,12 +217,53 @@ });

if (result && "then" in result) {
// If `source` returns a `Promise`,
// close the stream when the `Promise` is resolved,
// and error the stream when the `Promise` is rejected.
// The producer can return a never-settling `Promise`
// to disable this behavior.
result.then(
() => {
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "start",
});
try {
controller.close();
} catch (e) {
// controller already closed
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "complete",
});
} catch {
logger?.({
source: "producer",
operation: "close",
explicit: false,
phase: "ignored",
});
// The stream is already closed by the producer,
// Or cancelled by the consumer.
}
},
(e) => {
logger?.({
source: "producer",
operation: "error",
explicit: false,
phase: "start",
});
controller.error(e);
logger?.({
source: "producer",
operation: "error",
explicit: false,
phase: "complete",
});
},

@@ -103,13 +274,36 @@ );

pull: () => {
logger?.({
source: "consumer",
operation: "pull",
phase: "start",
});
if (waterMarkLow) {
waterMarkLow.resolve();
return;
} else if (strategy?.highWaterMark === 0) {
zeroHighWaterMarkAllowEnqueue = true;
}
if (strategy?.highWaterMark === 0) {
this.#zeroHighWaterMarkAllowEnqueue = true;
}
logger?.({
source: "consumer",
operation: "pull",
phase: "complete",
});
},
cancel: (reason) => {
logger?.({
source: "consumer",
operation: "cancel",
phase: "start",
});
abortController.abort(reason);
waterMarkLow?.reject(reason);
// Resolve it on cancellation. `pull` will check `abortSignal.aborted` again.
waterMarkLow?.resolve();
logger?.({
source: "consumer",
operation: "cancel",
phase: "complete",
});
},

@@ -116,0 +310,0 @@ },

import type {
AbortSignal,
ReadableStreamIteratorOptions,
ReadableStream as ReadableStreamType,

@@ -9,2 +10,3 @@ TransformStream as TransformStreamType,

export * from "./types.js";
export { ReadableStream };

@@ -36,13 +38,71 @@ /** A controller object that allows you to abort one or more DOM requests as and when desired. */

const Global = globalThis as unknown as GlobalExtension;
export const { AbortController } = globalThis as unknown as GlobalExtension;
export const AbortController = Global.AbortController;
export type ReadableStream<T> = ReadableStreamType<T>;
export type WritableStream<T> = WritableStreamType<T>;
export type TransformStream<I, O> = TransformStreamType<I, O>;
export type ReadableStream<out T> = ReadableStreamType<T>;
export const ReadableStream = Global.ReadableStream;
const ReadableStream = /* #__PURE__ */ (() => {
const { ReadableStream } = globalThis as unknown as GlobalExtension;
export type WritableStream<in T> = WritableStreamType<T>;
export const WritableStream = Global.WritableStream;
if (!ReadableStream.from) {
ReadableStream.from = function (iterable) {
const iterator =
Symbol.asyncIterator in iterable
? iterable[Symbol.asyncIterator]()
: iterable[Symbol.iterator]();
export type TransformStream<I, O> = TransformStreamType<I, O>;
export const TransformStream = Global.TransformStream;
return new ReadableStream({
async pull(controller) {
const result = await iterator.next();
if (result.done) {
controller.close();
return;
}
controller.enqueue(result.value);
},
async cancel(reason) {
await iterator.return?.(reason);
},
});
};
}
if (
!ReadableStream.prototype[Symbol.asyncIterator] ||
!ReadableStream.prototype.values
) {
ReadableStream.prototype.values = async function* <R>(
this: ReadableStream<R>,
options?: ReadableStreamIteratorOptions,
) {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
// Calling `iterator.return` will enter this `finally` block.
// We don't need to care about the parameter to `iterator.return`,
// it will be returned as the final `result.value` automatically.
if (!options?.preventCancel) {
await reader.cancel();
}
reader.releaseLock();
}
};
ReadableStream.prototype[Symbol.asyncIterator] =
// eslint-disable-next-line @typescript-eslint/unbound-method
ReadableStream.prototype.values;
}
return ReadableStream;
})();
export const { WritableStream, TransformStream } =
globalThis as unknown as GlobalExtension;

@@ -1,10 +0,7 @@

import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct";
import type { StructLike } from "@yume-chan/struct";
import { BufferedTransformStream } from "./buffered-transform.js";
export class StructDeserializeStream<
T extends Struct<object, PropertyKey, object, unknown>,
> extends BufferedTransformStream<StructValueType<T>> {
constructor(struct: T) {
export class StructDeserializeStream<T> extends BufferedTransformStream<T> {
constructor(struct: StructLike<T>) {
super((stream) => {

@@ -11,0 +8,0 @@ return struct.deserialize(stream) as never;

@@ -1,2 +0,2 @@

import type Struct from "@yume-chan/struct";
import type { StructInit, StructLike } from "@yume-chan/struct";

@@ -6,4 +6,4 @@ import { TransformStream } from "./stream.js";

export class StructSerializeStream<
T extends Struct<object, PropertyKey, object, unknown>,
> extends TransformStream<T["TInit"], Uint8Array> {
T extends StructLike<unknown>,
> extends TransformStream<StructInit<T>, Uint8Array> {
constructor(struct: T) {

@@ -10,0 +10,0 @@ super({

@@ -15,3 +15,3 @@ export interface Task {

const { console } = globalThis as unknown as GlobalExtension;
export const createTask: (name: string) => Task =
export const createTask: (name: string) => Task = /* #__PURE__ */ (() =>
console?.createTask?.bind(console) ??

@@ -22,2 +22,2 @@ (() => ({

},
}));
})))();

@@ -245,3 +245,3 @@ /// <reference lib="es2018.asynciterable" />

static from<R>(
asyncIterable: Iterable<R> | AsyncIterable<R> | ReadableStreamLike<R>,
asyncIterable: Iterable<R> | AsyncIterable<R>,
): ReadableStream<R>;

@@ -257,3 +257,3 @@ }

extends AsyncIterableIterator<R> {
next(): Promise<IteratorResult<R, undefined>>;
next(): Promise<IteratorResult<R, void>>;
return(value?: R): Promise<IteratorResult<R>>;

@@ -260,0 +260,0 @@ }

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";

@@ -12,8 +12,9 @@ import type {

controller: ReadableStreamDefaultController<T>,
) => ValueOrPromise<ReadableStream<T>>;
) => MaybePromiseLike<ReadableStream<T>>;
export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>;
cancel?(reason?: unknown): ValueOrPromise<void>;
close?(): ValueOrPromise<void>;
cancel?: (reason?: unknown) => MaybePromiseLike<void>;
close?: () => MaybePromiseLike<void>;
error?: (reason?: unknown) => MaybePromiseLike<void>;
}

@@ -61,17 +62,22 @@

start: async (controller) => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();
this.readable = await getWrappedReadableStream(
const readable = await getWrappedReadableStream(
wrapper,
controller,
);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.readable = readable;
this.#reader = this.readable.getReader();
},
pull: async (controller) => {
const result = await this.#reader.read();
if (result.done) {
const { done, value } = await this.#reader
.read()
.catch((e) => {
if ("error" in wrapper) {
wrapper.error(e);
}
throw e;
});
if (done) {
controller.close();

@@ -82,3 +88,3 @@ if ("close" in wrapper) {

} else {
controller.enqueue(result.value);
controller.enqueue(value);
}

@@ -85,0 +91,0 @@ },

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import type { MaybePromiseLike } from "@yume-chan/async";

@@ -6,3 +6,3 @@ import type { TransformStream, WritableStreamDefaultWriter } from "./stream.js";

export type WrapWritableStreamStart<T> = () => ValueOrPromise<
export type WrapWritableStreamStart<T> = () => MaybePromiseLike<
WritableStream<T>

@@ -46,9 +46,6 @@ >;

start: async () => {
// `start` is invoked before `ReadableStream`'s constructor finish,
// so using `this` synchronously causes
// "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
// Queue a microtask to avoid this.
await Promise.resolve();
this.writable = await getWrappedWritableStream(start);
const writable = await getWrappedWritableStream(start);
// `start` is called in `super()`, so can't use `this` synchronously.
// but it's fine after the first `await`
this.writable = writable;
this.#writer = this.writable.getWriter();

@@ -55,0 +52,0 @@ },

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc