【问题标题】:How to check if Kafka Consumer is ready如何检查 Kafka Consumer 是否准备就绪
【发布时间】:2018-01-03 06:03:10
【问题描述】:

我将 Kafka 提交策略设置为最新并且缺少前几条消息。如果我在开始将消息发送到输入主题之前休眠 20 秒,那么一切都按预期工作。我不确定问题是否与消费者花费很长时间进行分区重新平衡有关。有没有办法在开始轮询之前知道消费者是否准备好?

【问题讨论】:

    标签: apache-kafka rebalancing


    【解决方案1】:
    • 您可以使用consumer.assignment(),它将返回一组分区并验证是否分配了所有可用于该主题的分区。

    • 如果您使用的是spring-kafka项目,您可以包含spring-kafka-test依赖并使用下面的方法等待主题分配,但您需要有容器。 ContainerTestUtils.waitForAssignment(Object container, int partitions);

    【讨论】:

      【解决方案2】:

      您可以执行以下操作:

      我有一个从 kafka 主题读取数据的测试。
      所以你不能在多线程环境中使用KafkaConsumer,但是你可以传递参数“AtomicReference assignment”,在消费者线程中更新,在另一个线程中读取。

      例如,在项目中截取工作代码进行测试:

          private void readAvro(String readFromKafka,
                            AtomicBoolean needStop,
                            List<Event> events,
                            String bootstrapServers,
                            int readTimeout) {
          // print the topic name
          AtomicReference<Set<TopicPartition>> assignment = new AtomicReference<>();
          new Thread(() -> readAvro(bootstrapServers, readFromKafka, needStop, events, readTimeout, assignment)).start();
      
          long startTime = System.currentTimeMillis();
          long maxWaitingTime = 30_000;
          for (long time = System.currentTimeMillis(); System.currentTimeMillis() - time < maxWaitingTime;) {
              Set<TopicPartition> assignments = Optional.ofNullable(assignment.get()).orElse(new HashSet<>());
              System.out.println("[!kafka-consumer!] Assignments [" + assignments.size() + "]: "
                      + assignments.stream().map(v -> String.valueOf(v.partition())).collect(Collectors.joining(",")));
              if (assignments.size() > 0) {
                  break;
              }
              try {
                  Thread.sleep(1_000);
              } catch (InterruptedException e) {
                  e.printStackTrace();
                  needStop.set(true);
                  break;
              }
          }
          System.out.println("Subscribed! Wait summary: " + (System.currentTimeMillis() - startTime));
      }
      
      private void readAvro(String bootstrapServers,
                            String readFromKafka,
                            AtomicBoolean needStop,
                            List<Event> events,
                            int readTimeout,
                            AtomicReference<Set<TopicPartition>> assignment) {
      
          KafkaConsumer<String, byte[]> consumer = (KafkaConsumer<String, byte[]>) queueKafkaConsumer(bootstrapServers, "latest");
          System.out.println("Subscribed to topic: " + readFromKafka);
          consumer.subscribe(Collections.singletonList(readFromKafka));
      
          long started = System.currentTimeMillis();
          while (!needStop.get()) {
              assignment.set(consumer.assignment());
              ConsumerRecords<String, byte[]> records = consumer.poll(1_000);
              events.addAll(CommonUtils4Tst.readEvents(records));
      
              if (readTimeout == -1) {
                  if (events.size() > 0) {
                      break;
                  }
              } else if (System.currentTimeMillis() - started > readTimeout) {
                  break;
              }
          }
      
          needStop.set(true);
      
          synchronized (MainTest.class) {
              MainTest.class.notifyAll();
          }
          consumer.close();
      }
      

      P.S.
      needStop - 全局标志,在成功失败的情况下停止所有正在运行的线程
      事件 - 对象列表,我想要检查
      readTimeout - 我们将等待多长时间才能读取所有数据,如果 readTimeout == -1,则在我们读取任何内容时停止

      【讨论】:

        【解决方案3】:

        感谢 Alexey(我也投了赞成票),我似乎基本上按照相同的想法解决了我的问题。

        只是想分享我的经验......在我们的例子中,我们使用 Kafka 的请求和响应方式,有点像 RPC。请求正在发送一个主题,然后等待另一个主题的响应。遇到类似的问题,即错过第一响应。

        我曾多次尝试... KafkaConsumer.assignment();(使用Thread.sleep(100);)但似乎没有帮助。添加KafkaConsumer.poll(50); 似乎已经为消费者(组)做好了准备并收到了第一个响应。测试了几次,它现在一直在工作。

        顺便说一句,测试需要停止应用程序并删除 Kafka 主题,并且为了一个好的措施,也重新启动了 Kafka。

        PS:仅调用 poll(50); 而没有 assignment(); 获取逻辑,就像 Alexey 提到的那样,可能无法保证消费者(组)已准备好。

        【讨论】:

          【解决方案4】:

          如果您的策略设置为最新(如果没有先前提交的偏移量则生效)但您没有先前提交的偏移量,那么您不应该担心“丢失”消息,因为您告诉 Kafka 不要关心关于“之前”发送给您的消费者准备好的消息。

          如果您关心“以前的”消息,则应将策略设置为最早。

          在任何情况下,无论采用何种策略,您看到的行为都是暂时的,即一旦提交的偏移量保存在 Kafka 中,每次重新启动时,消费者都会从之前离开的地方继续

          【讨论】:

          • 我的要求是我需要发送记录读取它们并进行一些处理。消息一旦被读取,就不应被重新处理。而且我不需要通过设置为最早来阅读所有消息,因为这对我来说没有多大意义。
          • 恐怕您的期望与 Kafka 语义不匹配。将auto.offset.reset 设置为最新时,您不应期望将所有消息都发送到某个主题。消费者和生产者彼此完全异步工作。如果您想要已发送到主题的所有内容,请将重置设置为最早。无论如何,一旦消费者组建立并提交了偏移量,重置策略将无关紧要
          【解决方案5】:

          您可以修改AlwaysSeekToEndListener(仅侦听新消息)以包含回调:

          public class AlwaysSeekToEndListener<K, V> implements ConsumerRebalanceListener {
              private final Consumer<K, V> consumer;
              private Runnable callback;
          
              public AlwaysSeekToEndListener(Consumer<K, V> consumer) {
                  this.consumer = consumer;
              }
          
              public AlwaysSeekToEndListener(Consumer<K, V> consumer, Runnable callback) {
                  this.consumer = consumer;
                  this.callback = callback;
              }
          
              @Override
              public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
              }
          
              @Override
              public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                  consumer.seekToEnd(partitions);
                  if (callback != null) {
                      callback.run();
                  }
              }
          }
          

          并使用闩锁回调订阅:

          CountDownLatch initLatch = new CountDownLatch(1);
          
          consumer.subscribe(singletonList(topic), new AlwaysSeekToEndListener<>(consumer, () -> initLatch.countDown()));
          
          initLatch.await(); // blocks until consumer is ready and listening
          

          然后继续启动你的生产者。

          【讨论】:

            【解决方案6】:

            在进行一些测试之前,我需要知道 kafka 消费者是否准备好,所以我尝试使用consumer.assignment(),但它只返回了分配的分区集,但是有一个问题,我看不到这个分区分配给组的偏移量设置,所以后来当我尝试使用消费者时,它没有正确设置偏移量。

            解决方案是使用committed(),这将为您提供您放入参数中的给定分区的最后提交偏移量。

            所以你可以这样做:consumer.committed(consumer.assignment())

            如果还没有分配分区,它将返回:

            {}
            

            如果分配了分区,但还没有偏移:

            {name.of.topic-0=null, name.of.topic-1=null}
            

            但是如果有分区和偏移:

            {name.of.topic-0=OffsetAndMetadata{offset=5197881, leaderEpoch=null, metadata=''}, name.of.topic-1=OffsetAndMetadata{offset=5198832, leaderEpoch=null, metadata=''}}
            

            有了这些信息,你可以使用类似的东西:

            consumer.committed(consumer.assignment()).isEmpty();
            consumer.committed(consumer.assignment()).containsValue(null);
            

            有了这些信息,你就可以确定 kafka 消费者已经准备好了。

            【讨论】:

              猜你喜欢
              • 2016-04-07
              • 1970-01-01
              • 2011-07-08
              • 2015-10-20
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 1970-01-01
              • 2010-12-10
              相关资源
              最近更新 更多