🐧 PuffinFlow


PuffinFlow is a powerful Python framework for developers who need to rapidly prototype LLM workflows and seamlessly transition them to production-ready systems.

Perfect for AI engineers, data scientists, and backend developers who want to focus on workflow logic rather than infrastructure complexity.

Get started

Install PuffinFlow:

pip install puffinflow

Then, create an agent using the state decorator:

from puffinflow import Agent, state

class DataProcessor(Agent):
    @state(cpu=2.0, memory=1024.0)
    async def fetch_data(self, context):
        """Fetch data from external source."""
        data = await get_external_data()
        context.set_variable("raw_data", data)
        return "validate_data" if data else "error"

    @state(cpu=1.0, memory=512.0)
    async def validate_data(self, context):
        """Validate the fetched data."""
        data = context.get_variable("raw_data")
        if self.is_valid(data):
            return "process_data"
        return "error"

    @state(cpu=4.0, memory=2048.0)
    async def process_data(self, context):
        """Process the validated data."""
        data = context.get_variable("raw_data")
        result = await self.transform_data(data)
        context.set_output("processed_data", result)
        return "complete"

# Run the agent (await requires an async context — e.g. wrap this
# in an async main() and call asyncio.run(main()))
agent = DataProcessor("data-processor")
result = await agent.run()

For more information, see the Quickstart. Or, to learn how to build complex multi-agent workflows with coordination and observability, see the Advanced Examples.

Core benefits

PuffinFlow bridges the gap between quick prototyping and production deployment. Start building your LLM workflow in minutes, then scale to production without rewriting code:

Prototype to Production: Begin with simple agents and seamlessly add resource management, observability, and coordination as your needs grow.

Intelligent resource management: Automatically allocate and manage CPU, memory, and other resources based on state requirements with built-in quotas and limits.

Zero-config observability: Comprehensive monitoring with OpenTelemetry integration, custom metrics, distributed tracing, and real-time alerting that works out of the box.

Built-in reliability: Circuit breakers, bulkheads, and leak detection ensure robust operation under various failure conditions without additional configuration.

Agent coordination: Scale from single agents to complex multi-agent workflows with teams, pools, and orchestrators using the same simple API.

Production performance: Achieve 567,000+ operations/second with sub-millisecond latency, designed for real-world production workloads.
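The reliability primitives mentioned above come built in. As a rough, framework-agnostic sketch of what a circuit breaker does — fail fast after repeated errors, then allow a trial call after a cooldown — consider the following (the class and parameter names here are illustrative, not PuffinFlow's actual API):

```python
import time

class CircuitBreaker:
    """Minimal circuit-breaker sketch: after `max_failures` consecutive
    failures the circuit opens and calls fail fast until `reset_after`
    seconds have passed. Generic illustration only."""

    def __init__(self, max_failures=3, reset_after=30.0):
        self.max_failures = max_failures
        self.reset_after = reset_after
        self.failures = 0
        self.opened_at = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: failing fast")
            # Half-open: cooldown elapsed, permit one trial call.
            self.opened_at = None
            self.failures = 0
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # any success resets the failure count
        return result
```

Bulkheads follow the same spirit: bounding concurrent calls to one dependency so its failures cannot exhaust resources shared with others.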

PuffinFlow's ecosystem

While PuffinFlow can be used standalone, it integrates with popular Python frameworks and tools:

FastAPI & Django — Seamlessly integrate PuffinFlow agents into web applications with built-in async support and resource management.

Celery & Redis — Enhance existing task queues with stateful workflows, advanced coordination, and comprehensive monitoring.

OpenTelemetry — Full observability stack with distributed tracing, metrics collection, and integration with monitoring platforms like Prometheus and Jaeger.

Kubernetes — Production-ready deployment with container orchestration, automatic scaling, and cloud-native observability.
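As a toy illustration of the span-style tracing that the OpenTelemetry integration provides automatically, here is a minimal timing decorator in plain Python (the `traced` helper and `SPANS` list are made up for this sketch; real instrumentation goes through the OpenTelemetry SDK):

```python
import functools
import time

# Collected (name, duration_seconds) pairs — a toy stand-in for spans
# that an OpenTelemetry tracer would export to a backend like Jaeger.
SPANS = []

def traced(func):
    """Record a (name, duration) 'span' for each call to `func`."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            SPANS.append((func.__name__, time.perf_counter() - start))
    return wrapper

@traced
def fetch():
    return "data"
```

The real integration adds context propagation across agents and exporters for metrics and traces, which this sketch omits.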

Additional resources

  • Documentation: Complete guides and API reference
  • Examples: Ready-to-run code examples for common patterns
  • Advanced Guides: Deep dives into resource management, coordination, and observability
  • Benchmarks: Performance metrics and optimization guides

Real-World Examples

🔥 Image Processing Pipeline

class ImageProcessor(Agent):
    @state(cpu=2.0, memory=1024.0)
    async def resize_image(self, context):
        image_url = context.get_variable("image_url")
        resized = await resize_image(image_url, size=(800, 600))
        context.set_variable("resized_image", resized)
        return "add_watermark"

    @state(cpu=1.0, memory=512.0)
    async def add_watermark(self, context):
        image = context.get_variable("resized_image")
        watermarked = await add_watermark(image)
        context.set_variable("final_image", watermarked)
        return "upload_to_storage"

    @state(cpu=1.0, memory=256.0)
    async def upload_to_storage(self, context):
        image = context.get_variable("final_image")
        url = await upload_to_s3(image)
        context.set_output("result_url", url)
        return "complete"

🤖 ML Model Training

class MLTrainer(Agent):
    @state(cpu=8.0, memory=4096.0)
    async def train_model(self, context):
        dataset = context.get_variable("dataset")
        model = await train_neural_network(dataset)
        context.set_variable("model", model)
        context.set_output("accuracy", model.accuracy)

        if model.accuracy > 0.9:
            return "deploy_model"
        return "retrain_with_more_data"

    @state(cpu=2.0, memory=1024.0)
    async def deploy_model(self, context):
        model = context.get_variable("model")
        await deploy_to_production(model)
        context.set_output("deployment_status", "success")
        return "complete"

🔄 Multi-Agent Coordination

from puffinflow import create_team, AgentTeam

# Coordinate multiple agents
email_team = create_team([
    EmailValidator("validator"),
    EmailProcessor("processor"),
    EmailTracker("tracker")
])

# Execute with built-in coordination
result = await email_team.execute_parallel()

🎯 Use Cases

📊 Data Pipelines — Build resilient ETL workflows with automatic retries, resource management, and comprehensive monitoring.

🤖 ML Workflows — Orchestrate training pipelines, model deployment, and inference workflows with checkpointing and observability.

🌐 Microservices — Coordinate distributed services with circuit breakers, bulkheads, and intelligent load balancing.

⚡ Event Processing — Handle high-throughput event streams with backpressure control and automatic scaling.
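Backpressure control, as mentioned for event processing, can be pictured with a bounded `asyncio.Queue`: when the queue is full, `put()` suspends the producer until consumers catch up. This is a generic asyncio sketch, not PuffinFlow's event-processing API:

```python
import asyncio

async def producer(queue, items):
    # With a bounded queue, put() blocks whenever the consumer lags —
    # that suspension is the backpressure.
    for item in items:
        await queue.put(item)
    await queue.put(None)  # sentinel: no more items

async def consumer(queue, results):
    while True:
        item = await queue.get()
        if item is None:
            break
        results.append(item * 2)  # stand-in for real event handling

async def main():
    queue = asyncio.Queue(maxsize=2)  # small buffer forces backpressure
    results = []
    await asyncio.gather(producer(queue, range(5)),
                         consumer(queue, results))
    return results
```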

📊 Performance

PuffinFlow is built for production workloads with excellent performance characteristics:

Core Performance Metrics

  • 567,000+ operations/second for basic agent operations
  • 27,000+ operations/second for complex data processing
  • 1,100+ operations/second for CPU-intensive tasks
  • Sub-millisecond state transition latency in the common case (0.00-1.97ms observed range)

Benchmark Results (Latest)

| Operation Type | Avg Latency | Throughput | Use Case |
|---|---|---|---|
| Agent State Transitions | 0.00ms | 567,526 ops/s | Basic workflow steps |
| Data Processing | 0.04ms | 27,974 ops/s | ETL operations |
| Resource Management | 0.01ms | 104,719 ops/s | Memory/CPU allocation |
| Async Coordination | 1.23ms | 811 ops/s | Multi-agent workflows |
| CPU-Intensive Tasks | 0.91ms | 1,100 ops/s | ML training steps |

Benchmarks run on: Linux WSL2, 16 cores, 3.68GB RAM, Python 3.12
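Throughput numbers like these depend heavily on hardware, so it is worth reproducing them on your own machine. A minimal measurement along the same lines (a hypothetical helper, not part of PuffinFlow) looks like:

```python
import time

def measure_throughput(func, duration=0.1):
    """Call `func` repeatedly for ~`duration` seconds and return ops/sec,
    the same shape of measurement as the table above."""
    count = 0
    start = time.perf_counter()
    while time.perf_counter() - start < duration:
        func()
        count += 1
    elapsed = time.perf_counter() - start
    return count / elapsed

# Example: measure a trivial CPU-bound operation.
ops_per_sec = measure_throughput(lambda: sum(range(100)))
```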

View detailed benchmarks →

🤝 Community & Support

Acknowledgements

PuffinFlow is inspired by workflow orchestration principles and builds on the Python async ecosystem, emphasizing practical workflow management with production-ready features. It is built by Mohamed Ahmed and designed for developers who need reliable, observable, and scalable workflow orchestration.

📜 License

PuffinFlow is released under the MIT License. Free for commercial and personal use.

Ready to build production-ready workflows?

Get Started → | View Examples → | Join Community →
