【问题标题】:Reactor Kafka producer- Unable to retryReactor Kafka 生产者 - 无法重试
【发布时间】:2020-04-22 12:19:18
【问题描述】:

我使用 (reactor.kafka.sender.KafkaSender) 使用 Reactor Kafka(Kafka 的功能性 Java API)创建了一个 KafkaProducer。使用以下生产者配置,

max.block.ms = 8000
request.timeout.ms= 4000
retries = 3
retry.backoff.ms = 2000
max.in.flight.requests.per.connection = 512
acks = all

当我尝试将记录发布到无效主题时,我收到超时异常

org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 8000 ms

正如预期的那样。但是我已经配置了没有发生的重试。我假设在max.block.ms / request.timeout.ms 已经过去之后,将在每个retry.backoff.ms 直到metadata.max.age.msretries 用尽后重试。 仅供参考,代码:

    String topic = "order/";
    int count = 1;
    Flux<SenderRecord<String, Event, EventInfo>> source = Flux.range(1, count).map(x -> {
      Event event = new Event();
      return SenderRecord.create(
            new ProducerRecord<String, Event>(topic, event.getX(),
                event), event.getEvent());
    });
    kafkaSender.send(source).subscribe(x -> System.out.println(x));
    kafkaSender.close();
  • 启用重试的配置是否正确?
  • request.timeout.ms/max.block.ms 之后何时重试?
  • 需要对上述代码进行哪些更改才能重试?

【问题讨论】:

    标签: java apache-kafka kafka-producer-api reactor-kafka


    【解决方案1】:

    我相信你也应该设置“delivery.timeout.ms”

    请参阅此处的文档:https://docs.confluent.io/current/installation/configuration/producer-configs.html#retries

    Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. Note additionally that produce requests will be failed before the number of retries has been exhausted if the timeout configured by delivery.timeout.ms expires first before successful acknowledgement. Users should generally prefer to leave this config unset and instead use delivery.timeout.ms to control retry behavior.

    【讨论】:

    • 我用的是apache kafka 2.0.0版,好像没有你说的这个配置设置。
    • 我相信即使使用 kafka 2.0.0 版它也应该存在,因为文档没有谈到弃用。你试过了吗?
    猜你喜欢
    • 2016-06-11
    • 2019-05-11
    • 2021-03-22
    • 2019-05-09
    • 2020-06-25
    • 1970-01-01
    • 2013-10-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多