【问题标题】:Iteration over ConsumerIterator object seems to be never ending loop对 ConsumerIterator 对象的迭代似乎是永无止境的循环
【发布时间】:2015-09-17 04:41:44
【问题描述】:

我尝试在 0.8.2.1 版本中为 Apache Kafka 实现 Java 消费者客户端。我知道新版本的 Java Consumer API 将在新版本的 Apache Kafka 中可用,但现在我必须在当前版本中实现消费者客户端。

所以我已经完成了,但是我在检查收到的消息数时遇到了问题。问题似乎在于对 ConsumerIterator 对象的迭代以获取消息及其 id。迭代似乎是永无止境的循环。看下面的代码:

public class TestKafkaConsumer extends Thread{

private final static Logger logger = Logger.getLogger(TestKafkaConsumer.class);
private Properties appProperties;
private String topic;
private ZookeeperConsumerConnector consumer;

public TestKafkaConsumer() throws Exception {
    topic = "topic";
    consumer = (ZookeeperConsumerConnector) Consumer.createJavaConsumerConnector(createConsumerConfig());
}

public static void main( String[] args ){
    PropertyConfigurator.configure("log4j.properties");
    try {
        TestKafkaConsumer testConsumer = new TestKafkaConsumer();
        testConsumer.start();
    } catch (Exception e) {
        logger.error("Error create consumer ", e);
    }

}

@Override
public void run() {
    logger.info("Consumer thread - start");
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = (Map<String, List<KafkaStream<byte[], byte[]>>>) consumer
            .createMessageStreams(topicCountMap);
    KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
    ConsumerIterator<byte[], byte[]> it = stream.iterator();
    //logger.info("Count of consumed messages: " + it.length());
    long msgCount = 0;
    while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> messageAndMetadata  = it.next();
            logger.info("Key: " + new String(messageAndMetadata.key()));
            logger.info("Message: " + new String(messageAndMetadata.message()));
            msgCount++;
        }
    logger.info("Summary count of consumed messages: " + msgCount); 
}

private ConsumerConfig createConsumerConfig() throws Exception{
    logger.info("createConsumerConfig - start");
    Properties props = new Properties();
    props.put("zookeeper.connect", "localhost:2181");
    props.put("group.id","testGrp");
    props.put("zookeeper.session.timeout.ms", "400");
    props.put("zookeeper.sync.time.ms", "200");
    props.put("auto.commit.interval.ms", "10000");
    logger.info("createConsumerConfig - finish");
    return new ConsumerConfig(props);                
}

}

问题是日志文件中没有出现消费消息计数的日志条目,所以我认为迭代永远不会结束。我已经测试了所有消息都已阅读。所以所有记录都被迭代但程序不会留下循环。另外我注意到,当我尝试在开始循环之前获取 ConsumerIterator (或大小)的长度(取消注释相应行)时,程序似乎停在这个地方并且循环甚至没有开始 - 日志文件中没有关于读取记录的条目。

问题出在哪里 - 在 ConsumerIterator 类中?也许我做错了什么。如果有人遇到这样的问题并可以帮助我,我将不胜感激。

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api


    【解决方案1】:

    ConsumerIterator 的 javadoc,http://apache.mirrorcatalogs.com/kafka/0.8.2-beta/scala-doc/index.html#kafka.consumer.ConsumerIterator,声明“一个迭代器,它阻塞直到可以从提供的队列中读取一个值。”

    这可能是问题吗?

    【讨论】:

    • 是的,这就是问题所在。当我在一段时间后关闭线程和消费者时,我会得到关于摘要消费消息的信息。非常感谢。
    猜你喜欢
    • 1970-01-01
    • 2023-03-22
    • 1970-01-01
    • 1970-01-01
    • 2016-06-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多