【问题标题】:losing kafka messages due to exceptions in consumer code由于消费者代码中的异常而丢失 kafka 消息
【发布时间】:2022-01-10 08:21:16
【问题描述】:

我编写了一个容易出错的消费者,它以 50% 的概率随机抛出异常。

这个消费者消费所有消息,但只将其中的 85% 写入数据库。

问题出在哪里?

github:https://github.com/grzegorz-brzeczyszczykiewicz/kafka-losing-messages

我这样配置自动提交

spring.cloud.stream.kafka.bindings.my-channel.consumer.autoCommitOffset=false
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.isolation-level=read_commited

我的消费者看起来像

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import java.time.ZonedDateTime;
import java.util.Random;

@Configuration
@Component
@EnableTransactionManagement
@Slf4j
public class Consumer {

    @Autowired
    private MyDataRepository repository;
    private int methodCounter = 0;
    private int defectCounter = 0;

    //@Transactional
    @StreamListener(Stream.IN_CHANNEL)
    public void handleMessage(final Message<String> msg) throws Exception {
        //msg.getHeaders().forEach((key, value) -> System.out.println(key + ": " + value));
        final Acknowledgment acknowledgment = msg.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        System.out.println("payload: " + msg.getPayload());
        methodCounter++;
        System.out.println("method counter: " + methodCounter);
        if(!new Random().nextBoolean()){
            defectCounter++;
            System.out.println("defect counter: " + defectCounter);
            throw new Exception("kaputt");
        }

        System.out.println("");
        MyData data = new MyData();
        String[] split = msg.getPayload().split(" time: ");
        data.setMessage(split[0] + "_" + methodCounter + "_" + defectCounter);
        data.setTime(ZonedDateTime.parse(split[1]));
        repository.save(data);
//        if (acknowledgment!=null) {
//            System.out.println("ack nonnull");
            acknowledgment.acknowledge();
//        } else {
//            System.out.println("ack null");
//        }
    }
}

我找到了解决方案。只需将所有错误代码包装在 try catch 块中,然后在 catch 块中执行 nack()

【问题讨论】:

  • 我相信你已经找到了问题中提到的解决方案,你能关闭这个问题吗?

标签: apache-kafka spring-kafka


【解决方案1】:

我找到了解决方案。只需将所有错误代码包装在 try catch 块中,然后在 catch 块中执行 nack()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-07-12
    • 1970-01-01
    • 2016-04-13
    • 2016-06-10
    • 2021-01-26
    • 2017-03-19
    相关资源
    最近更新 更多