argo-workflow-tools is a set of tools intended to ease the use of Argo for data science and data engineering workflows.
argo-workflow-tools is published to the Python Package Index (PyPI) under the name argo-workflow-tools. To install it, run:
pip install argo-workflow-tools
Argo Submitter is an easy-to-use Argo client that allows data scientists to execute and control Argo Workflows from code and interactive notebooks.
The simplest way to submit a new workflow is to run a workflow from a template:
from argo_workflow_tools import ArgoClient, ArgoOptions

ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow', params={'message': 'hello world'})
result.wait_for_completion()
You can wait for workflow completion by passing the wait=True parameter or by calling wait_for_completion():
result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)
You may pass parameters through the params dictionary:
result = client.submit('test-workflow', params={'message':'hello world'}, wait=True)
You can send objects as parameters, and they will be automatically serialized to JSON:
ARGO_CLIENT = 'http://localhost:2746'
client = ArgoClient(ARGO_CLIENT, options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.submit('test-workflow',
                       params={'name': {'first': 'Lorne', 'last': 'Malvo'}},
                       wait=True)
If you have a custom workflow manifest, you can run it by using create:
result = client.create(workflow_manifest, wait=True)
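For illustration, workflow_manifest could be a standard Argo Workflow manifest such as the sketch below, a hypothetical hello-world spec expressed as a Python dict; whether create also accepts a raw YAML string is an assumption not confirmed here.

# Hypothetical manifest for illustration only; field values are placeholders
workflow_manifest = {
    "apiVersion": "argoproj.io/v1alpha1",
    "kind": "Workflow",
    "metadata": {"generateName": "hello-world-"},
    "spec": {
        "entrypoint": "whalesay",
        "templates": [
            {
                "name": "whalesay",
                "container": {
                    "image": "docker/whalesay:latest",
                    "command": ["cowsay"],
                    "args": ["hello world"],
                },
            }
        ],
    },
}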
You can check the status of a workflow through the status field:
result.status
You can fetch output parameters and artifacts through the outputs field:
print(result.outputs['message'])
You can also reach artifacts through the s3 path property:
import pandas

pandas.read_csv(result.outputs['users'].s3)
You may cancel a running workflow through the cancel method:
result.cancel()
You may suspend, resume, or cancel your workflow at any time:
result = client.submit('test-workflow', params={'message':'hello world'}, wait=False)
result.suspend()
...
result.resume()
You can retry a failed workflow through the retry method:
result.retry()
Fargo is a library for authoring Argo Workflows in a Pythonic and friendly way. Its main goal is a pythonic DSL that is an opinionated subset of Argo Workflows: it favors simplicity, readability, and "pythonic flow" over leveraging the entire capability set that Argo Workflows brings.
#### Hello World
def say_hello(name):
    message = f"hello {name}"
    print(message)
    return message

say_hello("Brian")
To run this simple task in Argo, all we need to do is decorate our function with the Task decorator:
# dsl and Workflow are assumed to be importable from argo_workflow_tools; the exact path is not shown here
@dsl.Task(image="python:3.10")
def say_hello(name: str):
    message = f"hello {name}"
    print(message)
    return message

workflow = Workflow(name="hello-world", entrypoint=say_hello, arguments={"name": "Brian"})
print(workflow.to_yaml())
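A possible hand-off to the submitter client from the previous section is sketched below; whether create accepts the manifest rendered by to_yaml, or expects another form, is an assumption here rather than documented behavior.

# Hedged sketch: submit the DSL-generated manifest with the ArgoClient shown earlier.
# The exact input format accepted by create() is an assumption.
client = ArgoClient('http://localhost:2746', options=ArgoOptions(client_side_validation=False, namespace='argo'))
result = client.create(workflow.to_yaml(), wait=True)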
DAG functions are functions that define a workflow by calling other tasks or nested DAGs. Task dependencies are declared implicitly by analyzing the inputs and outputs of each task.
When writing DAG functions, keep them as simple as possible and call only DAG or Task functions.
@dsl.Task(image="python:3.10")
def multiply_task(x: int):
    return x * 2

@dsl.Task(image="python:3.10")
def sum_task(x: int, y: int):
    return x + y

@dsl.DAG()
def diamond(num: int):
    a = multiply_task(num)
    b = multiply_task(a)
    c = multiply_task(num)
    return sum_task(b, c)
If a task does not return a parameter, you can set an explicit dependency by passing the wait_for argument to the next task:
@dsl.Task(image="python:3.10")
def print_task():
    print("text")

@dsl.DAG()
def diamond():
    a = print_task()
    b = print_task(wait_for=a)
    c = print_task(wait_for=a)
    print_task(wait_for=[c, b])
Map-reduce workflows are supported through the [for in] loop. The iterable input must be a parameter, an output of a previous task, or a sequence object.
Currently only one level of nesting is supported; if you wish to use nested loops, extract the second loop into a function and decorate it as well with the DAG decorator (see the sketch after the example below).
@dsl.Task(image="python:3.10")
def generate_list(partitions: int, partition_size: int):
    items = []
    for i in range(partitions):
        items.append(list(range(1, partition_size)))
    return items

@dsl.Task(image="python:3.10")
def sum_task(items: list[int]):
    return sum(items)

@dsl.DAG()
def map_reduce(partitions, partition_size):
    partition_list = generate_list(partitions, partition_size)
    partition_sums = [sum_task(partition) for partition in partition_list]
    return sum_task(partition_sums)
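As a sketch of the nested-loop guidance above, the inner loop is extracted into its own DAG-decorated function; double_task, sum_partition, and nested_map_reduce are hypothetical names that follow the same pattern as the example above.

@dsl.Task(image="python:3.10")
def double_task(x: int):
    return x * 2

@dsl.DAG()
def sum_partition(partition: list[int]):
    # The second (inner) loop level lives in its own DAG function
    doubled = [double_task(item) for item in partition]
    return sum_task(doubled)

@dsl.DAG()
def nested_map_reduce(partitions, partition_size):
    # The outer loop calls the nested DAG instead of nesting two loops in one function
    partition_list = generate_list(partitions, partition_size)
    partition_sums = [sum_partition(partition) for partition in partition_list]
    return sum_task(partition_sums)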
Conditional task runs are supported through the 'with Condition' syntax:
@Task(image="python:3.10")
def say_hello(name: str):
    message = f"hello {name}"
    print(message)
    return message

@Task(image="python:3.10")
def say_goodbye(name: str):
    message = f"goodbye {name}"
    print(message)
    return message

@DAG()
def command_hello(name, command):
    with Condition().equals(command, "hello"):
        say_hello(name)
    with Condition().equals(command, "goodbye"):
        say_goodbye(name)
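Following the Workflow construction shown in the Hello World example, the conditional DAG might be wired up as follows (the argument values here are illustrative):

workflow = Workflow(name="command-hello", entrypoint=command_hello, arguments={"name": "Brian", "command": "hello"})
print(workflow.to_yaml())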
Have any feedback? Wish to implement an extension or new capability? Want to help us make Argo better and easier to use? Every contribution to Argo Workflow Tools is greatly appreciated.