
Research
Malicious npm Packages Impersonate Flashbots SDKs, Targeting Ethereum Wallet Credentials
Four npm packages disguised as cryptographic tools steal developer credentials and send them to attacker-controlled Telegram infrastructure.
Apache Airflow utilities for running long-running or always-on jobs with supervisord
This library provides a configuration-driven way of generating supervisor configurations and airflow operators/sensors for long-running or always-on jobs. Configuration is managed by Pydantic, Hydra, and OmegaConf via the supervisor-pydantic library.
airflow-supervisor
can be installed in your airflow server environment and imported in your dag files. It provides two convenient top level DAG subclasses:
Supervisor
: creates a DAG representing a local supervisor instance running on the airflow worker node (underlying task will use PythonOperator
and BashOperator
to communicate between airflow and supervisor)SupervisorSSH
: creates a DAG representing a remote supervisor instance running on another machine (underlying tasks will use SSHOperator
to communicate between airflow and supervisor)We expose DAGs composed of a variety of tasks and sensors, which are exposed as a discrete pipeline of steps:
supervisord
configurationsupervisord
daemonsupervisorctl
supervisord
daemon from (2)This setup provides maximal configureability with a minimal requirements on the machine (for example, no requirements on an existing supervisord
daemon via e.g. systemd
). It also lets you hook your own tasks into any step of the process. For example, if we detect a process has died in step (5), you could configure your own task to take some custom action before/instead of the default restart of step 6.
Here is a nice overview of the DAG, with annotations for code paths and the actions taken by Supervisor:
More docs and code examples coming soon!
from airflow import DAG
from datetime import timedelta, datetime
from airflow_supervisor import SupervisorAirflowConfiguration, Supervisor, ProgramConfiguration
# Create supervisor configuration
cfg = SupervisorAirflowConfiguration(
working_dir="/data/airflow/supervisor",
config_path="/data/airflow/supervisor/supervisor.conf",
program={
"test": ProgramConfiguration(
command="bash -c 'sleep 14400; exit 1'",
)
},
)
# Create DAG as normal
with DAG(
dag_id="test-supervisor",
schedule=timedelta(days=1),
start_date=datetime(2024, 1, 1),
catchup=False,
) as dag:
# Link supervisor config to dag
supervisor = Supervisor(dag=dag, cfg=cfg)
airflow-config
# @package _global_
_target_: airflow_config.Configuration
default_args:
_target_: airflow_config.DefaultArgs
retries: 0
depends_on_past: false
all_dags:
_target_: airflow_config.DagArgs
start_date: "2024-01-01"
catchup: false
extensions:
supervisor:
_target_: airflow_supervisor.SupervisorAirflowConfiguration
port: 9091
working_dir: "/data/airflow/supervisor"
config_path: "/data/airflow/supervisor/supervisor.conf"
program:
test:
_target_: airflow_supervisor.ProgramConfiguration
command: "bash -c 'sleep 14400; exit 1'"
from datetime import timedelta
from airflow_config import load_config, DAG
from airflow_supervisor import Supervisor
config = load_config(config_name="airflow")
with DAG(
dag_id="test-supervisor",
schedule=timedelta(days=1),
config=config,
) as dag:
supervisor = Supervisor(dag=dag, cfg=config.extensions["supervisor"])
See supervisor-pydantic for reference.
SupervisorAirflowConfiguration
: Wrapper around supervisor_pydantic.SupervisorConvenienceConfiguration
, with added airflow-specific configurationSupervisorSSHAirflowConfiguration
: Wrapper around SupervisorAirflowConfiguration
, with added parameters for airflow's SSHOperator
classDiagram
SupervisorConvenienceConfiguration <|-- SupervisorAirflowConfiguration
SupervisorAirflowConfiguration <|-- SupervisorSSHAirflowConfiguration
class SupervisorConvenienceConfiguration {
supervisor_pydantic.SupervisorConvenienceConfiguration
}
class SupervisorAirflowConfiguration{
# PythonSensor arguments
check_interval: timedelta
check_timeout: timedelta
# HighAvailabilityOperator arguments
runtime: timedelta
endtime: time
maxretrigger: int
reference_date: str
# Airflow arguments
stop_on_exit: bool
cleanup: bool
restart_on_initial: bool
restart_on_retrigger: bool
}
class SupervisorSSHAirflowConfiguration {
command_prefix: str
# Airflow SSHOperator Arguments
ssh_operator_args: SSHOperatorArgs
}
[!NOTE] This library is built on supervisor-pydantic, which provides configuration elements for all supervisor structures, as well as self-contained tools for interacting with supervisor instances.
[!NOTE] This library was generated using copier from the Base Python Project Template repository.
FAQs
Supervisor operators and configuration for long-running tasks
We found that airflow-supervisor demonstrated a healthy version release cadence and project activity because the last version was released less than a year ago. It has 1 open source maintainer collaborating on the project.
Did you know?
Socket for GitHub automatically highlights issues in each pull request and monitors the health of all your open source dependencies. Discover the contents of your packages and block harmful activity before you install or update your dependencies.
Research
Four npm packages disguised as cryptographic tools steal developer credentials and send them to attacker-controlled Telegram infrastructure.
Security News
Ruby maintainers from Bundler and rbenv teams are building rv to bring Python uv's speed and unified tooling approach to Ruby development.
Security News
Following last week’s supply chain attack, Nx published findings on the GitHub Actions exploit and moved npm publishing to Trusted Publishers.