AirflowのDAG単体テストの事始め
この記事は、
データ基盤・BIツール・データエンジニアリング Advent Calendar 2021 の5日目の記事です。
データ基盤・BIツール・データエンジニアリング Advent Calendar 2021
https://qiita.com/advent-calendar/2021/dataengineering
AirflowのDAG単体テストの書き方は、
以下のドキュメントに書かれているのですが、
実行環境の整備するためには、
pytestやairflow設定の知識が必要なので、手順としてまとめました。
Unit tests | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/2.2.2/best-practices.html#testing-a-dag
Airflow・pytestのセットアップ
Python3で仮想環境を作ります。
mkdir airflow-unittest-sample && cd $_
python3 -m venv venv
source venv/bin/activate
Airflow2.2.2をインストールします。
AIRFLOW_VERSION=2.2.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
インストールしたpackageを、requirement.txtに書き出しておきます。
pip freeze > requirements.txt
単体テストの実行用に、pytestもインストールします。
echo "pytest==6.2.5" > test-requirements.txt
pip install -r test-requirements.txt
ディレクトリ構成
テスト対象のDAG、単体テストは、
次のようなディレクトリ構成で格納することを想定します。
.
├── dags
│ └── sample.py
├── requirements.txt
├── test-requirements.txt
└── tests
├── __init__.py
├── conftest.py
└── test_dags.py
dags配下に、テスト対象
tests配下に、単体テスト
という構成で格納します。
テスト対象のDAG作成
テスト対象のDAGを作成します。
ここでは、意図的に閉路(サイクル)を持つ不正なDAGにしています。
dags/sample.py
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
with DAG(
'sample',
start_date=datetime(2021, 12, 1),
schedule_interval=None,
) as dag:
dummy = DummyOperator(task_id='dummy')
dummy >> dummy
単体テストの作成
単体テストを作成します。
このテストでは、importエラーが無いかを検査しています。
tests/test_dags.py
import unittest
from airflow.models import DagBag
class TestDags(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dagbag = DagBag()
def test_import_dags(self):
self.assertFalse(
len(self.dagbag.import_errors),
'DAG import failures. Errors: {}'.format(
self.dagbag.import_errors
)
)
単体テストを実行するため、
conftest.pyで、テスト実行時のAirflowの設定や、テスト用DBの初期化を行います。
# os.environで指定している環境変数は、airflow.cfgに指定する設定で、
# 「AIRFLOW_{SECTION}_{KEY}」形式で指定します。
tests/conftest.py
import os
import pytest
import pathlib
os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = str(pathlib.Path("{}/../dags".format(os.path.dirname(__file__))).resolve())
@pytest.fixture(scope='session', autouse=True)
def reset_db() -> None:
from airflow.utils import db
db.resetdb()
テストを実行する
pytestコマンドで、単体テストを実行します。
pytest tests
先ほどのDAGは、閉路(サイクル)を持っているので、
次のように「Cycle detected in DAG.」と出て失敗します。
FAILED tests/test_dags.py::TestDags::test_import_dags - AssertionError: 1 is not false : DAG import failures. Errors: {'/Users/mikamitakeshi/tmp/airflow-unittest-sample/dags/sample.py': 'Cycle detected in DAG. Faulty task: dummy'}
本エントリのように設定しておくと、
一般的なpytestと同じように実行することが出来るので。
GitHubActionsなどのCIに組み込むことも可能です。