【问题标题】:How spy an autowired bean in Spring Tests如何在 Spring 测试中监视自动装配的 bean
【发布时间】:2021-04-27 09:38:59
【问题描述】:

我有一个简单的日志记录处理程序 bean 配置,我将其注入 IntegrationFlow

@Configuration
class LogHandlerConfiguration {

    private LoggingHandler handler;

    @Bean
    public MessageHandler kafkaSuccessHandler() {
        return getLogger(LoggingHandler.Level.INFO);
    }

    @Bean(name="kafkaFailureHandler")
    public MessageHandler kafkaFailureHandler() {
        return getLogger(LoggingHandler.Level.ERROR);
    }

    private LoggingHandler getLogger(LoggingHandler.Level level) {
        handler = new LoggingHandler(level);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }
}

要测试的集成流程

@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
    return IntegrationFlows.from(kafkaErrorChannel)
            .transform("payload.failedMessage")
            .handle(kafkaFailureHandler)
            .get();
}

这是我的测试

@SpyBean
MessageHandler kafkaFailureHandler;

@BeforeEach
public void setup() {
    MockitoAnnotations.openMocks(KafkaPublishFailureTest.class);
}

@Test
void testFailedKafkaPublish() {

    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-UPSTREAM-INSTANCE", "jira")
            .setHeader("X-MESSAGE-KEY", "key-1")
            .build();

    kafkaGateway.publish(message);
    // Failure handler called
    Mockito.verify(kafkaFailureHandler, Mockito.timeout(0).atLeastOnce()).handleMessage(
            ArgumentMatchers.any(Message.class));
}

我们创建了一个通用的 Kafka 生产者和消费者配置,下游应用可以将最适合其需求的失败和成功处理程序附加到该配置。在这种情况下,我无法验证 LoggingHandler 至少被调用一次。

failureHandler 在由ThreadPoolTaskExecutor 支持的ExecturoeChannel 下执行

@Bean
ExecutorChannel kafkaErrorChannel(Executor threadPoolExecutor) {
    return MessageChannels.executor("kafkaErrorChannel", threadPoolExecutor).get();
}

通过重试建议处理失败

@Bean
RequestHandlerRetryAdvice retryAdvice(ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    return retryAdvice;
}

我在运行测试时收到此错误

java.lang.IllegalStateException: No bean found for definition [SpyDefinition@44dfdd58 name = '', typeToSpy = org.springframework.messaging.MessageHandler, reset = AFTER]
    at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
    at org.springframework.boot.test.mock.mockito.MockitoPostProcessor.inject(MockitoPostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]

【问题讨论】:

标签: java spring spring-boot spring-integration


【解决方案1】:

这是我尝试过的方法,并且有效:

@SpringBootApplication
public class Demo1Application {

    public static void main(String[] args) {
        SpringApplication.run(Demo1Application.class, args);
    }

    @Bean
    ExecutorChannel kafkaErrorChannel(TaskExecutor taskExecutor) {
        return new ExecutorChannel(taskExecutor);
    }

    @Bean
    public MessageHandler kafkaFailureHandler() {
        LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.ERROR);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }

    @Bean
    IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
        return IntegrationFlows.from(kafkaErrorChannel)
                .transform("payload.failedMessage")
                .handle(kafkaFailureHandler)
                .get();
    }

}

@SpringBootTest
class Demo1ApplicationTests {

    @Autowired
    ExecutorChannel kafkaErrorChannel;

    @SpyBean
    MessageHandler kafkaFailureHandler;

    @Test
    void testSpyBean() throws InterruptedException {
        MessagingException payload = new MessageHandlingException(new GenericMessage<>("test"));
        this.kafkaErrorChannel.send(new ErrorMessage(payload));
        Thread.sleep(1000);
        Mockito.verify(this.kafkaFailureHandler).handleMessage(ArgumentMatchers.any(Message.class));
    }

}

