Huge News!Announcing our $40M Series B led by Abstract Ventures.Learn More
Socket
Sign inDemoInstall
Socket

argo-workflow-tools

Package Overview
Dependencies
Maintainers
2
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

argo-workflow-tools

A suite of tools to ease ML pipeline development with Argo Workflows

  • 0.9.55
  • PyPI
  • Socket score

Maintainers
2

Argo Workflow Tools

argo-workflow-tools is a set of tools intended to easue the usage of argo for data science and data engineerign workflows Python Versions Supported

Installation

argo-workflow-tools is published to the Python Package Index (PyPI) under the name argo-workflow-tools. To install it, run:

pip install argo-workflow-tools

Argo Submitter

Argo Submitter is an easy to use argo client that allows data scientists to easily execute and control Argo Workflows from code and interactive notebooks.

Quick Start

Running workflows from templates

The simplest way to submit a new workflow is by running a workflow from template

ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow', params={'message':'hello world'})
result.wait_for_completion()

You can wait for template completion by setting wait=True parameter, or calling wait_for_completion()

result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)

You may send parameters, through the params dictionary

result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)

You send objects as parameters, and they will be automatically serialized to json.

ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow',
                                     params={'name':
                                                {'first':'Lorne','last':'Malvo'}
                                             },
                                     wait=True)
Running workflows from specification

if you have a custom workflow manifest , you can run it by using create

result = client.create(workflow_manifest, wait=True)
Working with workflow results

You can check the status of a workflow by calling the status field

result.status

You can fetch output parametes and artifacts throut the output field

print(result.outputs['message'])

As well as reach artifacts through the s3 path property

pandas.read_csv(result.outputs['users'].s3)
Controlling workflows

You may cancel a running flow through the cancel method

result.cancel()

You may ssuspend, resume or cancel your workflow at any time

result = client.submit('test-workflow', params={'message':'hello world'}, wait=False)
result.suspend()
...
result.resume()

You can retry a failing workflow through the retry method

result.retry()

Pythonic workflow DSL

Fargo is a library for autoring Argo Workflows in a Python and friendly way. The main goal of Hera are

  • Make Argo Workflows accessible by leveraging pythonic style of dag
  • Allow seamlless local runs, for debug or testing while maintaining the same codebase for running DAG's at scale

pythonic DSL is an opinionate subset of writing Argo workflows, it favors simplicity, readability and "pythonic flow" over leveraging the entire capability set Argo Workflows brings.

Concepts

  • task - atomic python code
  • DAG - atonic workflows code

Quick start

####Hello World


def say_hello(name):
    message = f"hello {name}"
    print(message)
    return message

say_hello("Brian")

to run this simple task in argo all we need to do is to decorate our code in a task decorator

@dsl.Task(image="python3:3.10")
def say_hello(name:str):
    message = f"hello {name}"
    print(message)
    return message


workflow = Workflow(name="hello-world", entrypoint=say_hello, arguments={"name": "Brian"})
print(workflow.to_yaml())

DAG

DAG functions are functions that define a workflow by calling other tasks or nested DAGs, We support task depndency declaration implicitly by analyzing inputs and outputs of each task.

When writing DAG functions make sure you keep it a simple as possible, call only DAG or TASK flows.

@dsl.Task(image="python:3.10")
def multiply_task(x: int):
    return x * 2


@dsl.Task(image="python:3.10")
def sum_task(x: int, y: int):
    return x + y


@dsl.DAG()
def diamond(num: int):
    a = multiply_task(num)
    b = multiply_task(a)
    c = multiply_task(num)
    return sum_task(b, c)

Explicit depndencies

In case a task does not return a parameter, you can set an explicit dependency by sending wait_for argument to the next task

@dsl.Task(image="python:3.10")
def print_task():
    print(f"text")


@dsl.DAG()
def diamond():
    a = print_task()
    b = print_task(wait_for=a)
    c = print_task(wait_for=a)
    print_task(wait_for=[c, b])

Loops

we support map reduce workflows through [for in] loop, the iterable input must be a parameter, an output of a previous task, or a sequence object.

currently only one level of nesting is supported, in case you wish to use nested loops, extract the second loop into a fucntion and decorate it as well with a DAG decorater.

@dsl.Task(image="python:3.10")
def generate_list(partitions: int, partition_size: int):
    items = []
    for i in range(partitions):
        items.append(list(range(1, partition_size)))

    return items


@dsl.Task(image="python:3.10")
def sum_task(items:list[int]):
    return sum(items)


@dsl.DAG()
def map_reduce(partitions, partition_size):
    partition_list = generate_list(partitions, partition_size)
    partition_sums = [sum_task(partition) for partition in partition_list]
    return sum_task(partition_sums)
Conditions

we support conditional task run by employing the 'with condition' syntax

    @Task(image="python:3.10")
    def say_hello(name: str):
        message = f"hello {name}"
        print(message)
        return message
    
    @Task(image="python:3.10")
    def say_goodbye(name: str):
        message = f"goodbye {name}"
        print(message)
        return message


    @DAG()
    def command_hello(name, command):
        with Condition().equals(command, "hello"):
            say_hello(name)
        with Condition().equals(command, "goodbye"):
            say_goodbye(name)

How to contribute

Have any feedback? Wish to implement an extenstion or new capability? Want to help us make argo better and easier to use? Every contribution to Argo Workflow Tools is greatly appreciated.

FAQs


Did you know?

Socket

Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.

Install

Related posts

SocketSocket SOC 2 Logo

Product

  • Package Alerts
  • Integrations
  • Docs
  • Pricing
  • FAQ
  • Roadmap
  • Changelog

Packages

npm

Stay in touch

Get open source security insights delivered straight into your inbox.


  • Terms
  • Privacy
  • Security

Made with ⚡️ by Socket Inc