coroflow

Asynchronous pipeline builder

  • 4.0.1
  • PyPI

Coroflow: Easy and Fast Pipelines

Coroflow makes it easy to run pipelines with coroutines and also supports mixing in blocking functions and generators.

Coroflow does a lot of heavy lifting for you:

  • Manages all tasks in the pipeline concurrently in one thread using coroutines
  • Passes data between tasks with queues
  • Lets you easily specify concurrency limits
  • Connects stages of the pipeline with fan-out/fan-in or load-balancer patterns
  • Lets you define tasks as coroutines, normal (blocking) functions, async generators or normal generators; Coroflow runs each one appropriately in the event loop, a thread pool, or optionally a process pool
  • Provides an Apache Airflow-like API for connecting tasks
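
To make the mechanics in the list above concrete, here is a minimal sketch in plain asyncio of stages joined by queues, with a per-stage concurrency limit and a blocking function pushed to a thread. It is not Coroflow's implementation and uses none of its API; every name in it (`produce`, `stage`, `SENTINEL`, and so on) is hypothetical, and it assumes Python 3.9+ for `asyncio.to_thread`.

```python
import asyncio
import inspect
import time

SENTINEL = object()  # hypothetical end-of-stream marker


def blocking_upper(text):
    """A normal (blocking) function; the stage runs it in a worker thread."""
    time.sleep(0.01)
    return text.upper()


async def add_suffix(text):
    """A coroutine; the stage awaits it directly on the event loop."""
    await asyncio.sleep(0.01)
    return text + "!"


async def produce(out_q, items):
    """Root stage: feed items into the first queue, then signal the end."""
    for item in items:
        await out_q.put(item)
    await out_q.put(SENTINEL)


async def stage(func, in_q, out_q, concurrency):
    """Run at most `concurrency` calls of `func` at once between two queues."""

    async def worker():
        while True:
            item = await in_q.get()
            if item is SENTINEL:
                await in_q.put(SENTINEL)  # let sibling workers see it too
                return
            if inspect.iscoroutinefunction(func):
                result = await func(item)
            else:
                result = await asyncio.to_thread(func, item)
            await out_q.put(result)

    await asyncio.gather(*(worker() for _ in range(concurrency)))
    await out_q.put(SENTINEL)  # all workers are done; close the next queue


async def main():
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    await asyncio.gather(
        produce(q1, ["a", "b", "c"]),
        stage(blocking_upper, q1, q2, concurrency=2),
        stage(add_suffix, q2, q3, concurrency=2),
    )
    results = []
    while (item := q3.get_nowait()) is not SENTINEL:
        results.append(item)
    return sorted(results)  # worker order is not guaranteed, so sort


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With more than one worker per stage, items can leave a stage in a different order than they entered, which is why the sketch sorts the results before returning them.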

Getting Started

The example below connects a root generator node to two processing nodes and runs the resulting pipeline:

    from coroflow import Node, Pipeline
    import asyncio
    import time


    class GenNode(Node):
        async def execute():
            """
            The execute method of the first/root Node has to be a generator,
            either async or synchronous.
            """
            for url in ['img_url_1', 'img_url_2', 'img_url_3']:
                print(f"Yielding {url}")
                await asyncio.sleep(1)
                yield url
            print("Generator is exhausted")
            return


    class DoSomething(Node):
        async def execute(inpt, param=None):
            """
            The execute method of all non-root Nodes should be an async
            or synchronous method.
            """
            # do your async pipelined work
            await asyncio.sleep(1)  # simulated IO delay
            outp = inpt
            print(f"func1: T1 sending {inpt}")
            return outp


    p = Pipeline()
    t0 = GenNode('gen', p)
    t1 = DoSomething('func1', p, kwargs={'param': 'param_t1'})
    t2 = DoSomething('func2', p, kwargs={'param': 'param_t2'})
    t0.set_downstream(t1)
    t1.set_downstream(t2)


    start_time = time.time()
    p.run()
    print(f"Asynchronous duration: {time.time() - start_time}s.")
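
As a rough guide to what the printed duration might look like: if the three stages overlap as described in the feature list, the three items should take about 3 + 1 + 1 = 5 seconds, compared with 3 × (1 + 1 + 1) = 9 seconds when each item is processed start to finish before the next begins. That figure is arithmetic, not a measured Coroflow result. The sketch below reproduces the same shape in plain asyncio, with delays scaled down to 0.1 s; it does not use Coroflow, and all names in it are hypothetical.

```python
import asyncio
import time

DELAY = 0.1      # stands in for the 1-second sleeps in the example above
DONE = object()  # hypothetical end-of-stream marker


async def gen(out_q):
    """Root stage: emit one URL per DELAY."""
    for url in ["img_url_1", "img_url_2", "img_url_3"]:
        await asyncio.sleep(DELAY)
        await out_q.put(url)
    await out_q.put(DONE)


async def work(in_q, out_q):
    """Processing stage: one item at a time, DELAY of simulated IO each."""
    while True:
        item = await in_q.get()
        if item is DONE:
            await out_q.put(DONE)
            return
        await asyncio.sleep(DELAY)
        await out_q.put(item)


async def pipelined():
    q1, q2, q3 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    start = time.perf_counter()
    # All three stages run concurrently on one event loop.
    await asyncio.gather(gen(q1), work(q1, q2), work(q2, q3))
    elapsed = time.perf_counter() - start
    results = []
    while (item := q3.get_nowait()) is not DONE:
        results.append(item)
    return results, elapsed


if __name__ == "__main__":
    results, elapsed = asyncio.run(pipelined())
    print(results, f"{elapsed:.2f}s (sequential would be about {9 * DELAY:.2f}s)")
```

Because each stage here has a single worker, the items come out in the order they were generated.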

Tests

Run the tests with pytest:

    $ pytest
