【问题标题】:After join the topic contains duplicate messages加入主题后包含重复消息
【发布时间】:2019-05-01 14:33:14
【问题描述】:

尝试通过消息负载中的时间戳字段连接两个流会导致输出主题中出现重复消息。这不是预期的。如何避免这种重复?

我正在使用 DSL 拓扑,它使用来自两个主题的两个流。当我为每个流做一些映射时,这会产生两个额外的主题。最后,在加入后,第五个主题填充了结果,这个主题显示了重复的消息。我检查了其他四个主题不包含重复项。我还注意到我提供给 kafka 流连接函数的函数被重复调用。这个函数已经表明发生了重复。

KStream<String, MappedOriginalSensorData> flattenedOriginalData = originalData
                .flatMap(flattenOriginalData())
                .through("mapped-original-sensor-data", Produced.with(Serdes.String(), new MappedOriginalSensorDataSerde()));

        KStream<String, MappedErrorScoreData> enrichedErrorData = errorScoreData
                .map(enrichWithModelAndAlgorithmAndReduceKey())
                .through("mapped-error-score-data-repartition", Produced.with(Serdes.String(), new MappedErrorScoreDataSerde()));


        return enrichedErrorData
                //#3. Join
                .join(flattenedOriginalData, join(),
                        JoinWindows.of(Duration.ofMillis(1).toMillis()), Joined.with(Serdes.String(), new MappedErrorScoreDataSerde(), new MappedOriginalSensorDataSerde()))
                //#4. set key
                .selectKey((k,v) -> v.getOriginalKey())
                //#5. Map removing the originalKey field)
                .mapValues(removeOriginalKeyField())
                .through("joined-data-repartition");

我希望joined-data-repartition 主题仅显示基于负载的唯一消息:

{
  "timestamp": 1556626280000,
  "errorSignal": 84.98,
  "originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3629
Partition:
0

{
  "timestamp": 1556626280000,
  "errorSignal": 84.98,
  "originalSignal": 36
}
Key:
1234:a:v2:nord::TE7
Timestamp:
Apr 30th, 2019 14:11:20.00
Offset:
3628
Partition:
0

查看偏移量

【问题讨论】:

  • 您是否为您的应用程序启用了一次性处理?如果没有,您可能会看到重复。
  • 我确实尝试过使用 spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee=exactly_once 但这会导致应用程序在 60000 毫秒后由于超时异常而失败,然后它会自行关闭: 所有流线程都死了。该实例将处于错误状态,应关闭。有什么想法吗?

标签: apache-kafka-streams spring-cloud-stream


【解决方案1】:

我发现本地磁盘上的一些剩余状态存储导致了重复消息。也就是说,我用擦除 kafka 存储多次重复我的测试,但不知道本地磁盘上的状态存储。每次我重复测试(使用相同的消息)时,这些消息都会与状态存储中的内容相结合,从而导致重复消息。

【讨论】:

    猜你喜欢
    • 2019-05-20
    • 2017-07-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-19
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多