ai.matey.utils
Shared utility functions for streaming, validation, and conversions.
Part of the ai.matey monorepo.
Installation
npm install ai.matey.utils
Stream Utilities
Comprehensive utilities for working with IR (intermediate representation) chat streams.
Basic Stream Operations
import {
collectStream,
collectStreamFull,
streamToText,
streamToResponse,
} from 'ai.matey.utils';
const chunks = await collectStream(stream);
const result = await collectStreamFull(stream);
console.log(result.content);
console.log(result.message);
console.log(result.finishReason);
console.log(result.usage);
const text = await streamToText(stream);
Processing Streams
import { processStream, streamToLines, streamToTextIterator } from 'ai.matey.utils';
const result = await processStream(stream, {
onStart: (requestId) => console.log('Started:', requestId),
onContent: (delta, accumulated) => process.stdout.write(delta),
onDone: (result) => console.log('\nTokens:', result.usage?.totalTokens),
onError: (error) => console.error('Error:', error),
});
for await (const text of streamToTextIterator(stream)) {
process.stdout.write(text);
}
for await (const line of streamToLines(stream)) {
console.log('Line:', line);
}
Transforming Streams
import {
transformStream,
filterStream,
mapStream,
tapStream,
} from 'ai.matey.utils';
const transformed = transformStream(stream, (chunk) => ({
...chunk,
delta: chunk.type === 'content' ? chunk.delta.toUpperCase() : chunk.delta,
}));
const contentOnly = filterStream(stream, (chunk) => chunk.type === 'content');
const textOnly = mapStream(stream, (chunk) =>
chunk.type === 'content' ? chunk.delta : ''
);
const logged = tapStream(stream, (chunk) => console.log('Chunk:', chunk.type));
Stream Control
import {
throttleStream,
rateLimitStream,
teeStream,
splitStream,
} from 'ai.matey.utils';
for await (const chunk of throttleStream(stream, 50)) {
updateUI(chunk);
}
const limited = rateLimitStream(stream, 10);
const [stream1, stream2] = teeStream(stream, 2);
await Promise.all([
processStream(stream1, { onContent: (d) => logger.log(d) }),
processStream(stream2, { onContent: (d) => ui.append(d) }),
]);
Stream Validation
import { validateStream, validateChunkSequence } from 'ai.matey.utils';
const validated = validateStream(stream, {
requireStart: true,
requireDone: true,
requireContent: true,
});
const chunks = await collectStream(stream);
const isValid = validateChunkSequence(chunks);
Error Handling
import { catchStreamErrors, streamWithTimeout, createStreamError } from 'ai.matey.utils';
const safe = catchStreamErrors(stream, (error) => {
console.error('Stream error:', error);
});
const withTimeout = streamWithTimeout(stream, 30000);
const errorChunk = createStreamError(new Error('Something went wrong'));
Stream Accumulator
Build responses incrementally from chunks:
import {
createStreamAccumulator,
accumulateChunk,
accumulatorToMessage,
accumulatorToResponse,
} from 'ai.matey.utils';
const accumulator = createStreamAccumulator();
for await (const chunk of stream) {
accumulateChunk(accumulator, chunk);
console.log('Current text:', accumulator.content);
}
const message = accumulatorToMessage(accumulator);
const response = accumulatorToResponse(accumulator);
Content Utilities
import { getContentDeltas, isContentChunk, isDoneChunk, isErrorChunk } from 'ai.matey.utils';
for await (const delta of getContentDeltas(stream)) {
process.stdout.write(delta);
}
for await (const chunk of stream) {
if (isContentChunk(chunk)) {
console.log('Content:', chunk.delta);
} else if (isDoneChunk(chunk)) {
console.log('Done:', chunk.finishReason);
} else if (isErrorChunk(chunk)) {
console.error('Error:', chunk.error);
}
}
Web Stream Conversion
import {
asyncGeneratorToReadableStream,
readableStreamToAsyncGenerator,
} from 'ai.matey.utils';
const readable = asyncGeneratorToReadableStream(stream);
const generator = readableStreamToAsyncGenerator(readable);
Types
import type {
CollectedStream,
ProcessStreamOptions,
StreamValidationOptions,
} from 'ai.matey.utils';
API Reference
Collection Functions
collectStream(stream) | Collect chunks into array |
collectStreamFull(stream) | Collect into content, message, finish reason, and usage |
streamToText(stream) | Get accumulated text |
streamToResponse(stream) | Convert to IR response |
Processing Functions
processStream(stream, options) | Process with callbacks |
streamToLines(stream) | Yield complete lines |
streamToTextIterator(stream) | Yield text deltas |
getContentDeltas(stream) | Yield only content deltas |
Transform Functions
transformStream(stream, fn) | Transform each chunk |
filterStream(stream, predicate) | Filter chunks |
mapStream(stream, fn) | Map chunks to new values |
tapStream(stream, fn) | Side effects without modification |
Control Functions
throttleStream(stream, ms) | Batch chunks by time interval |
rateLimitStream(stream, rate) | Limit chunks per second |
teeStream(stream, count) | Duplicate into `count` independent streams |
splitStream(stream, count) | Split into multiple streams |
Validation Functions
validateStream(stream, options) | Validate stream structure |
validateChunkSequence(chunks) | Check sequence validity |
Error Functions
catchStreamErrors(stream, handler) | Handle errors gracefully |
streamWithTimeout(stream, ms) | Add a timeout in milliseconds |
createStreamError(error) | Create error chunk |
License
MIT - see LICENSE for details.