【问题标题】:Java monitoring activemq but without polling the queueJava监控activemq但不轮询队列
【发布时间】:2015-04-07 17:54:09
【问题描述】:

我需要编写一些程序来监视 java 中的 activemq 队列。这意味着我需要记录消息何时在队列中入队以及消息何时出队。我的程序不能发送消息或接收消息,它只需要记录。

我发现要推送消息和接收消息,但这不是我想要做的,只需记录外部进程是否将消息放入队列或从队列中取出。

为了更清楚,我画了一张图

我使用 apache camel 进行集成, 我的路由构建器看起来像

public void configure() throws Exception {
        Processor queueProcessor = new QueueProcessor();

        from("activemq:queue:KBC").process(queueProcessor);
    }

它调用followwing处理器

@Override
    public void process(Exchange exchange) throws Exception {
        Trax_EventDao dao = new Trax_EventDao();
        dao.insert(new Trax_Event("Queue",exchange.getExchangeId(),"UP","KBC", new Time(new Date().getTime())));
    }

dao 处理数据库连接并插入记录

实际的问题是,当我将消息推送到队列中并且程序运行时,消息被记录下来,这是可以的,但它也会立即被轮询,这是不行的。 如何在不轮询消息的情况下进行插入?

【问题讨论】:

    标签: java apache-camel activemq


    【解决方案1】:

    您可以使用 ActiveMQ 咨询消息来监控队列活动...

    http://activemq.apache.org/advisory-message.html

    【讨论】:

      【解决方案2】:

      我最后做的是编写一个自己的运行器类,它使用队列浏览器。

      我想用这门课做的是

      1. 与 avtivemq 建立连接并启动它
      2. 创建一个无限循环,控制指定的队列。我有一个队列中的项目列表。在每个循环中,我都会检查这个
      3. 如果列表大于队列的大小,则有项目出队。这意味着我需要循环它并检查哪些项目已出队。 否则,我会循环队列的枚举,如果元素尚不存在,则将元素添加到列表中

        package queueFeed;
        
        import dao.ProcmonDao;
        import dao.EventDao;
        import domain.Event;
        import org.apache.activemq.ActiveMQConnectionFactory;
        import org.apache.activemq.command.ActiveMQQueue;
        
        import javax.jms.*;
        import java.sql.SQLException;
        import java.sql.Time;
        import java.util.*;
        
        public class QueueRunner {
            private ProcmonDao dao;
            private Connection connection;
            private String queueName;
        
            public QueueRunner() throws SQLException {
                dao = new EventDao();
            }
        
        public void setConnection(String username, String password, String url) throws JMSException {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(username, password, url);
            connection = factory.createConnection();
        }
        
        public void run() throws Exception {
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            QueueBrowser browser = session.createBrowser(new ActiveMQQueue(queueName));
        
            List<String> ids = new ArrayList<>();
            int queueSize = 0;
            int counter = 0;
            connection.start();
        
            while (true) {
                Enumeration enumeration = browser.getEnumeration();
                if (queueSize < ids.size()) {
                    while (enumeration.hasMoreElements()) {
                        Message message = (Message) enumeration.nextElement();
                        ids.remove(message.getJMSMessageID());
                        counter++;
                    }
        
                    if (ids.size() > 0 && ids.size() > 0) {
                        Iterator<String> iterator = ids.iterator();
                        while (iterator.hasNext()) {
                            String messageId = iterator.next();
                            dao.insert(new Event("Queue", messageId, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                            iterator.remove();
                        }
                    }
        
                    queueSize = counter;
                    counter = 0;
                } else {
        
                    while (enumeration.hasMoreElements()) {
                        counter++;
                        Message message = (Message) enumeration.nextElement();
                        String id = message.getJMSMessageID();
                        if (!ids.contains(id)) {
                            ids.add(id);
                            dao.insert(new Event("Queue", id, "UP", browser.getQueue().getQueueName(), new Time(new Date().getTime())));
                        }
                    }
                    queueSize = counter;
                    counter = 0;
                }
            }
        }
        
        public void setQueueName(String queueName) {
            this.queueName = queueName;
        }
        
        public String getQueueName() {
            return this.queueName;
        }
        

        }

      这还不够完美。我认为其中存在一个小逻辑问题。

      【讨论】:

        猜你喜欢
        • 2018-10-29
        • 2018-01-30
        • 2011-12-27
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-07-03
        • 1970-01-01
        相关资源
        最近更新 更多