New Case Study:See how Anthropic automated 95% of dependency reviews with Socket.Learn More
Socket
Sign inDemoInstall
Socket

@yume-chan/stream-extra

Package Overview
Dependencies
Maintainers
0
Versions
13
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

@yume-chan/stream-extra - npm Package Compare versions

Comparing version 0.0.0-20240714132542 to 0.0.0-next-20240917062356

esm/buffered.spec.d.ts

13

CHANGELOG.md
# Change Log - @yume-chan/stream-extra
## 0.0.0-20240714132542
## 0.0.0-next-20240917062356
### Major Changes
- 53688d3: Use PNPM workspace and Changesets to manage the monorepo.
Because Changesets doesn't support alpha versions (`0.x.x`), this version is `1.0.0`. Future versions will follow SemVer rules, for example, breaking API changes will introduce a new major version.
### Patch Changes
- Switch to PNPM workspace and changesets
- Updated dependencies
- @yume-chan/struct@0.0.0-20240714132542
- Updated dependencies [53688d3]
- @yume-chan/struct@0.0.0-next-20240917062356

@@ -11,0 +16,0 @@ This log was last generated on Tue, 18 Jun 2024 02:49:43 GMT and should not be manually modified.

import { ExactReadableEndedError } from "@yume-chan/struct";
import { PushReadableStream } from "./push-readable.js";
const NOOP = () => {
// no-op
};
import { tryCancel } from "./try-close.js";
export class BufferedReadableStream {

@@ -115,4 +113,3 @@ #buffered;

controller.abortSignal.addEventListener("abort", () => {
// NOOP: the reader might already be released
this.reader.cancel().catch(NOOP);
void tryCancel(this.reader);
});

@@ -119,0 +116,0 @@ // Manually pipe the stream

import type { QueuingStrategy, WritableStreamDefaultController, WritableStreamDefaultWriter } from "./stream.js";
import { ReadableStream as NativeReadableStream, WritableStream as NativeWritableStream } from "./stream.js";
export declare class Consumable<T> {
#private;
static readonly WritableStream: {
new <in T_1>(sink: Consumable.WritableStreamSink<T_1>, strategy?: QueuingStrategy<T_1>): {
readonly locked: boolean;
abort(reason?: unknown): Promise<void>;
close(): Promise<undefined>;
getWriter(): WritableStreamDefaultWriter<Consumable<T_1>>;
};
write<T_1>(writer: WritableStreamDefaultWriter<Consumable<T_1>>, value: T_1): Promise<void>;
};
static readonly ReadableStream: {
new <T_1>(source: Consumable.ReadableStreamSource<T_1>, strategy?: QueuingStrategy<T_1>): {
readonly locked: boolean;
cancel(reason?: unknown): Promise<void>;
getReader({ mode }: {
mode: "byob";
}): import("./types.js").ReadableStreamBYOBReader;
getReader(): import("./types.js").ReadableStreamDefaultReader<Consumable<T_1>>;
pipeThrough<RS extends import("./types.js").ReadableStream<unknown>>(transform: {
readable: RS;
writable: import("./types.js").WritableStream<Consumable<T_1>>;
}, options?: import("./types.js").StreamPipeOptions): RS;
pipeTo(destination: import("./types.js").WritableStream<Consumable<T_1>>, options?: import("./types.js").StreamPipeOptions): Promise<void>;
tee(): [import("./types.js").ReadableStream<Consumable<T_1>>, import("./types.js").ReadableStream<Consumable<T_1>>];
values(options?: import("./types.js").ReadableStreamIteratorOptions): import("./types.js").ReadableStreamAsyncIterator<Consumable<T_1>>;
[Symbol.asyncIterator](options?: import("./types.js").ReadableStreamIteratorOptions): import("./types.js").ReadableStreamAsyncIterator<Consumable<T_1>>;
};
enqueue<T_1>(controller: {
enqueue: (chunk: Consumable<T_1>) => void;
}, chunk: T_1): Promise<void>;
from<R>(asyncIterable: Iterable<R> | AsyncIterable<R> | import("./types.js").ReadableStreamLike<R>): import("./types.js").ReadableStream<R>;
};
readonly value: T;

@@ -19,6 +49,3 @@ readonly consumed: Promise<void>;

}
class WritableStream<in T> extends NativeWritableStream<Consumable<T>> {
static write<T>(writer: WritableStreamDefaultWriter<Consumable<T>>, value: T): Promise<void>;
constructor(sink: WritableStreamSink<T>, strategy?: QueuingStrategy<T>);
}
type WritableStream<in T> = typeof Consumable.WritableStream<T>;
interface ReadableStreamController<T> {

@@ -34,9 +61,4 @@ enqueue(chunk: T): Promise<void>;

}
class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static enqueue<T>(controller: {
enqueue: (chunk: Consumable<T>) => void;
}, chunk: T): Promise<void>;
constructor(source: ReadableStreamSource<T>, strategy?: QueuingStrategy<T>);
}
type ReadableStream<T> = typeof Consumable.ReadableStream<T>;
}
//# sourceMappingURL=consumable.d.ts.map

