【问题标题】:Multithreaded Kafka Consumer not processing all the partitions in parallel多线程 Kafka Consumer 未并行处理所有分区
【发布时间】:2019-07-23 03:32:27
【问题描述】:

我创建了一个多线程 Kafka 消费者,其中一个线程分配给每个分区(我总共有 100 个分区)。我关注了https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example链接。

下面是我的消费者的init方法。

consumer =  kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
        System.out.println("Kafka Consumer initialized.");
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, 100);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topicName);

        executor = Executors.newFixedThreadPool(100);

在上面的 init 方法中,我得到了应该连接到每个分区的 Kafka 流列表(总共 100 个)(正如预期的那样)。

然后我确实使用下面的 sn-p 将每个流提交到不同的线程。

公共对象调用() {

  for (final KafkaStream stream : streams) {
        executor.execute(new StreamWiseConsumer(stream));
    }
    return true;
  }

下面是 StreamWiseConsumer 类。

public class StreamWiseConsumer extends Thread {

    ConsumerIterator<byte[], byte[]> consumerIterator;
    private KafkaStream m_stream;

    public StreamWiseConsumer(ConsumerIterator<byte[], byte[]> consumerIterator) {
        this.consumerIterator = consumerIterator;
    }

    public StreamWiseConsumer(KafkaStream kafkaStream) {
        this.m_stream = kafkaStream;
    }


    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> consumerIterator  = m_stream.iterator();

        while(!Thread.currentThread().isInterrupted() && !interrupted) {
            try {
                if (consumerIterator.hasNext()) {
                    String reqId = UUID.randomUUID().toString();
                    System.out.println(reqId+ " : Event received by threadId : "+Thread.currentThread().getId());
                    MessageAndMetadata<byte[], byte[]> messageAndMetaData = consumerIterator.next();
                    byte[] keyBytes = messageAndMetaData.key();
                    String key = null;
                    if (keyBytes != null) {
                        key = new String(keyBytes);
                    }
                    byte[] eventBytes = messageAndMetaData.message();
                    if (eventBytes == null){
                        System.out.println("Topic: No event fetched for transaction Id:" + key);
                        continue;
                    }
                    String event = new String(eventBytes).trim();
                    // Some Processing code
                    System.out.println(reqId+" : Processing completed for threadId = "+Thread.currentThread().getId());
                    consumer.commitOffsets();
            } catch (Exception ex) {

            }

        }
    }
}

理想情况下,它应该从所有 100 个分区开始并行处理。但它正在从一个线程中挑选一些随机数量的事件并对其进行处理,然后其他一些线程开始从另一个分区进行处理。它似乎是顺序处理,但具有不同的线程。我期望从所有 100 个线程中进行处理。我在这里遗漏了什么吗?

日志链接的PFB。 https://drive.google.com/file/d/14b7gqPmwUrzUWewsdhnW8q01T_cQ30ES/view?usp=sharing https://drive.google.com/file/d/1PO_IEsOJFQuerW0y-M9wRUB-1YJuewhF/view?usp=sharing

【问题讨论】:

  • 您使用的是旧版本的 Kafka 客户端 (kafka.consumer.Consumer 已弃用,您应该使用 `org.apache.kafka.clients.consumer.Consumer
  • 它有什么影响?即使它是旧版本,它也应该可以工作。我正在使用 Kafka 版本 0.8.2.1.

标签: multithreading apache-kafka kafka-consumer-api


【解决方案1】:

我怀疑这是否是垂直缩放 kafka 流的正确方法。

Kafka 流本身就支持多线程消费。

使用 num.stream.threads 配置增加用于处理的线程数。

如果希望 100 个线程处理 100 个分区,请将 num.stream.threads 设置为 100。

【讨论】:

猜你喜欢
  • 1970-01-01
  • 2017-04-20
  • 2017-08-15
  • 2023-01-27
  • 2023-03-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多