【问题标题】:Lost message when stoping and restarting Embedded ActiveMQ停止和重新启动 Embedded ActiveMQ 时丢失消息
【发布时间】:2017-09-02 06:41:37
【问题描述】:

您好,我正在尝试编写一个测试用例来实现对 activeMQ 的故障转移支持。

这里是代码

val brokerA = createBroker("A")
brokerA.start
val failoverUrl = s"failover:(vm://BrokerA?create=false)" +
s"?randomize=false&maxReconnectAttempts=-1&reconnectSupported=true"


val cFactory = new ActiveMQConnectionFactory(failoverUrl)
val qConnection = getQueueConnection
val session = createQueueSession(qConnection)

private def totalReadMessagesCount(queueReceiver: QueueReceiver) = {
val messages = Iterator.continually(Option(queueReceiver.receive(2000))).takeWhile(_.isDefined).flatten.toSeq
messages.size
}

private def getReceiver = {
val queueConnection = getQueueConnection
queueConnection.start()
val queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE)
val queueReceiver = createQueueReceiver(queueSession, brokerA.getBrokerName)
queueReceiver
}

def getQueueConnection =cFactory.createQueueConnection("admin", "")

def createBroker(name:String) = {
val broker = new BrokerService()
val adaptor = new KahaDBPersistenceAdapter()
broker.setBrokerName("Broker" + name)
broker.addConnector(getBrokerUrl)
broker.setPersistent(true)
broker.setUseJmx(false)
broker.setUseShutdownHook(false)
broker
}

def getBrokerUrl =  "tcp://localhost:0"


val queueReceiver: QueueReceiver = getReceiver
val messageCount = 500
(1 to messageCount) map {count =>
  //Calling method to send message to ActiveMQ
  if(count == 200){
    brokerA.stop()
    brokerA.waitUntilStopped()
    brokerA.start(true)
  }
}
val totalCount = totalReadMessagesCount(queueReceiver)
println(s"Read ${totalCount} messages")
assert(totalCount == messageCount)

重启后我可以重新连接 activeMQ,但totalCount 显示的是 300 而不是 500。之前的消息似乎丢失了。但是,当我在非嵌入式模式下运行相同的场景时。我能够收到所有消息。

请帮助我如何防止在重新启动嵌入式活动 mq 时丢失任何消息。

【问题讨论】:

  • 您需要有 2(两个)activeMQ 实例来测试故障转移。故障转移意味着如果客户端无法与给定服务器通信,它将尝试连接字符串中服务器列表中的下一个。您的连接字符串中只有一台服务器。请参阅此处的文档:activemq.apache.org/failover-transport-reference.html

标签: scala apache jms activemq


【解决方案1】:

你必须将persistent设置为true,我不知道scala但这里是java代码

public BrokerService broker() throws Exception {
    final BrokerService broker = new BrokerService();
    //broker.addConnector("tcp://localhost:61616");
    broker.addConnector("stomp://localhost:61613");
    broker.addConnector("vm://localhost");
    PersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
    File dir = new File(System.getProperty("user.home") + File.separator + "kaha");
    if (!dir.exists()) {
        dir.mkdirs();
    }
    persistenceAdapter.setDirectory(dir);
    broker.setPersistenceAdapter(persistenceAdapter);
    broker.setPersistent(true);
    return broker;
}

【讨论】:

  • 我刚刚更新了我的示例。我已经坚持到真实并添加了 KahaDB。但现在我的计数是 305。有什么建议吗?
  • 您已经创建了一个 KahaDBPersistenceAdapter 的实例,但您没有将它设置到代理没有??像这个 broker.setPersistenceAdapter(adaptor);
猜你喜欢
  • 1970-01-01
  • 2016-10-27
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-12-15
  • 2020-03-24
  • 2011-08-06
相关资源
最近更新 更多