Kestra Python Client
This Python client provides functionality to interact with the Kestra server for sending metrics, outputs, and logs, as well as executing/polling flows.
Installation
pip install kestra
Kestra Class
The Kestra class is responsible for sending metrics, outputs, and logs to the Kestra server.
Methods
- send(map: dict): Sends a message to the Kestra server.
- format(map_: dict) -> str: Formats a message to be sent to the Kestra server.
- metrics(name: str, type: str, value: int, tags: dict | None = None): Sends a metric to the Kestra server.
- outputs(map_: dict): Sends outputs to the Kestra server.
- counter(name: str, value: int, tags: dict | None = None): Sends a counter to the Kestra server.
- timer(name: str, duration: int | Callable, tags: dict | None = None): Sends a timer to the Kestra server.
- logger() -> Logger: Retrieves the logger for the Kestra server.
Flow Class
The Flow class executes a Kestra flow and optionally waits for its completion. It can also retrieve the status and the logs of an execution.
Initialization
from kestra import Flow
flow = Flow(
    wait_for_completion=True,
    poll_interval=1,
    labels_from_inputs=False,
    tenant=None,
)
You can also set the hostname and authentication credentials using environment variables:
export KESTRA_HOSTNAME=http://localhost:8080
export KESTRA_USER=admin
export KESTRA_PASSWORD=admin
export KESTRA_API_TOKEN=my_api_token
Only one authentication method is needed: either KESTRA_API_TOKEN, or KESTRA_USER together with KESTRA_PASSWORD. You do not need to set all of them. The possible authentication patterns are:
- KESTRA_API_TOKEN
- KESTRA_USER and KESTRA_PASSWORD
- No Authentication (not recommended for production environments)
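The three patterns above can be pictured as a small lookup over the environment. The following is a minimal sketch, not the client's actual code: the helper name resolve_auth is hypothetical, and both the precedence (token before user/password) and the Bearer header format are assumptions.

```python
import os


def resolve_auth(env=None):
    """Return requests-style keyword arguments for authentication.

    Hypothetical helper for illustration only.
    """
    env = os.environ if env is None else env
    token = env.get("KESTRA_API_TOKEN")
    user = env.get("KESTRA_USER")
    password = env.get("KESTRA_PASSWORD")

    if token:
        # Assumption: the API token is sent as a Bearer header
        # and takes precedence over user/password.
        return {"headers": {"Authorization": f"Bearer {token}"}}
    if user and password:
        # Basic authentication as a (user, password) pair.
        return {"auth": (user, password)}
    # No credentials configured: the request goes out unauthenticated.
    return {}
```

Passing a plain dict instead of os.environ makes the selection easy to check without touching the real environment.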
Methods
- _make_request(method: str, url: str, **kwargs) -> requests.Response: Makes a request to the Kestra server with optional authentication and retries.
- check_status(execution_id: str) -> requests.Response: Checks the status of an execution.
- get_logs(execution_id: str) -> requests.Response: Retrieves the logs of an execution.
- execute(namespace: str, flow: str, inputs: dict = None) -> namedtuple: Executes a Kestra flow and optionally waits for its completion. The returned namedtuple has the following properties:
  - status: The status of the execution.
  - log: The log of the execution.
  - error: The error of the execution.
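To make the waiting behaviour and the returned namedtuple more concrete, here is a minimal sketch of a polling loop that ends in a result with the three documented properties. It does not reproduce the client's internals: ExecutionResult, wait_for_result, the stand-in callables, and the set of terminal state names are all assumptions made for illustration.

```python
import time
from collections import namedtuple

# Hypothetical result type mirroring the three documented properties.
ExecutionResult = namedtuple("ExecutionResult", ["status", "log", "error"])

# Assumption: these state names mark a finished execution.
TERMINAL_STATES = {"SUCCESS", "WARNING", "FAILED", "KILLED"}


def wait_for_result(fetch_status, fetch_log, poll_interval=1, sleep=time.sleep):
    """Poll fetch_status() until a terminal state, then build the result."""
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            break
        sleep(poll_interval)  # wait before asking again
    error = None if status == "SUCCESS" else f"Execution ended with state {status}"
    return ExecutionResult(status=status, log=fetch_log(), error=error)


# Usage with stand-in callables instead of a real server.
states = iter(["CREATED", "RUNNING", "SUCCESS"])
result = wait_for_result(lambda: next(states), lambda: "hello", sleep=lambda _: None)
print(result.status, result.error)
```

Because the result is a namedtuple, callers can read result.status by name or unpack all three fields positionally.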
Usage Examples
- Trigger a flow and wait for its completion:
from kestra import Flow
flow = Flow()
flow.execute('mynamespace', 'myflow', {'param': 'value'})
- Set labels from inputs:
from kestra import Flow
flow = Flow(labels_from_inputs=True)
flow.execute('mynamespace', 'myflow', {'param': 'value'})
- Pass a text file to an input of type FILE named 'myfile':
from kestra import Flow
flow = Flow()
with open('example.txt', 'rb') as fh:
    flow.execute('mynamespace', 'myflow', {'files': ('myfile', fh, 'text/plain')})
- Fire and forget:
from kestra import Flow
flow = Flow(wait_for_completion=False)
flow.execute('mynamespace', 'myflow', {'param': 'value'})
- Overwrite the username and password:
from kestra import Flow
flow = Flow()
flow.user = 'admin'
flow.password = 'admin'
flow.execute('mynamespace', 'myflow')
- Set the hostname, username, and password using environment variables:
from kestra import Flow
import os
os.environ["KESTRA_HOSTNAME"] = "http://localhost:8080"
os.environ["KESTRA_USER"] = "admin"
os.environ["KESTRA_PASSWORD"] = "admin"
flow = Flow()
flow.execute('mynamespace', 'myflow', {'param': 'value'})
Error Handling
The client retries requests with exponential backoff for certain HTTP status codes and raises a FailedExponentialBackoff exception if the request still fails after multiple retries.
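The retry behaviour can be sketched in a few lines. This is an illustration of exponential backoff in general, not the client's implementation: the exception class below is a local stand-in for the one the client raises, and the retryable status codes, retry count, and base delay are assumptions.

```python
import time


class FailedExponentialBackoff(Exception):
    """Local stand-in for the client's exception of the same name."""


# Assumption: status codes treated as retryable.
RETRYABLE = {429, 500, 502, 503, 504}


def request_with_backoff(send, max_retries=5, base_delay=1.0, sleep=time.sleep):
    """Call send() until it returns a non-retryable status code."""
    for attempt in range(max_retries):
        status = send()
        if status not in RETRYABLE:
            return status
        if attempt < max_retries - 1:
            # The delay doubles on each attempt: base, 2*base, 4*base, ...
            sleep(base_delay * 2 ** attempt)
    raise FailedExponentialBackoff(f"gave up after {max_retries} attempts")
```

Injecting the sleep function keeps the sketch testable without real waiting. Callers of the real client would wrap flow.execute in a try/except for FailedExponentialBackoff.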
Kestra Class
Logging
The Kestra class provides a logger that formats logs as JSON, making it easier to integrate with log management systems.
from kestra import Kestra
Kestra.logger().info("Hello, world!")
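For readers unfamiliar with JSON logging, the idea can be sketched with the standard library alone. This is not the formatter the Kestra class uses: JsonFormatter, make_json_logger, and the field names level, logger, and message are hypothetical choices for illustration.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Hypothetical formatter that emits one JSON object per log record."""

    def format(self, record):
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        })


def make_json_logger(name="example"):
    """Return a logger whose handler writes JSON lines to stderr."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    if not logger.handlers:  # avoid duplicate handlers on repeated calls
        handler = logging.StreamHandler()
        handler.setFormatter(JsonFormatter())
        logger.addHandler(handler)
    return logger


make_json_logger().info("Hello, world!")
```

One JSON object per line is what lets a log management system parse each entry without custom patterns.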
Outputs
The Kestra class provides a method to send key-value outputs to the Kestra server. To output large objects, write them to a file and list that file in the outputFiles property of the Python script task.
from kestra import Kestra
Kestra.outputs({"my_output": "my_value"})
Counters
The Kestra class provides a method to send counter metrics to the Kestra server.
from kestra import Kestra
Kestra.counter("my_counter", 1)
Timers
The Kestra class provides a method to send timer metrics to the Kestra server.
from kestra import Kestra
Kestra.timer("my_timer", 1)
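The timer signature listed under Methods accepts either a number or a Callable for duration. One plausible reading is that a callable is run and its elapsed time is reported. The sketch below illustrates that reading only: resolve_duration is a hypothetical helper, and both the timing behaviour and the unit (seconds) are assumptions rather than documented facts.

```python
import time
from typing import Callable, Union


def resolve_duration(duration: Union[int, float, Callable[[], object]]) -> float:
    """Return a duration in seconds.

    A number is passed through unchanged; a callable is run and timed.
    Hypothetical helper for illustration only.
    """
    if callable(duration):
        start = time.perf_counter()
        duration()  # run the work being measured
        return time.perf_counter() - start
    return float(duration)


# Usage: a fixed value, then a measured piece of work.
fixed = resolve_duration(1)
measured = resolve_duration(lambda: sum(range(1000)))
```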