from-node-stream
Advanced tools
+102
| import { Duplex, Readable, Writable } from "stream"; | ||
| //#region ts/fromReadable.d.ts | ||
| /** | ||
| * Converts a Node.js Readable stream to a Web API ReadableStream | ||
| * @template T - The type of data being read (string or Uint8Array) | ||
| * @param i - The Node.js readable stream to convert | ||
| * @returns A Web API ReadableStream that wraps the Node.js stream | ||
| */ | ||
| declare function fromReadable<T extends string | Uint8Array>(i: Readable | NodeJS.ReadableStream | ReadableStream): ReadableStream<T>; | ||
| //#endregion | ||
| //#region ts/fromWritable.d.ts | ||
| /** | ||
| * Converts a Node.js Writable stream to a Web API WritableStream | ||
| * @template T - The type of data being written (string or Uint8Array) | ||
| * @param i - The Node.js writable stream to convert | ||
| * @returns A Web API WritableStream that wraps the Node.js stream | ||
| */ | ||
| declare function fromWritable<T extends string | Uint8Array>(i: Writable | NodeJS.WritableStream | WritableStream): WritableStream<T>; | ||
| //#endregion | ||
| //#region ts/fromDuplex.d.ts | ||
| /** | ||
| * Converts a Node.js Duplex stream to a Web API TransformStream | ||
| * @template IN - The type of data being written (string or Uint8Array) | ||
| * @template OUT - The type of data being read (string or Uint8Array) | ||
| * @param duplex - The Node.js duplex stream to convert | ||
| * @returns A Web API TransformStream that wraps the Node.js duplex stream | ||
| */ | ||
| declare function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(duplex: Duplex | TransformStream): TransformStream<IN, OUT>; | ||
| //#endregion | ||
| //#region ts/index.d.ts | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, dropping stderr output | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to stdout, ignoring stderr | ||
| */ | ||
| declare function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, merging stdout and stderr | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to a merged stdout+stderr stream | ||
| */ | ||
| declare function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object | ||
| * @param options.stderr - The writable stream to forward stderr to | ||
| * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately | ||
| */ | ||
| declare function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, { | ||
| stderr | ||
| }: { | ||
| stderr: Writable | WritableStream; | ||
| }): TransformStream<IN, OUT>; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio with configurable stderr handling | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object for stderr handling | ||
| * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout | ||
| * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior | ||
| */ | ||
| declare function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, { | ||
| stderr | ||
| }?: { | ||
| /** specify stderr to forward, or set to null to drop. */stderr?: Writable | WritableStream | null; | ||
| }): TransformStream<IN, OUT>; | ||
| //#endregion | ||
| export { fromDuplex, fromReadable, fromStdio, fromStdioAndForwardError, fromStdioDropErr, fromStdioMergeError, fromWritable }; | ||
| //# sourceMappingURL=index.d.mts.map |
+142
| //#region ts/fromReadable.ts | ||
| /** | ||
| * Converts a Node.js Readable stream to a Web API ReadableStream | ||
| * @template T - The type of data being read (string or Uint8Array) | ||
| * @param i - The Node.js readable stream to convert | ||
| * @returns A Web API ReadableStream that wraps the Node.js stream | ||
| */ | ||
| function fromReadable(i) { | ||
| if (i instanceof ReadableStream) return i; | ||
| const stream = i; | ||
| return new ReadableStream({ | ||
| start: (c) => { | ||
| stream.on("data", (data) => c.enqueue(data)); | ||
| stream.on("close", () => c.close()); | ||
| stream.on("error", (err) => c.error(err)); | ||
| }, | ||
| cancel: (reason) => (stream.destroy?.(reason), void 0) | ||
| }); | ||
| } | ||
| //#endregion | ||
| //#region ts/fromWritable.ts | ||
| /** | ||
| * Converts a Node.js Writable stream to a Web API WritableStream | ||
| * @template T - The type of data being written (string or Uint8Array) | ||
| * @param i - The Node.js writable stream to convert | ||
| * @returns A Web API WritableStream that wraps the Node.js stream | ||
| */ | ||
| function fromWritable(i) { | ||
| if (i instanceof WritableStream) return i; | ||
| const stream = i; | ||
| return new WritableStream({ | ||
| start: (c) => (stream.on("error", (err) => c.error(err)), void 0), | ||
| abort: (reason) => (stream.destroy?.(reason), void 0), | ||
| write: (data, c) => (stream.write(data), void 0), | ||
| close: () => (stream.end(), void 0) | ||
| }); | ||
| } | ||
| //#endregion | ||
| //#region ts/fromDuplex.ts | ||
| /** | ||
| * Converts a Node.js Duplex stream to a Web API TransformStream | ||
| * @template IN - The type of data being written (string or Uint8Array) | ||
| * @template OUT - The type of data being read (string or Uint8Array) | ||
| * @param duplex - The Node.js duplex stream to convert | ||
| * @returns A Web API TransformStream that wraps the Node.js duplex stream | ||
| */ | ||
| function fromDuplex(duplex) { | ||
| if (duplex instanceof TransformStream) return duplex; | ||
| return { | ||
| readable: fromReadable(duplex), | ||
| writable: fromWritable(duplex) | ||
| }; | ||
| } | ||
| //#endregion | ||
| //#region ts/index.ts | ||
| function mergeStream(...streams) { | ||
| return new ReadableStream({ start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) controller.close(); | ||
| } | ||
| }); | ||
| } }); | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, dropping stderr output | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to stdout, ignoring stderr | ||
| */ | ||
| function fromStdioDropErr(p) { | ||
| return { | ||
| writable: fromWritable(p.stdin), | ||
| readable: fromReadable(p.stdout) | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, merging stdout and stderr | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to a merged stdout+stderr stream | ||
| */ | ||
| function fromStdioMergeError(p) { | ||
| return { | ||
| writable: fromWritable(p.stdin), | ||
| readable: mergeStream(fromReadable(p.stdout), fromReadable(p.stderr)) | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object | ||
| * @param options.stderr - The writable stream to forward stderr to | ||
| * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately | ||
| */ | ||
| function fromStdioAndForwardError(p, { stderr }) { | ||
| const stdin = fromWritable(p.stdin); | ||
| const stdout = fromReadable(p.stdout); | ||
| if (p.stderr) fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return { | ||
| writable: stdin, | ||
| readable: stdout | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio with configurable stderr handling | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object for stderr handling | ||
| * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout | ||
| * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior | ||
| */ | ||
| function fromStdio(p, { stderr } = {}) { | ||
| if (stderr === void 0) return fromStdioMergeError(p); | ||
| else if (stderr === null) return fromStdioDropErr(p); | ||
| else { | ||
| if (p.stderr) fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return fromStdioDropErr(p); | ||
| } | ||
| } | ||
| //#endregion | ||
| export { fromDuplex, fromReadable, fromStdio, fromStdioAndForwardError, fromStdioDropErr, fromStdioMergeError, fromWritable }; | ||
| //# sourceMappingURL=index.mjs.map |
| {"version":3,"file":"index.mjs","names":[],"sources":["../ts/fromReadable.ts","../ts/fromWritable.ts","../ts/fromDuplex.ts","../ts/index.ts"],"sourcesContent":["import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n const stream = i as Readable;\n return new ReadableStream({\n start: (c) => {\n stream.on(\"data\", (data: T) => c.enqueue(data));\n stream.on(\"close\", () => c.close());\n stream.on(\"error\", (err: Error) => c.error(err));\n },\n cancel: (reason) => (\n (stream as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n","import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n const stream = i as Writable;\n return new WritableStream({\n start: (c) => (stream.on(\"error\", (err: Error) => c.error(err)), undefined),\n abort: (reason) => (\n (stream as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (stream.write(data), undefined),\n close: () => (stream.end(), undefined),\n });\n}\n","import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable.ts\";\nimport { fromWritable } from \"./fromWritable.ts\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}","import type { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable.ts\";\nimport { fromWritable } from \"./fromWritable.ts\";\nimport { fromDuplex } from \"./fromDuplex.ts\";\n\n// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n"],"mappings":";;;;;;;AAQA,SAAgB,aACd,GACmB;AACnB,KAAI,aAAa,eAAgB,QAAO;CACxC,MAAM,SAAS;AACf,QAAO,IAAI,eAAe;EACxB,QAAQ,MAAM;AACZ,UAAO,GAAG,SAAS,SAAY,EAAE,QAAQ,KAAK,CAAC;AAC/C,UAAO,GAAG,eAAe,EAAE,OAAO,CAAC;AACnC,UAAO,GAAG,UAAU,QAAe,EAAE,MAAM,IAAI,CAAC;;EAElD,SAAS,YACN,OAA8D,UAC7D,OACD,EACD,KAAA;EAEH,CAAC;;;;;;;;;;ACjBJ,SAAgB,aACd,GACmB;AACnB,KAAG,aAAa,eAAgB,QAAO;CACvC,MAAM,SAAS;AACf,QAAO,IAAI,eAAe;EACxB,QAAQ,OAAO,OAAO,GAAG,UAAU,QAAe,EAAE,MAAM,IAAI,CAAC,EAAE,KAAA;EACjE,QAAQ,YACL,OAA8D,UAC7D,OACD,EACD,KAAA;EAEF,QAAQ,MAA2B,OAAO,OAAO,MAAM,KAAK,EAAE,KAAA;EAC9D,cAAc,OAAO,KAAK,EAAE,KAAA;EAC7B,CAAC;;;;;;;;;;;ACZJ,SAAgB,WACd,QAC0B;AAC1B,KAAI,kBAAkB,gBAAiB,QAAO;AAE9C,QAAO;EACL,UAAU,aAAkB,OAAO;EACnC,UAAU,aAAiB,OAAO;EACnC;;;;ACbH,SAAS,YAAe,GAAG,SAAiD;AAC1E,QAAO,IAAI,eAAkB,EAC3B,MAAM,YAAY;EAChB,IAAI,gBAAgB,QAAQ;AAE5B,UAAQ,QAAQ,OAAO,WAAW;AAChC,OAAI;IACF,MAAM,SAAS,OAAO,WAAW;AAEjC,WAAO,MAAM;KACX,MAAM,EAAE,MAAM,UAAU,MAAM,OAAO,MAAM;AAC3C,SAAI,KAAM;AACV,gBAAW,QAAQ,MAAM;;AAG3B,WAAO,aAAa;YACb,OAAO;AACd,eAAW,MAAM,MAAM;AACvB;aACQ;AACR;AACA,QAAI,kBAAkB,EACpB,YAAW,OAAO;;IAGtB;IAEL,CAAC;;;;;;;;;AAUJ,SAAgB,iBAEd,GAK0B;AAC1B,QAAO;EACL,UAAU,aAAiB,EAAE,MAAO;EACpC,UAAU,aAAkB,EAAE,OAAQ;EACvC;;;;;;;;;AAUH,SAAgB,oBAEd,GAK0B;AAI1B,QAAO;EACL,UAJY,aAAiB,EAAE,MAAO;EAKtC,UAAU,YAJG,aAAkB,EAAE,OAAQ,EAC5B,aAAkB,EAAE,OAAQ,CAGJ;EACtC;;;;;;;;;;;AAYH,SAAgB,yBAEd,GAKA,EAAE,UAGwB;CAC1B,MAAM,QAAQ,aAAiB,EAAE,MAAO;CACxC,MAAM,SAAS,aAAkB,EAAE,OAAQ;AAC3C,KAAI,EAAE,OACJ,cAAa,EAAE,OAAO,CAAC,OAAO,aAAa,OAAO,CAAC;AACrD,QAAO;EACL,UAAU;EAAO,UAAU;EAC5B;;;;;;;;;;;AAYH,SAAgB,UAEd,GAKA,EACE,WAIE,EAAE,EACoB;AAC1B,KAAI,WAAW,KAAA,EACb,QAAO,oBAAoB,EAAE;UAEpB,WAAW,KACpB,QAAO,iBAAiB,EAAE;MACrB;AAEL,MAAI,EAAE,OACJ,cAAa,EAAE,OAAO,CAAC,OAAO,aAAa,OAAO,CAAC;AACrD,SAAO,iBAAiB,EAAE"} |
| import type { Duplex } from "stream"; | ||
| import { fromReadable } from "./fromReadable.ts"; | ||
| import { fromWritable } from "./fromWritable.ts"; | ||
| /** | ||
| * Converts a Node.js Duplex stream to a Web API TransformStream | ||
| * @template IN - The type of data being written (string or Uint8Array) | ||
| * @template OUT - The type of data being read (string or Uint8Array) | ||
| * @param duplex - The Node.js duplex stream to convert | ||
| * @returns A Web API TransformStream that wraps the Node.js duplex stream | ||
| */ | ||
| export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| duplex: Duplex | TransformStream | ||
| ): TransformStream<IN, OUT> { | ||
| if (duplex instanceof TransformStream) return duplex; | ||
| return { | ||
| readable: fromReadable<OUT>(duplex), | ||
| writable: fromWritable<IN>(duplex), | ||
| }; | ||
| } |
| import type { Readable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Readable stream to a Web API ReadableStream | ||
| * @template T - The type of data being read (string or Uint8Array) | ||
| * @param i - The Node.js readable stream to convert | ||
| * @returns A Web API ReadableStream that wraps the Node.js stream | ||
| */ | ||
| export function fromReadable<T extends string | Uint8Array>( | ||
| i: Readable | NodeJS.ReadableStream | ReadableStream | ||
| ): ReadableStream<T> { | ||
| if (i instanceof ReadableStream) return i | ||
| const stream = i as Readable; | ||
| return new ReadableStream({ | ||
| start: (c) => { | ||
| stream.on("data", (data: T) => c.enqueue(data)); | ||
| stream.on("close", () => c.close()); | ||
| stream.on("error", (err: Error) => c.error(err)); | ||
| }, | ||
| cancel: (reason) => ( | ||
| (stream as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.( | ||
| reason | ||
| ), | ||
| undefined | ||
| ), | ||
| }); | ||
| } |
| import type { Writable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Writable stream to a Web API WritableStream | ||
| * @template T - The type of data being written (string or Uint8Array) | ||
| * @param i - The Node.js writable stream to convert | ||
| * @returns A Web API WritableStream that wraps the Node.js stream | ||
| */ | ||
| export function fromWritable<T extends string | Uint8Array>( | ||
| i: Writable | NodeJS.WritableStream | WritableStream | ||
| ): WritableStream<T> { | ||
| if(i instanceof WritableStream) return i | ||
| const stream = i as Writable; | ||
| return new WritableStream({ | ||
| start: (c) => (stream.on("error", (err: Error) => c.error(err)), undefined), | ||
| abort: (reason) => ( | ||
| (stream as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.( | ||
| reason | ||
| ), | ||
| undefined | ||
| ), | ||
| write: (data: string | Uint8Array, c) => (stream.write(data), undefined), | ||
| close: () => (stream.end(), undefined), | ||
| }); | ||
| } |
| import { exec } from "child_process"; | ||
| import { Transform } from "stream"; | ||
| import sflow from "./test-utils.ts"; | ||
| import { fromStdioDropErr, fromStdioMergeError } from "./index.ts"; | ||
| import { fromDuplex } from "./fromDuplex.ts"; | ||
| import { fromReadable } from "./fromReadable.ts"; | ||
| import { fromWritable } from "./fromWritable.ts"; | ||
| it("from node streams, read + write", async () => { | ||
| // sh instance | ||
| const p = exec("sh"); | ||
| await sflow("echo hello, world\n").pipeTo(fromWritable(p.stdin!)); | ||
| const output = await sflow(fromReadable(p.stdout!)).text(); | ||
| expect(output).toBe("hello, world\n"); | ||
| }); | ||
| it("fromStdio works", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo hello, world\n") | ||
| .by(fromStdioDropErr(p)) | ||
| .text(); | ||
| expect(output).toBe("hello, world\n"); | ||
| }); | ||
| it("fromStdio drop error", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo oops, error>&2 && echo hell, word\n") | ||
| .by(fromStdioDropErr(p)) | ||
| .text(); | ||
| expect(output).toBe("hell, word\n"); | ||
| }); | ||
| it("fromStdio merge error", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo oops, error>&2 && echo hell, word\n") | ||
| .by(fromStdioMergeError(p)) | ||
| .text(); | ||
| expect(output).toBe("oops, error\nhell, word\n"); | ||
| }); | ||
| it("fromDuplex works with Transform stream", async () => { | ||
| const transform = new Transform({ | ||
| transform(chunk, encoding, callback) { | ||
| this.push(chunk.toString().toUpperCase()); | ||
| callback(); | ||
| } | ||
| }); | ||
| const output = await sflow("hello world\n") | ||
| .by(fromDuplex(transform)) | ||
| .text(); | ||
| expect(output).toBe("HELLO WORLD\n"); | ||
| }); | ||
| it("fromDuplex works with existing TransformStream", async () => { | ||
| const upperCaseTransform = new TransformStream({ | ||
| transform(chunk, controller) { | ||
| controller.enqueue(chunk.toString().toUpperCase()); | ||
| } | ||
| }); | ||
| const result = fromDuplex(upperCaseTransform); | ||
| expect(result).toBe(upperCaseTransform); | ||
| }); |
+148
| import type { Readable, Writable } from "stream"; | ||
| import { fromReadable } from "./fromReadable.ts"; | ||
| import { fromWritable } from "./fromWritable.ts"; | ||
| import { fromDuplex } from "./fromDuplex.ts"; | ||
| // Native implementation to replace sflow mergeStream | ||
| function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> { | ||
| return new ReadableStream<T>({ | ||
| start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) { | ||
| controller.close(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, dropping stderr output | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to stdout, ignoring stderr | ||
| */ | ||
| export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| return { | ||
| writable: fromWritable<IN>(p.stdin!), | ||
| readable: fromReadable<OUT>(p.stdout!), | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, merging stdout and stderr | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to a merged stdout+stderr stream | ||
| */ | ||
| export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| const stdin = fromWritable<IN>(p.stdin!); | ||
| const stdout = fromReadable<OUT>(p.stdout!); | ||
| const stderr = fromReadable<OUT>(p.stderr!); | ||
| return { | ||
| writable: stdin, | ||
| readable: mergeStream(stdout, stderr), | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object | ||
| * @param options.stderr - The writable stream to forward stderr to | ||
| * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately | ||
| */ | ||
| export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
| { stderr }: { | ||
| stderr: Writable | WritableStream | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| const stdin = fromWritable<IN>(p.stdin!); | ||
| const stdout = fromReadable<OUT>(p.stdout!); | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return { | ||
| writable: stdin, readable: stdout, | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio with configurable stderr handling | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object for stderr handling | ||
| * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout | ||
| * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior | ||
| */ | ||
| export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
| { | ||
| stderr, | ||
| }: { | ||
| /** specify stderr to forward, or set to null to drop. */ | ||
| stderr?: Writable | WritableStream | null; | ||
| } = {} | ||
| ): TransformStream<IN, OUT> { | ||
| if (stderr === undefined) { | ||
| return fromStdioMergeError(p); | ||
| } else if (stderr === null) { | ||
| return fromStdioDropErr(p); | ||
| } else { | ||
| // forward stderr if stderr is specified | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return fromStdioDropErr(p); | ||
| } | ||
| } | ||
| export { fromReadable, fromWritable, fromDuplex }; |
| /** | ||
| * Test utilities to replace sflow functionality | ||
| */ | ||
| /** | ||
| * Creates a ReadableStream from a string | ||
| */ | ||
| function fromString(input: string): ReadableStream<Uint8Array> { | ||
| const encoder = new TextEncoder(); | ||
| const encoded = encoder.encode(input); | ||
| return new ReadableStream({ | ||
| start(controller) { | ||
| controller.enqueue(encoded); | ||
| controller.close(); | ||
| } | ||
| }); | ||
| } | ||
| /** | ||
| * Converts a ReadableStream to text | ||
| */ | ||
| async function streamToText(stream: ReadableStream): Promise<string> { | ||
| const reader = stream.getReader(); | ||
| const decoder = new TextDecoder(); | ||
| let result = ''; | ||
| try { | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| // Handle different value types | ||
| if (value instanceof Uint8Array || value instanceof ArrayBuffer) { | ||
| result += decoder.decode(value, { stream: true }); | ||
| } else if (typeof value === 'string') { | ||
| result += value; | ||
| } else { | ||
| // Convert to Uint8Array if it's another type | ||
| const encoder = new TextEncoder(); | ||
| result += decoder.decode(encoder.encode(String(value)), { stream: true }); | ||
| } | ||
| } | ||
| result += decoder.decode(); // flush | ||
| } finally { | ||
| reader.releaseLock(); | ||
| } | ||
| return result; | ||
| } | ||
| /** | ||
| * A chainable stream utility class to replace sflow functionality | ||
| */ | ||
| class StreamFlow { | ||
| constructor(private stream: ReadableStream) {} | ||
| /** | ||
| * Pipe through a transform stream | ||
| */ | ||
| by(transform: TransformStream): StreamFlow { | ||
| return new StreamFlow(this.stream.pipeThrough(transform)); | ||
| } | ||
| /** | ||
| * Convert to text | ||
| */ | ||
| async text(): Promise<string> { | ||
| return streamToText(this.stream); | ||
| } | ||
| /** | ||
| * Pipe to a writable stream | ||
| */ | ||
| async pipeTo(writable: WritableStream): Promise<void> { | ||
| return this.stream.pipeTo(writable); | ||
| } | ||
| } | ||
| /** | ||
| * Main sflow replacement function | ||
| */ | ||
| function sflow(input: string | ReadableStream): StreamFlow { | ||
| if (typeof input === 'string') { | ||
| return new StreamFlow(fromString(input)); | ||
| } | ||
| return new StreamFlow(input); | ||
| } | ||
| export default sflow; |
+20
-16
| { | ||
| "name": "from-node-stream", | ||
| "version": "0.1.2", | ||
| "version": "0.2.0", | ||
| "description": "convert nodejs-stream into webstream", | ||
@@ -27,29 +27,33 @@ "keywords": [ | ||
| "exports": { | ||
| "import": "./dist/index.js", | ||
| "types": "./index.ts" | ||
| "import": "./dist/index.mjs", | ||
| "types": "./ts/index.ts" | ||
| }, | ||
| "main": "index.js", | ||
| "module": "index.ts", | ||
| "types": "./index.ts", | ||
| "main": "dist/index.mjs", | ||
| "module": "dist/index.mjs", | ||
| "types": "./ts/index.ts", | ||
| "files": [ | ||
| "*.ts", | ||
| "ts", | ||
| "dist" | ||
| ], | ||
| "scripts": { | ||
| "build": "bun build ./index.ts --outdir=dist --sourcemap=external", | ||
| "build": "tsdown ts/index.ts --format esm --sourcemap", | ||
| "typecheck": "tsgo --noEmit", | ||
| "prerelease": "bun run build && bun run test", | ||
| "release": "bunx standard-version && git push --follow-tags && npm publish", | ||
| "test": "bun test", | ||
| "prepare": "husky" | ||
| "prepublishOnly": "bun run build && bun run test", | ||
| "prepare": "husky", | ||
| "dev": "tsdown ts/index.ts --format esm --watch" | ||
| }, | ||
| "dependencies": {}, | ||
| "devDependencies": { | ||
| "phpdie": "^1.2.14", | ||
| "@types/bun": "^1.1.11", | ||
| "@types/jest": "^29.5.13", | ||
| "@types/node": "^22.10.7", | ||
| "@types/bun": "^1.3.11", | ||
| "@types/jest": "^30.0.0", | ||
| "@types/node": "^25.5.0", | ||
| "@typescript/native-preview": "^7.0.0-dev.20260318.1", | ||
| "husky": "^9.1.7", | ||
| "semantic-release": "^24.2.1", | ||
| "typescript": "^5.6.3" | ||
| "phpdie": "^1.7.0", | ||
| "semantic-release": "^25.0.3", | ||
| "tsdown": "^0.21.4", | ||
| "typescript": "^5.9.3" | ||
| } | ||
| } |
-113
| // fromReadable.ts | ||
| function fromReadable(i) { | ||
| if (i instanceof ReadableStream) | ||
| return i; | ||
| return new ReadableStream({ | ||
| start: (c) => { | ||
| i.on("data", (data) => c.enqueue(data)); | ||
| i.on("close", () => c.close()); | ||
| i.on("error", (err) => c.error(err)); | ||
| }, | ||
| cancel: (reason) => (i.destroy?.(reason), undefined) | ||
| }); | ||
| } | ||
| // fromWritable.ts | ||
| function fromWritable(i) { | ||
| if (i instanceof WritableStream) | ||
| return i; | ||
| return new WritableStream({ | ||
| start: (c) => (i.on("error", (err) => c.error(err)), undefined), | ||
| abort: (reason) => (i.destroy?.(reason), undefined), | ||
| write: (data, c) => (i.write(data), undefined), | ||
| close: () => (i.end(), undefined) | ||
| }); | ||
| } | ||
| // fromDuplex.ts | ||
| function fromDuplex(duplex) { | ||
| if (duplex instanceof TransformStream) | ||
| return duplex; | ||
| return { | ||
| readable: fromReadable(duplex), | ||
| writable: fromWritable(duplex) | ||
| }; | ||
| } | ||
| // index.ts | ||
| function mergeStream(...streams) { | ||
| return new ReadableStream({ | ||
| start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) | ||
| break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) { | ||
| controller.close(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| } | ||
| function fromStdioDropErr(p) { | ||
| return { | ||
| writable: fromWritable(p.stdin), | ||
| readable: fromReadable(p.stdout) | ||
| }; | ||
| } | ||
| function fromStdioMergeError(p) { | ||
| const stdin = fromWritable(p.stdin); | ||
| const stdout = fromReadable(p.stdout); | ||
| const stderr = fromReadable(p.stderr); | ||
| return { | ||
| writable: stdin, | ||
| readable: mergeStream(stdout, stderr) | ||
| }; | ||
| } | ||
| function fromStdioAndForwardError(p, { stderr }) { | ||
| const stdin = fromWritable(p.stdin); | ||
| const stdout = fromReadable(p.stdout); | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return { | ||
| writable: stdin, | ||
| readable: stdout | ||
| }; | ||
| } | ||
| function fromStdio(p, { | ||
| stderr | ||
| } = {}) { | ||
| if (stderr === undefined) { | ||
| return fromStdioMergeError(p); | ||
| } else if (stderr === null) { | ||
| return fromStdioDropErr(p); | ||
| } else { | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return fromStdioDropErr(p); | ||
| } | ||
| } | ||
| export { | ||
| fromWritable, | ||
| fromStdioMergeError, | ||
| fromStdioDropErr, | ||
| fromStdioAndForwardError, | ||
| fromStdio, | ||
| fromReadable, | ||
| fromDuplex | ||
| }; | ||
| //# debugId=11A3A3E91965B03864756E2164756E21 |
| { | ||
| "version": 3, | ||
| "sources": ["../fromReadable.ts", "../fromWritable.ts", "../fromDuplex.ts", "../index.ts"], | ||
| "sourcesContent": [ | ||
| "import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n", | ||
| "import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n", | ||
| "import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}", | ||
| "// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\nimport { fromDuplex } from \"./fromDuplex\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n" | ||
| ], | ||
| "mappings": ";AAQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAI,aAAa;AAAA,IAAgB,OAAO;AAAA,EACxC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;AChBI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAG,aAAa;AAAA,IAAgB,OAAO;AAAA,EACvC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;ACXI,SAAS,UAA2E,CACzF,QAC0B;AAAA,EAC1B,IAAI,kBAAkB;AAAA,IAAiB,OAAO;AAAA,EAE9C,OAAO;AAAA,IACL,UAAU,aAAkB,MAAM;AAAA,IAClC,UAAU,aAAiB,MAAM;AAAA,EACnC;AAAA;;;AClBF,SAAS,WAAc,IAAI,SAAiD;AAAA,EAC1E,OAAO,IAAI,eAAkB;AAAA,IAC3B,KAAK,CAAC,YAAY;AAAA,MAChB,IAAI,gBAAgB,QAAQ;AAAA,MAE5B,QAAQ,QAAQ,OAAO,WAAW;AAAA,QAChC,IAAI;AAAA,UACF,MAAM,SAAS,OAAO,UAAU;AAAA,UAEhC,OAAO,MAAM;AAAA,YACX,QAAQ,MAAM,UAAU,MAAM,OAAO,KAAK;AAAA,YAC1C,IAAI;AAAA,cAAM;AAAA,YACV,WAAW,QAAQ,KAAK;AAAA,UAC1B;AAAA,UAEA,OAAO,YAAY;AAAA,UACnB,OAAO,OAAO;AAAA,UACd,WAAW,MAAM,KAAK;AAAA,UACtB;AAAA,kBACA;AAAA,UACA;AAAA,UACA,IAAI,kBAAkB,GAAG;AAAA,YACvB,WAAW,MAAM;AAAA,UACnB;AAAA;AAAA,OAEH;AAAA;AAAA,EAEL,CAAC;AAAA;AAcI,SAAS,gBAAiF,CAE/F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAoF,CAElG,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAyF,CAEvG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE;AAAA,IACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAA0E,CAExF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE;AAAA,MACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;", | ||
| "debugId": "11A3A3E91965B03864756E2164756E21", | ||
| "names": [] | ||
| } |
| import type { Duplex } from "stream"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| /** | ||
| * Converts a Node.js Duplex stream to a Web API TransformStream | ||
| * @template IN - The type of data being written (string or Uint8Array) | ||
| * @template OUT - The type of data being read (string or Uint8Array) | ||
| * @param duplex - The Node.js duplex stream to convert | ||
| * @returns A Web API TransformStream that wraps the Node.js duplex stream | ||
| */ | ||
| export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| duplex: Duplex | TransformStream | ||
| ): TransformStream<IN, OUT> { | ||
| if (duplex instanceof TransformStream) return duplex; | ||
| return { | ||
| readable: fromReadable<OUT>(duplex), | ||
| writable: fromWritable<IN>(duplex), | ||
| }; | ||
| } |
| import type { Readable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Readable stream to a Web API ReadableStream | ||
| * @template T - The type of data being read (string or Uint8Array) | ||
| * @param i - The Node.js readable stream to convert | ||
| * @returns A Web API ReadableStream that wraps the Node.js stream | ||
| */ | ||
| export function fromReadable<T extends string | Uint8Array>( | ||
| i: Readable | NodeJS.ReadableStream | ReadableStream | ||
| ): ReadableStream<T> { | ||
| if (i instanceof ReadableStream) return i | ||
| return new ReadableStream({ | ||
| start: (c) => { | ||
| i.on("data", (data) => c.enqueue(data)); | ||
| i.on("close", () => c.close()); | ||
| i.on("error", (err) => c.error(err)); | ||
| }, | ||
| cancel: (reason) => ( | ||
| (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.( | ||
| reason | ||
| ), | ||
| undefined | ||
| ), | ||
| }); | ||
| } |
| import type { Writable } from "stream"; | ||
| /** | ||
| * Converts a Node.js Writable stream to a Web API WritableStream | ||
| * @template T - The type of data being written (string or Uint8Array) | ||
| * @param i - The Node.js writable stream to convert | ||
| * @returns A Web API WritableStream that wraps the Node.js stream | ||
| */ | ||
| export function fromWritable<T extends string | Uint8Array>( | ||
| i: Writable | NodeJS.WritableStream | WritableStream | ||
| ): WritableStream<T> { | ||
| if(i instanceof WritableStream) return i | ||
| return new WritableStream({ | ||
| start: (c) => (i.on("error", (err) => c.error(err)), undefined), | ||
| abort: (reason) => ( | ||
| (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.( | ||
| reason | ||
| ), | ||
| undefined | ||
| ), | ||
| write: (data: string | Uint8Array, c) => (i.write(data), undefined), | ||
| close: () => (i.end(), undefined), | ||
| }); | ||
| } |
| import { exec } from "child_process"; | ||
| import { Transform } from "stream"; | ||
| import sflow from "./test-utils"; | ||
| import { fromStdioDropErr, fromStdioMergeError } from "."; | ||
| import { fromDuplex } from "./fromDuplex"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| it("from node streams, read + write", async () => { | ||
| // sh instance | ||
| const p = exec("sh"); | ||
| await sflow("echo hello, world\n").pipeTo(fromWritable(p.stdin!)); | ||
| const output = await sflow(fromReadable(p.stdout!)).text(); | ||
| expect(output).toBe("hello, world\n"); | ||
| }); | ||
| it("fromStdio works", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo hello, world\n") | ||
| .by(fromStdioDropErr(p)) | ||
| .text(); | ||
| expect(output).toBe("hello, world\n"); | ||
| }); | ||
| it("fromStdio drop error", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo oops, error>&2 && echo hell, word\n") | ||
| .by(fromStdioDropErr(p)) | ||
| .text(); | ||
| expect(output).toBe("hell, word\n"); | ||
| }); | ||
| it("fromStdio merge error", async () => { | ||
| const p = exec("sh"); | ||
| const output = await sflow("echo oops, error>&2 && echo hell, word\n") | ||
| .by(fromStdioMergeError(p)) | ||
| .text(); | ||
| expect(output).toBe("oops, error\nhell, word\n"); | ||
| }); | ||
| it("fromDuplex works with Transform stream", async () => { | ||
| const transform = new Transform({ | ||
| transform(chunk, encoding, callback) { | ||
| this.push(chunk.toString().toUpperCase()); | ||
| callback(); | ||
| } | ||
| }); | ||
| const output = await sflow("hello world\n") | ||
| .by(fromDuplex(transform)) | ||
| .text(); | ||
| expect(output).toBe("HELLO WORLD\n"); | ||
| }); | ||
| it("fromDuplex works with existing TransformStream", async () => { | ||
| const upperCaseTransform = new TransformStream({ | ||
| transform(chunk, controller) { | ||
| controller.enqueue(chunk.toString().toUpperCase()); | ||
| } | ||
| }); | ||
| const result = fromDuplex(upperCaseTransform); | ||
| expect(result).toBe(upperCaseTransform); | ||
| }); |
-147
| // Native implementation to replace sflow mergeStream | ||
| function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> { | ||
| return new ReadableStream<T>({ | ||
| start(controller) { | ||
| let activeStreams = streams.length; | ||
| streams.forEach(async (stream) => { | ||
| try { | ||
| const reader = stream.getReader(); | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| controller.enqueue(value); | ||
| } | ||
| reader.releaseLock(); | ||
| } catch (error) { | ||
| controller.error(error); | ||
| return; | ||
| } finally { | ||
| activeStreams--; | ||
| if (activeStreams === 0) { | ||
| controller.close(); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
| }); | ||
| } | ||
| import { Readable, Writable } from "stream"; | ||
| import { fromReadable } from "./fromReadable"; | ||
| import { fromWritable } from "./fromWritable"; | ||
| import { fromDuplex } from "./fromDuplex"; | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, dropping stderr output | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to stdout, ignoring stderr | ||
| */ | ||
| export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| return { | ||
| writable: fromWritable<IN>(p.stdin!), | ||
| readable: fromReadable<OUT>(p.stdout!), | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, merging stdout and stderr | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @returns A TransformStream that connects stdin to a merged stdout+stderr stream | ||
| */ | ||
| export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| const stdin = fromWritable<IN>(p.stdin!); | ||
| const stdout = fromReadable<OUT>(p.stdout!); | ||
| const stderr = fromReadable<OUT>(p.stderr!); | ||
| return { | ||
| writable: stdin, | ||
| readable: mergeStream(stdout, stderr), | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object | ||
| * @param options.stderr - The writable stream to forward stderr to | ||
| * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately | ||
| */ | ||
| export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
| { stderr }: { | ||
| stderr: Writable | WritableStream | ||
| } | ||
| ): TransformStream<IN, OUT> { | ||
| const stdin = fromWritable<IN>(p.stdin!); | ||
| const stdout = fromReadable<OUT>(p.stdout!); | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return { | ||
| writable: stdin, readable: stdout, | ||
| }; | ||
| } | ||
| /** | ||
| * Creates a TransformStream from a process's stdio with configurable stderr handling | ||
| * @template IN - Input data type (string or Uint8Array) | ||
| * @template OUT - Output data type (string or Uint8Array) | ||
| * @param p - A process object with stdin, stdout, and stderr streams | ||
| * @param options - Configuration object for stderr handling | ||
| * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout | ||
| * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior | ||
| */ | ||
| export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>( | ||
| /** a process, which has stdin, stdout, stderr */ | ||
| p: { | ||
| stdin?: Writable | WritableStream | null; | ||
| stdout?: Readable | ReadableStream | null; | ||
| stderr?: Readable | ReadableStream | null; | ||
| }, | ||
| { | ||
| stderr, | ||
| }: { | ||
| /** specify stderr to forward, or set to null to drop. */ | ||
| stderr?: Writable | WritableStream | null; | ||
| } = {} | ||
| ): TransformStream<IN, OUT> { | ||
| if (stderr === undefined) { | ||
| return fromStdioMergeError(p); | ||
| } else if (stderr === null) { | ||
| return fromStdioDropErr(p); | ||
| } else { | ||
| // forward stderr if stderr is specified | ||
| if (p.stderr) | ||
| fromReadable(p.stderr).pipeTo(fromWritable(stderr)); | ||
| return fromStdioDropErr(p); | ||
| } | ||
| } | ||
| export { fromReadable, fromWritable, fromDuplex }; |
| /** | ||
| * Test utilities to replace sflow functionality | ||
| */ | ||
| /** | ||
| * Creates a ReadableStream from a string | ||
| */ | ||
| function fromString(input: string): ReadableStream<Uint8Array> { | ||
| const encoder = new TextEncoder(); | ||
| const encoded = encoder.encode(input); | ||
| return new ReadableStream({ | ||
| start(controller) { | ||
| controller.enqueue(encoded); | ||
| controller.close(); | ||
| } | ||
| }); | ||
| } | ||
| /** | ||
| * Converts a ReadableStream to text | ||
| */ | ||
| async function streamToText(stream: ReadableStream): Promise<string> { | ||
| const reader = stream.getReader(); | ||
| const decoder = new TextDecoder(); | ||
| let result = ''; | ||
| try { | ||
| while (true) { | ||
| const { done, value } = await reader.read(); | ||
| if (done) break; | ||
| // Handle different value types | ||
| if (value instanceof Uint8Array || value instanceof ArrayBuffer) { | ||
| result += decoder.decode(value, { stream: true }); | ||
| } else if (typeof value === 'string') { | ||
| result += value; | ||
| } else { | ||
| // Convert to Uint8Array if it's another type | ||
| const encoder = new TextEncoder(); | ||
| result += decoder.decode(encoder.encode(String(value)), { stream: true }); | ||
| } | ||
| } | ||
| result += decoder.decode(); // flush | ||
| } finally { | ||
| reader.releaseLock(); | ||
| } | ||
| return result; | ||
| } | ||
| /** | ||
| * A chainable stream utility class to replace sflow functionality | ||
| */ | ||
| class StreamFlow { | ||
| constructor(private stream: ReadableStream) {} | ||
| /** | ||
| * Pipe through a transform stream | ||
| */ | ||
| by(transform: TransformStream): StreamFlow { | ||
| return new StreamFlow(this.stream.pipeThrough(transform)); | ||
| } | ||
| /** | ||
| * Convert to text | ||
| */ | ||
| async text(): Promise<string> { | ||
| return streamToText(this.stream); | ||
| } | ||
| /** | ||
| * Pipe to a writable stream | ||
| */ | ||
| async pipeTo(writable: WritableStream): Promise<void> { | ||
| return this.stream.pipeTo(writable); | ||
| } | ||
| } | ||
| /** | ||
| * Main sflow replacement function | ||
| */ | ||
| function sflow(input: string | ReadableStream): StreamFlow { | ||
| if (typeof input === 'string') { | ||
| return new StreamFlow(fromString(input)); | ||
| } | ||
| return new StreamFlow(input); | ||
| } | ||
| export default sflow; |
Major refactor
Supply chain riskPackage has recently undergone a major refactor. It may be unstable or indicate significant internal changes. Use caution when updating to versions that include significant changes.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
Shell access
Supply chain riskThis module accesses the system shell. Accessing the system shell increases the risk of executing arbitrary code.
Found 1 instance in 1 package
37459
23.87%12
9.09%478
7.42%9
28.57%2
100%