AirflowのOperatorに対する単体テストの書き方
この記事は、ソフトウェアテストの小ネタ Advent Calendar 2024 の18日目の記事です。
ソフトウェアテストの小ネタ Advent Calendar 2024
https://qiita.com/advent-calendar/2024/software-testing-koneta
このエントリでは、
AirflowのOperatorに対する単体テストの書き方について記載します。
基本的には、次のドキュメントに記載されている内容です。
Unit test | Best Practices | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#unit-tests
→ Unit test for custom operator
環境の準備
Python仮想環境作成、Airflow・pytestをインストールを行います。
$ mkdir airflow-operator-unittest && cd $_
$ python -m venv venv
$ . venv/bin/activate
$ AIRFLOW_VERSION=2.10.4
$ PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
$ pip install pytest
参考: Quick Start | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/start.html
テスト対象の実装
テスト対象のOperatorを実装します。
ここでは、data_interval_endが日曜日の時に処理をスキップするOperatorを実装しました。
dags/operators/sunday_skip_operator.py
from typing import Iterable
from airflow.operators.branch import BaseBranchOperator
from airflow.utils.context import Context
class SundaySkipOperator(BaseBranchOperator):
def choose_branch(self, context: Context) -> str | Iterable[str]:
data_interval_end_dt = context["data_interval_end"]
if data_interval_end_dt.weekday() == 6:
return [] # Sunday
return list(context["task"].get_direct_relative_ids(upstream=False))
単体テストの実装
単体テストを実装します。
tests/unit/conftest.py
import os
import pytest
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = os.path.join(BASE_DIR, "dags")
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"
@pytest.fixture(scope="session", autouse=True)
def reset_db() -> None:
from airflow.utils import db
db.resetdb()
tests/unit/test_operator.py
import pendulum
import pytest
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.state import DagRunState, TaskInstanceState
from airflow.utils.types import DagRunType
from operators.sunday_skip_operator import SundaySkipOperator
@pytest.mark.parametrize(
"data_interval_start, data_interval_end, expected_statuses",
[
(
pendulum.datetime(2024, 12, 14, tz="UTC"),
pendulum.datetime(2024, 12, 15, tz="UTC"), # Sunday
{
"branch": TaskInstanceState.SUCCESS,
"empty": TaskInstanceState.SKIPPED,
},
),
(
pendulum.datetime(2024, 12, 15, tz="UTC"),
pendulum.datetime(2024, 12, 16, tz="UTC"), # Monday
{
"branch": TaskInstanceState.SUCCESS,
"empty": TaskInstanceState.SUCCESS,
},
),
],
)
def test_operator_sunday_skip(
data_interval_start, data_interval_end, expected_statuses
):
with DAG(
dag_id="unittest_dag", schedule="@daily", start_date=data_interval_start
) as dag:
SundaySkipOperator(task_id="branch") >> EmptyOperator(task_id="empty")
dagrun = dag.create_dagrun(
state=DagRunState.RUNNING,
execution_date=data_interval_start,
data_interval=(data_interval_start, data_interval_end),
start_date=data_interval_start,
run_type=DagRunType.SCHEDULED,
)
for k in ["branch", "empty"]:
ti = dagrun.get_task_instance(task_id=k)
ti.task = dag.get_task(task_id=k)
ti.run(ignore_ti_state=True)
assert ti.state == expected_statuses[k], f"task_id={k}"
テストコードを見ると分かると思いますが、
テストコード内で、テスト対象Operatorを利用するDAGを定義して、
各タスクを逐次実行して処理ステータスを確認する流れでテストすることが可能です。
このエントリで記載した方法以外に、
dag.test()でDAGを実行するテストを書くこともできますが、
依存関係やスケジューリング以外の処理はOperatorとして切り出し、
DAG自体をシンプルに記載しておけば、
Operator側の単体テストで多くは事足りるのかなと思います。
# DAG実行をともなうテストは実行に時間がかかるという問題もあるので。
以上。