【问题标题】:simpMessagingTemplate convertAndSendToUser lot of waiting threads blocking other functionalitysimpMessagingTemplate convertAndSendToUser 大量等待线程阻塞其他功能
【发布时间】:2019-06-14 12:35:45
【问题描述】:

我们在应用程序中使用 Stomp、SpringBoot 和 WebSockets。服务器应用程序正在执行以下操作: 1)生成要推送给用户的消息, 2) 接受 WebSocket 连接和 3) 将消息推送到 ActiveMQ stomp 代理。线程转储显示了许多与 simpMessagingTemplate convertAndSendToUser API 调用相关的等待线程。

应用程序的两个实例在云中运行。此应用程序使用 simpMessagingTemplate convertAndSendToUser API 生成消息并推送到 ActiveMQ stomp 代理(单独运行)。

我们使用 Gatling 来模拟用户 WebSocket 连接以进行负载测试。 Gatling 在单独的实例上运行。该应用程序适用于 2000 个用户连接。一旦我们将用户增加到 4000,我们就会看到消息生成线程停止。用户可以毫无问题地连接到相同的服务器。

如果我们对 simpMessagingTemplate convertAndSendToUser API 调用进行注释,那么一切正常(生成消息和新的 WebSocket 连接)。所以我们怀疑 convertAndSendToUser API 的问题。

Threaddump 堆栈跟踪如下:

"ForkJoinPool-1-worker-440" #477 daemon prio=5 os_prio=0 tid=0x00007f0c541c2800 nid=0x2a47 sleeping[0x00007f08e6371000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at reactor.util.concurrent.WaitStrategy$Sleeping.waitFor(WaitStrategy.java:319)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:211)
	at reactor.core.publisher.MonoProcessor.block(MonoProcessor.java:176)
	at org.springframework.messaging.tcp.reactor.AbstractMonoToListenableFutureAdapter.get(AbstractMonoToListenableFutureAdapter.java:73)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler$SystemStompConnectionHandler.forward(StompBrokerRelayMessageHandler.java:980)
	at org.springframework.messaging.simp.stomp.StompBrokerRelayMessageHandler.handleMessageInternal(StompBrokerRelayMessageHandler.java:549)
	at org.springframework.messaging.simp.broker.AbstractBrokerMessageHandler.handleMessage(AbstractBrokerMessageHandler.java:234)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:105)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.simp.user.UserDestinationMessageHandler.handleMessage(UserDestinationMessageHandler.java:227)
	at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:138)
	at org.springframework.messaging.support.ExecutorSubscribableChannel.sendInternal(ExecutorSubscribableChannel.java:94)
	at org.springframework.messaging.support.AbstractMessageChannel.send(AbstractMessageChannel.java:119)
	at org.springframework.messaging.simp.SimpMessagingTemplate.sendInternal(SimpMessagingTemplate.java:187)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:162)
	at org.springframework.messaging.simp.SimpMessagingTemplate.doSend(SimpMessagingTemplate.java:48)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
	at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:229)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:218)
	at org.springframework.messaging.simp.SimpMessagingTemplate.convertAndSendToUser(SimpMessagingTemplate.java:204)
	at com.mypackage.PushMessageManager.lambda$sendMyMessage$2(PushMessageManager.java:77)
	at com.mypackage.PushMessageManager$$Lambda$923/1850582969.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
	at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at com.mypackage.PushMessageManager.sendMyMessage(PushMessageManager.java:74)
	at com.mypackage.PushMessageManager.lambda$processPushMessage$0(PushMessageManager.java:61)
	at com.mypackage.PushMessageManager$$Lambda$664/624459498.run(Unknown Source)
	at nl.talsmasoftware.context.functions.RunnableWithContext.run(RunnableWithContext.java:42)
	at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at nl.talsmasoftware.context.executors.ContextAwareExecutorService$1.call(ContextAwareExecutorService.java:59)
	at nl.talsmasoftware.context.delegation.RunnableAdapter.run(RunnableAdapter.java:44)
	at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

   Locked ownable synchronizers:
	- None

步骤如下图所示:

  1. Gatling JMS 发布者以每分钟 20000 条消息将 JMS 消息推送到 Active MQ 代理。请注意,这些消息不仅仅针对一位用户。它是基于 WebSocket 用户连接分发的。
  2. 我们的应用程序有一个 JMS 监听器来接收这些消息。我们正在运行应用程序的 2 个实例,因此有两个 JMS 侦听器来处理此消息。
  3. 一旦应用程序接收到 JMS 消息,它会检查缓存中的会话信息以识别连接的用户,并使用 simpMessagingTemplate convertAndSendToUser API simpMessagingTemplate.convertAndSendToUser(sessionId, "/queue/abc", payload) 推送到另一个 ActiveMQ stomp 代理。请注意,当用户首次连接到应用程序时,sessionId 存储在分布式缓存中。所以这些是有效的会话 ID。
  4. ActiveMQ 单脚代理然后将这些消息传播到各个用户单脚队列。
  5. Gatling WebSocket 客户端(每个有 2000 个用户连接)应通过 WebSocket 连接接收这些消息。
  6. 客户端连接和订阅看起来像这样

    stompClient.connect({'username': $("#userName").val()}, function (frame) { 设置连接(真); 订阅 = stompClient.subscribe('/user/queue/abc', function (message) { showData(JSON.parse(message.body)); },headers = {'loginusername': $("#userName").val()}); });

