【问题标题】:ActiveMQ throttling consumerActiveMQ 节流消费者
【发布时间】:2012-12-24 21:20:58
【问题描述】:

我想在 hornetq 中对 activeMQ 中某个队列的消费者进行节流(对于 jboss,这是通过 mdb 消费者定义上的注释来实现的)。我在activemq的文档中找不到任何类似的东西,我找到的最接近的是这个

consumer.recvDelay   0 ms    Pause consumer for recvDelay milliseconds with each message (allows consumer throttling).

来自:http://activemq.apache.org/activemq-performance-module-users-manual.html

但是我找不到如何在 java 中做到这一点。

提前致谢,

问候。

编辑:这是 ActiveMQManager 代码和使用者代码:

public class ActiveMQManager {

    private static ActiveMQConnectionFactory CONNECTION_FACTORY;

    public static Connection CONNECTION;

    public static Session SESSION;

    public static Destination TEST_QUEUE;

    public static void start() {
        try {

            CONNECTION_FACTORY = new ActiveMQConnectionFactory("vm://localhost");

            CONNECTION = CONNECTION_FACTORY.createConnection();
            CONNECTION.start();

            SESSION = CONNECTION.createSession(false,
                    Session.CLIENT_ACKNOWLEDGE);

            TestClient testClient = new TestClient();

            TEST_QUEUE = SESSION.createQueue("TEST.QUEUE");

            MessageConsumer testConsumer = SESSION.createConsumer(TEST_QUEUE);
            test.setMessageListener(testClient);

        } catch (Exception e) {
        }
    }

    public static void stop() {
        try {
            // Clean up
            SESSION.close();
            CONNECTION.close();
        } catch (JMSException e) {
            log.error(e);
        }
    }
}

消费者代码非常简单(本例):

public class TestConsumer implements MessageListener {

    @Override
    public void onMessage(Message message) {
        //Do something with the message
    }

}

【问题讨论】:

  • 你能发布你的消费者代码/配置吗?
  • 好的,我会把它添加到问题中。

标签: java activemq


【解决方案1】:

使用 ActiveMQ,您可以设置消费者预取限制:http://activemq.apache.org/what-is-the-prefetch-limit-for.html

要配置它,您可以使用连接 URL(大多数配置可以使用 URL 完成)或 Java API。

更多有趣的参数:http://activemq.apache.org/connection-configuration-uri.html

【讨论】:

  • 预取限制只控制消费者发送/缓冲消息的算法,不控制消费速率...
【解决方案2】:

这取决于所使用的消费者技术...但这里有一些选择

  • 您可以在消费者代码中手动引入延迟(不是一门精确的科学,但这会限制吞吐量)

  • 您还可以通过设置 JMS 连接的 maxConcurrentConsumers 属性来控制使用者使用的线程数...也就是说,这不会限制消息吞吐量,只是限制您使用的并发级别消费者

  • 更好的是,您可以使用限制器 EIP 实现来设置每个时间段要消耗的确切消息数

    例如,使用 Camel Throttler 很简单

    from("activemq:queueA").throttle(10).to("activemq:queueB")

【讨论】:

  • 是的,我们在线程中使用 sleep 来进行延迟,这很丑陋,因为我为这个问题加注了星标,骆驼似乎很有趣,但我们没有时间(现在)实施该解决方案,但将来也许我们会这样做。
  • 无论如何,谢谢你的答案,继续寻找解决方案会有所帮助
  • 好的。感谢您的 cmets boday,无论如何我仍然无法证明“骆驼”解决方案,但似乎是最好的答案,恭喜,您有赏金 :)
  • @boday 您的解决方案是否在不设置预取限制的情况下工作?我不这么认为,不知道答案是否应该修改。
  • @FritzDuchardt - 它无需显式设置预取即可工作(队列默认为 1k)...您绝对可以更改它并查看不同的吞吐量,但不需要更改它来限制来自队列等的消费者。
【解决方案3】:

考虑到 Camel Throttler 在交换被 Throttler 阻止时将它们保存在内存中。因此,如果您有 100 个队列消费者并且服务器(公开 SOAP 服务)很慢,那么您的内存中可能有多达 100 个交换!

为此发布了这个问题:Throttle consumption rate of all JMS consumers listening on an ActiveMQ queue

【讨论】:

  • 欢迎来到 SO。您的陈述:“我想知道是否有一个 ActiveMQ 功能可以在 ActiveMQ 级别上施加节流要求。” 似乎是一个问题。如果现有答案没有帮助,请提出一个新问题。或者,如果我误解了您的回答,请考虑改写。
猜你喜欢
  • 2014-08-06
  • 1970-01-01
  • 2012-04-18
  • 2021-12-02
  • 1970-01-01
  • 1970-01-01
  • 2021-03-06
  • 2016-06-08
  • 2014-06-27
相关资源
最近更新 更多