このエントリでは、
AirflowのPriority Weightsを使ったタスクの優先制御を試してみます。

参考: Priority Weights | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html

参考に上げたページの通り、
absoluteは、キューに投入された順にタスクが処理され、
downstreamは、後続が多いタスクほど優先度が高くなります(upstreamは逆)。
既定はdownstream

これ以外にも独自ルールを定義出来るので、
このエントリでは、SLAに基づいた優先制御を考えたいと思います。
作成する独自ルールでは、
タスクを完了させるべき時間までの猶予時間が少ないほど優先度を高くします。

動作確認したAirflowのversionは、3.0.0です。

ファイル構成とサンプルコード

ファイル構成と試してみたサンプルコードです。

ファイル構成

root/

  • dags/
    • priority_study.py
  • plugins/
    • priority_weight_strategy.py

サンプルコード

dags/priority_study.py

import time

import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG

with DAG(
    dag_id="priority_study",
    start_date=pendulum.datetime(2025, 5, 1),
    catchup=False,
    schedule="0 */2 * * *",
    max_active_runs=1,
    default_args={
        "weight_rule": "priority_weight_strategy.DeadlinePriorityStrategy",
    },
):

    def _sleep(**context):
        time.sleep(3)

    start = EmptyOperator(task_id="start")
    task1_1 = PythonOperator(
        task_id="task1_1",
        python_callable=_sleep,
        params={"sla_timedelta": "02:00:00"},
    )
    task2_1 = PythonOperator(
        task_id="task2_1",
        python_callable=_sleep,
        params={"sla_timedelta": "01:00:00"},
    )
    task2_2 = PythonOperator(
        task_id="task2_2",
        python_callable=_sleep,
        params={"sla_timedelta": "01:00:00"},
    )
    task2_3 = PythonOperator(
        task_id="task2_3",
        python_callable=_sleep,
        params={"sla_timedelta": "01:00:00"},
    )
    task2_4 = PythonOperator(
        task_id="task2_4",
        python_callable=_sleep,
        params={"sla_timedelta": "01:00:00"},
    )

    start >> [task1_1, task2_1]
    task2_1 >> task2_2 >> task2_3 >> task2_4

plugins/priority_weight_strategy.py

import datetime

import pendulum
from airflow.models import TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.task.priority_strategy import PriorityWeightStrategy


class DeadlinePriorityStrategy(PriorityWeightStrategy):
    def get_weight(self, ti: TaskInstance):
        if not ti.dag_run:
            return 1
        sla_timedelta_dt = datetime.datetime.strptime(
            ti.task.params["sla_timedelta"], "%H:%M:%S"
        )
        sla_datetime = ti.dag_run.data_interval_end + datetime.timedelta(
            hours=sla_timedelta_dt.hour,
            minutes=sla_timedelta_dt.minute,
            seconds=sla_timedelta_dt.second,
        )
        sla_min = (sla_datetime - pendulum.now()).total_seconds() / 60
        weight = max(min(int(120 - sla_min), 120), 1)
        return weight


class DeadlinePriorityWeightStrategyPlugin(AirflowPlugin):
    name = "priority_weight_strategy.DeadlinePriorityStrategy"
    priority_weight_strategies = [DeadlinePriorityStrategy]

簡単な説明

plugins/priority_weight_strategy.pyで独自ルールを定義しています。

Priority Weightは次の式で定義します。

  • (Priority Weight) = 120 - {(タスクを完了させるべき時間) - (現在時刻)}
  • (タスクを完了させるべき時間) = data_interval_end + sla_timedelta
  • ※Priority Weightは1〜120の範囲とするため、0以下は1、120以上は120に丸めます。

dags/priority_study.pyは独自ルールを利用するDAGの例です。
paramsのsla_timedeltaで、SLAを指定しています。
task1_* が sla_timedelta=02:00:00
task2_* が sla_timedelta=01:00:00
なので、task_2*が先に実行されることになります。 動作確認する時は、PoolのSlotsを1にすると分かりやすいと思います。

以上。