【发布时间】:2021-12-28 07:21:52
【问题描述】:
我正在尝试通过实现 Spring 云功能来创建一个 kafka。我正在使用 spring-boot 版本 2.4.0 和 spring cloud 版本:2020.0.0
@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {
private final ArticleMessageMapper articleMessageMapper;
public ArticleEventPublisher(
ArticleMessageMapper articleMessageMapper) {
this.articleMessageMapper = articleMessageMapper;
}
@Override
public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
String destination ="articleAggregated";
log.info("CREATING MESSAGE IN FUNCTION BEAN");
return createMessage( articleAggregatedDocument, destination);
}
private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);
return MessageBuilder
.withPayload(articleMessage)
.setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
.setHeader("spring.cloud.stream.sendto.destination", destination)
.build();
}
}
我知道它不是动态绑定并且目标是硬编码的,但是,我现在想测试这个特定的实现。 尽管应用程序中的控件确实转到了类并且正在创建消息,但我在“目标”中看不到它们。
我在 application.properties 中添加了以下内容:
spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true
在这里我还要提一下,IDEA intelliJ 中不再解析以下属性配置:
spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages
它们被弃用了吗?我在文档中找不到任何关于它的信息。
从文档中我还读到,如果是多绑定器,应该为每个生产者/消费者添加绑定器,但是我们只为应用程序中的所有消费者和生产者使用一个绑定器,所以我没有添加它。但我确实在绝望的时刻尝试过:
spring.cloud.stream.bindings.articleAggregated.binder=kafka
我在这里缺少什么?我已经为此停留了一段时间,同时切换到 Sink+Supplier 实施作为替代方案。但我想了解为什么这个实现不起作用。
【问题讨论】:
-
您的配置中发生了很多事情,难以理解。例如,
articleAggregated是什么?你正在使用什么版本?等等。 。你能创建一个最低限度的示例项目,将其推送到 github 并在此处留下链接,以便我们查看 -
感谢您抽出宝贵时间阅读本文。我已经上传了一个示例项目。 github.com/rshm/SCF/tree/master
标签: spring-boot apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-function