【问题标题】:Can ActiveMQ Artemis play undelivered messages to STOMP clients?ActiveMQ Artemis 可以向 STOMP 客户端播放未传递的消息吗?
【发布时间】:2020-10-04 03:30:42
【问题描述】:

如果我正在运行一个将persistence-enabled 设置为true 的代理,它已将一些消息接收到队列中,有没有办法将其配置为在消息到达后将消息发送到连接的新STOMP 客户端?

这里的想法是我的队列工作人员可能已经停止,我希望他们恢复在他们不运行期间积累的任何工作。

目前,当我的 STOMP 消费者连接到队列时,缓冲的消息最终不会被处理。 “缓冲”是指生产者已将消息写入队列,而没有连接消费者。继续这种情况,当我的消费者连接时,他们能够看到消息,但只能看到新消息。任何先前的消息都不会最终发送给消费者。

代理配置

<?xml version='1.0'?>
<configuration xmlns="urn:activemq" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xi="http://www.w3.org/2001/XInclude" xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

    <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:activemq:core ">

        <name>0.0.0.0</name>

        <persistence-enabled>true</persistence-enabled>

        <!-- this could be ASYNCIO, MAPPED, NIO
            ASYNCIO: Linux Libaio
            MAPPED: mmap files
            NIO: Plain Java Files
        -->
        <journal-type>ASYNCIO</journal-type>

        <paging-directory>data/paging</paging-directory>
        <bindings-directory>data/bindings</bindings-directory>
        <journal-directory>data/journal</journal-directory>
        <large-messages-directory>data/large-messages</large-messages-directory>

        <journal-datasync>true</journal-datasync>
        <journal-min-files>2</journal-min-files>
        <journal-pool-files>10</journal-pool-files>
        <journal-device-block-size>4096</journal-device-block-size>
        <journal-file-size>10M</journal-file-size>

        <!--
        This value was determined through a calculation.
        Your system could perform 50 writes per millisecond
        on the current journal configuration.
        That translates as a sync write every 20000 nanoseconds.

        Note: If you specify 0 the system will perform writes directly to the disk.
                We recommend this to be 0 if you are using journalType=MAPPED and journal-datasync=false.
        -->
        <journal-buffer-timeout>20000</journal-buffer-timeout>

        <!--
        When using ASYNCIO, this will determine the writing queue depth for libaio.
        -->
        <journal-max-io>4096</journal-max-io>
        <!--
        You can verify the network health of a particular NIC by specifying the <network-check-NIC> element.
            <network-check-NIC>theNicName</network-check-NIC>
        -->

        <!--
        Use this to use an HTTP server to validate the network
            <network-check-URL-list>http://www.apache.org</network-check-URL-list> -->

        <!-- <network-check-period>10000</network-check-period> -->
        <!-- <network-check-timeout>1000</network-check-timeout> -->

        <!-- this is a comma separated list, no spaces, just DNS or IPs
            it should accept IPV6

            Warning: Make sure you understand your network topology as this is meant to validate if your network is valid.
                    Using IPs that could eventually disappear or be partially visible may defeat the purpose.
                    You can use a list of multiple IPs, and if any successful ping will make the server OK to continue running -->
        <!-- <network-check-list>10.0.0.1</network-check-list> -->

        <!-- use this to customize the ping used for ipv4 addresses -->
        <!-- <network-check-ping-command>ping -c 1 -t %d %s</network-check-ping-command> -->

        <!-- use this to customize the ping used for ipv6 addresses -->
        <!-- <network-check-ping6-command>ping6 -c 1 %2$s</network-check-ping6-command> -->

        <!-- how often we are looking for how many bytes are being used on the disk in ms -->
        <disk-scan-period>5000</disk-scan-period>

        <!-- once the disk hits this limit the system will block, or close the connection in certain protocols
            that won't support flow control. -->
        <max-disk-usage>90</max-disk-usage>
        <!-- should the broker detect dead locks and other issues -->
        <critical-analyzer>true</critical-analyzer>
        <critical-analyzer-timeout>120000</critical-analyzer-timeout>
        <critical-analyzer-check-period>60000</critical-analyzer-check-period>
        <critical-analyzer-policy>HALT</critical-analyzer-policy>
        <page-sync-timeout>1020000</page-sync-timeout>

        <acceptors>
            <acceptor name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;amqpMinLargeMessageSize=102400;protocols=CORE;useEpoll=true;amqpCredits=1000;amqpLowCredits=300;amqpDuplicateDetection=true</acceptor>
            <acceptor name="stomp">tcp://0.0.0.0:61613?stompEnableMessageId=true;tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=STOMP;useEpoll=true</acceptor>
        </acceptors>

        <connectors>
            <connector name="global">tcp://172.17.0.1:61616</connector>
            <connector name="s">tcp://172.17.0.1:61617</connector>
        </connectors>

        <cluster-user>cluster</cluster-user>
        <cluster-password>REDACTED</cluster-password>

        <cluster-connections>
            <cluster-connection name="multi-region">
                <connector-ref>global</connector-ref>
                <message-load-balancing>ON_DEMAND</message-load-balancing>
                <static-connectors>
                    <connector-ref>s</connector-ref>
                </static-connectors>
            </cluster-connection>
        </cluster-connections>

        <addresses>
            <address name="/queue/global.regional">
                <multicast>
                    <queue name="/queue/global.regional">
                        <durable>true</durable>
                    </queue>
                </multicast>
            </address>
        </addresses>

        <address-settings>

            <!-- if you define auto-create on certain queues, management has to be auto-create -->
            <address-setting match="activemq.management#">
                <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay>
                <!-- with -1 only the global-max-size is in use for limiting -->
                <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics>
            </address-setting>

            <!--default for catch all-->
            <address-setting match="#">
                <!-- <dead-letter-address>DLQ</dead-letter-address>
                <expiry-address>ExpiryQueue</expiry-address>
                <redelivery-delay>0</redelivery-delay> -->
                <!-- with -1 only the global-max-size is in use for limiting -->
                <!-- <max-size-bytes>-1</max-size-bytes>
                <message-counter-history-day-limit>10</message-counter-history-day-limit>
                <address-full-policy>PAGE</address-full-policy>
                <auto-create-queues>true</auto-create-queues>
                <auto-create-addresses>true</auto-create-addresses>
                <auto-create-jms-queues>true</auto-create-jms-queues>
                <auto-create-jms-topics>true</auto-create-jms-topics> -->
            </address-setting>
        </address-settings>

        <security-settings>
            <security-setting match="#">
                <permission type="createNonDurableQueue" roles="amq"/>
                <permission type="deleteNonDurableQueue" roles="amq"/>
                <permission type="createDurableQueue" roles="amq"/>
                <permission type="deleteDurableQueue" roles="amq"/>
                <permission type="createAddress" roles="amq"/>
                <permission type="deleteAddress" roles="amq"/>
                <permission type="consume" roles="amq"/>
                <permission type="browse" roles="amq"/>
                <permission type="send" roles="amq"/>
                <!-- we need this otherwise ./artemis data imp wouldn't work -->
                <permission type="manage" roles="amq"/>
            </security-setting>
        </security-settings>

    </core>
