【发布时间】:2016-09-01 11:24:03
【问题描述】:
我有以下 Kafka 消费者,我想在特定条件下暂停,然后稍后恢复以使用所有先前的消息。一个想法是使用可以由其他线程更新的共享标志,在消费之前,即iterator.next().message()我检查标志的值。如果是真的不要消费,否则消费消息。只是想检查一下我的想法是否正确,或者是否有更好的方法。
class KafkaConsumer implements Runnable {
KafkaStream<byte[], byte[]> topicStream;
ConsumerConnector consumerConnectorObj;
public KafkaConsumer(final KafkaStream<byte[], byte[]> topicStream,
final ConsumerConnector consumerConnectorObj) {
this.topicStream = topicStream;
this.consumerConnectorObj = consumerConnectorObj;
}
@Override
public void run() {
if (topicStream != null) {
ConsumerIterator<byte[], byte[]> iterator = topicStream.iterator();
while (true) {
if (iterator != null) {
boolean nextFlag = true;
try {
iterator.hasNext();
} catch (ConsumerTimeoutException e) {
LOG.warn("Consumer timeout occured", e);
nextFlag = false;
}
if (nextFlag) {
byte[] msg = iterator.next().message();
}
}
}
}
}
}
【问题讨论】:
标签: apache-kafka kafka-consumer-api