AirflowのDAGの単体テストをtaskを置換して実行する
このエントリでは、
AirflowのDAG単体テストを行う時に、
外部システムに依存するtaskがあり、その結果に後続処理が影響を受ける場合、
どのように単体テストを書けば良いかを考えたいと思います。
以下のエントリで書いたとおり、
taskを成功させる場合は、mark_success_pattern
にtaskを指定するだけで対応できます。
しかし、失敗時のdownstreamののタスクの挙動を確認したい場合に対応できません。
dag.test()によるAirflowのDAGの単体テストの書き方 | takemikami.com
https://takemikami.com/2024/0929-airflow-dag-test.html
そこでこのエントリでは、dag.test()を実行する前にtaskを置換して、
強制的に成功・失敗させる方法でテストすることを考えます。
動作確認環境:
- apache-airflow==2.10.2
- pytest==8.3.3
テスト対象の実装
テスト対象のDAGを実装します。
このDAGでは、以下の処理を実装しています。
- FileSensorで入力ファイルを待ち合わせ
- EmptyOperatorは何もしない(実際にはなんらかのデータ処理が入る)
dags/sample.py
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
with DAG(
"sample",
start_date=days_ago(3),
schedule="0 * * * *",
) as dag:
file_sensor = FileSensor(task_id="file_sensor", filepath="/tmp/datafile")
empty = EmptyOperator(task_id="empty")
file_sensor >> empty
テストの実装
次にテストを実装します。
tests/conftest.py
import os
import pytest
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = os.path.join(BASE_DIR, "dags")
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"
@pytest.fixture(scope="session", autouse=True)
def reset_db() -> None:
from airflow.utils import db
db.resetdb()
tests/test_sample.py
import datetime
import pendulum
from airflow.exceptions import AirflowException
from airflow.models import DagBag, DagRun
from airflow.operators.python import PythonOperator
def test_unittest():
dagbag = DagBag()
dag = dagbag.get_dag(dag_id="sample")
# taskの置換
def _fail_fn():
raise AirflowException()
task_id = "file_sensor"
file_sensor_task = dag.get_task(task_id)
new_op = PythonOperator(task_id=task_id, python_callable=_fail_fn)
new_op.downstream_task_ids = file_sensor_task.downstream_task_ids
new_op.upstream_task_ids = file_sensor_task.upstream_task_ids
dag._remove_task(task_id)
dag.add_task(new_op)
resp: DagRun = dag.test(
execution_date=pendulum.instance(datetime.datetime.now()),
)
task_status = {e.task_id: e.state for e in resp.get_task_instances()}
assert task_status["empty"] == "upstream_failed"
dag.test()
を実行する前に、
dagから本来のtaskをremove, 強制的に成功させるtaskをaddしています。
テストの実行
ここまでのようにテストを実装したら、単体テストを実行します。
$ pytest tests
先行のfile_sensorが失敗した影響で、
後続のemptyがupstream_failedとなり、
テストが成功することが確認できます。
以上。