【问题标题】:ReplyingKafkaTemplate with Spring Cloud Stream使用 Spring Cloud Stream 回复 KafkaTemplate
【发布时间】:2019-08-07 22:51:32
【问题描述】:

是否可以将ReplyingKafkaTemplate 与 Spring Cloud Stream 一起使用?是否有任何配置代码示例以便使用它?

【问题讨论】:

  • 您想在@StreamListener 中使用它还是想将其用作客户端,@StreamListener 作为服务器?
  • 第二个选项。我想发布一条由 @StreamListener 在不同服务中处理的消息并等待响应。

标签: spring-kafka spring-cloud-stream


【解决方案1】:

这一切都在一个应用程序中,但说明了它是如何工作的......

@SpringBootApplication
@EnableBinding(Processor.class)
public class So57380643Application {

    public static void main(String[] args) {
        SpringApplication.run(So57380643Application.class, args).close();
    }

    @Bean
    public ReplyingKafkaTemplate<byte[], byte[], byte[]> replyer(ProducerFactory<byte[], byte[]> pf,
            ConcurrentMessageListenerContainer<byte[], byte[]> replyContainer) {

        return new ReplyingKafkaTemplate<>(pf, replyContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<byte[], byte[]> replyContainer(
            ConcurrentKafkaListenerContainerFactory<byte[], byte[]> factory) {

        ConcurrentMessageListenerContainer<byte[], byte[]> container = factory.createContainer("replyTopic");
        container.getContainerProperties().setGroupId("replies.group");
        return container;
    }

    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public String listen(String in) {
        return in.toUpperCase();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<byte[], byte[], byte[]> replyer) {
        return args -> {
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>("requestTopic", "foo".getBytes());
            RequestReplyFuture<byte[], byte[], byte[]> future = replyer.sendAndReceive(record);
            RecordMetadata meta = future.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata();
            System.out.println(meta);
            ConsumerRecord<byte[], byte[]> consumerRecord = future.get(10, TimeUnit.SECONDS);
            System.out.println(new String(consumerRecord.value()));
        };
    }

}

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        input:
          destination: requestTopic
          group: so57380643
        output:
          destination: replyTopic

结果:

requestTopic-0@3
FOO

【讨论】:

  • 谢谢@Gary Russell,我会尽快让你知道
猜你喜欢
  • 2016-06-20
  • 2017-03-26
  • 2018-01-24
  • 1970-01-01
  • 1970-01-01
  • 2019-03-27
  • 2020-10-24
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多