【问题标题】:How to create simple okhttp3 websocket connection?如何创建简单的 okhttp3 websocket 连接?
【发布时间】:2017-01-17 21:04:34
【问题描述】:

谁能给我看一个例子,说明如何使用 okhttp3 okhttp-ws 库建立与具有特定授权标头的 wss:// 地址的连接?

我只有 WS 服务器的 url 和授权字符串令牌。

稍后,我必须能够向该连接发送请求,侦听来自 WS 服务器的即将到来的数据,然后关闭连接。我在这个新的 WS 世界中遇到了困难,一直只使用 REST(也使用 okhttp3)

【问题讨论】:

    标签: web-services websocket okhttp3


    【解决方案1】:

    所以通常这个样本是你最需要的

    https://github.com/square/okhttp/blob/d854e6d5ad93da4da9b5d5818ee752477e501b18/samples/guide/src/main/java/okhttp3/recipes/WebSocketEcho.java

    但是你会有两个变化

    1. 在您的 URL 中使用 wss 而不是 ws
    2. 调用 request.addHeader 来添加你的令牌

      request.addHeader("Authorization", "Bearer" + token)

    【讨论】:

    • 这里看到了onOpen方法,但是默认不调用。我如何实际打开与您的代码的连接?
    • 在run方法中被enqueue间接调用
    • 是的,它正在工作,抱歉这个愚蠢的问题,我设法打开了一个连接
    • 但是现在如何在收到消息后向 WS 服务器发送消息呢?我看到消息仅在连接时发送(覆盖 onOpen 方法)。我在 onMessage 方法中添加了一些代码来处理服务器消息并构造新的 json-message 以将其发送回来,但我无法将其发送到这里
    【解决方案2】:

    我知道这是一个老问题,但是当我尝试将 websocket 与 okhttp3 一起使用时,有很多我想要的选项,但它不在库中。所以我创建了一个类来处理具有额外功能的 WS 连接。我希望它会帮助一些身体。 Gist link

    import android.os.Handler;
    import android.os.Looper;
    import android.support.annotation.NonNull;
    import android.util.Log;
    
    import org.json.JSONException;
    import org.json.JSONObject;
    
    import java.net.ProtocolException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;
    import okhttp3.WebSocket;
    import okhttp3.WebSocketListener;
    import okhttp3.internal.ws.RealWebSocket;
    import okhttp3.logging.HttpLoggingInterceptor;
    import okio.ByteString;
    
    /**
     * Websocket class based on OkHttp3 with {event->data} message format to make your life easier.
     *
     * @author Ali Yusuf
     * @since 3/13/17
     */
    
    public class Socket {
    
        private final static String TAG = Socket.class.getSimpleName();
        private final static String CLOSE_REASON = "End of session";
        private final static int MAX_COLLISION = 7;
    
        public final static String EVENT_OPEN = "open";
        public final static String EVENT_RECONNECT_ATTEMPT = "reconnecting";
        public final static String EVENT_CLOSED = "closed";
    
        /**
         * Main socket states
         */
        public enum State {
            CLOSED, CLOSING, CONNECT_ERROR, RECONNECT_ATTEMPT, RECONNECTING, OPENING, OPEN
        }
    
        private static HttpLoggingInterceptor logging =
                new HttpLoggingInterceptor()
                        .setLevel(BuildConfig.DEBUG ? HttpLoggingInterceptor.Level.HEADERS : HttpLoggingInterceptor.Level.NONE);
    
        private static OkHttpClient.Builder httpClient =
                new OkHttpClient.Builder()
                        .addInterceptor(logging);
    
        public static class Builder {
    
            private Request.Builder request;
    
            private Builder(Request.Builder request) {
                this.request = request;
            }
    
            public static Builder with(@NonNull String url) {
                // Silently replace web socket URLs with HTTP URLs.
                if (!url.regionMatches(true, 0, "ws:", 0, 3) && !url.regionMatches(true, 0, "wss:", 0, 4))
                    throw new IllegalArgumentException("web socket url must start with ws or wss, passed url is " + url);
    
                return new Builder(new Request.Builder().url(url));
            }
    
            public Builder setPingInterval(long interval, @NonNull TimeUnit unit){
                httpClient.pingInterval(interval, unit);
                return this;
            }
    
            public Builder addHeader(@NonNull String name, @NonNull String value) {
                request.addHeader(name, value);
                return this;
            }
    
            public Socket build() {
                return new Socket(request.build());
            }
        }
    
        /**
         * Websocket state
         */
        private static State state;
        /**
         * Websocket main request
         */
        private static Request request;
        /**
         * Websocket connection
         */
        private static RealWebSocket realWebSocket;
        /**
         * Reconnection post delayed handler
         */
        private static Handler delayedReconnection;
        /**
         * Websocket events listeners
         */
        private static Map<String,OnEventListener> eventListener;
        /**
         * Websocket events new message listeners
         */
        private static Map<String,OnEventResponseListener> eventResponseListener;
        /**
         * Message list tobe send onEvent open {@link State#OPEN} connection state
         */
        private static Map<String,String> onOpenMessageQueue = new HashMap<>();
        /**
         * Websocket state change listener
         */
        private static OnStateChangeListener onChangeStateListener;
        /**
         * Websocket new message listener
         */
        private static OnMessageListener messageListener;
        /**
         * Number of reconnection attempts
         */
        private static int reconnectionAttempts;
        private static boolean skipOnFailure;
    
        private Socket(Request request) {
            Socket.request = request;
            state = State.CLOSED;
            eventListener = new HashMap<>();
            eventResponseListener = new HashMap<>();
            delayedReconnection = new Handler(Looper.getMainLooper());
            skipOnFailure = false;
        }
    
        /**
         * Start socket connection if i's not already started
         */
        public Socket connect() {
            if (httpClient == null) {
                throw new IllegalStateException("Make sure to use Socket.Builder before using Socket#connect.");
            }
            if (realWebSocket == null) {
                realWebSocket = (RealWebSocket) httpClient.build().newWebSocket(request, webSocketListener);
                changeState(State.OPENING);
            } else if (state == State.CLOSED) {
                realWebSocket.connect(httpClient.build());
                changeState(State.OPENING);
            }
            return this;
        }
    
        /**
         * Set listener which fired every time message received with contained data.
         *
         * @param listener message on arrive listener
         */
        public Socket onEvent(@NonNull String event, @NonNull OnEventListener listener){
            eventListener.put(event,listener);
            return this;
        }
    
        /**
         * Set listener which fired every time message received with contained data.
         *
         * @param listener message on arrive listener
         */
        public Socket onEventResponse(@NonNull String event, @NonNull OnEventResponseListener listener){
            eventResponseListener.put(event,listener);
            return this;
        }
    
        /**
         * Send message in {event->data} format
         *
         * @param event event name that you want sent message to
         * @param data message data in JSON format
         * @return true if the message send/on socket send quest; false otherwise
         */
        public boolean send(@NonNull String event, @NonNull String data){
            try {
                JSONObject text = new JSONObject();
                text.put("event", event);
                text.put("data", new JSONObject(data));
                Log.v(TAG,"Try to send data "+text.toString());
                return realWebSocket.send(text.toString());
            } catch (JSONException e) {
                Log.e(TAG,"Try to send data with wrong JSON format, data: "+data);
            }
            return false;
        }
    
        /**
         * Set state listener which fired every time {@link Socket#state} changed.
         *
         * @param listener state change listener
         */
        public Socket setOnChangeStateListener(@NonNull OnStateChangeListener listener) {
            onChangeStateListener = listener;
            return this;
        }
    
        /**
         * Message listener will be called in any message received even if it's not
         * in a {event -> data} format.
         *
         * @param listener message listener
         */
        public Socket setMessageListener(@NonNull OnMessageListener listener) {
            messageListener = listener;
            return this;
        }
    
        public void removeEventListener(@NonNull String event) {
            eventListener.remove(event);
            onOpenMessageQueue.remove(event);
        }
    
        /**
         * Clear all socket listeners in one line
         */
        public void clearListeners() {
            eventListener.clear();
            messageListener = null;
            onChangeStateListener = null;
        }
    
        /**
         * Send normal close request to the host
         */
        public void close() {
            if (realWebSocket != null) {
                realWebSocket.close(1000, CLOSE_REASON);
            }
        }
    
        /**
         * Send close request to the host
         */
        public void close(int code, @NonNull String reason) {
            if (realWebSocket != null) {
                realWebSocket.close(code, reason);
            }
        }
    
        /**
         * Terminate the socket connection permanently
         */
        public void terminate() {
            skipOnFailure = true; // skip onFailure callback
            if (realWebSocket != null) {
                realWebSocket.cancel(); // close connection
                realWebSocket = null; // clear socket object
            }
        }
    
        /**
         * Add message in a queue if the socket not open and send them
         * if the socket opened
         *
         * @param event event name that you want sent message to
         * @param data message data in JSON format
         */
        public void sendOnOpen(@NonNull String event, @NonNull String data) {
            if (state != State.OPEN)
                onOpenMessageQueue.put(event,data);
            else
                send(event,data);
        }
    
        /**
         * Retrieve current socket connection state {@link State}
         */
        public State getState() {
            return state;
        }
    
        /**
         * Change current state and call listener method with new state
         * {@link OnStateChangeListener#onChange(Socket, State)}
         * @param newState new state
         */
        private void changeState(State newState) {
            state = newState;
            if (onChangeStateListener != null) {
                onChangeStateListener.onChange(Socket.this, state);
            }
        }
    
        /**
         * Try to reconnect to the websocket after delay time using <i>Exponential backoff</i> method.
         * @see <a href="https://en.wikipedia.org/wiki/Exponential_backoff"></a>
         */
        private void reconnect() {
            if (state != State.CONNECT_ERROR) // connection not closed !!
                return;
    
            changeState(State.RECONNECT_ATTEMPT);
    
            if (realWebSocket != null) {
                // Cancel websocket connection
                realWebSocket.cancel();
                // Clear websocket object
                realWebSocket = null;
            }
    
            if (eventListener.get(EVENT_RECONNECT_ATTEMPT) != null) {
                eventListener.get(EVENT_RECONNECT_ATTEMPT).onMessage(Socket.this, EVENT_RECONNECT_ATTEMPT);
            }
    
            // Calculate delay time
            int collision = reconnectionAttempts > MAX_COLLISION ? MAX_COLLISION : reconnectionAttempts;
            long delayTime = Math.round((Math.pow(2, collision)-1)/2) * 1000;
    
            // Remove any pending posts of callbacks
            delayedReconnection.removeCallbacksAndMessages(null);
            // Start new post delay
            delayedReconnection.postDelayed(new Runnable() {
                @Override
                public void run() {
                    changeState(State.RECONNECTING);
                    reconnectionAttempts++; // Increment connections attempts
                    connect(); // Establish new connection
                }
            }, delayTime);
        }
    
        private WebSocketListener webSocketListener = new WebSocketListener() {
            @Override
            public void onOpen(WebSocket webSocket, Response response) {
                Log.v(TAG,"Socket has been opened successfully.");
                // reset connections attempts counter
                reconnectionAttempts = 0;
    
                // fire open event listener
                if (eventListener.get(EVENT_OPEN) != null) {
                    eventListener.get(EVENT_OPEN).onMessage(Socket.this, EVENT_OPEN);
                }
    
                // Send data in queue
                for (String event : onOpenMessageQueue.keySet()) {
                    send(event, onOpenMessageQueue.get(event));
                }
                // clear queue
                onOpenMessageQueue.clear();
    
                changeState(State.OPEN);
            }
    
            /**
             * Accept only Json data with format:
             * <b> {"event":"event name","data":{some data ...}} </b>
             */
            @Override
            public void onMessage(WebSocket webSocket, String text) {
                // print received message in log
                Log.v(TAG, "New Message received "+text);
    
                // call message listener
                if (messageListener != null)
                    messageListener.onMessage(Socket.this, text);
    
                try {
                    // Parse message text
                    JSONObject response = new JSONObject(text);
                    String event = response.getString("event");
                    JSONObject data = response.getJSONObject("data");
    
                    // call event listener with received data
                    if (eventResponseListener.get(event) != null) {
                        eventResponseListener.get(event).onMessage(Socket.this, event, data);
                    }
                    // call event listener
                    if (eventListener.get(event) != null) {
                        eventListener.get(event).onMessage(Socket.this, event);
                    }
                } catch (JSONException e) {
                    // Message text not in JSON format or don't have {event}|{data} object
                    Log.e(TAG, "Unknown message format.");
                }
            }
    
            @Override
            public void onMessage(WebSocket webSocket, ByteString bytes) {
                // TODO: some action
            }
    
            @Override
            public void onClosing(WebSocket webSocket, int code, String reason) {
                Log.v(TAG,"Close request from server with reason '"+reason+"'");
                changeState(State.CLOSING);
                webSocket.close(1000,reason);
            }
    
            @Override
            public void onClosed(WebSocket webSocket, int code, String reason) {
                Log.v(TAG,"Socket connection closed with reason '"+reason+"'");
                changeState(State.CLOSED);
                if (eventListener.get(EVENT_CLOSED) != null) {
                    eventListener.get(EVENT_CLOSED).onMessage(Socket.this, EVENT_CLOSED);
                }
            }
    
            /**
             * This method call if:
             * - Fail to verify websocket GET request  => Throwable {@link ProtocolException}
             * - Can't establish websocket connection after upgrade GET request => response null, Throwable {@link Exception}
             * - First GET request had been failed => response null, Throwable {@link java.io.IOException}
             * - Fail to send Ping => response null, Throwable {@link java.io.IOException}
             * - Fail to send data frame => response null, Throwable {@link java.io.IOException}
             * - Fail to read data frame => response null, Throwable {@link java.io.IOException}
             */
            @Override
            public void onFailure(WebSocket webSocket, Throwable t, Response response) {
                if (!skipOnFailure) {
                    skipOnFailure = false; // reset flag
                    Log.v(TAG, "Socket connection fail, try to reconnect. (" + reconnectionAttempts + ")");
                    changeState(State.CONNECT_ERROR);
                    reconnect();
                }
            }
        };
    
        public abstract static class OnMessageListener {
            public abstract void onMessage (String data);
    
            /**
             * Method called from socket to execute listener implemented in
             * {@link #onMessage(String)} on main thread
             *
             * @param socket Socket that receive the message
             * @param data Data string received
             */
            private void onMessage (Socket socket, final String data) {
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        onMessage(data);
                    }
                });
            }
        }
    
        public abstract static class OnEventListener {
            public abstract void onMessage (String event);
    
            private void onMessage (Socket socket, final String event) {
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        onMessage(event);
                    }
                });
            }
        }
    
        public abstract static class OnEventResponseListener extends OnEventListener {
            /**
             * Method need to override in listener usage
             */
            public abstract void onMessage (String event, String data);
    
            /**
             * Just override the inherited method
             */
            @Override
            public void onMessage(String event) {}
    
            /**
             * Method called from socket to execute listener implemented in
             * {@link #onMessage(String, String)} on main thread
             *
             * @param socket Socket that receive the message
             * @param event Message received event
             * @param data Data received in the message
             */
            private void onMessage (Socket socket, final String event, final JSONObject data) {
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        onMessage(event, data.toString());
                        onMessage(event);
                    }
                });
            }
        }
    
        public abstract static class OnStateChangeListener {
            /**
             * Method need to override in listener usage
             */
            public abstract void onChange (State status);
    
            /**
             * Method called from socket to execute listener implemented in
             * {@link #onChange(State)} on main thread
             *
             * @param socket Socket that receive the message
             * @param status new status
             */
            private void onChange (Socket socket, final State status){
                new Handler(Looper.getMainLooper()).post(new Runnable() {
                    @Override
                    public void run() {
                        onChange(status);
                    }
                });
            }
        }
    
    }
    

    使用示例:

    Socket socket = Socket.Builder.with(WEBSOCKET_BASE_URL).build().connect();
    socket.onEvent(Socket.EVENT_OPEN, socketOpenListener);
    socket.onEvent(Socket.EVENT_RECONNECT_ATTEMPT, .....);
    socket.onEvent(Socket.EVENT_CLOSED, .....);
    socket.onEventResponse("Some event", socketPairListener);
    socket.send("Some event", "{"some data":"in JSON format"}");
    socket.sendOnOpen("Some event", "{"some data":"in JSON format"}");
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-06-01
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 2017-01-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多