
swarmflow
A distributed multi-agent orchestration framework for building scalable AI workflows with comprehensive observability.
@swarm_task decorator and run() function
flow.memory
pip install swarmflow
from swarmflow import swarm_task, run

@swarm_task
def fetch_data():
    return "Some data from API"

@swarm_task
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task
def display_result(process_data):
    print(f"Final result: {process_data}")

# Run workflow - that's it!
run()
That's it! No complex setup, no manual dependency management. SwarmFlow automatically:
@swarm_task(retries=3)
def unreliable_task():
    # This task will retry up to 3 times on failure
    pass
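
To make the retry behaviour concrete, here is a minimal sketch of a retrying decorator in plain Python. It is not SwarmFlow's implementation: `with_retries` is a hypothetical stand-in for `@swarm_task(retries=N)`, and it assumes that `retries=3` means one initial attempt plus up to three re-runs.

```python
import functools

def with_retries(retries=0):
    """Hypothetical stand-in for @swarm_task(retries=N); not SwarmFlow's code."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            attempts = retries + 1  # assumption: one initial attempt plus `retries` re-runs
            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == attempts:
                        raise  # out of attempts: surface the last error
        return wrapper
    return decorator

calls = {"count": 0}

@with_retries(retries=3)
def flaky():
    # Fails twice, then succeeds on the third call
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return "ok"

result = flaky()
```

A real orchestrator would probably also record a "retrying" status between attempts and may add a delay; both are left out here.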
@swarm_task
def step1():
    return "Step 1 completed"

@swarm_task
def step2():
    return "Step 2 completed"

@swarm_task
def step3():
    return "Step 3 completed"

@swarm_task
def final_step(step1, step2, step3):
    # Dependencies automatically inferred from parameter names
    return f"Combined: {step1}, {step2}, {step3}"

run()
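
One way dependency inference from parameter names could work is sketched below using only the standard library (`inspect` and `graphlib`, Python 3.9+). This is an illustration under assumptions, not SwarmFlow's actual code: `REGISTRY`, `task`, and `run_all` are hypothetical names.

```python
import inspect
from graphlib import TopologicalSorter

REGISTRY = {}  # hypothetical global registry: task name -> function

def task(func):
    """Register a function as a task under its own name."""
    REGISTRY[func.__name__] = func
    return func

def run_all():
    """Run every registered task once, dependencies first."""
    # A parameter whose name matches a registered task is treated as a dependency
    graph = {
        name: {p for p in inspect.signature(fn).parameters if p in REGISTRY}
        for name, fn in REGISTRY.items()
    }
    results = {}
    for name in TopologicalSorter(graph).static_order():
        kwargs = {dep: results[dep] for dep in graph[name]}
        results[name] = REGISTRY[name](**kwargs)
    return results

@task
def step1():
    return "Step 1 completed"

@task
def step2():
    return "Step 2 completed"

@task
def final_step(step1, step2):
    return f"Combined: {step1}, {step2}"

results = run_all()
```

`TopologicalSorter` raises `graphlib.CycleError` if two tasks depend on each other, which gives a natural place to report an invalid workflow.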
from groq import Groq
from openai import OpenAI
from anthropic import Anthropic

@swarm_task
def groq_task():
    client = Groq()
    response = client.chat.completions.create(
        model="llama-3-70b",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

@swarm_task
def openai_task():
    client = OpenAI()
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

@swarm_task
def anthropic_task():
    client = Anthropic()
    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=100,
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response

# SwarmFlow automatically detects and extracts metadata from all providers:
# - Model name and provider identification
# - Token usage (prompt + completion tokens)
# - Precise cost calculation (USD) using current pricing
# - Timing metrics (queue, prompt, completion, total time)
# - All added to task.metadata automatically

# Example output with metadata:
# [SwarmFlow] Task: groq_task
# ↳ Status: success
# ↳ Duration: 1234 ms
# ↳ Output: <Groq ChatCompletion object>
# ↳ Metadata: {'provider': 'Groq', 'model': 'llama-3-70b', 'tokens_used': 150, 'cost_usd': 0.000089, 'queue_time_s': 0.1, 'prompt_time_s': 0.5, 'completion_time_s': 0.8, 'total_time_s': 1.4}
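
The cost figure in the metadata can be understood as token counts multiplied by per-token prices. The sketch below shows that arithmetic only; the price table, the model name `example-model`, and the helper names are hypothetical placeholders, not real provider pricing and not SwarmFlow's internal functions.

```python
# Hypothetical prices in USD per million tokens; placeholder numbers only
PRICES_PER_MILLION_TOKENS = {
    "example-model": {"prompt": 0.50, "completion": 1.50},
}

def estimate_cost_usd(model, prompt_tokens, completion_tokens):
    """Price prompt and completion tokens separately, then sum."""
    price = PRICES_PER_MILLION_TOKENS[model]
    cost = (
        prompt_tokens * price["prompt"]
        + completion_tokens * price["completion"]
    ) / 1_000_000
    return round(cost, 6)

def build_metadata(provider, model, prompt_tokens, completion_tokens):
    """Assemble a metadata dict shaped like the example output above."""
    return {
        "provider": provider,
        "model": model,
        "tokens_used": prompt_tokens + completion_tokens,
        "cost_usd": estimate_cost_usd(model, prompt_tokens, completion_tokens),
    }

meta = build_metadata("ExampleProvider", "example-model", 100, 50)
```

With these placeholder prices, 100 prompt tokens and 50 completion tokens cost (100 × 0.50 + 50 × 1.50) / 1,000,000 = 0.000125 USD.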
SwarmFlow automatically handles API keys with Martian-style simplicity:
# Option 1: Set environment variable
export SWARMFLOW_API_KEY="sk_abc123..."
run() # Automatically uses key from environment
# Option 2: Pass directly
run(api_key="sk_abc123...")
# Option 3: No key (logs warning but continues)
run() # Shows warning but executes normally
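
The three options above amount to a small lookup rule. A minimal sketch follows, assuming an explicitly passed key takes precedence over the environment variable (the source does not state the order); `resolve_api_key` is a hypothetical helper, not part of the SDK.

```python
import os
import warnings

def resolve_api_key(api_key=None, env=None):
    """Return the key to use, or None after warning that none was found."""
    env = os.environ if env is None else env
    # Assumption: an explicit argument wins over SWARMFLOW_API_KEY
    key = api_key or env.get("SWARMFLOW_API_KEY")
    if not key:
        warnings.warn("No SwarmFlow API key found; continuing without authentication")
    return key
```

Passing `env` as a plain dict keeps the function easy to test without touching the real process environment.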
SwarmFlow now includes a powerful hooks system for custom orchestration logic and shared memory for cross-task state management:
from swarmflow import swarm_task, run
from swarmflow.hooks import write_output_to_memory, read_memory_into_arg, log_input_output

@swarm_task(before=log_input_output()[0], after=log_input_output()[1])
def fetch_data():
    return "Some data from API"

@swarm_task(after=write_output_to_memory("processed_data"))
def process_data(fetch_data):
    return f"Processed: {fetch_data}"

@swarm_task(before=read_memory_into_arg("processed_data", "input_data"))
def display_result(process_data, input_data=None):
    print(f"Final result: {process_data}")
    print(f"From memory: {input_data}")

# Run workflow - that's it!
run()
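
To show how hook points and shared memory can fit together, here is a standalone sketch of a runner that fires `before`, `after`, `on_error`, and `on_final` callbacks around a task. The hook signatures and the helpers `run_with_hooks`, `write_output`, and `read_into_arg` are assumptions made for illustration; the real SwarmFlow hooks may receive different arguments.

```python
def run_with_hooks(func, kwargs, memory,
                   before=None, after=None, on_error=None, on_final=None):
    """Call `func(**kwargs)` and fire each hook at its point in the lifecycle."""
    try:
        if before:
            before(kwargs, memory)      # may inject or change arguments
        output = func(**kwargs)
        if after:
            after(output, memory)       # runs only when the task succeeded
        return output
    except Exception as exc:
        if on_error:
            on_error(exc, memory)       # runs only when something raised
        raise
    finally:
        if on_final:
            on_final(memory)            # runs on success and on failure

def write_output(key):
    """Build an `after` hook that stores the task output in memory."""
    def hook(output, memory):
        memory[key] = output
    return hook

def read_into_arg(mem_key, arg_name):
    """Build a `before` hook that copies a memory value into an argument."""
    def hook(kwargs, memory):
        kwargs[arg_name] = memory.get(mem_key)
    return hook

memory = {}

def process():
    return "Processed: data"

def display(input_data=None):
    return f"From memory: {input_data}"

run_with_hooks(process, {}, memory, after=write_output("processed_data"))
shown = run_with_hooks(display, {}, memory,
                       before=read_into_arg("processed_data", "input_data"))
```

Building hooks with small factory functions, as `write_output` and `read_into_arg` do, keeps the per-task configuration on the decorator line while the runner stays generic.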
Available Hooks:
before: Execute before task runs
after: Execute after task succeeds
on_error: Execute when task fails
on_final: Execute after task completes (success or failure)

Built-in Hook Utilities:
write_output_to_memory(key): Save task output to shared memory
read_memory_into_arg(mem_key, arg_name): Inject memory value into task arguments
log_input_output(): Log task inputs and outputs
enforce_max_cost(max_usd): Abort if total cost exceeds limit
set_flag_on_failure(flag_key): Set memory flag when task fails
skip_if_flag_set(flag_key): Skip task if memory flag is True

Set DAG-level policies for cost limits, abort conditions, and validation:
from swarmflow import swarm_task, run, SwarmFlow

# Create flow for policy configuration
flow = SwarmFlow()
flow.set_policy("max_cost", 0.10)  # Abort if total cost > $0.10
flow.set_policy("abort_on_flag", "error_detected")  # Abort if flag is True
flow.set_policy("require_outputs", ["final_result"])  # Abort if missing outputs

@swarm_task
def task1():
    return "Task 1 result"

@swarm_task
def task2(task1):
    return "Task 2 result"

# Run with policies enforced
run()
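
The three policies above can be read as simple checks against the run's state. The following sketch evaluates them over plain dictionaries; `find_violations` is a hypothetical helper and the exact points at which SwarmFlow evaluates policies are not specified in the source.

```python
def find_violations(policy, memory, total_cost_usd, outputs):
    """Return a list of human-readable policy violations (empty if none)."""
    violations = []

    # max_cost: abort if the accumulated cost exceeds the limit
    max_cost = policy.get("max_cost")
    if max_cost is not None and total_cost_usd > max_cost:
        violations.append(f"cost {total_cost_usd} exceeds max_cost {max_cost}")

    # abort_on_flag: abort if the named memory flag is set
    flag = policy.get("abort_on_flag")
    if flag and memory.get(flag):
        violations.append(f"flag '{flag}' is set")

    # require_outputs: abort if any required output is missing
    for name in policy.get("require_outputs", []):
        if name not in outputs:
            violations.append(f"missing required output '{name}'")

    return violations

policy = {
    "max_cost": 0.10,
    "abort_on_flag": "error_detected",
    "require_outputs": ["final_result"],
}

ok = find_violations(policy, {}, 0.05, {"final_result": "done"})
bad = find_violations(policy, {"error_detected": True}, 0.20, {})
```

Returning a list rather than raising on the first problem lets a caller report every violated policy at once before aborting.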
SwarmFlow automatically sends task traces to the SwarmFlow backend service at http://localhost:8000/api/trace for real-time monitoring and analytics.
Trace Structure:
{
  "id": "task-uuid",
  "run_id": "dag-run-uuid", // Consistent across all tasks in the same DAG run
  "name": "task_name",
  "status": "success|failure|retrying|skipped",
  "duration_ms": 1234,
  "output": "task output",
  "metadata": {
    "agent": "LLMProcessor",
    "provider": "Groq",
    "model": "llama-3-70b",
    "tokens_used": 150,
    "cost_usd": 0.000089
  },
  "dependencies": ["dep1", "dep2"],
  "flow_memory": {"key": "value"}, // Shared memory state
  "flow_policy": {"max_cost": 0.10} // Active policies
}
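
As an illustration of the trace structure, the sketch below builds a payload with the same fields and prepares (but does not send) an HTTP POST to the trace endpoint using the standard library. The helper names are hypothetical, and the `Authorization: Bearer` header is an assumption: the source does not say how the API key is transmitted.

```python
import json
import urllib.request
import uuid

TRACE_URL = "http://localhost:8000/api/trace"  # endpoint named in the README

def build_trace(run_id, name, status, duration_ms, output,
                metadata=None, dependencies=(), flow_memory=None, flow_policy=None):
    """Assemble one task trace with the fields shown in the structure above."""
    return {
        "id": str(uuid.uuid4()),
        "run_id": run_id,
        "name": name,
        "status": status,
        "duration_ms": duration_ms,
        "output": str(output),  # stringify so provider objects stay JSON-safe
        "metadata": metadata or {},
        "dependencies": list(dependencies),
        "flow_memory": flow_memory or {},
        "flow_policy": flow_policy or {},
    }

def make_trace_request(trace, api_key=None):
    """Prepare a POST request for the trace; the caller decides when to send it."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["Authorization"] = f"Bearer {api_key}"  # assumption, not confirmed
    body = json.dumps(trace).encode("utf-8")
    return urllib.request.Request(TRACE_URL, data=body, headers=headers, method="POST")

run_id = str(uuid.uuid4())  # shared by every task in one DAG run
trace = build_trace(run_id, "task2", "success", 1234, "Task 2 result",
                    dependencies=["task1"], flow_policy={"max_cost": 0.10})
request = make_trace_request(trace, api_key="sk_example")
```

Sending would be a call such as `urllib.request.urlopen(request, timeout=5)`, ideally wrapped so that a backend outage never fails the workflow itself.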
SwarmFlow automatically provides:
SwarmFlow is designed for production multi-agent systems with dead-simple usage:
User's Agent Functions → @swarm_task decorator → run() → Observability Dashboard
Get comprehensive insights into your multi-agent workflows:
SwarmFlow supports API key authentication for secure trace reporting:
# Option 1: Environment variable (recommended)
export SWARMFLOW_API_KEY="sk_abc123..."
run() # Automatically picks up from environment
# Option 2: Pass directly
run(api_key="sk_abc123...")
# Option 3: No authentication (logs warning but continues)
run() # Shows warning but executes normally
SwarmFlow automatically sends traces to http://localhost:8000/api/trace. For production deployment, update the backend URL in the SDK code to point to your centralized backend service.
We welcome contributions! Please see our Contributing Guidelines.
For detailed documentation, visit: https://github.com/anirame128/swarmflow
The SwarmFlow SDK is licensed under the MIT License - see LICENSE file for details.
SwarmFlow backend services, dashboard, and infrastructure require separate service agreements and API keys. The SDK is designed to work with official SwarmFlow backend services only.
We found that swarmflow demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.