【发布时间】:2018-04-15 05:17:50
【问题描述】:
我们正在使用 Spring KafkaListener,它在处理到 DB 后确认每个记录。如果我们在写入数据库时遇到问题,我们不会确认记录,因此不会为消费者提交偏移量。这很好用。现在我们想在下一次轮询中获取失败的消息以重试它们。我们将错误处理程序添加到我们的侦听器并调用 ConsumerAwareListenerErrorHandler 并尝试对失败的消息偏移执行 consumer.seek()。期望在下一次轮询期间,我们应该收到失败的消息。这没有发生。下一次轮询仅获取新消息而不是失败消息代码 sn-p 如下所示。
@Service
public class KafkaConsumer {
@KafkaListener(topics = ("${kafka.input.stream.topic}"), containerFactory = "kafkaManualAckListenerContainerFactory", errorHandler = "listen3ErrorHandler")
public void onMessage(ConsumerRecord<Integer, String> record,
Acknowledgment acknowledgment ) throws Exception {
try {
msg = JaxbUtil.convertJsonStringToMsg(record.value());
onHandList = DCMUtil.convertMsgToOnHandDTO(msg);
TeradataDAO.updateData(onHandList);
acknowledgment.acknowledge();
recordSuccess = true;
LOGGER.info("Message Saved in Teradata DB");
} catch (Exception e) {
LOGGER.error("Error Processing On Hand Data ", e);
recordSuccess = false;
}
}
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() throws InterruptedException {
return (message, exception, consumer) -> {
this.listen3Exception = exception;
MessageHeaders headers = message.getHeaders();
consumer.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
}
Container Class
@Bean
public Map<Object,Object> consumerConfigs() {
Map<Object,Object> props = new HashMap<Object,Object> ();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
localhost:9092);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-1");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return props;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@Bean
public ConsumerFactory consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerConfigs());
}
@SuppressWarnings("unchecked")
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
return factory;
}
【问题讨论】:
标签: spring-kafka