【问题标题】:Dataflow fails when combining multiple side inputs in streaming pipeline在流式管道中组合多个侧输入时数据流失败
【发布时间】:2020-02-20 17:21:24
【问题描述】:

我已经使用 Python SDK (Apache Beam Python 3.7 SDK 2.19.0) 构建了一个窗口化流数据流管道。初始数据的表示是:

| Phone Number | Call length |
|--------------|-------------|
| 1234         | 6           |
| 1234         | 2           |
| 5678         | 5           |

这个想法是为给定窗口的每一行中的号码找到电话的平均长度。数据作为来自 Pub/Sub 的 CSV 行读入,我将一个值添加到与数字的平均调用长度相对应的所有行:

| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234         | 6           | 4                |
| 1234         | 2           | 4                |
| 5678         | 5           | 5                |

我使用以下管道:

    with beam.Pipeline(options=pipeline_options) as pipeline:
        calls = (pipeline
             | 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
             | 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
             | 'windows' >> beam.WindowInto(window.FixedWindows(10))   
            )

        mean_call_length = (calls
             | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
             |  'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
            )

        recombine = (calls
              | 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
              |  'Add mean' >> beam.FlatMap(combine_calcs,pvalue.AsList(mean_call_length))
              | 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
              | 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
            )

这在本地(使用 DirectRunner)可以正常工作,但在 GCP(DataflowRunner)中运行时会失败。当我只计算数字频率或平均通话长度的 1 时,它似乎也能正常工作。

我可以在 Dataflow 日志中看到一个 java 异常,其中包含:

Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException 

这看起来像是与流式传输相关的文件结束异常。

管道在此处的 Dataflow 中可视化:

有什么想法吗?

【问题讨论】:

  • 你在DataflowRunner上运行时指定了哪些参数?
  • 和往常一样:--runner=DataflowRunner,然后是所需的项目、staging_location、temp_location。我还指定了一个带有一些包的 setup.py 文件。

标签: python google-cloud-dataflow apache-beam


【解决方案1】:

我通过更改管道将平均计算的结果转换为整数来解决这个问题:

...
mean_call_length = (calls
             | 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
             |  'mean_call_length_per_number' >> beam.combiners.Mean.PerKey())
             | 'convert_mean_to_int' >> beam.Map(lambda elem: (elem[0],int(elem[1])))
...

Python SDK 和底层 Java 代码之间似乎存在一些打字问题; Java 代码似乎期望 element[1] 低于一定的字节数,如果您通过 Python SDK 提交浮点数,则会超过该字节数。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-07-11
    • 1970-01-01
    • 1970-01-01
    • 2015-08-31
    • 1970-01-01
    相关资源
    最近更新 更多