【发布时间】:2018-08-16 21:12:36
【问题描述】:
我们正在使用 Airflow:1.10.0,经过一些分析,为什么我们的一些 ETL 流程需要这么长时间,我们看到子代码使用 SequentialExecutor 而不是使用 BaseExecutor 或者当我们配置 @987654325 @。
我想知道这是 Airflow 的错误还是预期行为。有一些并行执行任务的能力没有任何意义,但在某些特定类型的任务中,这种能力会丢失。
【问题讨论】:
我们正在使用 Airflow:1.10.0,经过一些分析,为什么我们的一些 ETL 流程需要这么长时间,我们看到子代码使用 SequentialExecutor 而不是使用 BaseExecutor 或者当我们配置 @987654325 @。
我想知道这是 Airflow 的错误还是预期行为。有一些并行执行任务的能力没有任何意义,但在某些特定类型的任务中,这种能力会丢失。
【问题讨论】:
在 subdags 中使用 SequentialExecutor 是一种典型的模式,其想法是您经常执行许多类似的相关任务,并且不一定希望通过添加到 celery 中的队列等来增加开销。请参阅子标签的 Airflow 文档中的“其他提示”部分:https://airflow.apache.org/concepts.html#subdags
默认情况下,子标签设置为使用顺序执行器(请参阅:https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/operators/subdag_operator.py#L38),但您可以更改它。
要使用 celery 执行器,请在您的 subdag 创建中添加以下内容:
from airflow.executors.celery_executor import CeleryExecutor
mysubdag = SubDagOperator(
executor=CeleryExecutor()
...
)
【讨论】:
KubernetesExecutor 的行为是否相同?是否同样适用?
可能有点晚了,但实施 LocalExecutor 对我有用。
from airflow.executors.local_executor import LocalExecutor
subdag = SubDagOperator(
task_id=task_id,
default_args=default_args,
executor= LocalExecutor(),
dag=dag
)
【讨论】: