【问题标题】:Apache beam read schema from pubsub来自 pubsub 的 Apache Beam 读取模式
【发布时间】:2021-07-23 08:06:21
【问题描述】:

我对传输和读取流数据非常陌生,所以我希望我的问题不是太琐碎。

我正在使用梁 Python SDK 从 PubSub 读取数据,然后再将其写入其他文件。由于我收到的数据始终采用相同的格式,因此我尝试使用schema 功能来解析我从 PubSub 收到的数据。

接收到的数据始终是字典name: "my_name", value: 42,所以我的管道是这样的:

import typing

import apache_beam as beam
from apache_beam.io import ReadFromPubSub


class MySchema(typing.NamedTuple):
    name: str
    value: int

with beam.Pipeline() as pipeline:
    pipeline | ReadFromPubSub(topic=<my_subscription>).with_output_types(MySchema)

但是,然后我收到错误 apache_beam.typehints.decorators.TypeCheckError: Output type hint violation at ReadFromPubSub: expected &lt;class '__main__.MassificationImage'&gt;, got &lt;class 'bytes'&gt;

这是有道理的,因为 PubSub 自然会获取字节:我只需将数据解析到字典中,然后它似乎就可以工作了。

with beam.Pipeline() as pipeline:
    (pipeline 
        | ReadFromPubSub(topic=<my_subscription>)
        | beam.Map(lambda x: json.loads(x.decode("utf8"))).with_output_types(MySchema)

它似乎工作正常,但不必将数据解析成字典会破坏模式的目的吗?有没有更直接的方法?

【问题讨论】:

    标签: python apache-beam


    【解决方案1】:

    https://beam.apache.org/documentation/programming-guide/#schemas 描述了 Beam 模式的用途。假设是

    通常,正在处理的记录类型具有明显的结构。常见的 Beam 源产生 JSON、Avro、Protocol Buffer 或数据库行对象;所有这些类型都有明确定义的结构...

    而且模式使进一步的转换更容易。

    这里的混淆在于 Pub/Sub 作为来源不直接提供结构化数据。或者至少 Pub/Sub 的 Beam IO 读取字节。

    Pub/Sub 确实支持架构:https://cloud.google.com/pubsub/docs/schemas。您可以创建一个新的 IO 来合并字节读取和模式解析。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2021-01-09
      • 1970-01-01
      • 1970-01-01
      • 2022-12-24
      • 2019-12-21
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多