このエントリでは、
AirflowからEMRを利用する時に、途中のStepから再実行出来るようにする方法を示します。

以下のドキュメントに示されているとおり、
Amazon EMR Operatorsを利用すると、EMRで処理を実行出来ます。

Amazon EMR Operator | Apache Airflow
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html

課題の整理

次のページで上げられているサンプルコードでは、次の流れで処理を行っています。

  • EMRクラスタを作成 (EmrCreateJobFlowOperator)
  • Stepを追加 (EmrAddStepsOperator)
  • Step終了まで待ち合わせ (EmrStepSensor)
  • EMRクラスタを終了 (EmrTerminateJobFlowOperator)

サンプルコード
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/example_dags/example_emr_job_flow_manual_steps.html

上記のサンプルを元に、2つのStepを順に実行するコードを考えると、次のようになります。

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
)

step_adder1 = EmrAddStepsOperator(
    task_id='add_steps1',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

step_checker1 = EmrStepSensor(
    task_id='watch_step1',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
)

step_adder2 = EmrAddStepsOperator(
    task_id='add_steps2',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=SPARK_STEPS,
)

step_checker2 = EmrStepSensor(
    task_id='watch_step2',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[0] }}",
    aws_conn_id='aws_default',
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
)

cluster_creator >> step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover

処理の流れを書くと次の通りです。

  • ①EMRクラスタを作成 (EmrCreateJobFlowOperator)
  • ②Step1を追加 (EmrAddStepsOperator)
  • ③Step1終了まで待ち合わせ (EmrStepSensor)
  • ④Step2を追加 (EmrAddStepsOperator)
  • ⑤Step2終了まで待ち合わせ (EmrStepSensor)
  • ⑥EMRクラスタを終了 (EmrTerminateJobFlowOperator)

処理が正常に動いている限りはこれで問題が無いのですが、
途中で処理が失敗したり、その後に途中から再実行したい場合には、
このままでは、課題があります。

途中で失敗するケースの課題

まず一つ目の課題として。

処理②〜⑤のどこかで失敗した場合、
⑥のEMRクラスタ終了が実行されずに、
EMRクラスタが起動したままになるという課題があります。

対応方法1: ActionOnFailure

この課題への対応方法として、
各STEPのActionOnFailureをTERMINATE_CLUSTERにする方法があります。

参考: 削除保護の使用 | Amazon EMR
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/UsingEMR_TerminationProtection.html

但し、この方法には課題があり、
処理②〜⑤の間に、EMRのStep実行以外の処理が含まれるときに、
Step実行以外の処理で失敗すると、EMRクラスタが終了しません。

以下のような処理の場合です。

  • EMRクラスタを作成 (EmrCreateJobFlowOperator)
  • Step1を追加 (EmrAddStepsOperator)
  • Step1終了まで待ち合わせ (EmrStepSensor)
  • EMRのStep実行以外の処理 (PythonOperator等) ←ここで失敗
  • Step2を追加 (EmrAddStepsOperator)
  • Step2終了まで待ち合わせ (EmrStepSensor)
  • EMRクラスタを終了 (EmrTerminateJobFlowOperator)

対応方法2: trigger_rule

対応方法1の課題解決のために、
trigger_ruleを利用した方法を考えます。

参考: trigger_rule | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#trigger-rules

次のように、
⑥のEMRクラスタ終了が常に実行されるように、
trigger_rule=‘all_done’を指定します。

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    trigger_rule='all_done',
)

これで処理②〜⑤の成功・失敗に関わらず、
⑥のEMRクラスタ終了が実行されます。

但しこのままでは、処理が失敗した場合も、
DAG全体が成功ステータスとなってしまうため、
末尾に、処理結果を反映させるためのDummyOperatorを追加します。

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    trigger_rule='all_done',
)

terminate_status = DummyOperator(
    task_id='terminate_status',
)

cluster_creator >> step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover >> terminate_status
step_checker2 >> terminate_status

このようにすると、
Step1が失敗した場合は、次の図のような状態になります。

DAG Graph

途中から再実行するケースの課題

途中で失敗するケースの課題に対応すると、
今度は途中から再実行をしたくなります。

ところが、
Step1(②③)が成功、Step2(④⑤)が失敗した場合に、
Step2を(④から)再実行しようとしても、
EMRクラスタが起動していないので実行出来ません。

この問題に対処するために、
EmrAddStepsOperatorを拡張して、
クラスタが起動していない場合は、起動するように変更します。

