【问题标题】:Kafka batch consumer without duplicate recordsKafka批量消费者没有重复记录
【发布时间】:2020-06-02 21:40:13
【问题描述】:

我有以下要求,我们正在从具有 CDC 插入/更新特定表的关系数据库中读取数据,并将这些作为事件导入 Kafka 主题。

例如jdbc-source-topic

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:00        |         1        |         A        |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

在管道结束时,我们希望每天使用一次这些事件,并避免相同 ID 的重复。

例如目标主题

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

在我看来,最好的解决方案是让一个具有 group_id 的消费者(以便第二天将偏移量存储在 kafka 中)运行一次。但这意味着消费者每次运行时都必须从获取的记录中删除重复项。

考虑到该表将来也可能用于 KSQL 连接,我想知道它是否存在使用 KSQL 查询的更好方法,以便消费者从一个已清理的主题中获取每个键的一条记录。

【问题讨论】:

    标签: apache-kafka stream ksqldb


    【解决方案1】:

    如果此数据的唯一使用者是 ksqlDB,那么您可能不需要删除重复数据,因为如果您在 ksql 中将主题作为 TABLE 导入,ksqlDB 将正确处理对同一键的多次更新,即,而不是这样做:

    CREATE STREAM FOO (... columns ...) WITH (...);
    

    做:

    CREATE TABLE FOO (... columns ...) WITH (...);
    

    目前,当 ksqlDB 处理这样的更改日志时,它将输出所有/部分重复项,具体取决于您如何配置 cache.max.bytes.buffering

    您可以通过使用 24 小时窗口和向上更新 Suppress support 来避免发出重复的内容。在那之前,如果您想按照您的建议删除重复项。您还可以通过编写自己的 Kafka Streams 应用程序将表具体化到状态存储中并使用抑制 api 删除重复项来获得一些工作。

    但是,值得指出的是,从语义上讲,重复不会导致任何问题。将变更日志具体化为表的结果是 正确有无重复。因此,正如我在开始时所说,甚至可能不需要删除重复项。

    【讨论】:

    • 但据我所知,要从流中创建表,我需要按表达式分组,考虑到表具有相同的流列,唯一的解决方案是按每一列分组最终计数,但这不会仅在行级别删除键级别的重复项。我错过了什么吗?
    • 我建议您不要先将主题作为流导入,而是将其作为表导入,即使用 CREATE TABLE,而不是 CREATE STREAM 来导入数据。
    猜你喜欢
    • 2021-04-24
    • 2018-09-20
    • 2019-11-21
    • 1970-01-01
    • 1970-01-01
    • 2017-09-09
    • 2018-04-25
    • 1970-01-01
    • 2021-04-25
    相关资源
    最近更新 更多