也许您的问题是您没有将LogHandlerConfiguration 包含到您的@SpringBootTest 配置中。这就是为什么我要求一个简单的项目来玩。 您具有所有这些属性的代码过于自定义,无法复制/粘贴到我的环境中......

还要注意Thread.sleep(1000);。由于您的kafkaErrorChannelExecutorChannel,因此消息消耗发生在另一个线程上,离开您的主测试线程并由于竞争条件而导致失败。很难猜出正确的时机,所以最好存根模拟方法来满足一些线程屏障,例如new CountDownLatch(1),然后在测试中等待它。

题外话你也可以研究一下 Spring 集成测试框架:https://docs.spring.io/spring-integration/docs/current/reference/html/testing.html#test-context

【讨论】:

  • 我在Mockito 中使用timeout 方法无济于事;我会试着把闩锁存起来。
  • 您解决了No bean found for definition 的原始问题吗?
  • 不,我也尝试了MockIntegrationContextChannelInterceptor,但由于我的等待逻辑无效,它也失败了。
  • 那个频道真的必须是ExecutorChannel吗?尽管我认为这已经与原始问题无关。关于这件事还有什么我可以帮你的吗?
  • 好吧,这段代码进入了一个处理所有来自 Kafka 的 pub/sub 的集线器,使用 ExecutorChannel 不会提高吞吐量吗?
【解决方案2】:

所以,被挂断了 为什么不 @SpyBean?有两个问题

  1. 成功和失败处理程序都是 MessageHandlers 混淆@SpyBean
  2. Kafka生产者等待时间太长,即1000ms

这是最后的工作,使用命名的 bean

@Bean("kafkaFailureHandler")
public MessageHandler kafkaFailureHandler() {
    LoggingHandler handler = new LoggingHandler(LoggingHandler.Level.INFO);
    handler.setShouldLogFullMessage(Boolean.TRUE);
    return handler;
}

然后在测试中也减少最大块

@DirtiesContext
@SpringBootTest(classes = {KafkaHandlerConfiguration.class, SwiftalkKafkaGateway.class})
@SpringIntegrationTest(noAutoStartup = {"kafkaFailureFlow"})
@TestPropertySource(properties = {
        "spring.main.banner-mode=off",
        "logging.level.root=INFO",
        "logging.level.org.springframework=INFO",
        "logging.level.org.springframework.integration=DEBUG",
        "spring.kafka.producer.properties.max.block.ms=50",
        "spring.kafka.producer.bootstrap-servers=localhost:9999",
        "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
        "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
})
public class KafkaPublishFailureTest {

    private static final Logger log = LogManager.getLogger(KafkaPublishFailureTest.class);

    @Autowired
    SwiftalkKafkaGateway kafkaGateway;

    @SpyBean(name = "kafkaFailureHandler")
    MessageHandler kafkaFailureHandler;

    @Test
    @SuppressWarnings("all")
    void testFailedKafkaPublish() throws InterruptedException {

        //Dummy message
        Map<String, String> map = new HashMap<>();
        map.put("key", "value");
        // Publish Message
        Message<Map<String, String>> message = MessageBuilder.withPayload(map)
                .setHeader("X-UPSTREAM-TYPE", "alm")
                .setHeader("X-UPSTREAM-INSTANCE", "jira")
                .setHeader("X-MESSAGE-KEY", "key-1")
                .build();

        kafkaGateway.publish(message);
        verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));
    }

}

注意spring.kafka.producer.properties.max.block.ms=50verify(this.kafkaFailureHandler, timeout(500)).handleMessage(any(Message.class));

【讨论】:

    猜你喜欢
    • 2014-03-11
    • 2014-11-18
    • 2022-08-20
    • 1970-01-01
    • 2019-05-17
    • 2020-11-19
    • 1970-01-01
    • 1970-01-01
    • 2022-09-23
    相关资源
    最近更新 更多