【问题标题】:Is there a way for me to retrieve the error codes from Kafka when my producer fails to publish a message successfully?当我的生产者未能成功发布消息时,我有没有办法从 Kafka 中检索错误代码?
【发布时间】:2020-10-21 20:23:54
【问题描述】:

当我的生产者未能成功发布消息时,我有没有办法从 Kafka 中检索错误代码 https://kafka.apache.org/protocol#protocol_error_acodes

请提供相同的 POC 以在 java 中检索错误代码

【问题讨论】:

标签: apache-kafka kafka-producer-api


【解决方案1】:

您可以向生产者注册回调,用于成功和失败。请看以下代码:

public ListenableFuture<SendResult<K, V>> sendMessage(String topicName, V message) {
    log.info("sending message :{} to topic {} ", message, topicName);
    ListenableFuture<SendResult<K, V>> future = kafkaTemplate.send(topicName, message);
    addCallback(message, future);
    return future;
}

private void addCallback(V message, ListenableFuture<SendResult<K, V>> future) {
    future.addCallback(new ListenableFutureCallback<SendResult<K, V>>() {
        @Override
        public void onSuccess(SendResult<K, V> result) {
            //this is a callback for success. 
            // use result to know various attributes like offset can be known
            // using result.getRecordMetadata().offset(), etc.
        }

        @Override
        public void onFailure(Throwable ex) {
            
            //use ex to know the exception that occurred while
            //publishing a message
        }
    });
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-10-03
    • 2021-08-17
    • 2018-10-17
    • 1970-01-01
    • 1970-01-01
    • 2019-11-10
    • 2021-08-22
    • 1970-01-01
    相关资源
    最近更新 更多