【Question title】: Apache Beam Dataflow: 'NoneType' object has no attribute 'parts'
【Posted】: 2017-11-28 19:03:25
【Question】:

I am trying to write a pipeline that reads a stream from Pub/Sub and writes it to BigQuery, using Google Cloud Dataflow with Apache Beam. I have this code:

import apache_beam as beam
from apache_beam.transforms.window import FixedWindows

topic = 'projects/???/topics/???'
table = '???.???'

gcs_path = "gs://???"

with beam.Pipeline(runner="DataflowRunner", argv=[
        "--project", "???",
        "--staging_location", ("%s/staging_location" % gcs_path),
        "--temp_location", ("%s/temp" % gcs_path),
        "--output", ("%s/output" % gcs_path)
    ]) as p:
    (p 
    | 'winderow' >> beam.WindowInto(FixedWindows(60))
    | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic) 
    | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table))
    )
    p.run().wait_until_finish()

But I get this error when I run it:

No handlers could be found for logger "oauth2client.contrib.multistore_file"
ERROR:root:Error while visiting winderow
Traceback (most recent call last):
  File ".\main.py", line 20, in <module>
    p.run().wait_until_finish()
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 339, in run
    return self.runner.run(self)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 296, in run
    super(DataflowRunner, self).run(pipeline)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 138, in run
    pipeline.visit(RunVisitor(self))
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 367, in visit
    self._root_transform().visit(visitor, self, visited)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 710, in visit
    part.visit(visitor, pipeline, visited)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\pipeline.py", line 713, in visit
    visitor.visit_transform(self)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 133, in visit_transform
    self.runner.run_transform(transform_node)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 176, in run_transform
    return m(transform_node)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\dataflow\dataflow_runner.py", line 526, in run_ParDo
    input_step = self._cache.get_pvalue(transform_node.inputs[0])
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 252, in get_pvalue
    self._ensure_pvalue_has_real_producer(pvalue)
  File "C:\ProgramData\Anaconda2\lib\site-packages\apache_beam\runners\runner.py", line 226, in _ensure_pvalue_has_real_producer
    while real_producer.parts:
AttributeError: 'NoneType' object has no attribute 'parts'

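Is this a problem with the code or with the configuration? How can I make it work?

【Comments】:

  • Pub/Sub is an unbounded source, so you should add "--streaming" to your argv. You also don't need the p.run().wait_until_finish() part, because an unbounded stream never finishes.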
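
As a rough sketch of the flag change suggested in the comment above, the argv list could be assembled like this. `build_pipeline_args` is a hypothetical helper (not part of Apache Beam), and the project and bucket names in the usage note are placeholders; only the flags themselves come from the question and the comment.

```python
def build_pipeline_args(project, gcs_path, streaming=True):
    """Return Dataflow pipeline flags as an argv-style list.

    Hypothetical helper for illustration; it is not part of Apache Beam.
    """
    args = [
        "--project", project,
        "--staging_location", "%s/staging_location" % gcs_path,
        "--temp_location", "%s/temp" % gcs_path,
    ]
    if streaming:
        # Pub/Sub is an unbounded source, so the job has to run in streaming mode.
        args.append("--streaming")
    return args
```

The result can be passed as `argv=build_pipeline_args("my-project", "gs://my-bucket")` when constructing the pipeline.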

Tags: google-bigquery google-cloud-platform google-cloud-dataflow apache-beam google-cloud-pubsub


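【Solution 1】:

I have no experience with windowed pipelines yet, but from my understanding of the concepts, windowing should be applied to your input data (a PCollection that already exists), not to the pipeline object itself.

That being the case, your code should probably be: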

with beam.Pipeline(runner="DataflowRunner", argv=[
        "--project", "???",
        "--staging_location", ("%s/staging_location" % gcs_path),
        "--temp_location", ("%s/temp" % gcs_path),
        "--output", ("%s/output" % gcs_path)
    ]) as p:
    (p 
    | 'hello' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic) 
    | 'winderow' >> beam.WindowInto(FixedWindows(60))
    | 'hello2' >> beam.io.Write(beam.io.gcp.bigquery.BigQuerySink(table))
    )
    p.run().wait_until_finish()

The official repo also has some samples of windowing operations.
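
To illustrate why windowing needs elements to act on, here is a minimal plain-Python sketch of how a 60-second fixed window could be derived from an element's timestamp. `fixed_window` is a hypothetical helper, not Beam's API, and it assumes timestamps in seconds with a zero window offset.

```python
def fixed_window(timestamp, size=60):
    """Return the [start, end) fixed window, in seconds, containing `timestamp`.

    Hypothetical illustration; assumes a zero offset for window boundaries.
    """
    # Round the timestamp down to the nearest multiple of the window size.
    start = timestamp - (timestamp % size)
    return (start, start + size)
```

Because the window is computed per element, a windowing step has nothing to work on until a source has produced a collection, which is why it cannot be the first step applied to the pipeline object.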

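【Comments】:

  • It says: ValueError: PubSubPayloadSource is currently available for use only in streaming pipelines.
  • I wonder what happens if you use DirectRunner just for testing. Does it work?
  • This is right: read from Pub/Sub first. Also add --streaming to make it a streaming pipeline.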
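
Putting the answer and the comments together, one possible shape for the pipeline is sketched below. It is not the original poster's final code: it assumes a recent Beam Python SDK and swaps in `beam.io.ReadFromPubSub` and `beam.io.WriteToBigQuery` for the `ReadStringsFromPubSub` and `BigQuerySink` calls used above. The `to_row` helper and the single-column `message:STRING` schema are hypothetical choices for illustration.

```python
def to_row(message):
    """Convert a Pub/Sub payload (bytes) into a BigQuery row dict.

    The single `message` column is an assumed schema, not from the question.
    """
    return {"message": message.decode("utf-8")}


def run(topic, table, argv):
    """Build and run the pipeline; argv should include --streaming."""
    # Imported here so to_row can be used and tested without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    options = PipelineOptions(argv)
    # Leaving the `with` block runs the pipeline, so there is no explicit p.run().
    with beam.Pipeline(options=options) as p:
        (p
         | 'read' >> beam.io.ReadFromPubSub(topic=topic)   # the source comes first
         | 'window' >> beam.WindowInto(FixedWindows(60))   # then window its elements
         | 'to_row' >> beam.Map(to_row)
         | 'write' >> beam.io.WriteToBigQuery(table, schema='message:STRING'))
```

The runner can be chosen through argv as well (for example `--runner DataflowRunner`, or `DirectRunner` for a local test as one comment suggests).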