【发布时间】:2019-04-08 22:54:38
【问题描述】:
我正在尝试为 Spring Kafka 应用程序(Spring Boot 2.0.6、Spring Kafka 2.1.10)编写集成测试,我看到了很多 INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x166e432ebec0001 type:create cxid:0x5e zxid:0x24 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-topic/partitions 的实例和各种风格的路径(/brokers, /brokers/topics 等)在 Spring 应用程序启动之前显示在日志中。然后 AdminClient 关闭并记录此消息:
DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:575)
at kafka.network.Processor.run(SocketServer.scala:492)
at java.lang.Thread.run(Thread.java:748)
我在测试中使用@ClassRule 启动选项,如下所示:
@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 'my-topic')
,自动装配 KafkaTemplate,并根据嵌入的 Kafka 值设置连接的 Spring 属性:
def setupSpec() {
System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
System.setProperty('spring.cloud.stream.kafka.binder.zkNodes', embeddedKafka.getZookeeperConnectionString());
}
一旦 Spring 应用程序启动,我可以再次看到用户级 KeeperException 消息的实例:o.a.z.server.PrepRequestProcessor : Got user-level KeeperException when processing sessionid:0x166e445836d0001 type:setData cxid:0x6b zxid:0x2b txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets。
知道我在哪里出错了吗?我可以提供其他设置信息和日志消息,但只是对最初可能最有用的内容进行了有根据的猜测。
【问题讨论】:
-
也许你可以和我们分享一个简单的 GitHub 项目来玩和重现?谢谢
-
@ArtemBilan 这是一个示例项目,感谢您查看:github.com/nateha1984/kafka-int-test-sample
标签: spring-boot spring-test spring-kafka spring-boot-test