Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

from-node-stream

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

from-node-stream - npm Package Compare versions

Comparing version
0.1.2
to
0.2.0
+102
dist/index.d.mts
import { Duplex, Readable, Writable } from "stream";
//#region ts/fromReadable.d.ts
/**
* Converts a Node.js Readable stream to a Web API ReadableStream
* @template T - The type of data being read (string or Uint8Array)
* @param i - The Node.js readable stream to convert
* @returns A Web API ReadableStream that wraps the Node.js stream
*/
declare function fromReadable<T extends string | Uint8Array>(i: Readable | NodeJS.ReadableStream | ReadableStream): ReadableStream<T>;
//#endregion
//#region ts/fromWritable.d.ts
/**
* Converts a Node.js Writable stream to a Web API WritableStream
* @template T - The type of data being written (string or Uint8Array)
* @param i - The Node.js writable stream to convert
* @returns A Web API WritableStream that wraps the Node.js stream
*/
declare function fromWritable<T extends string | Uint8Array>(i: Writable | NodeJS.WritableStream | WritableStream): WritableStream<T>;
//#endregion
//#region ts/fromDuplex.d.ts
/**
* Converts a Node.js Duplex stream to a Web API TransformStream
* @template IN - The type of data being written (string or Uint8Array)
* @template OUT - The type of data being read (string or Uint8Array)
* @param duplex - The Node.js duplex stream to convert
* @returns A Web API TransformStream that wraps the Node.js duplex stream
*/
declare function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(duplex: Duplex | TransformStream): TransformStream<IN, OUT>;
//#endregion
//#region ts/index.d.ts
/**
* Creates a TransformStream from a process's stdio, dropping stderr output
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to stdout, ignoring stderr
*/
declare function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio, merging stdout and stderr
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to a merged stdout+stderr stream
*/
declare function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object
* @param options.stderr - The writable stream to forward stderr to
* @returns A TransformStream that connects stdin to stdout, forwarding stderr separately
*/
declare function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}, {
stderr
}: {
stderr: Writable | WritableStream;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio with configurable stderr handling
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object for stderr handling
* @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout
* @returns A TransformStream that connects stdin to stdout with the specified stderr behavior
*/
declare function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}, {
stderr
}?: {
/** specify stderr to forward, or set to null to drop. */stderr?: Writable | WritableStream | null;
}): TransformStream<IN, OUT>;
//#endregion
export { fromDuplex, fromReadable, fromStdio, fromStdioAndForwardError, fromStdioDropErr, fromStdioMergeError, fromWritable };
//# sourceMappingURL=index.d.mts.map
//#region ts/fromReadable.ts
/**
* Converts a Node.js Readable stream to a Web API ReadableStream
* @template T - The type of data being read (string or Uint8Array)
* @param i - The Node.js readable stream to convert
* @returns A Web API ReadableStream that wraps the Node.js stream
*/
function fromReadable(i) {
if (i instanceof ReadableStream) return i;
const stream = i;
return new ReadableStream({
start: (c) => {
stream.on("data", (data) => c.enqueue(data));
stream.on("close", () => c.close());
stream.on("error", (err) => c.error(err));
},
cancel: (reason) => (stream.destroy?.(reason), void 0)
});
}
//#endregion
//#region ts/fromWritable.ts
/**
* Converts a Node.js Writable stream to a Web API WritableStream
* @template T - The type of data being written (string or Uint8Array)
* @param i - The Node.js writable stream to convert
* @returns A Web API WritableStream that wraps the Node.js stream
*/
function fromWritable(i) {
if (i instanceof WritableStream) return i;
const stream = i;
return new WritableStream({
start: (c) => (stream.on("error", (err) => c.error(err)), void 0),
abort: (reason) => (stream.destroy?.(reason), void 0),
write: (data, c) => (stream.write(data), void 0),
close: () => (stream.end(), void 0)
});
}
//#endregion
//#region ts/fromDuplex.ts
/**
* Converts a Node.js Duplex stream to a Web API TransformStream
* @template IN - The type of data being written (string or Uint8Array)
* @template OUT - The type of data being read (string or Uint8Array)
* @param duplex - The Node.js duplex stream to convert
* @returns A Web API TransformStream that wraps the Node.js duplex stream
*/
function fromDuplex(duplex) {
if (duplex instanceof TransformStream) return duplex;
return {
readable: fromReadable(duplex),
writable: fromWritable(duplex)
};
}
//#endregion
//#region ts/index.ts
function mergeStream(...streams) {
return new ReadableStream({ start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) controller.close();
}
});
} });
}
/**
* Creates a TransformStream from a process's stdio, dropping stderr output
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to stdout, ignoring stderr
*/
function fromStdioDropErr(p) {
return {
writable: fromWritable(p.stdin),
readable: fromReadable(p.stdout)
};
}
/**
* Creates a TransformStream from a process's stdio, merging stdout and stderr
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to a merged stdout+stderr stream
*/
function fromStdioMergeError(p) {
return {
writable: fromWritable(p.stdin),
readable: mergeStream(fromReadable(p.stdout), fromReadable(p.stderr))
};
}
/**
* Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object
* @param options.stderr - The writable stream to forward stderr to
* @returns A TransformStream that connects stdin to stdout, forwarding stderr separately
*/
function fromStdioAndForwardError(p, { stderr }) {
const stdin = fromWritable(p.stdin);
const stdout = fromReadable(p.stdout);
if (p.stderr) fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return {
writable: stdin,
readable: stdout
};
}
/**
* Creates a TransformStream from a process's stdio with configurable stderr handling
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object for stderr handling
* @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout
* @returns A TransformStream that connects stdin to stdout with the specified stderr behavior
*/
function fromStdio(p, { stderr } = {}) {
if (stderr === void 0) return fromStdioMergeError(p);
else if (stderr === null) return fromStdioDropErr(p);
else {
if (p.stderr) fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return fromStdioDropErr(p);
}
}
//#endregion
export { fromDuplex, fromReadable, fromStdio, fromStdioAndForwardError, fromStdioDropErr, fromStdioMergeError, fromWritable };
//# sourceMappingURL=index.mjs.map
{"version":3,"file":"index.mjs","names":[],"sources":["../ts/fromReadable.ts","../ts/fromWritable.ts","../ts/fromDuplex.ts","../ts/index.ts"],"sourcesContent":["import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n const stream = i as Readable;\n return new ReadableStream({\n start: (c) => {\n stream.on(\"data\", (data: T) => c.enqueue(data));\n stream.on(\"close\", () => c.close());\n stream.on(\"error\", (err: Error) => c.error(err));\n },\n cancel: (reason) => (\n (stream as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n","import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n const stream = i as Writable;\n return new WritableStream({\n start: (c) => (stream.on(\"error\", (err: Error) => c.error(err)), undefined),\n abort: (reason) => (\n (stream as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (stream.write(data), undefined),\n close: () => (stream.end(), undefined),\n });\n}\n","import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable.ts\";\nimport { fromWritable } from \"./fromWritable.ts\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}","import type { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable.ts\";\nimport { fromWritable } from \"./fromWritable.ts\";\nimport { fromDuplex } from \"./fromDuplex.ts\";\n\n// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n"],"mappings":";;;;;;;AAQA,SAAgB,aACd,GACmB;AACnB,KAAI,aAAa,eAAgB,QAAO;CACxC,MAAM,SAAS;AACf,QAAO,IAAI,eAAe;EACxB,QAAQ,MAAM;AACZ,UAAO,GAAG,SAAS,SAAY,EAAE,QAAQ,KAAK,CAAC;AAC/C,UAAO,GAAG,eAAe,EAAE,OAAO,CAAC;AACnC,UAAO,GAAG,UAAU,QAAe,EAAE,MAAM,IAAI,CAAC;;EAElD,SAAS,YACN,OAA8D,UAC7D,OACD,EACD,KAAA;EAEH,CAAC;;;;;;;;;;ACjBJ,SAAgB,aACd,GACmB;AACnB,KAAG,aAAa,eAAgB,QAAO;CACvC,MAAM,SAAS;AACf,QAAO,IAAI,eAAe;EACxB,QAAQ,OAAO,OAAO,GAAG,UAAU,QAAe,EAAE,MAAM,IAAI,CAAC,EAAE,KAAA;EACjE,QAAQ,YACL,OAA8D,UAC7D,OACD,EACD,KAAA;EAEF,QAAQ,MAA2B,OAAO,OAAO,MAAM,KAAK,EAAE,KAAA;EAC9D,cAAc,OAAO,KAAK,EAAE,KAAA;EAC7B,CAAC;;;;;;;;;;;ACZJ,SAAgB,WACd,QAC0B;AAC1B,KAAI,kBAAkB,gBAAiB,QAAO;AAE9C,QAAO;EACL,UAAU,aAAkB,OAAO;EACnC,UAAU,aAAiB,OAAO;EACnC;;;;ACbH,SAAS,YAAe,GAAG,SAAiD;AAC1E,QAAO,IAAI,eAAkB,EAC3B,MAAM,YAAY;EAChB,IAAI,gBAAgB,QAAQ;AAE5B,UAAQ,QAAQ,OAAO,WAAW;AAChC,OAAI;IACF,MAAM,SAAS,OAAO,WAAW;AAEjC,WAAO,MAAM;KACX,MAAM,EAAE,MAAM,UAAU,MAAM,OAAO,MAAM;AAC3C,SAAI,KAAM;AACV,gBAAW,QAAQ,MAAM;;AAG3B,WAAO,aAAa;YACb,OAAO;AACd,eAAW,MAAM,MAAM;AACvB;aACQ;AACR;AACA,QAAI,kBAAkB,EACpB,YAAW,OAAO;;IAGtB;IAEL,CAAC;;;;;;;;;AAUJ,SAAgB,iBAEd,GAK0B;AAC1B,QAAO;EACL,UAAU,aAAiB,EAAE,MAAO;EACpC,UAAU,aAAkB,EAAE,OAAQ;EACvC;;;;;;;;;AAUH,SAAgB,oBAEd,GAK0B;AAI1B,QAAO;EACL,UAJY,aAAiB,EAAE,MAAO;EAKtC,UAAU,YAJG,aAAkB,EAAE,OAAQ,EAC5B,aAAkB,EAAE,OAAQ,CAGJ;EACtC;;;;;;;;;;;AAYH,SAAgB,yBAEd,GAKA,EAAE,UAGwB;CAC1B,MAAM,QAAQ,aAAiB,EAAE,MAAO;CACxC,MAAM,SAAS,aAAkB,EAAE,OAAQ;AAC3C,KAAI,EAAE,OACJ,cAAa,EAAE,OAAO,CAAC,OAAO,aAAa,OAAO,CAAC;AACrD,QAAO;EACL,UAAU;EAAO,UAAU;EAC5B;;;;;;;;;;;AAYH,SAAgB,UAEd,GAKA,EACE,WAIE,EAAE,EACoB;AAC1B,KAAI,WAAW,KAAA,EACb,QAAO,oBAAoB,EAAE;UAEpB,WAAW,KACpB,QAAO,iBAAiB,EAAE;MACrB;AAEL,MAAI,EAAE,OACJ,cAAa,EAAE,OAAO,CAAC,OAAO,aAAa,OAAO,CAAC;AACrD,SAAO,iBAAiB,EAAE"}
import type { Duplex } from "stream";
import { fromReadable } from "./fromReadable.ts";
import { fromWritable } from "./fromWritable.ts";
/**
* Converts a Node.js Duplex stream to a Web API TransformStream
* @template IN - The type of data being written (string or Uint8Array)
* @template OUT - The type of data being read (string or Uint8Array)
* @param duplex - The Node.js duplex stream to convert
* @returns A Web API TransformStream that wraps the Node.js duplex stream
*/
export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
duplex: Duplex | TransformStream
): TransformStream<IN, OUT> {
if (duplex instanceof TransformStream) return duplex;
return {
readable: fromReadable<OUT>(duplex),
writable: fromWritable<IN>(duplex),
};
}
import type { Readable } from "stream";
/**
* Converts a Node.js Readable stream to a Web API ReadableStream
* @template T - The type of data being read (string or Uint8Array)
* @param i - The Node.js readable stream to convert
* @returns A Web API ReadableStream that wraps the Node.js stream
*/
export function fromReadable<T extends string | Uint8Array>(
i: Readable | NodeJS.ReadableStream | ReadableStream
): ReadableStream<T> {
if (i instanceof ReadableStream) return i
const stream = i as Readable;
return new ReadableStream({
start: (c) => {
stream.on("data", (data: T) => c.enqueue(data));
stream.on("close", () => c.close());
stream.on("error", (err: Error) => c.error(err));
},
cancel: (reason) => (
(stream as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(
reason
),
undefined
),
});
}
import type { Writable } from "stream";
/**
* Converts a Node.js Writable stream to a Web API WritableStream
* @template T - The type of data being written (string or Uint8Array)
* @param i - The Node.js writable stream to convert
* @returns A Web API WritableStream that wraps the Node.js stream
*/
export function fromWritable<T extends string | Uint8Array>(
i: Writable | NodeJS.WritableStream | WritableStream
): WritableStream<T> {
if(i instanceof WritableStream) return i
const stream = i as Writable;
return new WritableStream({
start: (c) => (stream.on("error", (err: Error) => c.error(err)), undefined),
abort: (reason) => (
(stream as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(
reason
),
undefined
),
write: (data: string | Uint8Array, c) => (stream.write(data), undefined),
close: () => (stream.end(), undefined),
});
}
import { exec } from "child_process";
import { Transform } from "stream";
import sflow from "./test-utils.ts";
import { fromStdioDropErr, fromStdioMergeError } from "./index.ts";
import { fromDuplex } from "./fromDuplex.ts";
import { fromReadable } from "./fromReadable.ts";
import { fromWritable } from "./fromWritable.ts";
it("from node streams, read + write", async () => {
// sh instance
const p = exec("sh");
await sflow("echo hello, world\n").pipeTo(fromWritable(p.stdin!));
const output = await sflow(fromReadable(p.stdout!)).text();
expect(output).toBe("hello, world\n");
});
it("fromStdio works", async () => {
const p = exec("sh");
const output = await sflow("echo hello, world\n")
.by(fromStdioDropErr(p))
.text();
expect(output).toBe("hello, world\n");
});
it("fromStdio drop error", async () => {
const p = exec("sh");
const output = await sflow("echo oops, error>&2 && echo hell, word\n")
.by(fromStdioDropErr(p))
.text();
expect(output).toBe("hell, word\n");
});
it("fromStdio merge error", async () => {
const p = exec("sh");
const output = await sflow("echo oops, error>&2 && echo hell, word\n")
.by(fromStdioMergeError(p))
.text();
expect(output).toBe("oops, error\nhell, word\n");
});
it("fromDuplex works with Transform stream", async () => {
const transform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
const output = await sflow("hello world\n")
.by(fromDuplex(transform))
.text();
expect(output).toBe("HELLO WORLD\n");
});
it("fromDuplex works with existing TransformStream", async () => {
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toString().toUpperCase());
}
});
const result = fromDuplex(upperCaseTransform);
expect(result).toBe(upperCaseTransform);
});
import type { Readable, Writable } from "stream";
import { fromReadable } from "./fromReadable.ts";
import { fromWritable } from "./fromWritable.ts";
import { fromDuplex } from "./fromDuplex.ts";
// Native implementation to replace sflow mergeStream
function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {
return new ReadableStream<T>({
start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) {
controller.close();
}
}
});
}
});
}
/**
* Creates a TransformStream from a process's stdio, dropping stderr output
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to stdout, ignoring stderr
*/
export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}
): TransformStream<IN, OUT> {
return {
writable: fromWritable<IN>(p.stdin!),
readable: fromReadable<OUT>(p.stdout!),
};
}
/**
* Creates a TransformStream from a process's stdio, merging stdout and stderr
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to a merged stdout+stderr stream
*/
export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}
): TransformStream<IN, OUT> {
const stdin = fromWritable<IN>(p.stdin!);
const stdout = fromReadable<OUT>(p.stdout!);
const stderr = fromReadable<OUT>(p.stderr!);
return {
writable: stdin,
readable: mergeStream(stdout, stderr),
};
}
/**
* Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object
* @param options.stderr - The writable stream to forward stderr to
* @returns A TransformStream that connects stdin to stdout, forwarding stderr separately
*/
export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},
{ stderr }: {
stderr: Writable | WritableStream
}
): TransformStream<IN, OUT> {
const stdin = fromWritable<IN>(p.stdin!);
const stdout = fromReadable<OUT>(p.stdout!);
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return {
writable: stdin, readable: stdout,
};
}
/**
* Creates a TransformStream from a process's stdio with configurable stderr handling
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object for stderr handling
* @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout
* @returns A TransformStream that connects stdin to stdout with the specified stderr behavior
*/
export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},
{
stderr,
}: {
/** specify stderr to forward, or set to null to drop. */
stderr?: Writable | WritableStream | null;
} = {}
): TransformStream<IN, OUT> {
if (stderr === undefined) {
return fromStdioMergeError(p);
} else if (stderr === null) {
return fromStdioDropErr(p);
} else {
// forward stderr if stderr is specified
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return fromStdioDropErr(p);
}
}
export { fromReadable, fromWritable, fromDuplex };
/**
* Test utilities to replace sflow functionality
*/
/**
* Creates a ReadableStream from a string
*/
function fromString(input: string): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
const encoded = encoder.encode(input);
return new ReadableStream({
start(controller) {
controller.enqueue(encoded);
controller.close();
}
});
}
/**
* Converts a ReadableStream to text
*/
async function streamToText(stream: ReadableStream): Promise<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let result = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Handle different value types
if (value instanceof Uint8Array || value instanceof ArrayBuffer) {
result += decoder.decode(value, { stream: true });
} else if (typeof value === 'string') {
result += value;
} else {
// Convert to Uint8Array if it's another type
const encoder = new TextEncoder();
result += decoder.decode(encoder.encode(String(value)), { stream: true });
}
}
result += decoder.decode(); // flush
} finally {
reader.releaseLock();
}
return result;
}
/**
* A chainable stream utility class to replace sflow functionality
*/
class StreamFlow {
constructor(private stream: ReadableStream) {}
/**
* Pipe through a transform stream
*/
by(transform: TransformStream): StreamFlow {
return new StreamFlow(this.stream.pipeThrough(transform));
}
/**
* Convert to text
*/
async text(): Promise<string> {
return streamToText(this.stream);
}
/**
* Pipe to a writable stream
*/
async pipeTo(writable: WritableStream): Promise<void> {
return this.stream.pipeTo(writable);
}
}
/**
* Main sflow replacement function
*/
function sflow(input: string | ReadableStream): StreamFlow {
if (typeof input === 'string') {
return new StreamFlow(fromString(input));
}
return new StreamFlow(input);
}
export default sflow;
+20
-16
{
"name": "from-node-stream",
"version": "0.1.2",
"version": "0.2.0",
"description": "convert nodejs-stream into webstream",

@@ -27,29 +27,33 @@ "keywords": [

"exports": {
"import": "./dist/index.js",
"types": "./index.ts"
"import": "./dist/index.mjs",
"types": "./ts/index.ts"
},
"main": "index.js",
"module": "index.ts",
"types": "./index.ts",
"main": "dist/index.mjs",
"module": "dist/index.mjs",
"types": "./ts/index.ts",
"files": [
"*.ts",
"ts",
"dist"
],
"scripts": {
"build": "bun build ./index.ts --outdir=dist --sourcemap=external",
"build": "tsdown ts/index.ts --format esm --sourcemap",
"typecheck": "tsgo --noEmit",
"prerelease": "bun run build && bun run test",
"release": "bunx standard-version && git push --follow-tags && npm publish",
"test": "bun test",
"prepare": "husky"
"prepublishOnly": "bun run build && bun run test",
"prepare": "husky",
"dev": "tsdown ts/index.ts --format esm --watch"
},
"dependencies": {},
"devDependencies": {
"phpdie": "^1.2.14",
"@types/bun": "^1.1.11",
"@types/jest": "^29.5.13",
"@types/node": "^22.10.7",
"@types/bun": "^1.3.11",
"@types/jest": "^30.0.0",
"@types/node": "^25.5.0",
"@typescript/native-preview": "^7.0.0-dev.20260318.1",
"husky": "^9.1.7",
"semantic-release": "^24.2.1",
"typescript": "^5.6.3"
"phpdie": "^1.7.0",
"semantic-release": "^25.0.3",
"tsdown": "^0.21.4",
"typescript": "^5.9.3"
}
}
// fromReadable.ts
function fromReadable(i) {
if (i instanceof ReadableStream)
return i;
return new ReadableStream({
start: (c) => {
i.on("data", (data) => c.enqueue(data));
i.on("close", () => c.close());
i.on("error", (err) => c.error(err));
},
cancel: (reason) => (i.destroy?.(reason), undefined)
});
}
// fromWritable.ts
function fromWritable(i) {
if (i instanceof WritableStream)
return i;
return new WritableStream({
start: (c) => (i.on("error", (err) => c.error(err)), undefined),
abort: (reason) => (i.destroy?.(reason), undefined),
write: (data, c) => (i.write(data), undefined),
close: () => (i.end(), undefined)
});
}
// fromDuplex.ts
function fromDuplex(duplex) {
if (duplex instanceof TransformStream)
return duplex;
return {
readable: fromReadable(duplex),
writable: fromWritable(duplex)
};
}
// index.ts
function mergeStream(...streams) {
return new ReadableStream({
start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done)
break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) {
controller.close();
}
}
});
}
});
}
function fromStdioDropErr(p) {
return {
writable: fromWritable(p.stdin),
readable: fromReadable(p.stdout)
};
}
function fromStdioMergeError(p) {
const stdin = fromWritable(p.stdin);
const stdout = fromReadable(p.stdout);
const stderr = fromReadable(p.stderr);
return {
writable: stdin,
readable: mergeStream(stdout, stderr)
};
}
function fromStdioAndForwardError(p, { stderr }) {
const stdin = fromWritable(p.stdin);
const stdout = fromReadable(p.stdout);
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return {
writable: stdin,
readable: stdout
};
}
function fromStdio(p, {
stderr
} = {}) {
if (stderr === undefined) {
return fromStdioMergeError(p);
} else if (stderr === null) {
return fromStdioDropErr(p);
} else {
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return fromStdioDropErr(p);
}
}
export {
fromWritable,
fromStdioMergeError,
fromStdioDropErr,
fromStdioAndForwardError,
fromStdio,
fromReadable,
fromDuplex
};
//# debugId=11A3A3E91965B03864756E2164756E21
{
"version": 3,
"sources": ["../fromReadable.ts", "../fromWritable.ts", "../fromDuplex.ts", "../index.ts"],
"sourcesContent": [
"import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n",
"import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n",
"import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}",
"// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\nimport { fromDuplex } from \"./fromDuplex\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n"
],
"mappings": ";AAQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAI,aAAa;AAAA,IAAgB,OAAO;AAAA,EACxC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;AChBI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAG,aAAa;AAAA,IAAgB,OAAO;AAAA,EACvC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;ACXI,SAAS,UAA2E,CACzF,QAC0B;AAAA,EAC1B,IAAI,kBAAkB;AAAA,IAAiB,OAAO;AAAA,EAE9C,OAAO;AAAA,IACL,UAAU,aAAkB,MAAM;AAAA,IAClC,UAAU,aAAiB,MAAM;AAAA,EACnC;AAAA;;;AClBF,SAAS,WAAc,IAAI,SAAiD;AAAA,EAC1E,OAAO,IAAI,eAAkB;AAAA,IAC3B,KAAK,CAAC,YAAY;AAAA,MAChB,IAAI,gBAAgB,QAAQ;AAAA,MAE5B,QAAQ,QAAQ,OAAO,WAAW;AAAA,QAChC,IAAI;AAAA,UACF,MAAM,SAAS,OAAO,UAAU;AAAA,UAEhC,OAAO,MAAM;AAAA,YACX,QAAQ,MAAM,UAAU,MAAM,OAAO,KAAK;AAAA,YAC1C,IAAI;AAAA,cAAM;AAAA,YACV,WAAW,QAAQ,KAAK;AAAA,UAC1B;AAAA,UAEA,OAAO,YAAY;AAAA,UACnB,OAAO,OAAO;AAAA,UACd,WAAW,MAAM,KAAK;AAAA,UACtB;AAAA,kBACA;AAAA,UACA;AAAA,UACA,IAAI,kBAAkB,GAAG;AAAA,YACvB,WAAW,MAAM;AAAA,UACnB;AAAA;AAAA,OAEH;AAAA;AAAA,EAEL,CAAC;AAAA;AAcI,SAAS,gBAAiF,CAE/F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAoF,CAElG,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAyF,CAEvG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE;AAAA,IACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAA0E,CAExF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE;AAAA,MACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;",
"debugId": "11A3A3E91965B03864756E2164756E21",
"names": []
}
import type { Duplex } from "stream";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
/**
* Converts a Node.js Duplex stream to a Web API TransformStream
* @template IN - The type of data being written (string or Uint8Array)
* @template OUT - The type of data being read (string or Uint8Array)
* @param duplex - The Node.js duplex stream to convert
* @returns A Web API TransformStream that wraps the Node.js duplex stream
*/
export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
duplex: Duplex | TransformStream
): TransformStream<IN, OUT> {
if (duplex instanceof TransformStream) return duplex;
return {
readable: fromReadable<OUT>(duplex),
writable: fromWritable<IN>(duplex),
};
}
import type { Readable } from "stream";
/**
* Converts a Node.js Readable stream to a Web API ReadableStream
* @template T - The type of data being read (string or Uint8Array)
* @param i - The Node.js readable stream to convert
* @returns A Web API ReadableStream that wraps the Node.js stream
*/
export function fromReadable<T extends string | Uint8Array>(
i: Readable | NodeJS.ReadableStream | ReadableStream
): ReadableStream<T> {
if (i instanceof ReadableStream) return i
return new ReadableStream({
start: (c) => {
i.on("data", (data) => c.enqueue(data));
i.on("close", () => c.close());
i.on("error", (err) => c.error(err));
},
cancel: (reason) => (
(i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(
reason
),
undefined
),
});
}
import type { Writable } from "stream";
/**
* Converts a Node.js Writable stream to a Web API WritableStream
* @template T - The type of data being written (string or Uint8Array)
* @param i - The Node.js writable stream to convert
* @returns A Web API WritableStream that wraps the Node.js stream
*/
export function fromWritable<T extends string | Uint8Array>(
i: Writable | NodeJS.WritableStream | WritableStream
): WritableStream<T> {
if(i instanceof WritableStream) return i
return new WritableStream({
start: (c) => (i.on("error", (err) => c.error(err)), undefined),
abort: (reason) => (
(i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(
reason
),
undefined
),
write: (data: string | Uint8Array, c) => (i.write(data), undefined),
close: () => (i.end(), undefined),
});
}
import { exec } from "child_process";
import { Transform } from "stream";
import sflow from "./test-utils";
import { fromStdioDropErr, fromStdioMergeError } from ".";
import { fromDuplex } from "./fromDuplex";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
it("from node streams, read + write", async () => {
// sh instance
const p = exec("sh");
await sflow("echo hello, world\n").pipeTo(fromWritable(p.stdin!));
const output = await sflow(fromReadable(p.stdout!)).text();
expect(output).toBe("hello, world\n");
});
it("fromStdio works", async () => {
const p = exec("sh");
const output = await sflow("echo hello, world\n")
.by(fromStdioDropErr(p))
.text();
expect(output).toBe("hello, world\n");
});
it("fromStdio drop error", async () => {
const p = exec("sh");
const output = await sflow("echo oops, error>&2 && echo hell, word\n")
.by(fromStdioDropErr(p))
.text();
expect(output).toBe("hell, word\n");
});
it("fromStdio merge error", async () => {
const p = exec("sh");
const output = await sflow("echo oops, error>&2 && echo hell, word\n")
.by(fromStdioMergeError(p))
.text();
expect(output).toBe("oops, error\nhell, word\n");
});
it("fromDuplex works with Transform stream", async () => {
const transform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
const output = await sflow("hello world\n")
.by(fromDuplex(transform))
.text();
expect(output).toBe("HELLO WORLD\n");
});
it("fromDuplex works with existing TransformStream", async () => {
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toString().toUpperCase());
}
});
const result = fromDuplex(upperCaseTransform);
expect(result).toBe(upperCaseTransform);
});
// Native implementation to replace sflow mergeStream
function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {
return new ReadableStream<T>({
start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) {
controller.close();
}
}
});
}
});
}
import { Readable, Writable } from "stream";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
import { fromDuplex } from "./fromDuplex";
/**
* Creates a TransformStream from a process's stdio, dropping stderr output
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to stdout, ignoring stderr
*/
export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}
): TransformStream<IN, OUT> {
return {
writable: fromWritable<IN>(p.stdin!),
readable: fromReadable<OUT>(p.stdout!),
};
}
/**
* Creates a TransformStream from a process's stdio, merging stdout and stderr
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to a merged stdout+stderr stream
*/
export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}
): TransformStream<IN, OUT> {
const stdin = fromWritable<IN>(p.stdin!);
const stdout = fromReadable<OUT>(p.stdout!);
const stderr = fromReadable<OUT>(p.stderr!);
return {
writable: stdin,
readable: mergeStream(stdout, stderr),
};
}
/**
* Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object
* @param options.stderr - The writable stream to forward stderr to
* @returns A TransformStream that connects stdin to stdout, forwarding stderr separately
*/
export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},
{ stderr }: {
stderr: Writable | WritableStream
}
): TransformStream<IN, OUT> {
const stdin = fromWritable<IN>(p.stdin!);
const stdout = fromReadable<OUT>(p.stdout!);
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return {
writable: stdin, readable: stdout,
};
}
/**
* Creates a TransformStream from a process's stdio with configurable stderr handling
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object for stderr handling
* @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout
* @returns A TransformStream that connects stdin to stdout with the specified stderr behavior
*/
export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},
{
stderr,
}: {
/** specify stderr to forward, or set to null to drop. */
stderr?: Writable | WritableStream | null;
} = {}
): TransformStream<IN, OUT> {
if (stderr === undefined) {
return fromStdioMergeError(p);
} else if (stderr === null) {
return fromStdioDropErr(p);
} else {
// forward stderr if stderr is specified
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));
return fromStdioDropErr(p);
}
}
export { fromReadable, fromWritable, fromDuplex };
/**
* Test utilities to replace sflow functionality
*/
/**
* Creates a ReadableStream from a string
*/
function fromString(input: string): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
const encoded = encoder.encode(input);
return new ReadableStream({
start(controller) {
controller.enqueue(encoded);
controller.close();
}
});
}
/**
* Converts a ReadableStream to text
*/
async function streamToText(stream: ReadableStream): Promise<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let result = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Handle different value types
if (value instanceof Uint8Array || value instanceof ArrayBuffer) {
result += decoder.decode(value, { stream: true });
} else if (typeof value === 'string') {
result += value;
} else {
// Convert to Uint8Array if it's another type
const encoder = new TextEncoder();
result += decoder.decode(encoder.encode(String(value)), { stream: true });
}
}
result += decoder.decode(); // flush
} finally {
reader.releaseLock();
}
return result;
}
/**
* A chainable stream utility class to replace sflow functionality
*/
class StreamFlow {
constructor(private stream: ReadableStream) {}
/**
* Pipe through a transform stream
*/
by(transform: TransformStream): StreamFlow {
return new StreamFlow(this.stream.pipeThrough(transform));
}
/**
* Convert to text
*/
async text(): Promise<string> {
return streamToText(this.stream);
}
/**
* Pipe to a writable stream
*/
async pipeTo(writable: WritableStream): Promise<void> {
return this.stream.pipeTo(writable);
}
}
/**
* Main sflow replacement function
*/
function sflow(input: string | ReadableStream): StreamFlow {
if (typeof input === 'string') {
return new StreamFlow(fromString(input));
}
return new StreamFlow(input);
}
export default sflow;