Pylint-AirflowというAirflowのDAGのLinterがあったので、試してみました。

Pylint-Airflow | GitHub
https://github.com/BasPH/pylint-airflow

2019年から更新されていないので、
ちょっと不安だったのですが、問題無く動いてそうです。
# AirflowのVersionも上がってますし

セットアップ

※ 利用するAirflow関連のライブラリなどのセットアップは完了している前提です.

Pylint-Airflowを追加します。

pip install pylint-airflow

Pylintの設定ファイルを生成します。

pylint --generate-rcfile > .pylintrc

Pylintの設定ファイルを書き換えていきます。

まずは、次のようにプラグインにPylint-Airflowを追加します。

.pylintrc 抜粋

[MASTER]
load-plugins=pylint_airflow

次の設定は無くても構わないのですが、
Pylint-Airflowのチェック項目だけを有効にして、
他のPylintのチェック項目を対象外にしたかったので、次のように変更しました。
# flake8やmypyと併用していたので、
# チェック項目が競合・重複するのを避けたかったためです。

.pylintrc 抜粋

[MESSAGES CONTROL]
confidence=
disable=all
enable=different-operator-varname-taskid,
       match-callable-taskid,
       mixed-dependency-directions,
       task-no-dependencies,
       task-context-argname,
       task-context-separate-arg,
       match-dagid-filename,
       unused-xcom,
       basehook-top-level,
       duplicate-dag-name,
       duplicate-task-name,
       duplicate-dependency,
       dag-with-cycles,
       task-no-dag

dagsディレクトリ配下に、dagの定義を置いているとして、
以下のようにPylintを実行します。

pylint dags

実行例

実際に適当なDAGを書いて動かしてみました。
以下は、適用してDAGとその実行結果です。

dags/sample.py

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
        'sample_dag',
        start_date=datetime(2021, 6, 1),
        schedule_interval=None,
        max_active_runs=1,
        catchup=True,
) as dag:
    def func_do_nothing():
        pass


    do_nothing = PythonOperator(
        task_id='do_nothing_task',
        python_callable=func_do_nothing
    )

実行結果

% pylint dags/sample.py
************* Module sample
dags/sample.py:1:0: C8306: For consistency match the DAG filename with the dag_id (match-dagid-filename)
dags/sample.py:16:4: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)
dags/sample.py:16:4: C8301: Name the python_callable function '_[task_id]' (match-callable-taskid)

見ての通りですが、次のような指摘です。

  • 「ファイル名」と「dag_id」は揃えましょう
  • 「オペレータを代入する変数」と「task_id」は揃えましょう
  • PythonOperatorの「python_callable」の関数名は、「_」+「task_id」にしましょう

全て修正すると、以下のようになります。

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
        'sample',
        start_date=datetime(2021, 6, 1),
        schedule_interval=None,
        max_active_runs=1,
        catchup=True,
) as dag:
    def _do_nothing():
        pass


    do_nothing = PythonOperator(
        task_id='do_nothing',
        python_callable=_do_nothing
    )

dag_idの命名規則などは、チェックするのも手間なので、
CIに、こういった仕組みを入れておくのも良いかと思います。