【问题标题】:Catching exception in Kafkalistener Integration test在 Kafkalistener 集成测试中捕获异常
【发布时间】:2020-06-30 04:17:07
【问题描述】:

所以我需要为我的 kafkalistener 方法创建一个集成测试,其中测试期望 ListenerExecutionFailedException 实际上被抛出,因为消息在消费期间由于另一个服务处于非活动状态而失败。

下面是测试代码,我使用 Embeddedkafkabroker 作为生产者和消费者:

@Test(expected = ListenerExecutionFailedException.class)
public void shouldThrowException() {

    RecordHeaders recordHeaders = new RecordHeaders();
    recordHeaders.add(new RecordHeader("messageType", "bootstrap".getBytes()));
    recordHeaders.add(new RecordHeader("userId", "123".getBytes()));
    recordHeaders.add(new RecordHeader("applicationId", "1234".getBytes()));
    recordHeaders.add(new RecordHeader("correlationId", UUID.randomUUID().toString().getBytes()));

    ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
        "TEST_TOPIC",
        1,
        null,
        "message",
        "",
        recordHeaders);
    producer.send(producerRecord);


    consumer.subscribe(Collections.singleton("TEST_TOPIC"));

    consumer.poll(Duration.ofSeconds(2));

}

我想知道的是异常被认为未引发并且测试失败,即使我知道侦听器确实收到了消息并且由于我在日志上看到它们而引发了异常。

即使我将预期更改为 Throwable,似乎也没有检测到异常。

我应该怎么做才能让 Junit 检测到异常?

另外,另一个有趣的事情是我试图模拟在侦听器中调用的服务类并返回一些虚拟值,但是当我使用 Mockito.verify 时服务没有被调用

【问题讨论】:

    标签: spring-boot junit spring-kafka spring-kafka-test


    【解决方案1】:

    你好像有什么误会。

    producer.send

    consumer.poll

    您是直接调用 kafka-clients 并且在此测试中根本没有使用 Spring。

    ListenerExecutionFailedException是Spring的监听容器包装了消息监听器抛出的用户异常的异常。

    【讨论】:

    • 您究竟如何使用 Spring 发送和接收测试消息,以便调用您应用程序中的 @KafkaListener 方法并在抛出异常时捕获它们?仅供参考,我已将预期异常更改为运行时和可抛出异常,还使用 ​​kafkatemplate 发送消息
    • 监听器在不同的线程中运行。如果要在单元测试中捕获异常,则必须直接从测试中调用侦听器,而不是向 kafka 发送消息。如果你想创建一个集成测试,你可以包装你的监听器——例如,参见 [this answer](监听器在不同的线程中运行)。
    猜你喜欢
    • 1970-01-01
    • 2020-05-13
    • 1970-01-01
    • 2021-07-13
    • 2022-01-02
    • 1970-01-01
    • 2016-06-16
    • 1970-01-01
    • 2020-10-02
    相关资源
    最近更新 更多