【问题标题】:Is there a way for me to retrieve the error codes from Kafka when my producer fails to publish a message successfully?当我的生产者未能成功发布消息时,我有没有办法从 Kafka 中检索错误代码?
【发布时间】:2020-10-21 20:23:54
【问题描述】:
【问题讨论】:
标签:
apache-kafka
kafka-producer-api
【解决方案1】:
您可以向生产者注册回调,用于成功和失败。请看以下代码:
public ListenableFuture<SendResult<K, V>> sendMessage(String topicName, V message) {
log.info("sending message :{} to topic {} ", message, topicName);
ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topicName, message);
addCallback(message, future);
return future;
}
private void addCallback(V message, ListenableFuture<SendResult<K, V>> future) {
future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
@Override
public void onSuccess(SendResult<K, V> result) {
//this is a callback for success.
// use result to know various attributes like offset can be known
// using result.getRecordMetadata().offset(), etc.
}
@Override
public void onFailure(Throwable ex) {
//use ex to know the exception that occurred while
//publishing a message
}
});
}