【问题标题】:Guarantee that some operators will be executed on the same airflow worker保证一些算子会在同一个气流工作者上执行
【发布时间】:2021-08-10 20:36:40
【问题描述】:

我有一个 DAG

  1. 从云存储下载 csv 文件
  2. 通过 https 将 csv 文件上传到第三方

我正在执行的气流集群默认使用CeleryExecutor,所以我担心当我扩大工作人员数量时,这些任务可能会在不同的工作人员上执行。例如。工作人员 A 下载,工作人员 B 尝试上传,但找不到文件(因为它在工作人员 A 上)

是否有可能以某种方式保证下载和上传操作符都将在同一个气流工作人员上执行?

【问题讨论】:

    标签: airflow apache-airflow


    【解决方案1】:

    将第 1 步(csv 下载)和第 2 步(csv 上传)放入一个 subdag,然后通过 SubDagOperator 触发它,并将 executor 选项设置为 SequentialExecutor - 这将确保第 1 步和2 在同一个工人上运行。

    这是一个说明该概念的工作 DAG 文件(实际操作被删除为 DummyOperators),并在一些更大的过程的上下文中包含下载/上传步骤:

    from datetime import datetime, timedelta
    from airflow.models import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator
    from airflow.executors.sequential_executor import SequentialExecutor
    
    PARENT_DAG_NAME='subdaggy'
    CHILD_DAG_NAME='subby'
    
    def make_sub_dag(parent_dag_name, child_dag_name, start_date, schedule_interval):
        dag = DAG(
            '%s.%s' % (parent_dag_name, child_dag_name),
            schedule_interval=schedule_interval,
            start_date=start_date
            )
    
        task_download = DummyOperator(
            task_id = 'task_download_csv',
            dag=dag
            )
    
        task_upload = DummyOperator(
            task_id = 'task_upload_csv',
            dag=dag
            )
    
        task_download >> task_upload
    
        return dag
    main_dag = DAG(
        PARENT_DAG_NAME,
        schedule_interval=None,
        start_date=datetime(2017,1,1)
    )
    
    main_task_1 = DummyOperator(
        task_id = 'main_1',
        dag = main_dag
    )
    
    main_task_2 = SubDagOperator(
        task_id = CHILD_DAG_NAME,
        subdag=make_sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, main_dag.start_date, main_dag.schedule_interval),
        executor=SequentialExecutor(),
        dag=main_dag
    )
    
    main_task_3 = DummyOperator(
        task_id = 'main_3',
        dag = main_dag
    )
    
    main_task_1 >> main_task_2 >> main_task_3
    

    【讨论】:

      【解决方案2】:

      对于这类用例,我们有两种解决方案:

      1. 使用两个共享的网络安装驱动器 工作人员,以便下载和上传任务都可以访问 到同一个文件系统
      2. 使用特定于工作人员的 Airflow queue。如果只有一个工作人员在侦听此队列,您将保证两者都可以访问同一个文件系统。请注意,每个工作人员都可以侦听多个队列,因此您可以让它侦听“默认”队列以及用于此任务的自定义队列。

      【讨论】:

      • 这两种方法都是可行的解决方法,但它们也有其缺点。使用共享存储首先要求工人可以使用这种共享存储,这可能是也可能不是一个重要的约束;即使可用,本地存储也可能具有更好的性能,具体取决于应用程序。第二种方法 - 将两个任务绑定到一个特定的队列 - 似乎放弃了在哪里可以安排任务、添加额外的工作人员等方面的一些灵活性。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-12-04
      • 2017-09-16
      • 2020-01-05
      • 2023-04-05
      • 1970-01-01
      • 1970-01-01
      • 2013-04-23
      相关资源
      最近更新 更多