AirflowのConnectionやVariableは、SecretBackendを使うと、
AWS SSM ParameterやGCP Secrets Managerで管理することが出来ます。

Alternative secrets backend | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/1.10.10/howto/use-alternative-secrets-backend.html

ただし既に他の方法で、Connectionに利用する認証情報を管理している場合は、
これらのSecretBackendにコピーするよりも、直接アクセス出来た方が運用上の都合が良いです。

その場合は、
「Connectionを管理しているDBなどに接続して情報を返却する自前のSecretBackendを作る」
と言う方法が良さそうですが。
既に一部の情報をAWS SSM Parameterなどに保存している場合は、その方法も使えません。
# 複数のSecretBackendをChainして呼び出すSecretBackendを作れば出来ますが、

このようなケースに対応するため、このエントリでは、
既存のDBからConnection情報を取得 → 環境変数に設定 → 環境変数からConnectionを利用
という方法で、Connectionを利用する方法を紹介します。

想定シナリオ

このエントリで想定するシナリオは、次の内容です。

  • PostgreSQLにAWSのアクセスキーが格納されている
  • S3にあるCSVファイルをコピーする
    • コピー元: aws_defaultで接続する
    • コピー先: PostgreSQLから取得したアクセスキーで接続する

PostgreSQLには、以下のようにアクセスキーが入っているとします。

# select * from connections;
   id    | aws_access_key_id | aws_secret_access_key |  region_name   
---------+-------------------+-----------------------+----------------
 tenant1 | ***               | ********              | ap-northeast-1
(1 row)

また、上記アクセスキーが入っているPostgresへのConnectionは、
postgres_defaultに設定されているとします。

DAGの実装

DAGの実装は、次のようになります。

import os, json
from datetime import datetime
from airflow import DAG
from airflow.models import Connection
from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import DictCursor

# PostgreSQLからAWSの接続情報を取得
pg_hook = PostgresHook.get_hook(conn_id='postgres_default')

with pg_hook.get_conn() as pgconn:
    with pgconn.cursor(cursor_factory=DictCursor) as cur:
        cur.execute('SELECT * FROM connections')
        aws_cred = cur.fetchone()

# 接続文字列の組み立て・環境変数への設定
conn = Connection(
    conn_id='aws_tenant1',
    conn_type='aws',
    description=None,
    host=None,
    login=aws_cred['aws_access_key_id'],
    password=aws_cred['aws_secret_access_key'],
    extra=json.dumps(dict(region_name=aws_cred['region_name'])),
)
os.environ["AIRFLOW_CONN_AWS_TENANT1"] = conn.get_uri()

# DAGの作成
with DAG(
        'sample',
        start_date=datetime(2021, 6, 1),
        schedule_interval=None,
        max_active_runs=1,
        catchup=True,
) as dag:
    S3FileTransformOperator(
        task_id='s3_upload',
        source_s3_key='s3://{転送元Bucket}/{転送元Key}',
        dest_s3_key='s3://{転送先Bucket}/{転送先Key}',
        select_expression='SELECT * FROM S3Object',
        dest_aws_conn_id='aws_tenant1',
    )

接続文字列の組み立て・環境変数については、以下を参考にします。

Generating a connection URI | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#generating-a-connection-uri

Storing a Connection in Environment Variables | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html#storing-a-connection-in-environment-variables

このような方法を使えば、集計したなんらかの結果を、
Airflowに用意されているOperatorを利用して、取引先毎のS3に送るようなことも出来ますね。

以上。