Socket
Book a DemoInstallSign in
Socket

py-turbo

Package Overview
Maintainers
1
Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install
Source code not available
We could not scan this package. Some page functionalities have been disabled

py-turbo

A pipeline system for efficient execution.

pipPyPI
Metadata Only
Version
0.7.0
Maintainers
1

Pyturbo Package

PyPI version Downloads Publish to PyPI

Author: Lijun Yu

Email: lijun@lj-y.com

A pipeline system for efficient execution.

Installation

pip install py-turbo

Introduction

Pyturbo utilizes multiple level of abstract to efficiently execute parallel tasks.

  • Worker: a process.
  • Stage: a group of peer workers processing the same type of tasks.
  • Task: a data unit transferred between stages. At each stage, a task is processed by one worker and will result in one or multiple downstream tasks.
  • Pipeline: a set of sequential stages.
  • Job: a data unit for a pipeline, typically a wrapped task for the first stage.
  • Result: output of a job processed by one pipeline, typically a set of output tasks from the last stage.
  • System: a set of peer pipelines processing the same type of jobs.

abstract.png

Get Started

from pyturbo import ReorderStage, Stage, System

class Stage1(Stage): # Define a stage

    def allocate_resource(self, resources, ...):
        ... # Optional: split resources and determine number of workers.

    def process(self, task):
        ... # Process function for each worker process. Returns one or a series of downstream tasks.

... # Repeat for Stage2, Stage3

class Stage4(ReorderStage): # Define a reorder stage, typically for the final stage

    def get_sequence_id(self, task):
        ... # Return the order of each task. Start from 0.

    def process(self, task):
        ...

class MySystem(System):

    def get_stages(self, resources):
        ... # Define the stages in a pipeline with given resources.

    def get_results(self, results_gen):
        ... # Define how to extract final results from output tasks.

def main():
    system = MySystem(num_pipeline) # Set debug=True to run in a single process
    system.start() # Build and start system
    jobs = [...]
    system.add_jobs(jobs) # Submit jobs
    for job in system.wait_jobs(len(jobs)):
        print(job.results) # Process result
    system.end() # End system

Options

See options.md

Demo

abstract.png

See demo.py for an example implementation.

Version History

See version.md.

FAQs

Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts