【发布时间】:2019-11-12 10:16:54
【问题描述】:
在服务器上使用 kafka 1.1.0 版本并在生产者客户端中使用相同的 spring 依赖项:
Gradle 依赖: 实现组:'org.apache.kafka',名称:'kafka_2.11',版本:'1.1.0'
我创建了一个复制因子为 3 和单个分区的主题。服务器端一个 3 节点 kafka 集群。当所有节点都启动生产者并正常发送消息时。当单个节点出现故障时,生产者继续正常工作。当第二个节点(共 3 个)关闭时,我希望生产者在超过 RF - 1 个节点关闭时立即抛出异常。在这种情况下,它将是 2 个节点。
这是我的 java 生产者代码。
生产者配置:
Properties props = new Properties();
props.put("bootstrap.servers","host1:9092,host2:9092,host3:9092);
props.put("min.insync.replicas","2");
props.put("default.replication.factor","3");
props.put("acks","all");
props.put("retries","1");
props.put("batch.size","16384");
props.put("linger.ms","1");
props.put("buffer.memory","33554432");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("num.partitions","1");
发件人代码:
String data = "some data";
String topic = "testTopic";
try {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(
topic, data);
RecordMetadata m = producer.send(producerRecord).get(); // Publish message to topic
logger.info("Message application id: {} ", appId);
logger.info("Message produced, offset: " + m.offset());
logger.info("Message produced, partition : " + m.partition());
logger.info("Message produced, topic: " + m.topic());
logger.info("Data sent to topic: {} ", topic);
}catch (Exception e) {
logger.error("Error:\n", e);
producer.close();
}
通过以上配置和代码,生产者可以在一个节点启动的情况下发送消息。我试图通过使用控制台生产者“kafka-console-producer.sh”来缩小问题范围,并观察到相同的行为。当 2 个节点宕机时,控制台生产者不会失败。
【问题讨论】:
-
这对 Spring 有什么作用?看起来只是简单的 Apache Kafka 问题。此外,您完全不要在您的问题中使用 Spring。那么,为什么要迷惑人呢?感谢理解
-
注意:你可以在 Gradle 中将
kafka_2.11替换为kafka-clients
标签: java apache-kafka