【问题标题】:Read AVRO messages from PubSub in Dataflow Python在 Dataflow Python 中从 PubSub 读取 AVRO 消息
【发布时间】:2020-07-27 16:13:10
【问题描述】:

我需要从另一个 GCP 项目的 PubSub 主题中读取 AVRO 消息。我之前实现了 Python 数据流管道,它从 PubSub 读取 JSON 消息并写入 BigQuery。但我是处理 AVRO 消息的新手。我试图查找 AVRO 的 Python 文档,它指向此链接 https://avro.apache.org/docs/current/gettingstartedpython.html

在这个链接中有一些从文件读取和写入文件的例子,但我认为这些函数对于从 PubSub 读取没有用。我正在使用以下转换从输出为字节串的 PubSub 中读取。

"Read from PubSub" >> beam.io.ReadFromPubSub(topic=TOPIC).with_output_types(bytes)

我需要一种读取这些字节的方法(AVRO 格式)

【问题讨论】:

    标签: python-3.x google-cloud-dataflow avro google-cloud-pubsub


    【解决方案1】:

    这是一个您可以使用的示例代码

    1. 从 Pub/Sub 读取消息
    from fastavro import parse_schema, schemaless_reader
    
    messages = (p
                | beam.io.ReadFromPubSub(
                    subscription=known_args.input_subscription)
                .with_output_types(bytes))
    
    1. 使用 Fastavro 包通过类定义定义架构和读取器
    class AvroReader:
        def __init__(self, schema):
            self.schema = schema
    
        def deserialize(self, record):
            bytes_reader = io.BytesIO(record)
            dict_record = schemaless_reader(bytes_reader, self.schema)
            return dict_record
    
    1. 现在映射字节元素并指定架构
    schema = avro.schema.parse(open("avro.avsc", "rb").read())
    avroReader = AvroReader(schema)
    
    lines = messages | "decode" >> beam.Map(lambda input: avroReader.deserialize(input))
    
    

    这些行应该有PCollection,格式为Avro。

    【讨论】:

    • 感谢 Jayadeep 的回答。我对“模式”变量有疑问。我从其他团队收到了模式作为“.avsc”文件。所以在你分享的这个脚本中,我猜模式是这个文件 .avsc 文件的内容。所以我正在尝试这样,""" with open(schema_path) as fd: schema = json.load(fd) """
    • 嗨 Jayadeep,我没有得到您建议 ReadFromAvro 的最后一行。我猜这个函数是从文件中读取的。在我的情况下,记录仍将来自 PubSub 主题。
    • 谢谢。我现在将尝试创建 AVRO 消息并将它们放在 PubSub 中,以便我可以测试这个管道。目前我们依赖其他团队在这里发布消息。他们也使用过Java。如果你有发布者代码也请在这里分享,否则我会做更多的搜索。
    • 会不会是这样,records = [ {}, {}] fo = BytesIO() writer(fo, schema, records)
    猜你喜欢
    • 2020-10-16
    • 2018-01-01
    • 2019-01-04
    • 1970-01-01
    • 1970-01-01
    • 2021-04-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多