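【Question title】:Composer - Dataflow hook crash
【Posted】:2018-05-25 08:07:51
【Question】:

I am creating an hourly task in Airflow to schedule a Dataflow job, but the hook provided by the Airflow library crashes most of the time, even though the Dataflow job itself actually succeeds.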

[2018-05-25 07:05:03,523] {base_task_runner.py:98} INFO - Subtask: [2018-05-25 07:05:03,439] {gcp_dataflow_hook.py:109} WARNING -   super(GcsIO, cls).__new__(cls, storage_client))
[2018-05-25 07:05:03,721] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-05-25 07:05:03,725] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-05-25 07:05:03,726] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-05-25 07:05:03,729] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/bin/cli.py", line 392, in run
[2018-05-25 07:05:03,729] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-05-25 07:05:03,731] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/utils/db.py", line 50, in wrapper
[2018-05-25 07:05:03,732] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-05-25 07:05:03,734] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/models.py", line 1492, in _run_raw_task
[2018-05-25 07:05:03,738] {base_task_runner.py:98} INFO - Subtask:     result = task_copy.execute(context=context)
[2018-05-25 07:05:03,740] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/operators/dataflow_operator.py", line 313, in execute
[2018-05-25 07:05:03,746] {base_task_runner.py:98} INFO - Subtask:     self.py_file, self.py_options)
[2018-05-25 07:05:03,748] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 188, in start_python_dataflow
[2018-05-25 07:05:03,751] {base_task_runner.py:98} INFO - Subtask:     label_formatter)
[2018-05-25 07:05:03,753] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 158, in _start_dataflow
[2018-05-25 07:05:03,756] {base_task_runner.py:98} INFO - Subtask:     _Dataflow(cmd).wait_for_done()
[2018-05-25 07:05:03,757] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 129, in wait_for_done
[2018-05-25 07:05:03,759] {base_task_runner.py:98} INFO - Subtask:     line = self._line(fd)
[2018-05-25 07:05:03,761] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python2.7/site-packages/airflow/contrib/hooks/gcp_dataflow_hook.py", line 110, in _line
[2018-05-25 07:05:03,763] {base_task_runner.py:98} INFO - Subtask:     line = lines[-1][:-1]
[2018-05-25 07:05:03,766] {base_task_runner.py:98} INFO - Subtask: IndexError: list index out of range

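I looked up this file in the Airflow GitHub repo, and the line numbers in the traceback do not match it, which makes me think the Airflow instance that Cloud Composer runs is outdated. Is there any way to update it?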

【Comments】:

    Tags: google-cloud-dataflow airflow google-cloud-composer


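    【Answer 1】:

    This will be fixed in Airflow 1.10 or 2.0.

    See this PR: https://github.com/apache/incubator-airflow/pull/3165

    It has been merged into master. Until a release containing it is available to you, you can take the code from this PR and package it as your own plugin.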
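
    The traceback in the question ends at `line = lines[-1][:-1]`, which raises `IndexError` whenever `lines` is an empty list. Below is a minimal sketch of the kind of guard a custom hook could apply. It assumes (the source does not confirm this) that the list is empty because the subprocess stream had nothing left to read. The names `unguarded_last_line` and `last_line` are hypothetical, and this is not the code from the PR.

```python
def unguarded_last_line(lines):
    # Mirrors the expression shown in the traceback:
    # indexing [-1] on an empty list raises IndexError.
    return lines[-1][:-1]


def last_line(lines):
    """Return the last line without its trailing newline, or '' if there is none.

    Hypothetical helper: `lines` stands in for the list of lines read from
    the Dataflow subprocess stream.
    """
    if not lines:
        # Nothing was read, so there is no last line to return.
        return ""
    # rstrip only removes a newline that is actually present, whereas
    # [:-1] would also cut a real character from an unterminated line.
    return lines[-1].rstrip("\n")


if __name__ == "__main__":
    print(repr(last_line(["first\n", "second\n"])))
    print(repr(last_line([])))
```

    A hook carrying a guard like this could be dropped into the environment's plugins folder and registered through Airflow's plugin mechanism, so the DAG imports the patched hook instead of the bundled one.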

    【Comments】:
