AirflowのConnectionのパラメータを既存のDBから設定する方法
AirflowのConnectionやVariableは、SecretBackendを使うと、
AWS SSM ParameterやGCP Secrets Managerで管理することが出来ます。
Alternative secrets backend | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/1.10.10/howto/use-alternative-secrets-backend.html
ただし既に他の方法で、Connectionに利用する認証情報を管理している場合は、
これらのSecretBackendにコピーするよりも、直接アクセス出来た方が運用上の都合が良いです。
その場合は、
「Connectionを管理しているDBなどに接続して情報を返却する自前のSecretBackendを作る」
と言う方法が良さそうですが。
既に一部の情報をAWS SSM Parameterなどに保存している場合は、その方法も使えません。
# 複数のSecretBackendをChainして呼び出すSecretBackendを作れば出来ますが、
このようなケースに対応するため、このエントリでは、
既存のDBからConnection情報を取得 → 環境変数に設定 → 環境変数からConnectionを利用
という方法で、Connectionを利用する方法を紹介します。
想定シナリオ
このエントリで想定するシナリオは、次の内容です。
- PostgreSQLにAWSのアクセスキーが格納されている
- S3にあるCSVファイルをコピーする
- コピー元: aws_defaultで接続する
- コピー先: PostgreSQLから取得したアクセスキーで接続する
PostgreSQLには、以下のようにアクセスキーが入っているとします。
# select * from connections;
id | aws_access_key_id | aws_secret_access_key | region_name
---------+-------------------+-----------------------+----------------
tenant1 | *** | ******** | ap-northeast-1
(1 row)
また、上記アクセスキーが入っているPostgresへのConnectionは、
postgres_defaultに設定されているとします。
DAGの実装
DAGの実装は、次のようになります。
import os, json
from datetime import datetime
from airflow import DAG
from airflow.models import Connection
from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import DictCursor
# PostgreSQLからAWSの接続情報を取得
pg_hook = PostgresHook.get_hook(conn_id='postgres_default')
with pg_hook.get_conn() as pgconn:
with pgconn.cursor(cursor_factory=DictCursor) as cur:
cur.execute('SELECT * FROM connections')
aws_cred = cur.fetchone()
# 接続文字列の組み立て・環境変数への設定
conn = Connection(
conn_id='aws_tenant1',
conn_type='aws',
description=None,
host=None,
login=aws_cred['aws_access_key_id'],
password=aws_cred['aws_secret_access_key'],
extra=json.dumps(dict(region_name=aws_cred['region_name'])),
)
os.environ["AIRFLOW_CONN_AWS_TENANT1"] = conn.get_uri()
# DAGの作成
with DAG(
'sample',
start_date=datetime(2021, 6, 1),
schedule_interval=None,
max_active_runs=1,
catchup=True,
) as dag:
S3FileTransformOperator(
task_id='s3_upload',
source_s3_key='s3://{転送元Bucket}/{転送元Key}',
dest_s3_key='s3://{転送先Bucket}/{転送先Key}',
select_expression='SELECT * FROM S3Object',
dest_aws_conn_id='aws_tenant1',
)
接続文字列の組み立て・環境変数については、以下を参考にします。
Generating a connection URI | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#generating-a-connection-uri
Storing a Connection in Environment Variables | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-a-connection-in-environment-variables
このような方法を使えば、集計したなんらかの結果を、
Airflowに用意されているOperatorを利用して、取引先毎のS3に送るようなことも出来ますね。
以上。