【问题标题】:spring-kafka fixtured event migration implementationspring-kafka 固定事件迁移实现
【发布时间】:2022-01-20 06:15:34
【问题描述】:

我有几项服务,其中一项是事实来源 (SOT)。 Kafka 是他们的消息代理。有时我需要生成一组将在其他服务中使用和应用的事件。所谓的固定事件迁移。

我的夹具文件示例:

EntityUpdated (topicA)
- id
- relation

RelationUpdated (topicB)
- id
- relation

并且类是应用事件后在数据库中具有投影的弹簧实体。

class Entity: Model {
  id
  val relation: Relation
}

class Relation: Model {
  id
}

当前的消费者实现以任意方式读取主题,消费者可以在 topicA 之前从 topicB 读取数据,并且我遇到由于相关实体尚不存在而无法应用消息的情况。 (在 EntityUpdated 之前使用 RelationUpdated)。

我有几个想法来解决它:

  1. 暂停所有分区/主题并按指定顺序恢复。所以我可以避免案例RelationUpdated consumed before EntityUpdated。然后在恢复所有主题的所有分区后,我可以继续以任意方式工作。我不喜欢切换,但它看起来很有效。

  2. 将无法应用的消息放入所谓的死信队列,并尝试一次又一次地重播,直到它们都被应用。

也许有人会做类似的事情。我很高兴知道您的想法。

【问题讨论】:

    标签: apache-kafka spring-kafka fixtures


    【解决方案1】:

    这不是 Apache Kafka 的设计目的。更清楚地说,消息传递是关于独立性和关注点分离的。我的意思是,一个来源中的消息不应该影响其他消息,而您希望通过暂停和 DLT 做什么对其余的主题非常有影响。

    我建议你看看 Spring Integration 及其聚合器模式实现:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator。这个想法是:您的EntityRelationid 有相关性。因此,独立于谁先到达,聚合器将等待第二部分,只有在那之后才会释放它们以进行下一个处理步骤。有了这个集成解决方案,就不需要消费者暂停等任何事情,也不需要通过 DLT 进行额外的代理轮次。

    如果 Spring Integration 对您来说太复杂,您可以考虑使用本地 Map 实现自己的解决方案,以添加基于第一次到达 (computeIfAbsent()) 并返回第二次到达 (computeIfPresent()) .

    【讨论】:

    • 谢谢。聚合器和弹簧集成对我来说很有意义
    猜你喜欢
    • 1970-01-01
    • 2011-04-29
    • 2023-04-01
    • 2023-04-05
    • 1970-01-01
    • 2013-01-16
    • 2019-05-09
    • 2021-04-29
    • 1970-01-01
    相关资源
    最近更新 更多