【问题标题】:How to run same dag two times in a single run in Airflow如何在 Airflow 中一次运行两次相同的 dag
【发布时间】:2020-10-01 11:03:37
【问题描述】:

我对 Airflow 完全陌生。我有一个要求,我必须运行两个 EMR 作业。 .目前我有一个依赖于一些输入文件的 python 脚本,如果存在它会触发 EMR 作业。

我的新要求是,我将不得不输入不同的输入文件(相同类型),这两个文件将输入到 emr 作业中,在这两种情况下,火花都会做同样的事情,但只有输入文件不一样。

create_job_workflow = EmrCreateJobFlowOperator(
    task_id='some-task',
    job_flow_overrides=job_flow_args,
    aws_conn_id=aws_conn,
    emr_conn_id=emr_conn,
    dag=dag
)

我可以通过仅更改 spark-submit 中的输入文件来实现两个相同的 dag 运行,基本上每当我将执行“trigger DAG”时,它都会获取两个不同的输入文件并在两个不同的 emr 集群中触发两个不同的 emr 作业。或者你能给我一些最佳实践吗?或者如何通过改变 ma​​x_active_runs=2

【问题讨论】:

    标签: python airflow airflow-scheduler


    【解决方案1】:

    最佳做法是为其设置两个不同的任务。通过设置 max_active_runs=2,您只需将并发 dag_runs 的数量限制为 2。您可以借助任何数据结构为您的任务设置配置,对其进行迭代并根据每个属性构建任务。

    你可以做的另一件事:

    您可以接收文件名作为 dag 的有效负载 像这样访问它: context['dag_run'].conf.get('filename')

    并使用触发 dag_run 操作符重新触发相同的 dag,使用另一个文件更新所需的有效负载

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-08-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多