# zod-stream

> Type-safe structured extraction from LLM streams with progressive validation

`zod-stream` adds structured output validation and streaming capabilities to LLM responses. Built on top of `schema-stream`, it enables type-safe extraction with progressive validation.
## Key Features
- 🔄 Stream structured LLM outputs with validation
- 🎯 Multiple response modes (TOOLS, FUNCTIONS, JSON, etc.)
- 📝 OpenAI client integration
- 🌳 Progressive validation with partial results
- ⚡ Built on schema-stream
- 🔍 Full TypeScript support
## Why zod-stream?

`zod-stream` solves key challenges in handling streaming LLM responses:

1. **Dependency Management**: Process data as soon as its dependencies are met, rather than waiting for the complete response

   ```typescript
   if (isPathComplete(['user', 'preferences'], chunk)) {
     initializeUserExperience(chunk.user.preferences);
   }
   ```

2. **Type-Safe LLM Integration**: Full TypeScript support for structured outputs from OpenAI and other providers

   ```typescript
   const params = withResponseModel({
     response_model: { schema, name: "Extract" },
     mode: "TOOLS"
   });
   ```

3. **Progressive Processing**: Built on `schema-stream` for immediate access to partial results

   ```typescript
   for await (const chunk of stream) {
     chunk._meta._completedPaths.forEach(path => {
       processDependency(path, chunk);
     });
   }
   ```

4. **Provider Flexibility**: Consistent interface across different LLM response formats

   ```typescript
   // OpenAI chat completion streams
   const stream = OAIStream({ res: completion });

   // or raw JSON streams
   // const stream = JSONStream({ res: completion });
   ```

Think of it as a type-safe pipeline for handling streaming LLM data where you need to:

- Start processing before the full response arrives
- Ensure type safety throughout the stream
- Handle complex data dependencies
- Work with multiple LLM response formats
## Installation

```bash
# npm
npm install zod-stream zod openai

# pnpm
pnpm add zod-stream zod openai

# bun
bun add zod-stream zod openai
```
## Core Concepts

The `ZodStream` client provides real-time validation and metadata for streaming LLM responses:

```typescript
import ZodStream from "zod-stream";
import { z } from "zod";

const client = new ZodStream({
  debug: true
});

const schema = z.object({
  content: z.string(),
  metadata: z.object({
    confidence: z.number(),
    category: z.string()
  })
});

const stream = await client.create({
  completionPromise: async () => {
    const response = await fetch("/api/extract", {
      method: "POST",
      body: JSON.stringify({ prompt: "..." })
    });
    return response.body;
  },
  response_model: {
    schema,
    name: "ContentExtraction"
  }
});

for await (const chunk of stream) {
  console.log({
    data: chunk,
    isValid: chunk._meta._isValid,
    activePath: chunk._meta._activePath,
    completedPaths: chunk._meta._completedPaths
  });
}
```
## Progressive Processing

`zod-stream` enables processing dependent data as soon as the relevant paths complete, without waiting for the full response:

```typescript
const schema = z.object({
  user: z.object({
    id: z.string(),
    preferences: z.object({
      theme: z.string(),
      language: z.string()
    })
  }),
  content: z.object({
    title: z.string(),
    body: z.string(),
    metadata: z.object({
      keywords: z.array(z.string()),
      category: z.string()
    })
  }),
  recommendations: z.array(z.object({
    id: z.string(),
    score: z.number(),
    reason: z.string()
  }))
});

for await (const chunk of stream) {
  // Apply user preferences as soon as they are complete
  if (isPathComplete(['user', 'preferences'], chunk)) {
    applyUserTheme(chunk.user.preferences.theme);
    setLanguage(chunk.user.preferences.language);
  }

  // Index content once the title and keywords are both available
  if (isPathComplete(['content', 'metadata', 'keywords'], chunk) &&
      isPathComplete(['content', 'title'], chunk)) {
    indexContent({
      title: chunk.content.title,
      keywords: chunk.content.metadata.keywords
    });
  }

  // Prefetch each recommendation as its entry completes
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'recommendations' && path.length === 2) {
      const index = path[1] as number;
      const recommendation = chunk.recommendations[index];
      if (recommendation?.id) {
        prefetchContent(recommendation.id);
      }
    }
  });
}
```
This approach enables:

- Early UI updates based on user preferences
- Parallel processing of independent data
- Optimistic loading of related content
- Better perceived performance
- Resource optimization
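One way to organize this pattern is a dispatch table keyed by completed path. The following is a minimal sketch in plain TypeScript; the `handlers` map, `dispatchCompleted` helper, and the `Chunk` shape are illustrative, not part of `zod-stream`:

```typescript
type Chunk = {
  user?: { preferences?: { theme: string } };
  content?: { title?: string };
  _meta: { _completedPaths: (string | number)[][] };
};

// Hypothetical dispatch table: keys are dotted paths, values are side effects
// to run once that path has fully streamed.
const handlers: Record<string, (chunk: Chunk) => void> = {
  "user.preferences": (c) => console.log("theme:", c.user?.preferences?.theme),
  "content.title": (c) => console.log("title:", c.content?.title),
};

// Run every handler whose path appears in _completedPaths; return which fired.
function dispatchCompleted(chunk: Chunk): string[] {
  const fired: string[] = [];
  for (const path of chunk._meta._completedPaths) {
    const key = path.join(".");
    if (handlers[key]) {
      handlers[key](chunk);
      fired.push(key);
    }
  }
  return fired;
}

// Simulated chunk mid-stream: preferences done, title still streaming
const chunk: Chunk = {
  user: { preferences: { theme: "dark" } },
  content: {},
  _meta: { _completedPaths: [["user", "preferences"]] },
};

console.log(dispatchCompleted(chunk)); // [ 'user.preferences' ]
```

In a real stream you would call `dispatchCompleted(chunk)` inside the `for await` loop, tracking already-fired keys so each handler runs only once.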
## Stream Metadata

Every streamed chunk includes `_meta` metadata about the validation state:

```typescript
type CompletionMeta = {
  _isValid: boolean;                      // does the partial result pass schema validation yet?
  _activePath: (string | number)[];       // the path currently being streamed
  _completedPaths: (string | number)[][]; // paths that have finished streaming
};
```

A chunk mid-stream might look like:

```typescript
{
  content: "partial content...",
  metadata: {
    confidence: 0.95
  },
  _meta: {
    _isValid: false,
    _activePath: ["metadata", "category"],
    _completedPaths: [
      ["content"],
      ["metadata", "confidence"]
    ]
  }
}
```
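When persisting a finished result, you usually want the payload without the metadata envelope. A minimal sketch in plain TypeScript; `stripMeta` is a hypothetical helper, not a `zod-stream` export:

```typescript
type CompletionMeta = {
  _isValid: boolean;
  _activePath: (string | number)[];
  _completedPaths: (string | number)[][];
};

// Hypothetical helper: drop the _meta envelope from a finished chunk
// before storing or returning it.
function stripMeta<T extends { _meta: CompletionMeta }>(chunk: T): Omit<T, "_meta"> {
  const { _meta, ...data } = chunk;
  return data;
}

const finished = {
  content: "partial content...",
  metadata: { confidence: 0.95, category: "news" },
  _meta: {
    _isValid: true,
    _activePath: [] as (string | number)[],
    _completedPaths: [["content"], ["metadata", "confidence"], ["metadata", "category"]],
  },
};

console.log(stripMeta(finished));
```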
## Schema Stubs

Get typed stub objects for initialization:

```typescript
const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  }))
});

const client = new ZodStream();
const stub = client.getSchemaStub({
  schema,
  defaultData: {
    users: [{ name: "loading...", age: 0 }]
  }
});
```
## Debug Logging

Enable detailed logging for debugging:

```typescript
const client = new ZodStream({ debug: true });
```
## Using Response Models

The `withResponseModel` helper configures OpenAI request parameters based on your schema and chosen mode:

```typescript
import { withResponseModel } from "zod-stream";
import { z } from "zod";

const schema = z.object({
  sentiment: z.string(),
  keywords: z.array(z.string()),
  confidence: z.number()
});

const params = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "TOOLS",
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});

const completion = await oai.chat.completions.create({
  ...params,
  stream: true
});
```
## Response Modes

`zod-stream` supports multiple modes for structured LLM responses:

```typescript
import { MODE } from "zod-stream";

const modes = {
  FUNCTIONS: "FUNCTIONS",
  TOOLS: "TOOLS",
  JSON: "JSON",
  MD_JSON: "MD_JSON",
  JSON_SCHEMA: "JSON_SCHEMA",
  THINKING_MD_JSON: "THINKING_MD_JSON"
} as const;
```
### Mode-Specific Behaviors

#### TOOLS Mode

Generates OpenAI tool-calling parameters:

```typescript
{
  tool_choice: {
    type: "function",
    function: { name: "Analysis" }
  },
  tools: [{
    type: "function",
    function: {
      name: "Analysis",
      description: "Extract sentiment and keywords",
      parameters: {} // JSON Schema generated from your zod schema
    }
  }]
}
```

#### FUNCTIONS Mode (Legacy)

Generates the older function-calling parameters:

```typescript
{
  function_call: { name: "Analysis" },
  functions: [{
    name: "Analysis",
    description: "Extract sentiment and keywords",
    parameters: {} // JSON Schema generated from your zod schema
  }]
}
```

#### JSON Mode

Requests a raw JSON response and injects the schema via a system message:

```typescript
{
  response_format: { type: "json_object" },
  messages: [
    {
      role: "system",
      content: "Return JSON matching schema..."
    },
    // ...your existing messages
  ]
}
```
## Response Parsing

Built-in parsers handle the different response formats:

```typescript
import {
  OAIResponseParser,         // auto-detects the response format
  OAIResponseToolArgsParser, // TOOLS mode
  OAIResponseFnArgsParser,   // FUNCTIONS mode
  OAIResponseJSONParser,     // JSON mode
  thinkingJsonParser         // THINKING_MD_JSON mode
} from "zod-stream";

const result = OAIResponseParser(response);
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
const thinkingJson = thinkingJsonParser(response);
```
## Streaming Utilities

Handle streaming responses with built-in utilities:

```typescript
import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";

// Server: forward an OpenAI completion stream as a Response
app.post("/api/stream", async (req, res) => {
  const completion = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(
    OAIStream({ res: completion })
  );
});

// Client: consume a ReadableStream as an async generator
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
  console.log(chunk);
}
```
## Path Tracking Utilities

Monitor the completion status of specific paths:

```typescript
import { isPathComplete } from "zod-stream";

const activePath = ["analysis", "sentiment"];

const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
}); // true
```
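Conceptually, the check compares the queried path against each entry in `_completedPaths`. The following is a simplified sketch of that comparison in plain TypeScript, not the library's actual implementation:

```typescript
type Meta = { _completedPaths: (string | number)[][] };

// Simplified sketch: a path counts as complete when an identical path
// (same length, same segments in order) appears in _completedPaths.
function pathIsComplete(path: (string | number)[], chunk: { _meta: Meta }): boolean {
  return chunk._meta._completedPaths.some(
    (done) => done.length === path.length && done.every((seg, i) => seg === path[i])
  );
}

const chunk = {
  _meta: { _completedPaths: [["analysis", "sentiment"]] },
};

console.log(pathIsComplete(["analysis", "sentiment"], chunk)); // true
console.log(pathIsComplete(["analysis", "keywords"], chunk));  // false
```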
## Error Handling

`zod-stream` exposes validation state on every chunk, so you can track progress during the stream and check validity once it finishes:

```typescript
const stream = await client.create({
  completionPromise: async () => response.body,
  response_model: { schema }
});

let finalResult;
for await (const chunk of stream) {
  finalResult = chunk;
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}

// After the stream ends, check whether the full result passed validation
const isValid = finalResult._meta._isValid;
```
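If the underlying request fails mid-stream, the iteration itself can throw. Below is a hedged sketch of guarding consumption, using a mock async generator in place of a real `zod-stream` result (the `Chunk` shape and `consume` helper are illustrative):

```typescript
type Chunk = { value: string; _meta: { _isValid: boolean } };

// Mock stand-in for a zod-stream result: yields a partial, then a valid chunk.
async function* mockStream(): AsyncGenerator<Chunk> {
  yield { value: "partial", _meta: { _isValid: false } };
  yield { value: "complete", _meta: { _isValid: true } };
}

async function consume(): Promise<Chunk | undefined> {
  let finalResult: Chunk | undefined;
  try {
    for await (const chunk of mockStream()) {
      finalResult = chunk; // keep the latest partial result
    }
  } catch (err) {
    // Network or parse failures surface here; finalResult still holds the
    // last partial state, allowing graceful degradation instead of data loss.
    console.error("stream interrupted:", err);
  }
  return finalResult;
}

consume().then((r) => console.log(r?._meta._isValid)); // true
```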
## Real-World Use Cases

### 1. Progressive Data Analysis

```typescript
const analysisSchema = z.object({
  marketData: z.object({
    trends: z.array(z.object({
      metric: z.string(),
      value: z.number()
    })),
    summary: z.string()
  }),
  competitors: z.array(z.object({
    name: z.string(),
    strengths: z.array(z.string()),
    weaknesses: z.array(z.string())
  })),
  recommendations: z.object({
    immediate: z.array(z.string()),
    longTerm: z.array(z.string()),
    budget: z.number()
  })
});

for await (const chunk of stream) {
  // Initialize charts as soon as the market trends are complete
  if (isPathComplete(['marketData', 'trends'], chunk)) {
    initializeCharts(chunk.marketData.trends);
  }

  // Fetch supplementary data as each competitor entry completes
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'competitors' && path.length === 2) {
      const competitor = chunk.competitors[path[1] as number];
      fetchCompetitorData(competitor.name);
    }
  });

  // Plan budget allocation once both dependencies are available
  if (isPathComplete(['recommendations', 'immediate'], chunk) &&
      isPathComplete(['recommendations', 'budget'], chunk)) {
    planBudgetAllocation({
      actions: chunk.recommendations.immediate,
      budget: chunk.recommendations.budget
    });
  }
}
```
### 2. Document Processing Pipeline

```typescript
const documentSchema = z.object({
  metadata: z.object({
    title: z.string(),
    author: z.string(),
    topics: z.array(z.string())
  }),
  sections: z.array(z.object({
    heading: z.string(),
    content: z.string(),
    annotations: z.array(z.object({
      type: z.string(),
      text: z.string(),
      confidence: z.number()
    }))
  })),
  summary: z.object({
    abstract: z.string(),
    keyPoints: z.array(z.string()),
    readingTime: z.number()
  })
});

for await (const chunk of stream) {
  // Index the document as soon as its metadata is complete
  if (isPathComplete(['metadata'], chunk)) {
    indexDocument({
      title: chunk.metadata.title,
      topics: chunk.metadata.topics
    });
  }

  // Process annotations as each section completes
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'sections' && isPathComplete([...path, 'annotations'], chunk)) {
      const sectionIndex = path[1] as number;
      const section = chunk.sections[sectionIndex];
      processAnnotations({
        heading: section.heading,
        annotations: section.annotations
      });
    }
  });

  // Generate a preview once the summary essentials are ready
  if (isPathComplete(['summary', 'abstract'], chunk) &&
      isPathComplete(['summary', 'readingTime'], chunk)) {
    generatePreview({
      abstract: chunk.summary.abstract,
      readingTime: chunk.summary.readingTime
    });
  }
}
```
### 3. E-commerce Product Enrichment

```typescript
const productSchema = z.object({
  basic: z.object({
    id: z.string(),
    name: z.string(),
    category: z.string()
  }),
  pricing: z.object({
    base: z.number(),
    discounts: z.array(z.object({
      type: z.string(),
      amount: z.number()
    })),
    final: z.number()
  }),
  inventory: z.object({
    status: z.string(),
    locations: z.array(z.object({
      id: z.string(),
      quantity: z.number()
    }))
  }),
  enrichment: z.object({
    seoDescription: z.string(),
    searchKeywords: z.array(z.string()),
    relatedProducts: z.array(z.string())
  })
});

for await (const chunk of stream) {
  // Render the product card as soon as the basics are in
  if (isPathComplete(['basic'], chunk)) {
    initializeProductCard(chunk.basic);
  }

  // Update pricing, then the buy button once inventory status is known
  if (isPathComplete(['pricing', 'final'], chunk)) {
    updatePriceDisplay(chunk.pricing.final);

    if (isPathComplete(['inventory', 'status'], chunk)) {
      updateBuyButton({
        price: chunk.pricing.final,
        status: chunk.inventory.status
      });
    }
  }

  // Apply SEO data once both fields are complete
  if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
      isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
    optimizeProductSEO({
      description: chunk.enrichment.seoDescription,
      keywords: chunk.enrichment.searchKeywords
    });
  }

  // Prefetch related products as soon as their IDs arrive
  if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
    prefetchRelatedProducts(chunk.enrichment.relatedProducts);
  }
}
```
## With Next.js API Routes

```typescript
import { withResponseModel, OAIStream } from "zod-stream";
import { z } from "zod";

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string()),
  sentiment: z.object({
    score: z.number(),
    label: z.string()
  })
});

export async function POST(req: Request) {
  const { content } = await req.json();

  const params = withResponseModel({
    response_model: {
      schema,
      name: "ContentAnalysis"
    },
    mode: "TOOLS",
    params: {
      messages: [{
        role: "user",
        content: `Analyze: ${content}`
      }],
      model: "gpt-4"
    }
  });

  const stream = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(OAIStream({ res: stream }));
}
```
## With React and stream-hooks

```tsx
import { useState } from "react";
import { useJsonStream } from "stream-hooks";
import { z } from "zod";

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string())
});

function AnalysisComponent() {
  const [data, setData] = useState<z.infer<typeof schema>>();

  const {
    loading,
    error,
    startStream
  } = useJsonStream({
    schema,
    onReceive: (data) => {
      setData(data);
    }
  });

  return (
    <div>
      <button
        onClick={() => startStream({
          url: "/api/extract",
          method: "POST",
          body: { content: "..." }
        })}
        disabled={loading}
      >
        Start Analysis
      </button>

      {loading && data && <LoadingState paths={data._meta._completedPaths} />}
      {error && <ErrorDisplay error={error} />}
      {data && (
        <ProgressiveDisplay
          data={data}
          isComplete={data._meta._completedPaths.length > 0}
        />
      )}
    </div>
  );
}
```
## Integration with Island AI

`zod-stream` is part of the Island AI toolkit, alongside `schema-stream` (the streaming parser it builds on) and `stream-hooks` (React hooks for consuming these streams).
## Contributing

We welcome contributions! Please open an issue or pull request on the repository.
## License

MIT © hack.dance