Big News: Socket raises $60M Series C at a $1B valuation to secure software supply chains for AI-driven development.Announcement
Sign In

from-node-stream

Package Overview
Dependencies
Maintainers
1
Versions
14
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

from-node-stream - npm Package Compare versions

Comparing version
0.1.0
to
0.1.2
+21
fromDuplex.ts
import type { Duplex } from "stream";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
/**
* Converts a Node.js Duplex stream to a Web API TransformStream
* @template IN - The type of data being written (string or Uint8Array)
* @template OUT - The type of data being read (string or Uint8Array)
* @param duplex - The Node.js duplex stream to convert
* @returns A Web API TransformStream that wraps the Node.js duplex stream
*/
export function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
duplex: Duplex | TransformStream
): TransformStream<IN, OUT> {
if (duplex instanceof TransformStream) return duplex;
return {
readable: fromReadable<OUT>(duplex),
writable: fromWritable<IN>(duplex),
};
}
/**
* Test utilities to replace sflow functionality
*/
/**
* Creates a ReadableStream from a string
*/
function fromString(input: string): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
const encoded = encoder.encode(input);
return new ReadableStream({
start(controller) {
controller.enqueue(encoded);
controller.close();
}
});
}
/**
* Converts a ReadableStream to text
*/
async function streamToText(stream: ReadableStream): Promise<string> {
const reader = stream.getReader();
const decoder = new TextDecoder();
let result = '';
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
// Handle different value types
if (value instanceof Uint8Array || value instanceof ArrayBuffer) {
result += decoder.decode(value, { stream: true });
} else if (typeof value === 'string') {
result += value;
} else {
// Convert to Uint8Array if it's another type
const encoder = new TextEncoder();
result += decoder.decode(encoder.encode(String(value)), { stream: true });
}
}
result += decoder.decode(); // flush
} finally {
reader.releaseLock();
}
return result;
}
/**
* A chainable stream utility class to replace sflow functionality
*/
class StreamFlow {
constructor(private stream: ReadableStream) {}
/**
* Pipe through a transform stream
*/
by(transform: TransformStream): StreamFlow {
return new StreamFlow(this.stream.pipeThrough(transform));
}
/**
* Convert to text
*/
async text(): Promise<string> {
return streamToText(this.stream);
}
/**
* Pipe to a writable stream
*/
async pipeTo(writable: WritableStream): Promise<void> {
return this.stream.pipeTo(writable);
}
}
/**
* Main sflow replacement function
*/
function sflow(input: string | ReadableStream): StreamFlow {
if (typeof input === 'string') {
return new StreamFlow(fromString(input));
}
return new StreamFlow(input);
}
export default sflow;
+46
-7

@@ -1,6 +0,5 @@

// index.ts
import { mergeStream } from "sflow";
// fromReadable.ts
function fromReadable(i) {
if (i instanceof ReadableStream)
return i;
return new ReadableStream({

@@ -18,2 +17,4 @@ start: (c) => {

function fromWritable(i) {
if (i instanceof WritableStream)
return i;
return new WritableStream({

@@ -27,3 +28,40 @@ start: (c) => (i.on("error", (err) => c.error(err)), undefined),

// fromDuplex.ts
function fromDuplex(duplex) {
if (duplex instanceof TransformStream)
return duplex;
return {
readable: fromReadable(duplex),
writable: fromWritable(duplex)
};
}
// index.ts
function mergeStream(...streams) {
return new ReadableStream({
start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done)
break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) {
controller.close();
}
}
});
}
});
}
function fromStdioDropErr(p) {

@@ -47,3 +85,3 @@ return {

const stdout = fromReadable(p.stdout);
if (p.stderr?.pipe)
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));

@@ -63,3 +101,3 @@ return {

} else {
if (p.stderr?.pipe)
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));

@@ -75,5 +113,6 @@ return fromStdioDropErr(p);

