【问题标题】:Kafka-Streams - Filtering GlobalKTable before JoiningKafka-Streams - 在加入之前过滤 GlobalKTable
【发布时间】:2020-01-24 12:19:44
【问题描述】:

您能否就以下问题的解决方法给我一个建议。我有两个主题,一个是静态内容,第二个是数据流。任务是加入数据,这在正常情况下很容易。我会将静态内容读取为 GlobalKTable,将动态内容读取为 KStream,然后简单地加入它们。问题是查找数据存在于同一主题的多个版本中。 “版本”由字段“validFrom”标识。因此流的数据需要根据其时间戳与对应版本的查找数据相结合。有没有办法过滤 GlobalKTable 中的数据?

最好的问候 马丁

【问题讨论】:

    标签: join apache-kafka apache-kafka-streams


    【解决方案1】:

    您不能对GlobalKTable 本身应用过滤操作,但您可以尝试测试ValueJoiner 中记录的版本,并将未通过测试的连接结果记录的值设置为null。加入后,您可以应用过滤器过滤掉所有值为null 的记录。

    【讨论】:

    • 非常感谢,布鲁诺。听起来很有希望。明天我会试一试,如果成功了再告诉你。
    • 我已经尝试过这个方法,但我遇到了以下问题:如果我使用例如执行 leftJoin (KStream-GlobalKTable)右侧的 3 个相同的键与左侧的某个键相比,我期望获得该键的 3 个条目,但我只得到一个条目。这意味着结果获得的条目数与输入数据的左侧完全相同,尽管在右侧每个键存在 3 次。我的假设错了吗?
    • 右侧不能有三个相同的键。 GlobalKTable 具有表语义。具有相同键的所有记录都是对该键的更新,即键为k 的第一条记录将被插入到表中,键为k 的第二条记录将用它自己的值覆盖第一条记录的值,并且键为 k 的第三条记录将再次覆盖该值。这就像 SQL 中的 UPSERT。
    • 布鲁诺,谢谢你的解释。它完全解释了这种行为!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-09-05
    • 2019-12-15
    • 2015-11-13
    • 2022-12-05
    • 1970-01-01
    • 1970-01-01
    • 2020-10-05
    相关资源
    最近更新 更多