このエントリでは、
Pythonのプロファイラをつかって、
Airflowのloadに時間がかかっているDAGを調べる方法を示します。

AirflowのBestPracticeにも、
(解析に時間がかかるので)DAGに不要な処理を書かないように言及されていますが、
既に時間がかかる状態になってしまっている場合に、
該当処理がどこなのかを探す方法も知っていると便利かと思います。

Top level Python Code | Best Practices
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

このエントリで確認したversion:

  • Python 3.11.9
  • apache-airflow 2.10.4

検査対象を用意する

Quick startを参考に、Airflowをインストールします。

参考: Quick Start | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/start.html

次のようなDAGを用意します。
# _slowとついているDAGには不要な処理があります

  • dags
    • example_python_operator.py
    • example_python_operator_slow.py

example_python_operator.py

import pendulum

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

example_python_operator.py_slow.py

import pendulum
import time

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    print("Hello from Airflow!")
    time.sleep(5)


my_expensive_response = expensive_api_call()

with DAG(
    dag_id="example_python_operator_slow",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        print(my_expensive_response)

airflowを実行して、作成したDAGがloadできることを確認します。

$ export AIRFLOW__CORE__LOAD_EXAMPLES=False
$ export AIRFLOW__CORE__DAGS_FOLDER=$(pwd)/dags
$ airflow standalone

loadに時間がかかっているDAGを調べる

dagをloadするスクリプトを用意します。

import_dags.py

from airflow.models import DagBag
DagBag()

DAGのloadのプロファイリングを行います。

$ export AIRFLOW__CORE__LOAD_EXAMPLES=False
$ export AIRFLOW__CORE__DAGS_FOLDER=$(pwd)/dags
$ python -m cProfile import_dags.py > profile_result.txt

プロファイリング結果から、各DAGの所要時間を調べます。

$ grep import_dags.py profile_result.txt
        1    0.000    0.000    6.311    6.311 import_dags.py:1(<module>)
$ grep example_python_operator.py profile_result.txt
        1    0.000    0.000    0.044    0.044 example_python_operator.py:1(<module>)
$ grep example_python_operator_slow.py profile_result.txt
        1    0.000    0.000    5.137    5.137 example_python_operator_slow.py:1(<module>)
        1    0.000    0.000    5.136    5.136 example_python_operator_slow.py:8(expensive_api_call)

上記の結果から、次の通り各DAGのloadの所要時間がわかります。

  • 全体のload: 6.3111s
  • example_python_operatorのload: 0.044s
  • example_python_operator_slowのload: 5.137s

以上。