dag.test()によるAirflowのDAGの単体テストの書き方
AirflowのDAG単体テストについて、
かなり前にimportのテストのやり方は書いたのですが、
DAGの実行をともなう単体テストに触れていなかったので、
このエントリに記載することにします。
AirflowのDAG単体テストの事始め | takemikami.com
https://takemikami.com/2021/1205-airflowdag.html
環境の準備
リポジトリ・Python仮想環境を作成します。
$ mkdir airflow-unittest && cd $_
$ python -m venv venv
$ . venv/bin/activate
Airflowをインストールします。
インストール方法は、次のサイトに従います。
Quick Start | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/start.html
$ AIRFLOW_VERSION=2.10.2
$ PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
単体テストの実行のためにpytest、
DAGを呼び出す時間を制御するためにfreezegun、
をインストールしておきます。
$ pip install pytest freezegun
テスト対象の実装
テスト対象のDAGを実装します。
このDAGでは、以下の処理を実装しています。
- LatestOnlyOperatorで過去分の実行はスキップ
- FileSensorで入力ファイルを待ち合わせ
- EmptyOperatorは何もしない(実際にはなんらかのデータ処理が入る)
dags/sample.py
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from airflow.sensors.filesystem import FileSensor
from airflow.utils.dates import days_ago
with DAG(
"sample",
start_date=days_ago(3),
schedule="0 * * * *",
) as dag:
latest_only = LatestOnlyOperator(task_id="latest_only")
file_sensor = FileSensor(task_id="file_sensor", filepath="/tmp/datafile")
empty = EmptyOperator(task_id="empty")
latest_only >> file_sensor >> empty
テストの実装
テストを実装します。
このテストでは、以下の処理を実装しています。
- tests/conftest.py
- Airflowの実行を制御するための環境変数の指定
- テスト実行前のDB初期化 (reset_db)
- freezegunで時間固定した時点でimportしたDAGを返却するfixture (freezegun_dagbag)
- tests/test_sample.py
- importのテスト (test_impot_dags)
- DAG実行のテスト (test_unittest)
tests/conftest.py
import os
import zoneinfo
import freezegun
import pytest
BASE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = os.path.join(BASE_DIR, "dags")
os.environ["SQLALCHEMY_SILENCE_UBER_WARNING"] = "1"
@pytest.fixture(scope="session", autouse=True)
def reset_db() -> None:
from airflow.utils import db
db.resetdb()
@pytest.fixture()
def freezegun_dagbag(request):
from airflow.models import DagBag
freeze_date = request.param
with freezegun.freeze_time(freeze_date.astimezone(zoneinfo.ZoneInfo(key="UTC"))):
yield DagBag()
tests/test_sample.py
import datetime
import itertools
import zoneinfo
import pendulum
import pytest
from airflow.models import DagBag, DagRun
def test_impot_dags():
dag_id = "sample"
dagbag = DagBag()
assert len(dagbag.import_errors) == 0, "DAG import failures. Errors: {}".format(
dagbag.import_errors
)
assert dagbag.get_dag(dag_id=dag_id) is not None
@pytest.mark.parametrize(
"freezegun_dagbag, execution_date, expected_tasks_states",
[
(
datetime.datetime(2024, 9, 5, 10, 5, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
datetime.datetime(2024, 9, 5, 10, 0, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
{"empty": "success"},
),
(
datetime.datetime(2024, 9, 5, 11, 5, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
datetime.datetime(2024, 9, 5, 10, 0, tzinfo=zoneinfo.ZoneInfo(key="UTC")),
{"empty": "skipped"},
),
],
indirect=["freezegun_dagbag"],
ids=["normal", "backfill"],
)
def test_unittest(freezegun_dagbag, execution_date, expected_tasks_states):
dag_id = "sample"
dag = freezegun_dagbag.get_dag(dag_id=dag_id)
resp: DagRun = dag.test(
execution_date=pendulum.instance(
execution_date.astimezone(zoneinfo.ZoneInfo(key="UTC"))
),
mark_success_pattern=".*_sensor",
)
task_status = {
id: ",".join(set([s[1] for s in statuses]))
for id, statuses in itertools.groupby(
sorted(
[(e.task_id.split(".")[0], e.state) for e in resp.get_task_instances()],
key=lambda x: x[0],
),
key=lambda x: x[0],
)
}
assert {
k: task_status[k] for k in task_status if k in expected_tasks_states
} == expected_tasks_states
dag.test()
ではDAGの実行をともなう単体テストを行います。
指定しているパラメータは、次の通りとなります。
execution_dateには、実行時間を指定します。
この例の場合は、
execution_dateに 2024/09/05 10:00:00
を指定して、
data_interval_start/endが、2024/09/05 09:00:00~2024/09/05 10:00:00
の処理を実行しています。
execution_dateが含まれる1つ前のdata_intervalが実行されるという事になります。
# data_intervalは末尾の時点を含まないので、
# 2024/09/05 10:00:00
は、2024/09/05 10:00:00~2024/09/05 11:00:00
に含まれます
mark_success_patternには、処理を行わずに成功させるタスクIDを正規表現で指定します。
この例では、.*_sensor
として、file_sensor
に適用しています。
これによってfile_sensor
で指定したファイルの有無にかかわらずタスクが成功します。
外部システムに接続が必要なタスクなど、単体テストの対象としにくいタスクは、
固定で成功させておくと都合がよいです。
テストの実行
単体テストを実行します。
$ pytest tests
以上。