fromStdio,
fromReadable
fromReadable,
fromDuplex
};
//# debugId=C247144402D037BB64756E2164756E21
//# debugId=11A3A3E91965B03864756E2164756E21
+7
-6
{
"version": 3,
"sources": ["../index.ts", "../fromReadable.ts", "../fromWritable.ts"],
"sources": ["../fromReadable.ts", "../fromWritable.ts", "../fromDuplex.ts", "../index.ts"],
"sourcesContent": [
"import { mergeStream } from \"sflow\"; // TODO: ensure tree shake sflow\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n },\n { stderr }: {\n stderr: Writable\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr?.pipe)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string|Uint8Array, OUT extends string|Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | null;\n stdout?: Readable | null;\n stderr?: Readable | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr?.pipe)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable };\n",
"import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream\n): ReadableStream<T> {\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n",
"import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream\n): WritableStream<T> {\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n"
"import type { Readable } from \"stream\";\n\n/**\n * Converts a Node.js Readable stream to a Web API ReadableStream\n * @template T - The type of data being read (string or Uint8Array)\n * @param i - The Node.js readable stream to convert\n * @returns A Web API ReadableStream that wraps the Node.js stream\n */\nexport function fromReadable<T extends string | Uint8Array>(\n i: Readable | NodeJS.ReadableStream | ReadableStream\n): ReadableStream<T> {\n if (i instanceof ReadableStream) return i\n return new ReadableStream({\n start: (c) => {\n i.on(\"data\", (data) => c.enqueue(data));\n i.on(\"close\", () => c.close());\n i.on(\"error\", (err) => c.error(err));\n },\n cancel: (reason) => (\n (i as Partial<Readable> & Partial<NodeJS.ReadableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n });\n}\n",
"import type { Writable } from \"stream\";\n\n/**\n * Converts a Node.js Writable stream to a Web API WritableStream\n * @template T - The type of data being written (string or Uint8Array)\n * @param i - The Node.js writable stream to convert\n * @returns A Web API WritableStream that wraps the Node.js stream\n */\nexport function fromWritable<T extends string | Uint8Array>(\n i: Writable | NodeJS.WritableStream | WritableStream\n): WritableStream<T> {\n if(i instanceof WritableStream) return i\n return new WritableStream({\n start: (c) => (i.on(\"error\", (err) => c.error(err)), undefined),\n abort: (reason) => (\n (i as Partial<Writable> & Partial<NodeJS.WritableStream>).destroy?.(\n reason\n ),\n undefined\n ),\n write: (data: string | Uint8Array, c) => (i.write(data), undefined),\n close: () => (i.end(), undefined),\n });\n}\n",
"import type { Duplex } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\n\n/**\n * Converts a Node.js Duplex stream to a Web API TransformStream\n * @template IN - The type of data being written (string or Uint8Array)\n * @template OUT - The type of data being read (string or Uint8Array)\n * @param duplex - The Node.js duplex stream to convert\n * @returns A Web API TransformStream that wraps the Node.js duplex stream\n */\nexport function fromDuplex<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n duplex: Duplex | TransformStream\n): TransformStream<IN, OUT> {\n if (duplex instanceof TransformStream) return duplex;\n \n return {\n readable: fromReadable<OUT>(duplex),\n writable: fromWritable<IN>(duplex),\n };\n}",
"// Native implementation to replace sflow mergeStream\nfunction mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {\n return new ReadableStream<T>({\n start(controller) {\n let activeStreams = streams.length;\n\n streams.forEach(async (stream) => {\n try {\n const reader = stream.getReader();\n\n while (true) {\n const { done, value } = await reader.read();\n if (done) break;\n controller.enqueue(value);\n }\n\n reader.releaseLock();\n } catch (error) {\n controller.error(error);\n return;\n } finally {\n activeStreams--;\n if (activeStreams === 0) {\n controller.close();\n }\n }\n });\n }\n });\n}\nimport { Readable, Writable } from \"stream\";\nimport { fromReadable } from \"./fromReadable\";\nimport { fromWritable } from \"./fromWritable\";\nimport { fromDuplex } from \"./fromDuplex\";\n\n/**\n * Creates a TransformStream from a process's stdio, dropping stderr output\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to stdout, ignoring stderr\n */\nexport function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n return {\n writable: fromWritable<IN>(p.stdin!),\n readable: fromReadable<OUT>(p.stdout!),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, merging stdout and stderr\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @returns A TransformStream that connects stdin to a merged stdout+stderr stream\n */\nexport function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n const stderr = fromReadable<OUT>(p.stderr!);\n return {\n writable: stdin,\n readable: mergeStream(stdout, stderr),\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object\n * @param options.stderr - The writable stream to forward stderr to\n * @returns A TransformStream that connects stdin to stdout, forwarding stderr separately\n */\nexport function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n { stderr }: {\n stderr: Writable | WritableStream\n }\n): TransformStream<IN, OUT> {\n const stdin = fromWritable<IN>(p.stdin!);\n const stdout = fromReadable<OUT>(p.stdout!);\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return {\n writable: stdin, readable: stdout,\n };\n}\n\n/**\n * Creates a TransformStream from a process's stdio with configurable stderr handling\n * @template IN - Input data type (string or Uint8Array)\n * @template OUT - Output data type (string or Uint8Array)\n * @param p - A process object with stdin, stdout, and stderr streams\n * @param options - Configuration object for stderr handling\n * @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout\n * @returns A TransformStream that connects stdin to stdout with the specified stderr behavior\n */\nexport function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(\n /** a process, which has stdin, stdout, stderr */\n p: {\n stdin?: Writable | WritableStream | null;\n stdout?: Readable | ReadableStream | null;\n stderr?: Readable | ReadableStream | null;\n },\n {\n stderr,\n }: {\n /** specify stderr to forward, or set to null to drop. */\n stderr?: Writable | WritableStream | null;\n } = {}\n): TransformStream<IN, OUT> {\n if (stderr === undefined) {\n return fromStdioMergeError(p);\n\n } else if (stderr === null) {\n return fromStdioDropErr(p);\n } else {\n // forward stderr if stderr is specified\n if (p.stderr)\n fromReadable(p.stderr).pipeTo(fromWritable(stderr));\n return fromStdioDropErr(p);\n }\n\n}\n\nexport { fromReadable, fromWritable, fromDuplex };\n"
],
"mappings": ";AAAA;;;ACQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;ACfI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;AFTI,SAAS,gBAA6E,CAE3F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAgF,CAE9F,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAqF,CAEnG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE,QAAQ;AAAA,IACZ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAAsE,CAEpF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE,QAAQ;AAAA,MACZ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;",
"debugId": "C247144402D037BB64756E2164756E21",
"mappings": ";AAQO,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAI,aAAa;AAAA,IAAgB,OAAO;AAAA,EACxC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,MAAM;AAAA,MACZ,EAAE,GAAG,QAAQ,CAAC,SAAS,EAAE,QAAQ,IAAI,CAAC;AAAA,MACtC,EAAE,GAAG,SAAS,MAAM,EAAE,MAAM,CAAC;AAAA,MAC7B,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC;AAAA;AAAA,IAErC,QAAQ,CAAC,YACN,EAAyD,UACxD,MACF,GACA;AAAA,EAEJ,CAAC;AAAA;;;AChBI,SAAS,YAA2C,CACzD,GACmB;AAAA,EACnB,IAAG,aAAa;AAAA,IAAgB,OAAO;AAAA,EACvC,OAAO,IAAI,eAAe;AAAA,IACxB,OAAO,CAAC,OAAO,EAAE,GAAG,SAAS,CAAC,QAAQ,EAAE,MAAM,GAAG,CAAC,GAAG;AAAA,IACrD,OAAO,CAAC,YACL,EAAyD,UACxD,MACF,GACA;AAAA,IAEF,OAAO,CAAC,MAA2B,OAAO,EAAE,MAAM,IAAI,GAAG;AAAA,IACzD,OAAO,OAAO,EAAE,IAAI,GAAG;AAAA,EACzB,CAAC;AAAA;;;ACXI,SAAS,UAA2E,CACzF,QAC0B;AAAA,EAC1B,IAAI,kBAAkB;AAAA,IAAiB,OAAO;AAAA,EAE9C,OAAO;AAAA,IACL,UAAU,aAAkB,MAAM;AAAA,IAClC,UAAU,aAAiB,MAAM;AAAA,EACnC;AAAA;;;AClBF,SAAS,WAAc,IAAI,SAAiD;AAAA,EAC1E,OAAO,IAAI,eAAkB;AAAA,IAC3B,KAAK,CAAC,YAAY;AAAA,MAChB,IAAI,gBAAgB,QAAQ;AAAA,MAE5B,QAAQ,QAAQ,OAAO,WAAW;AAAA,QAChC,IAAI;AAAA,UACF,MAAM,SAAS,OAAO,UAAU;AAAA,UAEhC,OAAO,MAAM;AAAA,YACX,QAAQ,MAAM,UAAU,MAAM,OAAO,KAAK;AAAA,YAC1C,IAAI;AAAA,cAAM;AAAA,YACV,WAAW,QAAQ,KAAK;AAAA,UAC1B;AAAA,UAEA,OAAO,YAAY;AAAA,UACnB,OAAO,OAAO;AAAA,UACd,WAAW,MAAM,KAAK;AAAA,UACtB;AAAA,kBACA;AAAA,UACA;AAAA,UACA,IAAI,kBAAkB,GAAG;AAAA,YACvB,WAAW,MAAM;AAAA,UACnB;AAAA;AAAA,OAEH;AAAA;AAAA,EAEL,CAAC;AAAA;AAcI,SAAS,gBAAiF,CAE/F,GAK0B;AAAA,EAC1B,OAAO;AAAA,IACL,UAAU,aAAiB,EAAE,KAAM;AAAA,IACnC,UAAU,aAAkB,EAAE,MAAO;AAAA,EACvC;AAAA;AAUK,SAAS,mBAAoF,CAElG,GAK0B;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,OAAO;AAAA,IACL,UAAU;AAAA,IACV,UAAU,YAAY,QAAQ,MAAM;AAAA,EACtC;AAAA;AAYK,SAAS,wBAAyF,CAEvG,KAKE,UAGwB;AAAA,EAC1B,MAAM,QAAQ,aAAiB,EAAE,KAAM;AAAA,EACvC,MAAM,SAAS,aAAkB,EAAE,MAAO;AAAA,EAC1C,IAAI,EAAE;AAAA,IACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,EACpD,OAAO;AAAA,IACL,UAAU;AAAA,IAAO,UAAU;AAAA,EAC7B;AAAA;AAYK,SAAS,SAA0E,CAExF;AAAA,EAME;AAAA,IAIE,CAAC,GACqB;AAAA,EAC1B,IAAI,WAAW,WAAW;AAAA,IACxB,OAAO,oBAAoB,CAAC;AAAA,EAE9B,EAAO,SAAI,WAAW,MAAM;AAAA,IAC1B,OAAO,iBAAiB,CAAC;AAAA,EAC3B,EAAO;AAAA,IAEL,IAAI,EAAE;AAAA,MACJ,aAAa,EAAE,MAAM,EAAE,OAAO,aAAa,MAAM,CAAC;AAAA,IACpD,OAAO,iBAAiB,CAAC;AAAA;AAAA;",
"debugId": "11A3A3E91965B03864756E2164756E21",
"names": []
}

