![NPM](https://img.shields.io/npm/l/unbounded-async-channel)
Documentation
The unbounded-async-channel library provides a simple unbounded channel that allows for concurrent reading and writing of values.
Writing to the channel is synchronous and non-blocking, while reading from the channel blocks until a value is written or the channel is closed.
If there are no readers when a value is written, the value is buffered and available for future readers.
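To make these semantics concrete, here is a minimal sketch of how a channel with this behavior could be built. It is not the library's implementation: `SketchChannel`, `ReadResult`, and `demo` are hypothetical names, and throwing on a write after close is an assumption made only for this illustration.

```typescript
// Illustrative sketch only: SketchChannel is a hypothetical stand-in, not the library's code.
type ReadResult<T> =
  | { closed: false; value: T }
  | { closed: true; error?: unknown };

class SketchChannel<T> {
  private buffer: T[] = [];
  private waiters: Array<(result: ReadResult<T>) => void> = [];
  private closedResult: ReadResult<T> | null = null;

  // Synchronous and non-blocking: hand the value to a waiting reader, or buffer it.
  write(value: T): void {
    // Assumption for this sketch: writing after close is treated as an error.
    if (this.closedResult) throw new Error("write after close");
    const waiter = this.waiters.shift();
    if (waiter) waiter({ closed: false, value });
    else this.buffer.push(value);
  }

  // Resolves at once if a value is buffered or the channel is closed; otherwise waits.
  read(): Promise<ReadResult<T>> {
    if (this.buffer.length > 0) {
      return Promise.resolve({ closed: false, value: this.buffer.shift() as T });
    }
    if (this.closedResult) return Promise.resolve(this.closedResult);
    return new Promise((resolve) => this.waiters.push(resolve));
  }

  // Buffered values stay readable after close; waiting readers are released.
  close(error?: unknown): void {
    if (this.closedResult) return;
    this.closedResult = error === undefined ? { closed: true } : { closed: true, error };
    for (const waiter of this.waiters.splice(0)) waiter(this.closedResult);
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const ch = new SketchChannel<number>();

  ch.write(1); // no reader yet, so the value is buffered
  const first = await ch.read();
  if (!first.closed) log.push(`buffered:${first.value}`);

  const pending = ch.read(); // nothing buffered, so this read waits
  ch.write(2); // wakes the waiting reader
  const second = await pending;
  if (!second.closed) log.push(`waited:${second.value}`);

  ch.close();
  const third = await ch.read();
  log.push(`closed:${third.closed}`);
  return log;
}
```

The unbounded buffer is what keeps `write` synchronous: a writer never has to wait for capacity, at the cost of memory growing if readers fall behind.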
Installation
npm i unbounded-async-channel
Basic Usage
import { createUnboundedAsyncChannel } from "unbounded-async-channel";
const channel = createUnboundedAsyncChannel<number>();
channel.write(1);
const res = await channel.read();
if (res.closed) {
  if (res.error) {
    console.log("Channel closed with error", res.error);
  } else {
    console.log("Channel closed without any error");
  }
} else {
  console.log(res.value);
}
Reading values using a for await loop
An UnboundedAsyncChannel can be iterated with a for await loop.
import { createUnboundedAsyncChannel } from "unbounded-async-channel";
const channel = createUnboundedAsyncChannel<number>();
for (let i = 0; i < 10; i++) {
  channel.write(i);
}
channel.close();
try {
  for await (const value of channel) {
    console.log(value);
  }
  console.log("channel closed without error");
} catch (err) {
  console.log("channel closed with error");
}
Closing a channel with error
If a channel is closed with an error and there are no readers, a future reader can receive that error after consuming all the buffered values in the channel.
A for await loop will also first consume all buffered values before throwing an error. If the channel was closed without any error, a for await loop will cleanly end.
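The ordering described above can be sketched without the library. In the example below, a plain async generator stands in for a channel that held two buffered values when it was closed with an error. `closedWithError` and `consume` are hypothetical names, and the generator only imitates the iteration behavior, not the channel's API.

```typescript
// Hypothetical stand-in for a channel that buffered two values
// and was then closed with an error.
async function* closedWithError(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  throw new Error("upstream failed");
}

// Records what a for await consumer observes, in order.
async function consume(source: AsyncIterable<number>): Promise<string[]> {
  const events: string[] = [];
  try {
    for await (const value of source) {
      events.push(`value ${value}`);
    }
    // Only reached when the source ends without an error.
    events.push("closed cleanly");
  } catch (err) {
    events.push(`closed with error: ${(err as Error).message}`);
  }
  return events;
}
```

Calling `consume(closedWithError())` records both values before the error entry, and the "closed cleanly" entry never appears.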
Clearing a channel after closing
A closed channel may contain buffered values.
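To clear the buffered values, consume all remaining messages. If the channel was closed with an error, this loop throws once the buffer is empty, so wrap it in try/catch in that case.
channel.close();
for await (const value of channel) {
  // Intentionally empty: each iteration discards one buffered value.
}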
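Because a for await loop throws when the channel was closed with an error, a reusable drain step may want to capture that error instead of propagating it. The helper below is a sketch under that assumption. `drain` is a hypothetical name, not part of the library, and it accepts any `AsyncIterable`, so it should work with a channel as long as the channel is async-iterable in the way the examples in this document show.

```typescript
// Hypothetical helper: discards every remaining value from an async iterable.
// Returns how many values were discarded, plus the closing error if there was one.
async function drain<T>(
  source: AsyncIterable<T>,
): Promise<{ discarded: number; error?: unknown }> {
  let discarded = 0;
  try {
    for await (const _value of source) {
      discarded++;
    }
    return { discarded };
  } catch (error) {
    // The source ended with an error after its buffered values were consumed.
    return { discarded, error };
  }
}
```

With a channel this would be used as `const { discarded, error } = await drain(channel);` after calling `channel.close()`.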
Example: Concurrent processing of tasks
import { createUnboundedAsyncChannel } from "unbounded-async-channel";
const tasks = createUnboundedAsyncChannel<string>();
const taskResults = createUnboundedAsyncChannel<string>();
for (let i = 0; i < 1000; i++) {
  tasks.write(`some_url_${i}`);
}
tasks.close();
// Start 10 workers that pull tasks concurrently until the channel is drained.
const workerPromises: Promise<void>[] = [];
for (let i = 0; i < 10; i++) {
  workerPromises.push(
    (async () => {
      try {
        for await (const url of tasks) {
          const result = `processed ${url}`;
          taskResults.write(result);
        }
      } catch (e) {
        console.log("channel closed with error", e);
      }
    })(),
  );
}
// Close the results channel once every worker has finished.
Promise.all(workerPromises).then(() => {
  taskResults.close();
});
for await (const result of taskResults) {
  console.log(result);
}