【发布时间】:2016-04-26 09:38:11
【问题描述】:
我正在编写一个消费者,一旦将一系列记录提交给 Mongo,它就会手动提交偏移量。
在 Mongo 错误或任何其他错误的情况下,尝试将记录保存到错误处理集合中
以便日后重播。
如果 Mongo 已关闭,那么我希望消费者在尝试从 Kakfa 的未提交偏移中读取记录之前停止处理一段时间。
下面的示例有效,但我想知道这种情况的最佳实践是什么?
while (true) {
boolean commit = false;
try {
ConsumerRecords<K, V> records = consumer.poll(consumerTimeout);
kafkaMessageProcessor.processRecords(records);
commit = true;
}
catch (Exception e) {
logger.error("Unable to consume closing consumer and restarting", e);
try {
consumer.close();
}
catch (Exception consumerCloseError) {
logger.error("Unable to close consumer", consumerCloseError);
}
logger.error(String.format("Attempting recovery in [%d] milliseconds.", recoveryInterval), e);
Thread.sleep(recoveryInterval);
consumer = createConsumer(properties);
}
if (commit) {
consumer.commitSync();
}
}
private KafkaConsumer<K, V> createConsumer(Properties properties) {
KafkaConsumer<K, V> consumer = new KafkaConsumer<K, V>(properties);
consumer.subscribe(topics);
return consumer;
}
如果我不重新创建消费者,我会收到以下错误。
o.a.k.c.c.internals.AbstractCoordinator : Marking the coordinator 2147483647 dead.
o.a.k.c.c.internals.ConsumerCoordinator : Error ILLEGAL_GENERATION occurred while committing offsets for group test.consumer
【问题讨论】:
标签: java apache-kafka kafka-consumer-api