基于on_retry_callback参数的解决方案
编辑:我将此解决方案用于 Airflow 版本 2.0.1。正如@obayram 所述,clear_task_instances 中的activate_dag_runs 参数在2.1.1 版本中已弃用。
您可以将内置模块airflow.models.taskinstance 中的clear_task_instances 函数与运算符中的on_retry_callback 参数结合起来,以在当前状态下重试最后n 个任务任务失败。
您可以简单地将以下 python 代码添加到您的 DAG 文件中:
from airflow.models.taskinstance import clear_task_instances
from airflow.utils.db import provide_session
@provide_session
def retry_upstream_tasks(context, session = None, adr = False):
task_ids_to_retry = []
j, a_task = 0, context['task']
while j < context['params']['retry_upstream_depth']:
num_upstream_tasks = len(a_task.upstream_task_ids)
if num_upstream_tasks != 1:
raise ValueError(f'The # of upstream tasks of "{a_task}" must be 1, but "{num_upstream_tasks}"')
upstream_task_id = list(a_task.upstream_task_ids)[0]
task_ids_to_retry.append(upstream_task_id)
upstream_task = [t for t in context['dag'].tasks if t.task_id == upstream_task_id][0]
a_task = upstream_task
j += 1
all_task_ids_to_instances = {t_ins.task_id: t_ins for t_ins in context['dag_run'].get_task_instances()}
task_instances_to_retry = [all_task_ids_to_instances[tid] for tid in task_ids_to_retry[::-1]]
clear_task_instances(tis = task_instances_to_retry, session = session, activate_dag_runs = adr, dag = context['dag'])
task_depends_on_previous_tasks = ANY_OPERATOR( # You can apply this to any operator.
task_id='task_depends_on_previous_tasks',
...
on_retry_callback=retry_upstream_tasks,
retries=3,
params={'retry_upstream_depth': 2} # You can change the depth
)
将{'retry_upstream_depth': n} 值传递给任务运算符的params 参数。您可以更改 n 来控制在当前任务之前要重试多少个任务。
例如
你的任务顺序如下:
task_1 >> task_2 >> task_depends_on_previous_tasks
当task_depends_on_previous_tasks 失败时,您想按顺序重试task_1 和task_2。
那么,你应该将retry_upstream_depth设置为2。
重要提示