【问题标题】:Not able to GET the state of Dataflow job in Airflow无法在 Airflow 中获取 Dataflow 作业的状态
【发布时间】:2020-04-29 11:01:05
【问题描述】:

最近我将 Apache Beam 升级到 2.20.0,然后我的 Dataflow 作业的 Airflow 任务开始失败,尽管数据流作业本身成功了。

我注意到升级后,用于数据流的 GET API 在 URL 中使用位置而不是作业 ID

GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/us-central1?alt=json

理想情况下,URL 应该是这样的

GET https://dataflow.googleapis.com/v1b3/projects/umg-de/locations/us-central1/jobs/{job_id}?alt=json

有人可以解释为什么会这样吗?

【问题讨论】:

  • 那么响应中的确切错误消息或代码是什么?
  • @tank 我已经编辑了我的答案以包括我对解决方法的实施。看看吧,可能会有帮助。

标签: python-3.x airflow google-cloud-dataflow


【解决方案1】:

这是一个已知的 Airflow issuefix 已合并到 Airflow master 但尚未发布。

作为一种解决方法,您可以使用 Apache Beam SDK 2.19.0 或在自定义挂钩中实施修复(与 dataflow_hook.py 相同,但应用了建议的 change),然后实施使用此挂钩的自定义运算符。 这是我的做法:

首先,我创建了一个名为my_dataflow_hook.py的文件:

import re

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook, _Dataflow, _DataflowJob
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook


class _myDataflow(_Dataflow):
    @staticmethod
    def _extract_job(line):
        job_id_pattern = re.compile(
            br".*console.cloud.google.com/dataflow.*/jobs/.*/([a-z|0-9|A-Z|\-|\_]+).*")
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()


class MyDataFlowHook(DataFlowHook):
    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _myDataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'],
                     self.poll_sleep, job_id,
                     self.num_retries).wait_for_done()

然后,我创建了一个名为my_dataflow_python.py的文件:

import re

from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator, GoogleCloudBucketHelper
from hooks.my_dataflow_hook import MyDataFlowHook
from airflow.plugins_manager import AirflowPlugin


class MyDataFlowPythonOperator(DataFlowPythonOperator):
    def execute(self, context):
        """Execute the python dataflow job."""
        bucket_helper = GoogleCloudBucketHelper(
            self.gcp_conn_id, self.delegate_to)
        self.py_file = bucket_helper.google_cloud_to_local(self.py_file)
        hook = MyDataFlowHook(gcp_conn_id=self.gcp_conn_id,
                            delegate_to=self.delegate_to,
                            poll_sleep=self.poll_sleep)
        dataflow_options = self.dataflow_default_options.copy()
        dataflow_options.update(self.options)
        # Convert argument names from lowerCamelCase to snake case.
        camel_to_snake = lambda name: re.sub(
            r'[A-Z]', lambda x: '_' + x.group(0).lower(), name)
        formatted_options = {camel_to_snake(key): dataflow_options[key]
                             for key in dataflow_options}
        hook.start_python_dataflow(
            self.job_name, formatted_options,
            self.py_file, self.py_options)

class MyDataFlowPlugin(AirflowPlugin):
    """Expose Airflow operators."""

    name = 'dataflow_fix_plugin'
    operators = [MyDataFlowPythonOperator]

最后,我按照这个结构将这些文件上传到 Composer 环境的桶中:

├── dags
│   └── my_dag.py
└── plugins
    ├── hooks
    │   └── my_dataflow_hook.py
    └── my_dataflow_python.py

现在,我可以在我的 DAG 中使用 MyDataFlowPythonOperator 创建任务:

from airflow import DAG
from airflow.operators.dataflow_fix_plugin import MyDataFlowPythonOperator
...
with DAG("df-python-test", default_args=default_args) as dag:
    test_task = MyDataFlowPythonOperator(dag=dag, task_id="df-python", py_file=PY_FILE, job_name=JOB_NAME)

【讨论】:

    猜你喜欢
    • 2019-03-07
    • 1970-01-01
    • 1970-01-01
    • 2017-03-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多