Stream-Editor

Features
Partial replacement
A partial replacement replaces only the substring matched by the 1st parenthesized capture group with the specified replacement, which allows a simpler syntax and a minimal modification.
For example, the following snippet converts something like import x from "../src/x.mjs" into import x from "../build/x.mjs":
import { sed as updateFileContent } from "stream-editor";
updateFileContent({
file: "index.mjs",
search: matchParentFolderImport(/(src\/(.+?))/),
replacement: "build/$2",
maxTimes: 2,
required: true
});
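// Builds an import-matching RegExp with the given pattern spliced into the module path
function matchParentFolderImport (additionalPattern) {
  const parts = /import\s+.+\s+from\s*['"]\.\.\/(.+?)['"];?/.source.split("(.+?)");
  return new RegExp([
    parts[0],
    additionalPattern.source,
    parts[1]
  ].join(""));
}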
Special replacement patterns (parenthesized capture group placeholders) are well supported in partial replacements, for both function replacements and string replacements. All other concepts stay faithful to the vanilla String.prototype.replace method, except that $& (also the 1st argument supplied to a replacement function) and $1 (the 2nd argument) always have the same value: the substring matched by the 1st parenthesized capture group (PCG).
You can specify a truthy isFullReplacement to perform a full replacement instead.
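To make the idea concrete, here is a minimal sketch of a partial replacement built on plain regular expressions. It is not stream-editor's implementation: the partialReplace helper is hypothetical, it handles only the first match, and it expands only single-digit $n placeholders.

```javascript
// Hypothetical helper: replaces only the span of the 1st capture group.
function partialReplace(text, pattern, replacement) {
  // The "d" flag exposes start/end indices for every capture group.
  const flags = pattern.flags.includes("d") ? pattern.flags : pattern.flags + "d";
  const match = new RegExp(pattern.source, flags).exec(text);
  if (!match || !match.indices[1]) return text; // no match or no 1st group

  const [start, end] = match.indices[1];
  // Expand $1..$9 in the replacement string from the captured groups.
  const value = replacement.replace(/\$(\d)/g, (_, n) => match[Number(n)] ?? "");
  return text.slice(0, start) + value + text.slice(end);
}

const line = 'import x from "../src/x.mjs";';
const result = partialReplace(
  line,
  /import\s+.+\s+from\s*['"]\.\.\/(src\/(.+?))['"];?/,
  "build/$2"
);
console.log(result);
```

Everything outside the 1st capture group (the import keyword, the quotes, the "../" prefix) is left untouched, which is why the search pattern can be as strict as needed without having to be reproduced in the replacement.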
Asynchronous replacement
Yes, asynchronous function replacements just work like a charm.
import fs from "fs";
import path from "path";
import { streamEdit } from "stream-editor";
const filepath = "./index.js";
const dest = "./build/index.js";
streamEdit({
from: fs.createReadStream(filepath),
to: fs.createWriteStream(dest),
replace: [
{
match: /\$\{\{\s*([A-Z_-]+?)\s*\(\s*(.+?)\s*\)\s*\}\}/i,
replacement: async (whole, method, input) => {
switch (method.toUpperCase()) {
case "IMPORT":
input = input.replace(/^["']|["']$/g, "");
const importFilePath = path.join(path.dirname(filepath), input);
return fs.promises.readFile(importFilePath, "utf-8");
case "EXPR":
return (async () => String(await eval(input)))();
default:
throw new Error(`unknown method ${method} in ${whole}`);
}
}
}
],
defaultOptions: {
isFullReplacement: true
}
});
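Because String.prototype.replace does not await its callback, an asynchronous replacement needs some extra machinery. The following is a rough sketch of one common technique (collect the promises in a first pass, substitute the resolved values in a second); the replaceAsync helper and the UPPER/LEN methods are made up for illustration and are not how stream-editor necessarily does it.

```javascript
// Hypothetical helper: String.prototype.replace with an async replacer.
async function replaceAsync(text, pattern, asyncReplacer) {
  const pending = [];
  // First pass: start one async call per match, ignore the result string.
  text.replace(pattern, (...args) => {
    pending.push(asyncReplacer(...args));
    return "";
  });
  const values = await Promise.all(pending);
  // Second pass: matches occur in the same order, so consume values in turn.
  let index = 0;
  return text.replace(pattern, () => values[index++]);
}

const template = "${{ UPPER(abc) }} has ${{ LEN(abc) }} letters";
const rendered = replaceAsync(
  template,
  /\$\{\{\s*([A-Z_-]+?)\s*\(\s*(.+?)\s*\)\s*\}\}/gi,
  async (whole, method, input) => {
    await new Promise(resolve => setTimeout(resolve, 10)); // simulate I/O
    switch (method.toUpperCase()) {
      case "UPPER":
        return input.toUpperCase();
      case "LEN":
        return String(input.length);
      default:
        throw new Error(`unknown method ${method} in ${whole}`);
    }
  }
);
rendered.then(value => console.log(value));
```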
Substituting texts within files in streaming fashion
This package will create readable and writable streams connected to a single file at the same time, while disallowing any write operations to advance further than the current reading index. This feature is based on rw-stream's great work.
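The constraint described above can be illustrated with a small synchronous sketch that rewrites a file in place through a single file descriptor while keeping the write offset at or behind the read offset. This is only an illustration of the idea under simplifying assumptions (the rewriteInPlace helper is hypothetical, and the transform must work on arbitrary chunks and never grow its input); it is not the bundled rw-stream code.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical helper: edit a file in place, never writing past the read index.
function rewriteInPlace(file, transform, chunkSize = 4) {
  const fd = fs.openSync(file, "r+");
  const buffer = Buffer.alloc(chunkSize);
  let readPos = 0;
  let writePos = 0;
  let bytesRead;
  try {
    while ((bytesRead = fs.readSync(fd, buffer, 0, chunkSize, readPos)) > 0) {
      readPos += bytesRead;
      // latin1 maps bytes to characters 1:1, so chunk borders are harmless.
      const input = buffer.toString("latin1", 0, bytesRead);
      const output = Buffer.from(transform(input), "latin1");
      if (writePos + output.length > readPos) {
        throw new Error("write would advance past the current read index");
      }
      fs.writeSync(fd, output, 0, output.length, writePos);
      writePos += output.length;
    }
    fs.ftruncateSync(fd, writePos); // drop the stale tail
  } finally {
    fs.closeSync(fd);
  }
}

const file = path.join(fs.mkdtempSync(path.join(os.tmpdir(), "rw-sketch-")), "crlf.txt");
fs.writeFileSync(file, "a\r\nb\r\nc");
rewriteInPlace(file, text => text.replace(/\r/g, ""));
console.log(JSON.stringify(fs.readFileSync(file, "latin1")));
```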
To accommodate RegEx replacement (which requires intact strings rather than chunks that may begin or end at any position) with streams, stream-editor uses the separator (default: /(?<=\r?\n)/) and join (default: '') options. You should NOT specify separators that may divide text structures targeted by your RegEx searches, which would result in undefined behavior.
Moreover, as the RegEx replacement part in options is actually optional, stream-editor can also be used to break up streams and reassemble them like split2 does:
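const { createReadStream } = require("fs");
const { join } = require("path");
const { Writable } = require("stream");
const { streamEdit } = require("stream-editor");
const filepath = join(__dirname, `./file.ndjson`);
// CommonJS has no top-level await, so run inside an async function
(async () => {
  // normalize EOL: split on CRLF, rejoin with LF
  await streamEdit({
    file: filepath,
    separator: "\r\n",
    join: "\n"
  });
  // parse each line and hand the resulting objects to a Writable
  await streamEdit({
    from: createReadStream(filepath),
    to: new Writable({
      objectMode: true,
      write(parsedObj, _enc, cb) {
        // doSomething stands for your own promise-returning handler
        return (
          doSomething(parsedObj)
            .then(() => cb())
            .catch(cb)
        );
      }
    }),
    separator: "\n",
    readableObjectMode: true,
    postProcessing: part => JSON.parse(part),
    abortController: new AbortController()
  });
})();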
You can specify null as the separator to completely disable splitting.
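As a rough illustration of why a separator is needed, the sketch below regroups arbitrarily cut chunks into complete parts using the default separator shown above. The createSplitter helper is hypothetical and only mirrors the idea; it is not the package's internal code.

```javascript
// Hypothetical helper: turns arbitrary chunks into separator-delimited parts.
function createSplitter(separator = /(?<=\r?\n)/) {
  let tail = ""; // trailing piece that may still be incomplete
  return {
    push(chunk) {
      const parts = (tail + chunk).split(separator);
      tail = parts.pop(); // hold back the last piece until more data arrives
      return parts;
    },
    flush() {
      const rest = tail;
      tail = "";
      return rest === "" ? [] : [rest];
    }
  };
}

const splitter = createSplitter();
const parts = [
  ...splitter.push("first li"),
  ...splitter.push("ne\nsecond line\r\nthi"),
  ...splitter.push("rd"),
  ...splitter.flush()
];
console.log(parts);
```

Each emitted part is an intact line, so a regular expression applied to it never sees a match cut in half by a chunk boundary.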
Setting limits on Regular Expressions' maximum executed times
This is achieved by converting every replacement into a replacement function and wrapping it in layers of proxies.
const { sed: updateFiles } = require("stream-editor");
// maxTimes caps how many times this single search may be applied
updateFiles({
  files: commonjsFiles,
  match: /^().*(\r?\n)/,
  replacement: `"use strict";$2`,
  maxTimes: 1
});
updateFiles({
  files: commonjsFiles,
  replace: [
    {
      match: /^().*(\r?\n)/,
      replacement: `"use strict";$2`,
      limit: 1 // local limit for this search
    }
  ],
  limit: 1 // global limit across all searches
});
Once the limit specified by the limit option is reached, the underlying transform stream becomes a transparent PassThrough stream if the truncate option is falsy; otherwise the remaining part is discarded. In contrast, reaching maxTimes only removes that particular search from further processing.
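The wrapping idea can be sketched as follows: a replacement is turned into a function that counts its own calls and stops substituting once the cap is hit. The limitReplacement helper is hypothetical and deliberately simplified (string replacements are returned as-is, without placeholder expansion); it is not the package's actual proxy layer.

```javascript
// Hypothetical helper: caps how many matches a replacement may rewrite.
function limitReplacement(replacement, maxTimes) {
  let count = 0;
  return (whole, ...args) => {
    if (count >= maxTimes) return whole; // cap reached: leave the match as is
    count++;
    return typeof replacement === "function"
      ? replacement(whole, ...args)
      : replacement;
  };
}

const text = "a-b-c-d";
const limited = text.replace(/-/g, limitReplacement("+", 2));
console.log(limited); // "a+b+c-d"
```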
Transcoding streams or files
streamEdit({
from: createReadStream("gbk.txt"),
to: createWriteStream("hex.txt"),
decodeBuffers: "gbk",
encoding: "hex"
});
The decodeBuffers option specifies the character encoding, such as utf-8, iso-8859-2, koi8, cp1261 or gbk, used to decode raw input buffers. Some encodings are only available in Node.js builds that embed the full ICU data, but the good news is that full-icu is the default in v14+ (see https://github.com/nodejs/node/pull/29522).
Note that decodeBuffers only makes sense when no encoding is assigned to the stream and its data are passed as buffers. Below are some examples of wrong input:
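// Wrong: setEncoding() makes the stream emit decoded strings instead of buffers
streamEdit({
  from: createReadStream("gbk.txt").setEncoding("utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});
// Wrong: an encoding passed to createReadStream has the same effect
streamEdit({
  from: createReadStream("gbk.txt", "utf8"),
  to: createWriteStream("hex.txt"),
  decodeBuffers: "gbk",
  encoding: "hex"
});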
The encoding option specifies the encoding used to convert all processed and joined strings back into buffers. The following values are supported by Node.js: ascii, utf8, utf-8, utf16le, ucs2, ucs-2, base64, latin1, binary, hex.
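The two steps can be pictured with Node.js built-ins alone: decode the raw buffer with a TextDecoder, then re-encode the resulting string with Buffer.from. This is a sketch under the assumption that the options map onto these two calls; the transcode helper is hypothetical, and utf-16le is used here only because it is available in every Node.js build.

```javascript
// Hypothetical helper: decode a raw buffer, then encode it again.
function transcode(chunk, decodeBuffers, encoding) {
  // For real streams, pass { stream: true } to decode() so that multi-byte
  // sequences split across chunks are handled correctly.
  const text = new TextDecoder(decodeBuffers).decode(chunk);
  return Buffer.from(text, encoding);
}

const input = Buffer.from("h\u00e9llo", "utf16le");
const output = transcode(input, "utf-16le", "utf8");
console.log(output.toString("hex")); // "68c3a96c6c6f"
```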
Piping/teeing/confluencing streams with proper error handling & propagation
Confluence:
const yamlFiles = await (
fsp.readdir(folderpath, { withFileTypes: true })
.then(dirents =>
dirents
.filter(dirent => dirent.isFile() && dirent.name.endsWith(".yaml"))
.sort((a, b) => a.name.localeCompare(b.name))
.map(({ name }) => createReadStream(join(folderpath, name)))
)
);
streamEdit({
from: yamlFiles,
to: createWriteStream(resultPath),
contentJoin: "\n\n"
});
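For comparison, a confluence of several readables into one writable can be approximated with Node.js built-ins. The sketch below is an assumption-laden illustration, not the package's implementation: the confluence helper is hypothetical, it relies on stream/promises pipeline for error propagation, and it does not clean up sources that were never read.

```javascript
const { Readable, Writable } = require("stream");
const { pipeline } = require("stream/promises");

// Yields every chunk of every source in order, with contentJoin in between.
async function* concatenated(readables, contentJoin) {
  for (let i = 0; i < readables.length; i++) {
    if (i > 0 && contentJoin) yield contentJoin;
    yield* readables[i];
  }
}

// Hypothetical helper: rejects if a source or the destination fails.
function confluence(readables, writable, contentJoin = "") {
  return pipeline(Readable.from(concatenated(readables, contentJoin)), writable);
}

const received = [];
const sink = new Writable({
  write(chunk, _enc, cb) {
    received.push(chunk.toString());
    cb();
  }
});
const done = confluence(
  [Readable.from(["a: 1\n"]), Readable.from(["b: 2\n"])],
  sink,
  "\n\n"
);
done.then(() => console.log(JSON.stringify(received.join(""))));
```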
Teeing:
streamEdit({
readableStream: new Readable({
read(size) {
}
}),
writableStreams: new Array(6).fill(0).map((_, i) =>
createWriteStream(join(resultFolderPath, `./test-source${i}`))
)
});
You can have a look at tests regarding error handling here.
No dependency
stream-editor previously depended on rw-stream, but for some historical reasons, I refactored rw-stream and bundled it as a part of this package. See src/rw-stream.
Currently, stream-editor has zero dependencies.
High coverage tests
See https://github.com/edfus/stream-editor/tree/master/test.
Normalize & Replace
√ can handle sticky regular expressions
√ can handle string match with special characters
√ can handle partial replacement with placeholders
√ can handle non-capture-group parenthesized pattern: Assertions
√ can handle non-capture-group parenthesized pattern: Round brackets
√ can handle pattern starts with a capture group
√ can handle malformed (without capture groups) partial replacement
√ can await replace partially with function
√ recognize $\d{1,3} $& $` $' and check validity (throw warnings)
√ produce the same result as String.prototype.replace
Edit streams
√ should check arguments
√ should warn unknown/unneeded options
√ should respect FORCE_COLOR, NO_COLOR, NODE_DISABLE_COLORS
√ should pipe one Readable to multiple dumps (54ms)
√ should replace CRLF with LF
√ should have replaced /dum(b)/i to dumpling (while preserving dum's case)
√ should have global and local limits on replacement amount
√ should have line buffer maxLength
√ should edit and combine multiple Readable into one Writable
√ has readableObjectMode
√ can handle async replacements
√ can signal an unsuccessful substitution using beforeCompletion
√ can declare a limit below which a substitution is considered failed for a search
cancelation
√ should check validity
√ can abort a substitution before it has completed.
√ can handle already aborted controller
√ doesn't have memory leaks (is using WeakRef)
truncation & limitation
√ truncating the rest when limitations reached
√ not: self rw-stream
√ not: piping stream
transcoding
√ gbk to utf8 buffer
√ gbk to hex with HWM
error handling
√ destroys streams properly when one of them closed prematurely
√ destroys streams properly if errors occurred during initialization
√ multiple-to-one: can correctly propagate errors emitted by readableStreams
√ multiple-to-one: can handle prematurely destroyed readableStreams
√ multiple-to-one: can correctly propagate errors emitted by writableStream
√ multiple-to-one: can handle prematurely ended writableStream
√ multiple-to-one: can handle prematurely destroyed writableStream
√ one-to-multiple: can correctly propagate errors emitted by writableStreams
√ one-to-multiple: can handle prematurely ended writableStreams
√ one-to-multiple: can handle prematurely destroyed writableStreams
√ can handle errors thrown from postProcessing
√ can handle errors thrown from join functions
√ can handle errors thrown from replacement functions
corner cases
√ can handle empty content
√ can handle regular expressions that always match
√ can handle non-string in a regExp separator's split result
try-on
√ can handle files larger than 64KiB
49 passing (275ms)
API
Overview
This package has two named function exports: streamEdit and sed (an alias for streamEdit).
streamEdit returns a promise. For files, it resolves to void | void[]; for streams, it resolves to Writable | Writable[], which keeps references to the output streams.
streamEdit accepts an options object with one or more of the following options:
Options for replacement
| name | alias | expect | safe to ignore | default |
| --- | --- | --- | --- | --- |
| search | match | `string` \| `RegExp` | ✔ | none |
| replacement | x | `string` \| `[async] (wholeMatch, ...args) => string` | ✔ | none |
| limit | x | `number` | ✔ | `Infinity` |
| maxTimes | x | `number` | ✔ | `Infinity` |
| minTimes | x | `number` | ✔ | `0` |
| required | x | `boolean` | ✔ | `false` |
| isFullReplacement | x | `boolean` | ✔ | `false` |
| disablePlaceholders | x | `boolean` | ✔ | `false` |
| replace | x | an `Array` of `{ search, replacement }` | ✔ | none |
| defaultOptions | x | `BasicReplaceOptions` | ✔ | `{}` |
| join | x | `string` \| `(part: string) => string` \| `null` | ✔ | `part => part` |
| postProcessing | x | `(part: string, isLastPart: boolean) => any` | ✔ | none |
| beforeCompletion | x | `() => promise \| void` | ✔ | |
type GlobalLimit = number;
type LocalLimit = number;
interface BasicReplaceOptions {
  isFullReplacement?: boolean;
  disablePlaceholders?: boolean;
  limit?: LocalLimit;
  maxTimes?: number;
  minTimes?: number;
  required?: boolean;
}
interface SearchAndReplaceOptions extends BasicReplaceOptions {
  search?: string | RegExp;
  // may be async, see the options table above
  replacement?: string | ((wholeMatch: string, ...args: string[]) => string | Promise<string>);
}
// MatchAndReplaceOptions: same as SearchAndReplaceOptions, with the `match` alias in place of `search`
interface MultipleReplacementOptions {
  limit?: GlobalLimit;
  replace?: Array<SearchAndReplaceOptions | MatchAndReplaceOptions>;
  defaultOptions?: BasicReplaceOptions;
}
type ReplaceOptions = MultipleReplacementOptions | SearchAndReplaceOptions;
interface BasicOptions extends ReplaceOptions {
join?: string | ((part: string) => string) | null;
postProcessing?: (part: string, isLastPart: boolean) => any;
beforeCompletion?: () => Promise<void> | void;
}
Options for stream transform
| name | alias | expect | safe to ignore | default |
| --- | --- | --- | --- | --- |
| separator | x | `string` \| `RegExp` \| `null` | ✔ | `/(?<=\r?\n)/` |
| encoding | x | `string` \| `null` | ✔ | `null` |
| decodeBuffers | x | `string` | ✔ | `"utf8"` |
| truncate | x | `boolean` | ✔ | `false` |
| maxLength | x | `number` | ✔ | `Infinity` |
| readableObjectMode | x | `boolean` | ✔ | `false` |
| abortController | x | `AbortController` | ✔ | `null` |
Options that are only available under certain context:
| name | alias | expect | context | default |
| --- | --- | --- | --- | --- |
| readStart | x | `number` | file[s] | `0` |
| writeStart | x | `number` | file[s] | `0` |
| contentJoin | x | `string` \| `Buffer` | readableStreams | `""` |
interface BasicOptions extends ReplaceOptions {
separator?: string | RegExp | null;
encoding?: BufferEncoding | null;
decodeBuffers?: string;
truncate?: boolean;
maxLength?: number;
readableObjectMode?: boolean;
abortController?: AbortController;
}
interface UpdateFileOptions extends BasicOptions {
file: string;
readStart?: number;
writeStart?: number;
}
interface UpdateFilesOptions extends BasicOptions {
files: string[];
readStart?: number;
writeStart?: number;
}
interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
from: Array<Readable>;
to: T;
contentJoin: string | Buffer;
}
interface MultipleReadablesToWritableOptionsAlias<T> extends BasicOptions {
readableStreams: Array<Readable>;
writableStream: T;
contentJoin: string | Buffer;
}
Options for stream input/output
| name | alias | expect | with | default |
| --- | --- | --- | --- | --- |
| file | x | `string` | self | none |
| files | x | an `Array` of `string` | self | none |
| readableStream | from | `Readable` | writableStream[s] | none |
| writableStream | to | `Writable` | readableStream[s] | none |
| readableStreams | from | an `Array` of `Readable` | writableStream | none |
| writableStreams | to | an `Array` of `Writable` | readableStream | none |
file:
interface UpdateFileOptions extends BasicOptions {
file: string;
readStart?: number;
writeStart?: number;
}
function streamEdit(options: UpdateFileOptions): Promise<void>;
files:
interface UpdateFilesOptions extends BasicOptions {
files: string[];
readStart?: number;
writeStart?: number;
}
function streamEdit(options: UpdateFilesOptions): Promise<void[]>;
transform Readable:
interface TransformReadableOptions<T> extends BasicOptions {
[ from | readableStream ]: Readable;
[ to | writableStream ]: T;
}
function streamEdit<T extends Writable>(
options: TransformReadableOptions<T>
): Promise<T>;
readables -> writable:
interface MultipleReadablesToWritableOptions<T> extends BasicOptions {
[ from | readableStreams ]: Array<Readable>;
[ to | writableStream ]: T;
contentJoin: string | Buffer;
}
function streamEdit<T extends Writable>(
options: MultipleReadablesToWritableOptions<T>
): Promise< T >;
readable -> writables:
interface ReadableToMultipleWritablesOptions<T> extends BasicOptions {
[ from | readableStream ]: Readable;
[ to | writableStreams ]: Array<T>;
}
function streamEdit<T extends Writable>(
options: ReadableToMultipleWritablesOptions<T>
): Promise< T[]>;
For further reading, take a look at the declaration file.
Examples
See ./examples and esm2cjs