@rodgerai/workflows


Workflow orchestration for AI agents - framework agnostic, production ready

Version 2.0.0 (latest) · Source: npm · Maintainers: 1

rodger/workflows

Workflow orchestration for AI agents. Build complex, multi-step processes with suspend/resume, conditional branching, and state persistence. Framework agnostic and production ready.

Features

  • Framework Agnostic - Works with Rodger agents, AI SDK, Mastra, or plain functions
  • Beautiful Fluent API - Reads like your execution flow
  • Conditional Branching - Different paths based on previous results
  • Suspend/Resume - Human-in-the-loop workflows with state persistence
  • Streaming Support - Real-time workflow execution events
  • Retry & Error Handling - Per-step retry logic with exponential backoff
  • Backend Integration - Seamless integration with @rodger/backend
  • TypeScript First - Full type inference and safety
  • Production Ready - State persistence, auth, rate limiting built-in

Installation

npm install rodger @rodger/core
# or
pnpm add @rodger/workflows @rodger/core

Quick Start

1. Create a Simple Workflow

import { createWorkflow } from 'rodger/workflows';
import { myAgent } from './agent';

const workflow = createWorkflow('data-pipeline')
  .step('extract', async (ctx) => {
    return await fetchData();
  })
  .then('transform', myAgent)
  .then('load', async (ctx) => {
    const data = ctx.previousResults.transform;
    await saveToDatabase(data);
    return { success: true };
  })
  .commit();

// Execute
const result = await workflow.execute({ source: 'api' });
console.log('Result:', result.stepResults);

2. Integrate with Backend

// app/api/workflow/route.ts
import { handleWorkflow } from 'rodger/backend';
import { workflow } from '@/lib/workflow';
import { storage } from '@/lib/storage';

export async function POST(request: Request) {
  return handleWorkflow(request, {
    workflow,
    storage, // Optional: for state persistence
  });
}

Core Concepts

Steps

Steps are the building blocks of workflows. Each step can be an agent, a function, or an async operation.

const workflow = createWorkflow('example')
  // Agent step
  .step('analyze', analyzerAgent)

  // Function step
  .then('process', async (ctx) => {
    return processData(ctx.input);
  })

  // Agent with context
  .then('summarize', summarizerAgent, {
    context: (ctx) => {
      // Pass previous results to agent
      return `Process result: ${ctx.previousResults.process}`;
    }
  })

  .commit();

Context

The workflow context carries state between steps:

const workflow = createWorkflow('example')
  .step('first', async (ctx) => {
    console.log('Input:', ctx.input); // Original input
    console.log('Session:', ctx.sessionId);
    console.log('User:', ctx.userId);
    return { value: 42 };
  })
  .then('second', async (ctx) => {
    // Access previous step results
    const firstResult = ctx.previousResults.first;
    console.log('Previous:', firstResult.value); // 42

    // Access all completed steps
    console.log('Completed:', ctx.completedSteps); // ['first']

    return { doubled: firstResult.value * 2 };
  })
  .commit();
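
To make the data flow concrete, here is a minimal sketch of how a sequential runner could thread results through the context. It does not use the library: `SketchContext`, `StepFn`, and `runSequential` are hypothetical names that only mirror the fields shown above (`input`, `previousResults`, `completedSteps`).

```typescript
// Hypothetical shapes mirroring the fields used in the example above,
// not the library's actual type definitions.
interface SketchContext {
  input: unknown;
  previousResults: Record<string, any>;
  completedSteps: string[];
}

type StepFn = (ctx: SketchContext) => Promise<unknown>;

// Run steps in order, recording each result under its step name so that
// later steps can read it from ctx.previousResults.
async function runSequential(
  steps: Array<[string, StepFn]>,
  input: unknown
): Promise<SketchContext> {
  const ctx: SketchContext = { input, previousResults: {}, completedSteps: [] };
  for (const [stepName, fn] of steps) {
    ctx.previousResults[stepName] = await fn(ctx);
    ctx.completedSteps.push(stepName);
  }
  return ctx;
}
```

Each step sees only the results of steps that finished before it, which is why `second` can read `first` but not the other way around.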

Conditional Branching

Execute different steps based on conditions:

const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)

  // Branch 1: High confidence path
  .then('auto-approve', async (ctx) => {
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence > 0.9
  })

  // Branch 2: Manual review path
  .then('manual-review', async (ctx) => {
    await ctx.suspend(); // Pause for human review
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.confidence <= 0.9
  })

  // Both branches converge here
  .after(['auto-approve', 'manual-review'])
    .step('notify', notifierAgent)

  .commit();
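
The `when` option can be read as a gate evaluated just before a step runs. The sketch below illustrates that idea under the assumption that a step whose predicate returns false is skipped and leaves no result. `BranchCtx`, `GatedStep`, and `runGated` are hypothetical names, not part of the package.

```typescript
// Hypothetical context: only the field the predicates need.
type BranchCtx = { previousResults: Record<string, any> };

interface GatedStep {
  id: string;
  run: (ctx: BranchCtx) => Promise<unknown>;
  when?: (ctx: BranchCtx) => boolean | Promise<boolean>;
}

// Run each step whose predicate passes; return the ids that actually executed.
async function runGated(steps: GatedStep[], ctx: BranchCtx): Promise<string[]> {
  const executed: string[] = [];
  for (const step of steps) {
    // A missing predicate means "always run".
    const shouldRun = step.when ? await step.when(ctx) : true;
    if (!shouldRun) continue;
    ctx.previousResults[step.id] = await step.run(ctx);
    executed.push(step.id);
  }
  return executed;
}
```

Because the two predicates in the example above are complementary (`> 0.9` and `<= 0.9`), exactly one branch runs for any confidence value.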

Suspend/Resume

Pause workflows for human approval or external data:

// Create workflow with suspension
const workflow = createWorkflow('approval')
  .step('analyze', analyzerAgent)
  .then('review', async (ctx) => {
    if (ctx.previousResults.analyze.needsReview) {
      // Suspend - saves snapshot if storage provided
      await ctx.suspend();
    }
    return { reviewed: true };
  })
  .then('execute', executorAgent)
  .commit();

// Initial execution
const result = await workflow.execute(input, { storage: workflowStorage });

if (result.status === 'suspended') {
  console.log('Awaiting review:', result.executionId);

  // Later, resume after approval
  const resumed = await workflow.resume(result.executionId, workflowStorage);
  console.log('Completed:', resumed.result);
}
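
One way to picture suspend/resume is a runner that unwinds out of the current step when `suspend()` is called, records where it stopped, and later re-enters that step with `suspend()` turned into a no-op. The sketch below is an illustration under that assumption only; the package's real mechanism may differ. `SUSPEND`, `SketchSnapshot`, `RunCtx`, `SketchStep`, and `runFrom` are hypothetical names.

```typescript
// Sentinel value thrown to unwind out of a step when suspend() is called.
const SUSPEND = { suspended: true };

interface SketchSnapshot {
  executionId: string;
  nextStep: number; // index of the step to run (or re-enter) next
  previousResults: Record<string, unknown>;
}

interface RunCtx {
  previousResults: Record<string, unknown>;
  suspend: () => Promise<void>;
}

type SketchStep = { id: string; run: (ctx: RunCtx) => Promise<unknown> };

type Outcome =
  | { status: 'completed'; previousResults: Record<string, unknown> }
  | { status: 'suspended'; snapshot: SketchSnapshot };

async function runFrom(
  steps: SketchStep[],
  snapshot: SketchSnapshot,
  resuming: boolean
): Promise<Outcome> {
  const previousResults = { ...snapshot.previousResults };
  for (let i = snapshot.nextStep; i < steps.length; i++) {
    // On resume, the step that suspended is re-entered with suspend() disabled.
    const allowSuspend = !(resuming && i === snapshot.nextStep);
    const ctx: RunCtx = {
      previousResults,
      suspend: async () => {
        if (allowSuspend) throw SUSPEND;
      },
    };
    try {
      previousResults[steps[i].id] = await steps[i].run(ctx);
    } catch (err) {
      if (err === SUSPEND) {
        return { status: 'suspended', snapshot: { ...snapshot, nextStep: i, previousResults } };
      }
      throw err; // real failures propagate unchanged
    }
  }
  return { status: 'completed', previousResults };
}
```

In this sketch the snapshot is the only state needed to continue, which is why persisting it to storage lets a different process pick the execution up later.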

Complete Example: Loan Processing

import { createAgent } from 'rodger';
import { createWorkflow } from 'rodger/workflows';

// Create specialized agents for each step
const infoCollector = createAgent({
  name: 'Info Collector',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Collect loan application information from the user.'
});

const riskAnalyzer = createAgent({
  name: 'Risk Analyzer',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Analyze risk factors for the loan application.',
  tools: { calculateRisk, checkCreditScore }
});

const termsGenerator = createAgent({
  name: 'Terms Generator',
  llm: { provider: 'openai', model: 'gpt-4o' },
  instructions: 'Generate loan terms based on risk analysis.',
  tools: { generateTerms }
});

// Build the workflow
const loanWorkflow = createWorkflow('loan-processing')
  // Step 1: Collect information
  .step('collect', infoCollector, {
    retry: { attempts: 2, delay: 1000 },
    timeout: 60000
  })

  // Step 2: Analyze risk
  .then('analyze', riskAnalyzer, {
    retry: { attempts: 3, delay: 500, backoff: 'exponential' }
  })

  // Step 3a: Auto-approve (low risk)
  .then('auto-approve', termsGenerator, {
    when: (ctx) => ctx.previousResults.analyze.riskScore < 0.3,
    onError: async (ctx, error) => {
      // Fallback to standard terms
      return { interestRate: 5.5, error: 'Using standard terms' };
    }
  })

  // Step 3b: Manual review (high risk)
  .after('analyze')
  .step('manual-review', async (ctx) => {
    // Suspend for human approval
    await ctx.suspend();
    return { approved: true };
  }, {
    when: (ctx) => ctx.previousResults.analyze.riskScore >= 0.3
  })

  // Step 4: Send notification (after either path)
  .after(['auto-approve', 'manual-review'])
    .step('notify', async (ctx) => {
      const hasTerms = ctx.previousResults['auto-approve'];
      const wasReviewed = ctx.previousResults['manual-review'];

      if (hasTerms) {
        await sendApprovalEmail(ctx.userId, hasTerms);
      } else if (wasReviewed) {
        await sendReviewCompleteEmail(ctx.userId);
      }

      return { notified: true };
    })

  .commit();

// Execute the workflow
const result = await loanWorkflow.execute(
  { loanAmount: 50000, creditScore: 720 },
  { sessionId: 'session-123', userId: 'user-456' }
);

console.log('Status:', result.status);
console.log('Duration:', result.duration);
console.log('Results:', result.stepResults);

Workflow Storage

Implement WorkflowStorage to enable suspend/resume across server restarts.

Interface

import type { WorkflowSnapshot } from 'rodger/workflows';

// Shape of the WorkflowStorage type exported by the package
interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}
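
For unit tests or local development, an in-memory implementation of the same two methods can be enough. The sketch below is self-contained, so it declares local stand-ins (`SnapshotLike`, `SnapshotStorage`) instead of importing the package types. The snapshot fields are an assumption based on the ones referenced elsewhere in this README, and `updatedAt` is assumed to be a string. It does not survive a restart, so it is not a substitute for Redis or a database.

```typescript
// Assumed snapshot shape: only the fields referenced elsewhere in this README.
interface SnapshotLike {
  executionId: string;
  workflowId: string;
  status: string;
  updatedAt: string;
}

// Local stand-in for the WorkflowStorage contract shown above.
interface SnapshotStorage {
  saveSnapshot(snapshot: SnapshotLike): Promise<void>;
  getSnapshot(executionId: string): Promise<SnapshotLike | null>;
}

class InMemoryWorkflowStorage implements SnapshotStorage {
  private snapshots = new Map<string, string>();

  async saveSnapshot(snapshot: SnapshotLike): Promise<void> {
    // Serialize so later mutation of the caller's object cannot alter the stored copy.
    this.snapshots.set(snapshot.executionId, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<SnapshotLike | null> {
    const raw = this.snapshots.get(executionId);
    return raw === undefined ? null : (JSON.parse(raw) as SnapshotLike);
  }
}
```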

Redis Implementation

import { Redis } from 'ioredis';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';

export class RedisWorkflowStorage implements WorkflowStorage {
  constructor(private redis: Redis) {}

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    const key = `workflow:${snapshot.executionId}`;
    // 24-hour TTL: the snapshot expires 24 hours after it was last saved
    await this.redis.setex(key, 86400, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const key = `workflow:${executionId}`;
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }
}

// Usage
const storage = new RedisWorkflowStorage(redis);
const result = await workflow.execute(input, { storage });

Supabase Implementation

import { createClient, SupabaseClient } from '@supabase/supabase-js';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';

export class SupabaseWorkflowStorage implements WorkflowStorage {
  constructor(private supabase: SupabaseClient) {}

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    // supabase-js returns errors instead of throwing, so check explicitly
    const { error } = await this.supabase
      .from('workflow_snapshots')
      .upsert({
        execution_id: snapshot.executionId,
        workflow_id: snapshot.workflowId,
        data: snapshot, // Store entire snapshot as JSONB
        status: snapshot.status,
        updated_at: snapshot.updatedAt,
      });
    if (error) throw error;
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    // maybeSingle() yields null data (not an error) when no row matches
    const { data, error } = await this.supabase
      .from('workflow_snapshots')
      .select('data')
      .eq('execution_id', executionId)
      .maybeSingle();
    if (error) throw error;

    return data?.data ?? null;
  }
}

// Usage
const storage = new SupabaseWorkflowStorage(supabase);

Database Schema (Supabase/Postgres)

CREATE TABLE workflow_snapshots (
  execution_id TEXT PRIMARY KEY,
  workflow_id TEXT NOT NULL,
  data JSONB NOT NULL,
  status TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT NOW(),
  updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX idx_workflow_snapshots_workflow ON workflow_snapshots(workflow_id);
CREATE INDEX idx_workflow_snapshots_status ON workflow_snapshots(status);
CREATE INDEX idx_workflow_snapshots_updated ON workflow_snapshots(updated_at);

Streaming Execution

Get real-time events as the workflow executes:

const workflow = createWorkflow('streaming')
  .step('step1', agent1)
  .then('step2', agent2)
  .then('step3', agent3)
  .commit();

// Stream execution events
for await (const event of workflow.executeStream(input)) {
  switch (event.type) {
    case 'workflow-start':
      console.log('Started:', event.workflowId);
      break;

    case 'step-start':
      console.log('Step starting:', event.stepName);
      break;

    case 'step-complete':
      console.log('Step done:', event.stepName, event.result);
      break;

    case 'step-error':
      console.error('Step failed:', event.stepName, event.error);
      break;

    case 'workflow-complete':
      console.log('Workflow done:', event.result);
      break;

    case 'workflow-suspended':
      console.log('Suspended at:', event.stepName);
      console.log('Resume with:', event.executionId);
      break;
  }
}
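
The `for await` loop above works with any async iterable. As an illustration of the pattern (not the package's implementation), an async generator can yield events as each step starts and finishes. `SketchEvent`, `streamSteps`, and `collectEventTypes` are hypothetical names, and the event set is trimmed to four of the types listed above.

```typescript
// Hypothetical subset of the event types shown in the example above.
type SketchEvent =
  | { type: 'workflow-start'; workflowId: string }
  | { type: 'step-start'; stepName: string }
  | { type: 'step-complete'; stepName: string; result: unknown }
  | { type: 'workflow-complete' };

// Yield an event before and after each step so consumers see progress live.
async function* streamSteps(
  workflowId: string,
  steps: Array<[string, () => Promise<unknown>]>
): AsyncGenerator<SketchEvent> {
  yield { type: 'workflow-start', workflowId };
  for (const [stepName, run] of steps) {
    yield { type: 'step-start', stepName };
    const result = await run();
    yield { type: 'step-complete', stepName, result };
  }
  yield { type: 'workflow-complete' };
}

// Drain a stream and return the event types in the order they arrived.
async function collectEventTypes(events: AsyncIterable<SketchEvent>): Promise<string[]> {
  const seen: string[] = [];
  for await (const item of events) seen.push(item.type);
  return seen;
}
```

Because the generator only advances when the consumer asks for the next event, a slow consumer naturally applies back-pressure to the producer in this sketch.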

API Reference

createWorkflow(id: string)

Create a new workflow builder.

const workflow = createWorkflow('my-workflow')
  .step('first', myAgent)
  .then('second', myFunction)
  .commit();

.step(name, executor, options?)

Add a step to the workflow.

Parameters:

  • name: Unique step identifier
  • executor: Agent, function, or async operation
  • options: Step configuration

Options:

interface StepOptions {
  // Conditional execution
  when?: (ctx: WorkflowContext) => boolean | Promise<boolean>;

  // Retry configuration
  retry?: {
    attempts: number;
    delay: number;
    backoff?: 'linear' | 'exponential';
  };

  // Timeout in milliseconds
  timeout?: number;

  // Error handling
  onError?: (ctx: WorkflowContext, error: Error) => Promise<any>;

  // Context customization (for agents)
  context?: (ctx: WorkflowContext) => string;
}
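
A per-step `timeout` is commonly built by racing the step against a timer. The helper below is a sketch of that general technique, assuming nothing about how the package implements it. `withTimeout` is a hypothetical name.

```typescript
// Reject if `work` has not settled within `ms` milliseconds.
async function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Step timed out after ${ms}ms`)), ms);
  });
  try {
    return await Promise.race([work, expiry]);
  } finally {
    // Clear the timer so a finished step does not keep the process alive.
    clearTimeout(timer);
  }
}
```

Note that racing does not cancel the underlying work: a timed-out step keeps running in the background unless it supports cancellation on its own.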

.then(name, executor, options?)

Add a sequential step (alias for .step()).

.after(stepNames)

Create steps that run after specific step(s) complete.

const workflow = createWorkflow('example')
  .step('step1', agent1)
  .step('step2', agent2)
  .after(['step1', 'step2'])
    .step('step3', agent3) // Runs after both complete
  .commit();

.commit()

Finalize and return the executable workflow.

workflow.execute(input, options?)

Execute the workflow.

Parameters:

interface ExecutionOptions {
  sessionId?: string;
  userId?: string;
  metadata?: Record<string, any>;
  storage?: WorkflowStorage;
  trace?: TraceConfig;
}

Returns:

interface WorkflowResult {
  workflowId: string;
  executionId: string;
  status: 'completed' | 'failed' | 'suspended';
  result?: any;
  error?: Error;
  stepResults: Map<string, any>;
  startedAt: Date;
  completedAt: Date;
  duration: number;
  traces?: ExecutionTraceEntry[];
}
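
Since `stepResults` is declared as a `Map`, passing it straight to `JSON.stringify` produces `{}`. A small conversion helper avoids that when logging results or returning them from an API route. `stepResultsToObject` is a hypothetical helper, not part of the package.

```typescript
// Convert a Map of step results into a plain object that JSON.stringify can serialize.
function stepResultsToObject(stepResults: Map<string, unknown>): Record<string, unknown> {
  const plain: Record<string, unknown> = {};
  stepResults.forEach((value, key) => {
    plain[key] = value;
  });
  return plain;
}
```

For example, `JSON.stringify(stepResultsToObject(result.stepResults))` serializes every step's output keyed by step name.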

workflow.executeStream(input, options?)

Execute with streaming events.

Returns: AsyncIterable<WorkflowStreamEvent>

workflow.resume(executionId, storage)

Resume a suspended workflow.

const result = await workflow.resume(
  'exec-123',
  workflowStorage
);

Best Practices

1. Use Specialized Agents per Step

// ✅ Good: Focused agents
const focusedWorkflow = createWorkflow('process')
  .step('extract', extractorAgent)
  .then('analyze', analyzerAgent)
  .then('summarize', summarizerAgent)
  .commit();

// ❌ Bad: One agent for everything
const monolithicWorkflow = createWorkflow('process')
  .step('everything', generalAgent)
  .commit();

2. Implement Proper Error Handling

const workflow = createWorkflow('robust')
  .step('critical', criticalAgent, {
    retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
    onError: async (ctx, error) => {
      // Fallback logic
      await notifyAdmin(error);
      return { fallback: true };
    }
  })
  .commit();
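
To get a feel for what `delay` and `backoff` imply for wait times, the sketch below computes the pause before each retry. The formulas are an assumption (constant delay without `backoff`, `delay × n` for linear, `delay × 2^(n-1)` for exponential); the package may compute delays differently. `retryDelay` is a hypothetical helper.

```typescript
type Backoff = 'linear' | 'exponential';

// Delay in milliseconds before retry number `retry` (1 = first retry).
// Assumed formulas, for illustration only.
function retryDelay(baseDelay: number, retry: number, backoff?: Backoff): number {
  if (backoff === 'exponential') return baseDelay * Math.pow(2, retry - 1);
  if (backoff === 'linear') return baseDelay * retry;
  return baseDelay; // no backoff: constant delay
}
```

Under these assumptions, `{ attempts: 3, delay: 1000, backoff: 'exponential' }` would wait 1000 ms, 2000 ms, and 4000 ms before successive retries, so total waiting time grows quickly as `attempts` increases.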

3. Use Conditional Branching

const workflow = createWorkflow('adaptive')
  .step('analyze', analyzerAgent)
  .then('fast-path', fastAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === true
  })
  .then('complex-path', complexAgent, {
    when: (ctx) => ctx.previousResults.analyze.simple === false
  })
  .commit();

4. Store Workflow State for Resume

// Always provide storage for workflows that might suspend
const result = await workflow.execute(input, {
  storage: workflowStorage, // Required for suspend/resume
  sessionId: 'session-123',
  userId: 'user-456'
});

Examples

See complete examples in the examples directory.

License

MIT

Keywords

ai

Package last updated on 23 Oct 2025