【发布时间】:2011-05-03 13:25:04
【问题描述】:
我有一个 Java 应用程序,其中包含许多通过 JMS (ActiveMQ) 进行通信的组件。目前,应用程序和 JMS Hub 位于同一台服务器上,尽管我们最终计划拆分组件以实现可伸缩性。目前,我们在性能方面遇到了重大问题,似乎都与 JMS 有关,最明显的是,这个问题的重点是向主题发布消息所花费的时间。
我们有大约 50 个动态创建的主题,用于应用程序组件之间的通信。一个组件从表中读取记录并一次处理一条,该处理涉及创建 JMS 对象消息并将其发布到主题之一。此处理无法跟上记录写入源表的速率 ~23/sec,因此我们更改了处理以创建 JMS 对象消息并将其添加到队列中。创建了一个新线程,该线程从该队列中读取并将消息发布到适当的主题。显然这不会加快处理速度,但它确实让我们通过查看队列的大小来了解我们落后了多远。
一天开始时没有消息通过整个系统,这迅速从第一个小时通过集线器的 1560000 条(433 条/秒)消息增加到第三个小时的 2100000 条(582 条/秒),然后停留在那个水平。在第一个小时开始时,从数据库表中读取记录的组件发布的消息保持不变,到那个小时结束时,队列中有 2000 条消息等待发送,到第 3 个小时,队列中有 9000 条消息在里面。
以下是发送 JMS 消息的代码的适当部分,非常感谢任何关于我们做错了什么或如何改进此性能的建议。查看网络上的统计数据 JMS 应该能够轻松处理约 1000-2000 条大消息/秒或约 10000 条小消息/秒。我们的消息每条大约 500 字节,所以我想位于这个规模的中间。
获取发布者的代码:
private JmsSessionPublisher getJmsSessionPublisher(String topicName) throws JMSException {
if (!this.topicPublishers.containsKey(topicName)) {
TopicSession pubSession = (ActiveMQTopicSession) topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQTopic topic = getTopic(topicName, pubSession);
// Create a JMS publisher and subscriber
TopicPublisher publisher = pubSession.createPublisher(topic);
this.topicPublishers.put(topicName, new JmsSessionPublisher(pubSession, publisher));
}
return this.topicPublishers.get(topicName);
}
发送消息:
JmsSessionPublisher jmsSessionPublisher = getJmsSessionPublisher(topicName);
ObjectMessage objMessage = jmsSessionPublisher.getSession().createObjectMessage(messageObj);
objMessage.setJMSCorrelationID(correlationID);
objMessage.setJMSTimestamp(System.currentTimeMillis());
jmsSessionPublisher.getPublisher().publish(objMessage, false, 4, 0);
将消息添加到队列的代码:
List<EventQueue> events = eventQueueDao.getNonProcessedEvents();
for (EventQueue eventRow : events) {
IEvent event = eventRow.getEvent();
AbstractEventFactory.EventType eventType = AbstractEventFactory.EventType.valueOf(event.getEventType());
String topic = event.getTopicName() + topicSuffix;
EventMsgPayload eventMsg = AbstractEventFactory.getFactory(eventType).getEventMsgPayload(event);
synchronized (queue) {
queue.add(new QueueElement(eventRow.getEventId(), topic, eventMsg));
queue.notify();
}
}
线程中的代码从队列中移除项目:
jmsSessionFactory.publishMessageToTopic(e.getTopic(), e.getEventMsg(), Integer.toString(e.getEventMsg().hashCode()));
publishMessageToTopic 执行上面的“发送消息”代码。
如果一致认为 ActiveMQ 可能不是最佳选择,则可以选择其他 JMS 实现。
谢谢,
詹姆斯
【问题讨论】:
-
你在使用事务吗?您对队列有什么配置?他们真的要排队吗?还是话题?这么多的问题 :)。什么版本的 ActiveMQ?独立?在应用服务器上?
-
我们不会明确使用事务,除非它们被默认使用。我们不在集线器上创建任何配置,主题是使用动态创建的: topic = (ActiveMQTopic) this.initialContext.lookup("dynamicTopics/" + topicName);我们使用在独立实例中运行的 ActiveMQ 5.3.2 版。
-
那么这个问题是怎么回事?你解决了吗?