@ahoo-wang/fetcher-eventstream


Power your real-time applications with Server-Sent Events support, specially designed for Large Language Model streaming APIs.

🌟 Features

  • 📡 Event Stream Conversion: Converts text/event-stream responses to async-iterable streams of ServerSentEvent objects
  • 🔌 Interceptor Integration: Automatically adds eventStream() and jsonEventStream() methods to responses with text/event-stream content type
  • 📋 SSE Parsing: Parses Server-Sent Events according to the specification, including data, event, id, and retry fields
  • 🔄 Streaming Support: Handles chunked data and multi-line events correctly
  • 💬 Comment Handling: Properly ignores comment lines (lines starting with :) as per the SSE specification
  • 🛡️ TypeScript Support: Complete TypeScript type definitions
  • ⚡ Performance Optimized: Efficient parsing and streaming for high-performance applications
  • 🤖 LLM Streaming Ready: Native support for streaming responses from popular LLM APIs like OpenAI GPT, Claude, etc.
  • 🔚 Stream Termination: Automatic stream termination detection for clean resource management and completion handling

🚀 Quick Start

Installation

# Using npm
npm install @ahoo-wang/fetcher-eventstream

# Using pnpm
pnpm add @ahoo-wang/fetcher-eventstream

# Using yarn
yarn add @ahoo-wang/fetcher-eventstream

Module Import

To use the event stream functionality, you need to import the module for its side effects:

import '@ahoo-wang/fetcher-eventstream';

This import automatically extends the global Response interface with methods for handling Server-Sent Events streams:

  • eventStream() - Converts a Response with text/event-stream content type to a ServerSentEventStream
  • jsonEventStream<DATA>() - Converts a Response with text/event-stream content type to a JsonServerSentEventStream<DATA>
  • isEventStream getter - Checks if the Response has a text/event-stream content type
  • requiredEventStream() - Gets a ServerSentEventStream, throwing an error if not available
  • requiredJsonEventStream<DATA>() - Gets a JsonServerSentEventStream<DATA>, throwing an error if not available

This is a common pattern in JavaScript/TypeScript for extending existing types with additional functionality without modifying the original type definitions.
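
The sketch below illustrates the declaration-merging shape such a side-effect module typically uses. It is an illustration of the pattern only, not the package's actual source, which may declare these members differently:

// Illustrative only: extending the global Response interface via
// TypeScript declaration merging, as side-effect modules commonly do.
import type {
  JsonServerSentEventStream,
  ServerSentEventStream,
} from '@ahoo-wang/fetcher-eventstream';

declare global {
  interface Response {
    eventStream(): ServerSentEventStream;
    jsonEventStream<DATA>(): JsonServerSentEventStream<DATA>;
    readonly isEventStream: boolean;
    requiredEventStream(): ServerSentEventStream;
    requiredJsonEventStream<DATA>(): JsonServerSentEventStream<DATA>;
  }
}

// Makes this file a module so the global augmentation applies.
export {};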

Integration Test Example: LLM Client with Event Stream

The following example shows how to create an LLM client with event stream support, similar to the integration test in the Fetcher project. You can find the complete implementation in integration-test/src/eventstream/llmClient.ts.

This example demonstrates how to interact with popular LLM APIs like OpenAI's GPT models using Fetcher's streaming capabilities.

import {
  BaseURLCapable,
  ContentTypeValues,
  FetchExchange,
  NamedFetcher,
  REQUEST_BODY_INTERCEPTOR_ORDER,
  RequestInterceptor,
} from '@ahoo-wang/fetcher';
import {
  api,
  autoGeneratedError,
  body,
  post,
  ResultExtractors,
} from '@ahoo-wang/fetcher-decorator';
import '@ahoo-wang/fetcher-eventstream';
import { JsonServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';
import { ChatRequest, ChatResponse } from './types';

export const llmFetcherName = 'llm';

export interface LlmOptions extends BaseURLCapable {
  apiKey: string;
  model?: string;
}

export class LlmRequestInterceptor implements RequestInterceptor {
  readonly name: string = 'LlmRequestInterceptor';
  readonly order: number = REQUEST_BODY_INTERCEPTOR_ORDER - 1;

  constructor(private llmOptions: LlmOptions) {}

  intercept(exchange: FetchExchange): void {
    const chatRequest = exchange.request.body as ChatRequest;
    if (!chatRequest.model) {
      chatRequest.model = this.llmOptions.model;
    }
  }
}

export function createLlmFetcher(options: LlmOptions): NamedFetcher {
  const llmFetcher = new NamedFetcher(llmFetcherName, {
    baseURL: options.baseURL,
    headers: {
      Authorization: `Bearer ${options.apiKey}`,
      'Content-Type': ContentTypeValues.APPLICATION_JSON,
    },
  });
  llmFetcher.interceptors.request.use(new LlmRequestInterceptor(options));
  return llmFetcher;
}

@api('/chat', {
  fetcher: llmFetcherName,
  resultExtractor: ResultExtractors.JsonEventStream,
})
export class LlmClient {
  @post('/completions')
  streamChat(
    @body() body: ChatRequest,
  ): Promise<JsonServerSentEventStream<ChatResponse>> {
    throw autoGeneratedError(body);
  }

  @post('/completions', { resultExtractor: ResultExtractors.Json })
  chat(@body() body: ChatRequest): Promise<ChatResponse> {
    throw autoGeneratedError(body);
  }
}
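
The ChatRequest and ChatResponse types are imported from ./types, which is not reproduced here. A minimal sketch of the shapes this example relies on (the real definitions may carry more fields) could look like:

// Hypothetical ./types sketch, covering only the fields used in this example.
export interface ChatMessage {
  role: 'system' | 'user' | 'assistant';
  content: string;
}

export interface ChatRequest {
  model?: string; // filled in by LlmRequestInterceptor when omitted
  messages: ChatMessage[];
  stream?: boolean;
}

export interface ChatResponse {
  choices: Array<{
    delta?: { content?: string }; // present on streamed chunks
    message?: ChatMessage; // present on non-streamed responses
  }>;
}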

Using streamChat for Real-time Responses

Here's how to use the streamChat method to get real-time responses from an LLM API:

import { createLlmFetcher, LlmClient } from './llmClient';

// Initialize the LLM client with your API configuration
const llmFetcher = createLlmFetcher({
  baseURL: 'https://api.openai.com/v1', // Example for OpenAI
  apiKey: process.env.OPENAI_API_KEY || 'your-api-key',
  model: 'gpt-3.5-turbo', // Default model
});

// Create the client instance
const llmClient = new LlmClient();

// Example: Stream a chat completion response in real-time
async function streamChatExample() {
  try {
    // Stream the response token by token
    const stream = await llmClient.streamChat({
      messages: [
        { role: 'system', content: 'You are a helpful assistant.' },
        { role: 'user', content: 'Explain quantum computing in simple terms.' },
      ],
      model: 'gpt-3.5-turbo', // Override default model if needed
      stream: true, // Enable streaming
    });

    // Process the streamed response
    let fullResponse = '';
    for await (const event of stream) {
      // Each event contains a partial response
      if (event.data) {
        const chunk = event.data;
        const content = chunk.choices[0]?.delta?.content || '';
        fullResponse += content;
        console.log('New token:', content);

        // Update UI in real-time as tokens arrive
        updateUI(content);
      }
    }

    console.log('Full response:', fullResponse);
  } catch (error) {
    console.error('Error streaming chat:', error);
  }
}

// Helper function to simulate UI updates
function updateUI(content: string) {
  // In a real application, this would update your UI
  process.stdout.write(content);
}

Basic Usage

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// For responses with a text/event-stream content type,
// Response objects automatically gain eventStream() and jsonEventStream() methods
const response = await fetcher.get('/events');
for await (const event of response.requiredEventStream()) {
  console.log('Received event:', event);
}

// Using jsonEventStream for JSON data
const jsonResponse = await fetcher.get('/json-events');
for await (const event of jsonResponse.requiredJsonEventStream<MyDataType>()) {
  console.log('Received JSON event:', event.data);
}
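
MyDataType above is only a placeholder for whatever JSON payload your endpoint emits, for instance:

// Hypothetical payload type for the jsonEventStream example above.
interface MyDataType {
  id: string;
  message: string;
}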

Advanced Usage with Termination Detection

import { Fetcher } from '@ahoo-wang/fetcher';
import {
  toJsonServerSentEventStream,
  type TerminateDetector,
} from '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.openai.com/v1',
});

// Define termination detector for OpenAI-style completion
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

// Get raw event stream
const response = await fetcher.post('/chat/completions', {
  body: {
    model: 'gpt-3.5-turbo',
    messages: [{ role: 'user', content: 'Hello!' }],
    stream: true,
  },
});

// Convert to typed JSON stream with automatic termination
const jsonStream = toJsonServerSentEventStream<ChatCompletionChunk>(
  response.requiredEventStream(),
  terminateOnDone,
);

// Process streaming response with automatic termination
for await (const event of jsonStream) {
  const content = event.data.choices[0]?.delta?.content;
  if (content) {
    console.log('Token:', content);
    // Stream automatically terminates when '[DONE]' is received
  }
}
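
ChatCompletionChunk is assumed here rather than imported; a minimal shape covering just the fields the loop reads (OpenAI's real chunk schema includes more) would be:

// Assumed chunk shape; covers only the fields accessed above.
interface ChatCompletionChunk {
  choices: Array<{
    delta?: { content?: string };
  }>;
}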

Manual Conversion

import { toServerSentEventStream } from '@ahoo-wang/fetcher-eventstream';

// Convert a Response object manually
const response = await fetch('/events');
const eventStream = toServerSentEventStream(response);

// Read events from the stream
const reader = eventStream.getReader();
try {
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    console.log('Received event:', value);
  }
} finally {
  reader.releaseLock();
}

📚 API Reference

Module Import

To use the event stream functionality, import the module for its side effects:

import '@ahoo-wang/fetcher-eventstream';

This import extends the global Response interface with eventStream(), jsonEventStream<DATA>(), the isEventStream getter, requiredEventStream(), and requiredJsonEventStream<DATA>(); see Module Import under Quick Start for a description of each member.

In integration tests and real applications, this import is essential for working with event streams. For example:

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// Response objects will automatically have eventStream() and jsonEventStream() methods
const response = await fetcher.get('/events');
// Handle event stream
for await (const event of response.requiredEventStream()) {
  console.log('Received event:', event);
}

toJsonServerSentEventStream

Converts a ServerSentEventStream to a JsonServerSentEventStream<DATA> for handling Server-Sent Events with JSON data. Optionally supports stream termination detection for automatic stream closure.

Signature

function toJsonServerSentEventStream<DATA>(
  serverSentEventStream: ServerSentEventStream,
  terminateDetector?: TerminateDetector,
): JsonServerSentEventStream<DATA>;

Parameters

  • serverSentEventStream: The ServerSentEventStream to convert
  • terminateDetector: Optional function to detect when the stream should be terminated. When provided, the stream will automatically close when the detector returns true for an event.

Returns

  • JsonServerSentEventStream<DATA>: A readable stream of JsonServerSentEvent<DATA> objects

Examples

// Basic usage without termination detection
const jsonStream = toJsonServerSentEventStream<MyData>(serverSentEventStream);

// With termination detection for OpenAI-style completion
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';
const terminatingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);

// Custom termination logic
const terminateOnError: TerminateDetector = event => {
  return event.event === 'error' || event.data.includes('ERROR');
};
const errorHandlingStream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnError,
);

JsonServerSentEvent

Interface defining the structure of a Server-Sent Event with JSON data.

interface JsonServerSentEvent<DATA> extends Omit<ServerSentEvent, 'data'> {
  data: DATA; // The event data parsed as JSON
}

JsonServerSentEventStream

Type alias for a readable stream of JsonServerSentEvent<DATA> objects.

type JsonServerSentEventStream<DATA> = ReadableStream<
  JsonServerSentEvent<DATA>
>;

TerminateDetector

A function type for detecting when a Server-Sent Event stream should be terminated. This is commonly used with LLM APIs that send a special termination event to signal the end of a response stream.

Signature

type TerminateDetector = (event: ServerSentEvent) => boolean;

Parameters

  • event: The current ServerSentEvent being processed

Returns

  • boolean: true if the stream should be terminated, false otherwise

Examples

// OpenAI-style termination (common pattern)
const terminateOnDone: TerminateDetector = event => event.data === '[DONE]';

// Event-based termination
const terminateOnComplete: TerminateDetector = event => event.event === 'done';

// Custom termination with multiple conditions
const terminateOnFinish: TerminateDetector = event => {
  return (
    event.event === 'done' ||
    event.event === 'error' ||
    event.data === '[DONE]' ||
    event.data.includes('TERMINATE')
  );
};

// Usage with toJsonServerSentEventStream
const stream = toJsonServerSentEventStream<MyData>(
  serverSentEventStream,
  terminateOnDone,
);

Common Use Cases

  • LLM Streaming: Detect completion markers like [DONE] from OpenAI, Claude, or other LLM APIs
  • Error Handling: Terminate streams when error events are received
  • Custom Protocols: Implement application-specific termination logic
  • Resource Management: Automatically close streams when certain conditions are met

toServerSentEventStream

Converts a Response object with a text/event-stream body to a ServerSentEventStream.

Signature

function toServerSentEventStream(response: Response): ServerSentEventStream;

Parameters

  • response: An HTTP response with text/event-stream content type

Returns

  • ServerSentEventStream: A readable stream of ServerSentEvent objects

ServerSentEvent

Interface defining the structure of a Server-Sent Event.

interface ServerSentEvent {
  data: string; // The event data (required)
  event?: string; // The event type (optional, defaults to 'message')
  id?: string; // The event ID (optional)
  retry?: number; // The reconnection time in milliseconds (optional)
}

ServerSentEventStream

Type alias for a readable stream of ServerSentEvent objects.

type ServerSentEventStream = ReadableStream<ServerSentEvent>;

๐Ÿ› ๏ธ Examples

Real-time Notifications

import { Fetcher } from '@ahoo-wang/fetcher';
import '@ahoo-wang/fetcher-eventstream';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// Listen for real-time notifications
const response = await fetcher.get('/notifications');
for await (const event of response.requiredEventStream()) {
  switch (event.event) {
    case 'message':
      showNotification('Message', event.data);
      break;
    case 'alert':
      showAlert('Alert', event.data);
      break;
    case 'update':
      handleUpdate(JSON.parse(event.data));
      break;
    default:
      console.log('Unknown event:', event);
  }
}
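
showNotification, showAlert, and handleUpdate (like the helpers in the following examples) are placeholders for your own UI layer; minimal stubs to make the example runnable might be:

// Placeholder helpers; replace with real UI integration.
function showNotification(title: string, body: string) {
  console.log(`[${title}] ${body}`);
}

function showAlert(title: string, body: string) {
  console.warn(`[${title}] ${body}`);
}

function handleUpdate(update: unknown) {
  console.log('Update received:', update);
}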

Progress Updates

import { Fetcher } from '@ahoo-wang/fetcher';

const fetcher = new Fetcher({
  baseURL: 'https://api.example.com',
});

// Track long-running task progress
const response = await fetcher.get('/tasks/123/progress');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'progress') {
    const progress = JSON.parse(event.data);
    updateProgressBar(progress.percentage);
  } else if (event.event === 'complete') {
    showCompletionMessage(event.data);
    break;
  }
}

Chat Application

import { Fetcher } from '@ahoo-wang/fetcher';

const fetcher = new Fetcher({
  baseURL: 'https://chat-api.example.com',
});

// Real-time chat messages
const response = await fetcher.get('/rooms/123/messages');
for await (const event of response.requiredEventStream()) {
  if (event.event === 'message') {
    const message = JSON.parse(event.data);
    displayMessage(message);
  } else if (event.event === 'user-joined') {
    showUserJoined(event.data);
  } else if (event.event === 'user-left') {
    showUserLeft(event.data);
  }
}

🧪 Testing

# Run tests
pnpm test

# Run tests with coverage
pnpm test --coverage

The test suite includes:

  • Event stream conversion tests
  • Interceptor functionality tests
  • Edge case handling (malformed events, chunked data, etc.)
  • Performance tests for large event streams

📋 Server-Sent Events Specification Compliance

This package fully implements the Server-Sent Events specification:

  • Data field: Supports multi-line data fields
  • Event field: Custom event types
  • ID field: Last event ID tracking
  • Retry field: Automatic reconnection timeout
  • Comment lines: Lines starting with : are ignored
  • Event dispatching: Proper event dispatching with default event type 'message'
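
As a concrete illustration of these rules, the following sketch pairs a raw text/event-stream payload (made up for illustration) with the event it would dispatch:

// Raw SSE frames: a comment line, then one event built from several fields.
const rawPayload = [
  ': comments like this line are ignored',
  'id: 42',
  'event: update',
  'data: first line',
  'data: second line',
  'retry: 3000',
  '', // blank line dispatches the event
].join('\n');

// The dispatched ServerSentEvent would be:
// {
//   id: '42',
//   event: 'update',
//   data: 'first line\nsecond line', // multi-line data joined with '\n'
//   retry: 3000,
// }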

๐Ÿค Contributing

Contributions are welcome! Please see the contributing guide for more details.

📄 License

This project is licensed under the Apache-2.0 License.

Part of the Fetcher ecosystem
