New Research: Supply Chain Attack on Axios Pulls Malicious Dependency from npm.Details →
Socket
Book a DemoSign in
Socket

penguiflow

Package Overview
Dependencies
Maintainers
1
Versions
49
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

penguiflow

Async-first orchestration library for multi-agent and data pipelines

pipPyPI
Version
3.6.3
Maintainers
1

PenguiFlow

PenguiFlow logo

CI Status PyPI version Docs Benchmarks License

Async-first orchestration library for typed, reliable, concurrent workflows — from deterministic data pipelines to LLM agents.

Why PenguiFlow

  • Graph runtime: run async node graphs with bounded queues (backpressure).
  • Reliability controls: per-node timeouts + retries, plus per-trace cancellation and deadlines (envelope mode).
  • Streaming: emit partial output (StreamChunk) and a final answer with deterministic correlation.
  • Planner (ReactPlanner): JSON-first tool orchestration with pause/resume (HITL), parallel fan-out + joins, and trajectory logging.
  • Tool integrations: native + ToolNode (MCP / UTCP / HTTP) with auth and resilience patterns.

Concepts at a glance

  • Flow: a directed graph (runtime) you run(), emit() into, and fetch() results from.
  • Node: an async function + NodePolicy (validation, retries, timeout).
  • Message (recommended for production): Message(payload=..., headers=Headers(tenant=...), trace_id=...) enabling trace correlation, cancellation, deadlines, and streaming.
  • StateStore (optional): durability/audit/event persistence for distributed and “ops-ready” deployments.

Install

Requirements: Python 3.11+

pip install penguiflow

Common extras:

pip install "penguiflow[planner]"      # ReactPlanner + ToolNode integrations
pip install "penguiflow[a2a-server]"   # A2A HTTP+JSON server bindings
pip install "penguiflow[a2a-client]"   # A2A client bindings

If you use uv:

uv pip install penguiflow

Quickstart

1) Minimal typed flow (runtime)

from __future__ import annotations

import asyncio

from pydantic import BaseModel

from penguiflow import ModelRegistry, Node, NodePolicy, create


class In(BaseModel):
    text: str


class Out(BaseModel):
    upper: str


async def to_upper(msg: In, _ctx) -> Out:
    return Out(upper=msg.text.upper())


async def main() -> None:
    node = Node(to_upper, name="to_upper", policy=NodePolicy(validate="both"))

    registry = ModelRegistry()
    registry.register("to_upper", In, Out)

    flow = create(node.to())
    flow.run(registry=registry)

    await flow.emit(In(text="hello"))
    result: Out = await flow.fetch()
    await flow.stop()

    print(result.upper)


if __name__ == "__main__":
    asyncio.run(main())

2) ReactPlanner via CLI (fastest path)

uv run penguiflow new my-agent --template react
cd my-agent
uv sync
uv run penguiflow dev --project-root .

Documentation (canonical)

Suggested starting points (in-repo sources):

Stability, versioning, and public API

PenguiFlow follows a 2.x line and aims to follow SemVer with a clear public surface.

Contributing, security, and support

License

MIT — see LICENSE.

FAQs

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts