【发布时间】:2021-12-21 08:19:52
【问题描述】:
我目前有一个spring cloud stream应用,有一个监听函数,主要监听某个topic,依次执行如下:
- 使用来自主题的消息
- 在数据库中存储消费消息
- 调用外部服务获取一些信息
- 处理数据
- 在数据库中记录结果
- 将消息发送到另一个主题
- 确认消息(我已将确认模式设置为手动)
我们决定迁移到 Spring Cloud 功能,我已经能够使用Function 接口完成上述几乎所有步骤,以源主题为输入,接收器主题为输出。
@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
return message -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
return MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
}
}
我的问题与第 7 步(确认消息)中的异常处理有关。只有当我们确定消息已成功发送到 sink 队列时,我们才会确认该消息,否则我们不会确认该消息。
我的问题是,这样的事情怎么能在Spring cloud function中实现,特别是send方法完全依赖于Spring Framework(作为函数接口实现评估的结果)。
之前,我们可以通过 try/catch 来做到这一点
@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
try {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
String status = restEndpoint.getStatusFor(message.getPayload());
ValidatedEvent event = getProcessingResult(message.getPayload(), status);
Message message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
.build();
kafkaTemplate.send(message);
notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
}catch (Exception exception){
notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
}
}
Function接口成功返回后是否有监听器触发,类似KafkaSendCallback但不指定模板
【问题讨论】:
-
请注意,上面的摘录是简化的,旨在描述原始代码跨越多个类的概念
标签: spring spring-cloud spring-kafka spring-cloud-stream spring-cloud-function