@@ -10,4 +10,5 @@ import type { Readable } from "stream";

export function fromReadable<T extends string | Uint8Array>(
i: Readable | NodeJS.ReadableStream
i: Readable | NodeJS.ReadableStream | ReadableStream
): ReadableStream<T> {
if (i instanceof ReadableStream) return i
return new ReadableStream({

@@ -14,0 +15,0 @@ start: (c) => {

@@ -10,4 +10,5 @@ import type { Writable } from "stream";

export function fromWritable<T extends string | Uint8Array>(
i: Writable | NodeJS.WritableStream
i: Writable | NodeJS.WritableStream | WritableStream
): WritableStream<T> {
if(i instanceof WritableStream) return i
return new WritableStream({

@@ -14,0 +15,0 @@ start: (c) => (i.on("error", (err) => c.error(err)), undefined),

import { exec } from "child_process";
import sflow from "sflow";
import { Transform } from "stream";
import sflow from "./test-utils";
import { fromStdioDropErr, fromStdioMergeError } from ".";
import { fromDuplex } from "./fromDuplex";
import { fromReadable } from "./fromReadable";

@@ -42,1 +44,26 @@ import { fromWritable } from "./fromWritable";

});
it("fromDuplex works with Transform stream", async () => {
const transform = new Transform({
transform(chunk, encoding, callback) {
this.push(chunk.toString().toUpperCase());
callback();
}
});
const output = await sflow("hello world\n")
.by(fromDuplex(transform))
.text();
expect(output).toBe("HELLO WORLD\n");
});
it("fromDuplex works with existing TransformStream", async () => {
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toString().toUpperCase());
}
});
const result = fromDuplex(upperCaseTransform);
expect(result).toBe(upperCaseTransform);
});
+52
-22

@@ -1,5 +0,35 @@

import { mergeStream } from "sflow"; // TODO: ensure tree shake sflow
// Native implementation to replace sflow mergeStream
function mergeStream<T>(...streams: ReadableStream<T>[]): ReadableStream<T> {
return new ReadableStream<T>({
start(controller) {
let activeStreams = streams.length;
streams.forEach(async (stream) => {
try {
const reader = stream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
controller.enqueue(value);
}
reader.releaseLock();
} catch (error) {
controller.error(error);
return;
} finally {
activeStreams--;
if (activeStreams === 0) {
controller.close();
}
}
});
}
});
}
import { Readable, Writable } from "stream";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
import { fromDuplex } from "./fromDuplex";

