A client for node or the browser to generate and consume streaming json

Latest version: 3.0.0


> Type-safe structured extraction from LLM streams with progressive validation


zod-stream adds structured output validation and streaming capabilities to LLM responses. Built on top of schema-stream, it enables type-safe extraction with progressive validation.

Key Features

  • 🔄 Stream structured LLM outputs with validation
  • 🎯 Multiple response modes (TOOLS, FUNCTIONS, JSON, etc.)
  • 📝 OpenAI client integration
  • 🌳 Progressive validation with partial results
  • ⚡ Built on schema-stream
  • 🔍 Full TypeScript support

Why zod-stream?

zod-stream solves key challenges in handling streaming LLM responses:

  • Dependency Management: Process data as soon as dependencies are met, rather than waiting for complete responses

    if (isPathComplete(['user', 'preferences'], chunk)) {
      // Start personalizing immediately, don't wait for content
    }
  • Type-Safe LLM Integration: Full TypeScript support for structured outputs from OpenAI and other providers

    const params = withResponseModel({
      response_model: { schema, name: "Extract" },
      mode: "TOOLS"  // or "FUNCTIONS", "JSON", etc.
    });
  • Progressive Processing: Built on schema-stream for immediate access to partial results

    for await (const chunk of stream) {
      // Safely access partial data with full type inference
      chunk._meta._completedPaths.forEach(path => {
        processDependency(path, chunk);
      });
    }
  • Provider Flexibility: Consistent interface across different LLM response formats

    // Works with various response modes
    const toolStream = OAIStream({ res: completion });  // OpenAI tools/functions
    const jsonStream = JSONStream({ res: completion }); // Direct JSON

Think of it as a type-safe pipeline for handling streaming LLM data where you need to:

  • Start processing before the full response arrives
  • Ensure type safety throughout the stream
  • Handle complex data dependencies
  • Work with multiple LLM response formats
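The dependency-management idea can be sketched independently of the library: register a callback with the paths it depends on and run it once, on the first chunk whose _completedPaths contains all of them. createDependencyGate is a hypothetical helper, not a zod-stream export, and the sketch assumes a path counts as complete only when it appears verbatim in _completedPaths.

```typescript
type Path = (string | number)[];

// Minimal slice of the metadata each streamed chunk carries
interface WithMeta {
  _meta: { _completedPaths: Path[] };
}

// Hypothetical helper, not a zod-stream export: each registered callback
// runs exactly once, on the first chunk where all of its paths are complete.
function createDependencyGate<T extends WithMeta>() {
  let pending: { deps: Path[]; run: (chunk: T) => void }[] = [];
  const key = (path: Path) => JSON.stringify(path);

  return {
    whenComplete(deps: Path[], run: (chunk: T) => void): void {
      pending.push({ deps, run });
    },
    // Call once per streamed chunk
    onChunk(chunk: T): void {
      const done = new Set(chunk._meta._completedPaths.map(key));
      const ready = pending.filter(({ deps }) => deps.every(d => done.has(key(d))));
      // Drop ready callbacks before running them so none can fire twice
      pending = pending.filter(entry => !ready.includes(entry));
      for (const { run } of ready) run(chunk);
    },
  };
}
```

In a consumer loop you would register callbacks up front and call gate.onChunk(chunk) for every chunk; work that depends on several paths then starts as soon as the last of them completes.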


Installation

# npm
npm install zod-stream zod openai

# pnpm
pnpm add zod-stream zod openai

# bun
bun add zod-stream zod openai

Core Concepts

The ZodStream client provides real-time validation and metadata for streaming LLM responses:

import ZodStream from "zod-stream";
import { z } from "zod";

const client = new ZodStream({
  debug: true  // Enable debug logging
});

// Define your extraction schema
const schema = z.object({
  content: z.string(),
  metadata: z.object({
    confidence: z.number(),
    category: z.string()
  })
});

// Create streaming extraction
const stream = await client.create({
  completionPromise: async () => {
    const response = await fetch("/api/extract", {
      method: "POST",
      body: JSON.stringify({ prompt: "..." })
    });
    return response.body;
  },
  response_model: {
    schema: schema,
    name: "ContentExtraction"
  }
});

// Process with validation metadata
for await (const chunk of stream) {
  console.log({
    data: chunk,              // Partial extraction result
    isValid: chunk._meta._isValid,    // Current validation state
    activePath: chunk._meta._activePath,    // Currently processing path
    completedPaths: chunk._meta._completedPaths  // Completed paths
  });
}
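To exercise a consumer loop like the one above without calling a model, you can replay hand-written chunks with the same _meta shape. fakeStream and Chunk are hypothetical names for this sketch; Chunk only makes top-level fields optional, whereas real partial results can also be missing nested fields.

```typescript
type Path = (string | number)[];

interface CompletionMeta {
  _isValid: boolean;
  _activePath: Path;
  _completedPaths: Path[];
}

// Shallow approximation of a streamed chunk: any subset of top-level fields plus _meta
type Chunk<T> = Partial<T> & { _meta: CompletionMeta };

// Hypothetical test double, not a zod-stream export: replays canned chunks
// asynchronously so `for await` consumers can be tested without a model call.
async function* fakeStream<T>(chunks: Chunk<T>[]): AsyncGenerator<Chunk<T>> {
  for (const chunk of chunks) {
    await Promise.resolve(); // stay asynchronous between chunks, like a network stream
    yield chunk;
  }
}
```

Passing the schema's inferred type explicitly, as in fakeStream<Extraction>(chunks), keeps the chunk type precise inside the loop.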

Progressive Processing

zod-stream enables processing dependent data as soon as relevant paths complete, without waiting for the full response:

import { isPathComplete } from "zod-stream";
import { z } from "zod";

// Define schema for a complex analysis
const schema = z.object({
  user: z.object({
    id: z.string(),
    preferences: z.object({
      theme: z.string(),
      language: z.string()
    })
  }),
  content: z.object({
    title: z.string(),
    body: z.string(),
    metadata: z.object({
      keywords: z.array(z.string()),
      category: z.string()
    })
  }),
  recommendations: z.array(z.object({
    id: z.string(),
    score: z.number(),
    reason: z.string()
  }))
});

// Process data as it becomes available
// (`stream` is created with client.create(...) using this schema, as in Core Concepts)
for await (const chunk of stream) {
  // Start personalizing UI as soon as user preferences are ready
  if (isPathComplete(['user', 'preferences'], chunk)) {
    applyUserPreferences(chunk.user.preferences); // hypothetical app helper
  }

  // Begin content indexing once we have title and keywords
  if (isPathComplete(['content', 'metadata', 'keywords'], chunk) && 
      isPathComplete(['content', 'title'], chunk)) {
    indexContent({ // hypothetical app helper
      title: chunk.content.title,
      keywords: chunk.content.metadata.keywords
    });
  }

  // Start fetching recommended content in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'recommendations' && path.length === 2) {
      const index = path[1] as number;
      const recommendation = chunk.recommendations[index];
      if (recommendation?.id) {
        prefetchContent(recommendation.id); // hypothetical app helper
      }
    }
  });
}

This approach enables:

  • Early UI updates based on user preferences
  • Parallel processing of independent data
  • Optimistic loading of related content
  • Better perceived performance
  • Resource optimization
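One caveat with the loop above: assuming _completedPaths is cumulative, as the "All completed paths" comment in the metadata type suggests, a path that completed early is still listed on every later chunk, so the forEach over recommendations (and each isPathComplete branch) runs again for each new chunk. A small tracker can report only the array items that are new since the last call. createItemTracker is a hypothetical helper, not a zod-stream export.

```typescript
type Path = (string | number)[];

// Hypothetical helper, not a zod-stream export. Returns only the indices of
// `arrayKey` items that completed since the previous call.
function createItemTracker(arrayKey: string) {
  const seen = new Set<number>();
  return (completedPaths: Path[]): number[] => {
    const fresh: number[] = [];
    for (const path of completedPaths) {
      const index = path[1];
      // Direct children of the array only, e.g. ["recommendations", 2]
      if (path.length === 2 && path[0] === arrayKey && typeof index === "number" && !seen.has(index)) {
        seen.add(index);
        fresh.push(index);
      }
    }
    return fresh;
  };
}
```

Create the tracker once before the loop; inside it, iterating over track(chunk._meta._completedPaths) then visits each recommendation exactly once.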

Stream Metadata

Every streamed chunk includes metadata about validation state:

type CompletionMeta = {
  _isValid: boolean;           // Schema validation status
  _activePath: (string | number)[];     // Current parsing path
  _completedPaths: (string | number)[][]; // All completed paths
};

// Example chunk
const exampleChunk = {
  content: "partial content...",
  metadata: {
    confidence: 0.95
  },
  _meta: {
    _isValid: false,  // Not valid yet
    _activePath: ["metadata", "category"],
    _completedPaths: [
      ["metadata", "confidence"]
    ]
  }
};

Schema Stubs

Get typed stub objects for initialization:

const schema = z.object({
  users: z.array(z.object({
    name: z.string(),
    age: z.number()
  }))
});

const client = new ZodStream();
const stub = client.getSchemaStub({
  schema,
  defaultData: {
    users: [{ name: "loading...", age: 0 }]
  }
});
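One way to use such a stub is to overlay each partial chunk onto it, so rendering code never sees missing fields. This sketch assumes stubs and chunks are plain JSON-like objects; mergeWithStub is a hypothetical helper, not a zod-stream API, and the library's own handling of defaultData may differ.

```typescript
type PlainObject = Record<string, unknown>;

const isPlainObject = (value: unknown): value is PlainObject =>
  typeof value === "object" && value !== null && !Array.isArray(value);

// Hypothetical helper, not a zod-stream API. Objects merge key by key;
// arrays and primitives present in `partial` replace the stub's value.
function mergeWithStub<T>(stub: T, partial: unknown): T {
  if (partial === undefined) return stub;
  if (isPlainObject(stub) && isPlainObject(partial)) {
    const merged: PlainObject = { ...stub };
    for (const [key, value] of Object.entries(partial)) {
      merged[key] = mergeWithStub(stub[key], value);
    }
    return merged as T;
  }
  return partial as T;
}
```

Replacing arrays wholesale is a deliberate simplification: merging a half-streamed array element by element with placeholder items would mix real and "loading..." entries.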

Debug Logging

Enable detailed logging for debugging:

const client = new ZodStream({ debug: true });

// Logs will include:
// - Stream initialization
// - Validation results
// - Path completion
// - Errors with full context

Using Response Models

The withResponseModel helper configures OpenAI parameters based on your schema and chosen mode:

import OpenAI from "openai";
import { withResponseModel } from "zod-stream";
import { z } from "zod";

const oai = new OpenAI(); // reads OPENAI_API_KEY from the environment

const schema = z.object({
  sentiment: z.string(),
  keywords: z.array(z.string()),
  confidence: z.number()
});

// Configure for OpenAI tools mode
const params = withResponseModel({
  response_model: {
    schema,
    name: "Analysis",
    description: "Extract sentiment and keywords"
  },
  mode: "TOOLS",
  params: {
    messages: [{ role: "user", content: "Analyze this text..." }],
    model: "gpt-4"
  }
});

const completion = await oai.chat.completions.create({
  ...params,
  stream: true
});

Response Modes

zod-stream supports multiple modes for structured LLM responses:

import { MODE } from "zod-stream";

const modes = {
  FUNCTIONS: "FUNCTIONS",   // OpenAI function calling
  TOOLS: "TOOLS",          // OpenAI tools API
  JSON: "JSON",            // Direct JSON response
  MD_JSON: "MD_JSON",      // JSON in markdown blocks
  JSON_SCHEMA: "JSON_SCHEMA", // JSON with schema validation
  THINKING_MD_JSON: "THINKING_MD_JSON" // JSON with thinking in markdown blocks (deepseek r1)
} as const;
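Because modes is declared as const, a union type of the mode strings can be derived from it, which helps when the mode comes from configuration rather than code. parseMode is a hypothetical helper, and the sketch assumes the exported MODE has the same values as the object above.

```typescript
// Same values as the modes object above, redeclared so the sketch is self-contained
const modes = {
  FUNCTIONS: "FUNCTIONS",
  TOOLS: "TOOLS",
  JSON: "JSON",
  MD_JSON: "MD_JSON",
  JSON_SCHEMA: "JSON_SCHEMA",
  THINKING_MD_JSON: "THINKING_MD_JSON"
} as const;

// Union of the mode strings: "FUNCTIONS" | "TOOLS" | ...
type Mode = (typeof modes)[keyof typeof modes];

// Hypothetical helper: accept a mode from untrusted input (env var, query
// string) and fall back to a default when the value is not a known mode.
function parseMode(value: string | undefined, fallback: Mode = "TOOLS"): Mode {
  const known: readonly string[] = Object.values(modes);
  return value !== undefined && known.includes(value) ? (value as Mode) : fallback;
}
```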

Mode-Specific Behaviors

TOOLS Mode

// Results in OpenAI tool configuration
{
  tool_choice: {
    type: "function",
    function: { name: "Analysis" }
  },
  tools: [{
    type: "function",
    function: {
      name: "Analysis",
      description: "Extract sentiment and keywords",
      parameters: {/* Generated from schema */}
    }
  }]
}

FUNCTIONS Mode (Legacy)

// Results in OpenAI function configuration
{
  function_call: { name: "Analysis" },
  functions: [{
    name: "Analysis",
    description: "Extract sentiment and keywords",
    parameters: {/* Generated from schema */}
  }]
}

JSON Mode

// Results in direct JSON response configuration
{
  response_format: { type: "json_object" },
  messages: [
    {
      role: "system",
      content: "Return JSON matching schema..."
    },
    // ... user messages
  ]
}

Response Parsing

Built-in parsers handle different response formats:

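import { 
  OAIResponseParser,
  OAIResponseToolArgsParser,
  OAIResponseFnArgsParser,
  OAIResponseJSONParser,
  thinkingJsonParser
} from "zod-stream";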

// Automatic format detection
const result = OAIResponseParser(response);

// Format-specific parsing
const toolArgs = OAIResponseToolArgsParser(response);
const fnArgs = OAIResponseFnArgsParser(response);
const jsonContent = OAIResponseJSONParser(response);
const thinkingJson = thinkingJsonParser(response);
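To make the differences between the formats concrete, here is a sketch of where each mode's JSON text lives in a non-streaming OpenAI chat completion: tool call arguments for TOOLS, function_call arguments for FUNCTIONS, and message content for JSON, with MD_JSON taking the first fenced block from the content. This is a hypothetical illustration, not how zod-stream's parsers are implemented, and it only looks at the first choice and first tool call.

```typescript
// Minimal slice of an OpenAI chat completion response (non-streaming)
interface ChatCompletionLike {
  choices: {
    message: {
      content?: string | null;
      tool_calls?: { function: { name: string; arguments: string } }[];
      function_call?: { name: string; arguments: string };
    };
  }[];
}

type ParseMode = "TOOLS" | "FUNCTIONS" | "JSON" | "MD_JSON";

// Hypothetical sketch, not the zod-stream parsers: locate the JSON text
// each mode places in the response, then parse it.
function extractJson(response: ChatCompletionLike, mode: ParseMode): unknown {
  const message = response.choices[0]?.message;
  if (!message) throw new Error("response has no choices");

  let raw: string | null | undefined = undefined;
  switch (mode) {
    case "TOOLS":
      raw = message.tool_calls?.[0]?.function.arguments;
      break;
    case "FUNCTIONS":
      raw = message.function_call?.arguments;
      break;
    case "JSON":
      raw = message.content;
      break;
    case "MD_JSON": {
      // Body of the first markdown code block, optionally tagged "json"
      const match = message.content?.match(/`{3}(?:json)?\s*([\s\S]*?)`{3}/);
      raw = match?.[1];
      break;
    }
  }
  if (!raw) throw new Error(`no ${mode} payload in response`);
  return JSON.parse(raw);
}
```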

Streaming Utilities

Handle streaming responses with built-in utilities:

import { OAIStream, readableStreamToAsyncGenerator } from "zod-stream";

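// Create streaming response (route: "/api/stream")
// Assumes `oai` (an OpenAI client) and `params` (from withResponseModel) are in scope
export async function POST(req: Request) {
  const completion = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(
    OAIStream({ res: completion })
  );
}

// Convert stream to async generator
// (`stream` is a ReadableStream, e.g. the body of a fetch response)
const generator = readableStreamToAsyncGenerator(stream);
for await (const chunk of generator) {
  console.log(chunk);
}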
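For reference, converting a ReadableStream into an async generator takes only a reader loop. This is a hypothetical stand-in for readableStreamToAsyncGenerator, whose actual implementation may differ; it imports ReadableStream from node:stream/web so it runs under Node, and in a browser you would drop the import and use the global.

```typescript
import { ReadableStream } from "node:stream/web";

// Hypothetical stand-in for readableStreamToAsyncGenerator
async function* toAsyncGenerator<T>(stream: ReadableStream<T>): AsyncGenerator<T> {
  const reader = stream.getReader();
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) return;
      yield result.value;
    }
  } finally {
    // Release the lock even if the consumer breaks out of the loop early
    reader.releaseLock();
  }
}
```

Releasing the lock in finally matters because a for await loop that exits early calls the generator's return(), which runs the finally block and leaves the stream usable by another reader.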

Path Tracking Utilities

Monitor completion status of specific paths:

import { isPathComplete } from "zod-stream";

const activePath = ["analysis", "sentiment"];
const isComplete = isPathComplete(activePath, {
  _meta: {
    _completedPaths: [["analysis", "sentiment"]],
    _activePath: ["analysis", "keywords"],
    _isValid: false
  }
});

Error Handling

zod-stream exposes validation state at two levels: per-chunk path metadata while the stream is running, and a final _isValid check once it completes:

const stream = await client.create({
  completionPromise: async () => response.body,
  response_model: { schema }
});

let finalResult;

// Path tracking for progressive updates
for await (const chunk of stream) {
  finalResult = chunk;
  // Check which paths are complete
  console.log("Completed paths:", chunk._meta._completedPaths);
  console.log("Current path:", chunk._meta._activePath);
}

// Final validation happens after stream completes
// (finalResult is undefined if the stream produced no chunks)
const isValid = finalResult?._meta._isValid ?? false;
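The two levels can be folded into a single result type, so callers can tell a stream that threw while iterating from one that finished but failed validation. consume and Outcome are hypothetical names, and the sketch assumes network or parse failures are thrown from the stream's iterator rather than reported through _meta.

```typescript
type Path = (string | number)[];

interface StreamChunk {
  _meta: { _isValid: boolean; _activePath: Path; _completedPaths: Path[] };
}

type Outcome<T> =
  | { status: "ok"; data: T }                                // finished and passed validation
  | { status: "invalid"; data: T | undefined }               // finished but failed, or produced no chunks
  | { status: "error"; error: unknown; data: T | undefined }; // iteration threw mid-stream

// Hypothetical helper, not a zod-stream export
async function consume<T extends StreamChunk>(
  stream: AsyncIterable<T>,
  onChunk?: (chunk: T) => void
): Promise<Outcome<T>> {
  let last: T | undefined;
  try {
    for await (const chunk of stream) {
      last = chunk;
      onChunk?.(chunk); // progressive updates, e.g. logging _completedPaths
    }
  } catch (error) {
    // Assumption: transport and parsing failures are thrown by the iterator
    return { status: "error", error, data: last };
  }
  // _isValid is only conclusive once the stream has ended
  if (last && last._meta._isValid) return { status: "ok", data: last };
  return { status: "invalid", data: last };
}
```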

Real-World Use Cases

1. Progressive Data Analysis

const analysisSchema = z.object({
  marketData: z.object({
    trends: z.array(z.object({
      metric: z.string(),
      value: z.number()
    })),
    summary: z.string()
  }),
  competitors: z.array(z.object({
    name: z.string(),
    strengths: z.array(z.string()),
    weaknesses: z.array(z.string())
  })),
  recommendations: z.object({
    immediate: z.array(z.string()),
    longTerm: z.array(z.string()),
    budget: z.number()
  })
});

for await (const chunk of stream) {
  // Start visualizing market trends immediately
  if (isPathComplete(['marketData', 'trends'], chunk)) {
    renderTrendCharts(chunk.marketData.trends); // hypothetical app helper
  }

  // Begin competitor analysis in parallel
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'competitors' && path.length === 2) {
      const competitor = chunk.competitors[path[1] as number];
      analyzeCompetitor(competitor); // hypothetical app helper
    }
  });

  // Start budget planning once we have immediate recommendations
  if (isPathComplete(['recommendations', 'immediate'], chunk) && 
      isPathComplete(['recommendations', 'budget'], chunk)) {
    planBudget({ // hypothetical app helper
      actions: chunk.recommendations.immediate,
      budget: chunk.recommendations.budget
    });
  }
}

2. Document Processing Pipeline

const documentSchema = z.object({
  metadata: z.object({
    title: z.string(),
    author: z.string(),
    topics: z.array(z.string())
  }),
  sections: z.array(z.object({
    heading: z.string(),
    content: z.string(),
    annotations: z.array(z.object({
      type: z.string(),
      text: z.string(),
      confidence: z.number()
    }))
  })),
  summary: z.object({
    abstract: z.string(),
    keyPoints: z.array(z.string()),
    readingTime: z.number()
  })
});

for await (const chunk of stream) {
  // Start document indexing as soon as metadata is available
  if (isPathComplete(['metadata'], chunk)) {
    indexDocument({ // hypothetical app helper
      title: chunk.metadata.title,
      topics: chunk.metadata.topics
    });
  }

  // Process sections as they complete
  chunk._meta._completedPaths.forEach(path => {
    if (path[0] === 'sections' && path.length === 2 &&
        isPathComplete([...path, 'annotations'], chunk)) {
      const sectionIndex = path[1] as number;
      const section = chunk.sections[sectionIndex];
      // Process annotations for each completed section
      processAnnotations({ // hypothetical app helper
        heading: section.heading,
        annotations: section.annotations
      });
    }
  });

  // Generate preview once we have abstract and reading time
  if (isPathComplete(['summary', 'abstract'], chunk) && 
      isPathComplete(['summary', 'readingTime'], chunk)) {
    generatePreview({ // hypothetical app helper
      abstract: chunk.summary.abstract,
      readingTime: chunk.summary.readingTime
    });
  }
}

3. E-commerce Product Enrichment

const productSchema = z.object({
  basic: z.object({
    id: z.string(),
    name: z.string(),
    category: z.string()
  }),
  pricing: z.object({
    base: z.number(),
    discounts: z.array(z.object({
      type: z.string(),
      amount: z.number()
    })),
    final: z.number()
  }),
  inventory: z.object({
    status: z.string(),
    locations: z.array(z.object({
      id: z.string(),
      quantity: z.number()
    }))
  }),
  enrichment: z.object({
    seoDescription: z.string(),
    searchKeywords: z.array(z.string()),
    relatedProducts: z.array(z.string())
  })
});

for await (const chunk of stream) {
  // Start inventory checks as soon as basic info is available
  if (isPathComplete(['basic'], chunk)) {
    checkInventory(chunk.basic.id); // hypothetical app helper
  }

  // Update pricing as soon as final price is calculated
  if (isPathComplete(['pricing', 'final'], chunk)) {
    updatePrice(chunk.pricing.final); // hypothetical app helper
    // If we also have inventory, update buy button
    if (isPathComplete(['inventory', 'status'], chunk)) {
      updateBuyButton({ // hypothetical app helper
        status: chunk.inventory.status
      });
    }
  }

  // Start SEO optimization in parallel
  if (isPathComplete(['enrichment', 'seoDescription'], chunk) &&
      isPathComplete(['enrichment', 'searchKeywords'], chunk)) {
    optimizeSEO({ // hypothetical app helper
      description: chunk.enrichment.seoDescription,
      keywords: chunk.enrichment.searchKeywords
    });
  }

  // Prefetch related products as they're identified
  if (isPathComplete(['enrichment', 'relatedProducts'], chunk)) {
    prefetchProducts(chunk.enrichment.relatedProducts); // hypothetical app helper
  }
}

With Next.js API Routes

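// pages/api/extract.ts
import OpenAI from "openai";
import { withResponseModel, OAIStream } from "zod-stream";
import { z } from "zod";

// Use the Edge runtime so the handler receives a Fetch Request and can return a Response
export const config = { runtime: "edge" };

const oai = new OpenAI(); // reads OPENAI_API_KEY from the environment

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string()),
  sentiment: z.object({
    score: z.number(),
    label: z.string()
  })
});

export default async function handler(req: Request) {
  const { content } = await req.json();

  const params = withResponseModel({
    response_model: { 
      schema,
      name: "ContentAnalysis"
    },
    mode: "TOOLS",
    params: {
      messages: [{ 
        role: "user", 
        content: `Analyze: ${content}` 
      }],
      model: "gpt-4"
    }
  });

  const stream = await oai.chat.completions.create({
    ...params,
    stream: true
  });

  return new Response(OAIStream({ res: stream }));
}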

With React and stream-hooks

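import { useState } from "react";
import { useJsonStream } from "stream-hooks";
import { z } from "zod";

const schema = z.object({
  summary: z.string(),
  topics: z.array(z.string())
});

function AnalysisComponent() {
  const [data, setData] = useState<z.infer<typeof schema>>();

  const { 
    loading,
    startStream,
    error
  } = useJsonStream({
    schema,
    onReceive: (data) => {
      setData(data);
    }
  });

  // LoadingState, ErrorDisplay and AnalysisResults are your own components
  return (
    <div>
      <button
        onClick={() => startStream({
          url: "/api/extract",
          method: "POST",
          body: { content: "..." }
        })}
      >
        Start Analysis
      </button>

      {loading && <LoadingState paths={data?._meta?._completedPaths} />}
      {error && <ErrorDisplay error={error} />}
      {data && (
        // AnalysisResults is a hypothetical name; the original component name was lost
        <AnalysisResults
          data={data}
          isComplete={data._meta._completedPaths.length > 0}
        />
      )}
    </div>
  );
}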

Integration with Island AI

zod-stream is part of the Island AI toolkit.


We welcome contributions!





Package last updated on 27 Jan 2025
