asyncio-buffered-pipeline

Parallelize pipelines of Python async iterables/generators

Version 0.0.8 (PyPI)

Installation

pip install asyncio-buffered-pipeline

Usage / What problem does this solve?

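In a chain of async generators, only one generator runs at any given time, even though each is async: every stage waits for the one before it to yield. For example, the code below takes (just over) 30 seconds, since each of the 10 values spends 1 second in each of the 3 generators.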

import asyncio

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    it_1 = gen_1()
    it_2 = gen_2(it_1)
    it_3 = gen_3(it_2)

    async for val in it_3:
        print(val)

asyncio.run(main())

The buffered_pipeline function lets you make a small change, passing each generator through the function it returns, to parallelise the generators and reduce this to (just over) 12 seconds.

import asyncio
from asyncio_buffered_pipeline import buffered_pipeline

async def gen_1():
    for value in range(0, 10):
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value

async def gen_2(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value * 2

async def gen_3(it):
    async for value in it:
        await asyncio.sleep(1)  # Could be a slow HTTP request
        yield value + 3

async def main():
    buffer_iterable = buffered_pipeline()
    it_1 = buffer_iterable(gen_1())
    it_2 = buffer_iterable(gen_2(it_1))
    it_3 = buffer_iterable(gen_3(it_2))

    async for val in it_3:
        print(val)

asyncio.run(main())
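
The two timings quoted above can be checked with a simple count. This is a back-of-the-envelope sketch, assuming every stage takes the same fixed time and ignoring scheduling overhead; the function names are hypothetical and not part of the library.

```python
def sequential_seconds(items: int, stages: int, delay: float) -> float:
    # Only one generator runs at a time, so every item pays for every stage in turn.
    return items * stages * delay


def pipelined_seconds(items: int, stages: int, delay: float) -> float:
    # Idealised pipeline: the first item passes through every stage,
    # then one further item completes per delay.
    return (items + stages - 1) * delay


print(sequential_seconds(10, 3, 1.0))  # 30.0
print(pipelined_seconds(10, 3, 1.0))   # 12.0
```

The "just over" in both figures is the overhead that this idealised model leaves out.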

The buffered_pipeline ensures internal tasks are cancelled on any exception.

Buffer size

The default buffer size is 1. This is suitable if each iteration takes approximately the same amount of time. If this is not the case, you may wish to change it using the buffer_size parameter of buffer_iterable.

it = buffer_iterable(gen(), buffer_size=2)
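
To give a feel for what a buffer size controls, here is a minimal sketch built on a bounded asyncio.Queue. It is an assumption about the general technique, not the library's implementation: `buffered` and `run_ahead` are hypothetical names, and the real buffer_iterable may count buffered items differently.

```python
import asyncio


async def buffered(iterable, buffer_size=1):
    # Hypothetical stand-in for buffer_iterable, NOT the library's code.
    queue = asyncio.Queue(maxsize=buffer_size)
    done = object()  # sentinel marking the end of the iterable

    async def fill():
        async for value in iterable:
            await queue.put(value)  # waits here while the buffer is full
        await queue.put(done)

    task = asyncio.create_task(fill())
    try:
        while True:
            value = await queue.get()
            if value is done:
                break
            yield value
    finally:
        task.cancel()


async def run_ahead(buffer_size):
    # Count how many values the producer generates while the consumer
    # has taken only the first one.
    produced = []

    async def source():
        for value in range(10):
            produced.append(value)
            yield value

    it = buffered(source(), buffer_size=buffer_size)
    await it.__anext__()        # consume a single value
    for _ in range(20):
        await asyncio.sleep(0)  # let the fill task run as far ahead as it can
    count = len(produced)
    await it.aclose()
    return count


print(asyncio.run(run_ahead(1)), asyncio.run(run_ahead(3)))
```

In this sketch a larger buffer lets the producer get further ahead of a slow consumer, which is what helps when iteration times are uneven.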

Features

  • Only one task is created for each call to buffer_iterable. The task iterates over the iterable and stores its values in an internal buffer.

  • All the tasks of the pipeline are cancelled if any of the generators raises an exception.

  • If a generator raises an exception, the exception is propagated to calling code.

  • The buffer size of each step in the pipeline is configurable.

  • The "chaining" is not abstracted away. You still have full control over the arguments passed to each step, and you don't need to buffer each iterable in the pipeline if you don't want to: just don't pass those through buffer_iterable.
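
The first three features can be illustrated with a small self-contained sketch. It is one possible way to get this behaviour, assuming a bounded asyncio.Queue per stage and a task list shared across the pipeline; it is not the library's source. `sketch_pipeline` is a hypothetical name and, unlike buffered_pipeline, it also returns its task list so the demo can inspect it.

```python
import asyncio

_DONE = object()  # sentinel marking normal exhaustion


def sketch_pipeline():
    tasks = []  # one task per buffered stage, shared by the whole pipeline

    async def buffer_iterable(iterable, buffer_size=1):
        queue = asyncio.Queue(maxsize=buffer_size)

        async def fill():
            # The single task for this stage: iterate and fill the buffer.
            try:
                async for value in iterable:
                    await queue.put((value, None))
                await queue.put((_DONE, None))
            except Exception as exc:
                await queue.put((None, exc))  # hand the failure to the consumer

        tasks.append(asyncio.create_task(fill()))
        try:
            while True:
                value, exc = await queue.get()
                if exc is not None:
                    raise exc
                if value is _DONE:
                    return
                yield value
        except BaseException:
            # On a failure (or an early close), cancel every other stage's task.
            current = asyncio.current_task()
            for task in tasks:
                if task is not current:
                    task.cancel()
            raise

    return buffer_iterable, tasks


async def source():
    for value in range(5):
        yield value


async def failing_stage(it):
    async for value in it:
        if value == 2:
            raise ValueError("stage failed")
        yield value * 10


async def main():
    buffer_iterable, tasks = sketch_pipeline()
    it_1 = buffer_iterable(source())
    it_2 = buffer_iterable(failing_stage(it_1))
    seen, error = [], None
    try:
        async for value in it_2:
            seen.append(value)
    except ValueError as exc:  # the generator's exception reaches the caller
        error = str(exc)
    await asyncio.gather(*tasks, return_exceptions=True)
    return seen, error, all(task.done() for task in tasks)


print(asyncio.run(main()))  # ([0, 10], 'stage failed', True)
```

Passing the error through the same queue as the values keeps ordering simple: the caller sees every value produced before the failure, then the exception itself.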
