【Question title】: How to wait for Spring WebSocketStompClient to connect
【Posted】: 2016-08-14 22:02:42
【Question】:

I am following this guide to implement a simple STOMP client:

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);

ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler); // for heartbeats

stompClient.setMessageConverter(new StringMessageConverter());

StompSessionHandler sessionHandler = new MySessionHandler();
stompClient.connect("ws://server/endpoint", sessionHandler);

// WAITING HERE

Once the connection is established, this should be reported asynchronously to MySessionHandler:

public class MySessionHandler extends StompSessionHandlerAdapter
{
     @Override
     public void afterConnected(StompSession session, StompHeaders connectedHeaders) 
     {
         // WAITING FOR THIS
     }
}

So the question is: how should the WAITING HERE line wait for the WAITING FOR THIS line? Is there a Spring-specific way to do this? If not, which general Java approach fits best here?

【Comments】:

    Tags: java spring multithreading concurrency spring-websocket


    【Solution 1】:

    Perhaps java.util.concurrent.CountDownLatch can solve your problem, like this:

    CountDownLatch latch = new CountDownLatch(1);
    StompSessionHandler sessionHandler = new MySessionHandler(latch);
    stompClient.connect("ws://server/endpoint", sessionHandler);
    // block here until the latch count reaches zero (await() throws InterruptedException)
    latch.await();
    

    And your MySessionHandler implementation:

    public class MySessionHandler extends StompSessionHandlerAdapter {
        private final CountDownLatch latch;
    
        public MySessionHandler(final CountDownLatch latch) {
            this.latch = latch;
        }
    
        @Override
        public void afterConnected(StompSession session, 
                                   StompHeaders connectedHeaders) {
            try {
                // do some work here
            } finally {
                latch.countDown();
            }
        }
    }
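    One caveat with a bare latch.await(): if the connection attempt fails, afterConnected is never invoked, so the latch is never counted down and the caller blocks indefinitely. A bounded wait avoids that. The sketch below uses only the JDK to show the idea; fakeConnect is a hypothetical stand-in for stompClient.connect(...) and is not part of Spring.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchTimeoutSketch {

    // Hypothetical stand-in for an asynchronous connect: runs the
    // callback on another thread after delayMillis.
    static void fakeConnect(long delayMillis, Runnable onConnected) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
                onConnected.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }

    // Returns true if the callback fired before the timeout elapsed,
    // false if the wait gave up first.
    static boolean connectAndWait(long delayMillis, long timeoutMillis)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        fakeConnect(delayMillis, latch::countDown);
        return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(connectAndWait(50, 2000));   // callback arrives in time
        System.out.println(connectAndWait(2000, 100));  // wait gives up first
    }
}
```

    In the real handler the same change would amount to replacing latch.await() with latch.await(timeout, unit) and checking the boolean result.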
    

    【Comments】:

      【Solution 2】:

      The latch solution works. Later I found that the connect function returns a ListenableFuture<StompSession>, so we can wait for the session to be created like this:

      ListenableFuture<StompSession> future = 
                       stompClient.connect("ws://server/endpoint", sessionHandler);
      StompSession session = future.get(); // <--- blocks until the session is established, i.e. the same point at which afterConnected() runs
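
      A practical difference from the latch is error reporting: a future can carry a failure, so a blocked get() can end with an ExecutionException instead of hanging, and get(timeout, unit) bounds the wait. The sketch below illustrates this with the JDK's CompletableFuture as a stand-in for ListenableFuture<StompSession>; fakeConnect and the String "session" are hypothetical and only simulate the asynchronous connect.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWaitSketch {

    // Hypothetical stand-in for connect(): completes or fails the
    // future from another thread.
    static CompletableFuture<String> fakeConnect(boolean succeed) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Thread t = new Thread(() -> {
            if (succeed) {
                future.complete("session-1");
            } else {
                future.completeExceptionally(
                        new IllegalStateException("connection refused"));
            }
        });
        t.setDaemon(true);
        t.start();
        return future;
    }

    // Waits for the session and maps each failure mode to a message.
    static String waitForSession(CompletableFuture<String> future, long timeoutMillis) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            // The asynchronous operation itself failed; the cause is the original error.
            return "failed: " + e.getCause().getMessage();
        } catch (TimeoutException e) {
            return "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(waitForSession(fakeConnect(true), 2000));
        System.out.println(waitForSession(fakeConnect(false), 2000));
        System.out.println(waitForSession(new CompletableFuture<>(), 100));
    }
}
```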
      

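      【Comments】:

      • future.get() will give you the session object, you should from afterConnect..
      【Solution 3】:

      You don't need a latch or anything else; the afterConnected method is only executed once the connection has been established.

      My example: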

      URI stompUrlEndpoint = new URI("localhost:8080/Serv/enpoint");
      
      ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
      //Calls initialize() after the container applied all property values.
      taskScheduler.afterPropertiesSet();
      
      StandardWebSocketClient webSocketClient = new StandardWebSocketClient();
      List<Transport> transports = new ArrayList<>(2);
      
      transports.add(new WebSocketTransport(webSocketClient));
      
      SockJsClient sockJsClient = new SockJsClient(transports);   
      
      WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);
      
      stompClient.setMessageConverter(new SimpleMessageConverter());          // default converter: SimpleMessageConverter
      
      // Configure a scheduler to use for heartbeats and for receipt tracking. 
      stompClient.setTaskScheduler(taskScheduler);
      
      StompSessionHandlerImp stompSessionHandlerImp = new StompSessionHandlerImp();
      
      ListenableFuture<StompSession> stompSessionFuture = stompClient.connect(stompUrlEndpoint.toString(), stompSessionHandlerImp);

      StompSession stompSession = null;
      try {
          // Wait at most 10 seconds for the session to be established.
          stompSession = stompSessionFuture.get(10, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // restore the interrupt flag
      } catch (ExecutionException e) {
          e.printStackTrace(); // the connection attempt failed
      } catch (TimeoutException e) {
          e.printStackTrace(); // no session within 10 seconds
      }
      
      
      private class StompSessionHandlerImp extends StompSessionHandlerAdapter {
          private StompSession session;

          @Override
          public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
              // Keep a reference to the session for later use.
              this.session = session;

              session.setAutoReceipt(true);
              session.subscribe("/user/queue/bancaria.readcard", new StompFrameHandler() {
                  ...
              });
          }
      }
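
      The handler above does its work inside the callback, so nothing has to block. The same non-blocking style can be expressed on a future by chaining a continuation instead of calling get(). The sketch below is JDK-only and uses CompletableFuture as a stand-in for the future returned by connect; the String "session", the log list, and connectThenSubscribe are hypothetical names used purely for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;

public class CallbackSketch {

    // Records what the continuations did, so the result can be inspected.
    static final List<String> log = new CopyOnWriteArrayList<>();

    // Registers follow-up work to run once the session is available,
    // plus a fallback that runs if the connect step failed.
    static CompletableFuture<Void> connectThenSubscribe(CompletableFuture<String> sessionFuture) {
        return sessionFuture
                .thenAccept(session -> log.add("subscribed on " + session))
                .exceptionally(ex -> {
                    // Dependent stages see the failure wrapped in CompletionException.
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    log.add("connect failed: " + cause.getMessage());
                    return null;
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> sessionFuture = new CompletableFuture<>();
        CompletableFuture<Void> done = connectThenSubscribe(sessionFuture);
        sessionFuture.complete("session-1"); // simulates the connection being established
        done.join();
        System.out.println(log);
    }
}
```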

      【Comments】:
