yield-stream
A small library for switching between streams, generators, and arrays.
export const compose = <T>(
...generators: GeneratorFn<T>[]
): GeneratorFn<T> => {
return generators.reduce(
(prev, next) => async function* (data) {
for await (const chunk of prev(data)) {
yield* next(chunk);
}
},
);
};
export const pipeline = <T>(
stream: ReadableStream<T>,
...transforms: GeneratorFn<T>[]
): ReadableStream<T> => {
const composed = compose(...transforms);
return generateStream(
async function* () {
for await (const chunk of yieldStream(stream)) {
yield* composed(chunk);
}
}
);
};
export const yieldStream = async function* <T>(
stream: ReadableStream<T>,
controller?: AbortController
) {
const reader = stream.getReader();
while (true) {
if (controller?.signal.aborted) {
break;
}
const { done, value } = await reader.read();
if (done) {
break;
}
yield value;
}
};
export const generateStream = <T, TReturn, D>(
G: StreamGenerator<D, T, TReturn>,
data?: D
): ReadableStream<T> => {
return new ReadableStream<T>({
async start(controller) {
for await (const chunk of G(data)) {
controller.enqueue(chunk);
}
controller.close();
},
});
};
export const streamArray = <T>(array: T[]): ReadableStream<T> => {
return generateStream(function* () {
for (const item of array) {
yield item;
}
});
};
export const buffer = async function* <T>(stream: ReadableStream<T>) {
const buffer: T[] = [];
for await (const chunk of yieldStream(stream)) {
buffer.push(chunk);
yield buffer;
}
};