【问题标题】:Handling wildcards in CompositeQueues - ActiveMQ xml configuration file处理 CompositeQueues 中的通配符 - ActiveMQ xml 配置文件
【发布时间】:2019-08-24 17:28:53
【问题描述】:

我想就我在 ActiveMQ 中的一个主题征求意见。 ???

我正在使用 ActiveMQ 5.15。我正在尝试修改 xml 配置文件以使用转发到另一个队列/主题的 CompositeQueues 添加虚拟目标。从ActiveMQ documentation for this component,架构如下:

<compositeQueue name="IncomingOrders"> 
   <forwardTo>
    <topic physicalName="Notifications" />
   </forwardTo>
 </compositeQueue>

我已经能够转发来自现有队列的消息,例如 request.typeA.classC 。但是,我有几个使用相同前缀 request.typeA. 的队列,因此我的意图是使用通配符,不必为具有该前缀的每个现有队列定义复合队列,并使其更容易维护。

我需要这样的东西:

<compositeQueue name="request.typeA.>"> 
   <forwardTo>
    <topic physicalName="Notifications" />
   </forwardTo>
 </compositeQueue>

但是那段代码不起作用,我怀疑这是因为它根本不受支持(至少目前还不支持)。我已尝试在 physicalName 属性中成功使用通配符,但在 name 中没有。

  • 我的一个先决条件是我必须保留使用相同前缀的不同队列(不能将它们合并为一个)。

  • 我的另一个先决条件是我不能通过代码动态创建新的队列/主题(由于服务器权限)。所以才有兴趣修改xml配置文件。

所以我想知道你们中是否有人知道是否可以在 name 属性中使用通配符(我没有在文档中阅读任何证据),如果可以,我怎么能做。如果您确定使用当前的 ActiveMQ 版本无法做到这一点,我会感谢您确认。

如果您能提出其他替代方案/建议,我将不胜感激,因为我打算达到相同的目的,并且满足我之前提到的先决条件。我还阅读了有关Mirrored Queues 的信息,但这是一个影响所有现有队列的设置(我只是对其中的一小部分感兴趣)并且可能会对性能产生相当大的影响。

非常感谢您抽出宝贵的时间并致以诚挚的问候。 ????

【问题讨论】:

    标签: java activemq virtual-topic


    【解决方案1】:

    我使用的是 5.15.11,我看到通配符适用于复合队列。这是我的复合队列配置。

     <compositeQueue name="email.>">
       <forwardTo>
         <queue physicalName="MY_MAIN_QUEUE" />
       </forwardTo>
     </compositeQueue>
    

    【讨论】:

    • 感谢您的回复,但不幸的是,当我尝试它时,它不起作用.. :( 我终于找到了一种解决方法,它可以使用镜像队列和扩展插件,尽管它更难维护。
    【解决方案2】:

    最后我找到了一种解决方法,它允许我为给定前缀的队列子集创建 MirroredQueues。

    我所做的是创建自己的 DestinationInterceptor,以便仅为我感兴趣的那些队列创建一个镜像队列,并排除其余队列(因为默认的 MirroredQueue 实现镜像了系统中创建的所有队列)。

    我是怎么做到的。我将 MirroredQueue.java 类实现从库中复制到一个名为 CustomMirroredQueue 的新类中,并向名为 mirroring 的类添加了一个新属性。我从接口 DestinationInterceptor 修改了 intercept(final Destination destination) 实现,考虑到了 if 语句中的这个新属性(我为此创建了一个辅助方法 DestinationInterceptor em>isPrefixMirrored):

    /*
    * This method is responsible for intercepting all the queues/topics that are created in the system.
    * In this particular case we are interested only in the queues, in order we can mirror *some* of them and get
    * a copy of the messages that are sent to them (With the topics this mirroring is not necessary since we would just
    * subscribe to that topic for receiving the same message).
    * */
    public Destination intercept(final Destination destination) {
        if (destination.getActiveMQDestination().isQueue()) {
            if (isPrefixMirrored(destination) && (!destination.getActiveMQDestination().isTemporary() || brokerService.isUseTempMirroredQueues())) {
                try {
                    //we create a mirrored queue for that destination
                    final Destination mirrorDestination = getMirrorDestination(destination);
                    if (mirrorDestination != null) {
                        return new DestinationFilter(destination) {
                            public void send(ProducerBrokerExchange context, Message message) throws Exception {
                                message.setDestination(mirrorDestination.getActiveMQDestination());
                                mirrorDestination.send(context, message);
    
                                if (isCopyMessage()) {
                                    message = message.copy();
                                }
                                message.setDestination(destination.getActiveMQDestination());
                                message.setMemoryUsage(null); // set this to null so that it will use the queue memoryUsage instance instead of the topic.
                                super.send(context, message);
                            }
                        };
                    }
                } catch (Exception e) {
                    LOG.error("Failed to lookup the mirror destination for: {}", destination, e);
                }
            }
        }
        return destination;
    }
    
    /*
    * @returns true if the destination passed as parameter will be mirrored. If the value for the attribute "mirroring"
    * is an empty string "" then all the queues will be mirrored by default.
    **/
    private boolean isPrefixMirrored(Destination destination) {
        if (mirroring.equals("")) {
            return true;
        }
        List<String> mirroredQueuesPrefixes = Arrays.asList(mirroring.split(","));
        final String destinationPhysicalName = destination.getActiveMQDestination().getPhysicalName();
        return mirroredQueuesPrefixes.stream().map(String::trim).anyMatch(destinationPhysicalName::contains);
    }
    

    我生成了一个仅包含此自定义类和依赖项(为此使用 gradle)的 .jar,并添加到 ActimeMQ 代理安装中的 lib 文件夹中。然后我可以在 ActiveMQ 配置 XML 文件中将此标记用作 bean

    <destinationInterceptors>
        <bean xmlns="http://www.springframework.org/schema/beans" class="package.CustomMirroredQueue" id="CustomMirroredQueue">
            <property name="copyMessage" value="true"/>
            <property name="postfix" value=""/>
            <property name="prefix" value="mirror."/>
            <property name="mirroring" value="PREFIX_1, QUEUE2, QUEUE3"/>
        </bean>
    </destinationInterceptors>
    

    该类必须包含该类在 ActiveMQ 库文件夹中的路径。属性 copyMessage、postfix 和 prefix 来自默认 MirroredQueue 实现。并且镜像属性将是一个列表,其中包含要镜像的所有特定队列/前缀(并且只有那些)。

    【讨论】:

      猜你喜欢
      • 2015-07-24
      • 1970-01-01
      • 2020-08-10
      • 2011-05-15
      • 1970-01-01
      • 2010-09-07
      • 2011-12-18
      • 2018-02-18
      • 2011-08-23
      相关资源
      最近更新 更多