以下のコードで2つのOperatorを定義します。

  • EmrAddStepsPrepareClusterOperator: Step追加時、EMRクラスタが無ければ起動
  • EmrTerminateJobFlowByNameOperator: EMRクラスタを名称指定で終了
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.emr_hook import EmrHook

class EmrAddStepsPrepareClusterOperator(BaseOperator):
    template_fields = ['steps', 'job_flow_overrides']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            steps,
            job_flow_overrides,
            aws_conn_id='aws_default',
            emr_conn_id='emr_default',
            region_name=None,
            *args, **kwargs):
        super(EmrAddStepsPrepareClusterOperator, self).__init__(*args, **kwargs)
        self.steps = steps
        self.job_flow_overrides = job_flow_overrides
        self.aws_conn_id = aws_conn_id
        self.emr_conn_id = emr_conn_id
        self.region_name = region_name

    def execute(self, context):
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id,
                           emr_conn_id=self.emr_conn_id,
                           region_name=self.region_name)
        emr = emr_hook.get_conn()

        # get emr cluster if exists
        job_flow_name = self.job_flow_overrides['Name']
        job_flow_id = emr_hook.get_cluster_id_by_name(
            job_flow_name,
            cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
        )

        # create emr cluster if not exists
        if job_flow_id is None:
            response = emr_hook.create_job_flow(self.job_flow_overrides)
            if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
                raise AirflowException('JobFlow creation failed: %s' % response)
            else:
                job_flow_id = response['JobFlowId']
                self.log.info('JobFlow with id %s created', job_flow_id)

        # add step to emr cluster
        self.log.info('Adding steps to %s', job_flow_id)
        response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=self.steps)

        if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
            raise AirflowException('Adding steps failed: %s' % response)
        else:
            self.log.info('Steps %s added to JobFlow', response['StepIds'])
            return [job_flow_id, response['StepIds'][0]]


class EmrTerminateJobFlowByNameOperator(BaseOperator):
    template_fields = ['job_flow_name']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            job_flow_name,
            aws_conn_id='aws_default',
            *args, **kwargs):
        super(EmrTerminateJobFlowByNameOperator, self).__init__(*args, **kwargs)
        self.job_flow_name = job_flow_name
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)
        emr = emr_hook.get_conn()

        job_flow_id = emr_hook.get_cluster_id_by_name(
            self.job_flow_name,
            cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
        )
        if job_flow_id is None:
            return

        self.log.info('Terminating JobFlow %s', job_flow_id)
        response = emr.terminate_job_flows(JobFlowIds=[job_flow_id])

        if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
            raise AirflowException(
                'JobFlow termination failed: %s' % response)
        else:
            self.log.info('JobFlow with id %s terminated', job_flow_id)

DAGも前述のOperatorを使うように修正します。
# cluster_creatorは不要になります。

step_adder1 = EmrAddStepsPrepareClusterOperator(
    task_id='add_steps1',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    steps=SPARK_STEPS,
)

step_checker1 = EmrStepSensor(
    task_id='watch_step1',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
    aws_conn_id='aws_default',
)

step_adder2 = EmrAddStepsPrepareClusterOperator(
    task_id='add_steps2',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    steps=SPARK_STEPS,
)

step_checker2 = EmrStepSensor(
    task_id='watch_step2',
    job_flow_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[0] }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[1] }}",
    aws_conn_id='aws_default',
)

cluster_remover = EmrTerminateJobFlowByNameOperator(
    task_id='remove_cluster',
    job_flow_name=JOB_FLOW_OVERRIDES['Name'],
    aws_conn_id='aws_default',
    trigger_rule='all_done',
)

terminate_status = DummyOperator(
    task_id='terminate_status',
)

step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover >> terminate_status
step_checker2 >> terminate_status

backfillの時など、並行してDAGが実行された時のために、
EMRクラスタ名に、execution_dateを入れておくと良いと思います。

JOB_FLOW_OVERRIDES = {
    'Name': 'PiCalc-{{ execution_date.strftime("%Y%m%dT%H%M%S%f") }}',
    'ReleaseLabel': 'emr-5.29.0',
省略

以上の対応で、
AirflowからEMRを利用する時に、途中のStepから再実行出来るようになりました。

この方法でも、Step追加が並行して実行されるDAGは上手く処理出来ないことに注意して下さい。
# 並行して実行させたいStepに対して、
# 別名でEMRクラスタを起動するようにDAGを組む必要があります