swarmflow

SwarmFlow: A distributed multi-agent orchestration framework

Source: PyPI (pip)
Version: 0.5.0
Maintainers: 1

SwarmFlow

A distributed multi-agent orchestration framework for building scalable AI workflows with comprehensive observability.

🚀 Features

  • Dead-Simple API: Minimal @swarm_task decorator and run() function
  • Auto-Dependency Inference: Dependencies automatically inferred from function parameters
  • Agent Orchestration: Create complex workflows with multiple AI agents
  • Retry Logic: Built-in retry mechanisms for resilient agent execution
  • Observability: OpenTelemetry integration for tracing and monitoring
  • Error Handling: Graceful failure propagation and recovery
  • Real-time Monitoring: Send task traces to your monitoring dashboard
  • Cycle Detection: Automatic detection of circular dependencies
  • Production Ready: Comprehensive error handling and logging
  • Hooks System: Powerful before/after/error/final hooks for custom orchestration logic
  • Shared Memory: Cross-task state sharing with flow.memory
  • Policy Enforcement: DAG-level rules for cost limits, abort conditions, and validation
  • Modular Telemetry: Comprehensive provider support with automatic metadata extraction
  • Cost Tracking: Automatic cost calculation and tracking across all major LLM providers

📦 Installation

pip install swarmflow

🎯 Quick Start

from swarmflow import swarm_task, run

@swarm_task
def fetch_data():
    return "Some data from API"

@swarm_task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task
def display_result(process_data):
    print(f"Final result: {process_data}")

# Run workflow - that's it!
run()

No complex setup or manual dependency management is needed. SwarmFlow automatically:

  • ✅ Registers your tasks
  • ✅ Infers dependencies from function parameters
  • ✅ Executes in the correct order
  • ✅ Handles retries and errors
  • ✅ Sends traces to your dashboard
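The inference and ordering steps listed above can be illustrated with the standard library alone. This is a minimal sketch, not SwarmFlow's actual implementation: `infer_dependencies` and `run_in_order` are hypothetical helper names, and it assumes a parameter counts as a dependency whenever its name matches another task's function name. It needs Python 3.9+ for `graphlib`.

```python
import inspect
from graphlib import TopologicalSorter

def infer_dependencies(tasks):
    """Map each task name to the parameter names that match other task names."""
    names = {fn.__name__ for fn in tasks}
    return {
        fn.__name__: {p for p in inspect.signature(fn).parameters if p in names}
        for fn in tasks
    }

def run_in_order(tasks):
    """Run tasks in dependency order, passing upstream results as keyword arguments."""
    by_name = {fn.__name__: fn for fn in tasks}
    deps = infer_dependencies(tasks)
    results = {}
    # static_order() yields every node after all of its predecessors
    for name in TopologicalSorter(deps).static_order():
        kwargs = {dep: results[dep] for dep in deps[name]}
        results[name] = by_name[name](**kwargs)
    return results

def fetch_data():
    return "Some data from API"

def process_data(fetch_data):
    return f"Processed: {fetch_data}"

# Registration order does not matter; the graph decides execution order.
results = run_in_order([process_data, fetch_data])
print(results["process_data"])
```

Matching on parameter names keeps the call site free of explicit wiring, at the cost of coupling a task's signature to the names of its upstream tasks.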

🔧 Advanced Usage

Retry Logic

@swarm_task(retries=3)
def unreliable_task():
    # This task will retry up to 3 times on failure
    pass
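
For intuition, a retry wrapper of this kind might look like the sketch below. It is not the SwarmFlow source: `with_retries` is a hypothetical name, and it assumes that `retries=3` means one initial attempt plus up to three further attempts, with the last exception re-raised once the attempts run out.

```python
import functools

def with_retries(retries=3):
    """Hypothetical helper: re-run a function up to `retries` extra times on failure."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            last_error = None
            for _attempt in range(retries + 1):  # one initial try plus `retries` retries
                try:
                    return fn(*args, **kwargs)
                except Exception as exc:
                    last_error = exc
            raise last_error
        return wrapper
    return decorator

calls = {"count": 0}

@with_retries(retries=3)
def flaky():
    # Fails on the first two calls, succeeds on the third
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = flaky()
```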

Multiple Dependencies

@swarm_task
def step1():
    return "Step 1 completed"

@swarm_task
def step2():
    return "Step 2 completed"

@swarm_task
def step3():
    return "Step 3 completed"

@swarm_task
def final_step(step1, step2, step3):
    # Dependencies automatically inferred from parameter names
    return f"Combined: {step1}, {step2}, {step3}"

run()

Multi-Provider LLM Support

from groq import Groq
from openai import OpenAI
from anthropic import Anthropic

@swarm_task
def groq_task():
    client = Groq()
    response = client.chat.completions.create(
        model="llama-3-70b",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

@swarm_task
def openai_task():
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

@swarm_task
def anthropic_task():
    client = Anthropic()
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

# SwarmFlow automatically detects and extracts metadata from all providers:
# - Model name and provider identification
# - Token usage (prompt + completion tokens)
# - Precise cost calculation (USD) using current pricing
# - Timing metrics (queue, prompt, completion, total time)
# - All added to task.metadata automatically

# Example output with metadata:
# [SwarmFlow] Task: groq_task
#   ↳ Status: success
#   ↳ Duration: 1234 ms
#   ↳ Output: <Groq ChatCompletion object>
#   ↳ Metadata: {'provider': 'Groq', 'model': 'llama-3-70b', 'tokens_used': 150, 'cost_usd': 0.000089, 'queue_time_s': 0.1, 'prompt_time_s': 0.5, 'completion_time_s': 0.8, 'total_time_s': 1.4}
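
The cost figure in the metadata is typically derived from token counts and a per-model price table. The sketch below shows that arithmetic only. The model name and prices are placeholders invented for illustration, `estimate_cost_usd` is a hypothetical helper, and none of it reflects SwarmFlow's internal pricing data or any provider's real rates.

```python
# Hypothetical per-million-token prices in USD; real prices vary by provider and change over time.
PRICES_PER_MILLION = {
    "example-model": {"prompt": 0.50, "completion": 1.50},
}

def estimate_cost_usd(model, prompt_tokens, completion_tokens):
    """Price prompt and completion tokens separately, then sum."""
    price = PRICES_PER_MILLION[model]
    total = prompt_tokens * price["prompt"] + completion_tokens * price["completion"]
    return total / 1_000_000

prompt_tokens, completion_tokens = 100, 50
metadata = {
    "model": "example-model",
    "tokens_used": prompt_tokens + completion_tokens,
    "cost_usd": estimate_cost_usd("example-model", prompt_tokens, completion_tokens),
}
```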

API Key Configuration

SwarmFlow automatically handles API keys with Martian-style simplicity:

# Option 1: Set the environment variable in your shell before running Python:
#   export SWARMFLOW_API_KEY="sk_abc123..."
run()  # Automatically uses key from environment

# Option 2: Pass directly
run(api_key="sk_abc123...")

# Option 3: No key (logs warning but continues)
run()  # Shows warning but executes normally
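
The three options above imply a lookup order, sketched here under stated assumptions. `resolve_api_key` is a hypothetical function, and the precedence shown (explicit argument first, then `SWARMFLOW_API_KEY`, then a warning) is an assumption; the README does not say which source wins when both are present.

```python
import os
import warnings

def resolve_api_key(api_key=None, env=None):
    """Return the explicit key, else the environment value, else None with a warning."""
    env = os.environ if env is None else env
    key = api_key or env.get("SWARMFLOW_API_KEY") or None
    if key is None:
        # Mirrors the documented behavior: warn, but let the workflow continue
        warnings.warn("SWARMFLOW_API_KEY is not set; continuing without an API key")
    return key

explicit = resolve_api_key("sk_explicit", env={"SWARMFLOW_API_KEY": "sk_env"})
from_env = resolve_api_key(env={"SWARMFLOW_API_KEY": "sk_env"})
```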

Hooks System & Shared Memory

SwarmFlow now includes a powerful hooks system for custom orchestration logic and shared memory for cross-task state management:

from swarmflow import swarm_task, run
from swarmflow.hooks import write_output_to_memory, read_memory_into_arg, log_input_output

@swarm_task(before=log_input_output()[0], after=log_input_output()[1])
def fetch_data():
    return "Some data from API"

@swarm_task(after=write_output_to_memory("processed_data"))
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task(before=read_memory_into_arg("processed_data", "input_data"))
def display_result(process_data, input_data=None):
    print(f"Final result: {process_data}")
    print(f"From memory: {input_data}")

# Run workflow - that's it!
run()

Available Hooks:

  • before: Execute before task runs
  • after: Execute after task succeeds
  • on_error: Execute when task fails
  • on_final: Execute after task completes (success or failure)

Built-in Hook Utilities:

  • write_output_to_memory(key): Save task output to shared memory
  • read_memory_into_arg(mem_key, arg_name): Inject memory value into task arguments
  • log_input_output(): Log task inputs and outputs
  • enforce_max_cost(max_usd): Abort if total cost exceeds limit
  • set_flag_on_failure(flag_key): Set memory flag when task fails
  • skip_if_flag_set(flag_key): Skip task if memory flag is True
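
The order in which the four hook types fire can be pictured as a `try`/`except`/`finally` wrapper around the task. This sketch is illustrative only: `run_with_hooks` and the `context` dictionary passed to each hook are hypothetical, and SwarmFlow's real hook signatures may differ.

```python
def run_with_hooks(task, before=None, after=None, on_error=None, on_final=None):
    """Hypothetical runner: call each hook at its point in the task lifecycle."""
    context = {"name": task.__name__, "output": None, "error": None}
    try:
        if before:
            before(context)
        context["output"] = task()
        if after:
            after(context)  # only reached when the task succeeded
    except Exception as exc:
        context["error"] = exc
        if on_error:
            on_error(context)
    finally:
        if on_final:
            on_final(context)  # runs on success and on failure
    return context

log = []

def ok_task():
    return "done"

def bad_task():
    raise ValueError("boom")

hooks = dict(
    before=lambda ctx: log.append(("before", ctx["name"])),
    after=lambda ctx: log.append(("after", ctx["name"])),
    on_error=lambda ctx: log.append(("on_error", ctx["name"])),
    on_final=lambda ctx: log.append(("on_final", ctx["name"])),
)

ok_context = run_with_hooks(ok_task, **hooks)
bad_context = run_with_hooks(bad_task, **hooks)
```

In this sketch an exception raised inside an `after` hook is also routed to `on_error`; whether SwarmFlow does the same is not stated in the README.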

Policy Enforcement

Set DAG-level policies for cost limits, abort conditions, and validation:

from swarmflow import swarm_task, run, SwarmFlow

# Create flow for policy configuration
flow = SwarmFlow()
flow.set_policy("max_cost", 0.10)  # Abort if total cost > $0.10
flow.set_policy("abort_on_flag", "error_detected")  # Abort if flag is True
flow.set_policy("require_outputs", ["final_result"])  # Abort if missing outputs

@swarm_task
def task1():
    return "Task 1 result"

@swarm_task
def task2(task1):
    return "Task 2 result"

# Run with policies enforced
run()
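
To make the three policies concrete, the following sketch checks them against a run's state. It is not SwarmFlow's enforcement code: `PolicyViolation` and `check_policies` are hypothetical names, and when the real library evaluates each policy (per task or at the end of the run) is not specified in the README.

```python
class PolicyViolation(Exception):
    """Hypothetical error raised when a DAG-level policy is broken."""

def check_policies(policy, total_cost_usd, memory, outputs):
    """Raise PolicyViolation if any configured policy is violated."""
    if "max_cost" in policy and total_cost_usd > policy["max_cost"]:
        raise PolicyViolation(f"cost {total_cost_usd} exceeds {policy['max_cost']}")
    flag = policy.get("abort_on_flag")
    if flag and memory.get(flag) is True:
        raise PolicyViolation(f"flag {flag!r} is set")
    missing = [name for name in policy.get("require_outputs", []) if name not in outputs]
    if missing:
        raise PolicyViolation(f"missing outputs: {missing}")

policy = {
    "max_cost": 0.10,
    "abort_on_flag": "error_detected",
    "require_outputs": ["final_result"],
}

# A run that stays within every policy passes silently
check_policies(policy, total_cost_usd=0.05, memory={}, outputs={"final_result": "ok"})
```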

Real-time Monitoring

SwarmFlow automatically sends task traces to the SwarmFlow backend service at http://localhost:8000/api/trace for real-time monitoring and analytics.

Trace Structure:

{
  "id": "task-uuid",
  "run_id": "dag-run-uuid",  // Consistent across all tasks in the same DAG run
  "name": "task_name",
  "status": "success|failure|retrying|skipped",
  "duration_ms": 1234,
  "output": "task output",
  "metadata": {
    "agent": "LLMProcessor",
    "provider": "Groq",
    "model": "llama-3-70b",
    "tokens_used": 150,
    "cost_usd": 0.000089
  },
  "dependencies": ["dep1", "dep2"],
  "flow_memory": {"key": "value"},  // Shared memory state
  "flow_policy": {"max_cost": 0.10}  // Active policies
}
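
A payload with this shape can be assembled as in the sketch below, which also shows how one `run_id` groups several task traces. `build_trace` is a hypothetical helper, not part of the SDK, and the `//` annotations in the structure above are explanatory only; they are not valid JSON and do not appear in the serialized output here.

```python
import json
import uuid

def build_trace(run_id, name, status, duration_ms, output, metadata=None, dependencies=None):
    """Assemble one task trace; every task gets its own id but shares the run_id."""
    return {
        "id": str(uuid.uuid4()),
        "run_id": run_id,
        "name": name,
        "status": status,
        "duration_ms": duration_ms,
        "output": output,
        "metadata": metadata or {},
        "dependencies": dependencies or [],
    }

run_id = str(uuid.uuid4())  # generated once per DAG run
traces = [
    build_trace(run_id, "fetch_data", "success", 12, "Some data from API"),
    build_trace(run_id, "process_data", "success", 3, "Processed: Some data from API",
                dependencies=["fetch_data"]),
]
payload = json.dumps(traces[0])
```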

Observability

SwarmFlow automatically provides:

  • Task execution traces with OpenTelemetry
  • Performance metrics (execution time, success rates)
  • Dependency visualization and cycle detection
  • Error tracking and failure propagation
  • Multi-provider metadata extraction (Groq, OpenAI, Anthropic with precise cost calculation and timing metrics)
  • Comprehensive cost tracking across all supported LLM providers
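
Cycle detection in a dependency graph can be demonstrated with the standard library's `graphlib` (Python 3.9+). This is a sketch of the general technique, not a description of how SwarmFlow implements it, and `find_cycle` is a hypothetical helper.

```python
from graphlib import CycleError, TopologicalSorter

def find_cycle(deps):
    """Return the list of nodes forming a cycle, or None if the graph is acyclic."""
    try:
        TopologicalSorter(deps).prepare()
    except CycleError as exc:
        # The second element of args holds the cycle; its first and last nodes are the same
        return exc.args[1]
    return None

acyclic = {"fetch_data": set(), "process_data": {"fetch_data"}}
cyclic = {"a": {"b"}, "b": {"a"}}

no_cycle = find_cycle(acyclic)
cycle = find_cycle(cyclic)
```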

🏗️ Architecture

SwarmFlow is designed for production multi-agent systems with dead-simple usage:

User's Agent Functions → @swarm_task decorator → run() → Observability Dashboard

  • Minimal: Just decorator + run function
  • Scalable: Handles complex dependency graphs
  • Observable: Real-time monitoring and debugging
  • Resilient: Built-in retry logic and error handling

📊 Monitoring Dashboard

Get comprehensive insights into your multi-agent workflows:

  • Real-time execution monitoring
  • Performance analytics and optimization
  • Error tracking and debugging
  • Cost analysis for LLM usage (auto-calculated across all providers)
  • Workflow visualization and dependency graphs
  • Multi-provider metadata extraction (Groq, OpenAI, Anthropic with comprehensive model support)
  • DAG run tracking with unique run_id for grouping and analytics

🚀 Deployment Configuration

API Key Authentication

SwarmFlow supports API key authentication for secure trace reporting:

# Option 1: Environment variable (recommended), set in your shell before running Python:
#   export SWARMFLOW_API_KEY="sk_abc123..."
run()  # Automatically picks up from environment

# Option 2: Pass directly
run(api_key="sk_abc123...")

# Option 3: No authentication (logs warning but continues)
run()  # Shows warning but executes normally

Backend Configuration

SwarmFlow automatically sends traces to http://localhost:8000/api/trace. For production deployment, update the backend URL in the SDK code to point to your centralized backend service.

🤝 Contributing

We welcome contributions! Please see our Contributing Guidelines.

📚 Documentation

For detailed documentation, visit: https://github.com/anirame128/swarmflow

📄 License

SDK License

The SwarmFlow SDK is licensed under the MIT License - see LICENSE file for details.

Backend Services

SwarmFlow backend services, dashboard, and infrastructure require separate service agreements and API keys. The SDK is designed to work with official SwarmFlow backend services only.

Why this model?

  • Free SDK: Developers can use the SDK without restrictions
  • Paid Services: Backend services and dashboard require API keys
  • Industry Standard: Follows the same model as Google Maps, Stripe, AWS SDKs
  • Developer Friendly: Maximizes adoption while protecting the project's business model

Keywords

ai
