【发布时间】:2019-08-07 03:19:13
【问题描述】:
我使用带有幂等生产者配置的spring-kafka:
这些是我的配置道具:
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers()));
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, appProps.getJksLocation());
props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, appProps.getJksPassword());
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
props.put(ProducerConfig.RETRIES_CONFIG, 5);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
我的 kafka 生产者抛出 OutOfOrderSequenceException:
2019-03-06 21:25:47 Sender [ERROR] [Producer clientId=producer-1] 代理返回 org.apache.kafka.common.errors.OutOfOrderSequenceException:代理收到的序列号乱序偏移量 -1 处的主题分区主题 1。这表明代理上的数据丢失,应该进行调查。 2019-03-06 21:25:47 TransactionManager [INFO] [Producer clientId=producer-1] ProducerId 设置为 -1 纪元 -1 2019-03-06 21:25:47 ProducerKafka [ERROR] 我们在发送到 kafka 时遇到错误,请重试作业
我不确定为什么会抛出这个异常。我找不到具体的答案。该异常的官方 javadoc 声明如下:
这个异常表示broker从生产者那里收到了一个意外的序列号,这意味着数据可能已经丢失了。如果生产者仅配置为幂等性(即,如果设置了 enable.idempotence 并且未配置 transactional.id),则可以使用相同的生产者实例继续发送,但这样做会冒重新排序已发送记录的风险。对于事务性生产者,这是一个致命错误,您应该关闭生产者。
这是否意味着我需要使用事务生产者来避免这个问题?
KafkaProducer 文档说明了使上述声明模棱两可的内容:https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
要启用幂等性,必须将 enable.idempotence 配置设置为 true。如果设置,重试配置默认为 Integer.MAX_VALUE,max.in.flight.requests.per.connection 配置默认为 1,acks 配置默认为 all。幂等生产者没有 API 更改,因此无需修改现有应用程序即可利用此功能。
要利用幂等生产者,必须避免应用程序级别的重新发送,因为这些无法重复数据删除。因此,如果应用程序启用幂等性,建议不要设置重试配置,因为它将默认为 Integer.MAX_VALUE。另外,如果 send(ProducerRecord) 在无限次重试后仍返回错误(例如,如果消息在发送之前在缓冲区中过期),则建议关闭生产者并检查最后生成的消息的内容以确保它没有重复。最后,生产者只能保证单个会话内发送的消息的幂等性。
上面的声明清楚地表明,对于幂等生产者,我所需要的只是使用enable.idempotence 属性。但是,异常声明我必须使用该 transactional.id 属性。
在不必处理致命的OutOfOrderSequenceException 的情况下创建幂等异步生产者的正确方法是什么。
【问题讨论】:
标签: java apache-kafka kafka-producer-api spring-kafka