この記事は、
データ基盤・BIツール・データエンジニアリング Advent Calendar 2021 の5日目の記事です。

データ基盤・BIツール・データエンジニアリング Advent Calendar 2021
https://qiita.com/advent-calendar/2021/dataengineering

AirflowのDAG単体テストの書き方は、
以下のドキュメントに書かれているのですが、
実行環境の整備するためには、
pytestやairflow設定の知識が必要なので、手順としてまとめました。

Unit tests | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/2.2.2/best-practices.html#testing-a-dag

Airflow・pytestのセットアップ

Python3で仮想環境を作ります。

mkdir airflow-unittest-sample && cd $_
python3 -m venv venv
source venv/bin/activate

Airflow2.2.2をインストールします。

AIRFLOW_VERSION=2.2.2
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

インストールしたpackageを、requirement.txtに書き出しておきます。

pip freeze > requirements.txt

単体テストの実行用に、pytestもインストールします。

echo "pytest==6.2.5" > test-requirements.txt
pip install -r test-requirements.txt

ディレクトリ構成

テスト対象のDAG、単体テストは、
次のようなディレクトリ構成で格納することを想定します。

.
├── dags
│   └── sample.py
├── requirements.txt
├── test-requirements.txt
└── tests
    ├── __init__.py
    ├── conftest.py
    └── test_dags.py

dags配下に、テスト対象
tests配下に、単体テスト
という構成で格納します。

テスト対象のDAG作成

テスト対象のDAGを作成します。
ここでは、意図的に閉路(サイクル)を持つ不正なDAGにしています。

dags/sample.py

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator


with DAG(
        'sample',
        start_date=datetime(2021, 12, 1),
        schedule_interval=None,
) as dag:
    dummy = DummyOperator(task_id='dummy')
    dummy >> dummy

単体テストの作成

単体テストを作成します。
このテストでは、importエラーが無いかを検査しています。

tests/test_dags.py

import unittest
from airflow.models import DagBag


class TestDags(unittest.TestCase):

    @classmethod
    def setUpClass(cls):
        cls.dagbag = DagBag()

    def test_import_dags(self):
        self.assertFalse(
            len(self.dagbag.import_errors),
            'DAG import failures. Errors: {}'.format(
                self.dagbag.import_errors
            )
        )

単体テストを実行するため、
conftest.pyで、テスト実行時のAirflowの設定や、テスト用DBの初期化を行います。

# os.environで指定している環境変数は、airflow.cfgに指定する設定で、
# 「AIRFLOW__{SECTION}__{KEY}」形式で指定します。

tests/conftest.py

import os
import pytest
import pathlib

os.environ["AIRFLOW_HOME"] = "/tmp/airflow-unittest"
os.environ["AIRFLOW__CORE__LOAD_EXAMPLES"] = "False"
os.environ["AIRFLOW__CORE__UNIT_TEST_MODE"] = "True"
os.environ["AIRFLOW__CORE__EXECUTOR"] = "DebugExecutor"
os.environ["AIRFLOW__CORE__DAGS_FOLDER"] = str(pathlib.Path("{}/../dags".format(os.path.dirname(__file__))).resolve())


@pytest.fixture(scope='session', autouse=True)
def reset_db() -> None:
    from airflow.utils import db

    db.resetdb()

テストを実行する

pytestコマンドで、単体テストを実行します。

pytest tests

先ほどのDAGは、閉路(サイクル)を持っているので、
次のように「Cycle detected in DAG.」と出て失敗します。

FAILED tests/test_dags.py::TestDags::test_import_dags - AssertionError: 1 is not false : DAG import failures. Errors: {'/Users/mikamitakeshi/tmp/airflow-unittest-sample/dags/sample.py': 'Cycle detected in DAG. Faulty task: dummy'}

本エントリのように設定しておくと、
一般的なpytestと同じように実行することが出来るので。
GitHubActionsなどのCIに組み込むことも可能です。