【问题标题】:Pause high level Kafka consumer暂停高级 Kafka 消费者
【发布时间】:2016-09-01 11:24:03
【问题描述】:

我有以下 Kafka 消费者,我想在特定条件下暂停,然后稍后恢复以使用所有先前的消息。一个想法是使用可以由其他线程更新的共享标志,在消费之前,即iterator.next().message()我检查标志的值。如果是真的不要消费,否则消费消息。只是想检查一下我的想法是否正确,或者是否有更好的方法。

class KafkaConsumer implements Runnable {
        KafkaStream<byte[], byte[]> topicStream;
        ConsumerConnector consumerConnectorObj;

        public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
                final ConsumerConnector consumerConnectorObj) {
            this.topicStream = topicStream;
            this.consumerConnectorObj = consumerConnectorObj;
        }

        @Override
        public void run() {
            if (topicStream != null) {
                ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
                while (true) {
                        if (iterator != null) {
                            boolean nextFlag = true;
                            try {
                                iterator.hasNext();
                            } catch (ConsumerTimeoutException e) {
                                LOG.warn("Consumer timeout occured", e);
                                nextFlag = false;
                            }

                            if (nextFlag) {
                                byte[] msg = iterator.next().message();
                            }
                        }
                }
            }
        }
    }

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    通常您不需要通过手工方式在线程之间进行同步,因为您很有可能会出错。请查看java.util.concurrent 寻求帮助。在你的情况下,它可能是一个信号量。使用它的最简单方法是在处理消息之前获取信号量,然后将其返回并尝试在下一个循环中立即再次获取它。

    我的直觉是,这不是最好的方法。我宁愿打电话给availablePermits() 并在数量更大的时候继续消费。只有当它下降到零时,才尝试获取信号量。这将阻塞线程,直到另一个线程再次提供一个许可。这将解除对工作线程的阻塞,并将许可分发给它,它应该立即再次返回,开始像上面一样循环。

    while (true) {
      if (semaphore.availablePermits()<=0) {
        semaphore.acquire(); // will block
        // eventually another thread increments the semaphore again
        // and we arrive here
        semaphore.release();
      }
      // keep processing messages
    }
    

    您可以在this question 的回复中找到更多想法。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-07-10
      • 2015-06-05
      • 1970-01-01
      • 1970-01-01
      • 2017-09-23
      • 2017-12-23
      • 1970-01-01
      • 2016-10-16
      相关资源
      最近更新 更多