Streamutils
A collection of simple stream implementations for building data pipelines.
Latest Update: v0.3
- refactor into monorepo
- add jq package
- update node-jq, remove patch
- add redis package
Usage
check out this example here: examples/example-1
Let's say you have some input tokens in this form:
token.txt
"Erfurt"
"Ansbach"
You then build your pipeline like this:
import { json, split, toString } from "@odms/streamutils";
import { Transform } from "node:stream";
import { pipeline } from "node:stream/promises";
pipeline(
process.stdin,
toString(),
split("\n"),
json.parse(),
new Transform({
objectMode: true,
transform: (city: string, _, callback) => {
callback(null, { city });
},
}),
json.toLines(),
process.stdout
);
Explanation:
- process.stdin – read from stdin
- toString – convert input Buffer into string
- split – split the input chunk into multiple chunks using newline
- json.parse – remove outer double quotes
- ... here comes your custom stream ...
- json.toLines – stringify into line-json
- process.stdout – write to stdout
Run pipeline:
cat token.txt | bun index.ts
This will produce this output:
{"city":"Erfurt"}
{"city":"Ansbach"}
Documentation
jq
Transform the stream using jq
.
import { jq } from "@odms/streamutils";
pipeline(process.stdin, jq(".filter[].items"), process.out);
Thanks to node-js. For filter syntax, refer to the jq manual.
Parameters
Proudly developing with Bun