Pylint-Airflowを使ってみた
Pylint-AirflowというAirflowのDAGのLinterがあったので、試してみました。
Pylint-Airflow | GitHub
https://github.com/BasPH/pylint-airflow
2019年から更新されていないので、
ちょっと不安だったのですが、問題無く動いてそうです。
# AirflowのVersionも上がってますし
セットアップ
※ 利用するAirflow関連のライブラリなどのセットアップは完了している前提です.
Pylint-Airflowを追加します。
pip install pylint-airflow
Pylintの設定ファイルを生成します。
pylint --generate-rcfile > .pylintrc
Pylintの設定ファイルを書き換えていきます。
まずは、次のようにプラグインにPylint-Airflowを追加します。
.pylintrc 抜粋
[MASTER]
load-plugins=pylint_airflow
次の設定は無くても構わないのですが、
Pylint-Airflowのチェック項目だけを有効にして、
他のPylintのチェック項目を対象外にしたかったので、次のように変更しました。
# flake8やmypyと併用していたので、
# チェック項目が競合・重複するのを避けたかったためです。
.pylintrc 抜粋
[MESSAGES CONTROL]
confidence=
disable=all
enable=different-operator-varname-taskid,
match-callable-taskid,
mixed-dependency-directions,
task-no-dependencies,
task-context-argname,
task-context-separate-arg,
match-dagid-filename,
unused-xcom,
basehook-top-level,
duplicate-dag-name,
duplicate-task-name,
duplicate-dependency,
dag-with-cycles,
task-no-dag
dagsディレクトリ配下に、dagの定義を置いているとして、
以下のようにPylintを実行します。
pylint dags
実行例
実際に適当なDAGを書いて動かしてみました。
以下は、適用してDAGとその実行結果です。
dags/sample.py
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
'sample_dag',
start_date=datetime(2021, 6, 1),
schedule_interval=None,
max_active_runs=1,
catchup=True,
) as dag:
def func_do_nothing():
pass
do_nothing = PythonOperator(
task_id='do_nothing_task',
python_callable=func_do_nothing
)
実行結果
% pylint dags/sample.py
************* Module sample
dags/sample.py:1:0: C8306: For consistency match the DAG filename with the dag_id (match-dagid-filename)
dags/sample.py:16:4: C8300: Operator variable name and task_id argument should match (different-operator-varname-taskid)
dags/sample.py:16:4: C8301: Name the python_callable function '_[task_id]' (match-callable-taskid)
見ての通りですが、次のような指摘です。
- 「ファイル名」と「dag_id」は揃えましょう
- 「オペレータを代入する変数」と「task_id」は揃えましょう
- PythonOperatorの「python_callable」の関数名は、「_」+「task_id」にしましょう
全て修正すると、以下のようになります。
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
'sample',
start_date=datetime(2021, 6, 1),
schedule_interval=None,
max_active_runs=1,
catchup=True,
) as dag:
def _do_nothing():
pass
do_nothing = PythonOperator(
task_id='do_nothing',
python_callable=_do_nothing
)
dag_idの命名規則などは、チェックするのも手間なので、
CIに、こういった仕組みを入れておくのも良いかと思います。