apache-airflow-client

Apache Airflow API (Stable)

Version 3.0.2 (PyPI)

Apache Airflow Python Client

Overview

To facilitate management, Apache Airflow supports a range of REST API endpoints across its objects. This section provides an overview of the API design, methods, and supported use cases.

Most of the endpoints accept JSON as input and return JSON responses. This means that you must usually add the following headers to your request:

Content-Type: application/json
Accept: application/json
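As a minimal sketch using only the Python standard library, the helper below builds (but does not send) a request that carries both headers. The helper name `build_json_request`, the example URL, and the optional Bearer token are illustrative assumptions, not part of the client package.

```python
import json
import urllib.request
from typing import Any, Optional


def build_json_request(
    url: str,
    method: str = "GET",
    body: Optional[Any] = None,
    token: Optional[str] = None,
) -> urllib.request.Request:
    """Build a urllib Request with the JSON headers the API expects (hypothetical helper)."""
    headers = {
        "Content-Type": "application/json",
        "Accept": "application/json",
    }
    if token is not None:
        # Only needed if your deployment uses Bearer token authentication.
        headers["Authorization"] = f"Bearer {token}"
    # Serialize the body to UTF-8 JSON bytes when one is given.
    data = json.dumps(body).encode("utf-8") if body is not None else None
    return urllib.request.Request(url, data=data, headers=headers, method=method)


# Build a request that lists connections; urllib.request.urlopen(req) would send it.
req = build_json_request("http://localhost:8080/api/v2/connections", token="YOUR_ACCESS_TOKEN")
print(req.get_method(), req.full_url)
```

Note that `urllib` normalizes stored header names, so `req.get_header("Content-type")` is the lookup key for the content type header.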

Resources

The term resource refers to a single type of object in the Airflow metadata. The API is organized by resource: each endpoint belongs to the resource it operates on. The name of a resource is typically plural and expressed in camelCase, for example dagRuns.

Resource names are used as part of endpoint URLs, as well as in API parameters and responses.

CRUD Operations

The platform supports Create, Read, Update, and Delete operations on most resources. You can review the standards for these operations and their standard parameters below.

Some endpoints have special behavior as exceptions.

Create

To create a resource, you typically submit an HTTP POST request with the resource's required metadata in the request body. The response returns a 201 Created response code upon success with the resource's metadata, including its internal id, in the response body.

Read

The HTTP GET request can be used to read a resource or to list a number of resources.

A resource's id can be submitted in the request parameters to read a specific resource. The response usually returns a 200 OK response code upon success, with the resource's metadata in the response body.

If a GET request does not include a specific resource id, it is treated as a list request. The response usually returns a 200 OK response code upon success, with an object containing a list of resources' metadata in the response body.

When reading resources, some common query parameters are usually available, for example:

/api/v2/connections?limit=25&offset=25

| Query Parameter | Type | Description |
| --- | --- | --- |
| limit | integer | Maximum number of objects to fetch. Usually 25 by default. |
| offset | integer | Offset after which to start returning objects. For use with the limit query parameter. |
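The limit/offset pair can be walked in a loop until a short page comes back. The sketch below assumes that list responses carry their items under a key named after the resource (for example `connections`) and may include a `total_entries` count; check the actual response model of the endpoint you call. The `fetch_page` callable is a hypothetical stand-in for the HTTP call, so the paging logic can be shown on its own.

```python
from typing import Any, Callable, Dict, Iterator, List, Optional
from urllib.parse import urlencode


def page_url(base_url: str, resource: str, limit: int, offset: int) -> str:
    """Build a list URL such as /api/v2/connections?limit=25&offset=25."""
    return f"{base_url}/api/v2/{resource}?{urlencode({'limit': limit, 'offset': offset})}"


def iter_resources(
    fetch_page: Callable[[int, int], Dict[str, Any]],
    key: str,
    limit: int = 25,
) -> Iterator[Dict[str, Any]]:
    """Yield every item of a list endpoint, one page at a time.

    fetch_page(limit, offset) must return the decoded JSON body of one page.
    """
    offset = 0
    while True:
        page = fetch_page(limit, offset)
        items: List[Dict[str, Any]] = page.get(key, [])
        yield from items
        offset += len(items)
        total: Optional[int] = page.get("total_entries")
        # Stop on a short (or empty) page, or once the reported total is reached.
        if len(items) < limit or (total is not None and offset >= total):
            break


# Usage with an in-memory fake instead of a real HTTP call (hypothetical data).
ALL = [{"connection_id": f"conn_{i}"} for i in range(60)]


def fake_fetch(limit: int, offset: int) -> Dict[str, Any]:
    return {"connections": ALL[offset:offset + limit], "total_entries": len(ALL)}


ids = [c["connection_id"] for c in iter_resources(fake_fetch, "connections")]
print(len(ids))  # 60
```

Stopping on a short page keeps the loop correct even when an endpoint does not report a total.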

Update

Updating a resource requires the resource id, and is typically done using an HTTP PATCH request, with the fields to modify in the request body. The response usually returns a 200 OK response code upon success, with information about the modified resource in the response body.

Delete

Deleting a resource requires the resource id and is typically executed via an HTTP DELETE request. The response usually returns a 204 No Content response code upon success.
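The CRUD conventions above can be summarized as a lookup from operation to HTTP method and usual success status. This is a sketch of the documented defaults only; because some endpoints behave differently, treat a mismatch as a hint to check that endpoint's documentation rather than as a hard error. The names `CRUD_CONVENTIONS` and `matches_convention` are illustrative.

```python
from typing import Dict, Tuple

# operation -> (HTTP method, usual success status code)
CRUD_CONVENTIONS: Dict[str, Tuple[str, int]] = {
    "create": ("POST", 201),
    "read": ("GET", 200),
    "list": ("GET", 200),
    "update": ("PATCH", 200),
    "delete": ("DELETE", 204),
}


def matches_convention(operation: str, method: str, status: int) -> bool:
    """Return True if (method, status) is the usual success pair for the operation."""
    expected_method, expected_status = CRUD_CONVENTIONS[operation]
    return method.upper() == expected_method and status == expected_status


print(matches_convention("delete", "DELETE", 204))  # True
print(matches_convention("create", "POST", 200))    # False: creates usually return 201
```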

Conventions

  • Resource names are plural and expressed in camelCase.

  • Names are consistent between URL parameter name and field name.

  • Field names are in snake_case.

{
    "name": "string",
    "slots": 0,
    "occupied_slots": 0,
    "used_slots": 0,
    "queued_slots": 0,
    "open_slots": 0
}

Update Mask

The update mask is available as a query parameter on PATCH endpoints. It tells the API which fields you want to update, so the server changes only those fields instead of overwriting the whole object. The update request ignores any fields that aren't specified in the mask, leaving them with their current values.

Example:

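import requests

# Adjust the base URL to your deployment and add authentication headers as needed.
BASE_URL = "http://localhost:8080/api/v2"

resource = requests.get(f"{BASE_URL}/resource/my-id").json()
resource["my_field"] = "new-value"
# json= serializes the body and sets the Content-Type header
requests.patch(f"{BASE_URL}/resource/my-id", params={"update_mask": "my_field"}, json=resource)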
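To make the masking rule concrete, here is a minimal client-side model of a masked update: only the fields named in the mask are taken from the request body, and everything else keeps its current value. The `apply_update_mask` function is a hypothetical illustration of the semantics described above, not code from the Airflow server.

```python
from typing import Any, Dict, Iterable


def apply_update_mask(
    current: Dict[str, Any],
    body: Dict[str, Any],
    update_mask: Iterable[str],
) -> Dict[str, Any]:
    """Return a copy of current with only the masked fields taken from body."""
    updated = dict(current)
    for field in update_mask:
        # Fields named in the mask but absent from the body are left unchanged
        # here; the real server may treat that case differently.
        if field in body:
            updated[field] = body[field]
    return updated


pool = {"name": "default_pool", "slots": 128, "description": "Default pool"}
request_body = {"name": "default_pool", "slots": 64, "description": "changed"}
print(apply_update_mask(pool, request_body, ["slots"]))
# {'name': 'default_pool', 'slots': 64, 'description': 'Default pool'}
```

The `description` change in the body is ignored because it is not in the mask, which is exactly the behavior that makes partial updates safe.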

Versioning and Endpoint Lifecycle

  • API versioning is not synchronized to specific releases of Apache Airflow.
  • APIs are designed to be backward compatible.
  • Any changes to the API will first go through a deprecation phase.

Trying the API

You can use a third-party client, such as curl, HTTPie, Postman, or the Insomnia REST client, to test the Apache Airflow API.

Note that you will need to pass authentication credentials. For example, if your Airflow deployment supports Bearer token authentication, here is how to pause a DAG with curl:

curl -X PATCH 'https://example.com/api/v2/dags/{dag_id}?update_mask=is_paused' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer YOUR_ACCESS_TOKEN' \
  -d '{
      "is_paused": true
  }'

Using a graphical tool such as Postman or Insomnia, it is possible to import the API specifications directly:

  • Download the API specification by clicking the Download button at the top of this document.
  • Import the JSON specification into the graphical tool of your choice.
  • In Postman, click the Import button at the top.
  • In Insomnia, drag and drop the file onto the UI.

Note that with Postman, you can also generate code snippets by selecting a request and clicking on the Code button.

Enabling CORS

Cross-origin resource sharing (CORS) is a browser security feature that restricts HTTP requests that are initiated from scripts running in the browser.

For details on enabling/configuring CORS, see Enabling CORS.

Authentication

To meet the requirements of many organizations, Airflow supports several authentication methods, and you can even add your own.

The default is to deny all requests.

For details on configuring the authentication, see API Authorization.

Errors

We follow the error response format proposed in RFC 7807, also known as Problem Details for HTTP APIs. As with our normal API responses, your client must be prepared to gracefully handle additional members of the response.
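RFC 7807 defines the standard members `type`, `title`, `status`, `detail`, and `instance`. A tolerant parser keeps those and collects anything else instead of failing. The `Problem` dataclass and `parse_problem` function below are a sketch; which members Airflow actually populates may vary by endpoint, and the `request_id` member in the usage line is a hypothetical extension.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict, Optional

STANDARD_MEMBERS = ("type", "title", "status", "detail", "instance")


@dataclass
class Problem:
    type: Optional[str] = None
    title: Optional[str] = None
    status: Optional[int] = None
    detail: Optional[Any] = None
    instance: Optional[str] = None
    # Any members beyond the RFC 7807 standard ones end up here.
    extensions: Dict[str, Any] = field(default_factory=dict)


def parse_problem(body: str) -> Problem:
    """Parse a problem-details JSON body, keeping unknown members instead of failing."""
    data = json.loads(body)
    known = {k: data[k] for k in STANDARD_MEMBERS if k in data}
    extra = {k: v for k, v in data.items() if k not in STANDARD_MEMBERS}
    return Problem(**known, extensions=extra)


p = parse_problem('{"title": "Not Found", "status": 404, "detail": "DAG not found", "request_id": "abc"}')
print(p.status, p.extensions)  # 404 {'request_id': 'abc'}
```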

Unauthenticated

This indicates that the request has not been applied because it lacks valid authentication credentials for the target resource. Please check that you have valid credentials.

PermissionDenied

This response means that the server understood the request but refuses to authorize it because the caller lacks sufficient rights to the resource. It happens when you do not have the permission required for the action you attempted. To resolve this error, obtain the appropriate permissions.

BadRequest

This response means that the server cannot or will not process the request due to something that is perceived to be a client error (e.g., malformed request syntax, invalid request message framing, or deceptive request routing). To resolve this, please ensure that your syntax is correct.

NotFound

This client error response indicates that the server cannot find the requested resource.

MethodNotAllowed

Indicates that the request method is known by the server but is not supported by the target resource.

NotAcceptable

The target resource does not have a current representation that would be acceptable to the user agent, according to the proactive negotiation header fields received in the request, and the server is unwilling to supply a default representation.

AlreadyExists

The request could not be completed due to a conflict with the current state of the target resource, e.g. the resource it tries to create already exists.

Unknown

This means that the server encountered an unexpected condition that prevented it from fulfilling the request.
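When handling errors programmatically, it can help to map the HTTP status code to the categories above. The mapping below assumes each category uses its standard HTTP status code (for example 409 Conflict for AlreadyExists); it is an illustrative sketch, not a table published by Airflow. Statuses outside the mapping return `None` rather than being forced into a category.

```python
from typing import Dict, Optional

# Assumed mapping from standard HTTP status codes to the categories above.
ERROR_CATEGORIES: Dict[int, str] = {
    400: "BadRequest",
    401: "Unauthenticated",
    403: "PermissionDenied",
    404: "NotFound",
    405: "MethodNotAllowed",
    406: "NotAcceptable",
    409: "AlreadyExists",
}


def error_category(status: int) -> Optional[str]:
    """Map an HTTP error status to one of the documented error categories."""
    if status in ERROR_CATEGORIES:
        return ERROR_CATEGORIES[status]
    if status >= 500:
        # Server-side failures fall into the catch-all category.
        return "Unknown"
    # Not one of the documented categories.
    return None


print(error_category(404))  # NotFound
```

With the generated client, a failed call raises `ApiException`, whose `status` attribute can be passed to `error_category` inside the `except ApiException as e:` block.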

This Python package is automatically generated by the OpenAPI Generator project:

  • API version: 2.9.0
  • Package version: 2.9.0
  • Build package: org.openapitools.codegen.languages.PythonClientCodegen

For more information, please visit https://airflow.apache.org

Requirements

Python >=3.9

Installation & Usage

pip install

You can install the client using standard Python installation tools. It is published on PyPI as the apache-airflow-client package, so the easiest way to get the latest version is to run:

pip install apache-airflow-client

To install directly from the GitHub repository, use:

pip install git+https://github.com/apache/airflow-client-python.git

Import check

Then import the package:

import airflow_client.client

Getting Started

Before attempting the following examples, ensure you have an account with API access. For example, you can create an account for use with the API using the Airflow CLI:

airflow users create -u admin-api -e admin-api@example.com -f admin-api -l admin-api -p $PASSWORD -r Admin

Please follow the installation procedure and then run the following:

import os
from pprint import pprint

import airflow_client.client
import requests
from airflow_client.client.rest import ApiException
from pydantic import BaseModel


# What we expect back from auth/token
class AirflowAccessTokenResponse(BaseModel):
    access_token: str


# An optional helper function to retrieve an access token
def get_airflow_client_access_token(
    host: str,
    username: str,
    password: str,
) -> str:
    url = f"{host}/auth/token"
    payload = {
        "username": username,
        "password": password,
    }
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json=payload, headers=headers)
    if response.status_code != 201:
        raise RuntimeError(f"Failed to get access token: {response.status_code} {response.text}")
    response_success = AirflowAccessTokenResponse(**response.json())
    return response_success.access_token


# Defining the host is optional and defaults to http://localhost
# See configuration.py for a list of all supported configuration parameters.
host = "http://localhost"
configuration = airflow_client.client.Configuration(host=host)

# The client must configure the authentication and authorization parameters
# in accordance with the API server security policy.
# This example authenticates with a Bearer access token obtained from /auth/token.

configuration.access_token = get_airflow_client_access_token(
    host=host,
    username="admin-api",
    password=os.environ["PASSWORD"],
)

# Enter a context with an instance of the API client
with airflow_client.client.ApiClient(configuration) as api_client:
    # Create an instance of the API class
    api_instance = airflow_client.client.AssetApi(api_client)
    create_asset_events_body = airflow_client.client.CreateAssetEventsBody(asset_id=1)  # replace 1 with the id of an existing asset

    try:
        # Create Asset Event
        api_response = api_instance.create_asset_event(create_asset_events_body)
        print("The response of AssetApi->create_asset_event:\n")
        pprint(api_response)
    except ApiException as e:
        print("Exception when calling AssetApi->create_asset_event: %s\n" % e)

Documentation for API Endpoints

All URIs are relative to http://localhost

| Class | Method | HTTP request | Description |
| --- | --- | --- | --- |
| AssetApi | create_asset_event | POST /api/v2/assets/events | Create Asset Event |
| AssetApi | delete_asset_queued_events | DELETE /api/v2/assets/{asset_id}/queuedEvents | Delete Asset Queued Events |
| AssetApi | delete_dag_asset_queued_event | DELETE /api/v2/dags/{dag_id}/assets/{asset_id}/queuedEvents | Delete Dag Asset Queued Event |
| AssetApi | delete_dag_asset_queued_events | DELETE /api/v2/dags/{dag_id}/assets/queuedEvents | Delete Dag Asset Queued Events |
| AssetApi | get_asset | GET /api/v2/assets/{asset_id} | Get Asset |
| AssetApi | get_asset_alias | GET /api/v2/assets/aliases/{asset_alias_id} | Get Asset Alias |
| AssetApi | get_asset_aliases | GET /api/v2/assets/aliases | Get Asset Aliases |
| AssetApi | get_asset_events | GET /api/v2/assets/events | Get Asset Events |
| AssetApi | get_asset_queued_events | GET /api/v2/assets/{asset_id}/queuedEvents | Get Asset Queued Events |
| AssetApi | get_assets | GET /api/v2/assets | Get Assets |
| AssetApi | get_dag_asset_queued_event | GET /api/v2/dags/{dag_id}/assets/{asset_id}/queuedEvents | Get Dag Asset Queued Event |
| AssetApi | get_dag_asset_queued_events | GET /api/v2/dags/{dag_id}/assets/queuedEvents | Get Dag Asset Queued Events |
| AssetApi | materialize_asset | POST /api/v2/assets/{asset_id}/materialize | Materialize Asset |
| BackfillApi | cancel_backfill | PUT /api/v2/backfills/{backfill_id}/cancel | Cancel Backfill |
| BackfillApi | create_backfill | POST /api/v2/backfills | Create Backfill |
| BackfillApi | create_backfill_dry_run | POST /api/v2/backfills/dry_run | Create Backfill Dry Run |
| BackfillApi | get_backfill | GET /api/v2/backfills/{backfill_id} | Get Backfill |
| BackfillApi | list_backfills | GET /api/v2/backfills | List Backfills |
| BackfillApi | pause_backfill | PUT /api/v2/backfills/{backfill_id}/pause | Pause Backfill |
| BackfillApi | unpause_backfill | PUT /api/v2/backfills/{backfill_id}/unpause | Unpause Backfill |
| ConfigApi | get_config | GET /api/v2/config | Get Config |
| ConfigApi | get_config_value | GET /api/v2/config/section/{section}/option/{option} | Get Config Value |
| ConnectionApi | bulk_connections | PATCH /api/v2/connections | Bulk Connections |
| ConnectionApi | create_default_connections | POST /api/v2/connections/defaults | Create Default Connections |
| ConnectionApi | delete_connection | DELETE /api/v2/connections/{connection_id} | Delete Connection |
| ConnectionApi | get_connection | GET /api/v2/connections/{connection_id} | Get Connection |
| ConnectionApi | get_connections | GET /api/v2/connections | Get Connections |
| ConnectionApi | patch_connection | PATCH /api/v2/connections/{connection_id} | Patch Connection |
| ConnectionApi | post_connection | POST /api/v2/connections | Post Connection |
| ConnectionApi | test_connection | POST /api/v2/connections/test | Test Connection |
| DAGApi | delete_dag | DELETE /api/v2/dags/{dag_id} | Delete Dag |
| DAGApi | get_dag | GET /api/v2/dags/{dag_id} | Get Dag |
| DAGApi | get_dag_details | GET /api/v2/dags/{dag_id}/details | Get Dag Details |
| DAGApi | get_dag_tags | GET /api/v2/dagTags | Get Dag Tags |
| DAGApi | get_dags | GET /api/v2/dags | Get Dags |
| DAGApi | patch_dag | PATCH /api/v2/dags/{dag_id} | Patch Dag |
| DAGApi | patch_dags | PATCH /api/v2/dags | Patch Dags |
| DAGParsingApi | reparse_dag_file | PUT /api/v2/parseDagFile/{file_token} | Reparse Dag File |
| DagReportApi | get_dag_reports | GET /api/v2/dagReports | Get Dag Reports |
| DagRunApi | clear_dag_run | POST /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/clear | Clear Dag Run |
| DagRunApi | delete_dag_run | DELETE /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Delete Dag Run |
| DagRunApi | get_dag_run | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Get Dag Run |
| DagRunApi | get_dag_runs | GET /api/v2/dags/{dag_id}/dagRuns | Get Dag Runs |
| DagRunApi | get_list_dag_runs_batch | POST /api/v2/dags/{dag_id}/dagRuns/list | Get List Dag Runs Batch |
| DagRunApi | get_upstream_asset_events | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamAssetEvents | Get Upstream Asset Events |
| DagRunApi | patch_dag_run | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id} | Patch Dag Run |
| DagRunApi | trigger_dag_run | POST /api/v2/dags/{dag_id}/dagRuns | Trigger Dag Run |
| DagSourceApi | get_dag_source | GET /api/v2/dagSources/{dag_id} | Get Dag Source |
| DagStatsApi | get_dag_stats | GET /api/v2/dagStats | Get Dag Stats |
| DagVersionApi | get_dag_version | GET /api/v2/dags/{dag_id}/dagVersions/{version_number} | Get Dag Version |
| DagVersionApi | get_dag_versions | GET /api/v2/dags/{dag_id}/dagVersions | Get Dag Versions |
| DagWarningApi | list_dag_warnings | GET /api/v2/dagWarnings | List Dag Warnings |
| EventLogApi | get_event_log | GET /api/v2/eventLogs/{event_log_id} | Get Event Log |
| EventLogApi | get_event_logs | GET /api/v2/eventLogs | Get Event Logs |
| ExtraLinksApi | get_extra_links | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | Get Extra Links |
| ImportErrorApi | get_import_error | GET /api/v2/importErrors/{import_error_id} | Get Import Error |
| ImportErrorApi | get_import_errors | GET /api/v2/importErrors | Get Import Errors |
| JobApi | get_jobs | GET /api/v2/jobs | Get Jobs |
| LoginApi | login | GET /api/v2/auth/login | Login |
| LoginApi | logout | GET /api/v2/auth/logout | Logout |
| MonitorApi | get_health | GET /api/v2/monitor/health | Get Health |
| PluginApi | get_plugins | GET /api/v2/plugins | Get Plugins |
| PoolApi | bulk_pools | PATCH /api/v2/pools | Bulk Pools |
| PoolApi | delete_pool | DELETE /api/v2/pools/{pool_name} | Delete Pool |
| PoolApi | get_pool | GET /api/v2/pools/{pool_name} | Get Pool |
| PoolApi | get_pools | GET /api/v2/pools | Get Pools |
| PoolApi | patch_pool | PATCH /api/v2/pools/{pool_name} | Patch Pool |
| PoolApi | post_pool | POST /api/v2/pools | Post Pool |
| ProviderApi | get_providers | GET /api/v2/providers | Get Providers |
| TaskApi | get_task | GET /api/v2/dags/{dag_id}/tasks/{task_id} | Get Task |
| TaskApi | get_tasks | GET /api/v2/dags/{dag_id}/tasks | Get Tasks |
| TaskInstanceApi | get_extra_links | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/links | Get Extra Links |
| TaskInstanceApi | get_log | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/logs/{try_number} | Get Log |
| TaskInstanceApi | get_mapped_task_instance | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Get Mapped Task Instance |
| TaskInstanceApi | get_mapped_task_instance_tries | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries | Get Mapped Task Instance Tries |
| TaskInstanceApi | get_mapped_task_instance_try_details | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/tries/{task_try_number} | Get Mapped Task Instance Try Details |
| TaskInstanceApi | get_mapped_task_instances | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/listMapped | Get Mapped Task Instances |
| TaskInstanceApi | get_task_instance | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Get Task Instance |
| TaskInstanceApi | get_task_instance_dependencies | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dependencies | Get Task Instance Dependencies |
| TaskInstanceApi | get_task_instance_dependencies_by_map_index | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dependencies | Get Task Instance Dependencies |
| TaskInstanceApi | get_task_instance_tries | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries | Get Task Instance Tries |
| TaskInstanceApi | get_task_instance_try_details | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/tries/{task_try_number} | Get Task Instance Try Details |
| TaskInstanceApi | get_task_instances | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances | Get Task Instances |
| TaskInstanceApi | get_task_instances_batch | POST /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/list | Get Task Instances Batch |
| TaskInstanceApi | patch_task_instance | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} | Patch Task Instance |
| TaskInstanceApi | patch_task_instance_by_map_index | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index} | Patch Task Instance |
| TaskInstanceApi | patch_task_instance_dry_run | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/dry_run | Patch Task Instance Dry Run |
| TaskInstanceApi | patch_task_instance_dry_run_by_map_index | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/dry_run | Patch Task Instance Dry Run |
| TaskInstanceApi | post_clear_task_instances | POST /api/v2/dags/{dag_id}/clearTaskInstances | Post Clear Task Instances |
| VariableApi | bulk_variables | PATCH /api/v2/variables | Bulk Variables |
| VariableApi | delete_variable | DELETE /api/v2/variables/{variable_key} | Delete Variable |
| VariableApi | get_variable | GET /api/v2/variables/{variable_key} | Get Variable |
| VariableApi | get_variables | GET /api/v2/variables | Get Variables |
| VariableApi | patch_variable | PATCH /api/v2/variables/{variable_key} | Patch Variable |
| VariableApi | post_variable | POST /api/v2/variables | Post Variable |
| VersionApi | get_version | GET /api/v2/version | Get Version |
| XComApi | create_xcom_entry | POST /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | Create Xcom Entry |
| XComApi | get_xcom_entries | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries | Get Xcom Entries |
| XComApi | get_xcom_entry | GET /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | Get Xcom Entry |
| XComApi | update_xcom_entry | PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} | Update Xcom Entry |
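Path parameters are substituted into the URI templates listed above. DAG run ids often contain characters such as `:` and `+` (for example in timestamps), so it is safer to percent-encode each value when calling the REST API directly. The `build_path` helper below is a hypothetical sketch; the generated client serializes path parameters itself.

```python
from urllib.parse import quote


def build_path(template: str, **params: object) -> str:
    """Fill {placeholders} in an endpoint template with percent-encoded values."""
    # safe="" also encodes "/", so a value can never add extra path segments.
    encoded = {name: quote(str(value), safe="") for name, value in params.items()}
    # str.format raises KeyError if a placeholder has no matching parameter.
    return template.format(**encoded)


print(build_path(
    "/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}",
    dag_id="example_dag",
    dag_run_id="manual__2024-01-01T00:00:00+00:00",
))
# /api/v2/dags/example_dag/dagRuns/manual__2024-01-01T00%3A00%3A00%2B00%3A00
```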

Documentation For Models

Documentation For Authorization

By default, the generated client supports the following authentication schemes:

  • Basic
  • GoogleOpenID
  • Kerberos
  • OAuth2PasswordBearer

However, you can generate the client and documentation with your own schemes by adding them to the security section of the OpenAPI specification. With the Breeze CLI, you can do this by passing the --security-schemes option to the breeze release-management prepare-python-client command.

Basic "smoke" tests

You can run basic smoke tests to check whether the client is working properly. We provide a simple test script that uses the API to run the tests. To do that, you need to:

  • install the apache-airflow-client package as described above
  • install the rich Python package
  • download the test_python_client.py file
  • make sure you have a test Airflow installation running. Do not experiment with your production deployment
  • configure your Airflow webserver to enable basic authentication. In the [api] section of your airflow.cfg set:
[api]
auth_backends = airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth

You can also set it by env variable: export AIRFLOW__API__AUTH_BACKENDS=airflow.providers.fab.auth_manager.api.auth.backend.session,airflow.providers.fab.auth_manager.api.auth.backend.basic_auth

  • configure your Airflow webserver to load example DAGs. In the [core] section of your airflow.cfg set:
[core]
load_examples = True

You can also set it by env variable: export AIRFLOW__CORE__LOAD_EXAMPLES=True

  • optionally expose the configuration (note that this is a dangerous setting). The script runs fine with the default setting, but if you want to see the configuration, you need to expose it. In the [api] section of your airflow.cfg set:
[api]
expose_config = True

You can also set it by env variable: export AIRFLOW__API__EXPOSE_CONFIG=True

  • Configure your host/ip/user/password in the test_python_client.py file
import airflow_client.client

# Get the access token from the Airflow API server via /auth/token
access_token = "YOUR_ACCESS_TOKEN"
configuration = airflow_client.client.Configuration(host="http://localhost:8080", access_token=access_token)
  • Run the scheduler (or the standalone DAG file processor, if you have set one up) for a few parsing loops. You can pass the --num-runs parameter to it or keep it running in the background. The script relies on the example DAGs being serialized to the database, which only happens when the scheduler runs with core/load_examples set to True.

  • Run the webserver so that it is reachable at the host/port configured in the test script, and make sure it has had enough time to initialize.

Run python test_python_client.py and you should see colored output showing attempts to connect and status.
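The environment variables in the steps above follow Airflow's `AIRFLOW__{SECTION}__{KEY}` naming pattern (upper case, double underscores). The small sketch below derives them from a section/key/value mapping, for example to prepare the environment of a throwaway test deployment; the `airflow_env` helper is illustrative only.

```python
from typing import Dict, Tuple


def airflow_env(settings: Dict[Tuple[str, str], str]) -> Dict[str, str]:
    """Turn {(section, key): value} into AIRFLOW__SECTION__KEY environment variables."""
    return {
        f"AIRFLOW__{section.upper()}__{key.upper()}": value
        for (section, key), value in settings.items()
    }


env = airflow_env({
    ("core", "load_examples"): "True",
    ("api", "expose_config"): "True",
})
print(env)
# {'AIRFLOW__CORE__LOAD_EXAMPLES': 'True', 'AIRFLOW__API__EXPOSE_CONFIG': 'True'}
```

The result can be merged into a process environment, e.g. `{**os.environ, **env}` passed as the `env` argument of `subprocess.run`.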

Notes for Large OpenAPI documents

If the OpenAPI document is large, imports in client.apis and client.models may fail with a RecursionError indicating the maximum recursion limit has been exceeded. In that case, there are a couple of solutions:

Solution 1: Use specific imports for APIs and models, for example:

  • from airflow_client.client.api.dag_api import DAGApi
  • from airflow_client.client.models.dag_response import DAGResponse

Solution 2: Before importing the package, adjust the maximum recursion limit as shown below:

import sys

sys.setrecursionlimit(1500)
import airflow_client.client
from airflow_client.client.api import *
from airflow_client.client.models import *

Authors

dev@airflow.apache.org

Keywords

Apache Airflow API (Stable)
