【Question title】: How to configure the Spring SockJS Java client message converters
【Posted】: 2016-12-25 18:51:38
【Question】:

I have a SockJS Java client that uses STOMP. It is based on https://github.com/rstoyanchev/spring-websocket-portfolio/blob/master/src/test/java/org/springframework/samples/portfolio/web/load/StompWebSocketLoadTestClient.java

My code:

package mx.intercommunication.websocket.stompclient;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;

import org.springframework.messaging.converter.StringMessageConverter;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaders;
import org.springframework.messaging.simp.stomp.StompSession;
import org.springframework.messaging.simp.stomp.StompSessionHandlerAdapter;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;
import org.springframework.web.socket.messaging.WebSocketStompClient;
import org.springframework.web.socket.sockjs.client.SockJsClient;
import org.springframework.web.socket.sockjs.client.Transport;
import org.springframework.web.socket.sockjs.client.WebSocketTransport;

// Note: the import for Json (used in afterConnected below) is not shown in the original post.
public class StompClient {
    public StompClient(){
        cliente();
    }
    public void cliente() {
        String host = "localhost";

        int port = 8080;
        String stompUrl = "ws://{host}:{port}/Server/chat";

        StandardWebSocketClient webSocketClient = new StandardWebSocketClient();

        List<Transport> transports = new ArrayList<>(2);

        /*
         * The WebSocketTransport can be configured with:
         *      + StandardWebSocketClient in a JSR-356 runtime
         *      + JettyWebSocketClient using the Jetty 9+ native WebSocket API
         *      + Any implementation of Spring’s WebSocketClient
         */
        transports.add(new WebSocketTransport(webSocketClient));

        SockJsClient sockJsClient = new SockJsClient(transports);
        WebSocketStompClient stompClient = new WebSocketStompClient(sockJsClient);


        //Configure a scheduler to use for heartbeats and for receipt tracking. 
        //stompClient.setTaskScheduler(taskScheduler);
        //stompClient.setDefaultHeartbeat(new long[] {0, 0});

        /*
         * Set the MessageConverter to use to convert the payload of incoming and
         *  outgoing messages to and from byte[] based on object type and the "content-type" header. 
         * By default, SimpleMessageConverter is configured.
         */
        stompClient.setMessageConverter(new StringMessageConverter());


        ProducerStompSessionHandler producer = new ProducerStompSessionHandler();


         /*
         * Connect to the given WebSocket URL and notify the given
         *   org.springframework.messaging.simp.stomp.StompSessionHandler when connected on
         *   the STOMP level after the CONNECTED frame is received.
         *   
         *   Parameters:
         *          url the url to connect to
         *          handler the session handler
         *          uriVars URI variables to expand into the URL
         *   Returns:
         *          ListenableFuture for access to the session when ready for use
         * 
         */
        stompClient.connect(stompUrl, producer, host, port);


    }

    private static class ProducerStompSessionHandler extends StompSessionHandlerAdapter {

        //private final AtomicReference<Throwable> failure;

        private StompSession session;


        @Override
        public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
            this.session =  session;


            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {

                e.printStackTrace();
            }

            Json m = Json.object()
                .set("from", "cliente1")
                .set("text", "KIKO");

            String message = m.toString();

            //byte messageByteArr[] = message.getBytes();

            /*
             * Send a message to the specified destination, converting the payload to a
             *  byte[] with the help of a MessageConverter.
             *  
             * Parameters:
             *      destination:     the destination to send a message to
             *      payload:        the message payload
             * Returns:
             *      a Receiptable for tracking receipts
             */
            try {
                session.send("/app/chatchannel", message);
                //session.send("/app/chatchannel", messageByteArr);

                System.out.println("Sending message HELLO: "+message);

            } catch (Throwable t) {
                System.out.println("Message sending failed: "+t);

                //logger.error("Message sending failed at " + i, t);
                //failure.set(t);
            }


        }

        /**
         * This implementation returns String as the expected payload type
         * for STOMP ERROR frames.
         */
        @Override
        public Type getPayloadType(StompHeaders headers) {
            return String.class;
        }

        @Override
        public void handleFrame(StompHeaders headers, Object payload) {
            Exception ex = new Exception(headers.toString());
            System.out.println("STOMP ERROR frame: "+ex);
        }

        @Override
        public void handleException(StompSession session, StompCommand command, StompHeaders headers,
                byte[] payload, Throwable exception) {

            System.out.println("Handling exception: "+exception);
        }

        @Override
        public void handleTransportError(StompSession session, Throwable exception) {

            System.out.println("Transport error: "+exception);
        }


        @Override
        public String toString() {
            //return "ConsumerStompSessionHandler[messageCount=" + this.messageCount + "]";
            return "ConsumerStompSessionHandler to String....";
        }

    }
}

When I configure the stompClient as:

stompClient.setMessageConverter(new StringMessageConverter());

and then send a message:

session.send("/app/chatchannel", message);

where message is a String, the server throws the following conversion error:

08:51:06,746 ERROR [org.springframework.web.socket.messaging.WebSocketAnnotationMethodMessageHandler] (clientInboundChannel-4) Unhandled exception from message handler method: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.gasmart.websocket.Message] for GenericMessage [payload=byte[33], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/app/chatchannel], content-type=[text/plain;charset=UTF-8], content-length=[33]}, simpSessionAttributes={ip=/127.0.0.1:59629}, simpHeartbeat=[J@147a2bf, contentType=text/plain;charset=UTF-8, lookupDestination=/chatchannel, simpSessionId=79431feb8b5f4a9497492ccc64f8965f, simpDestination=/app/chatchannel}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:124)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMatch(AbstractMethodMessageHandler.java:502)
    at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:497)
    at org.springframework.messaging.simp.annotation.support.SimpAnnotationMethodMessageHandler.handleMatch(SimpAnnotationMethodMessageHandler.java:87)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessageInternal(AbstractMethodMessageHandler.java:461)
    at org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler.handleMessage(AbstractMethodMessageHandler.java:399)
    at org.springframework.messaging.support.ExecutorSubscribableChannel$SendTask.run(ExecutorSubscribableChannel.java:135)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Note the error:

MessageConversionException: Cannot convert from [[B] to [org.gasmart.websocket.Message] for GenericMessage [payload=byte[33], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/app/chatchannel], content-type=[text/plain;charset=UTF-8], content-length=[33]}, simpSessionAttributes={ip=/127.0.0.1:59629}, simpHeartbeat=[J@147a2bf, contentType=text/plain;charset=UTF-8, lookupDestination=/chatchannel, simpSessionId=79431feb8b5f4a9497492ccc64f8965f, simpDestination=/app/chatchannel}]

Notice that the content-type header is created by the SockJS Java client.

If I configure the stompClient as:

stompClient.setMessageConverter(new SimpleMessageConverter());

and then send a message:

...
String message = m.toString();      
byte[] messageByteArr = message.getBytes();
session.send("/app/chatchannel", messageByteArr);

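where the payload is a byte array, the server does not throw an error. But this means I have to convert every String to a byte array before sending. I would like to understand why the server can convert the same JSON object when it is sent with SimpleMessageConverter but not when it is sent with StringMessageConverter.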

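I compared the JSON sent from the JavaScript client with the JSON sent from the Java client. Both send the same JSON message. I then implemented a ChannelInterceptor to print each message as received, before it is dispatched to the corresponding controller: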

public class WebSocketTraceChannelInterceptor extends ChannelInterceptorAdapter {
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex) {
        String payload = new String((byte[]) message.getPayload());
        System.out.println("WebSocketTraceChannelInterceptor::afterSendCompletion!!    payload: "+payload);
    }
}

The corresponding server controller is:

@MessageMapping("/chatchannel")
@SendTo("/topic/messages")
public OutputMessage send(SimpMessageHeaderAccessor ha, @Payload Message message) throws Exception {
.....
}

Output for the Java client:

message: GenericMessage [payload=byte[33], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/app/chatchannel], **content-type=[text/plain;charset=UTF-8]**, content-length=[33]}, simpSessionAttributes={ip=/127.0.0.1:59629}, simpHeartbeat=[J@147a2bf, contentType=text/plain;charset=UTF-8, simpSessionId=79431feb8b5f4a9497492ccc64f8965f, simpDestination=/app/chatchannel}]

And for the JavaScript client (SockJS + Stomp.js):

message: GenericMessage [payload=byte[33], headers={simpMessageType=MESSAGE, stompCommand=SEND, nativeHeaders={destination=[/app/chatchannel], content-length=[33]}, simpSessionAttributes={ip=/127.0.0.1:57890}, simpHeartbeat=[J@d154f0, simpSessionId=innyvfme, simpDestination=/app/chatchannel}]

Note the difference: content-type=[text/plain;charset=UTF-8]
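Both traces report payload=byte[33], which is consistent with the body being the same in both cases and only the headers differing. As a quick sanity check, the plain-Java sketch below (no Spring needed; the class name `PayloadCheck` is made up for illustration) computes the UTF-8 length of the JSON text quoted in this question:

```java
import java.nio.charset.StandardCharsets;

final class PayloadCheck {

    /** The JSON text quoted in the question. */
    static final String JSON = "{\"from\":\"cliente1\",\"text\":\"KIKO\"}";

    /** Number of bytes the text occupies when encoded as UTF-8. */
    static int utf8Length(String text) {
        return text.getBytes(StandardCharsets.UTF_8).length;
    }

    public static void main(String[] args) {
        // Prints 33, the same size as payload=byte[33] in both server traces.
        System.out.println(utf8Length(JSON));
    }
}
```

Since the byte count matches in both traces, the conversion failure is more likely tied to the extra content-type header than to the payload itself.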

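There are more converters in Spring messaging:

In this package you can see SimpleMessageConverter and StringMessageConverter, but there are also ByteArrayMessageConverter, CompositeMessageConverter, SmartMessageConverter, and others. How do these converters work?

If I want to send a String from the Java client, which converter do I need?

Why can the server convert the JSON message {"from":"cliente1","text":"KIKO"} when it is sent from the JavaScript client, but not when it is sent from the Java client?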

【Discussion】:

    Tags: spring stomp spring-websocket sockjs spring-messaging


    【Solution 1】:

    Try creating a bean like this:

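    public class SimpleBean {
        private String from;
        private String text;
        public String getFrom() { return from; }
        public String getText() { return text; }
        // setters used by the send example further down
        public void setFrom(String from) { this.from = from; }
        public void setText(String text) { this.text = text; }
    }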
    

    Then set the message converter like this:

    MappingJackson2MessageConverter m = new MappingJackson2MessageConverter();
    stompClient.setMessageConverter(m);
    

    I don't see any subscribe call for your client in the method afterConnected(StompSession session, StompHeaders connectedHeaders):

        session.subscribe(subscribeMethod, new StompFrameHandler() {
    //.. overriding for public Type getPayloadType(StompHeaders headers) {} and
    // public void handleFrame(StompHeaders headers, Object payload) {} here
    });
    

    Then call the send method with an instance of the SimpleBean defined earlier:

    SimpleBean sb=new SimpleBean();
    sb.setFrom("cliente1");
    sb.setText("KIKO");
    session.send("/app/chatchannel", sb);
    

    This should help.
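A possible explanation for why the converter choice matters: as far as I understand the default behaviour, each server-side converter only handles a message when the target type fits and the content-type header is either absent or matches its own MIME type. The sketch below is a simplified, Spring-free model of that selection, not Spring's actual code; `ConverterChainModel`, `Entry`, `ChatMessage`, and the three converter names are hypothetical stand-ins.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

final class ConverterChainModel {

    /** Hypothetical payload class standing in for the server's Message type. */
    static final class ChatMessage { }

    /** One converter in the chain: a name, the MIME type it accepts, the targets it handles. */
    static final class Entry {
        final String name;
        final String mimeType;
        final Predicate<Class<?>> supportsTarget;

        Entry(String name, String mimeType, Predicate<Class<?>> supportsTarget) {
            this.name = name;
            this.mimeType = mimeType;
            this.supportsTarget = supportsTarget;
        }

        boolean accepts(String contentType, Class<?> target) {
            if (!supportsTarget.test(target)) {
                return false;
            }
            if (contentType == null) {
                return true; // assumption: without a content-type header the converter may try
            }
            // Compare only type/subtype, ignoring parameters such as ";charset=UTF-8".
            String base = contentType.split(";", 2)[0].trim();
            return base.equalsIgnoreCase(mimeType);
        }
    }

    /** Assumed order of a typical server-side chain: string, byte array, then JSON. */
    static final List<Entry> CHAIN = Arrays.asList(
            new Entry("string", "text/plain", t -> t == String.class),
            new Entry("bytes", "application/octet-stream", t -> t == byte[].class),
            new Entry("json", "application/json", t -> true));

    /** Returns the name of the first converter that accepts the message, if any. */
    static Optional<String> pick(String contentType, Class<?> target) {
        return CHAIN.stream()
                .filter(e -> e.accepts(contentType, target))
                .map(e -> e.name)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(pick(null, ChatMessage.class));                       // Optional[json]
        System.out.println(pick("text/plain;charset=UTF-8", ChatMessage.class)); // Optional.empty
        System.out.println(pick("application/json", ChatMessage.class));         // Optional[json]
    }
}
```

In this model the JavaScript client's message (no content-type) and a message marked application/json both reach the JSON converter, while the text/plain message produced by StringMessageConverter is rejected by every converter, which matches the "Cannot convert from [[B]" error in the question.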

    【Comments】:
