AirflowのPriority Weightsを使ったタスクの優先制御
このエントリでは、
AirflowのPriority Weightsを使ったタスクの優先制御を試してみます。
参考: Priority Weights | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html
参考に上げたページの通り、absolute
は、キューに投入された順にタスクが処理され、downstream
は、後続が多いタスクほど優先度が高くなります(upstream
は逆)。
既定はdownstream
。
これ以外にも独自ルールを定義出来るので、
このエントリでは、SLAに基づいた優先制御を考えたいと思います。
作成する独自ルールでは、
タスクを完了させるべき時間までの猶予時間が少ないほど優先度を高くします。
動作確認したAirflowのversionは、3.0.0です。
ファイル構成とサンプルコード
ファイル構成と試してみたサンプルコードです。
ファイル構成
root/
- dags/
- priority_study.py
- plugins/
- priority_weight_strategy.py
サンプルコード
dags/priority_study.py
import time
import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import DAG
with DAG(
dag_id="priority_study",
start_date=pendulum.datetime(2025, 5, 1),
catchup=False,
schedule="0 */2 * * *",
max_active_runs=1,
default_args={
"weight_rule": "priority_weight_strategy.DeadlinePriorityStrategy",
},
):
def _sleep(**context):
time.sleep(3)
start = EmptyOperator(task_id="start")
task1_1 = PythonOperator(
task_id="task1_1",
python_callable=_sleep,
params={"sla_timedelta": "02:00:00"},
)
task2_1 = PythonOperator(
task_id="task2_1",
python_callable=_sleep,
params={"sla_timedelta": "01:00:00"},
)
task2_2 = PythonOperator(
task_id="task2_2",
python_callable=_sleep,
params={"sla_timedelta": "01:00:00"},
)
task2_3 = PythonOperator(
task_id="task2_3",
python_callable=_sleep,
params={"sla_timedelta": "01:00:00"},
)
task2_4 = PythonOperator(
task_id="task2_4",
python_callable=_sleep,
params={"sla_timedelta": "01:00:00"},
)
start >> [task1_1, task2_1]
task2_1 >> task2_2 >> task2_3 >> task2_4
plugins/priority_weight_strategy.py
import datetime
import pendulum
from airflow.models import TaskInstance
from airflow.plugins_manager import AirflowPlugin
from airflow.task.priority_strategy import PriorityWeightStrategy
class DeadlinePriorityStrategy(PriorityWeightStrategy):
def get_weight(self, ti: TaskInstance):
if not ti.dag_run:
return 1
sla_timedelta_dt = datetime.datetime.strptime(
ti.task.params["sla_timedelta"], "%H:%M:%S"
)
sla_datetime = ti.dag_run.data_interval_end + datetime.timedelta(
hours=sla_timedelta_dt.hour,
minutes=sla_timedelta_dt.minute,
seconds=sla_timedelta_dt.second,
)
sla_min = (sla_datetime - pendulum.now()).total_seconds() / 60
weight = max(min(int(120 - sla_min), 120), 1)
return weight
class DeadlinePriorityWeightStrategyPlugin(AirflowPlugin):
name = "priority_weight_strategy.DeadlinePriorityStrategy"
priority_weight_strategies = [DeadlinePriorityStrategy]
簡単な説明
plugins/priority_weight_strategy.py
で独自ルールを定義しています。
Priority Weightは次の式で定義します。
- (Priority Weight) = 120 - {(タスクを完了させるべき時間) - (現在時刻)}
- (タスクを完了させるべき時間) = data_interval_end + sla_timedelta
- ※Priority Weightは1〜120の範囲とするため、0以下は1、120以上は120に丸めます。
dags/priority_study.py
は独自ルールを利用するDAGの例です。
paramsのsla_timedelta
で、SLAを指定しています。
task1_* が sla_timedelta=02:00:00
task2_* が sla_timedelta=01:00:00
なので、task_2*が先に実行されることになります。
動作確認する時は、PoolのSlotsを1にすると分かりやすいと思います。
以上。