このエントリは、AirflowをSupervisorで動かす手順のメモです。

Apache Airflow
https://airflow.apache.org/

Supervisor
http://supervisord.org/

Airflowは、公式でDockerで動かす方法が用意されています。

Running Airflow in Docker
https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

リソース・性能面・デバッグ作業の効率などの理由から、
Dockerを使いたくない場合もあると思うので、
公式のdocker-composeを置き換えるような感じで、
Supervisorで動かす手順を整理してみました。

2.10.3用のdocker-compose.yaml
https://airflow.apache.org/docs/apache-airflow/2.10.3/docker-compose.yaml

試した環境は、WSL上のUbuntu22.04です。

利用するミドルウェアのインストール

利用するミドルウェア群をインストールしてきます。

Supservisorのインストール。

sudo apt install supervisor

Supervisorのログを見やすくするために、supervisor-stdlogもインストール。
参考: https://github.com/razielanarki/supervisor-stdlog

pip install supervisor-stdlog

PostgreSQLのインストール。

sudo apt install postgresql postgresql-contrib

psycopg2を使うのに必要なので、libpq-devもインストール。

sudo apt install libpq-dev

Redisのインストール。

sudo apt install redis

各サービスについて、不要であれば無効化しておきます。

sudo systemctl stop supervisor
sudo systemctl disable supervisor
sudo systemctl stop postgresql
sudo systemctl disable postgresql
sudo systemctl stop redis-server
sudo systemctl disable redis-server

Airflowアプリの作業ディレクトリ準備

Airflowアプリの作業ディレクトリを作成していきます。

このエントリでのディレクトリ構成は、
次のように.data配下にDB等を格納するよう構成します。

  • 作業ディレクトリルート
    • dags … カスタムDAGの格納先
    • venv … Python仮想環境
    • .data … Airflow用DBなど格納先
      • airflow
      • postgres
        • db
        • socket
    • supervisord.conf.tpl … Supervisorの定義ファイルのテンプレート

仮想環境の作成、Airflowのインストール

作業ディレクトリを準備します。

mkdir airflow-supervisord-study && cd $_

仮想環境を用意します。

python -m venv venv
. venv/bin/activate

以下の手順を参考に、Airflowをインストールします。

Quick Start | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/2.10.3/start.html

AIRFLOW_VERSION=2.10.3

# Extract the version of Python you have installed. If you're currently using a Python version that is not supported by Airflow, you may want to set this manually.
# See above for supported versions.
PYTHON_VERSION="$(python -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')"

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example this would install 2.10.3 with python 3.8: https://raw.githubusercontent.com/apache/airflow/constraints-2.10.3/constraints-3.8.txt

pip install "apache-airflow[celery]==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

BackendにPostgreSQLを使うので、psycopg2も入れておきます。

pip install psycopg2

環境変数の設定

次のとおり、環境変数を指定します。
direnvなどで自動的に設定されるようにしておくと良いと思います。

export POSTGRES_BIN_DIR=/usr/lib/postgresql/14/bin
export REDIS_BIN_DIR=/usr/bin

export POSTGRES_DATA_DIR=$PWD/.data/postgres/db
export POSTGRES_SOCKET_DIR=$PWD/.data/postgres/socket

export AIRFLOW_HOME=$PWD/.data/airflow
export AIRFLOW__CORE__EXECUTOR=CeleryExecutor
export AIRFLOW__CORE__DAGS_FOLDER=$PWD/dags
export AIRFLOW__CORE__LOAD_EXAMPLES=False
export AIRFLOW__DATABASE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow_user:airflow_pass@localhost:5432/airflow_db
export AIRFLOW__CELERY__BROKER_URL=redis://localhost:6379/0

POSTGRES_BIN_DIR, REDIS_BIN_DIRは、
PostgeSQL, Redisの実行ファイルが存在する場所で環境依存になります。

POSTGRES_DATA_DIR, POSTGRES_SOCKET_DIRはPostgreSQLのDB等の格納先。

AIRFLOW_HOMEは、Airflowのホームディレクトリ。
AIRFLOW__で始まる変数はAirflowの挙動を制御するためのパラメータです。

Airflowホーム・PostgreSQLデータベースの作成

