【问题标题】:Kafka offset not incremented卡夫卡偏移量未增加
【发布时间】:2018-04-04 04:02:31
【问题描述】:

我正在使用带有 Spring-boot 的 Kafka:

Kafka Producer 类

@Service
public class MyKafkaProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    private static Logger LOGGER = LoggerFactory.getLogger(NotificationDispatcherSender.class);

    // Send Message
    public void sendMessage(String topicName, String message) throws Exception {
        LOGGER.debug("========topic Name===== " + topicName + "=========message=======" + message);
        ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topicName, message);
        result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                LOGGER.debug("sent message='{}' with offset={}", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                LOGGER.error(Constants.PRODUCER_MESSAGE_EXCEPTION.getValue() + " : " + ex.getMessage());
            }
        });
    }
}

Kafka-配置:

spring.kafka.producer.retries=0
spring.kafka.producer.batch-size=100000
spring.kafka.producer.request.timeout.ms=30000
spring.kafka.producer.linger.ms=10
spring.kafka.producer.acks=0
spring.kafka.producer.buffer-memory=33554432
spring.kafka.producer.max.block.ms=5000
spring.kafka.bootstrap-servers=192.168.1.161:9092,192.168.1.162:9093

问题

我有一个主题的 5 个分区,比如说my-topic

发生的情况是,我获得了成功(即消息已成功发送到 Kafka)日志,但主题 my-topic 的无分区偏移量增加了。

正如您在上面看到的,我添加了日志onSuccessonFailure。 我期望的是,当 Kafka 无法向 Kafka 发送消息时,我应该会收到错误消息,但在这种情况下我没有收到任何错误消息。

Kafka 的上述行为以100: 5 的比率发生(即每 100 条成功的消息发送到 kafka 之后)。

Edit1:为成功案例添加Kafka生产者日志(即消费者端成功接收消息)

ProducerConfig - logAll:180] ProducerConfig values: 
    acks = 0
    batch.size = 1000
    block.on.buffer.full = false
    bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 10
    max.block.ms = 5000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 60000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2017-10-24 14:30:09, [INFO] [karma-unified-notification-manager - ProducerConfig - logAll:180] ProducerConfig values: 
    acks = 0
    batch.size = 1000
    block.on.buffer.full = false
    bootstrap.servers = [10.20.1.19:9092, 10.20.1.20:9093, 10.20.1.26:9094]
    buffer.memory = 33554432
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    interceptor.classes = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 10
    max.block.ms = 5000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.fetch.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.ms = 50
    request.timeout.ms = 60000
    retries = 0
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    timeout.ms = 30000
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

【问题讨论】:

    标签: java apache-kafka kafka-producer-api spring-kafka


    【解决方案1】:

    它没有显示错误,因为您已将 spring.kafka.producer.acks 设置为 0。 将其设置为 1,您的回调函数应该可以工作。然后你可以看到偏移量是否在增加。

    【讨论】:

    • 您的回答真的很有帮助。非常感谢。
    【解决方案2】:

    你的代码对我来说很好......

    @SpringBootApplication
    public class So46892185Application {
    
        public static void main(String[] args) {
            SpringApplication.run(So46892185Application.class, args);
        }
    
        private static final Logger LOGGER = LoggerFactory.getLogger(So46892185Application.class);
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
                for (int i = 0; i < 10; i++) {
                    send(template, "foo" + i);
                }
            };
        }
    
        public void send(KafkaTemplate<String, String> template, String message) {
            ListenableFuture<SendResult<String, String>> result = template.send(topic().name(), message);
            result.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    
                @Override
                public void onSuccess(SendResult<String, String> result) {
                    LOGGER.info("sent message='{}'"
                            + " to partition={}"
                            + " with offset={}", message, result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                }
    
                @Override
                public void onFailure(Throwable ex) {
                    LOGGER.error("Ex : " + ex.getMessage());
                }
    
            });
        }
    
        @Bean
        public NewTopic topic() {
            return new NewTopic("so46892185-3", 5, (short) 1);
        }
    
    }
    

    结果

    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo3' to partition=1 with offset=0
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo8' to partition=1 with offset=1
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo1' to partition=2 with offset=0
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo6' to partition=2 with offset=1
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo0' to partition=0 with offset=0
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo5' to partition=0 with offset=1
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo4' to partition=3 with offset=0
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo9' to partition=3 with offset=1
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo2' to partition=4 with offset=0
    2017-10-23 11:12:05.907  INFO 86390 --- [ad | producer-1] com.example.So46892185Application        
    : sent message='foo7' to partition=4 with offset=1
    

    【讨论】:

    • 问题不是每次都来。我也能够成功地向 Kafka 发送消息。但是假设 2 到 3 小时后,消息从 Kafka 成功发送,但分区偏移量没有增加
    • 那么你需要在你的问题描述中更加精确。您还需要提供证据;光靠言语是不够的。由于您没有在日志消息中包含分区,因此很难看出如何声称存在问题。
    • 我在 Kafka 中生成了一条消息,我收到了这些日志成功日志onSuccess:36] sent message=''my Kafka message' to partition=4 with offset=-1,但我的消费者没有收到它,当我检查分区 4 偏移计数器时,它没有增加。我正在使用 Yahoo kafka-manager 作为监控工具。
    • 我已经编辑了我的问题并添加了 Kafka 生产者日志(由 Kafka 库打印的日志)。请看一看。另外,我错误地将my edit 添加到您的答案而不是我的问题中。我很抱歉,请忽略它。
    • 虽然我不想混淆,但请原谅我。我也收到错误Failed to send; nested exception is org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 5000 ms.。但这是我提出问题的独立案例。即即使我没有收到此错误消息,分区偏移计数器也不会增加。我添加了上述错误只是为了更深入地了解我的问题
    猜你喜欢
    • 2020-06-04
    • 2019-05-18
    • 1970-01-01
    • 2016-10-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多