【问题标题】:Camel ActiveMQ how to destroy/purge a virtual topic queue without consumersCamel ActiveMQ如何在没有消费者的情况下销毁/清除虚拟主题队列
【发布时间】:2016-10-25 14:46:13
【问题描述】:

我的应用程序包含许多运行在 JBoss Fuse 6.2.1 中的 OSGi 包。每个包都有一个从 ActiveMQ 端点消耗的 Camel 路由。使用VirtualTopics交换数据。

ProducerBundle 发布到主题VirtualTopic.MyTopic

ConsumerBundle A 从队列 @9​​87654323@ 消费
ConsumerBundle B 从队列 @9​​87654324@ 消费
ConsumerBundle C 从队列消费排队Consumer.C.VirtualTopic.MyTopic

在某个时间消费者 C 关闭,它的捆绑包被卸载并且永远不会回来。但是,消息仍然排入Consumer.C.VirtualTopic.MyTopic 队列。
如何销毁这样的队列?

当队列填满时,ActiveMQ 会暂停生产者,我无法设置一小段时间来处理消息,因为其他消费者可能需要一段时间来处理每条消息。我无法修改 VirtualTopic 结构。我可以完全访问 ActiveMQ 配置和 Camel 路由。
是否有其他选项来处理这种情况?

<!-- producer route -->
<route id="ProducerRoute"/>
    <from uri="direct:trigger"/>
    <to uri="activemq:topic:VirtualTopic.MyTopic"/>
</route>

<!-- each consumer route -->
<route id="ConsumerARoute">
    <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
    <to uri="bean:myProcessor"/>
</route>

【问题讨论】:

  • 您可以为消息设置生存时间。
  • 如果 Consumer C 包是手动卸载的,也许你也可以手动删除队列(通过 web 控制台或通过 jmx)。
  • @AlexandreCartapanis 如果我设置的 TTL 太小,其他消费者可能会丢失消息;如果它足够大,生产者就会减慢速度,从而减慢整个过程。我希望 ActiveMQ 或 Camel 提供一些事件/触发器/拦截器,而不是自己编写。

标签: apache-camel activemq jbossfuse


【解决方案1】:

我有类似的情况,但我不能使用克劳斯的建议,因为在我的代理中还有其他没有消费者的队列,我不想删除它们。 在我的情况下,我正在运行带有结构的 JBoss Fuse 6.1.0(我认为这与较新版本的 Fuse 相同):我刚刚删除了消费者(在我的情况下,我刚刚删除了消费者的配置文件),然后我使用 hawtio 控制台中的删除按钮删除了队列。

【讨论】:

  • 你是对的,我的错 :) 现在我要添加一个更重要的答案。
【解决方案2】:

我选择了激进的解决方案:我挂钩到 OSGi 捆绑生命周期,当它停止时,我使用 JMX MBeanServer 来销毁现在不需要的队列。

因为我的包是使用蓝图管理的,所以我选择了一个带有销毁方法的 bean。
这是一个示例实现:

我的豆

package org.darugna.osgi;

import javax.management.MBeanServer;
import javax.management.ObjectName;

public class QueueDestroyer {

    private static final String[] QUEUES_TO_DESTROY = {
        "Consumer.A.VirtualTopic.MyTopic"
    };

    private MBeanServer mBeanServer;

    public void setMbeanServer(MBeanServer mBeanServer) {
        this.mBeanServer = mBeanServer;
    }

    public void destroy() throws Exception {
        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=amq");
        for (String queueName : QUEUES_TO_DESTROY) {
            Object returnValue = mBeanServer.invoke(brokerName,
                    "removeQueue",
                    new Object[]{queueName},
                    new String[]{String.class.getName()});
        }
    }

}

蓝图.xml

<blueprint>

    <reference id="mbeanServer" interface="javax.management.MBeanServer" 
               availability="mandatory"/>

    <bean id="queueDestroyer" class="org.darugna.osgi.QueueDestroyer" 
          destroy-method="destroy">
        <property name="mbeanServer" ref="mbeanServer"/>
    </bean>

    <camelContext>
        <route>
            <from uri="activemq:Consumer.A.VirtualTopic.MyTopic"/>
            <to uri="bean:myProcessor"/>
        </route>
    <camelContext>

</blueprint>    

【讨论】:

    【解决方案3】:

    查看 Apache ActiveMQ 文档如何删除非活动队列/主题,例如:http://activemq.apache.org/delete-inactive-destinations.html

    【讨论】:

    • 谢谢@Claus 我正在考虑那个解决方案,但链接显示:“if 它们是 empty 30 秒”。如果消费者关闭后队列收到消息,问题依然存在。我看到我可以配置这样的超时,我再试一次。
    猜你喜欢
    • 2020-12-01
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-06-27
    • 1970-01-01
    • 2020-09-06
    • 2016-05-16
    相关资源
    最近更新 更多