Airflowのホームディレクトリを作成します。

rm -rf $AIRFLOW_HOME && mkdir -p $AIRFLOW_HOME

PostgreSQLのDBを作成します。

rm -rf $POSTGRES_DATA_DIR && mkdir -p $POSTGRES_DATA_DIR
rm -rf $POSTGRES_SOCKET_DIR && mkdir -p $POSTGRES_SOCKET_DIR
$POSTGRES_BIN_DIR/initdb -D $POSTGRES_DATA_DIR

Airflow用のデータベースを作成

PostgreSQLを起動します。

$POSTGRES_BIN_DIR/postgres -D $POSTGRES_DATA_DIR -k $POSTGRES_SOCKET_DIR -p 5432

Airflow用のデータベースを作成。
PostgreSQLを起動させたまま、以下を実行する。

$POSTGRES_BIN_DIR/dropdb -h $POSTGRES_SOCKET_DIR -p 5432  --if-exists airflow_db
$POSTGRES_BIN_DIR/createdb -h $POSTGRES_SOCKET_DIR -p 5432 airflow_db
$POSTGRES_BIN_DIR/psql -h $POSTGRES_SOCKET_DIR -p 5432 -d airflow_db -c "
  CREATE USER airflow_user WITH PASSWORD 'airflow_pass';
  GRANT ALL PRIVILEGES ON DATABASE airflow_db TO airflow_user;
"
airflow db init
airflow users create \
  --username admin \
  --password admin \
  --firstname Peter \
  --lastname Parker \
  --role Admin \
  --email spiderman@superhero.org

参考) Setting up a PostgreSQL Database | Set up a Database Backend | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html

PostgreSQLを終了させます。
PostgreSQLを起動したターミナルでCtrl+C。

Supervisorの設定・起動

Supervisorの設定ファイルを用意します。
パスを環境変数で置き換えるため、templateファイルを作成します。

supervisord.conf.tpl

[supervisord]
nodaemon = true
[program:redis]
command = ${REDIS_BIN_DIR}/redis-server --port 6379
stdout_events_enabled = true
stderr_events_enabled = true
[program:postgresql]
command = ${POSTGRES_BIN_DIR}/postgres -D ${POSTGRES_DATA_DIR} -k ${POSTGRES_SOCKET_DIR} -p 5432
stdout_events_enabled = true
stderr_events_enabled = true
[program:webserver]
command = airflow webserver
stdout_events_enabled = true
stderr_events_enabled = true
[program:scheduler]
command = airflow scheduler
stdout_events_enabled = true
stderr_events_enabled = true
[program:worker]
command = airflow celery worker
stdout_events_enabled = true
stderr_events_enabled = true
[program:flower]
command = airflow celery flower
stdout_events_enabled = true
stderr_events_enabled = true
[eventlistener:stdlog]
events = PROCESS_LOG
command = supervisor-stdlog
result_handler = supervisor_stdlog:log_handler
buffer_size = 100

templateの変数を置換して、supervisord.confを作成。

cat supervisord.conf.tpl \
  | sed s\|\${REDIS_BIN_DIR}\|$REDIS_BIN_DIR\|g \
  | sed s\|\${POSTGRES_BIN_DIR}\|$POSTGRES_BIN_DIR\|g \
  | sed s\|\${POSTGRES_DATA_DIR}\|$POSTGRES_DATA_DIR\|g \
  | sed s\|\${POSTGRES_SOCKET_DIR}\|$POSTGRES_SOCKET_DIR\|g \
  > supervisord.conf

Supervisorを起動。

/usr/bin/supervisord -c supervisord.conf

AirflowのUIは次のURLから、admin/adminでログイン。
http://localhost:8080/

flowerのUIは次のURLから。
http://localhost:5555/

終了させたい場合は、Supervisorを起動したターミナルでCtrl+C。

サンプルDAGを追加

次のようなサンプルDAGを作成して配置、
しばらく待って、AirflowUIの一覧にサンプルDAGが表示されればOK。

dags/sample.py

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="sample") as dag:
    EmptyOperator(task_id="empty")

以上。 データベース初期化まわりが面倒なので、 スクリプト用意しておいた方が良いかもしれないです。