AirflowのDAG単体テストについて、
かなり前にimportのテストのやり方は書いたのですが、
DAGの実行をともなう単体テストに触れていなかったので、
このエントリに記載することにします。

AirflowのDAG単体テストの事始め | takemikami.com
https://takemikami.com/2021/1205-airflowdag.html

環境の準備

リポジトリ・Python仮想環境を作成します。

$ mkdir airflow-unittest && cd $_
$ python -m venv venv
$ . venv/bin/activate

Airflowをインストールします。
インストール方法は、次のサイトに従います。

Quick Start | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/start.html

$ AIRFLOW_VERSION=2.10.2
$ PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

単体テストの実行のためにpytest、
DAGを呼び出す時間を制御するためにfreezegun、
をインストールしておきます。

$ pip install pytest freezegun

テスト対象の実装

テスト対象のDAGを実装します。

このDAGでは、以下の処理を実装しています。

  • LatestOnlyOperatorで過去分の実行はスキップ
  • FileSensorで入力ファイルを待ち合わせ
  • EmptyOperatorは何もしない(実際にはなんらかのデータ処理が入る)

dags/sample.py

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago

with DAG(
    "sample",
    start_date=days_ago(3),
    schedule="0 * * * *",
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    file_sensor = FileSensor(task_id="file_sensor", filepath="/tmp/datafile")
    empty = EmptyOperator(task_id="empty")
    latest_only >> file_sensor >> empty

テストの実装

テストを実装します。

このテストでは、以下の処理を実装しています。

  • tests/conftest.py
    • Airflowの実行を制御するための環境変数の指定
    • テスト実行前のDB初期化 (reset_db)
    • freezegunで時間固定した時点でimportしたDAGを返却するfixture (freezegun_dagbag)
  • tests/test_sample.py
    • importのテスト (test_impot_dags)
    • DAG実行のテスト (test_unittest)

tests/conftest.py

import os
import zoneinfo

import freezegun
import pytest

BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = os.path.join(BASE_DIR, "dags")
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"


@pytest.fixture(scope="session", autouse=True)
def reset_db() -> None:
    from airflow.utils import db

    db.resetdb()


@pytest.fixture()
def freezegun_dagbag(request):
    from airflow.models import DagBag

    freeze_date = request.param
    with freezegun.freeze_time(freeze_date.astimezone(zoneinfo.ZoneInfo(key="UTC"))):
        yield DagBag()

tests/test_sample.py

import datetime
import itertools
import zoneinfo

import pendulum
import pytest
from airflow.models import DagBag, DagRun


def test_impot_dags():
    dag_id = "sample"
    dagbag = DagBag()
    assert len(dagbag.import_errors) == 0, "DAG import failures. Errors: {}".format(
        dagbag.import_errors
    )
    assert dagbag.get_dag(dag_id=dag_id) is not None


@pytest.mark.parametrize(
    "freezegun_dagbag, execution_date, expected_tasks_states",
    [
        (
            datetime.datetime(2024, 9, 5, 10, 5, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
            datetime.datetime(2024, 9, 5, 10, 0, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
            {"empty": "success"},
        ),
        (
            datetime.datetime(2024, 9, 5, 11, 5, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
            datetime.datetime(2024, 9, 5, 10, 0, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
            {"empty": "skipped"},
        ),
    ],
    indirect=["freezegun_dagbag"],
    ids=["normal", "backfill"],
)
def test_unittest(freezegun_dagbag, execution_date, expected_tasks_states):
    dag_id = "sample"
    dag = freezegun_dagbag.get_dag(dag_id=dag_id)
    resp: DagRun = dag.test(
        execution_date=pendulum.instance(
            execution_date.astimezone(zoneinfo.ZoneInfo(key="UTC"))
        ),
        mark_success_pattern=".*_sensor",
    )
    task_status = {
        id: ",".join(set([s[1] for s in statuses]))
        for id, statuses in itertools.groupby(
            sorted(
                [(e.task_id.split(".")[0], e.state) for e in resp.get_task_instances()],
                key=lambda x: x[0],
            ),
            key=lambda x: x[0],
        )
    }
    assert {
        k: task_status[k] for k in task_status if k in expected_tasks_states
    } == expected_tasks_states

dag.test() ではDAGの実行をともなう単体テストを行います。
指定しているパラメータは、次の通りとなります。

execution_dateには、実行時間を指定します。
この例の場合は、
execution_dateに 2024/09/05 10:00:00 を指定して、
data_interval_start/endが、2024/09/05 09:00:00~2024/09/05 10:00:00
の処理を実行しています。
execution_dateが含まれる1つ前のdata_intervalが実行されるという事になります。
# data_intervalは末尾の時点を含まないので、
# 2024/09/05 10:00:00は、2024/09/05 10:00:00~2024/09/05 11:00:00に含まれます

mark_success_patternには、処理を行わずに成功させるタスクIDを正規表現で指定します。
この例では、.*_sensorとして、file_sensorに適用しています。
これによってfile_sensorで指定したファイルの有無にかかわらずタスクが成功します。
外部システムに接続が必要なタスクなど、単体テストの対象としにくいタスクは、
固定で成功させておくと都合がよいです。

テストの実行

単体テストを実行します。

$ pytest tests

以上。