from-node-stream
Advanced tools
| import type { Duplex } from "stream"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| /** | ||
| * Converts a Node.js Duplex stream to a Web API TransformStream | ||
| * @template IN - The type of data being written (string or Uint8Array) | ||
| * @template OUT - The type of data being read (string or Uint8Array) | ||
| * @param duplex - The Node.js duplex stream to convert | ||
| * @returns A Web API TransformStream that wraps the Node.js duplex stream | ||
| */ | ||
| export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| duplex: Duplex | TransformStream | ||
| ): TransformStream<IN, OUT> { | ||
| if (duplex instanceof TransformStream) return duplex; | ||
| return { | ||
| readable: fromReadable<OUT>(duplex), | ||
| writable: fromWritable<IN>(duplex), | ||
| }; | ||
| } |
| /** | ||
| * Test utilities to replace sflow functionality | ||
| */ | ||
| /** | ||
| * Creates a ReadableStream from a string | ||
| */ | ||
| function fromString(input: string): ReadableStream<Uint8Array> { | ||
| const encoder = new TextEncoder(); | ||
| const encoded = encoder.encode(input); | ||
| return new ReadableStream({ | ||
| start(controller) { | ||
| controller.enqueue(encoded); | ||
| controller.close(); | ||
| } | ||
| }); | ||
| } | ||
| /** | ||
| * Converts a ReadableStream to text | ||
| */ | ||
| async function streamToText(stream: ReadableStream): Promise<string> { | ||
| const reader = stream.getReader(); | ||
| const decoder = new TextDecoder(); | ||
| let result = ''; | ||
| try { | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| // Handle different value types | ||
| if (value instanceof Uint8Array || value instanceof ArrayBuffer) { | ||
| result += decoder.decode(value, { stream: true }); | ||
| } else if (typeof value === 'string') { | ||
| result += value; | ||
| } else { | ||
| // Convert to Uint8Array if it's another type | ||
| const encoder = new TextEncoder(); | ||
| result += decoder.decode(encoder.encode(String(value)), { stream: true }); | ||
| } | ||
| } | ||
| result += decoder.decode(); // flush | ||
| } finally { | ||
| reader.releaseLock(); | ||
| } | ||
| return result; | ||
| } | ||
| /** | ||
| * A chainable stream utility class to replace sflow functionality | ||
| */ | ||
| class StreamFlow { | ||
| constructor(private stream: ReadableStream) {} | ||
| /** | ||
| * Pipe through a transform stream | ||
| */ | ||
| by(transform: TransformStream): StreamFlow { | ||
| return new StreamFlow(this.stream.pipeThrough(transform)); | ||
| } | ||
| /** | ||
| * Convert to text | ||
| */ | ||
| async text(): Promise<string> { | ||
| return streamToText(this.stream); | ||
| } | ||
| /** | ||
| * Pipe to a writable stream | ||
| */ | ||
| async pipeTo(writable: WritableStream): Promise<void> { | ||
| return this.stream.pipeTo(writable); | ||
| } | ||
| } | ||
| /** | ||
| * Main sflow replacement function | ||
| */ | ||
| function sflow(input: string | ReadableStream): StreamFlow { | ||
| if (typeof input === 'string') { | ||
| return new StreamFlow(fromString(input)); | ||
| } | ||
| return new StreamFlow(input); | ||
| } | ||
| export default sflow; |
+46
-7
@@ -1,6 +0,5 @@ | ||
| // index.ts | ||
| import { mergeStream } from "sflow"; | ||
| // fromReadable.ts | ||
| function fromReadable(i) { | ||
| if (i instanceof ReadableStream) | ||
| return i; | ||
| return new ReadableStream({ | ||
@@ -18,2 +17,4 @@ start: (c) => { | ||
| function fromWritable(i) { | ||
| if (i instanceof WritableStream) | ||
| return i; | ||
| return new WritableStream({ | ||
@@ -27,3 +28,40 @@ start: (c) => (i.on("error", (err) => c.error(err)), undefined), | ||
| // fromDuplex.ts | ||
| function fromDuplex(duplex) { | ||
| if (duplex instanceof TransformStream) | ||
| return duplex; | ||
| return { | ||
| readable: fromReadable(duplex), | ||
| writable: fromWritable(duplex) | ||
| }; | ||
| } | ||
| // index.ts | ||
| function mergeStream(...streams) { | ||
| return new ReadableStream({ | ||
| start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) | ||
| break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) { | ||
| controller.close(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| } | ||
| function fromStdioDropErr(p) { | ||
@@ -47,3 +85,3 @@ return { | ||
| const stdout = fromReadable(p.stdout); | ||
| if (p.stderr?.pipe) | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
@@ -63,3 +101,3 @@ return { | ||
| } else { | ||
| if (p.stderr?.pipe) | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
@@ -75,5 +113,6 @@ return fromStdioDropErr(p); | ||
| fromStdio, | ||
| fromReadable | ||
| fromReadable, | ||
| fromDuplex | ||
| }; | ||
| //# debugId=C247144402D037BB64756E2164756E21 | ||
| //# debugId=11A3A3E91965B03864756E2164756E21 |
| { | ||
| "version": 3, | ||
| "sources": ["../index.ts", "../fromReadable.ts", "../fromWritable.ts"], | ||
| "sources": ["../fromReadable.ts", "../fromWritable.ts", "../fromDuplex.ts", "../index.ts"], | ||
| "sourcesContent": [ | ||
| "import { mergeStream } from \"sflow\"; // TODO: ensure tree shake sflow\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n },\n { stderr }: {\n stderr: Writable\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr?.pipe)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr?.pipe)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable };\n", | ||
| "import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream\n): ReadableStream<T> {\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n", | ||
| "import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream\n): WritableStream<T> {\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n" | ||
| "import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n", | ||
| "import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n", | ||
| "import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}", | ||
| "// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\nimport { fromDuplex } from \"./fromDuplex\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n" | ||
| ], | ||
| "mappings": ";AAAA;;;ACQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;ACfI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;AFTI,SAAS,gBAA6E,CAE3F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAgF,CAE9F,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAqF,CAEnG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE,QAAQ;AAAA,IACZ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAAsE,CAEpF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE,QAAQ;AAAA,MACZ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;", | ||
| "debugId": "C247144402D037BB64756E2164756E21", | ||
| "mappings": ";AAQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAI,aAAa;AAAA,IAAgB,OAAO;AAAA,EACxC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;AChBI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAG,aAAa;AAAA,IAAgB,OAAO;AAAA,EACvC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;ACXI,SAAS,UAA2E,CACzF,QAC0B;AAAA,EAC1B,IAAI,kBAAkB;AAAA,IAAiB,OAAO;AAAA,EAE9C,OAAO;AAAA,IACL,UAAU,aAAkB,MAAM;AAAA,IAClC,UAAU,aAAiB,MAAM;AAAA,EACnC;AAAA;;;AClBF,SAAS,WAAc,IAAI,SAAiD;AAAA,EAC1E,OAAO,IAAI,eAAkB;AAAA,IAC3B,KAAK,CAAC,YAAY;AAAA,MAChB,IAAI,gBAAgB,QAAQ;AAAA,MAE5B,QAAQ,QAAQ,OAAO,WAAW;AAAA,QAChC,IAAI;AAAA,UACF,MAAM,SAAS,OAAO,UAAU;AAAA,UAEhC,OAAO,MAAM;AAAA,YACX,QAAQ,MAAM,UAAU,MAAM,OAAO,KAAK;AAAA,YAC1C,IAAI;AAAA,cAAM;AAAA,YACV,WAAW,QAAQ,KAAK;AAAA,UAC1B;AAAA,UAEA,OAAO,YAAY;AAAA,UACnB,OAAO,OAAO;AAAA,UACd,WAAW,MAAM,KAAK;AAAA,UACtB;AAAA,kBACA;AAAA,UACA;AAAA,UACA,IAAI,kBAAkB,GAAG;AAAA,YACvB,WAAW,MAAM;AAAA,UACnB;AAAA;AAAA,OAEH;AAAA;AAAA,EAEL,CAAC;AAAA;AAcI,SAAS,gBAAiF,CAE/F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAoF,CAElG,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAyF,CAEvG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE;AAAA,IACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAA0E,CAExF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE;AAAA,MACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;", | ||
| "debugId": "11A3A3E91965B03864756E2164756E21", | ||
| "names": [] | ||
| } |
+2
-1
@@ -10,4 +10,5 @@ import type { Readable } from "stream"; | ||
| export function fromReadable<T extends string | Uint8Array>( | ||
| i: Readable | NodeJS.ReadableStream | ||
| i: Readable | NodeJS.ReadableStream | ReadableStream | ||
| ): ReadableStream<T> { | ||
| if (i instanceof ReadableStream) return i | ||
| return new ReadableStream({ | ||
@@ -14,0 +15,0 @@ start: (c) => { |
+2
-1
@@ -10,4 +10,5 @@ import type { Writable } from "stream"; | ||
| export function fromWritable<T extends string | Uint8Array>( | ||
| i: Writable | NodeJS.WritableStream | ||
| i: Writable | NodeJS.WritableStream | WritableStream | ||
| ): WritableStream<T> { | ||
| if(i instanceof WritableStream) return i | ||
| return new WritableStream({ | ||
@@ -14,0 +15,0 @@ start: (c) => (i.on("error", (err) => c.error(err)), undefined), |
+28
-1
| import { exec } from "child_process"; | ||
| import sflow from "sflow"; | ||
| import { Transform } from "stream"; | ||
| import sflow from "./test-utils"; | ||
| import { fromStdioDropErr, fromStdioMergeError } from "."; | ||
| import { fromDuplex } from "./fromDuplex"; | ||
| import { fromReadable } from "./fromReadable"; | ||
@@ -42,1 +44,26 @@ import { fromWritable } from "./fromWritable"; | ||
| }); | ||
| it("fromDuplex works with Transform stream", async () => { | ||
| const transform = new Transform({ | ||
| transform(chunk, encoding, callback) { | ||
| this.push(chunk.toString().toUpperCase()); | ||
| callback(); | ||
| } | ||
| }); | ||
| const output = await sflow("hello world\n") | ||
| .by(fromDuplex(transform)) | ||
| .text(); | ||
| expect(output).toBe("HELLO WORLD\n"); | ||
| }); | ||
| it("fromDuplex works with existing TransformStream", async () => { | ||
| const upperCaseTransform = new TransformStream({ | ||
| transform(chunk, controller) { | ||
| controller.enqueue(chunk.toString().toUpperCase()); | ||
| } | ||
| }); | ||
| const result = fromDuplex(upperCaseTransform); | ||
| expect(result).toBe(upperCaseTransform); | ||
| }); |
+52
-22
@@ -1,5 +0,35 @@ | ||
| import { mergeStream } from "sflow"; // TODO: ensure tree shake sflow | ||
| // Native implementation to replace sflow mergeStream | ||
| function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> { | ||
| return new ReadableStream<T>({ | ||
| start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) { | ||
| controller.close(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| } | ||
| import { Readable, Writable } from "stream"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| import { fromDuplex } from "./fromDuplex"; | ||
@@ -13,8 +43,8 @@ /** | ||
| */ | ||
| export function fromStdioDropErr<IN extends string|Uint8Array, OUT extends string|Uint8Array>( | ||
| export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
@@ -35,8 +65,8 @@ ): TransformStream<IN, OUT> { | ||
| */ | ||
| export function fromStdioMergeError<IN extends string|Uint8Array, OUT extends string|Uint8Array>( | ||
| export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
@@ -62,11 +92,11 @@ ): TransformStream<IN, OUT> { | ||
| */ | ||
| export function fromStdioAndForwardError<IN extends string|Uint8Array, OUT extends string|Uint8Array>( | ||
| export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
| { stderr }: { | ||
| stderr: Writable | ||
| stderr: Writable | WritableStream | ||
| } | ||
@@ -76,3 +106,3 @@ ): TransformStream<IN, OUT> { | ||
| const stdout = fromReadable<OUT>(p.stdout!); | ||
| if (p.stderr?.pipe) | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
@@ -93,8 +123,8 @@ return { | ||
| */ | ||
| export function fromStdio<IN extends string|Uint8Array, OUT extends string|Uint8Array>( | ||
| export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
@@ -105,3 +135,3 @@ { | ||
| /** specify stderr to forward, or set to null to drop. */ | ||
| stderr?: Writable | null; | ||
| stderr?: Writable | WritableStream | null; | ||
| } = {} | ||
@@ -116,3 +146,3 @@ ): TransformStream<IN, OUT> { | ||
| // forward stderr if stderr is specified | ||
| if (p.stderr?.pipe) | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
@@ -124,2 +154,2 @@ return fromStdioDropErr(p); | ||
| export { fromReadable, fromWritable }; | ||
| export { fromReadable, fromWritable, fromDuplex }; |
+6
-9
| { | ||
| "name": "from-node-stream", | ||
| "version": "0.1.0", | ||
| "version": "0.1.2", | ||
| "description": "convert nodejs-stream into webstream", | ||
@@ -10,5 +10,6 @@ "keywords": [ | ||
| "Writable", | ||
| "Duplex", | ||
| "ReadableStream", | ||
| "WritableStream", | ||
| "TransformSteam" | ||
| "TransformStream" | ||
| ], | ||
@@ -38,5 +39,3 @@ "homepage": "https://github.com/snomiao/from-node-stream#readme", | ||
| "scripts": { | ||
| "build": "bun build:type && bun build:js", | ||
| "build:js": "bun build . --outdir=dist --sourcemap=external --packages=external", | ||
| "build:type": "tsc -d --noEmit false --emitDeclarationOnly --outDir dist", | ||
| "build": "bun build ./index.ts --outdir=dist --sourcemap=external", | ||
| "prerelease": "bun run build && bun run test", | ||
@@ -47,6 +46,5 @@ "release": "bunx standard-version && git push --follow-tags && npm publish", | ||
| }, | ||
| "dependencies": { | ||
| "phpdie": "^1.2.14" | ||
| }, | ||
| "dependencies": {}, | ||
| "devDependencies": { | ||
| "phpdie": "^1.2.14", | ||
| "@types/bun": "^1.1.11", | ||
@@ -57,5 +55,4 @@ "@types/jest": "^29.5.13", | ||
| "semantic-release": "^24.2.1", | ||
| "sflow": "^1.19.1", | ||
| "typescript": "^5.6.3" | ||
| } | ||
| } |
| export {}; |
| import type { Readable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Readable stream to a Web API ReadableStream | ||
| * @template T - The type of data being read (string or Uint8Array) | ||
| * @param i - The Node.js readable stream to convert | ||
| * @returns A Web API ReadableStream that wraps the Node.js stream | ||
| */ | ||
| export declare function fromReadable<T extends string | Uint8Array>(i: Readable | NodeJS.ReadableStream): ReadableStream<T>; |
| import type { Writable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Writable stream to a Web API WritableStream | ||
| * @template T - The type of data being written (string or Uint8Array) | ||
| * @param i - The Node.js writable stream to convert | ||
| * @returns A Web API WritableStream that wraps the Node.js stream | ||
| */ | ||
| export declare function fromWritable<T extends string | Uint8Array>(i: Writable | NodeJS.WritableStream): WritableStream<T>; |
| import { Readable, Writable } from "stream"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, dropping stderr output | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to stdout, ignoring stderr | ||
| */ | ||
| export declare function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, merging stdout and stderr | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to a merged stdout+stderr stream | ||
| */ | ||
| export declare function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object | ||
| * @param options.stderr - The writable stream to forward stderr to | ||
| * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately | ||
| */ | ||
| export declare function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| }, { stderr }: { | ||
| stderr: Writable; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio with configurable stderr handling | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object for stderr handling | ||
| * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout | ||
| * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior | ||
| */ | ||
| export declare function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | null; | ||
| stdout?: Readable | null; | ||
| stderr?: Readable | null; | ||
| }, { stderr, }?: { | ||
| /** specify stderr to forward, or set to null to drop. */ | ||
| stderr?: Writable | null; | ||
| }): TransformStream<IN, OUT>; | ||
| export { fromReadable, fromWritable }; |
| export {}; |
30240
15.88%0
-100%445
28.61%11
-21.43%- Removed
- Removed
- Removed
- Removed