【发布时间】:2021-09-29 13:32:09
【问题描述】:
我编写了一个 Spring Cloud Stream 应用程序,其中生产者将消息发布到指定的 kafka 主题。我的问题是如何添加生产者回调来接收消息已成功发布在主题上的确认/确认?就像我们在 spring kafka producer.send(record, new callback { ... }) 中所做的一样(维护异步生产者)。以下是我的代码:
private final Sinks.Many<Message<?>> responseProcessor = Sinks.many().multicast().onBackpressureBuffer();
@Bean
public Supplier<Flux<Message<?>>> event() {
return responseProcessor::asFlux;
}
public Message<?> publishEvent(String status) {
try {
String key = ...;
response = MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.build();
responseProcessor.tryEmitNext(response);
}
如何确保 tryEmitNext 已成功写入主题?
实施 ProducerListener 是一种解决方案并且可能吗?在 Spring Cloud Stream 中找不到具体的解决方案/文档
更新
我现在已经在下面实现了,似乎可以按预期工作
@Component
public class MyProducerListener<K, V> implements ProducerListener<K, V> {
@Override
public void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
// Do nothing on onSuccess
}
@Override
public void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
log.error("Producer exception occurred while publishing message : {}, exception : {}", producerRecord, exception);
}
}
@Bean
ProducerMessageHandlerCustomizer<KafkaProducerMessageHandler<?, ?>> customizer(MyProducerListener pl) {
return (handler, destinationName) -> handler.getKafkaTemplate().setProducerListener(pl);
}
【问题讨论】:
标签: spring-boot apache-kafka spring-kafka spring-cloud-stream kafka-producer-api