【问题标题】:EmbeddedKafka AdminClient shuts down before Spring app starts for testsEmbeddedKafka AdminClient 在 Spring 应用程序启动测试之前关闭
【发布时间】:2019-04-08 22:54:38
【问题描述】:

我正在尝试为 Spring Kafka 应用程序(Spring Boot 2.0.6、Spring Kafka 2.1.10)编写集成测试,我看到了很多 INFO org.apache.zookeeper.server.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x166e432ebec0001 type:create cxid:0x5e zxid:0x24 txntype:-1 reqpath:n/a Error Path:/brokers/topics/my-topic/partitions Error:KeeperErrorCode = NoNode for /brokers/topics/my-topic/partitions 的实例和各种风格的路径(/brokers/brokers/topics 等)在 Spring 应用程序启动之前显示在日志中。然后 AdminClient 关闭并记录此消息:

DEBUG org.apache.kafka.common.network.Selector - [SocketServer brokerId=0] Connection with /127.0.0.1 disconnected
java.io.EOFException: null
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:124)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:93)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:235)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:196)
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:547)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:483)
at org.apache.kafka.common.network.Selector.poll(Selector.java:412)
at kafka.network.Processor.poll(SocketServer.scala:575)
at kafka.network.Processor.run(SocketServer.scala:492)
at java.lang.Thread.run(Thread.java:748)

我在测试中使用@ClassRule 启动选项,如下所示:

@ClassRule
@Shared
private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 'my-topic')

,自动装配 KafkaTemplate,并根据嵌入的 Kafka 值设置连接的 Spring 属性:

def setupSpec() {
    System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
    System.setProperty('spring.cloud.stream.kafka.binder.zkNodes', embeddedKafka.getZookeeperConnectionString());
}

一旦 Spring 应用程序启动,我可以再次看到用户级 KeeperException 消息的实例:o.a.z.server.PrepRequestProcessor : Got user-level KeeperException when processing sessionid:0x166e445836d0001 type:setData cxid:0x6b zxid:0x2b txntype:-1 reqpath:n/a Error Path:/config/topics/__consumer_offsets Error:KeeperErrorCode = NoNode for /config/topics/__consumer_offsets

知道我在哪里出错了吗?我可以提供其他设置信息和日志消息,但只是对最初可能最有用的内容进行了有根据的猜测。

【问题讨论】:

标签: spring-boot spring-test spring-kafka spring-boot-test


【解决方案1】:

我不熟悉 Spock,但我知道 @KafkaListener 方法是在其自己的线程上调用的,因此您不能直接在 then: 块中断言它。

您需要确保在您的测试用例中以某种方式阻塞等待。

我尝试使用BlockingVariable 对抗真实服务而不是模拟,我在日志中看到您的println(message)。但是 BlockingVariable 仍然对我不起作用:

@DirtiesContext
@SpringBootTest(classes = [KafkaIntTestApplication.class])
@ActiveProfiles('test')
class CustomListenerSpec  extends Specification {

    @ClassRule
    @Shared
    public KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, 'my-topic')

    @Autowired
    private KafkaTemplate<String, String> template

    @SpyBean
    private SimpleService service

    final def TOPIC_NAME = 'my-topic'

    def setupSpec() {
        System.setProperty('spring.kafka.bootstrapServers', embeddedKafka.getBrokersAsString());
    }

    def 'Sample test'() {
        given:
        def testMessagePayload = "Test message"
        def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
        def result = new BlockingVariable<Boolean>(5)
        service.handleMessage(_) >> {
            result.set(true)
        }

        when: 'We put a message on the topic'
        template.send(message)

        then: 'the service should be called'
        result.get()
    }
}

而日志是这样的:

2018-11-05 13:38:51.089  INFO 8888 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [my-topic-0, my-topic-1]
Test message

BlockingVariable.get() timed out after 5,00 seconds

    at spock.util.concurrent.BlockingVariable.get(BlockingVariable.java:113)
    at com.example.CustomListenerSpec.Sample test(CustomListenerSpec.groovy:54)

2018-11-05 13:38:55.917  INFO 8888 --- [           main] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@11ebb1b6: startup date [Mon Nov 05 13:38:49 EST 2018]; root of context hierarchy

我还必须添加这个依赖:

testImplementation "org.hamcrest:hamcrest-core"

更新

好的。真正的问题是MockConfig 在测试上下文配置中不可见,而@Import(MockConfig.class) 可以解决问题。其中@Primary 还为我们提供了额外的信号,为测试类中的注入选择什么 bean。

【讨论】:

    【解决方案2】:

    @ArtemBilan 的回复让我走上了正确的道路,因此感谢他的加入,在查看了其他 BlockingVariable 文章和示例后,我能够弄清楚。我在模拟的响应中使用了BlockingVariable,而不是作为回调。当调用 mock 的响应时,将其设置为 true,then 块只执行result.get() 并且测试通过。

    @DirtiesContext
    @ActiveProfiles('test')
    @SpringBootTest
    @Import(MockConfig.class)
    class CustomListenerSpec extends TestSpecBase {
    
        @ClassRule
        @Shared
        private KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, TOPIC_NAME)
    
        @Autowired
        private KafkaTemplate<String, String> template
    
        @Autowired
        private SimpleService service
    
        final def TOPIC_NAME = 'my-topic'
    
        def setupSpec() {
            System.setProperty('spring.kafka.bootstrap-servers', embeddedKafka.getBrokersAsString());
        }
    
        def 'Sample test'() {
            def testMessagePayload = "Test message"
            def message = MessageBuilder.withPayload(testMessagePayload).setHeader(KafkaHeaders.TOPIC, TOPIC_NAME).build()
            def result = new BlockingVariable<Boolean>(5)
            service.handleMessage(_ as String) >> {
                result.set(true)
            }
    
            when: 'We put a message on the topic'
            template.send(message)
    
            then: 'the service should be called'
            result.get()
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-04-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多