このエントリでは、
AirflowのDAG単体テストを行う時に、
外部システムに依存するtaskがあり、その結果に後続処理が影響を受ける場合、
どのように単体テストを書けば良いかを考えたいと思います。

以下のエントリで書いたとおり、
taskを成功させる場合は、mark_success_patternにtaskを指定するだけで対応できます。
しかし、失敗時のdownstreamののタスクの挙動を確認したい場合に対応できません。

dag.test()によるAirflowのDAGの単体テストの書き方 | takemikami.com
https://takemikami.com/2024/0929-airflow-dag-test.html

そこでこのエントリでは、dag.test()を実行する前にtaskを置換して、
強制的に成功・失敗させる方法でテストすることを考えます。

動作確認環境:

  • apache-airflow==2.10.2
  • pytest==8.3.3

テスト対象の実装

テスト対象のDAGを実装します。

このDAGでは、以下の処理を実装しています。

  • FileSensorで入力ファイルを待ち合わせ
  • EmptyOperatorは何もしない(実際にはなんらかのデータ処理が入る)

dags/sample.py

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago

with DAG(
    "sample",
    start_date=days_ago(3),
    schedule="0 * * * *",
) as dag:
    file_sensor = FileSensor(task_id="file_sensor", filepath="/tmp/datafile")
    empty = EmptyOperator(task_id="empty")
    file_sensor >> empty

テストの実装

次にテストを実装します。

tests/conftest.py

import os

import pytest

BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = os.path.join(BASE_DIR, "dags")
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"


@pytest.fixture(scope="session", autouse=True)
def reset_db() -> None:
    from airflow.utils import db

    db.resetdb()

tests/test_sample.py

import datetime

import pendulum
from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagRun
from airflow.operators.python import PythonOperator


def test_unittest():
    dagbag = DagBag()
    dag = dagbag.get_dag(dag_id="sample")

    # taskの置換
    def _fail_fn():
        raise AirflowException()

    task_id = "file_sensor"
    file_sensor_task = dag.get_task(task_id)
    new_op = PythonOperator(task_id=task_id, python_callable=_fail_fn)
    new_op.downstream_task_ids = file_sensor_task.downstream_task_ids
    new_op.upstream_task_ids = file_sensor_task.upstream_task_ids
    dag._remove_task(task_id)
    dag.add_task(new_op)

    resp: DagRun = dag.test(
        execution_date=pendulum.instance(datetime.datetime.now()),
    )

    task_status = {e.task_id: e.state for e in resp.get_task_instances()}
    assert task_status["empty"] == "upstream_failed"

dag.test()を実行する前に、
dagから本来のtaskをremove, 強制的に成功させるtaskをaddしています。

テストの実行

ここまでのようにテストを実装したら、単体テストを実行します。

$ pytest tests

先行のfile_sensorが失敗した影響で、
後続のemptyがupstream_failedとなり、
テストが成功することが確認できます。

以上。