【问题标题】:Kafka - From JSON records to Parquet files in S3Kafka - 从 JSON 记录到 S3 中的 Parquet 文件
【发布时间】:2020-01-18 05:25:37
【问题描述】:

如果我错了,请纠正我.. parquet 文件是自我描述的,这意味着它包含其正确的架构。

我想使用 S3 sink confluent 连接器(特别是因为它可以正确处理 S3 的 Exactly Once 语义)从我们的 Kafka 读取 JSON 记录,然后在 s3 中创建 parquet 文件(按事件时间分区)。 我们的 JSON 记录没有嵌入架构。

我知道它还不支持,但我对镶木地板和 AVRO 也有一些疑问。

由于我们的 JSON 记录中没有嵌入模式,这意味着连接器任务必须从它自己的 JSON 字段中推断数据? (这是一个可行的解决方案吗?)

除了镶木地板,在 Kafka.. 中没有模式注册表之类的东西,对吗?

AVRO 似乎很好地集成到 Kafka,意味着使用模式注册表读取模式。这是否意味着融合的 S3 接收器将足够智能,可以在 s3 中创建包含模式作为标题的文件,然后在 s3 文件中创建一堆记录?

我知道那个人正在为这个 s3 接收器连接器实现镶木地板:

https://github.com/confluentinc/kafka-connect-storage-cloud/pull/172

但我不明白,它似乎在代码中使用了 AVRO 模式,这是否意味着 Kafka 中有 AVRO 记录来使用这个 Parquet 实现?

我开始认为在 S3 上定位 AVRO 文件会更容易(我可以通过失去一些 OLAP 功能来负担它),但在使用 AVRO 之前想确定一下。

问候,

亚尼克

【问题讨论】:

    标签: json apache-kafka parquet apache-kafka-connect


    【解决方案1】:

    如果我错了,请纠正我。镶木地板文件是自我描述的,意味着它包含正确的架构

    正确。如果您有 parquet 文件,则可以从中获取架构。

    How do I get schema / column names from parquet file?

    在 s3 中创建包含架构作为标题的文件,然后在 s3 文件中创建一堆记录?

    是的,这正是 S3 连接器对 Avro 文件的工作方式。

    它似乎在代码中使用了 AVRO 模式,这是否意味着 Kafka 中有 AVRO 记录来使用这个 Parquet 实现?

    我没有对 PR 进行过深入研究,但我认为 Parquet 存储格式只需要一个 Connect Schema,而不需要 Avro 数据,因为使用 AvroData 类,可以在 Connect Schemas 和 Avro 模式之间来回转换,例如avroData.fromConnectSchema(schema)。这会解析 Connect Schema 结构并形成一个新的 Avro 模式,并且不适用于 Registry 或要求输入数据是 Avro。

    话虽如此,如果您的 JSON 对象确实具有架构,那么可能可以使用其他 JSONFormat 选项来编写它们,因为 format.class 设置在转换器之后应用。有趣的是,我知道我可以使用 AvroConverter + JSONFormat 将 Avro 输入记录写为 JSON 文件,但我没有尝试使用带有 AvroFormat 的 JSONConverter + schema'd JSON,所以 YMMV

    我开始认为在 S3 上定位 AVRO 文件会更容易

    可能...注意,您可以改用 Secor,它具有 Hive 表集成和 Parquet support for JSON

    【讨论】:

    • 感谢您的回答部分,但我仍然不知道镶木地板模式在 Kafka 中的位置。它是否应该出现在构成镶木地板文件的每条记录中......?
    • 镶木地板模式不在 Kafka 中。它是 Schema + Avro 转换后上传文件的一部分。 github.com/confluentinc/kafka-connect-storage-cloud/pull/172/…
    • 谢谢,但是根据我的阅读,架构当前位于记录值 ``` schema = record.valueSchema();``` 中,据我了解,此代码假定架构不会改变(因为它不检查每个记录中的模式)。它还假设模式可以在任何记录值中,就像每条记录都包含模式..?上传的文件是什么意思?
    • 为什么valueSchema() 不改变?如果您添加了新的可为空字段,则架构会更改。这只会关闭当前的文件句柄,写入一个文件,然后打开一个具有所有新记录的下一个模式的新文件。这就是 backwards.compatibility 设置的用途。上传的文件是指发送到 S3 的文件。
    猜你喜欢
    • 1970-01-01
    • 2022-07-07
    • 2017-10-08
    • 1970-01-01
    • 2020-03-27
    • 1970-01-01
    • 2020-12-19
    • 2022-10-14
    • 2021-10-06
    相关资源
    最近更新 更多