【问题标题】:Should not acknowledging a message cause it to be redelivered to the Kafka consumer?不应该确认消息会导致它被重新传递给 Kafka 消费者吗?
【发布时间】:2019-12-11 04:59:55
【问题描述】:

我正在运行一个 Spring Cloud Stream 应用程序,我在其中从 Kafka 主题读取事务,处理事务,然后将它们发送到 IBM MQ。当与 IBM MQ 没有连接以防止事务丢失时,我正在尝试处理该错误。在这种情况下,jms 模板将抛出异常,并且流侦听器不会提交事务。预期的行为是事务保留在 Kafka 主题中,并且流侦听器再次读取它。然而,消息似乎只被消费一次,并且没有发生“回滚”。为此,这是我的配置:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          input:
            consumer:
              auto-commit-offset: false
      bindings:
        input:
          destination:  kafka_topic
          brokers: localhost:9092

这里是代码:

    public void handleMessage(Message<TransactionMessage> request,  @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
        TransactionMessage message = request.getPayload();
        System.out.println("Consumed a message");
        try {
            executionFlow.execute(message); // here the jmsTemplate throws an exception
            System.out.println("doing the ack");
            acknowledgment.acknowledge();
        }
        catch (RuntimeException e) {
            System.out.println("did not send to MQ");
        }
    }

executionFlow调用的jmsTemplate代码:

    public void sendMessage(String messageTarget) {

        System.out.println("i am trying to send to MQ");
        try {
            jmsTemplate.convertAndSend(destinationTopicQueue, messageTarget);
        } catch (Exception e) {
            throw new RuntimeException("jmsTemplate failed to send to IBM MQ");
        }
    }

这是我关闭与 IBM MQ 的连接时的输出:

Consumed a message
i am trying to send to MQ
did not send to MQ

【问题讨论】:

    标签: java apache-kafka ibm-mq spring-cloud-stream


    【解决方案1】:
       catch (RuntimeException e) {
            System.out.println("did not send to MQ");
        }
    

    你需要重新抛出异常才能导致回滚。

    您还需要在活页夹中启用 Kafka 事务。

    Kafka Binder Properties

    spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
    

    在活页夹中启用事务。请参阅 Kafka 文档中的 transaction.id 和 spring-kafka 文档中的 Transactions。启用事务后,将忽略单个生产者属性,所有生产者都使用 spring.cloud.stream.kafka.binder.transaction.producer.* 属性。

    默认 null(无交易)

    如果你也向kafka发送数据,你需要一个事务生产者

    spring.cloud.stream.kafka.binder.transaction.producer.*
    

    事务绑定器中生产者的全局生产者属性。请参阅 spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix 和 Kafka Producer Properties 以及所有 binder 支持的一般 producer properties。

    默认值:查看各个生产者属性。

    【讨论】:

    • 我不想写 Kafka 主题。仅从一个读取并将消息重定向到 IBM MQ。所以不确定它与 kafka 生产者配置有什么关系?
    • 如果你不写卡夫卡你不需要生产者部分;我为将来可能遇到此问题/答案的其他人添加了这一点。
    • 如果您使用 Spring Cloud Stream 2.1 或更高版本,在 binder 中启用事务的替代方法是向应用程序添加自定义 ListenerContainerCustomizer bean,并使用它在侦听器容器。如果没有 kafka 事务的开销,这将具有相同的效果。您仍然需要抛出异常以便调用它。
    • 谢谢。我还能够使用重试模板配置回滚。
    猜你喜欢
    • 1970-01-01
    • 2020-02-13
    • 1970-01-01
    • 1970-01-01
    • 2020-08-11
    • 2017-11-09
    • 1970-01-01
    • 2019-07-01
    • 1970-01-01
    相关资源
    最近更新 更多