【发布时间】:2017-01-12 13:59:32
【问题描述】:
我将我的代码从单个 kafka 消费者更改为让多个消费者从具有相同组 ID 的同一主题中读取,以便有效地使用大容量主题。但它似乎在成功启动后出现错误,我认为这与消费者等待消息然后失败有关。但问题是,当一个消费者关闭时,其他消费者也会关闭。
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
try {
while (it.hasNext())
{
try{
String mesg = new String(it.next().message());
System.out.println( mesg);
if (StringUtils.isEmpty(mesg)){
continue;
}
System.out.println("Thread " + m_threadNumber + ": " +
new String(it.next().message()));
mesg = messageFormat.createMsg(mesg);
System.out.println("MESSAGE TRANSMISSION SUCCESSFUL!");
}
catch(Exception e)
{
e.printStackTrace();
continue;
}
}
}catch(Exception e)
{
e.printStackTrace();
}
System.out.println("Shutting down Thread: " + m_threadNumber);
//System.out.println("Shutting down Thread: " + m_threadNumber);
}
这里是异常错误。
NoSuchElementException exception
真的很想在这里让他工作,所以很感激这里的任何帮助。提前致谢。
【问题讨论】:
-
发布异常堆栈跟踪可能会有所帮助。
标签: apache-kafka