【问题标题】:Kafka consumer offset commit inside completable futureKafka消费者在可完成的未来内提交偏移量
【发布时间】:2018-03-04 07:16:06
【问题描述】:

我在线程内部创建了一个 Kafka 消费者实例,作为构造函数的一部分,并且在线程内部的 run 方法中,我确实调用了不同的 Web 服务并保持调用非阻塞,我正在使用可完成的未来。我的问题是我无法通过调用 thenApply 方法并传递 Kafka 消费者实例来发出提交,因为它给了我一个错误,即 Kafka 消费者不是线程安全的。虽然在我的提交方法中我已经编写了代码

synchronized(consumer) {
  commitResponse();
}

我仍然得到ConcurrentModificationException

class KafkaConsumerThread implements Runnable {

  KafkaConsumer<String, String> consumer;

  public KafkaConsumerThread(Properties properties) {
    consumer = new KafkaConsumer<String, String>(properties);    
    ...
  }

  @Override
  public void run() {
    try {
      // synchronized (consumer) {
      consumer.subscribe(topics);
      while (true) {
        if (closed.get()) {
          consumer.close();
        }
        ConsumerRecords<String, String> records = consumer.poll(120000);
        for (ConsumerRecord<String, String> record : records) {
          getAsyncClient().prepareGet(webServiceUrl)
              .execute()
              .toCompletableFuture()
              .thenApply(resp -> callAnotherService1(resp))
              .thenApply(resp -> callAnotherService2(resp))
              .thenApply(resp -> commitResponse(resp, consumer));
          }
        }
      }
    } catch (Exception ex) {
      ...
    }

在上面的代码中,我在 commitResponse 方法中遇到异常,即“KafkaConsumer 对于多线程访问不安全”。虽然在我的提交响应中,如果我将提交包含在 synchronized(consumer) 中,我仍然会收到错误。

【问题讨论】:

    标签: java multithreading java-8 apache-kafka kafka-consumer-api


    【解决方案1】:

    很可能是因为poll 方法未同步,而是被执行(同时仍持有内部 kafka 锁运行),而您的异步 GET 执行提交。

    查看对私有方法的引用: org.apache.kafka.clients.consumer.KafkaConsumer.acquire()org.apache.kafka.clients.consumer.KafkaConsumer.release()org.apache.kafka.clients.consumer.KafkaConsumer

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2022-11-13
      • 2021-11-10
      • 2022-11-11
      • 2017-08-22
      • 1970-01-01
      • 2018-09-27
      • 2020-08-08
      相关资源
      最近更新 更多