【发布时间】:2018-01-25 14:05:36
【问题描述】:
我编写了使用 Kafka 库的 Java 程序,我听说 Kafka Producer 有内部缓冲区来保存消息,以便稍后重试。所以我用重试属性创建了 Idempotent Kafka Producer。
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv(KafkaConstants.KAFKA_URL));
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put("linger.ms", 1000);
props.put("acks", "all");
props.put("request.timeout.ms",60000);
props.put("retries",3);
props.put("retry.backoff.ms",1000);
props.put("max.in.flight.requests.per.connection",1);
props.put("enable.idempotence",true);
在运行程序之前,我让 Kafka 服务器(只有一个代理)处于关闭状态。当我运行程序时,出现异常“60000 毫秒后更新元数据失败”。但是当我重新启动 Kafka 服务器时,它应该将数据推送到 kafka 主题,因为我已经给出了重试属性。
请在这方面提供帮助。
谢谢, 普里亚姆萨卢加
【问题讨论】:
标签: apache-kafka kafka-producer-api