【问题标题】:Kafka Transaction in case multi ThreadingKafka Transaction 以防多线程
【发布时间】:2020-12-17 12:13:25
【问题描述】:

我正在尝试在 trasnsaction 中创建 kafka 生产者,即如果有人失败,我想写一组 msg,我想回滚所有 msg。

kafkaProducer.beginTransaction();
try
{
    // code to produce to kafka topic
}
catch(Exception e)
{
    kafkaProducer.abortTransaction();
}
kafkaProducer.commitTransaction();

问题是上面的单线程工作正常,但是当多个线程写入时会抛出异常

尝试从状态 IN_TRANSITION 到 IN_TRANSITION 的无效事务

在调试时我发现如果 thread1 事务正在进行并且 thread2 也说beingTransaction 它会抛出这个异常。如果如何解决此问题,我找不到什么。我能找到的一件可能的事情是创建一个农产品池。

是否有任何适用于 kafka 生产者池的 API,否则我将不得不创建自己的。

以下是 jira 已经为此报告的改进。 https://issues.apache.org/jira/browse/KAFKA-6278

任何其他建议都会很有帮助

【问题讨论】:

    标签: apache-kafka kafka-producer-api


    【解决方案1】:

    您一次只能与生产者实例进行一个事务。

    如果您有多个线程进行单独处理并且它们都需要恰好一次语义,则每个线程都应该有一个生产者实例。

    【讨论】:

    • 在kafka中是否有任何api可用于ProducerPool,这样我们就不会创建尽可能多的生产者然后关闭它们。
    • 这不是 Apache Kafka API 的一部分。一些框架,如 spring-kafka 已经提供了这个功能
    【解决方案2】:

    不确定是否已解决。 您可以使用 apache common pool2 创建生产者实例池。 在工厂实现的 create() 方法中,您可以生成并分配唯一的 transactionalID 以避免冲突(ProducerFencedException)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-04-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-28
      • 1970-01-01
      相关资源
      最近更新 更多