【发布时间】:2020-02-20 17:21:24
【问题描述】:
我已经使用 Python SDK (Apache Beam Python 3.7 SDK 2.19.0) 构建了一个窗口化流数据流管道。初始数据的表示是:
| Phone Number | Call length |
|--------------|-------------|
| 1234 | 6 |
| 1234 | 2 |
| 5678 | 5 |
这个想法是为给定窗口的每一行中的号码找到电话的平均长度。数据作为来自 Pub/Sub 的 CSV 行读入,我将一个值添加到与数字的平均调用长度相对应的所有行:
| Phone Number | Call length | mean call length |
|--------------|-------------|------------------|
| 1234 | 6 | 4 |
| 1234 | 2 | 4 |
| 5678 | 5 | 5 |
我使用以下管道:
with beam.Pipeline(options=pipeline_options) as pipeline:
calls = (pipeline
| 'Read PubSub Messages' >> beam.io.ReadFromPubSub(subscription=input_sub)
| 'byte_to_string' >> beam.Map(lambda x: x.decode("utf-8"))
| 'windows' >> beam.WindowInto(window.FixedWindows(10))
)
mean_call_length = (calls
| 'call_length_for_number' >> beam.ParDo(get_list_of_pairs_of_tuples(),'number','call_length')
| 'mean_call_length_per_number' >> beam.combiners.Mean.PerKey()
)
recombine = (calls
| 'Create dictionary from raw string' >> beam.ParDo(SplitToDict())
| 'Add mean' >> beam.FlatMap(combine_calcs,pvalue.AsList(mean_call_length))
| 'encode to bytes' >> beam.Map(lambda x: str(x).encode())
| 'write to output topic' >> beam.io.WriteToPubSub(topic=output_topic)
)
这在本地(使用 DirectRunner)可以正常工作,但在 GCP(DataflowRunner)中运行时会失败。当我只计算数字频率或平均通话长度的 1 时,它似乎也能正常工作。
我可以在 Dataflow 日志中看到一个 java 异常,其中包含:
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
这看起来像是与流式传输相关的文件结束异常。
管道在此处的 Dataflow 中可视化:
有什么想法吗?
【问题讨论】:
-
你在DataflowRunner上运行时指定了哪些参数?
-
和往常一样:--runner=DataflowRunner,然后是所需的项目、staging_location、temp_location。我还指定了一个带有一些包的 setup.py 文件。
标签: python google-cloud-dataflow apache-beam