【问题标题】:In airflow, on failure, is there a way to repeat a group of tasks?在气流中,在失败时,有没有办法重复一组任务?
【发布时间】:2019-08-14 17:16:19
【问题描述】:

在我的 DAG 中,我有一个这样的任务流程:

... >> EmrAddStepsOperator >> EmrStepSensor

EmrAddStepsOperator 的成功意味着“我能够告诉 EMR 开始”。 EmrStepSensor 失败意味着“EMR 任务出现问题”。我可能对这些描述有点不理解,但这与我要表达的观点无关:

如果第二个任务失败,我想重新启动第一个任务,而不是第二个。当 second 任务失败时,如何让气流重新启动 first 任务?

【问题讨论】:

  • 我认为除了编写自定义 EmrAddStepsBlockingOperator(它是 EmrAddStepsOperatorEmrStepSensor 的融合)之外,没有直接的方法可以实现这一点。见this提示:您可以考虑为此使用Python's multiple inheritance,但是当我尝试时,我遇到了this
  • @y2k-shubham 嗯,这很有趣。当我说我想要这样的东西时,我是不是在想错了?我无法想象传感器会发生故障并且您不需要 EmrAddStepsOperator 重试的场景。
  • 一个非常不整洁的解决方案是让您的 DAG 将所有失败的任务推送到外部存储,作为另一个 DAG 的输入。所以实际上,每天第二个 DAG 都会挑选在第一个 DAG 中失败的任务并重新运行它们;第二个 DAG 的结构将是动态的(这取决于第一个 DAG 中哪些/多少任务失败)。阅读UPDATE-1 部分here

标签: python airflow


【解决方案1】:

基于on_retry_callback参数的解决方案

编辑:我将此解决方案用于 Airflow 版本 2.0.1。正如@obayram 所述,clear_task_instances 中的activate_dag_runs 参数在2.1.1 版本中已弃用。

您可以将内置模块airflow.models.taskinstance 中的clear_task_instances 函数与运算符中的on_retry_callback 参数结合起来,以在当前状态下重试最后n 个任务任务失败

您可以简单地将以下 python 代码添加到您的 DAG 文件中:

from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session

@provide_session
def retry_upstream_tasks(context, session = None, adr = False):
    task_ids_to_retry = []
    j, a_task = 0, context['task']
    while j < context['params']['retry_upstream_depth']:
        num_upstream_tasks = len(a_task.upstream_task_ids)
        if num_upstream_tasks != 1:
            raise ValueError(f'The # of upstream tasks of "{a_task}" must be 1, but "{num_upstream_tasks}"')
        upstream_task_id = list(a_task.upstream_task_ids)[0]
        task_ids_to_retry.append(upstream_task_id)
        upstream_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]
        a_task = upstream_task
        j += 1

    all_task_ids_to_instances = {t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()}
    task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]

    clear_task_instances(tis = task_instances_to_retry, session = session, activate_dag_runs = adr, dag = context['dag'])

task_depends_on_previous_tasks = ANY_OPERATOR( # You can apply this to any operator.
            task_id='task_depends_on_previous_tasks',
            ...
            on_retry_callback=retry_upstream_tasks,
            retries=3,
            params={'retry_upstream_depth': 2} # You can change the depth
        )

{'retry_upstream_depth': n} 值传递给任务运算符的params 参数。您可以更改 n 来控制在当前任务之前要重试多少个任务。

例如

你的任务顺序如下:

task_1 >> task_2 >> task_depends_on_previous_tasks

task_depends_on_previous_tasks 失败时,您想按顺序重试task_1task_2

那么,你应该将retry_upstream_depth设置为2

重要提示

  • 在这种情况下,将被重试的任务(第一个/最旧的任务除外)应该只有一个上游任务,即这些任务应该在一个 直线

  • 重试次数受限于当前任务中的retries参数。因此,如果retries=3,当前任务最多可以失败 3 次,并且在每次重试中,之前的 n 个任务会被触发在当前任务被触发之前

【讨论】:

  • 这个解决方案对我有用。然而,clear_task_instances 函数的 active_dag_run 参数从 Airflow 2.1.1 开始被贬值,所以我省略了这个参数。
  • 这个解决方案非常适合我!
【解决方案2】:

在 Airflow 1 上,我在 PythonOperator 中使用了运算符和传感器。

基本上,从操作员返回的所有数据都会发送到 xcom。您可以通过调用特定 Operator 的 execute 方法从上一个任务中获取信息并将其设置在变量中。 稍后使用您需要的数据运行您的传感器,如果传感器失败,它将生成气流异常,PythonOperator 将再次尝试,因为它已被参数化。

例子:

def python_emr_job(emr):

    job_flow_creator = EmrCreateJobFlowOperator(
        task_id='emr_create_job',
        job_flow_overrides=emr,
        aws_conn_id='...',
        emr_conn_id='...'
    )

    job_flow_id = job_flow_creator.execute(dict())

    job_sensor = EmrJobFlowSensor(
        task_id='emr_job_sensor',
        job_flow_id=job_flow_id,
        aws_conn_id='...'
    )

    job_sensor.execute(dict())

with DAG(...) as dag:

    emr_confg = {...}

    emr_task = PythonOperator(
        task_id='emr_task',
        python_callable=python_emr_job,
        op_kwargs={'emr': emr_confg},
        retries=3
    )

【讨论】:

    【解决方案3】:

    我猜你可以很容易地把这两个运算符放在一个 subdag 运算符中,并在 subdag 运算符上设置重试参数。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-03-30
      • 2019-12-02
      • 2017-08-24
      相关资源
      最近更新 更多