
async-stream-generator
Pipe ES6 Async Generators through Node.js Streams.
streamify is a function that, as used in the examples here, takes the async generator object returned by invoking an async generator function and returns a Readable stream.
const fs = require("fs");
const streamify = require("async-stream-generator");

async function* generator(stream) {
  for await (const chunk of stream) {
    yield chunk;
  }
}

const main = () => {
  const readStream = fs.createReadStream("path-to-data.json");
  streamify(generator(readStream)).pipe(process.stdout);
};

main();
I/O in Node.js is asynchronous. In its early days, interacting with the disk and network meant passing callbacks to functions.
For example, here is code that serves up a file from disk:
const http = require("http");
const fs = require("fs");

const server = http.createServer((request, response) => {
  fs.readFile(__dirname + "/mock-data.json", (error, data) => {
    response.end(data);
  });
});

server.listen(8000);
This code works, but it buffers the entire file into memory for every request before writing the result back to the client. If the file is very large, the program can consume a lot of memory when serving many users concurrently, particularly users on slow connections.
The user experience is poor too because users will need to wait for the whole file to be buffered into memory on your server before they can start receiving any content.
However, both request and response are streams, so the file can be streamed to the client instead:
const http = require("http");
const fs = require("fs");

const server = http.createServer((req, res) => {
  const stream = fs.createReadStream(__dirname + "/mock-data.json");
  stream.pipe(res);
});

server.listen(8000);
This is where Node.js shines. .pipe() writes to the client one chunk at a time, as soon as each chunk is received from disk.
Using .pipe() has other benefits too, like handling backpressure automatically so that node won't buffer chunks into memory needlessly when the remote client is on a really slow or high-latency connection.
This is very much like what you might do on the command-line to pipe programs together except in node instead of the shell!
a | b | c | d
Once you learn the stream API, you can snap together streaming modules like Lego bricks instead of having to remember how to push data through non-streaming, custom APIs.
Streams make programming in node simple, elegant, and composable.
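To make the backpressure point above more concrete, here is a rough sketch of the bookkeeping that .pipe() saves you from writing by hand. It is an illustration under assumptions, not how .pipe() is implemented internally: copyWithBackpressure, slowWriter, and the tiny highWaterMark are hypothetical names and values chosen for the demo, and the in-memory streams stand in for a file and a slow client.

```javascript
const { Readable, Writable } = require("stream");
const { once } = require("events");

// Hypothetical helper: copies `source` into `dest` while respecting backpressure.
async function copyWithBackpressure(source, dest) {
  // Register the listener before end() so the "finish" event cannot be missed.
  const finished = once(dest, "finish");
  for await (const chunk of source) {
    // write() returns false once the destination's internal buffer is full.
    if (!dest.write(chunk)) {
      // Stop pulling from the source until the destination has drained.
      await once(dest, "drain");
    }
  }
  dest.end();
  await finished;
}

// Demo: the slow writer stands in for a client on a slow connection.
const received = [];
const slowWriter = new Writable({
  highWaterMark: 4, // tiny buffer (assumed value) so backpressure kicks in at once
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    setTimeout(callback, 5); // simulate a slow consumer
  }
});

const done = copyWithBackpressure(
  Readable.from(["aaaa", "bbbb", "cccc"]),
  slowWriter
);
done.then(() => console.log(received.join("")));
```

With .pipe(), the write/drain loop collapses to a single call, which is most of the appeal.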
Previously, to read the contents of a stream asynchronously, you registered event callbacks:
const fs = require("fs");

const main = inputFilePath => {
  const readStream = fs.createReadStream(inputFilePath, {
    encoding: "utf8",
    highWaterMark: 256
  });

  readStream.on("data", chunk => {
    console.log(">>> " + chunk);
    console.log("\n");
  });

  readStream.on("end", () => {
    console.log("### DONE ###");
  });
};

main("./mock-data.json");
As of Node.js v10, you can use asynchronous iteration to read the stream of a file, which enables the for-await-of syntax:
const fs = require("fs");

const main = async inputFilePath => {
  const readStream = fs.createReadStream(inputFilePath, {
    encoding: "utf8",
    highWaterMark: 256
  });

  for await (const chunk of readStream) {
    console.log(">>> " + chunk);
    console.log("\n");
  }

  console.log("### DONE ###");
};

main("./mock-data.json");
Output for both:
...
>>> ld":"Indonesia","customer_title":"Honorable"}
{"guid":"bf62800e-b3b1-46f2-a3f2-dc17c66c90a1","car_make":"Ford","car_model":"Bronco II","car_model_year":1986,"car_color":"Pink","car_country_cold":"Philippines","customer_title":"Rev"}
{"guid":"32a2f79b-5a0b-
>>> 4072-9ebb-0e3600d0f714","car_make":"Toyota","car_model":"RAV4","car_model_year":2001,"car_color":"Purple","car_country_cold":"China","customer_title":"Mr"}
{"guid":"6d52f031-c7e7-4167-81bc-e2879d6630e2","car_make":"Lexus","car_model":"SC","car_model_year":
>>> 1998,"car_color":"Teal","car_country_cold":"Russia","customer_title":"Rev"}
### DONE ###
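One practical difference between the two styles is error handling. With callbacks, a failed read arrives on a separate "error" event; with for-await-of it surfaces as an exception that an ordinary try/catch can handle. The sketch below illustrates this with a file that is assumed not to exist. readAll and the temporary file name are hypothetical, chosen only for the demo.

```javascript
const fs = require("fs");
const os = require("os");
const path = require("path");

// Hypothetical helper: resolves with the file contents, or null if reading fails.
const readAll = async inputFilePath => {
  const readStream = fs.createReadStream(inputFilePath, { encoding: "utf8" });
  let text = "";
  try {
    for await (const chunk of readStream) {
      text += chunk;
    }
  } catch (error) {
    // Stream errors (for example ENOENT) are thrown out of the loop.
    console.error("read failed:", error.code);
    return null;
  }
  return text;
};

// Assumption: no file with this name exists in the temp directory.
const missingPath = path.join(
  os.tmpdir(),
  "async-stream-generator-missing-file.json"
);
const outcome = readAll(missingPath);
```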
You can use async generators to process input in a way similar to Unix piping. Async generator functions are declared with async function*, consume an async iterable, and use yield rather than return to emit values.
Example of Generator #1, which will process our chunks of data into lines:
async function* chunksToLines(chunks) {
  let previous = "";

  for await (const chunk of chunks) {
    previous += chunk;
    let eolIndex;

    while ((eolIndex = previous.indexOf("\n")) >= 0) {
      // this line includes the EOL
      const line = previous.slice(0, eolIndex + 1);
      yield line;
      previous = previous.slice(eolIndex + 1);
    }
  }

  // emit any trailing text that did not end with an EOL
  if (previous.length > 0) {
    yield previous;
  }
}
Example of Generator #2, which will number each line:
async function* numberOfLines(lines) {
  let counter = 1;

  for await (const line of lines) {
    yield counter + ": " + line;
    counter++;
  }
}
Now you can snap these generators together using function composition to stream the file to the console line by line.
The whole program reads the file 256 bytes at a time (set by highWaterMark), breaks each chunk into lines, numbers them, prints them, and repeats.
const fs = require("fs");

// chunksToLines and numberOfLines are the generators defined above.
const printAsyncIterable = async numberedLines => {
  for await (const line of numberedLines) {
    console.log(line);
  }
};

const main = () => {
  const readStream = fs.createReadStream("./mock-data.json", {
    encoding: "utf8",
    highWaterMark: 256
  });

  printAsyncIterable(numberOfLines(chunksToLines(readStream)));
};

main();
Output
...
3999: {"guid":"32a2f79b-5a0b-4072-9ebb-0e3600d0f714","car_make":"Toyota","car_model":"RAV4","car_model_year":2001,"car_color":"Purple","car_country_cold":"China","customer_title":"Mr"}
4000: {"guid":"6d52f031-c7e7-4167-81bc-e2879d6630e2","car_make":"Lexus","car_model":"SC","car_model_year":1998,"car_color":"Teal","car_country_cold":"Russia","customer_title":"Rev"}
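Because each stage only needs an async iterable as input, further stages slot into the chain the same way. As a sketch, here is one more stage that parses each line as JSON. parseJsonLines, sampleLines, and collect are hypothetical names introduced for illustration, and the in-memory sampleLines generator stands in for the output of chunksToLines so the snippet runs without a data file.

```javascript
// Hypothetical extra stage: turns newline-delimited JSON lines into objects.
async function* parseJsonLines(lines) {
  for await (const line of lines) {
    const trimmed = line.trim();
    // Skip blank lines rather than failing on them.
    if (trimmed.length > 0) {
      yield JSON.parse(trimmed);
    }
  }
}

// In-memory stand-in for the lines a stage like chunksToLines would produce.
async function* sampleLines() {
  yield '{"car_make":"Ford","car_model_year":1986}\n';
  yield "\n";
  yield '{"car_make":"Toyota","car_model_year":2001}\n';
}

// Drains any async iterable into an array.
const collect = async iterable => {
  const items = [];
  for await (const item of iterable) {
    items.push(item);
  }
  return items;
};

const result = collect(parseJsonLines(sampleLines()));
result.then(cars => console.log(cars.map(car => car.car_make)));
```

A stage like this yields objects rather than strings, so it suits in-process consumers such as collect; anything piped to a byte stream would need to be serialized back to text first.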
These new tools are great for reading streams. However, it's still not clear how to write() to another stream or build a processing pipeline with pipe().
This was discussed here.
Enter this module.
Using the same generators from above, we can pipe() the results to a writable stream.
const http = require("http");
const fs = require("fs");
const streamify = require("async-stream-generator");

// chunksToLines and numberOfLines are the generators defined above.
const server = http.createServer(async (req, res) => {
  const readStream = fs.createReadStream("./mock-data.json", {
    encoding: "utf8",
    highWaterMark: 256
  });

  streamify(numberOfLines(chunksToLines(readStream))).pipe(res);
});

server.listen(8000);
All code can be found in the examples directory.
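For intuition about what a wrapper of this kind has to do, here is a minimal sketch that adapts an async iterable to a Readable stream. It is not this module's source code and may differ from it in important ways: toReadable and letters are hypothetical names, and the sketch assumes the generator yields strings or Buffers.

```javascript
const { Readable } = require("stream");

// Hypothetical stand-in for illustration; NOT the module's real implementation.
function toReadable(asyncIterable) {
  const iterator = asyncIterable[Symbol.asyncIterator]();
  let reading = false;

  return new Readable({
    async read() {
      // read() can be called again while we are still awaiting the generator.
      if (reading) return;
      reading = true;
      try {
        while (true) {
          const { value, done } = await iterator.next();
          if (done) {
            this.push(null); // signal end of stream
            break;
          }
          // push() returns false when the buffer is full: stop pulling
          // from the generator until the consumer asks for more.
          if (!this.push(value)) break;
        }
      } catch (error) {
        this.destroy(error); // forward generator errors to the stream
      } finally {
        reading = false;
      }
    }
  });
}

// Demo generator yielding strings.
async function* letters() {
  yield "a\n";
  yield "b\n";
}

toReadable(letters()).pipe(process.stdout);
```

The key idea is that the stream only advances the generator when the consumer asks for data, so backpressure from the writable side propagates all the way back to the source.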
This was forked from @mimetnet's module stream-generators, which offers the same functionality for synchronous generators.
Early stream examples and a deeper dive into streams can be found at @substack's Stream Handbook.
Async Generator and Iterator examples from 2ality.com
Node.js Support for Symbol.asyncIterator.