【问题标题】:Does ActiveMQ Artemis support updating scheduled messages in a last value queue?ActiveMQ Artemis 是否支持更新最后一个值队列中的预定消息?
【发布时间】:2022-02-10 00:02:31
【问题描述】:

在我对 Artemis LastValueQueue code 的测试和审查中,似乎消息的调度延迟优先于其对“last-value-key”的评估。换句话说,如果您安排了一条消息,则仅在准备传递时评估它以替换队列中的最后一个值。

我的问题是我是否正确理解了代码,如果是,是否有解决方法或 ActiveMQ / Artemis 的功能可能有助于满足我们的要求。

我们的要求如下:

  1. 生成一条消息,并将该消息的处理延迟到未来的某个时间点(通常为 30 秒)。
  2. 如果由于新的外部事件而生成了更新版本的消息,请将任何现有的预定消息替换为新版本的消息 - 除了消息有效负载之外,还应更新预定传递时间。李>

其他一些注意事项:

  • 我目前的原型使用的是 Artemis 嵌入式服务器
  • Spring-jms JmsTemplate 用于生成消息
  • Spring-jms JmsListenerContainerFactory 用于消费消息
  • 我们目前不使用 SpringBoot,因此您将在下面看到一些 bean 设置。

ArtemisConfig.java:

@Configuration
@EnableJms
public class ArtemisConfig {

    @Bean
    public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
        org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
        config.addAcceptorConfiguration("in-vm", "vm://0");
        config.setPersistenceEnabled(true);
        config.setSecurityEnabled(false);
        config.setJournalType(JournalType.ASYNCIO);
        config.setCreateJournalDir(true);
        config.setJournalDirectory("/var/mq/journal");
        config.setBindingsDirectory("/var/mq/bindings");
        config.setLargeMessagesDirectory("/var/mq/large-messages");
        config.setJMXManagementEnabled(true);

        QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
        queueConfiguration.setAddress("MYLASTVALUEQUEUE");
        queueConfiguration.setLastValueKey("uniqueJobId");
        queueConfiguration.setDurable(true);
        queueConfiguration.setEnabled(true);
        queueConfiguration.setRoutingType(RoutingType.ANYCAST);

        CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
        coreAddressConfiguration.addQueueConfiguration(queueConfiguration);

        config.addAddressConfiguration(coreAddressConfiguration);

        return config;
    }

    @Bean
    public EmbeddedActiveMQ artemisServer() throws Exception {
        EmbeddedActiveMQ server = new EmbeddedActiveMQ();
        server.setConfiguration(configuration());
        server.start();

        return server;
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        artemisServer().stop();
    }

    @Bean
    public ConnectionFactory activeMqConnectionFactory() throws Exception {
        return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
    }


    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setSessionTransacted(true);
        factory.setConcurrency("8");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public JmsTemplate jmsTemplate() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setDeliveryPersistent(true);

        return jmsTemplate;
    }

    @Bean
    QueueMessageService queueMessageService() {
        return new QueueMessageService();
    }
}

QueueMessageService.java

public class QueueMessageService {
    @Resource
    private JmsTemplate jmsTemplate;

    public void queueJobRequest(
            final String queue,
            final int priority,
            final long deliveryDelayInSeconds,
            final MyMessage message) {

        jmsTemplate.convertAndSend(queue, jobRequest, message -> {
            message.setJMSPriority(priority);
            if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
                message.setLongProperty(
                        Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
                        Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
                );
            }
            message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
            message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
            return message;
        });
    }
}

【问题讨论】:

    标签: java jms spring-jms activemq-artemis


    【解决方案1】:

    您对带有最后值队列的调度消息语义的理解是正确的。当一条消息被调度时,它在技术上还没有在队列中。直到预定时间到达最后值队列语义被强制执行时,它才会被放入队列。

    没有实现新功能我看不出如何以任何自动方式实现所需的行为。此时我的建议是使用管理 API(即QueueControl)在发送“新”预定消息之前手动删除“旧”预定消息。您可以为此使用removeMessage 方法之一,因为它们可以处理预定消息和非预定消息。

    【讨论】:

      猜你喜欢
      • 2014-08-24
      • 2012-03-25
      • 2019-10-09
      • 2021-09-02
      • 2018-07-27
      • 1970-01-01
      • 1970-01-01
      • 2018-01-09
      • 2019-09-28
      相关资源
      最近更新 更多