@ahoo-wang/fetcher-eventstream

Power your real-time applications with Server-Sent Events support, specially designed for Large Language Model streaming
APIs.
Features
- Event Stream Conversion: Converts text/event-stream responses to async-iterable streams of ServerSentEvent objects
- Side-Effect Module Import: Automatically adds eventStream() and jsonEventStream() methods to the global Response.prototype for responses with text/event-stream content type
- SSE Parsing: Parses Server-Sent Events according to the specification, including the data, event, id, and retry fields
- Streaming Support: Handles chunked data and multi-line events correctly
- Comment Handling: Ignores comment lines (lines starting with :) as required by the SSE specification
- TypeScript Support: Complete TypeScript type definitions
- Performance Optimized: Efficient parsing and streaming for high-performance applications
- LLM Streaming Ready: Native support for streaming responses from popular LLM APIs like OpenAI GPT, Claude, etc.
- Stream Termination: Automatic stream termination detection for clean resource management and completion handling
Quick Start
Installation
npm install @ahoo-wang/fetcher-eventstream
pnpm add @ahoo-wang/fetcher-eventstream
yarn add @ahoo-wang/fetcher-eventstream
Module Import
To use the event stream functionality, you need to import the module for its side effects:
import '@ahoo-wang/fetcher-eventstream';
This import automatically extends the Response interface with methods for handling Server-Sent Events streams:
- eventStream() - Converts a Response with text/event-stream content type to a ServerSentEventStream
- jsonEventStream<DATA>() - Converts a Response with text/event-stream content type to a JsonServerSentEventStream<DATA>
- isEventStream getter - Checks if the Response has a text/event-stream content type
- requiredEventStream() - Gets a ServerSentEventStream, throwing an error if not available
- requiredJsonEventStream<DATA>() - Gets a JsonServerSentEventStream<DATA>, throwing an error if not available
This is a common pattern in JavaScript/TypeScript for extending existing types with additional functionality without
modifying the original type definitions.
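The extension pattern described above can be sketched in isolation. The example below is only an illustration under stated assumptions: `HttpReply` is a hypothetical stand-in class (not the global Response and not this package's code), and the getter logic is a guess at what a content-type check might look like. It combines TypeScript declaration merging, which adds the member to the type, with a runtime patch on the prototype.

```typescript
// Hypothetical stand-in for a response class; NOT the real global Response.
class HttpReply {
  constructor(readonly headers: Map<string, string>) {}
}

// Declaration merging: this interface adds a member to the class's type
// without touching the original class definition.
interface HttpReply {
  readonly isEventStream: boolean;
}

// Runtime patch: define the getter once on the prototype so every instance sees it.
Object.defineProperty(HttpReply.prototype, 'isEventStream', {
  configurable: true,
  get(): boolean {
    const self = this as HttpReply;
    const contentType = self.headers.get('content-type') ?? '';
    return contentType.toLowerCase().startsWith('text/event-stream');
  },
});

const sseReply = new HttpReply(
  new Map([['content-type', 'text/event-stream; charset=utf-8']]),
);
const jsonReply = new HttpReply(new Map([['content-type', 'application/json']]));
```

Because the patch happens when the module is evaluated, a bare import of that module is enough to make the new member available everywhere.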
Integration Test Example: LLM Client with Event Stream
The following example shows how to create an LLM client with event stream support, similar to the integration test in
the Fetcher project. You can find the complete implementation
in integration-test/src/eventstream/llmClient.ts.
This example demonstrates how to interact with popular LLM APIs like OpenAI's GPT models using Fetcher's streaming
capabilities.
import {
  BaseURLCapable,
  ContentTypeValues,
  FetchExchange,
  NamedFetcher,
  REQUEST_BODY_INTERCEPTOR_ORDER,
  RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
  api,
  autoGeneratedError,
  body,
  post,
  ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';

export const llmFetcherName = 'llm';

export interface LlmOptions extends BaseURLCapable {
  apiKey: string;
  model?: string;
}

// Sets the default model on chat requests that do not specify one.
export class LlmRequestInterceptor implements RequestInterceptor {
  readonly name: string = 'LlmRequestInterceptor';
  readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;

  constructor(private llmOptions: LlmOptions) {}

  intercept(exchange: FetchExchange): void {
    const chatRequest = exchange.request.body as ChatRequest;
    if (!chatRequest.model) {
      chatRequest.model = this.llmOptions.model;
    }
  }
}

// Creates the named fetcher that LlmClient looks up via llmFetcherName.
export function createLlmFetcher(options: LlmOptions): NamedFetcher {
  const llmFetcher = new NamedFetcher(llmFetcherName, {
    baseURL: options.baseURL,
    headers: {
      Authorization: `Bearer ${options.apiKey}`,
      'Content-Type': ContentTypeValues.APPLICATION_JSON,
    },
  });
  llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
  return llmFetcher;
}

@api('/chat', {
  fetcher: llmFetcherName,
  resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
  // Streaming call: resolves to a stream of JSON events.
  @post('/completions')
  streamChat(
    @body() body: ChatRequest,
  ): Promise<JsonServerSentEventStream<ChatResponse>> {
    throw autoGeneratedError(body);
  }

  // Non-streaming call: overrides the class-level extractor with plain JSON.
  @post('/completions', { resultExtractor: ResultExtractors.Json })
  chat(@body() body: ChatRequest): Promise<ChatResponse> {
    throw autoGeneratedError(body);
  }
}
Using streamChat for Real-time Responses
Here's how to use the streamChat method to get real-time responses from an LLM API:
import { createLlmFetcher, LlmClient } from './llmClient';

const llmFetcher = createLlmFetcher({
  baseURL: 'https://api.openai.com/v1',
  apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
  model: 'gpt-3.5-turbo',
});
const llmClient = new LlmClient();

async function streamChatExample() {
  try {
    const stream = await llmClient.streamChat({
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: 'Explain quantum computing in simple terms.' },
      ],
      model: 'gpt-3.5-turbo',
      stream: true,
    });
    let fullResponse = '';
    for await (const event of stream) {
      if (event.data) {
        const chunk = event.data;
        const content = chunk.choices[0]?.delta?.content || '';
        fullResponse += content;
        console.log('New token:', content);
        updateUI(content);
      }
    }
    console.log('Full response:', fullResponse);
  } catch (error) {
    console.error('Error streaming chat:', error);
  }
}

function updateUI(content: string) {
  process.stdout.write(content);
}
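The accumulation loop in streamChatExample can be factored into a small helper that is easy to test without a network call. This is a sketch under assumptions: `ChatChunk`, `ChunkEvent`, `collectContent`, and `fakeStream` are hypothetical names, and the chunk shape only mirrors the `choices[0].delta.content` access used above rather than any official type.

```typescript
// Hypothetical minimal shape of one streamed chunk; mirrors the access pattern above.
interface ChatChunk {
  choices: Array<{ delta?: { content?: string } }>;
}

// Hypothetical event wrapper: only the data field matters here.
interface ChunkEvent {
  data?: ChatChunk;
}

// Collects the text deltas of a stream into one string, calling onToken for each piece.
async function collectContent(
  events: AsyncIterable<ChunkEvent>,
  onToken: (token: string) => void = () => {},
): Promise<string> {
  let full = '';
  for await (const event of events) {
    const token = event.data?.choices[0]?.delta?.content;
    if (!token) continue; // chunks without text (empty delta, no choices) are skipped
    full += token;
    onToken(token);
  }
  return full;
}

// Stand-in for the stream returned by streamChat, used only for demonstration.
async function* fakeStream(): AsyncGenerator<ChunkEvent> {
  yield { data: { choices: [{ delta: {} }] } };
  yield { data: { choices: [{ delta: { content: 'Hel' } }] } };
  yield { data: { choices: [{ delta: { content: 'lo' } }] } };
  yield { data: { choices: [] } };
}
```

Keeping the UI callback as a parameter means the same helper can write to a terminal, update a DOM node, or do nothing at all in tests.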
Manual Conversion
import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';

const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);
const reader = eventStream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('Received event:', value);
  }
} finally {
  reader.releaseLock();
}
Basic Usage
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
  console.log('Received event:', event);
}

// MyDataType stands for your own payload type.
const jsonResponse = await fetcher.get('/json-events');
for await (const event of jsonResponse.requiredJsonEventStream<MyDataType>()) {
  console.log('Received JSON event:', event.data);
}
Advanced Usage with Termination Detection
import { Fetcher } from '@ahoo-wang/fetcher';
import {
  toJsonServerSentEventStream,
  type TerminateDetector,
} from '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.openai.com/v1',
});

// Stop as soon as the OpenAI-style completion marker arrives.
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

const response = await fetcher.post('/chat/completions', {
  body: {
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: 'Hello!' }],
    stream: true,
  },
});

// ChatCompletionChunk stands for your own type describing one streamed chunk.
const jsonStream = toJsonServerSentEventStream<ChatCompletionChunk>(
  response.requiredEventStream(),
  terminateOnDone,
);
for await (const event of jsonStream) {
  const content = event.data.choices[0]?.delta?.content;
  if (content) {
    console.log('Token:', content);
  }
}
API Reference
Response Prototype Extension
Importing this module patches the global Response.prototype with the following properties:
- contentType - The Content-Type header value
- isEventStream - Boolean getter, true if Content-Type is text/event-stream
- eventStream() - Returns ServerSentEventStream if available, undefined otherwise
- requiredEventStream() - Returns ServerSentEventStream or throws if not available
- jsonEventStream<DATA>() - Returns JsonServerSentEventStream<DATA> if available
- requiredJsonEventStream<DATA>() - Returns JsonServerSentEventStream<DATA> or throws if not available
toJsonServerSentEventStream
Converts a ServerSentEventStream to a JsonServerSentEventStream<DATA> for handling Server-Sent Events with JSON
data. Optionally supports stream termination detection for automatic stream closure.
Signature
function toJsonServerSentEventStream<DATA>(
  serverSentEventStream: ServerSentEventStream,
  terminateDetector?: TerminateDetector,
): JsonServerSentEventStream<DATA>;
Parameters
- serverSentEventStream: The ServerSentEventStream to convert
- terminateDetector: Optional function to detect when the stream should be terminated. When provided, the stream will automatically close when the detector returns true for an event.
Returns
- JsonServerSentEventStream<DATA>: A readable stream of JsonServerSentEvent<DATA> objects
Examples
// Basic usage: no termination detection
const jsonStream = toJsonServerSentEventStream<MyData>(serverSentEventStream);

// Terminate on an OpenAI-style completion marker
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
const terminatingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);

// Terminate on a custom error condition
const terminateOnError: TerminateDetector = event => {
  return event.event === 'error' || event.data.includes('ERROR');
};
const errorHandlingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnError,
);
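To make the behavior described above concrete, here is a minimal sketch of JSON conversion with termination detection, written over plain async iterables instead of ReadableStream. It is not the package's implementation: `toJsonEvents`, `JsonEvent`, and `demoSource` are hypothetical names, the two type declarations are local copies of the shapes documented in this README, and the choice to drop the terminating event is an assumption.

```typescript
// Local copies of the documented shapes, so the sketch is self-contained.
interface ServerSentEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}
type TerminateDetector = (event: ServerSentEvent) => boolean;
interface JsonEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
  data: DATA;
}

// Parses each event's data as JSON and stops when the detector matches.
async function* toJsonEvents<DATA>(
  events: AsyncIterable<ServerSentEvent>,
  terminateDetector?: TerminateDetector,
): AsyncGenerator<JsonEvent<DATA>> {
  for await (const event of events) {
    // Check the raw event first: a marker such as "[DONE]" is not valid JSON.
    if (terminateDetector?.(event)) return;
    yield { ...event, data: JSON.parse(event.data) as DATA };
  }
}

// Demonstration source: two payloads, a completion marker, then a stray event.
async function* demoSource(): AsyncGenerator<ServerSentEvent> {
  yield { data: '{"n":1}' };
  yield { data: '{"n":2}' };
  yield { data: '[DONE]' };
  yield { data: '{"n":3}' };
}
```

Running the detector before JSON.parse is the important design point: without it, the completion marker would raise a SyntaxError instead of ending the stream cleanly.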
JsonServerSentEvent
Interface defining the structure of a Server-Sent Event with JSON data.
interface JsonServerSentEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
  data: DATA;
}
JsonServerSentEventStream
Type alias for a readable stream of JsonServerSentEvent<DATA> objects.
type JsonServerSentEventStream<DATA> = ReadableStream<
  JsonServerSentEvent<DATA>
>;
TerminateDetector
A function type for detecting when a Server-Sent Event stream should be terminated. This is commonly used with LLM APIs that send a special termination event to signal the end of a response stream.
Signature
type TerminateDetector = (event: ServerSentEvent) => boolean;
Parameters
- event: The current ServerSentEvent being processed
Returns
- boolean: true if the stream should be terminated, false otherwise
Examples
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

const terminateOnComplete: TerminateDetector = event => event.event === 'done';

const terminateOnFinish: TerminateDetector = event => {
  return (
    event.event === 'done' ||
    event.event === 'error' ||
    event.data === '[DONE]' ||
    event.data.includes('TERMINATE')
  );
};

const stream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);
Common Use Cases
- LLM Streaming: Detect completion markers like [DONE] from OpenAI, Claude, or other LLM APIs
- Error Handling: Terminate streams when error events are received
- Custom Protocols: Implement application-specific termination logic
- Resource Management: Automatically close streams when certain conditions are met
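When several of these use cases apply at once, small detectors can be combined instead of writing one large predicate. The helper below is a sketch, not part of the package: `anyOf` and the detector names are hypothetical, and the two type declarations are local copies of the documented shapes.

```typescript
// Local copies of the documented shapes, so the sketch is self-contained.
interface ServerSentEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}
type TerminateDetector = (event: ServerSentEvent) => boolean;

// Builds a detector that fires when any of the given detectors fires.
function anyOf(...detectors: TerminateDetector[]): TerminateDetector {
  return event => detectors.some(detect => detect(event));
}

const onDoneMarker: TerminateDetector = event => event.data === '[DONE]';
const onErrorEvent: TerminateDetector = event => event.event === 'error';

// Terminates on either a completion marker or an error event.
const shouldTerminate = anyOf(onDoneMarker, onErrorEvent);
```

Since the result is itself a TerminateDetector, it can be passed wherever a single detector is accepted.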
toServerSentEventStream
Converts a Response object with a text/event-stream body to a ServerSentEventStream.
Signature
function toServerSentEventStream(response: Response): ServerSentEventStream;
Parameters
- response: An HTTP response with text/event-stream content type
Returns
- ServerSentEventStream: A readable stream of ServerSentEvent objects
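A conversion like this has to cope with network chunks that end in the middle of an event. The sketch below shows one way to buffer decoded text and emit only complete event blocks (text separated by a blank line). It is an illustration under assumptions, not the package's parser: `EventBlockSplitter` is a hypothetical name, and input is assumed to be already decoded to strings.

```typescript
// Buffers decoded text chunks and emits complete event blocks.
class EventBlockSplitter {
  private buffer = '';

  // Feeds one text chunk and returns every event block completed by it.
  push(chunk: string): string[] {
    this.buffer += chunk;
    // Hold back a trailing "\r": the "\n" of a CRLF pair may arrive in the next chunk.
    const heldBack = this.buffer.endsWith('\r') ? '\r' : '';
    const text = this.buffer
      .slice(0, this.buffer.length - heldBack.length)
      .replace(/\r\n?/g, '\n'); // normalize CRLF and lone CR to LF
    const parts = text.split('\n\n');
    // The last part is an incomplete block (possibly empty); keep it buffered.
    this.buffer = (parts.pop() ?? '') + heldBack;
    return parts.filter(block => block.length > 0);
  }
}

// Two events arriving in four awkwardly split chunks.
const splitter = new EventBlockSplitter();
const first = splitter.push('data: hel');
const second = splitter.push('lo\n\ndata: wor');
const third = splitter.push('ld\r');
const fourth = splitter.push('\n\r\n');
```

In this run only the second and fourth calls return a block ('data: hello' and 'data: world'); the other two return empty arrays because their events are still incomplete.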
ServerSentEvent
Interface defining the structure of a Server-Sent Event.
interface ServerSentEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}
ServerSentEventStream
Type alias for a readable stream of ServerSentEvent objects.
type ServerSentEventStream = ReadableStream<ServerSentEvent>;
Examples
Real-time Notifications
import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
  switch (event.event) {
    case 'message':
      showNotification('Message', event.data);
      break;
    case 'alert':
      showAlert('Alert', event.data);
      break;
    case 'update':
      handleUpdate(JSON.parse(event.data));
      break;
    default:
      console.log('Unknown event:', event);
  }
}
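As the number of event types grows, the switch statement above can be replaced by a lookup table. The following is a sketch with hypothetical names (`Handler`, `createDispatcher`); the `ServerSentEvent` declaration is a local copy of the documented shape, and treating a missing event type as 'message' follows the SSE default.

```typescript
// Local copy of the documented shape, so the sketch is self-contained.
interface ServerSentEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}
type Handler = (event: ServerSentEvent) => void;

// Routes each event to the handler registered for its type, or to the fallback.
function createDispatcher(
  handlers: Map<string, Handler>,
  fallback: Handler = () => {},
): Handler {
  return event => {
    // Events without an explicit type are treated as "message", the SSE default.
    const type = event.event ?? 'message';
    const handler = handlers.get(type) ?? fallback;
    handler(event);
  };
}

// Demonstration: record what each handler saw instead of touching a real UI.
const seen: string[] = [];
const dispatch = createDispatcher(
  new Map<string, Handler>([
    ['message', event => seen.push(`message:${event.data}`)],
    ['alert', event => seen.push(`alert:${event.data}`)],
  ]),
  event => seen.push(`unknown:${event.event}`),
);

dispatch({ data: 'hi' });
dispatch({ data: 'disk full', event: 'alert' });
dispatch({ data: '?', event: 'other' });
```

Inside the for await loop, the whole switch then reduces to a single `dispatch(event)` call.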
Progress Updates
import { Fetcher } from '@ahoo-wang/fetcher';
// Side-effect import: adds requiredEventStream() to Response
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'progress') {
    const progress = JSON.parse(event.data);
    updateProgressBar(progress.percentage);
  } else if (event.event === 'complete') {
    showCompletionMessage(event.data);
    break;
  }
}
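The example above passes the result of JSON.parse straight to the UI. When the server payload cannot be fully trusted, a small validating parser may be preferable. This is a sketch under assumptions: `Progress` and `parseProgress` are hypothetical names, and clamping the value to the 0 to 100 range is a design choice made for illustration only.

```typescript
// Hypothetical payload shape for a "progress" event.
interface Progress {
  percentage: number;
}

// Parses and validates a progress payload; returns undefined for anything malformed.
function parseProgress(data: string): Progress | undefined {
  let parsed: unknown;
  try {
    parsed = JSON.parse(data);
  } catch {
    return undefined; // not valid JSON
  }
  if (typeof parsed !== 'object' || parsed === null) return undefined;
  const percentage = (parsed as { percentage?: unknown }).percentage;
  if (typeof percentage !== 'number' || !Number.isFinite(percentage)) {
    return undefined; // missing or non-numeric percentage
  }
  // Clamp to the range a progress bar can display.
  return { percentage: Math.min(100, Math.max(0, percentage)) };
}
```

A caller can then skip malformed events with a simple `if (progress)` check instead of wrapping the loop body in try/catch.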
Chat Application
import { Fetcher } from '@ahoo-wang/fetcher';
// Side-effect import: adds requiredEventStream() to Response
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://chat-api.example.com',
});

const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'message') {
    const message = JSON.parse(event.data);
    displayMessage(message);
  } else if (event.event === 'user-joined') {
    showUserJoined(event.data);
  } else if (event.event === 'user-left') {
    showUserLeft(event.data);
  }
}
Testing
pnpm test
pnpm test --coverage
The test suite includes:
- Event stream conversion tests
- Interceptor functionality tests
- Edge case handling (malformed events, chunked data, etc.)
- Performance tests for large event streams
Server-Sent Events Specification Compliance
This package fully implements the Server-Sent Events specification:
- Data field: Supports multi-line data fields
- Event field: Custom event types
- ID field: Last event ID tracking
- Retry field: Automatic reconnection timeout
- Comment lines: Lines starting with : are ignored
- Event dispatching: Proper event dispatching with default event type 'message'
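The field rules listed above can be illustrated with a simplified parser for a single event block. This is a sketch of the rules in the Server-Sent Events specification, not this package's parser: `parseEventBlock` is a hypothetical name, the `ServerSentEvent` declaration is a local copy of the documented shape, and last-event-ID tracking across events is left out for brevity.

```typescript
// Local copy of the documented shape, so the sketch is self-contained.
interface ServerSentEvent {
  data: string;
  event?: string;
  id?: string;
  retry?: number;
}

// Parses one event block (the lines between two blank lines) into an event.
function parseEventBlock(block: string): ServerSentEvent | undefined {
  const dataLines: string[] = [];
  const result: ServerSentEvent = { data: '', event: 'message' }; // default type
  for (const line of block.split(/\r\n|\r|\n/)) {
    if (line === '' || line.startsWith(':')) continue; // skip empty and comment lines
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // strip exactly one leading space
    switch (field) {
      case 'data':
        dataLines.push(value);
        break;
      case 'event':
        result.event = value;
        break;
      case 'id':
        if (!value.includes('\0')) result.id = value; // ids containing NUL are ignored
        break;
      case 'retry':
        if (/^\d+$/.test(value)) result.retry = Number(value); // digits only
        break;
      // Unknown field names are ignored.
    }
  }
  if (dataLines.length === 0) return undefined; // no data field: nothing to dispatch
  result.data = dataLines.join('\n'); // multi-line data is joined with newlines
  return result;
}
```

For example, a block containing `event: update`, two `data:` lines, `id: 7`, and `retry: 3000` yields one event whose data holds both lines joined by a newline, while a block made only of comment lines yields undefined.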
Contributing
Contributions are welcome! Please see the contributing guide for more details.
License
This project is licensed under the Apache-2.0 License.
Part of the Fetcher ecosystem