【问题标题】:How is the DefaultKafkaProducerFactory cache managed for transactions?DefaultKafkaProducerFactory 缓存是如何管理事务的?
【发布时间】:2021-06-07 21:43:34
【问题描述】:

在spring kafka文档https://docs.spring.io/spring-kafka/docs/2.3.3.RELEASE/reference/html/#transactions

它提到了;

通过为 DefaultKafkaProducerFactory 提供 transactionIdPrefix 来启用事务。在这种情况下,工厂不是管理单个共享生产者,而是维护事务生产者的缓存。当用户在生产者上调用 close() 时,它会返回到缓存中以供重用,而不是实际关闭。每个生产者的 transactional.id 属性是 transactionIdPrefix + n

  • 如何配置此缓存,例如生产者池大小?
  • 当给定事务的缓存中没有任何可用的生产者时,它是否会动态创建一个新的生产者?

【问题讨论】:

    标签: spring-kafka


    【解决方案1】:

    这取决于事务是否仅是生产者以及producerPerConsumerPartition 默认为true(对于消费者发起的事务)。

    此属性是为了支持EOSMode.ALPHA(或在使用 BETA 但代理早于 2.5 时回退到 ALPHA)。

    请参阅here 了解有关恰好一次语义的更多信息。

    当使用producerPerConsumerPartition=false 和仅生产者事务时,缓存大小没有限制;缓存为空时创建新的生产者,关闭时返回缓存。

    【讨论】:

    • 对于 producerPerConsumerPartition=false 和仅生产者事务。 transactional.id 后缀在创建新生产者时如何增加以及缓存如何清空。以确保围栏。
    • 每个生产者创建时分配一个新的后缀;如果您有多个应用程序实例,transactional.id(前缀)在每个实例上必须是唯一的。当工厂被销毁(在应用程序上下文关闭期间)或工厂的reset() 方法被调用时,该类被清除。
    猜你喜欢
    • 1970-01-01
    • 2015-05-31
    • 1970-01-01
    • 1970-01-01
    • 2019-02-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多