# AI Streaming Engine

A powerful abstraction layer for streaming text completions from multiple AI providers through a single unified toolkit.
## Overview

`@bnk/ai` (AI Streaming Engine) provides a flexible, plugin-based abstraction for streaming AI-generated text from multiple providers such as OpenAI, Anthropic, and Ollama. The goal is for your application to integrate once, then swap or combine providers at will without modifying your client-side streaming logic.

Key features:

- **Unified streaming abstraction** – use one engine in your client code to handle partial message streaming, done signals, and error handling.
- **Plugin-based** – each AI provider is implemented as a plugin that knows how to prepare the request and parse the stream.
- **Simple SSE-based interface** – your app interacts with the engine through a set of handlers (e.g. `onPartial`, `onDone`).
- **Extensible** – easily add new providers by implementing a `ProviderPlugin`.
## How It Works

### Architecture

1. Your application calls `createSSEStream(params)` with:
   - A `userMessage` and, optionally, a `systemMessage`
   - The desired plugin (provider)
   - Any options you want to pass to the provider (model, temperature, etc.)
   - Handlers for partial chunks, the done signal, errors, etc.

2. Inside `createSSEStream`:
   - The plugin's `prepareRequest()` method is called, returning a readable stream (or reader) containing the SSE data from the AI provider.
   - The engine reads SSE lines, calls the plugin's `parseServerSentEvent()` to extract text chunks, and forwards them to your handlers (`onPartial`, `onDone`, etc.).
   - The engine also returns a `ReadableStream<Uint8Array>` that you can pipe back to your own clients for real-time text updates in a browser.

3. Plugins (e.g. `OpenAiLikePlugin`, `AnthropicPlugin`) each implement:

   ```ts
   interface ProviderPlugin {
     prepareRequest(params: SSEEngineParams): Promise<ReadableStream<Uint8Array> | ReadableStreamDefaultReader<Uint8Array>>;
     parseServerSentEvent(line: string): string | null;
   }
   ```

   - `prepareRequest()` handles calling the provider's API and returns a streaming SSE response.
   - `parseServerSentEvent()` extracts only the text from each SSE line, returning `[DONE]` or `null` when appropriate.
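The engine's read loop can be pictured roughly like this. This is an illustrative sketch, not the actual `@bnk/ai` internals; `parse` stands in for a plugin's `parseServerSentEvent()`:

```typescript
// Sketch of the core loop: decode bytes, split into SSE lines, let the
// plugin extract text, and forward each chunk to a handler.
async function pumpSSE(
  stream: ReadableStream<Uint8Array>,
  parse: (line: string) => string | null, // stand-in for parseServerSentEvent
  onPartial: (text: string) => void,
): Promise<string> {
  const reader = stream.getReader();
  const decoder = new TextDecoder();
  let buffered = "";
  let full = "";
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    buffered += decoder.decode(value, { stream: true });
    const lines = buffered.split("\n");
    buffered = lines.pop() ?? ""; // keep any incomplete trailing line
    for (const line of lines) {
      const text = parse(line);
      if (text === null) continue;        // line carried no text
      if (text === "[DONE]") return full; // plugin signalled completion
      full += text;
      onPartial(text);
    }
  }
  return full;
}
```

The real engine additionally wraps the output in a `ReadableStream<Uint8Array>` and invokes the full handler set described below.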
## Typical Usage

1. Install the package (private or local reference as appropriate):

   ```sh
   bun add @bnk/ai
   # or
   npm install @bnk/ai
   ```

2. Import the engine and the plugin(s) you want:

   ```ts
   import { createSSEStream, OpenAiLikePlugin } from '@bnk/ai';

   const plugin = new OpenAiLikePlugin(myOpenAiClient, 'gpt-4');
   ```

3. Create the stream and provide handlers:

   ```ts
   const userMessage = "Explain the concept of neural networks.";

   const handlers = {
     onPartial: (chunk) => {
       console.log("Partial chunk:", chunk.content);
     },
     onDone: (final) => {
       console.log("All done:", final.content);
     },
     onError: (error) => {
       console.error("Stream error:", error);
     },
   };

   const stream = await createSSEStream({
     userMessage,
     plugin,
     handlers,
     options: { model: "gpt-4", temperature: 0.5 },
   });
   ```
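The returned stream can be handed straight to a browser as an SSE response. A minimal sketch, assuming a runtime with the Fetch API's `Response` (Bun, or Node 18+); `toSSEResponse` is a hypothetical helper, not part of `@bnk/ai`:

```typescript
// Hypothetical helper: wrap the engine's output stream in an HTTP response
// so a browser can consume it via EventSource or fetch.
function toSSEResponse(stream: ReadableStream<Uint8Array>): Response {
  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
    },
  });
}
```

In a server route you would pass the `ReadableStream` returned by `createSSEStream` to this helper and return the `Response` directly.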
## API

### `createSSEStream(params: SSEEngineParams): Promise<ReadableStream<Uint8Array>>`

Creates a new SSE stream. The `SSEEngineParams` object includes:

- `userMessage: string` – the user's prompt
- `systemMessage?: string` – an optional system-level instruction
- `plugin: ProviderPlugin` – the provider plugin handling the request
- `options?: Record<string, any>` – any provider-specific options (model, temperature, etc.)
- `handlers: SSEEngineHandlers` – callbacks for partial chunks, completion, errors, etc.

Example usage:

```ts
await createSSEStream({
  userMessage: "Hello world",
  systemMessage: "You are a helpful assistant",
  plugin: myPlugin,
  options: { model: "myModel", temperature: 0.9 },
  handlers: {
    onPartial: (chunk) => {
      console.log("Partial:", chunk.content);
    },
    onDone: (message) => {
      console.log("Complete:", message.content);
    },
  },
});
```
## Handlers

```ts
interface SSEEngineHandlers {
  onSystemMessage?: (message: SSEMessage) => void;
  onUserMessage?: (message: SSEMessage) => void;
  onPartial?: (partial: SSEMessage) => void;
  onDone?: (fullContent: SSEMessage) => void;
  onError?: (error: unknown, partialSoFar: SSEMessage) => void;
}
```

- `onSystemMessage`: invoked once if a system message is provided.
- `onUserMessage`: invoked for the user's message.
- `onPartial`: invoked for each streamed chunk of assistant text.
- `onDone`: final callback when the stream is complete or `[DONE]` is encountered.
- `onError`: if an error occurs, you get the error plus the text accumulated so far.
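For instance, a handler set that accumulates streamed text into a single string might look like this. This is a sketch: it assumes `SSEMessage` carries a `content: string` field (check `src/streaming-types.ts` for the real shape):

```typescript
// Assumed message shape -- the real definition lives in src/streaming-types.ts.
interface SSEMessage {
  role: string;
  content: string;
}

// Build a handler set that collects partial chunks into one string.
function collectingHandlers() {
  let text = "";
  return {
    result: () => text,
    handlers: {
      onPartial: (partial: SSEMessage) => {
        text += partial.content;
      },
      onDone: (full: SSEMessage) => {
        text = full.content; // trust the engine's final accumulated content
      },
      onError: (error: unknown, partialSoFar: SSEMessage) => {
        console.error("Failed after:", partialSoFar.content, error);
      },
    },
  };
}
```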
## Providers & Plugins

### Creating a Plugin

To add a new provider, implement the `ProviderPlugin` interface:

```ts
import { ProviderPlugin, SSEEngineParams } from "@bnk/ai";

export class MyProviderPlugin implements ProviderPlugin {
  async prepareRequest(params: SSEEngineParams) {
    // Call the provider's API; return a ReadableStream<Uint8Array> or reader.
    throw new Error("not implemented");
  }

  parseServerSentEvent(line: string): string | null {
    // Return the text for this SSE line, "[DONE]" at the end, or null.
    return null;
  }
}
```

Then pass an instance to the engine via `plugin: new MyProviderPlugin(...)`.
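As a concrete illustration, a plugin's `parseServerSentEvent` for an OpenAI-compatible delta format might look like this. The JSON shape shown is an assumption about the provider's wire format, not something `@bnk/ai` mandates:

```typescript
// Hypothetical parser for SSE lines such as:
//   data: {"choices":[{"delta":{"content":"Hi"}}]}
//   data: [DONE]
function parseOpenAiStyleEvent(line: string): string | null {
  if (!line.startsWith("data: ")) return null;
  const payload = line.slice("data: ".length).trim();
  if (payload === "[DONE]") return "[DONE]";
  try {
    const json = JSON.parse(payload);
    return json.choices?.[0]?.delta?.content ?? null;
  } catch {
    return null; // ignore malformed or keep-alive lines
  }
}
```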
### Example: `OpenAiLikePlugin`

- Calls OpenAI's streaming Chat Completions API.
- Returns a stream of SSE lines that the engine processes.

### Example: `AnthropicPlugin`

- Calls Anthropic's API.
- Parses its SSE format to gather partial text.
## Extending

You can build your own higher-level logic on top of this engine:

- Save partial responses to a database
- Stream updates to WebSockets or SSE endpoints
- Provide multiple fallback plugins (e.g., if one fails, switch to another)
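The fallback idea can be sketched as a small helper that tries each provider in turn. This is hypothetical glue code, not part of `@bnk/ai`:

```typescript
// Try each attempt in order; return the first success, or rethrow the
// last failure if every attempt fails.
async function withFallback<T>(attempts: Array<() => Promise<T>>): Promise<T> {
  let lastError: unknown = new Error("no attempts provided");
  for (const attempt of attempts) {
    try {
      return await attempt();
    } catch (error) {
      lastError = error;
    }
  }
  throw lastError;
}
```

Each call to `createSSEStream` with a different plugin would become one attempt in the array.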
## Repository Structure

- `src/` – main code for the streaming engine, plugins, and types
- `src/streaming-engine.ts` – core `createSSEStream` function
- `src/streaming-types.ts` – shared SSE engine types (`SSEEngineParams`, `SSEMessage`, etc.)
- `src/plugins/` – plugin implementations for different AI providers
- `package.json` – package metadata
## Getting Started

1. Install dependencies:

   ```sh
   bun install
   ```

2. Build or run tests:

   ```sh
   bun test
   ```
## Contributing

Contributions are welcome! To add a new provider:

1. Create a new plugin in `src/plugins/`.
2. Implement `prepareRequest()` and `parseServerSentEvent()`.
3. Export it in `src/index.ts`.
## License

MIT License (or appropriate license text here)