KafkaConsumer is not thread-safe. If multiple threads share a single KafkaConsumer instance, Kafka throws the following exception: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

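import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
// Assumption: the logger is SLF4J, as suggested by the {} placeholders.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Counter-example: two threads poll the SAME consumer, which triggers the exception.
public class MyConsumer5 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);

    public static void main(String[] args) throws InterruptedException {
        // KafkaTestUtil is the author's own helper (not shown in this post).
        Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
        consumer.subscribe(Collections.singletonList("topic1"));

        // First thread polling the shared consumer.
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();

        // Second thread polling the same instance concurrently: this is the misuse.
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();
    }
}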

Result:

[Image: Kafka study notes 014 --- consumer multithreading issue]

For usage, see: https://blog.csdn.net/clypm/article/details/80618036
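
To make the exception less mysterious, here is a minimal sketch of the kind of single-thread ownership guard that produces it. It needs no broker and no Kafka dependency. GuardedConsumer and GuardDemo are hypothetical names invented for this illustration, and poll(Runnable) is a stand-in, not the real KafkaConsumer API. The sketch assumes the guard works roughly the way the client does it: record the owning thread id with a compare-and-set on entry, and throw if a different thread is already inside.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for a consumer; NOT the real Kafka class. */
class GuardedConsumer {
    private static final long NO_CURRENT_THREAD = -1L;
    private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
    private final AtomicInteger refCount = new AtomicInteger(0);

    /** Claims ownership for the calling thread, or throws if another thread holds it. */
    private void acquire() {
        long threadId = Thread.currentThread().getId();
        if (threadId != currentThread.get()
                && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId)) {
            throw new ConcurrentModificationException(
                    "consumer is not safe for multi-threaded access");
        }
        refCount.incrementAndGet();
    }

    /** Gives ownership back once the outermost call has finished. */
    private void release() {
        if (refCount.decrementAndGet() == 0) {
            currentThread.set(NO_CURRENT_THREAD);
        }
    }

    /** Simulated poll: runs the given work while holding the guard. */
    void poll(Runnable work) {
        acquire();
        try {
            work.run();
        } finally {
            release();
        }
    }
}

class GuardDemo {
    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void join(Thread t) {
        try {
            t.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** True if a second thread is rejected while the first is still inside poll(). */
    static boolean sharedInstanceRejected() {
        GuardedConsumer shared = new GuardedConsumer();
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch finish = new CountDownLatch(1);
        Thread first = new Thread(() -> shared.poll(() -> {
            inside.countDown();   // signal: the guard is now held
            await(finish);        // stay inside poll() until told to leave
        }));
        first.start();
        await(inside);
        boolean rejected = false;
        try {
            shared.poll(() -> { });   // second thread, same instance
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } finally {
            finish.countDown();
        }
        join(first);
        return rejected;
    }

    /** True if two threads that each own their instance never hit the guard. */
    static boolean separateInstancesWork() {
        AtomicInteger failures = new AtomicInteger();
        Runnable worker = () -> {
            GuardedConsumer own = new GuardedConsumer();   // one instance per thread
            try {
                for (int i = 0; i < 100; i++) {
                    own.poll(() -> { });
                }
            } catch (ConcurrentModificationException e) {
                failures.incrementAndGet();
            }
        };
        Thread a = new Thread(worker);
        Thread b = new Thread(worker);
        a.start();
        b.start();
        join(a);
        join(b);
        return failures.get() == 0;
    }

    public static void main(String[] args) {
        System.out.println("shared instance rejected: " + sharedInstanceRejected());
        System.out.println("separate instances work: " + separateInstancesWork());
    }
}
```

The second check hints at the usual remedy: give each thread its own consumer instance, or keep all calls to a consumer on a single thread and hand the fetched records to worker threads.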