</configuration>

【问题讨论】:

  • 是的,目前当我的 stomp 消费者连接到队列时,缓冲的消息最终不会被处理。
  • 没有太多真正要分享的东西,因为我使用的是 Laravel 的队列抽象和与之集成的可插入队列系统后端(Enqueue)。您主要关注的是框架代码与我编写的代码。
  • 我所说的缓冲是指生产者已将消息写入队列而没有连接消费者。继续这种情况,当我的消费者连接时,他们能够看到消息,但只能看到新消息。任何先前的消息都不会最终发送给消费者。
  • 不管怎样,这里是我正在使用的 Enqueue 库的 STOMP 插件:github.com/php-enqueue/enqueue-dev/tree/master/pkg/stomp
  • multicast 更改为anycast 非常重要。我在回答中进一步解释了。如果您需要任播语义,那么通过更改配置应该不会产生“不利影响”。

标签: queue stomp activemq-artemis


【解决方案1】:

根据描述,听起来您的客户正在使用通常称为“发布/订阅”的语义。在 ActiveMQ Artemis 文档和配置中,这称为“多播”语义。当使用 pub/sub 语义时,消费者(即订阅者)仅在他们连接到目的地之后(即在他们创建订阅之后)收到发送的消息。

在 JMS 之类的东西中,这些语义由客户端是使用队列还是主题来控制。然而,STOMP 规范只定义了一个没有特定语义的通用“目的地”。在“协议概述”部分它指出:

STOMP 服务器被建模为一组可以向其发送消息的目的地。 STOMP 协议将目的地视为不透明的字符串,并且它们的语法是特定于服务器实现的。此外,STOMP 没有定义目的地的交付语义应该是什么。目的地的交付或“消息交换”语义可能因服务器而异,甚至因目的地而异。这让服务器可以利用 STOMP 支持的语义来发挥创造力。

听起来你真正想要的是 anycast 语义而不是多播。 STOMP chapter in the ActiveMQ Artemis documentation 涵盖了为动态创建的地址和队列控制这些语义的几种方法。但是,由于您使用的是静态创建的地址和队列,我建议您直接使用:

        <addresses>
            <address name="/queue/global.regional">
                <anycast>
                    <queue name="/queue/global.regional"/>
                </anycast>
            </address>
        </addresses>

您当前的配置定义了一个非常罕见的静态创建的多播队列。 STOMP 客户端使用的地址上的多播队列表示该客户端的订阅。一般来说,当您需要手动控制 STOMP 客户端的持久订阅时,您只想静态创建多播队列。这将在"Durable Subscriptions" section of the STOMP chapter in the ActiveMQ Artemis documentation 中进一步讨论。

【讨论】:

  • 一如既往,衷心感谢您提供的惊人见解!
  • 哈哈,是的!感谢您的 ping,在写完最后一条评论后完全忘记了。这是忙碌的一天! #derp
猜你喜欢
  • 2021-06-04
  • 1970-01-01
  • 2011-12-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-11-21
  • 2012-07-15
  • 1970-01-01
相关资源
最近更新 更多