【问题标题】:What value to be passed for transaction Id in Transactional Flow api Alpakka在 Transactional Flow api Alpakka 中为交易 ID 传递什么值
【发布时间】:2023-03-22 14:30:01
【问题描述】:

使用 Alpakka 我想使用 Transactional.Source Api 使用记录并使用 Transactional.flow 将其生成到另一个主题,但文档说我们需要传递 transactionId。

我应该如何为例如创建 TransactionId以下代码

```via(Transactional.flow(producerSettings, transactionalId))```

是 Alpakka 中的每个 Producer 还是 Per Producer 记录

【问题讨论】:

    标签: apache-kafka playframework transactions kafka-producer-api alpakka


    【解决方案1】:

    transactional.id 在抵御僵尸方面发挥着重要作用。但 维护一个跨生产者会话一致的标识符 并且正确地隔离僵尸有点棘手。

    正确隔离僵尸的关键是确保输入 读-写-写周期中的主题和分区始终是 对于给定的 transactional.id 相同。如果这不是真的,那么它是 某些消息可能会通过提供的围栏泄漏 交易。

    例如,在分布式流处理应用程序中,假设 topic-partition tp0 最初由 transactional.id T0 处理。 如果在稍后的某个时间点,它可以映射到另一个生产者 transactional.id T1,T0 和 T1 之间不会有围栏。所以 来自 tp0 的消息可能会被重新处理,这违反了 一次处理保证。

    实际上,要么必须存储输入之间的映射 partitions 和 transactional.ids 在外部存储中,或者有一些 它的静态编码。 Kafka Streams 选择后一种方法 解决这个问题。事务如何执行,以及如何调整它们

    Transactions in Apache Kafka

    【讨论】:

      猜你喜欢
      • 2016-02-15
      • 1970-01-01
      • 2020-05-20
      • 2018-03-01
      • 2019-12-21
      • 1970-01-01
      • 1970-01-01
      • 2013-07-01
      • 2016-08-06
      相关资源
      最近更新 更多