@@ -8,43 +8,3 @@ import { PromiseResolver } from "@yume-chan/async";

export class Consumable {
#task;
#resolver;
value;
consumed;
constructor(value) {
this.#task = createTask("Consumable");
this.value = value;
this.#resolver = new PromiseResolver();
this.consumed = this.#resolver.promise;
}
consume() {
this.#resolver.resolve();
}
error(error) {
this.#resolver.reject(error);
}
tryConsume(callback) {
try {
let result = this.#task.run(() => callback(this.value));
if (isPromiseLike(result)) {
result = result.then((value) => {
this.#resolver.resolve();
return value;
}, (e) => {
this.#resolver.reject(e);
throw e;
});
}
else {
this.#resolver.resolve();
}
return result;
}
catch (e) {
this.#resolver.reject(e);
throw e;
}
}
}
(function (Consumable) {
class WritableStream extends NativeWritableStream {
static WritableStream = class WritableStream extends NativeWritableStream {
static async write(writer, value) {

@@ -83,5 +43,4 @@ const consumable = new Consumable(value);

}
}
Consumable.WritableStream = WritableStream;
class ReadableStream extends NativeReadableStream {
};
static ReadableStream = class ReadableStream extends NativeReadableStream {
static async enqueue(controller, chunk) {

@@ -129,5 +88,42 @@ const output = new Consumable(chunk);

}
};
#task;
#resolver;
value;
consumed;
constructor(value) {
this.#task = createTask("Consumable");
this.value = value;
this.#resolver = new PromiseResolver();
this.consumed = this.#resolver.promise;
}
Consumable.ReadableStream = ReadableStream;
})(Consumable || (Consumable = {}));
consume() {
this.#resolver.resolve();
}
error(error) {
this.#resolver.reject(error);
}
tryConsume(callback) {
try {
let result = this.#task.run(() => callback(this.value));
if (isPromiseLike(result)) {
result = result.then((value) => {
this.#resolver.resolve();
return value;
}, (e) => {
this.#resolver.reject(e);
throw e;
});
}
else {
this.#resolver.resolve();
}
return result;
}
catch (e) {
this.#resolver.reject(e);
throw e;
}
}
}
//# sourceMappingURL=consumable.js.map
import { PromiseResolver } from "@yume-chan/async";
import { WritableStream } from "./stream.js";
import { tryClose } from "./try-close.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -83,8 +84,3 @@ const NOOP = () => {

for (const controller of this.#readableControllers) {
try {
controller.close();
}
catch {
// ignore
}
tryClose(controller);
}

@@ -91,0 +87,0 @@ await this.#options.dispose?.();

@@ -17,4 +17,5 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";
//# sourceMappingURL=index.d.ts.map

@@ -17,4 +17,5 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";
//# sourceMappingURL=index.js.map

@@ -1,21 +0,4 @@

import { Consumable } from "./consumable.js";
import type { QueuingStrategy, WritableStreamDefaultController } from "./stream.js";
import { WritableStream as NativeWritableStream, TransformStream } from "./stream.js";
import type { Consumable } from "./consumable.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export declare namespace MaybeConsumable {
function getValue<T>(value: MaybeConsumable<T>): T;
function tryConsume<T, R>(value: T, callback: (value: T extends Consumable<infer U> ? U : T) => R): R;
class UnwrapStream<T> extends TransformStream<MaybeConsumable<T>, T> {
constructor();
}
interface WritableStreamSink<in T> {
start?(controller: WritableStreamDefaultController): void | PromiseLike<void>;
write?(chunk: T, controller: WritableStreamDefaultController): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
class WritableStream<in T> extends NativeWritableStream<MaybeConsumable<T>> {
constructor(sink: WritableStreamSink<T>, strategy?: QueuingStrategy<T>);
}
}
export * as MaybeConsumable from "./maybe-consumable-ns.js";
//# sourceMappingURL=maybe-consumable.d.ts.map

