【问题标题】:CeleryExecutor in Airflow are not parallelizing tasks in a subdagAirflow 中的 CeleryExecutor 没有并行化 subdag 中的任务
【发布时间】:2018-08-16 21:12:36
【问题描述】:

我们正在使用 Airflow:1.10.0,经过一些分析,为什么我们的一些 ETL 流程需要这么长时间,我们看到子代码使用 SequentialExecutor 而不是使用 BaseExecutor 或者当我们配置 @987654325 @。

我想知道这是 Airflow 的错误还是预期行为。有一些并行执行任务的能力没有任何意义,但在某些特定类型的任务中,这种能力会丢失。

【问题讨论】:

    标签: airflow airflow-scheduler


    【解决方案1】:

    在 subdags 中使用 SequentialExecutor 是一种典型的模式,其想法是您经常执行许多类似的相关任务,并且不一定希望通过添加到 celery 中的队列等来增加开销。请参阅子标签的 Airflow 文档中的“其他提示”部分:https://airflow.apache.org/concepts.html#subdags

    默认情况下,子标签设置为使用顺序执行器(请参阅:https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38),但您可以更改它。

    要使用 celery 执行器,请在您的 subdag 创建中添加以下内容:

    from airflow.executors.celery_executor import CeleryExecutor
    mysubdag = SubDagOperator(
        executor=CeleryExecutor()
        ...
    )
    

    【讨论】:

    • KubernetesExecutor 的行为是否相同?是否同样适用?
    【解决方案2】:

    可能有点晚了,但实施 LocalExecutor 对我有用。

    from airflow.executors.local_executor import LocalExecutor
    
    subdag = SubDagOperator(
      task_id=task_id,
      default_args=default_args,
      executor= LocalExecutor(),
      dag=dag
    )
    
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-06-21
      • 1970-01-01
      • 2021-09-19
      • 1970-01-01
      相关资源
      最近更新 更多