@@ -13,8 +43,8 @@ /**

*/
export function fromStdioDropErr<IN extends string|Uint8Array, OUT extends string|Uint8Array>(
export function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}

@@ -35,8 +65,8 @@ ): TransformStream<IN, OUT> {

*/
export function fromStdioMergeError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(
export function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
}

@@ -62,11 +92,11 @@ ): TransformStream<IN, OUT> {

*/
export function fromStdioAndForwardError<IN extends string|Uint8Array, OUT extends string|Uint8Array>(
export function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},
{ stderr }: {
stderr: Writable
stderr: Writable | WritableStream
}

@@ -76,3 +106,3 @@ ): TransformStream<IN, OUT> {

const stdout = fromReadable<OUT>(p.stdout!);
if (p.stderr?.pipe)
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));

@@ -93,8 +123,8 @@ return {

*/
export function fromStdio<IN extends string|Uint8Array, OUT extends string|Uint8Array>(
export function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
stdin?: Writable | WritableStream | null;
stdout?: Readable | ReadableStream | null;
stderr?: Readable | ReadableStream | null;
},

@@ -105,3 +135,3 @@ {

/** specify stderr to forward, or set to null to drop. */
stderr?: Writable | null;
stderr?: Writable | WritableStream | null;
} = {}

@@ -116,3 +146,3 @@ ): TransformStream<IN, OUT> {

// forward stderr if stderr is specified
if (p.stderr?.pipe)
if (p.stderr)
fromReadable(p.stderr).pipeTo(fromWritable(stderr));

@@ -124,2 +154,2 @@ return fromStdioDropErr(p);

export { fromReadable, fromWritable };
export { fromReadable, fromWritable, fromDuplex };
{
"name": "from-node-stream",
"version": "0.1.0",
"version": "0.1.2",
"description": "convert nodejs-stream into webstream",

@@ -10,5 +10,6 @@ "keywords": [

"Writable",
"Duplex",
"ReadableStream",
"WritableStream",
"TransformSteam"
"TransformStream"
],

@@ -38,5 +39,3 @@ "homepage": "https://github.com/snomiao/from-node-stream#readme",

"scripts": {
"build": "bun build:type && bun build:js",
"build:js": "bun build . --outdir=dist --sourcemap=external --packages=external",
"build:type": "tsc -d --noEmit false --emitDeclarationOnly --outDir dist",
"build": "bun build ./index.ts --outdir=dist --sourcemap=external",
"prerelease": "bun run build && bun run test",

@@ -47,6 +46,5 @@ "release": "bunx standard-version && git push --follow-tags && npm publish",

},
"dependencies": {
"phpdie": "^1.2.14"
},
"dependencies": {},
"devDependencies": {
"phpdie": "^1.2.14",
"@types/bun": "^1.1.11",

@@ -57,5 +55,4 @@ "@types/jest": "^29.5.13",

"semantic-release": "^24.2.1",
"sflow": "^1.19.1",
"typescript": "^5.6.3"
}
}
import type { Readable } from "stream";
/**
* Converts a Node.js Readable stream to a Web API ReadableStream
* @template T - The type of data being read (string or Uint8Array)
* @param i - The Node.js readable stream to convert
* @returns A Web API ReadableStream that wraps the Node.js stream
*/
export declare function fromReadable<T extends string | Uint8Array>(i: Readable | NodeJS.ReadableStream): ReadableStream<T>;
import type { Writable } from "stream";
/**
* Converts a Node.js Writable stream to a Web API WritableStream
* @template T - The type of data being written (string or Uint8Array)
* @param i - The Node.js writable stream to convert
* @returns A Web API WritableStream that wraps the Node.js stream
*/
export declare function fromWritable<T extends string | Uint8Array>(i: Writable | NodeJS.WritableStream): WritableStream<T>;
import { Readable, Writable } from "stream";
import { fromReadable } from "./fromReadable";
import { fromWritable } from "./fromWritable";
/**
* Creates a TransformStream from a process's stdio, dropping stderr output
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to stdout, ignoring stderr
*/
export declare function fromStdioDropErr<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio, merging stdout and stderr
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @returns A TransformStream that connects stdin to a merged stdout+stderr stream
*/
export declare function fromStdioMergeError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio, forwarding stderr to a specified stream
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object
* @param options.stderr - The writable stream to forward stderr to
* @returns A TransformStream that connects stdin to stdout, forwarding stderr separately
*/
export declare function fromStdioAndForwardError<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
}, { stderr }: {
stderr: Writable;
}): TransformStream<IN, OUT>;
/**
* Creates a TransformStream from a process's stdio with configurable stderr handling
* @template IN - Input data type (string or Uint8Array)
* @template OUT - Output data type (string or Uint8Array)
* @param p - A process object with stdin, stdout, and stderr streams
* @param options - Configuration object for stderr handling
* @param options.stderr - Writable stream to forward stderr to, or null to drop stderr, or undefined to merge with stdout
* @returns A TransformStream that connects stdin to stdout with the specified stderr behavior
*/
export declare function fromStdio<IN extends string | Uint8Array, OUT extends string | Uint8Array>(
/** a process, which has stdin, stdout, stderr */
p: {
stdin?: Writable | null;
stdout?: Readable | null;
stderr?: Readable | null;
}, { stderr, }?: {
/** specify stderr to forward, or set to null to drop. */
stderr?: Writable | null;
}): TransformStream<IN, OUT>;
export { fromReadable, fromWritable };