@@ -1,62 +0,2 @@

import { Consumable } from "./consumable.js";
import { WritableStream as NativeWritableStream, TransformStream, } from "./stream.js";
export var MaybeConsumable;
(function (MaybeConsumable) {
function getValue(value) {
return value instanceof Consumable ? value.value : value;
}
MaybeConsumable.getValue = getValue;
function tryConsume(value, callback) {
if (value instanceof Consumable) {
return value.tryConsume(callback);
}
else {
return callback(value);
}
}
MaybeConsumable.tryConsume = tryConsume;
class UnwrapStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
MaybeConsumable.tryConsume(chunk, (chunk) => {
controller.enqueue(chunk);
});
},
});
}
}
MaybeConsumable.UnwrapStream = UnwrapStream;
class WritableStream extends NativeWritableStream {
constructor(sink, strategy) {
let wrappedStrategy;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size(chunk instanceof Consumable ? chunk.value : chunk);
};
}
}
super({
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, (chunk) => sink.write?.(chunk, controller));
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
}, wrappedStrategy);
}
}
MaybeConsumable.WritableStream = WritableStream;
})(MaybeConsumable || (MaybeConsumable = {}));
export * as MaybeConsumable from "./maybe-consumable-ns.js";
//# sourceMappingURL=maybe-consumable.js.map

@@ -18,9 +18,6 @@ import type { AbortSignal, ReadableStream as ReadableStreamType, TransformStream as TransformStreamType, WritableStream as WritableStreamType } from "./types.js";

}
export declare const AbortController: AbortControllerConstructor;
export type ReadableStream<out T> = ReadableStreamType<T>;
export declare const ReadableStream: typeof ReadableStreamType;
export type WritableStream<in T> = WritableStreamType<T>;
export declare const WritableStream: typeof WritableStreamType;
export type TransformStream<I, O> = TransformStreamType<I, O>;
export declare const TransformStream: typeof TransformStreamType;
export declare const AbortController: AbortControllerConstructor, ReadableStream: typeof ReadableStreamType, WritableStream: typeof WritableStreamType, TransformStream: typeof TransformStreamType;
//# sourceMappingURL=stream.d.ts.map
export * from "./types.js";
const Global = globalThis;
export const AbortController = Global.AbortController;
export const ReadableStream = Global.ReadableStream;
export const WritableStream = Global.WritableStream;
export const TransformStream = Global.TransformStream;
export const { AbortController, ReadableStream, WritableStream, TransformStream, } = globalThis;
//# sourceMappingURL=stream.js.map
{
"name": "@yume-chan/stream-extra",
"version": "0.0.0-20240714132542",
"version": "0.0.0-next-20240917062356",
"description": "Extensions to Web Streams API",

@@ -30,20 +30,17 @@ "keywords": [

"@yume-chan/async": "^2.2.0",
"@yume-chan/struct": "0.0.0-20240714132542"
"@yume-chan/struct": "^0.0.0-next-20240917062356"
},
"devDependencies": {
"@jest/globals": "^30.0.0-alpha.4",
"@types/node": "^22.5.5",
"prettier": "^3.3.3",
"typescript": "^5.6.2",
"@yume-chan/test-runner": "^1.0.0",
"@yume-chan/eslint-config": "^1.0.0",
"@yume-chan/tsconfig": "^1.0.0",
"cross-env": "^7.0.3",
"jest": "^30.0.0-alpha.4",
"prettier": "^3.3.3",
"ts-jest": "^29.2.2",
"typescript": "^5.5.3"
"@yume-chan/tsconfig": "^1.0.0"
},
"scripts": {
"build": "tsc -b tsconfig.build.json",
"build:watch": "tsc -b tsconfig.build.json",
"test": "cross-env NODE_OPTIONS=\"--experimental-vm-modules --no-warnings\" TS_JEST_DISABLE_VER_CHECKER=true jest --coverage",
"lint": "run-eslint && prettier src/**/*.ts --write --tab-width 4"
"lint": "run-eslint && prettier src/**/*.ts --write --tab-width 4",
"test": "run-test"
}
}

