【问题标题】:Kafka Consumer empty records卡夫卡消费者空记录
【发布时间】:2020-10-28 15:51:48
【问题描述】:

关于这个话题有很多问题,但是,这不是重复的问题!

我面临的问题是我尝试使用 Java 14 和 Kafka 2.5.0 设置 SpringBoot 项目,而我的 Consumer 返回一个空的记录列表。 这里的大多数答案表明一些被遗忘的属性,poll frequently 或设置offset mode to earliest

我看不出与docs.confluent.io 有任何逻辑 区别,尽管我的配置设置看起来不合常规(请参阅下面 sn-p 中我对jaas.conf 的设置)。

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

但是,这可行。我没有收到任何异常(Kafka 或其他),并且连接已建立。

// jaas.conf-file
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
serviceName="serviceName"
username="username"
password="password";
};

这是我实际投票的地方:

try {
            KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
            consumer.subscribe(Collections.singletonList(inputTopic));

            int count = 0;
            Long start = System.currentTimeMillis();
            Long end = System.currentTimeMillis();

            while (end - start < 900_000) { 
                // boolean would be set to true in production
                ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                records.forEach(record -> {
                    MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                    System.out.println(result);
                });
               
                consumer.commitSync();

                System.out.println("visualize number of loops made: " + ++count);
                end = System.currentTimeMillis();
            }
        } catch (KafkaException e) {
            e.printStackTrace();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }

我添加了打印和其他混乱,以便尝试找出问题。我在调试模式下运行我的程序并将断点放在这一行:

MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());

结果,正如人们所期望的那样,我看到每秒都有一个计数的打印行。但由于我的消费者没有返回任何记录,它永远不会进入forEach,因此永远不会触发我的断点。

我绝对可以在云中看到我的主题,有两个分区。消息源源不断地生成,所以我知道我应该能够接收到一些东西。

我知道连接到集群需要一些时间,但由于当前时间设置为一刻钟,我至少应该会收到一些东西,对吧?作为替代方案,我尝试将consumer.subscribe() 切换到consumer.assign() 方法,如果我指定了我的TopicPartition,将消费者设置为consumer.seekToBeginning()。它运行良好,但也没有返回任何内容。

在最常见的示例中没有发现的另一件事是我使用自己的类。因此,我根据this tutorial 实现了自定义(反)序列化器,而不是KafkaConsumer&lt;String, String&gt;

可能是我的配置设置?轮询超时有问题吗? (反)序列化,还是完全其他的?我真的无法确定为什么我得到零记录的任何原因。任何反馈将不胜感激!

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api jaas confluent-cloud


    【解决方案1】:

    问题解决了。您无法从我发布的问题中确定任何内容,但是,如果其他人发现自己陷入了类似的配置,我想澄清一些事情。

    1. 验证收到的密码确实是正确的。 面部护理

    确实如此,我以为他正在与集群建立连接,但我的循环一直在打印计数,因为执行了 .poll(Duration.ofMillis(1000)) 方法 -> 检查他是否可以在给定的超时时间内连接 -> 继续如果连接失败,返回零记录。没有错误被抛出。通常,大约 2 秒后,应该已经建立了连接。

    1. 检查您与数据库的连接。

    您永远不希望应用程序停止,这就是为什么我设计myOwnKafkaService.getSomethingFromRecord(record.key(), record.value()) 方法来记录所有错误,但捕获所有异常。直到我检查了日志,我才意识到我访问远程数据库的权限不正常。

    1. 所谓的时间戳,应该反序列化为 java.util.Date

    错误解析会引发异常,但我的方法返回了null。正如此答案中的所有评论一样,这也归结为在此类设置中缺乏经验。您会发现下面的修正类可作为一个工作示例(但根本不是最佳实践)。

    KafkaConfig:

    @EnableKafka
    @Configuration
    public class KafkaConfig {
    
        @Bean
        public KafkaConsumer<Long, MyClass> consumerConfigs() {
            Properties config = new Properties();
    
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    
            config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
            config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
            config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
    
            System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
    
            return new KafkaConsumer<>(config);
        }
    }
    

    轮询方法的主体:

                KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
                consumer.subscribe(Collections.singletonList(inputTopic));
    
                while (true) {
                    ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
                    records.forEach(record -> {
                        MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
                        System.out.println(result);
                    });
                    consumer.commitSync();
                }
    

    带有反序列化器的 MyClass 小例子:

    @Data
    @Slf4J
    public class MyClass implements Deserializer<MyClass> {
    
        @JsonProperty("UNIQUE_KEY")
        private Long uniqueKey;
        @JsonProperty("EVENT_TIMESTAMP")
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
        private Date eventTimestamp;
        @JsonProperty("SOME_OTHER_FIELD")
        private String someOtherField;
    
    @Override
        public MyClass deserialize(String s, byte[] bytes) {
            ObjectMapper mapper = new ObjectMapper();
            MyClass event = null;
            try {
                event = mapper
                        .registerModule(new JavaTimeModule())
                        .readValue(bytes, MyClass.class);
            } catch (Exception e) {
                log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
            }
            return event;
        }
    }
    

    我希望这能在未来为其他人服务。我从挫折和错误中学到了很多东西。

    【讨论】:

      猜你喜欢
      • 2023-03-29
      • 1970-01-01
      • 2019-07-03
      • 2018-05-05
      • 2021-08-22
      • 1970-01-01
      • 1970-01-01
      • 2020-10-28
      • 2015-12-18
      相关资源
      最近更新 更多