【Posted】:2021-04-27 09:38:59
【Question】:
I have a simple logging handler bean configuration that I inject into an IntegrationFlow:
@Configuration
class LogHandlerConfiguration {
    private LoggingHandler handler;

    @Bean
    public MessageHandler kafkaSuccessHandler() {
        return getLogger(LoggingHandler.Level.INFO);
    }

    @Bean(name="kafkaFailureHandler")
    public MessageHandler kafkaFailureHandler() {
        return getLogger(LoggingHandler.Level.ERROR);
    }

    private LoggingHandler getLogger(LoggingHandler.Level level) {
        handler = new LoggingHandler(level);
        handler.setShouldLogFullMessage(Boolean.TRUE);
        return handler;
    }
}
The integration flow under test:
@Bean
IntegrationFlow kafkaFailureFlow(ExecutorChannel kafkaErrorChannel, MessageHandler kafkaFailureHandler) {
    return IntegrationFlows.from(kafkaErrorChannel)
            .transform("payload.failedMessage")
            .handle(kafkaFailureHandler)
            .get();
}
Here is my test:
@SpyBean
MessageHandler kafkaFailureHandler;

@BeforeEach
public void setup() {
    MockitoAnnotations.openMocks(KafkaPublishFailureTest.class);
}

@Test
void testFailedKafkaPublish() {
    //Dummy message
    Map<String, String> map = new HashMap<>();
    map.put("key", "value");
    // Publish Message
    Message<Map<String, String>> message = MessageBuilder.withPayload(map)
            .setHeader("X-UPSTREAM-TYPE", "alm")
            .setHeader("X-UPSTREAM-INSTANCE", "jira")
            .setHeader("X-MESSAGE-KEY", "key-1")
            .build();
    kafkaGateway.publish(message);
    // Failure handler called
    Mockito.verify(kafkaFailureHandler, Mockito.timeout(0).atLeastOnce()).handleMessage(
            ArgumentMatchers.any(Message.class));
}
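We have created a generic Kafka producer and consumer configuration to which downstream applications can attach the failure and success handlers that best fit their needs. In this case, I am unable to verify that the LoggingHandler is called at least once.
The failureHandler executes on an ExecutorChannel backed by a ThreadPoolTaskExecutor: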
@Bean
ExecutorChannel kafkaErrorChannel(Executor threadPoolExecutor) {
    return MessageChannels.executor("kafkaErrorChannel", threadPoolExecutor).get();
}
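Because the handler runs on an executor thread, it is invoked asynchronously with respect to the test thread. The sketch below uses only the JDK (no Spring or Mockito) to illustrate why a check made with no waiting time, such as the `Mockito.timeout(0)` in the test above, may run before the handler has been called. `AsyncHandlerSketch`, `CountingHandler`, and `dispatchAndObserve` are hypothetical names invented for this illustration, and the delay values are arbitrary assumptions. This is a separate concern from the exception reported in this question and is not claimed to be its cause.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AsyncHandlerSketch {

    // Hypothetical stand-in for the failure handler: it only records that it was invoked.
    static final class CountingHandler {
        final AtomicInteger calls = new AtomicInteger();
        final CountDownLatch called = new CountDownLatch(1);

        void handleMessage(String payload) {
            calls.incrementAndGet();
            called.countDown();
        }
    }

    /**
     * Dispatches one message on a worker thread (simulating an executor-backed channel)
     * and returns {calls seen immediately, calls seen after waiting up to waitMillis}.
     */
    static int[] dispatchAndObserve(long handlerDelayMillis, long waitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountingHandler handler = new CountingHandler();
        try {
            executor.execute(() -> {
                try {
                    // Simulated latency before the handler is reached.
                    Thread.sleep(handlerDelayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                handler.handleMessage("failed message");
            });
            // Checked right after dispatch, without waiting.
            int immediately = handler.calls.get();
            try {
                // Checked again after giving the worker thread time to run.
                handler.called.await(waitMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return new int[] {immediately, handler.calls.get()};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        int[] seen = dispatchAndObserve(200, 5_000);
        System.out.println("immediately: " + seen[0] + ", after waiting: " + seen[1]);
    }
}
```

In a Mockito-based test the analogous adjustment would be a positive value passed to `Mockito.timeout(...)`; the right value depends on the retry and executor settings, which are not shown here.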
Failures are handled through a retry advice:
@Bean
RequestHandlerRetryAdvice retryAdvice(ExecutorChannel kafkaErrorChannel) {
    RequestHandlerRetryAdvice retryAdvice = new RequestHandlerRetryAdvice();
    retryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(kafkaErrorChannel));
    return retryAdvice;
}
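The retry-then-recover idea behind this configuration can be sketched in plain Java: the send is attempted several times, and only once the attempts are exhausted is the last failure handed to a recoverer, which here stands in for sending an error message to `kafkaErrorChannel`. This is a simplified illustration, not Spring Integration's implementation. `RetryThenRecoverSketch`, `sendWithRetry`, and `publishAlwaysFailing` are hypothetical names, the list plays the role of the error channel, and the attempt count of three is an assumed value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class RetryThenRecoverSketch {

    /**
     * Hypothetical helper: runs the action up to maxAttempts times. If every attempt
     * fails, the last exception is passed to the recoverer. Returns true on success.
     */
    static <T> boolean sendWithRetry(T message, int maxAttempts,
                                     Consumer<T> action, Consumer<RuntimeException> recoverer) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.accept(message);
                return true;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        recoverer.accept(last); // retries exhausted: hand over to recovery
        return false;
    }

    /** Simulates a publish that always fails; the returned list stands in for the error channel. */
    static List<String> publishAlwaysFailing(int maxAttempts) {
        List<String> errorChannel = new ArrayList<>();
        int[] attempts = {0};
        sendWithRetry("key-1", maxAttempts,
                msg -> {
                    attempts[0]++;
                    throw new IllegalStateException("broker unavailable for " + msg);
                },
                failure -> errorChannel.add("attempts=" + attempts[0] + ", cause=" + failure.getMessage()));
        return errorChannel;
    }

    public static void main(String[] args) {
        System.out.println(publishAlwaysFailing(3));
    }
}
```

One consequence worth keeping in mind for the test: the failure handler can only be reached after all retry attempts have failed, so any verification has to allow for that delay.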
I get this error when running the test:
java.lang.IllegalStateException: No bean found for definition [SpyDefinition@44dfdd58 name = '', typeToSpy = org.springframework.messaging.MessageHandler, reset = AFTER]
at org.springframework.util.Assert.state(Assert.java:97) ~[spring-core-5.3.4.jar:5.3.4]
at org.springframework.boot.test.mock.mockito.MockitoPostProcessor.inject(MockitoPostProcessor.java:351) ~[spring-boot-test-2.4.3.jar:2.4.3]
【Comments】:
- Can you share a simple project that we can play with to reproduce this?
Tags: java spring spring-boot spring-integration