【问题标题】:Producer callback in Spring Cloud Stream with reactor core publisherSpring Cloud Stream 中的生产者回调与反应堆核心发布者
【发布时间】:2021-09-29 13:32:09
【问题描述】:

我编写了一个 Spring Cloud Stream 应用程序,其中生产者将消息发布到指定的 kafka 主题。我的问题是如何添加生产者回调来接收消息已成功发布在主题上的确认/确认?就像我们在 spring kafka producer.send(record, new callback { ... }) 中所做的一样(维护异步生产者)。以下是我的代码:

private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();

@Bean
public Supplier<Flux<Message<?>>> event() {
        return responseProcessor::asFlux;
    }    

public Message<?> publishEvent(String status) {
  try {
      String key = ...;
      response = MessageBuilder.withPayload(payload)
                  .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                  .build();
      responseProcessor.tryEmitNext(response);
  } 

如何确保 tryEmitNext 已成功写入主题?

实施 ProducerListener 是一种解决方案并且可能吗?在 Spring Cloud Stream 中找不到具体的解决方案/文档

更新

我现在已经在下面实现了,似乎可以按预期工作

@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {

    @Override
    public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
        // Do nothing on onSuccess
    }

    @Override
    public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
        log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
    }

}

 @Bean
    ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
        return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
    }

【问题讨论】:

    标签: spring-boot apache-kafka spring-kafka spring-cloud-stream kafka-producer-api


    【解决方案1】:

    请参阅Kafka Producer Properties

    recordMetadataChannel

    MessageChannel 的 bean 名称,成功发送结果应该发送到该名称; bean 必须存在于应用程序上下文中。发送到通道的消息是带有附加标头KafkaHeaders.RECORD_METADATA 的已发送消息(转换后,如果有)。头部包含Kafka客户端提供的RecordMetadata对象;它包括在主题中写入记录的分区和偏移量。

    ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

    发送失败进入生产者错误通道(如果已配置);请参阅错误通道。默认值:空

    您可以添加一个@ServiceActivator 以从该频道异步消费。

    【讨论】:

    • 谢谢加里。我用我已经完成的实现更新了问题。这种方法会有什么问题吗?我不这么认为。我不想引入错误通道,没有这个要求和支持。只需记录未发布的消息就足够了!
    • 是的;这是一个有效的解决方案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-23
    • 1970-01-01
    • 1970-01-01
    • 2020-01-28
    • 1970-01-01
    相关资源
    最近更新 更多