
@rodgerai/workflows
Workflow orchestration for AI agents - framework agnostic, production ready
Workflow orchestration for AI agents. Build complex, multi-step processes with suspend/resume, conditional branching, and state persistence. Framework agnostic and production ready.
npm install rodger @rodger/core
# or
pnpm add @rodger/workflows @rodger/core
import { createWorkflow } from 'rodger/workflows';
import { myAgent } from './agent';
const workflow = createWorkflow('data-pipeline')
.step('extract', async (ctx) => {
return await fetchData();
})
.then('transform', myAgent)
.then('load', async (ctx) => {
const data = ctx.previousResults.transform;
await saveToDatabase(data);
return { success: true };
})
.commit();
// Execute
const result = await workflow.execute({ source: 'api' });
console.log('Result:', result.stepResults);
// app/api/workflow/route.ts
import { handleWorkflow } from 'rodger/backend';
import { workflow } from '@/lib/workflow';
import { storage } from '@/lib/storage';
export async function POST(request: Request) {
return handleWorkflow(request, {
workflow,
storage, // Optional: for state persistence
});
}
Steps are the building blocks of workflows. Each step can be an agent, a function, or an async operation.
const workflow = createWorkflow('example')
// Agent step
.step('analyze', analyzerAgent)
// Function step
.then('process', async (ctx) => {
return processData(ctx.input);
})
// Agent with context
.then('summarize', summarizerAgent, {
context: (ctx) => {
// Pass previous results to agent
return `Process result: ${ctx.previousResults.process}`;
}
})
.commit();
The workflow context carries state between steps:
const workflow = createWorkflow('example')
.step('first', async (ctx) => {
console.log('Input:', ctx.input); // Original input
console.log('Session:', ctx.sessionId);
console.log('User:', ctx.userId);
return { value: 42 };
})
.then('second', async (ctx) => {
// Access previous step results
const firstResult = ctx.previousResults.first;
console.log('Previous:', firstResult.value); // 42
// Access all completed steps
console.log('Completed:', ctx.completedSteps); // ['first']
return { doubled: firstResult.value * 2 };
})
.commit();
Execute different steps based on conditions:
const workflow = createWorkflow('approval')
.step('analyze', analyzerAgent)
// Branch 1: High confidence path
.then('auto-approve', async (ctx) => {
return { approved: true };
}, {
when: (ctx) => ctx.previousResults.analyze.confidence > 0.9
})
// Branch 2: Manual review path
.then('manual-review', async (ctx) => {
await ctx.suspend(); // Pause for human review
return { approved: true };
}, {
when: (ctx) => ctx.previousResults.analyze.confidence <= 0.9
})
// Both branches converge here
.after(['auto-approve', 'manual-review'])
.step('notify', notifierAgent)
.commit();
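One detail of the branching example above: the two `when` conditions are written separately, so if `confidence` is missing or `NaN`, both `> 0.9` and `<= 0.9` evaluate to false and neither branch is selected. A minimal sketch of one way to keep the two conditions complementary follows. The `branchOn` helper and the reduced `Ctx` type are hypothetical, introduced only for illustration, and are not part of the package.

```typescript
// Reduced context shape for this sketch (assumption: the real
// WorkflowContext carries more fields than previousResults).
interface Ctx {
  previousResults: Record<string, any>;
}
type When = (ctx: Ctx) => boolean;

// Hypothetical helper: derives both branch conditions from a single
// predicate, so exactly one of them is true for any context.
function branchOn(predicate: When): { whenTrue: When; whenFalse: When } {
  return {
    whenTrue: (ctx) => predicate(ctx),
    whenFalse: (ctx) => !predicate(ctx),
  };
}

const { whenTrue: highConfidence, whenFalse: needsReview } = branchOn(
  (ctx) => ctx.previousResults.analyze.confidence > 0.9,
);
```

The resulting functions could then be passed as `when: highConfidence` on the auto-approve step and `when: needsReview` on the manual-review step, so a missing score falls through to manual review instead of skipping both branches.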
Pause workflows for human approval or external data:
// Create workflow with suspension
const workflow = createWorkflow('approval')
.step('analyze', analyzerAgent)
.then('review', async (ctx) => {
if (ctx.previousResults.analyze.needsReview) {
// Suspend - saves snapshot if storage provided
await ctx.suspend();
}
return { reviewed: true };
})
.then('execute', executorAgent)
.commit();
// Initial execution
const result = await workflow.execute(input, { storage: workflowStorage });
if (result.status === 'suspended') {
console.log('Awaiting review:', result.executionId);
// Later, resume after approval
const resumed = await workflow.resume(result.executionId, workflowStorage);
console.log('Completed:', resumed.result);
}
import { createAgent } from 'rodger';
import { createWorkflow } from 'rodger/workflows';
// Create specialized agents for each step
const infoCollector = createAgent({
name: 'Info Collector',
llm: { provider: 'openai', model: 'gpt-4o' },
instructions: 'Collect loan application information from the user.'
});
const riskAnalyzer = createAgent({
name: 'Risk Analyzer',
llm: { provider: 'openai', model: 'gpt-4o' },
instructions: 'Analyze risk factors for the loan application.',
tools: { calculateRisk, checkCreditScore }
});
const termsGenerator = createAgent({
name: 'Terms Generator',
llm: { provider: 'openai', model: 'gpt-4o' },
instructions: 'Generate loan terms based on risk analysis.',
tools: { generateTerms }
});
// Build the workflow
const loanWorkflow = createWorkflow('loan-processing')
// Step 1: Collect information
.step('collect', infoCollector, {
retry: { attempts: 2, delay: 1000 },
timeout: 60000
})
// Step 2: Analyze risk
.then('analyze', riskAnalyzer, {
retry: { attempts: 3, delay: 500, backoff: 'exponential' }
})
// Step 3a: Auto-approve (low risk)
.then('auto-approve', termsGenerator, {
when: (ctx) => ctx.previousResults.analyze.riskScore < 0.3,
onError: async (ctx, error) => {
// Fallback to standard terms
return { interestRate: 5.5, error: 'Using standard terms' };
}
})
// Step 3b: Manual review (high risk)
.after('analyze')
.step('manual-review', async (ctx) => {
// Suspend for human approval
await ctx.suspend();
return { approved: true };
}, {
when: (ctx) => ctx.previousResults.analyze.riskScore >= 0.3
})
// Step 4: Send notification (after either path)
.after(['auto-approve', 'manual-review'])
.step('notify', async (ctx) => {
const hasTerms = ctx.previousResults['auto-approve'];
const wasReviewed = ctx.previousResults['manual-review'];
if (hasTerms) {
await sendApprovalEmail(ctx.userId, hasTerms);
} else if (wasReviewed) {
await sendReviewCompleteEmail(ctx.userId);
}
return { notified: true };
})
.commit();
// Execute the workflow
const result = await loanWorkflow.execute(
{ loanAmount: 50000, creditScore: 720 },
{ sessionId: 'session-123', userId: 'user-456' }
);
console.log('Status:', result.status);
console.log('Duration:', result.duration);
console.log('Results:', result.stepResults);
Implement WorkflowStorage to enable suspend/resume across server restarts.
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
interface WorkflowStorage {
saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}
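For local development or unit tests, an in-memory implementation of this interface may be enough. The sketch below is illustrative and not part of the package. `WorkflowSnapshot` and `WorkflowStorage` are redeclared locally, with only the snapshot fields this README uses, so that the block runs on its own. In a real project they would be imported from 'rodger/workflows', and the real snapshot type likely has more fields. `InMemoryWorkflowStorage` is a hypothetical name.

```typescript
// Local stand-ins for the library types (assumption: the real
// WorkflowSnapshot has more fields than the four used in this README).
interface WorkflowSnapshot {
  executionId: string;
  workflowId: string;
  status: string;
  updatedAt: string;
}

interface WorkflowStorage {
  saveSnapshot(snapshot: WorkflowSnapshot): Promise<void>;
  getSnapshot(executionId: string): Promise<WorkflowSnapshot | null>;
}

// Hypothetical helper class: keeps snapshots in process memory only,
// so they are lost on restart.
class InMemoryWorkflowStorage implements WorkflowStorage {
  private snapshots = new Map<string, string>();

  async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
    // Store a serialized copy so later mutation of the caller's object
    // cannot change what was saved.
    this.snapshots.set(snapshot.executionId, JSON.stringify(snapshot));
  }

  async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
    const raw = this.snapshots.get(executionId);
    return raw === undefined ? null : (JSON.parse(raw) as WorkflowSnapshot);
  }
}
```

Because nothing is persisted, this variant does not survive server restarts. It is only a stand-in for exercising suspend/resume logic in tests.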
import { Redis } from 'ioredis';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
export class RedisWorkflowStorage implements WorkflowStorage {
constructor(private redis: Redis) {}
async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
const key = `workflow:${snapshot.executionId}`;
// 24-hour TTL, counted from the most recent save (not from workflow completion)
await this.redis.setex(key, 86400, JSON.stringify(snapshot));
}
async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
const key = `workflow:${executionId}`;
const data = await this.redis.get(key);
return data ? JSON.parse(data) : null;
}
}
// Usage
const storage = new RedisWorkflowStorage(redis);
const result = await workflow.execute(input, { storage });
import { createClient, SupabaseClient } from '@supabase/supabase-js';
import type { WorkflowStorage, WorkflowSnapshot } from 'rodger/workflows';
export class SupabaseWorkflowStorage implements WorkflowStorage {
constructor(private supabase: SupabaseClient) {}
async saveSnapshot(snapshot: WorkflowSnapshot): Promise<void> {
const { error } = await this.supabase
.from('workflow_snapshots')
.upsert({
execution_id: snapshot.executionId,
workflow_id: snapshot.workflowId,
data: snapshot, // Store entire snapshot as JSONB
status: snapshot.status,
updated_at: snapshot.updatedAt,
});
// supabase-js reports failures through `error` instead of throwing
if (error) throw error;
}
async getSnapshot(executionId: string): Promise<WorkflowSnapshot | null> {
const { data, error } = await this.supabase
.from('workflow_snapshots')
.select('data')
.eq('execution_id', executionId)
.maybeSingle(); // Yields null data (and no error) when no row matches
if (error) throw error;
return data?.data ?? null;
}
}
// Usage
const storage = new SupabaseWorkflowStorage(supabase);
CREATE TABLE workflow_snapshots (
execution_id TEXT PRIMARY KEY,
workflow_id TEXT NOT NULL,
data JSONB NOT NULL,
status TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL
);
CREATE INDEX idx_workflow_snapshots_workflow ON workflow_snapshots(workflow_id);
CREATE INDEX idx_workflow_snapshots_status ON workflow_snapshots(status);
CREATE INDEX idx_workflow_snapshots_updated ON workflow_snapshots(updated_at);
Get real-time events as the workflow executes:
const workflow = createWorkflow('streaming')
.step('step1', agent1)
.then('step2', agent2)
.then('step3', agent3)
.commit();
// Stream execution events
for await (const event of workflow.executeStream(input)) {
switch (event.type) {
case 'workflow-start':
console.log('Started:', event.workflowId);
break;
case 'step-start':
console.log('Step starting:', event.stepName);
break;
case 'step-complete':
console.log('Step done:', event.stepName, event.result);
break;
case 'step-error':
console.error('Step failed:', event.stepName, event.error);
break;
case 'workflow-complete':
console.log('Workflow done:', event.result);
break;
case 'workflow-suspended':
console.log('Suspended at:', event.stepName);
console.log('Resume with:', event.executionId);
break;
}
}
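The same loop can feed a small aggregator instead of logging. The sketch below is a hedged illustration: the event names come from the example above, but the `WorkflowStreamEvent` union is redeclared locally with assumed field shapes, `summarizeStream` is a hypothetical helper, and `fakeStream` stands in for `workflow.executeStream(input)` so the block runs without the package.

```typescript
// Local stand-in for the library's event type (field shapes are assumptions).
type WorkflowStreamEvent =
  | { type: 'workflow-start'; workflowId: string }
  | { type: 'step-start'; stepName: string }
  | { type: 'step-complete'; stepName: string; result: unknown }
  | { type: 'step-error'; stepName: string; error: Error }
  | { type: 'workflow-complete'; result: unknown }
  | { type: 'workflow-suspended'; stepName: string; executionId: string };

interface StreamSummary {
  completed: string[];
  failed: string[];
  suspendedAt?: string;
  executionId?: string;
}

// Hypothetical helper: folds a stream of events into a summary object.
async function summarizeStream(
  events: AsyncIterable<WorkflowStreamEvent>,
): Promise<StreamSummary> {
  const summary: StreamSummary = { completed: [], failed: [] };
  for await (const event of events) {
    switch (event.type) {
      case 'step-complete':
        summary.completed.push(event.stepName);
        break;
      case 'step-error':
        summary.failed.push(event.stepName);
        break;
      case 'workflow-suspended':
        summary.suspendedAt = event.stepName;
        summary.executionId = event.executionId;
        break;
      default:
        break; // start/complete events carry nothing this summary needs
    }
  }
  return summary;
}

// Fake event source standing in for workflow.executeStream(input).
async function* fakeStream(): AsyncGenerator<WorkflowStreamEvent> {
  yield { type: 'workflow-start', workflowId: 'streaming' };
  yield { type: 'step-start', stepName: 'step1' };
  yield { type: 'step-complete', stepName: 'step1', result: { ok: true } };
  yield { type: 'step-start', stepName: 'step2' };
  yield { type: 'workflow-suspended', stepName: 'step2', executionId: 'exec-123' };
}
```

Calling `summarizeStream(fakeStream())` resolves to a summary listing `step1` as completed and `step2` as the suspension point.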
createWorkflow(name): Create a new workflow builder.
const workflow = createWorkflow('my-workflow')
.step('first', myAgent)
.then('second', myFunction)
.commit();
.step(name, executor, options): Add a step to the workflow.
Parameters:
name: Unique step identifier
executor: Agent, function, or async operation
options: Step configuration
Options:
interface StepOptions {
// Conditional execution
when?: (ctx: WorkflowContext) => boolean | Promise<boolean>;
// Retry configuration
retry?: {
attempts: number;
delay: number;
backoff?: 'linear' | 'exponential';
};
// Timeout in milliseconds
timeout?: number;
// Error handling
onError?: (ctx: WorkflowContext, error: Error) => Promise<any>;
// Context customization (for agents)
context?: (ctx: WorkflowContext) => string;
}
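The README does not spell out how `delay` and `backoff` combine, so the following is only a sketch of one common interpretation. It assumes that `attempts` counts total tries, that linear backoff multiplies the delay by the retry number, and that exponential backoff doubles it each time. `retryDelayMs` and `withRetry` are hypothetical helpers, not functions exported by the package.

```typescript
type Backoff = 'linear' | 'exponential';

// Assumed semantics (not confirmed by the package documentation):
//   no backoff  -> wait `delay` ms before every retry
//   linear      -> delay * retryNumber
//   exponential -> delay * 2^(retryNumber - 1)
function retryDelayMs(retryNumber: number, delay: number, backoff?: Backoff): number {
  if (retryNumber < 1) throw new RangeError('retryNumber starts at 1');
  switch (backoff) {
    case 'linear':
      return delay * retryNumber;
    case 'exponential':
      return delay * 2 ** (retryNumber - 1);
    default:
      return delay;
  }
}

// Hypothetical wrapper showing how the three retry options could interact.
// `sleep` is injectable so tests do not have to wait in real time.
async function withRetry<T>(
  fn: () => Promise<T>,
  opts: { attempts: number; delay: number; backoff?: Backoff },
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= opts.attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      // Only wait if another attempt will follow.
      if (attempt < opts.attempts) {
        await sleep(retryDelayMs(attempt, opts.delay, opts.backoff));
      }
    }
  }
  throw lastError;
}
```

Under these assumptions, `retry: { attempts: 3, delay: 500, backoff: 'exponential' }` would wait 500 ms after the first failure and 1000 ms after the second before giving up on the third.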
.then(name, executor, options): Add a sequential step (alias for .step()).
.after(steps): Create steps that run after the specified step or steps complete.
workflow
.step('step1', agent1)
.step('step2', agent2)
.after(['step1', 'step2'])
.step('step3', agent3) // Runs after both complete
.commit();
.commit(): Finalize and return the executable workflow.
.execute(input, options): Execute the workflow.
Parameters:
interface ExecutionOptions {
sessionId?: string;
userId?: string;
metadata?: Record<string, any>;
storage?: WorkflowStorage;
trace?: TraceConfig;
}
Returns:
interface WorkflowResult {
workflowId: string;
executionId: string;
status: 'completed' | 'failed' | 'suspended';
result?: any;
error?: Error;
stepResults: Map<string, any>;
startedAt: Date;
completedAt: Date;
duration: number;
traces?: ExecutionTraceEntry[];
}
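Since `status` is a union of three literals and `stepResults` is declared as a `Map`, callers will typically branch on the status and read step names via the `Map` API. The sketch below shows one way to do that. The interface is a local copy of the shape above (with `traces` omitted and `any` narrowed to `unknown`), and `describeResult` is a hypothetical helper.

```typescript
// Local copy of the documented result shape, so the block runs on its own.
interface WorkflowResult {
  workflowId: string;
  executionId: string;
  status: 'completed' | 'failed' | 'suspended';
  result?: unknown;
  error?: Error;
  stepResults: Map<string, unknown>;
  startedAt: Date;
  completedAt: Date;
  duration: number;
}

// Hypothetical helper: turns a result into a one-line log message.
function describeResult(r: WorkflowResult): string {
  // Map keys are read with the Map API, not with property access.
  const steps = Array.from(r.stepResults.keys()).join(', ') || 'none';
  switch (r.status) {
    case 'completed':
      return `${r.workflowId} completed (steps: ${steps})`;
    case 'failed':
      return `${r.workflowId} failed: ${r.error?.message ?? 'unknown error'} (steps: ${steps})`;
    case 'suspended':
      return `${r.workflowId} suspended; resume with ${r.executionId}`;
  }
}
```

The exhaustive `switch` means the compiler flags this helper if a new status value is ever added to the union.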
.executeStream(input): Execute with streaming events.
Returns: AsyncIterable<WorkflowStreamEvent>
.resume(executionId, storage): Resume a suspended workflow.
const result = await workflow.resume(
'exec-123',
workflowStorage
);
// ✅ Good: Focused agents
const workflow = createWorkflow('process')
.step('extract', extractorAgent)
.then('analyze', analyzerAgent)
.then('summarize', summarizerAgent)
.commit();
// ❌ Bad: One agent for everything
const workflow = createWorkflow('process')
.step('everything', generalAgent)
.commit();
const workflow = createWorkflow('robust')
.step('critical', criticalAgent, {
retry: { attempts: 3, delay: 1000, backoff: 'exponential' },
onError: async (ctx, error) => {
// Fallback logic
await notifyAdmin(error);
return { fallback: true };
}
})
.commit();
const workflow = createWorkflow('adaptive')
.step('analyze', analyzerAgent)
.then('fast-path', fastAgent, {
when: (ctx) => ctx.previousResults.analyze.simple === true
})
.then('complex-path', complexAgent, {
when: (ctx) => ctx.previousResults.analyze.simple === false
})
.commit();
// Always provide storage for workflows that might suspend
const result = await workflow.execute(input, {
storage: workflowStorage, // Required for suspend/resume
sessionId: 'session-123',
userId: 'user-456'
});
See complete examples in the examples directory.
MIT