【问题标题】:Composer/Airflow struggles running Dataflow jobs on preemptible VMsComposer/Airflow 难以在抢占式虚拟机上运行 Dataflow 作业
【发布时间】:2020-11-24 17:31:18
【问题描述】:

我在 GCP 中有一个 Composer/Airflow 实例(版本 composer-1.12.2-airflow-1.10.10),它可以可接受地运行其当前作业(大约 40 个 DAG,每个 DAG 有 6 个 Dataflow 作业,每个有 18 个其他较短的任务达格)。 Dataflow 作业从 Dataflow 模板(普通模板,不是 flex 模板)运行,通常需要大约 20 分钟才能完成,部分并行。我用pythonDataflowTemplatedJobStartOperator开始他们

如果我使用数据流选项--flexRSGoal=COST_OPTIMIZED,实例会阻塞。充其量,它设法在 CPU 使用率高的情况下,每分钟安排一个 Dataflow 作业在前 20 分钟内开始。这会导致任务累积,使其速度更慢,直到几乎停止调度。

flexRSGoal 设置是工作设置和问题设置之间的唯一区别。

我预计 DataflowTemplatedJobStartOperator 不能正确支持 Dataflow 作业在开始前一段时间处于“排队”状态的情况 - 或者我需要设置/调整一些其他参数才能使其正常工作。有人有想法吗?谢谢。

【问题讨论】:

  • 您使用的是什么 Airflow 版本? DataflowTemplatedJobStartOperator 运算符的链接似乎与 composer (1.10.6, 1.10.9, 1.10.10,1.10.12) 中当前支持的 Airflow 版本不对应
  • 啊,现在是 1.10.10。有没有更好的算子可以用?
  • 我认为使用DataflowTemplateOperator 是一个好方法;但是,由于代码尚未准备好处理 QUEUED 状态,因此需要使用猴子补丁。在下面查看我的答案,希望对您有所帮助!

标签: google-cloud-dataflow airflow-scheduler google-cloud-composer airflow


【解决方案1】:

Airflow 版本 1.10.0 无法将 QUEUED 识别为 Dataflow 的可能状态。按照模板算子的执行路径可以看到:

  • 调用Here下划线DataflowHook启动模板
  • start_template_dataflow() 方法 calls 一个“私有”_start_template_dataflow() 方法
  • _start_template_dataflow() 内部,_DataflowJob 类中有一个callwait_for_done() 方法
  • 最后,在 wait_for_done() method 中,我们可以看到有一个 if/else 块处理未考虑 QUEUED 的预期作业状态

Composer 支持的其他 Airflow 版本也是如此,即 1.10.6、1.10.9 甚至最新的 1.10.12。

作为一种解决方法,我的建议是使用 monkey patch 来处理 QUEUED 状态。例如,您可以将以下代码添加到您的 DAG 文件中,以在运行时替换 _DataflowJob 中的 wait_for_done() 方法:

from airflow.contrib.hooks.gcp_dataflow_hook import _DataflowJob
import time

def wait_for_done(self):
    while True:
        if self._job and 'currentState' in self._job:
            if 'JOB_STATE_DONE' == self._job['currentState']:
                return True
            elif 'JOB_STATE_RUNNING' == self._job['currentState'] and \
                'JOB_TYPE_STREAMING' == self._job['type']:
                return True
            elif 'JOB_STATE_FAILED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} has failed.".format(
                self._job['name']))
            elif 'JOB_STATE_CANCELLED' == self._job['currentState']:
                raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
                self._job['name']))
            elif 'JOB_STATE_RUNNING' == self._job['currentState']:
                time.sleep(self._poll_sleep)
            elif 'JOB_STATE_PENDING' == self._job['currentState']:
                time.sleep(15)
            elif 'JOB_STATE_QUEUED' == self._job['currentState']:
                # Uncomment here the behavior desired
                # time.sleep(15) # As if QUEUED was a PENDING state
                # time.sleep(self._poll_sleep) # As if QUEUED was a RUNNING state
                # return True # As if QUEUED was a final state
            else:
                self.log.debug(str(self._job))
                raise Exception(
                    "Google Cloud Dataflow job {} was unknown state: {}".format(
                        self._job['name'], self._job['currentState']))
        else:
            time.sleep(15)

        self._job = self._get_job()

_DataflowJob.wait_for_done = wait_for_done

与原始代码的区别在于elif 语句中我们正在寻找QUEUED 状态:

    elif 'JOB_STATE_QUEUED' == self._job['currentState']:
        # Uncomment here the behavior desired
        # time.sleep(15) # As if QUEUED was a PENDING state
        # time.sleep(self._poll_sleep) # As if QUEUED was a RUNNING state
        # return True # As if QUEUED was a final state

请注意,我留下评论了处理这种状态的三种最自然的方式,即睡眠(15 秒或 poll_sleep 时间)等待作业执行并完成,或者简单地返回 true 并且不等待执行。您可以取消注释要执行的行,甚至可以在此处添加自己的逻辑。

【讨论】:

    猜你喜欢
    • 2020-02-20
    • 1970-01-01
    • 2016-10-17
    • 2022-12-20
    • 2019-05-06
    • 2020-05-19
    • 1970-01-01
    • 2019-07-27
    • 1970-01-01
    相关资源
    最近更新 更多