【Posted】: 2022-01-09 10:31:04
【Question】:
I just upgraded a Spring Boot application that uses Spring Cloud Stream Kafka producers and consumers to the following versions:
plugins {
    id("org.springframework.boot") version "2.6.1"
    ...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"
The application no longer starts and fails with the following exception:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:256)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 16 common frames omitted
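As far as I can tell from the trace, the cast happens inside TraceFunctionAroundWrapper.reactorFluxStream, which seems to assume that a reactive function always returns a Flux, while my consumer returns a Mono<Void>. Below is a minimal sketch of that failure mode. It uses stand-in types only: Publisher, Flux, Mono, MonoPeekTerminal and wrap are hypothetical names defined in the snippet itself, not Reactor's or Sleuth's classes, and the wrapper logic is my assumption about what the trace implies, not the library's actual code.

```kotlin
// Stand-in types, NOT Reactor's classes: they only mirror the Mono/Flux split.
open class Publisher
open class Flux : Publisher()
open class Mono : Publisher()
class MonoPeekTerminal : Mono()

// Hypothetical wrapper that assumes every reactive function returns a Flux,
// which is what the cast in the stack trace appears to do.
fun wrap(function: (Flux) -> Publisher): (Flux) -> Flux = { input ->
    function(input) as Flux // throws ClassCastException when the result is a Mono
}

fun main() {
    // Shaped like Function<Flux<...>, Mono<Void>>: consumes a Flux, returns a Mono.
    val consumer: (Flux) -> Publisher = { MonoPeekTerminal() }
    try {
        wrap(consumer)(Flux())
    } catch (e: ClassCastException) {
        println("Cast failed: ${e.message}")
    }
}
```

If that reading is right, any function with a Flux input and a Mono output would hit the same cast once it is wrapped this way, regardless of what the function body does.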
Did I miss an upgrade guide, or is this a bug?
Producer
@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {
    private val logger = LoggerFactory.getLogger(javaClass)

    fun send(event: EventReceived): Mono<EventReceived> {
        return Mono.defer {
            val message = MessageBuilder.withPayload(event)
                .setHeader(MESSAGE_KEY, event.id)
                .setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
                .build()
            logger.info("Sending event {}", event)
            // Retry until the sink accepts the message
            while (eventProcessor.tryEmitNext(message).isFailure) {
                LockSupport.parkNanos(10)
            }
            event.toMono()
        }.subscribeOn(Schedulers.boundedElastic())
    }
}
Consumer
@Configuration
class MetricConsumer(...) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
        ...
    }
}
【Comments】:
- Can you show what your function looks like?
Tags: spring spring-boot spring-cloud spring-cloud-stream