【发布时间】:2019-06-14 12:35:45
【问题描述】:
我们在应用程序中使用 Stomp、SpringBoot 和 WebSockets。服务器应用程序正在执行以下操作: 1)生成要推送给用户的消息, 2) 接受 WebSocket 连接和 3) 将消息推送到 ActiveMQ stomp 代理。线程转储显示了许多与 simpMessagingTemplate convertAndSendToUser API 调用相关的等待线程。
应用程序的两个实例在云中运行。此应用程序使用 simpMessagingTemplate convertAndSendToUser API 生成消息并推送到 ActiveMQ stomp 代理(单独运行)。
我们使用 Gatling 来模拟用户 WebSocket 连接以进行负载测试。 Gatling 在单独的实例上运行。该应用程序适用于 2000 个用户连接。一旦我们将用户增加到 4000,我们就会看到消息生成线程停止。用户可以毫无问题地连接到相同的服务器。
如果我们对 simpMessagingTemplate convertAndSendToUser API 调用进行注释,那么一切正常(生成消息和新的 WebSocket 连接)。所以我们怀疑 convertAndSendToUser API 的问题。
Threaddump 堆栈跟踪如下:
"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
at com.mypackage.PushMessageManager.lambda$sendMyMessage$2(PushMessageManager.java:77)
at com.mypackage.PushMessageManager$$Lambda$923/1850582969.accept(Unknown Source)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
at com.mypackage.PushMessageManager.lambda$processPushMessage$0(PushMessageManager.java:61)
at com.mypackage.PushMessageManager$$Lambda$664/624459498.run(Unknown Source)
at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at nl.talsmasoftware.context.executors.ContextAwareExecutorService$1.call(ContextAwareExecutorService.java:59)
at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Locked ownable synchronizers:
- None
步骤如下图所示:
- Gatling JMS 发布者以每分钟 20000 条消息将 JMS 消息推送到 Active MQ 代理。请注意,这些消息不仅仅针对一位用户。它是基于 WebSocket 用户连接分发的。
- 我们的应用程序有一个 JMS 监听器来接收这些消息。我们正在运行应用程序的 2 个实例,因此有两个 JMS 侦听器来处理此消息。
- 一旦应用程序接收到 JMS 消息,它会检查缓存中的会话信息以识别连接的用户,并使用 simpMessagingTemplate convertAndSendToUser API simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/abc", payload) 推送到另一个 ActiveMQ stomp 代理。请注意,当用户首次连接到应用程序时,sessionId 存储在分布式缓存中。所以这些是有效的会话 ID。
- ActiveMQ 单脚代理然后将这些消息传播到各个用户单脚队列。
- Gatling WebSocket 客户端(每个有 2000 个用户连接)应通过 WebSocket 连接接收这些消息。
-
客户端连接和订阅看起来像这样
stompClient.connect({'username': $("#userName").val()}, function (frame) { 设置连接(真); 订阅 = stompClient.subscribe('/user/queue/abc', function (message) { showData(JSON.parse(message.body)); },headers = {'loginusername': $("#userName").val()}); });
因此,每个用户都应该只收到发给他们的消息,而不是所有消息。这就是我们在通过 WebSocket 连接时将用户连接到各个队列并使用 convertAndSendToUser 将消息推送到特定会话的原因。后端 JMS 发布者确保消息以循环方式发布给用户。
要回答您关于识别瓶颈的问题,如果我们连接 2000 个用户,一切正常。但是当我们添加更多用户时,我们看到应用程序的 JMS 侦听器无法侦听后端 Gatling JMS 负载生成器每分钟发送的 20000 条消息。 ActiveMQ JMS 队列深度因此而增加。
为了确保瓶颈是 convertAndSendToUser API,我们对 API 调用进行了注释。如果我们这样做,我们能够连接约 13k WebSocket 连接,并且后端 JMS 侦听器也能够每分钟消费所有 20000 条消息。
希望这可以澄清您的一些问题。 更新 下面给出了用于显示异步调用 simpMessagingTemplate.convertAndSendToUser API 的代码 sn-p。这里的 RepositoryUtil.executor() 是我们自己的 executor 对象的包装器。
public CompletableFuture<Void> processPushMessage(String userName, String payload) {
return ContextAwareCompletableFuture.runAsync(() -> {
sendABCMessage(payload, userName);
}, RepositoryUtil.executor());
}
public void sendABCMessage(@Payload String payload, String username) {
ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
userProfiles.parallelStream()
.filter(userProfiles1 -> ("/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping()) && username.equals(userProfiles1.getUserName())))
.forEach(userProfiles1 -> { simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload);
});
} else {
LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
}
}
【问题讨论】:
标签: java spring spring-boot websocket stomp