AirflowからEMRを利用する時に、途中のStepから再実行出来るようにする
このエントリでは、
AirflowからEMRを利用する時に、途中のStepから再実行出来るようにする方法を示します。
以下のドキュメントに示されているとおり、
Amazon EMR Operatorsを利用すると、EMRで処理を実行出来ます。
Amazon EMR Operator | Apache Airflow
https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/emr.html
課題の整理
次のページで上げられているサンプルコードでは、次の流れで処理を行っています。
- EMRクラスタを作成 (EmrCreateJobFlowOperator)
- Stepを追加 (EmrAddStepsOperator)
- Step終了まで待ち合わせ (EmrStepSensor)
- EMRクラスタを終了 (EmrTerminateJobFlowOperator)
上記のサンプルを元に、2つのStepを順に実行するコードを考えると、次のようになります。
cluster_creator = EmrCreateJobFlowOperator(
task_id='create_job_flow',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
)
step_adder1 = EmrAddStepsOperator(
task_id='add_steps1',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker1 = EmrStepSensor(
task_id='watch_step1',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
step_adder2 = EmrAddStepsOperator(
task_id='add_steps2',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=SPARK_STEPS,
)
step_checker2 = EmrStepSensor(
task_id='watch_step2',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[0] }}",
aws_conn_id='aws_default',
)
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
)
cluster_creator >> step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover
処理の流れを書くと次の通りです。
- ①EMRクラスタを作成 (EmrCreateJobFlowOperator)
- ②Step1を追加 (EmrAddStepsOperator)
- ③Step1終了まで待ち合わせ (EmrStepSensor)
- ④Step2を追加 (EmrAddStepsOperator)
- ⑤Step2終了まで待ち合わせ (EmrStepSensor)
- ⑥EMRクラスタを終了 (EmrTerminateJobFlowOperator)
処理が正常に動いている限りはこれで問題が無いのですが、
途中で処理が失敗したり、その後に途中から再実行したい場合には、
このままでは、課題があります。
途中で失敗するケースの課題
まず一つ目の課題として。
処理②〜⑤のどこかで失敗した場合、
⑥のEMRクラスタ終了が実行されずに、
EMRクラスタが起動したままになるという課題があります。
対応方法1: ActionOnFailure
この課題への対応方法として、
各STEPのActionOnFailureをTERMINATE_CLUSTERにする方法があります。
参考: 削除保護の使用 | Amazon EMR
https://docs.aws.amazon.com/ja_jp/emr/latest/ManagementGuide/UsingEMR_TerminationProtection.html
但し、この方法には課題があり、
処理②〜⑤の間に、EMRのStep実行以外の処理が含まれるときに、
Step実行以外の処理で失敗すると、EMRクラスタが終了しません。
以下のような処理の場合です。
- EMRクラスタを作成 (EmrCreateJobFlowOperator)
- Step1を追加 (EmrAddStepsOperator)
- Step1終了まで待ち合わせ (EmrStepSensor)
- EMRのStep実行以外の処理 (PythonOperator等) ←ここで失敗
- Step2を追加 (EmrAddStepsOperator)
- Step2終了まで待ち合わせ (EmrStepSensor)
- EMRクラスタを終了 (EmrTerminateJobFlowOperator)
対応方法2: trigger_rule
対応方法1の課題解決のために、
trigger_ruleを利用した方法を考えます。
参考: trigger_rule | Apache Airflow
https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#trigger-rules
次のように、
⑥のEMRクラスタ終了が常に実行されるように、
trigger_rule='all_done'を指定します。
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
trigger_rule='all_done',
)
これで処理②〜⑤の成功・失敗に関わらず、
⑥のEMRクラスタ終了が実行されます。
但しこのままでは、処理が失敗した場合も、
DAG全体が成功ステータスとなってしまうため、
末尾に、処理結果を反映させるためのDummyOperatorを追加します。
cluster_remover = EmrTerminateJobFlowOperator(
task_id='remove_cluster',
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
trigger_rule='all_done',
)
terminate_status = DummyOperator(
task_id='terminate_status',
)
cluster_creator >> step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover >> terminate_status
step_checker2 >> terminate_status
このようにすると、
Step1が失敗した場合は、次の図のような状態になります。
途中から再実行するケースの課題
途中で失敗するケースの課題に対応すると、
今度は途中から再実行をしたくなります。
ところが、
Step1(②③)が成功、Step2(④⑤)が失敗した場合に、
Step2を(④から)再実行しようとしても、
EMRクラスタが起動していないので実行出来ません。
この問題に対処するために、
EmrAddStepsOperatorを拡張して、
クラスタが起動していない場合は、起動するように変更します。
以下のコードで2つのOperatorを定義します。
- EmrAddStepsPrepareClusterOperator: Step追加時、EMRクラスタが無ければ起動
- EmrTerminateJobFlowByNameOperator: EMRクラスタを名称指定で終了
from airflow.models import BaseOperator
from airflow.exceptions import AirflowException
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.emr_hook import EmrHook
class EmrAddStepsPrepareClusterOperator(BaseOperator):
template_fields = ['steps', 'job_flow_overrides']
template_ext = ()
ui_color = '#f9c915'
@apply_defaults
def __init__(
self,
steps,
job_flow_overrides,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
region_name=None,
*args, **kwargs):
super(EmrAddStepsPrepareClusterOperator, self).__init__(*args, **kwargs)
self.steps = steps
self.job_flow_overrides = job_flow_overrides
self.aws_conn_id = aws_conn_id
self.emr_conn_id = emr_conn_id
self.region_name = region_name
def execute(self, context):
emr_hook = EmrHook(aws_conn_id=self.aws_conn_id,
emr_conn_id=self.emr_conn_id,
region_name=self.region_name)
emr = emr_hook.get_conn()
# get emr cluster if exists
job_flow_name = self.job_flow_overrides['Name']
job_flow_id = emr_hook.get_cluster_id_by_name(
job_flow_name,
cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
)
# create emr cluster if not exists
if job_flow_id is None:
response = emr_hook.create_job_flow(self.job_flow_overrides)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('JobFlow creation failed: %s' % response)
else:
job_flow_id = response['JobFlowId']
self.log.info('JobFlow with id %s created', job_flow_id)
# add step to emr cluster
self.log.info('Adding steps to %s', job_flow_id)
response = emr.add_job_flow_steps(JobFlowId=job_flow_id, Steps=self.steps)
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException('Adding steps failed: %s' % response)
else:
self.log.info('Steps %s added to JobFlow', response['StepIds'])
return [job_flow_id, response['StepIds'][0]]
class EmrTerminateJobFlowByNameOperator(BaseOperator):
template_fields = ['job_flow_name']
template_ext = ()
ui_color = '#f9c915'
@apply_defaults
def __init__(
self,
job_flow_name,
aws_conn_id='aws_default',
*args, **kwargs):
super(EmrTerminateJobFlowByNameOperator, self).__init__(*args, **kwargs)
self.job_flow_name = job_flow_name
self.aws_conn_id = aws_conn_id
def execute(self, context):
emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)
emr = emr_hook.get_conn()
job_flow_id = emr_hook.get_cluster_id_by_name(
self.job_flow_name,
cluster_states=['STARTING', 'BOOTSTRAPPING', 'RUNNING', 'WAITING']
)
if job_flow_id is None:
return
self.log.info('Terminating JobFlow %s', job_flow_id)
response = emr.terminate_job_flows(JobFlowIds=[job_flow_id])
if not response['ResponseMetadata']['HTTPStatusCode'] == 200:
raise AirflowException(
'JobFlow termination failed: %s' % response)
else:
self.log.info('JobFlow with id %s terminated', job_flow_id)
DAGも前述のOperatorを使うように修正します。
# cluster_creatorは不要になります。
step_adder1 = EmrAddStepsPrepareClusterOperator(
task_id='add_steps1',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
steps=SPARK_STEPS,
)
step_checker1 = EmrStepSensor(
task_id='watch_step1',
job_flow_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[0] }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps', key='return_value')[1] }}",
aws_conn_id='aws_default',
)
step_adder2 = EmrAddStepsPrepareClusterOperator(
task_id='add_steps2',
job_flow_overrides=JOB_FLOW_OVERRIDES,
aws_conn_id='aws_default',
emr_conn_id='emr_default',
steps=SPARK_STEPS,
)
step_checker2 = EmrStepSensor(
task_id='watch_step2',
job_flow_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[0] }}",
step_id="{{ task_instance.xcom_pull(task_ids='add_steps2', key='return_value')[1] }}",
aws_conn_id='aws_default',
)
cluster_remover = EmrTerminateJobFlowByNameOperator(
task_id='remove_cluster',
job_flow_name=JOB_FLOW_OVERRIDES['Name'],
aws_conn_id='aws_default',
trigger_rule='all_done',
)
terminate_status = DummyOperator(
task_id='terminate_status',
)
step_adder1 >> step_checker1 >> step_adder2 >> step_checker2 >> cluster_remover >> terminate_status
step_checker2 >> terminate_status
backfillの時など、並行してDAGが実行された時のために、
EMRクラスタ名に、execution_dateを入れておくと良いと思います。
JOB_FLOW_OVERRIDES = {
'Name': 'PiCalc-{{ execution_date.strftime("%Y%m%dT%H%M%S%f") }}',
'ReleaseLabel': 'emr-5.29.0',
※省略※
以上の対応で、
AirflowからEMRを利用する時に、途中のStepから再実行出来るようになりました。
この方法でも、Step追加が並行して実行されるDAGは上手く処理出来ないことに注意して下さい。
# 並行して実行させたいStepに対して、
# 別名でEMRクラスタを起動するようにDAGを組む必要があります