@@ -6,7 +6,4 @@ import type { AsyncExactReadable } from "@yume-chan/struct";

import type { ReadableStream, ReadableStreamDefaultReader } from "./stream.js";
import { tryCancel } from "./try-close.js";
const NOOP = () => {
// no-op
};
export class BufferedReadableStream implements AsyncExactReadable {

@@ -137,4 +134,3 @@ #buffered: Uint8Array | undefined;

controller.abortSignal.addEventListener("abort", () => {
// NOOP: the reader might already be released
this.reader.cancel().catch(NOOP);
void tryCancel(this.reader);
});

@@ -141,0 +137,0 @@

@@ -20,64 +20,5 @@ import { PromiseResolver } from "@yume-chan/async";

export class Consumable<T> {
readonly #task: Task;
readonly #resolver: PromiseResolver<void>;
readonly value: T;
readonly consumed: Promise<void>;
constructor(value: T) {
this.#task = createTask("Consumable");
this.value = value;
this.#resolver = new PromiseResolver<void>();
this.consumed = this.#resolver.promise;
}
consume() {
this.#resolver.resolve();
}
error(error: unknown) {
this.#resolver.reject(error);
}
tryConsume<U>(callback: (value: T) => U) {
try {
let result = this.#task.run(() => callback(this.value));
if (isPromiseLike(result)) {
result = result.then(
(value) => {
this.#resolver.resolve();
return value;
},
(e) => {
this.#resolver.reject(e);
throw e;
},
) as U;
} else {
this.#resolver.resolve();
}
return result;
} catch (e) {
this.#resolver.reject(e);
throw e;
}
}
}
export namespace Consumable {
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class WritableStream<in T> extends NativeWritableStream<
Consumable<T>
> {
static readonly WritableStream = class WritableStream<
in T,
> extends NativeWritableStream<Consumable<T>> {
static async write<T>(

@@ -93,3 +34,3 @@ writer: WritableStreamDefaultWriter<Consumable<T>>,

constructor(
sink: WritableStreamSink<T>,
sink: Consumable.WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,

@@ -132,21 +73,7 @@ ) {

}
}
};
export interface ReadableStreamController<T> {
enqueue(chunk: T): Promise<void>;
close(): void;
error(reason: unknown): void;
}
export interface ReadableStreamSource<T> {
start?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
pull?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
cancel?(reason: unknown): void | PromiseLike<void>;
}
export class ReadableStream<T> extends NativeReadableStream<Consumable<T>> {
static readonly ReadableStream = class ReadableStream<
T,
> extends NativeReadableStream<Consumable<T>> {
static async enqueue<T>(

@@ -162,6 +89,8 @@ controller: { enqueue: (chunk: Consumable<T>) => void },

constructor(
source: ReadableStreamSource<T>,
source: Consumable.ReadableStreamSource<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedController: ReadableStreamController<T> | undefined;
let wrappedController:
| Consumable.ReadableStreamController<T>
| undefined;

@@ -208,3 +137,82 @@ let wrappedStrategy: QueuingStrategy<Consumable<T>> | undefined;

}
};
readonly #task: Task;
readonly #resolver: PromiseResolver<void>;
readonly value: T;
readonly consumed: Promise<void>;
constructor(value: T) {
this.#task = createTask("Consumable");
this.value = value;
this.#resolver = new PromiseResolver<void>();
this.consumed = this.#resolver.promise;
}
consume() {
this.#resolver.resolve();
}
error(error: unknown) {
this.#resolver.reject(error);
}
tryConsume<U>(callback: (value: T) => U) {
try {
let result = this.#task.run(() => callback(this.value));
if (isPromiseLike(result)) {
result = result.then(
(value) => {
this.#resolver.resolve();
return value;
},
(e) => {
this.#resolver.reject(e);
throw e;
},
) as U;
} else {
this.#resolver.resolve();
}
return result;
} catch (e) {
this.#resolver.reject(e);
throw e;
}
}
}
export namespace Consumable {
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export type WritableStream<in T> = typeof Consumable.WritableStream<T>;
export interface ReadableStreamController<T> {
enqueue(chunk: T): Promise<void>;
close(): void;
error(reason: unknown): void;
}
export interface ReadableStreamSource<T> {
start?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
pull?(
controller: ReadableStreamController<T>,
): void | PromiseLike<void>;
cancel?(reason: unknown): void | PromiseLike<void>;
}
export type ReadableStream<T> = typeof Consumable.ReadableStream<T>;
}

@@ -11,2 +11,3 @@ import { PromiseResolver } from "@yume-chan/async";

import { WritableStream } from "./stream.js";
import { tryClose } from "./try-close.js";
import { WrapReadableStream } from "./wrap-readable.js";

@@ -138,7 +139,3 @@

for (const controller of this.#readableControllers) {
try {
controller.close();
} catch {
// ignore
}
tryClose(controller);
}

@@ -145,0 +142,0 @@

@@ -17,3 +17,4 @@ export * from "./buffered-transform.js";

export * from "./task.js";
export * from "./try-close.js";
export * from "./wrap-readable.js";
export * from "./wrap-writable.js";

@@ -1,101 +0,5 @@

import { Consumable } from "./consumable.js";
import type {
QueuingStrategy,
WritableStreamDefaultController,
} from "./stream.js";
import {
WritableStream as NativeWritableStream,
TransformStream,
} from "./stream.js";
import type { Consumable } from "./consumable.js";
export type MaybeConsumable<T> = T | Consumable<T>;
export namespace MaybeConsumable {
export function getValue<T>(value: MaybeConsumable<T>): T {
return value instanceof Consumable ? value.value : value;
}
export function tryConsume<T, R>(
value: T,
callback: (value: T extends Consumable<infer U> ? U : T) => R,
): R {
if (value instanceof Consumable) {
return value.tryConsume(callback);
} else {
return callback(value as never);
}
}
export class UnwrapStream<T> extends TransformStream<
MaybeConsumable<T>,
T
> {
constructor() {
super({
transform(chunk, controller) {
MaybeConsumable.tryConsume(chunk, (chunk) => {
controller.enqueue(chunk as T);
});
},
});
}
}
export interface WritableStreamSink<in T> {
start?(
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
write?(
chunk: T,
controller: WritableStreamDefaultController,
): void | PromiseLike<void>;
abort?(reason: unknown): void | PromiseLike<void>;
close?(): void | PromiseLike<void>;
}
export class WritableStream<in T> extends NativeWritableStream<
MaybeConsumable<T>
> {
constructor(
sink: WritableStreamSink<T>,
strategy?: QueuingStrategy<T>,
) {
let wrappedStrategy:
| QueuingStrategy<MaybeConsumable<T>>
| undefined;
if (strategy) {
wrappedStrategy = {};
if ("highWaterMark" in strategy) {
wrappedStrategy.highWaterMark = strategy.highWaterMark;
}
if ("size" in strategy) {
wrappedStrategy.size = (chunk) => {
return strategy.size!(
chunk instanceof Consumable ? chunk.value : chunk,
);
};
}
}
super(
{
start(controller) {
return sink.start?.(controller);
},
async write(chunk, controller) {
await MaybeConsumable.tryConsume(chunk, (chunk) =>
sink.write?.(chunk as T, controller),
);
},
abort(reason) {
return sink.abort?.(reason);
},
close() {
return sink.close?.();
},
},
wrappedStrategy,
);
}
}
}
export * as MaybeConsumable from "./maybe-consumable-ns.js";

@@ -35,13 +35,11 @@ import type {

const Global = globalThis as unknown as GlobalExtension;
export const AbortController = Global.AbortController;
export type ReadableStream<out T> = ReadableStreamType<T>;
export const ReadableStream = Global.ReadableStream;
export type WritableStream<in T> = WritableStreamType<T>;
export const WritableStream = Global.WritableStream;
export type TransformStream<I, O> = TransformStreamType<I, O>;
export type TransformStream<I, O> = TransformStreamType<I, O>;
export const TransformStream = Global.TransformStream;
export const {
AbortController,
ReadableStream,
WritableStream,
TransformStream,
} = globalThis as unknown as GlobalExtension;

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is not supported yet

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc