dag-workflow-engine

A production-grade DAG-based API orchestration engine for Node.js

Version 1.0.0 (latest) on npm · Weekly downloads: 3 · Maintainers: 1

API Chainer

A production-grade, DAG-based API orchestration engine for Node.js.

Features

  • DAG-Based Execution: Define workflows as Directed Acyclic Graphs with automatic dependency resolution
  • Parallel Execution: Concurrent task execution with configurable concurrency limits
  • Built-in Executors: HTTP and function executors out of the box
  • Extensible: Register custom executors for any task type
  • Retry & Timeout: Per-task retry policies with fixed or exponential backoff
  • Error Modes: fail-fast or continue execution on failures
  • Template Variables: Dynamic value interpolation using {{nodeId.path}} syntax
  • Observability: Pluggable logger, metrics, and tracer interfaces
  • Lifecycle Events: Subscribe to workflow and node events
  • Type-Safe: Full TypeScript support with comprehensive type definitions

Installation

npm install api-chainer

Quick Start

import { ApiChainer } from 'api-chainer';

const chainer = new ApiChainer({
  concurrency: 5,
  errorMode: 'fail-fast',
});

const result = await chainer.run({
  tasks: [
    // First task: fetch user data
    {
      id: 'getUser',
      type: 'http',
      request: {
        method: 'GET',
        url: 'https://api.example.com/users/123',
      },
    },
    // Second task: fetch user's posts (depends on first task)
    {
      id: 'getPosts',
      type: 'http',
      dependsOn: ['getUser'],
      request: {
        method: 'GET',
        url: 'https://api.example.com/users/{{getUser.data.id}}/posts',
      },
    },
    // Third task: process results with a function
    {
      id: 'processData',
      type: 'function',
      dependsOn: ['getUser', 'getPosts'],
      handler: async (inputs, results) => {
        return {
          user: results.getUser.data,
          postCount: results.getPosts.data.length,
        };
      },
    },
  ],
});

console.log(result.success); // true or false
console.log(result.results); // { getUser: {...}, getPosts: {...}, processData: {...} }

Workflow Definition

Task Types

HTTP Tasks

{
  id: 'fetchData',
  type: 'http',
  request: {
    method: 'GET', // or 'POST' | 'PUT' | 'PATCH' | 'DELETE'
    url: 'https://api.example.com/data',
    headers: { 'Authorization': 'Bearer {{inputs.token}}' },
    params: { page: 1 },
    body: { name: '{{inputs.name}}' },
    responseType: 'json', // or 'text' | 'blob'
  },
  policies: {
    timeout: 5000,
    retry: { attempts: 3, backoff: 'exponential', delayMs: 100 },
  },
}
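The retry policy above can be read as a schedule of waits between attempts. The sketch below shows one plausible interpretation, under stated assumptions: `attempts` is taken to be the total number of tries, exponential backoff is assumed to double the delay each time, and the literal `'fixed'` is assumed as the name of the non-exponential mode. `RetryPolicy`, `retryDelayMs`, and `withRetry` are hypothetical names, and the package may apply a different factor, jitter, or a cap.

```typescript
// Hypothetical mirror of the `retry` policy shape shown in the README.
interface RetryPolicy {
  attempts: number; // assumed: total number of tries, including the first
  backoff: 'fixed' | 'exponential'; // 'fixed' is an assumed literal
  delayMs: number;
}

// Delay before retry number `retryIndex` (1 = the first retry).
// Assumption: exponential backoff doubles the delay on every retry.
function retryDelayMs(policy: RetryPolicy, retryIndex: number): number {
  if (policy.backoff === 'exponential') {
    return policy.delayMs * Math.pow(2, retryIndex - 1);
  }
  return policy.delayMs;
}

// Run `fn` until it succeeds or the attempts are used up.
async function withRetry<T>(fn: () => Promise<T>, policy: RetryPolicy): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= policy.attempts; attempt++) {
    try {
      return await fn();
    } catch (err) {
      lastError = err;
      if (attempt < policy.attempts) {
        const waitMs = retryDelayMs(policy, attempt);
        await new Promise<void>((resolve) => setTimeout(resolve, waitMs));
      }
    }
  }
  throw lastError;
}

const policy: RetryPolicy = { attempts: 3, backoff: 'exponential', delayMs: 100 };
const delays = [1, 2].map((retryIndex) => retryDelayMs(policy, retryIndex));
console.log(delays); // [100, 200]
```

With three attempts there are at most two waits, so under these assumptions a task that keeps failing would spend roughly 300 ms waiting in addition to the time taken by the requests themselves.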

Function Tasks

{
  id: 'transform',
  type: 'function',
  dependsOn: ['fetchData'],
  handler: async (inputs, results) => {
    // inputs = original workflow input
    // results = { fetchData: { status, data, ... } }
    // The returned value is stored as this node's result.
    return { items: results.fetchData.data };
  },
}

Template Variables

Use {{path}} syntax to reference values:

  • {{inputs.propertyName}} - Access a value from the workflow input
  • {{nodeId.data.path}} - Access the result of a completed node

Example:

{
  url: 'https://api.example.com/users/{{getUser.data.id}}',
  headers: { 'Authorization': 'Bearer {{inputs.token}}' },
  body: { userId: '{{getUser.data.id}}', name: '{{inputs.name}}' },
}
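For intuition, the interpolation described above can be sketched as a regular-expression replace over a scope object that holds the workflow inputs and the results of completed nodes. This is an illustration, not the package's implementation: `lookup`, `interpolate`, and the `Scope` type are hypothetical, and leaving unresolved placeholders untouched is an assumption about behavior the README does not specify.

```typescript
// Hypothetical scope: workflow inputs plus results keyed by node id.
type Scope = Record<string, unknown>;

// Walk a dotted path such as "getUser.data.id" through nested objects.
function lookup(scope: Scope, path: string): unknown {
  let current: unknown = scope;
  for (const key of path.split('.')) {
    if (current === null || typeof current !== 'object') {
      return undefined;
    }
    current = (current as Record<string, unknown>)[key];
  }
  return current;
}

// Replace every {{path}} placeholder with the value found in the scope.
// Assumption: placeholders that cannot be resolved are left as they are.
function interpolate(template: string, scope: Scope): string {
  return template.replace(/\{\{\s*([\w.]+)\s*\}\}/g, (match: string, path: string) => {
    const value = lookup(scope, path);
    return value === undefined ? match : String(value);
  });
}

const scope: Scope = {
  inputs: { token: 'abc', name: 'Ada' },
  getUser: { data: { id: 123 } },
};

const url = interpolate('https://api.example.com/users/{{getUser.data.id}}/posts', scope);
const auth = interpolate('Bearer {{inputs.token}}', scope);
console.log(url); // https://api.example.com/users/123/posts
console.log(auth); // Bearer abc
```

The pattern in this sketch only matches word characters and dots, so a node id containing a hyphen would not be recognized; a real implementation would need to decide which characters are legal in node ids.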

Configuration

const chainer = new ApiChainer({
  // Max concurrent task executions
  concurrency: 10,
  
  // Error handling: 'fail-fast' stops on first error, 'continue' runs independent branches
  errorMode: 'fail-fast',
  
  // Default timeout for all tasks (ms)
  defaultTimeout: 30000,
  
  // HTTP executor defaults
  http: {
    baseUrl: 'https://api.example.com',
    defaultHeaders: { 'X-API-Key': 'your-key' },
  },
  
  // Observability
  observability: {
    logger: customLogger,
    metrics: customMetrics,
    tracer: customTracer,
  },
});

Lifecycle Events

chainer.lifecycle.on('workflow:start', (event) => {
  console.log(`Workflow started: ${event.traceId}`);
});

chainer.lifecycle.on('node:success', (event) => {
  console.log(`Node ${event.nodeId} completed in ${event.durationMs}ms`);
});

chainer.lifecycle.on('node:failure', (event) => {
  console.error(`Node ${event.nodeId} failed:`, event.error);
});

chainer.lifecycle.on('workflow:end', (event) => {
  console.log(`Workflow completed: success=${event.success}, duration=${event.durationMs}ms`);
});

Custom Executors

import { Executor, DagNode, ContextSnapshot } from 'api-chainer';

class DatabaseExecutor implements Executor {
  async execute(node: DagNode, context: ContextSnapshot, signal: AbortSignal) {
    const config = node.config as { query: string };
    // Run config.query against your database here, honoring `signal` for cancellation.
    const rows: unknown[] = []; // placeholder for the query result
    return { rows };
  }
}

chainer.registerExecutor('database', new DatabaseExecutor());

Error Handling

Fail-Fast Mode (default)

Stops workflow execution immediately when any task fails.

Continue Mode

Continues executing independent branches even if some tasks fail. Dependent tasks are automatically skipped.

const result = await chainer.run(workflow);

if (!result.success) {
  // `errors` is optional on WorkflowResult, so fall back to an empty object
  console.log('Failed nodes:', Object.keys(result.errors ?? {}));
  console.log('Skipped:', result.stats.skipped);
}
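The rule that dependent tasks are skipped in continue mode applies transitively: a task is skipped if anything upstream of it failed or was itself skipped. The following sketch shows that propagation in isolation. `TaskNode` and `findSkipped` are hypothetical names, and the package may compute this differently.

```typescript
// Hypothetical task shape: only `id` and `dependsOn` come from the README.
interface TaskNode {
  id: string;
  dependsOn?: string[];
}

// Given the ids of failed tasks, return every task that can no longer run
// because something upstream of it failed or was skipped.
function findSkipped(tasks: TaskNode[], failed: string[]): string[] {
  const blocked = failed.slice(); // failed or skipped so far
  const skipped: string[] = [];
  let changed = true;
  // Repeat until a full pass adds nothing new, so skips propagate downstream.
  while (changed) {
    changed = false;
    for (const task of tasks) {
      if (blocked.indexOf(task.id) !== -1) {
        continue;
      }
      const deps = task.dependsOn ?? [];
      if (deps.some((dep) => blocked.indexOf(dep) !== -1)) {
        blocked.push(task.id);
        skipped.push(task.id);
        changed = true;
      }
    }
  }
  return skipped;
}

const skippedIds = findSkipped(
  [
    { id: 'a' },
    { id: 'b', dependsOn: ['a'] },
    { id: 'c', dependsOn: ['b'] },
    { id: 'd' }, // independent branch, still runs
  ],
  ['a'],
);
console.log(skippedIds); // ['b', 'c']
```

In this example task `d` has no path back to the failed task, which is why continue mode can still execute it.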

Result Format

interface WorkflowResult {
  success: boolean;
  results: Record<string, unknown>;
  errors?: Record<string, Error>;
  durationMs: number;
  traceId: string;
  stats: {
    total: number;
    completed: number;
    failed: number;
    skipped: number;
  };
}
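As a usage illustration, a result of this shape could be condensed into a single log line. The `summarize` helper below is hypothetical and not part of the package; the interface is repeated only so the sketch is self-contained.

```typescript
// Copied from the result format above so the example stands alone.
interface WorkflowResult {
  success: boolean;
  results: Record<string, unknown>;
  errors?: Record<string, Error>;
  durationMs: number;
  traceId: string;
  stats: {
    total: number;
    completed: number;
    failed: number;
    skipped: number;
  };
}

// Hypothetical helper: build a one-line summary suitable for logging.
function summarize(result: WorkflowResult): string {
  const { total, completed, failed, skipped } = result.stats;
  const failedIds = Object.keys(result.errors ?? {}); // `errors` may be absent
  const status = result.success ? 'ok' : 'failed';
  const detail = failedIds.length > 0 ? `; failed nodes: ${failedIds.join(', ')}` : '';
  return (
    `[${result.traceId}] ${status} in ${result.durationMs}ms ` +
    `(${completed}/${total} completed, ${failed} failed, ${skipped} skipped)${detail}`
  );
}

// Sample data invented for this example.
const sample: WorkflowResult = {
  success: false,
  results: { getUser: { status: 200, data: { id: 123 } } },
  errors: { getPosts: new Error('timeout') },
  durationMs: 120,
  traceId: 'trace-1',
  stats: { total: 3, completed: 1, failed: 1, skipped: 1 },
};

const summary = summarize(sample);
console.log(summary);
// [trace-1] failed in 120ms (1/3 completed, 1 failed, 1 skipped); failed nodes: getPosts
```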

License

MIT

Keywords

dag

Package last updated on 09 Jan 2026
