【发布时间】:2020-04-22 12:19:18
【问题描述】:
我使用 (reactor.kafka.sender.KafkaSender) 使用 Reactor Kafka(Kafka 的功能性 Java API)创建了一个 KafkaProducer。使用以下生产者配置,
max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all
当我尝试将记录发布到无效主题时,我收到超时异常
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms
正如预期的那样。但是我已经配置了没有发生的重试。我假设在max.block.ms / request.timeout.ms 已经过去之后,将在每个retry.backoff.ms 直到metadata.max.age.ms 或retries 用尽后重试。
仅供参考,代码:
String topic = "order/";
int count = 1;
Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
Event event = new Event();
return SenderRecord.create(
new ProducerRecord<String, Event>(topic, event.getX(),
event), event.getEvent());
});
kafkaSender.send(source).subscribe(x -> System.out.println(x));
kafkaSender.close();
- 启用重试的配置是否正确?
-
request.timeout.ms/max.block.ms之后何时重试? - 需要对上述代码进行哪些更改才能重试?
【问题讨论】:
标签: java apache-kafka kafka-producer-api reactor-kafka