【问题标题】:Spring cloud function Function interface return success/failure handlingspring cloud function 函数接口返回成功/失败处理
【发布时间】:2021-12-21 08:19:52
【问题描述】:

我目前有一个spring cloud stream应用,有一个监听函数,主要监听某个topic,依次执行如下:

  1. 使用来自主题的消息
  2. 在数据库中存储消费消息
  3. 调用外部服务获取一些信息
  4. 处理数据
  5. 在数据库中记录结果
  6. 将消息发送到另一个主题
  7. 确认消息(我已将确认模式设置为手动)

我们决定迁移到 Spring Cloud 功能,我已经能够使用Function 接口完成上述几乎所有步骤,以源主题为输入,接收器主题为输出。

@Bean
public Function<Message<NotificationMessage>, Message<ValidatedEvent>> validatedProducts() {
    return message -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        return MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
    }
}

我的问题与第 7 步(确认消息)中的异常处理有关。只有当我们确定消息已成功发送到 sink 队列时,我们才会确认该消息,否则我们不会确认该消息。

我的问题是,这样的事情怎么能在Spring cloud function中实现,特别是send方法完全依赖于Spring Framework(作为函数接口实现评估的结果)。

之前,我们可以通过 try/catch 来做到这一点

@StreamListener(value = NotificationMesage.INPUT)
public void onMessage(Message<NotificationMessage> message) {
    try {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message);
        
        notificationMessageService.saveOrUpdate(notificationMessage, 1, true);
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
    }catch (Exception exception){
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    }
}

Function接口成功返回后是否有监听器触发,类似KafkaSendCallback但不指定模板

【问题讨论】:

  • 请注意,上面的摘录是简化的,旨在描述原始代码跨越多个类的概念

标签: spring spring-cloud spring-kafka spring-cloud-stream spring-cloud-function


【解决方案1】:

Spring Cloud Stream 不知道函数。它与之前的消息处理程序相同,因此与之前使用的回调方法相同的方法可以用于函数。所以也许你可以分享一些可以澄清你的意思的代码?我也不明白你的意思 ..send 方法完全依赖于 Spring Framework..

【讨论】:

  • 感谢您的及时回复!我已经包含了我的 spring 功能代码示例。我的意思是“方法完全依赖于 Spring 框架”是指 Spring 框架完全处理消息路由和发送过程,然后我自己会通过 KafkaTemplate 执行此操作,因此如果发送操作成功或可以得到回调失败
【解决方案2】:

在上述 Oleg 的基础上,如果您想严格恢复 StreamListener 代码中的行为,您可以尝试以下方法。您可以切换到消费者,然后像以前一样使用KafkaTemplate 进行出站发送,而不是使用函数。

@Bean
public Consumer<Message<NotificationMessage>> validatedProducts() {
return message -> {
  try{
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);

        notificationMessageService.saveOrUpdate(notificationMessage, 0, false);
        String status = restEndpoint.getStatusFor(message.getPayload());
        ValidatedEvent event = getProcessingResult(message.getPayload(), status);
        
        Message message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.MESSAGE_KEY, event.getKey().getBytes())
                .build();
        kafkaTemplate.send(message); //here, you make sure that the data was sent successfully by using some callback. 
       //only ack if the data was sent successfully. 
        Optional.ofNullable(acknowledgment).ifPresent(Acknowledgment::acknowledge);
        
  }
  catch (Exception exception){
        notificationMessageService.saveOrUpdate(notificationMessage, 1, false);
    }
  };

}

另一件值得研究的事情是使用 Kafka 事务,在这种情况下,如果它不能端到端工作,则不会发生确认。 Spring Cloud Stream binder 基于 Spring for Apache Kafka 的基础对此提供了支持。更多详情hereHere 是 Spring Cloud Stream 文档。

【讨论】:

  • 感谢您的及时回复!虽然这肯定是一个选项,但我们希望使用 Function 接口,因为它更适合用例,并希望有一种方法能够做到这一点,而无需将其作为消费者处理。交易听起来不错,我需要先浏览您分享的文档。
【解决方案3】:

好吧,所以我选择的实际上是不使用 KafkaTemplate(或 streamBridge)。虽然这是一个可行的解决方案,但这意味着我的 Function 将被拆分为 Consumer 和某种临时提供的(在本例中为 KafkaTemplate)。

由于我想坚持功能接口的设计目标,我已经在 ProducerListener 接口实现中隔离了数据库更新的行为

@Configuration
public class ProducerListenerConfiguration {
    private final MongoTemplate mongoTemplate;

    public ProducerListenerConfiguration(MongoTemplate mongoTemplate) {
        this.mongoTemplate = mongoTemplate;
    }

    @Bean
    public ProducerListener myProducerListener() {
        return new ProducerListener() {
            @SneakyThrows
            @Override
            public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
                final ValidatedEvent event = new ObjectMapper().readerFor(ValidatedEvent.class).readValue((byte[]) producerRecord.value());
                final var updateResult = updateDocumentProcessedState(event.getKey(), event.getPayload().getVersion(), true);
            }

            @SneakyThrows
            @Override
            public void onError(ProducerRecord producerRecord, @Nullable RecordMetadata recordMetadata, Exception exception) {
                ProducerListener.super.onError(producerRecord, recordMetadata, exception);
            }
        };
    }

    public UpdateResult updateDocumentProcessedState(String id, long version, boolean isProcessed) {
        Query query = new Query();
        query.addCriteria(Criteria.where("_id").is(id));
        Update update = new Update();
        update.set("processed", isProcessed);
        update.set("version", version);
        return mongoTemplate.updateFirst(query, update, ProductChangedEntity.class);
    }
}

然后每次成功尝试,数据库都会更新处理结果和更新的版本号。

【讨论】:

    猜你喜欢
    • 2014-07-21
    • 2015-03-25
    • 1970-01-01
    • 2020-06-11
    • 2018-01-11
    • 1970-01-01
    • 2021-08-07
    • 2011-10-16
    • 1970-01-01
    相关资源
    最近更新 更多