【发布时间】:2021-07-23 08:06:21
【问题描述】:
我对传输和读取流数据非常陌生,所以我希望我的问题不是太琐碎。
我正在使用梁 Python SDK 从 PubSub 读取数据,然后再将其写入其他文件。由于我收到的数据始终采用相同的格式,因此我尝试使用schema 功能来解析我从 PubSub 收到的数据。
接收到的数据始终是字典name: "my_name", value: 42,所以我的管道是这样的:
import typing
import apache_beam as beam
from apache_beam.io import ReadFromPubSub
class MySchema(typing.NamedTuple):
name: str
value: int
with beam.Pipeline() as pipeline:
pipeline | ReadFromPubSub(topic=<my_subscription>).with_output_types(MySchema)
但是,然后我收到错误 apache_beam.typehints.decorators.TypeCheckError: Output type hint violation at ReadFromPubSub: expected <class '__main__.MassificationImage'>, got <class 'bytes'>
这是有道理的,因为 PubSub 自然会获取字节:我只需将数据解析到字典中,然后它似乎就可以工作了。
with beam.Pipeline() as pipeline:
(pipeline
| ReadFromPubSub(topic=<my_subscription>)
| beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)
它似乎工作正常,但不必将数据解析成字典会破坏模式的目的吗?有没有更直接的方法?
【问题讨论】:
标签: python apache-beam