【问题标题】:Spring boot standalone CommandLineRunner won't return with spring-starter-amqpSpring Boot 独立 CommandLineRunner 不会随 spring-starter-amqp 返回
【发布时间】:2019-11-21 21:55:28
【问题描述】:

我正在构建一个经典的生产者 -> rabbitmq -> 消费者流。 所有 3 个节点都运行在单独的 jvm 甚至单独的主机上

Producer 是一个 Spring Boot 命令行运行器应用程序,预计在完成生产后会停止。

Consumer 应用程序是一个 Spring Boot Web 应用程序,它侦听 3 个 rabbitmq 队列(2 个持久队列绑定到直接交换,1 个非持久绑定到扇出交换)

我的启动顺序如下: - 启动rabbitmq - 开始消费者 - 开始制作人

生产者和消费者 amqp 依赖 mvn dependency:tree

[INFO] |  +- org.springframework.boot:spring-boot-starter-amqp:jar:2.1.6.RELEASE:compile
[INFO] |  |  +- org.springframework:spring-messaging:jar:5.1.8.RELEASE:compile
[INFO] |  |  \- org.springframework.amqp:spring-rabbit:jar:2.1.7.RELEASE:compile
[INFO] |  |     +- org.springframework.amqp:spring-amqp:jar:2.1.7.RELEASE:compile
[INFO] |  |     |  \- org.springframework.retry:spring-retry:jar:1.2.4.RELEASE:compile
[INFO] |  |     +- com.rabbitmq:amqp-client:jar:5.4.3:compile
[INFO] |  |     \- org.springframework:spring-tx:jar:5.1.8.RELEASE:compile

生产者代码

/**
 * @author louis.gueye@gmail.com
 */
@RequiredArgsConstructor
@Slf4j
public class PlatformBrokerExampleProducerJob implements CommandLineRunner {

    private final AmqpTemplate template;

    @Override
    public void run(String... args) {
        final Instant now = Instant.now();
        final Instant anHourAgo = now.minus(Duration.ofHours(1));
        final String directExchangeName = "careassist_queues";
        final String fanoutExchangeName = "careassist_schedules_topics";
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.on) //
                    .build();
            final String routingKey = "care.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final String routingKey = "maintenance.events";
            template.convertAndSend(directExchangeName, routingKey, event);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), directExchangeName, routingKey);
        });
        IntStream.range(0, 60).boxed().forEach(i -> {
            final SensorEventDto event = SensorEventDto.builder() //
                    .id(UUID.randomUUID().toString()) //
                    .businessId("sens-q7ikjxk1ftik") //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .state(SensorState.off) //
                    .build();
            final ScheduleDto schedule = ScheduleDto.builder().id(UUID.randomUUID().toString()) //
                    .destination("any.routing.queue") //
                    .message(event) //
                    .timestamp(anHourAgo.plus(Duration.ofMinutes(i))) //
                    .build();
            final String routingKey = "#";
            template.convertAndSend(fanoutExchangeName, routingKey, schedule);
            log.info(">>>>>>>>>>> Sent {} to exchange {} with routing key {}", event.getId(), fanoutExchangeName, routingKey);
        });
    }
}

消费者代码(1 个监听器)

@Component
@RabbitListener(queues = {PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME})
@Slf4j
public class PlatformBrokerExampleCareEventsQueueConsumer {
    public static final String QUEUE_NAME = "care_events";
    @RabbitHandler
    public void onMessage(SensorEventDto event) {
        log.info("<<<<<<<<<<<< Received event [" + event + "] from {}...", PlatformBrokerExampleCareEventsQueueConsumer.QUEUE_NAME);
    }
}

我希望生产者生产然后关闭,但相反,java 进程无限期挂起

任何关于为什么生产者在产生其消息后不会停止的解释将不胜感激。我怀疑它与spring-started-amqp 有关,但我不确定。我当然不需要完整的罐子,只需要包含 AmqpTemplate 的小罐子

注意:消费者收到所有消息

githubproject

感谢您的帮助。

【问题讨论】:

    标签: java spring-boot rabbitmq spring-amqp


    【解决方案1】:

    PlatformBrokerClientConfiguration 绑定队列。但我看不到任何地方可以关闭队列。所以这可能是暂停您的实例的原因。

    请试试这个。

      public static void main(String[] args) {
        System.exit(SpringApplication.exit(SpringApplication.run(EmployeeDataProduceApp.class, args)));
      }
    

    【讨论】:

    • 你好@todaynowork 我认为这与此无关。生产者和消费者可以来来去去,这不会影响它相当于数据库模式(意味着留下)的绑定。生产者可能会停止,但消费者应该继续收听,以防另一个生产者开始生产消息。我会说可能有一个 MessageListenerContainer 由 spring-boot 自动启动(即使我没有看到任何正当理由,因为我没有在生产者中声明任何 @Listener)但我不确定除非我进行更深入的调查
    • 你可以试试这个。 public static void main(String[] args) { System.exit(SpringApplication.exit(SpringApplication.run(EmployeeDataProduceApp.class, args))); }
    • 嘿@todaynowork 感谢您的建议。有效。我仍然不知道挂起的根本原因,但它完全停止了
    • AMQP客户端有一些线程在运行;您可以在跑步者退出后简单地关闭上下文,一切都会干净地关闭...SpringApplication.run(MyApplication.class, args).close();
    • 很高兴知道退出程序的其他方法
    【解决方案2】:

    AMQP 客户端有一些后台线程。

    您应该更改main() 方法以在运行器返回后关闭应用程序上下文...

    public static void main(String[] args) {
        SpringApplication.run(MyApplication.class, args).close();
    }
    

    它会干净利落地关闭一切,不像System.exit()那么残酷。

    【讨论】:

    • 会的。 #TIL。我不知道 amqp 是生产者端的连接协议。有时间我会调查那部分
    • 他有一些线程处理代理的 I/O,并提供发布者确认/返回等的异步传递。
    猜你喜欢
    • 2017-05-12
    • 2017-02-02
    • 1970-01-01
    • 1970-01-01
    • 2014-04-07
    • 2016-01-29
    • 1970-01-01
    • 2015-07-07
    • 2023-04-04
    相关资源
    最近更新 更多