因此,每个用户都应该只收到发给他们的消息,而不是所有消息。这就是我们在通过 WebSocket 连接时将用户连接到各个队列并使用 convertAndSendToUser 将消息推送到特定会话的原因。后端 JMS 发布者确保消息以循环方式发布给用户。

要回答您关于识别瓶颈的问题,如果我们连接 2000 个用户,一切正常。但是当我们添加更多用户时,我们看到应用程序的 JMS 侦听器无法侦听后端 Gatling JMS 负载生成器每分钟发送的 20000 条消息。 ActiveMQ JMS 队列深度因此而增加。

为了确保瓶颈是 convertAndSendToUser API,我们对 API 调用进行了注释。如果我们这样做,我们能够连接约 13k WebSocket 连接,并且后端 JMS 侦听器也能够每分钟消费所有 20000 条消息。

希望这可以澄清您的一些问题。 更新 下面给出了用于显示异步调用 simpMessagingTemplate.convertAndSendToUser API 的代码 sn-p。这里的 RepositoryUtil.executor() 是我们自己的 executor 对象的包装器。

    public CompletableFuture<Void> processPushMessage(String userName, String payload) {
    return ContextAwareCompletableFuture.runAsync(() -> {
        sendABCMessage(payload, userName);
    }, RepositoryUtil.executor());
}

public void sendABCMessage(@Payload String payload, String username) {
    ArrayList<UserProfiles> userProfiles = (ArrayList<UserProfiles>) cacheService.getValue(username);
    if (Objects.nonNull(userProfiles) && userProfiles.size() > 0) {
      userProfiles.parallelStream()
          .filter(userProfiles1 -> ("/user/queue/abc".equalsIgnoreCase(userProfiles1.getSubscribeMapping()) && username.equals(userProfiles1.getUserName())))
          .forEach(userProfiles1 -> {              simpMessagingTemplate.convertAndSendToUser(userProfiles1.getSessionId(), "/queue/abc", payload);
          });
    } else {
      LOGGER.info("sendABCMessage userProfiles is null. Payload: {}", payload);
    }
}

【问题讨论】:

    标签: java spring spring-boot websocket stomp


    【解决方案1】:

    我们可以通过移动到 /user/topic 而不是 /user/queue 来解决问题。我们现在能够每分钟处理约 35k 条来自后端的消息和 8k 个 Web 套接字用户连接。

    【讨论】:

      【解决方案2】:

      该应用程序适用于 2000 个用户连接,每分钟负载 20,000 条消息。一旦我们将用户增加到 4000,我们就会看到消息生成线程停止。

      如果您将 20,000 条消息推送到 ActiveMQ 并且每条消息有 1,000 个订阅者,这意味着将有 20,000,000 条消息 (1,000 * 20,000) 发布回 WebSocket 客户端。因此,尝试确定流过的消息总量并了解瓶颈在哪里(服务器将消息转发到 ActiveMQ,ActiveMQ 处理消息,或者服务器将消息发布到 WebSocket 客户端)。

      对于这 20,000 条消息,它们是从单个线程生成的,还是从大量不同线程发送的,例如由于处理来自 WebSocket 客户端或 REST HTTP 调用的消息?如果是后者,可能是有太多线程同时尝试将消息转发到代理,您可能必须应用某种速率限制。

      在一天结束时,您需要了解总容量、瓶颈在哪里以及在哪里应用一些速率限制。

      【讨论】:

      • 感谢您的回复。实际上,我们正在使用 gating WebSocket 来让用户连接。
      • 在成功的时候,我们已经连接了2000个不同会话ID的用户。假设用户 a1,a2,a3....a1999,a2000 已连接,并且 20,000 条消息将针对所有 2000 个用户,并且将以循环方式.. 即; 20,000/20=每个用户或会话 ID 10 条消息。如果我们将连接的用户增加到约 4000 人,那么几分钟后所有用户都会下降。我们同时进行了线程转储,并且与上面相同。
      • @Rossen Stoyanchev 我通过添加数据流程图编辑了原始帖子,并尝试解释流程。
      • 谢谢。 ActiveMQ (JMS) -> 应用程序 -> ActiveMQ (STOMP) 通过应用程序的额外跃点是否有原因?发布到 ActiveMQ (JMS) 的任何内容都不能发布到 ActiveMQ (STOMP) 吗?
      • @RossenStoyanchev 该应用程序是一个通用组件,应充当网关组件,接受所有 Web 套接字连接、验证用户(令牌)、维护会话并充当通用组件,处理来自其他后端组件(通过 ActiveMQ-JMS)的消息,然后将其发布到 stomp 服务器。您认为这种方法存在瓶颈吗?
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-11-16
      • 2013-07-11
      • 1970-01-01
      • 2021-07-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多