New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@yume-chan/stream-extra

Package Overview
Dependencies
Maintainers
1
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@yume-chan/stream-extra - npm Package Compare versions

Comparing version 0.0.17 to 0.0.18

esm/trace.d.ts

12

CHANGELOG.json

@@ -5,2 +5,14 @@ {

{
"version": "0.0.18",
"tag": "@yume-chan/stream-extra_v0.0.18",
"date": "Wed, 25 Jan 2023 21:33:49 GMT",
"comments": {
"none": [
{
"comment": "Change to load native Web Streams API implementation from `globalThis` if available"
}
]
}
},
{
"version": "0.0.17",

@@ -7,0 +19,0 @@ "tag": "@yume-chan/stream-extra_v0.0.17",

9

CHANGELOG.md
# Change Log - @yume-chan/stream-extra
This log was last generated on Tue, 18 Oct 2022 09:32:30 GMT and should not be manually modified.
This log was last generated on Wed, 25 Jan 2023 21:33:49 GMT and should not be manually modified.
## 0.0.18
Wed, 25 Jan 2023 21:33:49 GMT
### Updates
- Change to load native Web Streams API implementation from `globalThis` if available
## 0.0.17

@@ -6,0 +13,0 @@ Tue, 18 Oct 2022 09:32:30 GMT

6

esm/buffered-transform.d.ts

@@ -1,4 +0,4 @@

import type { ValueOrPromise } from '@yume-chan/struct';
import { BufferedReadableStream } from './buffered.js';
import { ReadableStream, ReadableWritablePair, WritableStream } from './stream.js';
import { type ValueOrPromise } from "@yume-chan/struct";
import { BufferedReadableStream } from "./buffered.js";
import { ReadableStream, WritableStream, type ReadableWritablePair } from "./stream.js";
export declare class BufferedTransformStream<T> implements ReadableWritablePair<T, Uint8Array> {

@@ -5,0 +5,0 @@ private _readable;

@@ -1,14 +0,20 @@

import { BufferedReadableStream, BufferedReadableStreamEndedError } from './buffered.js';
import { PushReadableStream } from './push-readable.js';
import { ReadableStream, WritableStream } from './stream.js';
import { BufferedReadableStream, BufferedReadableStreamEndedError, } from "./buffered.js";
import { PushReadableStream, } from "./push-readable.js";
import { ReadableStream, WritableStream, } from "./stream.js";
// TODO: BufferedTransformStream: find better implementation
export class BufferedTransformStream {
_readable;
get readable() { return this._readable; }
get readable() {
return this._readable;
}
_writable;
get writable() { return this._writable; }
get writable() {
return this._writable;
}
constructor(transform) {
// Convert incoming chunks to a `BufferedReadableStream`
let sourceStreamController;
const buffered = new BufferedReadableStream(new PushReadableStream(controller => sourceStreamController = controller));
const buffered = new BufferedReadableStream(new PushReadableStream((controller) => {
sourceStreamController = controller;
}));
this._readable = new ReadableStream({

@@ -21,5 +27,5 @@ async pull(controller) {

catch (e) {
// TODO: BufferedTransformStream: The semantic of stream ending is not clear
// If the `transform` started but did not finish, it should really be an error?
// But we can't detect that, unless there is a `peek` method on buffered stream.
// Treat `BufferedReadableStreamEndedError` as a normal end.
// If the `transform` method doesn't have enough data to return a value,
// it should throw another error to indicate that.
if (e instanceof BufferedReadableStreamEndedError) {

@@ -35,4 +41,4 @@ controller.close();

// So future writes will be rejected
buffered.cancel(reason);
}
return buffered.cancel(reason);
},
});

@@ -39,0 +45,0 @@ this._writable = new WritableStream({

@@ -1,2 +0,2 @@

import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";
import { type ReadableStream, type ReadableStreamDefaultReader } from "./stream.js";
export declare class BufferedReadableStreamEndedError extends Error {

@@ -26,4 +26,4 @@ constructor();

release(): ReadableStream<Uint8Array>;
cancel(reason?: any): Promise<void>;
cancel(reason?: unknown): Promise<void>;
}
//# sourceMappingURL=buffered.d.ts.map
import { PushReadableStream } from "./push-readable.js";
export class BufferedReadableStreamEndedError extends Error {
constructor() {
super('Stream ended');
super("Stream ended");
// Fix Error's prototype chain when compiling to ES5

@@ -104,15 +104,8 @@ Object.setPrototypeOf(this, new.target.prototype);

while (true) {
try {
const { done, value } = await this.reader.read();
if (done) {
controller.close();
break;
}
else {
await controller.enqueue(value);
}
const { done, value } = await this.reader.read();
if (done) {
return;
}
catch (e) {
controller.error(e);
break;
else {
await controller.enqueue(value);
}

@@ -119,0 +112,0 @@ }

@@ -1,2 +0,2 @@

import { decodeUtf8 } from '@yume-chan/struct';
import { decodeUtf8 } from "@yume-chan/struct";
import { TransformStream } from "./stream.js";

@@ -3,0 +3,0 @@ export class DecodeUtf8Stream extends TransformStream {

@@ -1,2 +0,2 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import { type ValueOrPromise } from "@yume-chan/struct";
import { WritableStream, type ReadableStream } from "./stream.js";

@@ -10,3 +10,3 @@ import { WrapReadableStream } from "./wrap-readable.js";

*
* Usually you want to let the other peer know that the duplex stream should be clsoed.
* Usually you want to let the other peer know that the duplex stream should be closed.
*

@@ -13,0 +13,0 @@ * `dispose` will automatically be called after `close` completes,

import { PromiseResolver } from "@yume-chan/async";
import { WritableStream } from "./stream.js";
import { WritableStream, } from "./stream.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -14,5 +14,9 @@ /**

_writableClosed = false;
get writableClosed() { return this._writableClosed; }
get writableClosed() {
return this._writableClosed;
}
_closed = new PromiseResolver();
get closed() { return this._closed.promise; }
get closed() {
return this._closed.promise;
}
options;

@@ -53,6 +57,5 @@ constructor(options) {

close: async () => {
try {
await writer.close();
}
catch { }
await writer.close().catch((e) => {
void e;
});
await this.close();

@@ -68,3 +71,3 @@ },

// Call `close` first, so it can still write data to `WritableStream`s.
if (await this.options.close?.() !== false) {
if ((await this.options.close?.()) !== false) {
// `close` can return `false` to disable automatic `dispose`.

@@ -74,6 +77,5 @@ await this.dispose();

for (const writer of this.writers) {
try {
await writer.close();
}
catch { }
await writer.close().catch((e) => {
void e;
});
}

@@ -88,3 +90,5 @@ }

}
catch { }
catch (e) {
void e;
}
}

@@ -91,0 +95,0 @@ await this.options.dispose?.();

import { WritableStream, type ReadableWritablePair } from "./stream.js";
/**
* Create a new `WritableStream` that, when written to, will write that chunk to
* `pair.writable`, when pipe `pair.readable` to `writable`.
* Pipe `pair.readable` to `writable`, then returns `pair.writable`.
*
* It's the opposite of `ReadableStream.pipeThrough`.
* This is the opposite of `ReadableStream#pipeThrough()`.
*
* @param writable The `WritableStream` to write to.
* @param pair A `TransformStream` that converts chunks.
* @returns A new `WritableStream`.
* @returns `pair`'s `writable` stream.
*/
export declare function pipeFrom<W, T>(writable: WritableStream<W>, pair: ReadableWritablePair<W, T>): WritableStream<T>;
export declare function pipeFrom<W, T>(writable: WritableStream<W>, pair: ReadableWritablePair<W, T>): import("web-streams-polyfill").WritableStream<T>;
//# sourceMappingURL=pipe-from.d.ts.map
import { WritableStream } from "./stream.js";
/**
* Create a new `WritableStream` that, when written to, will write that chunk to
* `pair.writable`, when pipe `pair.readable` to `writable`.
* Pipe `pair.readable` to `writable`, then returns `pair.writable`.
*
* It's the opposite of `ReadableStream.pipeThrough`.
* This is the opposite of `ReadableStream#pipeThrough()`.
*
* @param writable The `WritableStream` to write to.
* @param pair A `TransformStream` that converts chunks.
* @returns A new `WritableStream`.
* @returns `pair`'s `writable` stream.
*/

@@ -12,0 +11,0 @@ export function pipeFrom(writable, pair) {

@@ -1,2 +0,2 @@

import { AbortSignal, QueuingStrategy, ReadableStream } from "./stream.js";
import { ReadableStream, type AbortSignal, type QueuingStrategy } from "./stream.js";
export interface PushReadableStreamController<T> {

@@ -6,8 +6,15 @@ abortSignal: AbortSignal;

close(): void;
error(e?: any): void;
error(e?: unknown): void;
}
export declare type PushReadableStreamSource<T> = (controller: PushReadableStreamController<T>) => void;
export type PushReadableStreamSource<T> = (controller: PushReadableStreamController<T>) => void | Promise<void>;
export declare class PushReadableStream<T> extends ReadableStream<T> {
/**
* Create a new `PushReadableStream` from a source.
*
* @param source If `source` returns a `Promise`, the stream will be closed
* when the `Promise` is resolved, and be errored when the `Promise` is rejected.
* @param strategy
*/
constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>);
}
//# sourceMappingURL=push-readable.d.ts.map

@@ -1,4 +0,11 @@

import { PromiseResolver } from '@yume-chan/async';
import { AbortController, ReadableStream } from "./stream.js";
import { PromiseResolver } from "@yume-chan/async";
import { AbortController, ReadableStream, } from "./stream.js";
export class PushReadableStream extends ReadableStream {
/**
* Create a new `PushReadableStream` from a source.
*
* @param source If `source` returns a `Promise`, the stream will be closed
* when the `Promise` is resolved, and be errored when the `Promise` is rejected.
* @param strategy
*/
constructor(source, strategy) {

@@ -9,3 +16,3 @@ let waterMarkLow;

start: (controller) => {
source({
const result = source({
abortSignal: canceled.signal,

@@ -16,3 +23,4 @@ async enqueue(chunk) {

// throw immediately.
throw canceled.signal.reason ?? new Error('Aborted');
throw (canceled.signal.reason ??
new Error("Aborted"));
}

@@ -38,2 +46,9 @@ // Only when the stream is errored, `desiredSize` will be `null`.

});
if (result && "then" in result) {
result.then(() => {
controller.close();
}, (e) => {
controller.error(e);
});
}
},

@@ -43,3 +58,3 @@ pull: () => {

},
cancel: async (reason) => {
cancel: (reason) => {
canceled.abort(reason);

@@ -46,0 +61,0 @@ waterMarkLow?.reject(reason);

@@ -1,3 +0,3 @@

import type { AbortSignal } from "web-streams-polyfill";
export * from 'web-streams-polyfill';
import { ReadableStream as ReadableStreamPolyfill, TransformStream as TransformStreamPolyfill, WritableStream as WritableStreamPolyfill, type AbortSignal } from "web-streams-polyfill";
export * from "web-streams-polyfill";
/** A controller object that allows you to abort one or more DOM requests as and when desired. */

@@ -14,6 +14,13 @@ export interface AbortController {

}
export declare let AbortController: {
interface AbortControllerConstructor {
prototype: AbortController;
new (): AbortController;
};
}
export declare const AbortController: AbortControllerConstructor;
export type ReadableStream<R = any> = ReadableStreamPolyfill<R>;
export declare let ReadableStream: typeof ReadableStreamPolyfill;
export type WritableStream<W = any> = WritableStreamPolyfill<W>;
export declare let WritableStream: typeof WritableStreamPolyfill;
export type TransformStream<I = any, O = any> = TransformStreamPolyfill<I, O>;
export declare let TransformStream: typeof TransformStreamPolyfill;
//# sourceMappingURL=stream.d.ts.map

@@ -1,4 +0,27 @@

export * from 'web-streams-polyfill';
export let AbortController;
({ AbortController } = globalThis);
import { ReadableStream as ReadableStreamPolyfill, TransformStream as TransformStreamPolyfill, WritableStream as WritableStreamPolyfill, } from "web-streams-polyfill";
export * from "web-streams-polyfill";
const GLOBAL = globalThis;
export const AbortController = GLOBAL.AbortController;
export let ReadableStream = ReadableStreamPolyfill;
export let WritableStream = WritableStreamPolyfill;
export let TransformStream = TransformStreamPolyfill;
if (GLOBAL.ReadableStream && GLOBAL.WritableStream && GLOBAL.TransformStream) {
// Use browser native implementation
ReadableStream = GLOBAL.ReadableStream;
WritableStream = GLOBAL.WritableStream;
TransformStream = GLOBAL.TransformStream;
}
else {
// TODO: enable loading Node.js stream implementation when bundler supports Top Level Await
// try {
// // Use Node.js native implementation
// const MODULE_NAME = "node:stream/web";
// const StreamWeb = (await import(MODULE_NAME)) as GlobalExtension;
// ReadableStream = StreamWeb.ReadableStream;
// WritableStream = StreamWeb.WritableStream;
// TransformStream = StreamWeb.TransformStream;
// } catch {
// // ignore
// }
}
//# sourceMappingURL=stream.js.map
import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct";
import { BufferedTransformStream } from './buffered-transform.js';
import { type StructValueType } from "@yume-chan/struct";
import { BufferedTransformStream } from "./buffered-transform.js";
export declare class StructDeserializeStream<T extends Struct<any, any, any, any>> extends BufferedTransformStream<StructValueType<T>> {

@@ -5,0 +5,0 @@ constructor(struct: T);

@@ -1,2 +0,2 @@

import { BufferedTransformStream } from './buffered-transform.js';
import { BufferedTransformStream } from "./buffered-transform.js";
export class StructDeserializeStream extends BufferedTransformStream {

@@ -3,0 +3,0 @@ constructor(struct) {

import type Struct from "@yume-chan/struct";
import { TransformStream } from "./stream.js";
export declare class StructSerializeStream<T extends Struct<any, any, any, any>> extends TransformStream<T['TInit'], Uint8Array> {
export declare class StructSerializeStream<T extends Struct<any, any, any, any>> extends TransformStream<T["TInit"], Uint8Array> {
constructor(struct: T);
}
//# sourceMappingURL=struct-serialize.d.ts.map

@@ -1,7 +0,7 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import { ReadableStream, ReadableStreamDefaultController } from "./stream.js";
export declare type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>;
import { type ValueOrPromise } from "@yume-chan/struct";
import { ReadableStream, type ReadableStreamDefaultController } from "./stream.js";
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>;
export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>;
cancel?(reason?: any): ValueOrPromise<void>;
cancel?(reason?: unknown): ValueOrPromise<void>;
close?(): ValueOrPromise<void>;

@@ -8,0 +8,0 @@ }

@@ -1,7 +0,7 @@

import { ReadableStream } from "./stream.js";
import { ReadableStream, } from "./stream.js";
function getWrappedReadableStream(wrapper, controller) {
if ('start' in wrapper) {
if ("start" in wrapper) {
return wrapper.start(controller);
}
else if (typeof wrapper === 'function') {
else if (typeof wrapper === "function") {
return wrapper(controller);

@@ -38,3 +38,3 @@ }

await this.reader.cancel(reason);
if ('cancel' in wrapper) {
if ("cancel" in wrapper) {
await wrapper.cancel?.(reason);

@@ -47,3 +47,3 @@ }

controller.close();
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -55,3 +55,3 @@ }

}
}
},
});

@@ -58,0 +58,0 @@ }

@@ -1,7 +0,7 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import { type ValueOrPromise } from "@yume-chan/struct";
import { WritableStream } from "./stream.js";
export declare type WrapWritableStreamStart<T> = () => ValueOrPromise<WritableStream<T>>;
export type WrapWritableStreamStart<T> = () => ValueOrPromise<WritableStream<T>>;
export interface WritableStreamWrapper<T> {
start: WrapWritableStreamStart<T>;
close?(): Promise<void>;
close?(): void | Promise<void>;
}

@@ -8,0 +8,0 @@ export declare class WrapWritableStream<T> extends WritableStream<T> {

import { WritableStream } from "./stream.js";
async function getWrappedWritableStream(wrapper) {
if ('start' in wrapper) {
if ("start" in wrapper) {
return await wrapper.start();
}
else if (typeof wrapper === 'function') {
else if (typeof wrapper === "function") {
return await wrapper();

@@ -36,3 +36,3 @@ }

await this.writer.abort(reason);
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -47,3 +47,3 @@ }

await this.writer.close();
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -50,0 +50,0 @@ }

{
"name": "@yume-chan/stream-extra",
"version": "0.0.17",
"version": "0.0.18",
"description": "Extensions to Web Streams API",

@@ -29,13 +29,15 @@ "keywords": [

"@yume-chan/async": "^2.2.0",
"@yume-chan/struct": "^0.0.17",
"tslib": "^2.4.0",
"@yume-chan/struct": "^0.0.18",
"tslib": "^2.4.1",
"web-streams-polyfill": "^4.0.0-beta.3"
},
"devDependencies": {
"@jest/globals": "^28.1.2",
"@yume-chan/ts-package-builder": "^1.0.0",
"@jest/globals": "^29.3.1",
"@yume-chan/eslint-config": "^1.0.0",
"@yume-chan/tsconfig": "^1.0.0",
"cross-env": "^7.0.3",
"jest": "^28.1.2",
"ts-jest": "^28.0.5",
"typescript": "^4.7.4"
"eslint": "^8.31.0",
"jest": "^29.3.1",
"ts-jest": "^29.0.4",
"typescript": "^4.9.4"
},

@@ -45,4 +47,5 @@ "scripts": {

"build:watch": "tsc -b tsconfig.build.json",
"test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage"
"test": "cross-env NODE_OPTIONS=--experimental-vm-modules jest --coverage",
"lint": "eslint src/**/*.ts --fix"
}
}
# @yume-chan/stream-extra
Some useful extensions for Web Streams API.
Some useful extensions for working with binary streams. Conforms to the [Web Streams API](https://streams.spec.whatwg.org/).
Currently it's using [web-streams-polyfill](https://github.com/MattiasBuelens/web-streams-polyfill) because it's hard to load native implementations from both browsers and Node.js. (An experimental implementation using Top Level Await is available in `native.ts`, but not exported).
## Find an implementation
If all of `ReadableStream`, `WritableStream` and `TransformStream` fields are available on `globalThis`, they will be used. Otherwise, the [web-streams-polyfill](https://github.com/MattiasBuelens/web-streams-polyfill) package will be used.
Google Chrome 89 and Mozilla Firefox 102 provide full support for Web Streams API natively.
In Node.js, it's not possible to load the `stream/web` module while keeping the compatibility with both Web and bundlers:
- Webpack has poor support with Top Level Await, for example, Hot Module Replacement doesn't work when any module is using TLA.
- Web doesn't have the `module` module, thus requires a shim in import map.
Assigning `ReadableStream`, `WritableStream` and `TransformStream` from `stream/web` module to `globalThis`, before loading this library, will still work. Other custom polyfill can also be loaded this way.
## Compatibility issue with `ReadableStream#pipeTo` and `ReadableStream#pipeThrough`
The [Web Streams API spec](https://streams.spec.whatwg.org/#readable-stream-pipe-to) specifies that `ReadableStream#pipeTo` must check the argument to be an instance of `WritableStream`, so it can optimize the performance by calling internal methods directly.
Native implementations will perform this check, so `new globalThis.ReadableStream().pipeTo(new Polyfill.WritableStream())` will throw an error.
The `WrapReadableStream` class can be used to bypass this check:
```ts
import { WrapReadableStream } from "@yume-chan/stream-extra";
import { WritableStream as PolyfillWritableStream } from "web-streams-polyfill";
const nativeReadable = new globalThis.ReadableStream();
const wrappedReadable = new WrapReadableStream(new globalThis.ReadableStream());
nativeReadable.pipeTo(new PolyfillWritableStream()); // Error
wrappedReadable.pipeTo(new PolyfillWritableStream()); // OK
```
web-streams-polyfill package's `ReadableStream#pipeTo` only uses public methods, so it can be used with any `WritableStream` implementation.
## `BufferedReadableStream`

@@ -8,0 +40,0 @@

@@ -1,22 +0,42 @@

import type { ValueOrPromise } from '@yume-chan/struct';
import { BufferedReadableStream, BufferedReadableStreamEndedError } from './buffered.js';
import { PushReadableStream, PushReadableStreamController } from './push-readable.js';
import { ReadableStream, ReadableWritablePair, WritableStream } from './stream.js';
import { type ValueOrPromise } from "@yume-chan/struct";
import {
BufferedReadableStream,
BufferedReadableStreamEndedError,
} from "./buffered.js";
import {
PushReadableStream,
type PushReadableStreamController,
} from "./push-readable.js";
import {
ReadableStream,
WritableStream,
type ReadableWritablePair,
} from "./stream.js";
// TODO: BufferedTransformStream: find better implementation
export class BufferedTransformStream<T> implements ReadableWritablePair<T, Uint8Array> {
export class BufferedTransformStream<T>
implements ReadableWritablePair<T, Uint8Array>
{
private _readable: ReadableStream<T>;
public get readable() { return this._readable; }
public get readable() {
return this._readable;
}
private _writable: WritableStream<Uint8Array>;
public get writable() { return this._writable; }
public get writable() {
return this._writable;
}
constructor(transform: (stream: BufferedReadableStream) => ValueOrPromise<T>) {
constructor(
transform: (stream: BufferedReadableStream) => ValueOrPromise<T>
) {
// Convert incoming chunks to a `BufferedReadableStream`
let sourceStreamController!: PushReadableStreamController<Uint8Array>;
const buffered = new BufferedReadableStream(new PushReadableStream<Uint8Array>(
controller =>
sourceStreamController = controller,
));
const buffered = new BufferedReadableStream(
new PushReadableStream<Uint8Array>((controller) => {
sourceStreamController = controller;
})
);

@@ -29,5 +49,5 @@ this._readable = new ReadableStream<T>({

} catch (e) {
// TODO: BufferedTransformStream: The semantic of stream ending is not clear
// If the `transform` started but did not finish, it should really be an error?
// But we can't detect that, unless there is a `peek` method on buffered stream.
// Treat `BufferedReadableStreamEndedError` as a normal end.
// If the `transform` method doesn't have enough data to return a value,
// it should throw another error to indicate that.
if (e instanceof BufferedReadableStreamEndedError) {

@@ -43,4 +63,4 @@ controller.close();

// So future writes will be rejected
buffered.cancel(reason);
}
return buffered.cancel(reason);
},
});

@@ -47,0 +67,0 @@

import { PushReadableStream } from "./push-readable.js";
import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";
import {
type ReadableStream,
type ReadableStreamDefaultReader,
} from "./stream.js";
export class BufferedReadableStreamEndedError extends Error {
public constructor() {
super('Stream ended');
super("Stream ended");

@@ -118,3 +121,3 @@ // Fix Error's prototype chain when compiling to ES5

if (this.buffered) {
return new PushReadableStream<Uint8Array>(async controller => {
return new PushReadableStream<Uint8Array>(async (controller) => {
// Put the remaining data back to the stream

@@ -125,13 +128,7 @@ await controller.enqueue(this.buffered!);

while (true) {
try {
const { done, value } = await this.reader.read();
if (done) {
controller.close();
break;
} else {
await controller.enqueue(value);
}
} catch (e) {
controller.error(e);
break;
const { done, value } = await this.reader.read();
if (done) {
return;
} else {
await controller.enqueue(value);
}

@@ -147,5 +144,5 @@ }

public cancel(reason?: any) {
public cancel(reason?: unknown) {
return this.reader.cancel(reason);
}
}

@@ -1,5 +0,6 @@

import { decodeUtf8 } from '@yume-chan/struct';
import { decodeUtf8 } from "@yume-chan/struct";
import { TransformStream } from "./stream.js";
export class DecodeUtf8Stream extends TransformStream<Uint8Array, string>{
export class DecodeUtf8Stream extends TransformStream<Uint8Array, string> {
public constructor() {

@@ -6,0 +7,0 @@ super({

import { PromiseResolver } from "@yume-chan/async";
import type { ValueOrPromise } from "@yume-chan/struct";
import { WritableStream, type ReadableStream, type ReadableStreamDefaultController, type WritableStreamDefaultWriter } from "./stream.js";
import { type ValueOrPromise } from "@yume-chan/struct";
import {
WritableStream,
type ReadableStream,
type ReadableStreamDefaultController,
type WritableStreamDefaultWriter,
} from "./stream.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -12,3 +18,3 @@

*
* Usually you want to let the other peer know that the duplex stream should be clsoed.
* Usually you want to let the other peer know that the duplex stream should be closed.
*

@@ -44,6 +50,10 @@ * `dispose` will automatically be called after `close` completes,

private _writableClosed = false;
public get writableClosed() { return this._writableClosed; }
public get writableClosed() {
return this._writableClosed;
}
private _closed = new PromiseResolver<void>();
public get closed() { return this._closed.promise; }
public get closed() {
return this._closed.promise;
}

@@ -89,3 +99,5 @@ private options: DuplexStreamFactoryOptions;

close: async () => {
try { await writer.close(); } catch { }
await writer.close().catch((e) => {
void e;
});
await this.close();

@@ -103,3 +115,3 @@ },

// Call `close` first, so it can still write data to `WritableStream`s.
if (await this.options.close?.() !== false) {
if ((await this.options.close?.()) !== false) {
// `close` can return `false` to disable automatic `dispose`.

@@ -110,3 +122,5 @@ await this.dispose();

for (const writer of this.writers) {
try { await writer.close(); } catch { }
await writer.close().catch((e) => {
void e;
});
}

@@ -120,3 +134,7 @@ }

for (const controller of this.readableControllers) {
try { controller.close(); } catch { }
try {
controller.close();
} catch (e) {
void e;
}
}

@@ -123,0 +141,0 @@

import { WritableStream, type ReadableWritablePair } from "./stream.js";
/**
* Create a new `WritableStream` that, when written to, will write that chunk to
* `pair.writable`, when pipe `pair.readable` to `writable`.
* Pipe `pair.readable` to `writable`, then returns `pair.writable`.
*
* It's the opposite of `ReadableStream.pipeThrough`.
* This is the opposite of `ReadableStream#pipeThrough()`.
*
* @param writable The `WritableStream` to write to.
* @param pair A `TransformStream` that converts chunks.
* @returns A new `WritableStream`.
* @returns `pair`'s `writable` stream.
*/

@@ -13,0 +12,0 @@ export function pipeFrom<W, T>(writable: WritableStream<W>, pair: ReadableWritablePair<W, T>) {

@@ -1,4 +0,10 @@

import { PromiseResolver } from '@yume-chan/async';
import { AbortController, AbortSignal, QueuingStrategy, ReadableStream } from "./stream.js";
import { PromiseResolver } from "@yume-chan/async";
import {
AbortController,
ReadableStream,
type AbortSignal,
type QueuingStrategy,
} from "./stream.js";
export interface PushReadableStreamController<T> {

@@ -11,53 +17,82 @@ abortSignal: AbortSignal;

error(e?: any): void;
error(e?: unknown): void;
}
export type PushReadableStreamSource<T> = (controller: PushReadableStreamController<T>) => void;
export type PushReadableStreamSource<T> = (
controller: PushReadableStreamController<T>
) => void | Promise<void>;
export class PushReadableStream<T> extends ReadableStream<T> {
public constructor(source: PushReadableStreamSource<T>, strategy?: QueuingStrategy<T>) {
/**
* Create a new `PushReadableStream` from a source.
*
* @param source If `source` returns a `Promise`, the stream will be closed
* when the `Promise` is resolved, and be errored when the `Promise` is rejected.
* @param strategy
*/
public constructor(
source: PushReadableStreamSource<T>,
strategy?: QueuingStrategy<T>
) {
let waterMarkLow: PromiseResolver<void> | undefined;
const canceled: AbortController = new AbortController();
super({
start: (controller) => {
source({
abortSignal: canceled.signal,
async enqueue(chunk) {
if (canceled.signal.aborted) {
// If the stream is already cancelled,
// throw immediately.
throw canceled.signal.reason ?? new Error('Aborted');
}
super(
{
start: (controller) => {
const result = source({
abortSignal: canceled.signal,
async enqueue(chunk) {
if (canceled.signal.aborted) {
// If the stream is already cancelled,
// throw immediately.
throw (
canceled.signal.reason ??
new Error("Aborted")
);
}
// Only when the stream is errored, `desiredSize` will be `null`.
// But since `null <= 0` is `true`
// (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`),
// not handling it will cause a deadlock.
if ((controller.desiredSize ?? 1) <= 0) {
waterMarkLow = new PromiseResolver<void>();
await waterMarkLow.promise;
}
// Only when the stream is errored, `desiredSize` will be `null`.
// But since `null <= 0` is `true`
// (`null <= 0` is evaluated as `!(null > 0)` => `!false` => `true`),
// not handling it will cause a deadlock.
if ((controller.desiredSize ?? 1) <= 0) {
waterMarkLow = new PromiseResolver<void>();
await waterMarkLow.promise;
}
// `controller.enqueue` will throw error for us
// if the stream is already errored.
controller.enqueue(chunk);
},
close() {
controller.close();
},
error(e) {
controller.error(e);
},
});
// `controller.enqueue` will throw error for us
// if the stream is already errored.
controller.enqueue(chunk);
},
close() {
controller.close();
},
error(e) {
controller.error(e);
},
});
if (result && "then" in result) {
result.then(
() => {
controller.close();
},
(e) => {
controller.error(e);
}
);
}
},
pull: () => {
waterMarkLow?.resolve();
},
cancel: (reason) => {
canceled.abort(reason);
waterMarkLow?.reject(reason);
},
},
pull: () => {
waterMarkLow?.resolve();
},
cancel: async (reason) => {
canceled.abort(reason);
waterMarkLow?.reject(reason);
},
}, strategy);
strategy
);
}
}

@@ -1,3 +0,8 @@

import type { AbortSignal } from "web-streams-polyfill";
export * from 'web-streams-polyfill';
import {
ReadableStream as ReadableStreamPolyfill,
TransformStream as TransformStreamPolyfill,
WritableStream as WritableStreamPolyfill,
type AbortSignal,
} from "web-streams-polyfill";
export * from "web-streams-polyfill";

@@ -9,4 +14,4 @@ /** A controller object that allows you to abort one or more DOM requests as and when desired. */

*/
readonly signal: AbortSignal;
readonly signal: AbortSignal;
/**

@@ -18,7 +23,44 @@ * Invoking this method will set this object's AbortSignal's aborted flag and signal to any observers that the associated activity is to be aborted.

export let AbortController: {
interface AbortControllerConstructor {
prototype: AbortController;
new(): AbortController;
};
new (): AbortController;
}
({ AbortController } = globalThis as any);
interface GlobalExtension {
AbortController: AbortControllerConstructor;
ReadableStream: typeof ReadableStreamPolyfill;
WritableStream: typeof WritableStreamPolyfill;
TransformStream: typeof TransformStreamPolyfill;
}
const GLOBAL = globalThis as unknown as GlobalExtension;
export const AbortController = GLOBAL.AbortController;
export type ReadableStream<R = any> = ReadableStreamPolyfill<R>;
export let ReadableStream = ReadableStreamPolyfill;
export type WritableStream<W = any> = WritableStreamPolyfill<W>;
export let WritableStream = WritableStreamPolyfill;
export type TransformStream<I = any, O = any> = TransformStreamPolyfill<I, O>;
export let TransformStream = TransformStreamPolyfill;
if (GLOBAL.ReadableStream && GLOBAL.WritableStream && GLOBAL.TransformStream) {
// Use browser native implementation
ReadableStream = GLOBAL.ReadableStream;
WritableStream = GLOBAL.WritableStream;
TransformStream = GLOBAL.TransformStream;
} else {
// TODO: enable loading Node.js stream implementation when bundler supports Top Level Await
// try {
// // Use Node.js native implementation
// const MODULE_NAME = "node:stream/web";
// const StreamWeb = (await import(MODULE_NAME)) as GlobalExtension;
// ReadableStream = StreamWeb.ReadableStream;
// WritableStream = StreamWeb.WritableStream;
// TransformStream = StreamWeb.TransformStream;
// } catch {
// // ignore
// }
}
import type Struct from "@yume-chan/struct";
import type { StructValueType } from "@yume-chan/struct";
import { BufferedTransformStream } from './buffered-transform.js';
import { type StructValueType } from "@yume-chan/struct";
export class StructDeserializeStream<T extends Struct<any, any, any, any>>
extends BufferedTransformStream<StructValueType<T>> {
import { BufferedTransformStream } from "./buffered-transform.js";
export class StructDeserializeStream<
T extends Struct<any, any, any, any>
> extends BufferedTransformStream<StructValueType<T>> {
public constructor(struct: T) {
super((stream) => {
return struct.deserialize(stream)
return struct.deserialize(stream);
});
}
}
import type Struct from "@yume-chan/struct";
import { TransformStream } from "./stream.js";
export class StructSerializeStream<T extends Struct<any, any, any, any>>
extends TransformStream<T['TInit'], Uint8Array>{
export class StructSerializeStream<
T extends Struct<any, any, any, any>
> extends TransformStream<T["TInit"], Uint8Array> {
constructor(struct: T) {

@@ -7,0 +9,0 @@ super({

@@ -1,9 +0,16 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import { ReadableStream, ReadableStreamDefaultController, ReadableStreamDefaultReader } from "./stream.js";
import { type ValueOrPromise } from "@yume-chan/struct";
export type WrapReadableStreamStart<T> = (controller: ReadableStreamDefaultController<T>) => ValueOrPromise<ReadableStream<T>>;
import {
ReadableStream,
type ReadableStreamDefaultController,
type ReadableStreamDefaultReader,
} from "./stream.js";
export type WrapReadableStreamStart<T> = (
controller: ReadableStreamDefaultController<T>
) => ValueOrPromise<ReadableStream<T>>;
export interface ReadableStreamWrapper<T> {
start: WrapReadableStreamStart<T>;
cancel?(reason?: any): ValueOrPromise<void>;
cancel?(reason?: unknown): ValueOrPromise<void>;
close?(): ValueOrPromise<void>;

@@ -13,8 +20,11 @@ }

function getWrappedReadableStream<T>(
wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T>,
wrapper:
| ReadableStream<T>
| WrapReadableStreamStart<T>
| ReadableStreamWrapper<T>,
controller: ReadableStreamDefaultController<T>
) {
if ('start' in wrapper) {
if ("start" in wrapper) {
return wrapper.start(controller);
} else if (typeof wrapper === 'function') {
} else if (typeof wrapper === "function") {
return wrapper(controller);

@@ -35,3 +45,3 @@ } else {

*/
export class WrapReadableStream<T> extends ReadableStream<T>{
export class WrapReadableStream<T> extends ReadableStream<T> {
public readable!: ReadableStream<T>;

@@ -41,3 +51,8 @@

public constructor(wrapper: ReadableStream<T> | WrapReadableStreamStart<T> | ReadableStreamWrapper<T>) {
public constructor(
wrapper:
| ReadableStream<T>
| WrapReadableStreamStart<T>
| ReadableStreamWrapper<T>
) {
super({

@@ -51,3 +66,6 @@ start: async (controller) => {

this.readable = await getWrappedReadableStream(wrapper, controller);
this.readable = await getWrappedReadableStream(
wrapper,
controller
);
this.reader = this.readable.getReader();

@@ -57,3 +75,3 @@ },

await this.reader.cancel(reason);
if ('cancel' in wrapper) {
if ("cancel" in wrapper) {
await wrapper.cancel?.(reason);

@@ -66,3 +84,3 @@ }

controller.close();
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -73,5 +91,5 @@ }

}
}
},
});
}
}

@@ -1,17 +0,23 @@

import type { ValueOrPromise } from "@yume-chan/struct";
import { WritableStream, WritableStreamDefaultWriter } from "./stream.js";
import { type ValueOrPromise } from "@yume-chan/struct";
export type WrapWritableStreamStart<T> = () => ValueOrPromise<WritableStream<T>>;
import { WritableStream, type WritableStreamDefaultWriter } from "./stream.js";
export type WrapWritableStreamStart<T> = () => ValueOrPromise<
WritableStream<T>
>;
export interface WritableStreamWrapper<T> {
start: WrapWritableStreamStart<T>;
close?(): Promise<void>;
close?(): void | Promise<void>;
}
async function getWrappedWritableStream<T>(
wrapper: WritableStream<T> | WrapWritableStreamStart<T> | WritableStreamWrapper<T>
wrapper:
| WritableStream<T>
| WrapWritableStreamStart<T>
| WritableStreamWrapper<T>
) {
if ('start' in wrapper) {
if ("start" in wrapper) {
return await wrapper.start();
} else if (typeof wrapper === 'function') {
} else if (typeof wrapper === "function") {
return await wrapper();

@@ -30,3 +36,8 @@ } else {

public constructor(wrapper: WritableStream<T> | WrapWritableStreamStart<T> | WritableStreamWrapper<T>) {
public constructor(
wrapper:
| WritableStream<T>
| WrapWritableStreamStart<T>
| WritableStreamWrapper<T>
) {
super({

@@ -50,3 +61,3 @@ start: async () => {

await this.writer.abort(reason);
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -61,3 +72,3 @@ }

await this.writer.close();
if ('close' in wrapper) {
if ("close" in wrapper) {
await wrapper.close?.();

@@ -64,0 +75,0 @@ }

{
"extends": "./node_modules/@yume-chan/ts-package-builder/tsconfig.base.json"
"extends": "./node_modules/@yume-chan/tsconfig/tsconfig.base.json"
}

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc