【问题标题】:RxJava2 how to observe UDP packets?RxJava2如何观察UDP数据包?
【发布时间】:2017-02-19 11:27:36
【问题描述】:

我刚刚开始使用 RxJava2,想知道如何正确实现 UDP observable。
我已经有了一些工作代码,但我认为可能存在一些问题:请参阅下面源代码的 cmets 中的 4 个问题。

我还在 GitHub RxJava2_Udp 上发布了代码:欢迎 cmets、issues 和 pull requests。

class UdpObservable {

    private static class UdpThread extends Thread {
        private final int portNo;
        private final int bufferSizeInBytes;
        private final ObservableEmitter<DatagramPacket> emitter;
        private DatagramSocket udpSocket;

        private UdpThread(@NonNull ObservableEmitter<DatagramPacket> emitter
                , int portNo, int bufferSizeInBytes) {
            this.emitter = emitter;
            this.portNo = portNo;
            this.bufferSizeInBytes = bufferSizeInBytes;
        }

        @Override
        public void run() {
            try {
                // we don't want to create the DatagramSocket in the constructor, because this
                // might raise an Exception that the observer wants to handle
                udpSocket = new DatagramSocket(portNo);
                try {
                    /* QUESTION 1:
                       Do I really need to check isInterrupted() and emitter.isDisposed()?

                       When the thread is interrupted an interrupted exception will
                       be raised anyway and the emitter is being disposed (this is what
                       caused the interruption)
                    */
                    while (!isInterrupted() && !emitter.isDisposed()) {
                        byte[] rcvBuffer = new byte[bufferSizeInBytes];
                        DatagramPacket datagramPacket = new DatagramPacket(rcvBuffer, rcvBuffer.length);
                        udpSocket.receive(datagramPacket);
                        // QUESTION 1a: same as QUESTION 1 above
                        if (!isInterrupted() && !emitter.isDisposed()) {
                            emitter.onNext(datagramPacket);
                        }
                    }
                } finally {
                    closeUdpSocket();
                }
            } catch (Throwable th) {
                // the thread will only be interrupted when the observer has unsubscribed:
                // so we need not report it
                if (!isInterrupted()) {
                    if (!emitter.isDisposed()) {
                        emitter.onError(th);
                    } else {
                        // QUESTION 2: is this the correct way to handle errors, when the emitter
                        //             is already disposed?
                        RxJavaPlugins.onError(th);
                    }
                }
            }
        }

        private void closeUdpSocket() {
            if (!udpSocket.isClosed()) {
                udpSocket.close();
            }
        }

        @Override
        public void interrupt() {
            super.interrupt();
            // QUESTION 3: this is called from an external thread, right, so
            //             how can we correctly synchronize the access to udpSocket?
            closeUdpSocket();
        }
    }

    /**
     * creates an Observable that will emit all UDP datagrams of a UDP port.
     * <p>
     * This will be an infinite stream that ends when the observer unsubscribes, or when an error
     * occurs. The observer does not handle backpressure.
     * </p>
     */
    public static Observable<DatagramPacket> create(final int portNo, final int bufferSizeInBytes) {
        return Observable.create(
                new ObservableOnSubscribe<DatagramPacket>() {
                    @Override
                    public void subscribe(ObservableEmitter<DatagramPacket> emitter) throws Exception {
                        final UdpThread udpThread = new UdpThread(emitter, portNo, bufferSizeInBytes);
                        /* QUESTION 4: Is this the right way to handle unsubscription?
                         */
                        emitter.setCancellable(new Cancellable() {
                            @Override
                            public void cancel() throws Exception {
                                udpThread.interrupt();
                            }
                        });
                        udpThread.start();
                    }
                }
        );
    }

}

【问题讨论】:

  • 缺少“cmets 中的 4 个问题”。与他们一起更新问题。
  • 它们在源代码 cmets 中

标签: android multithreading udp rx-java2


【解决方案1】:
  • 一般来说,我认为这不是正确的创建方式,你不应该自己创建线程,因为 RxJava 和它的 Schedulers 应该为你做。
    考虑到在 ObservableOnSubscribe 处执行的代码将根据您的 Scheduler 策略在线程上运行,因此您不需要自己构建它。只需在 create 中执行 ude while 循环即可。
  • 您不需要调用Thread.interrupt() 方法,RxJava 会在您处理(取消订阅)Observable 时为您执行此操作。 (当然在while循环之前设置cancelable

至于你的问题:

  1. 您不需要检查中断,因为异常会 如果你正在等待io操作,你也不需要 检查处置,因为onNext() 会为您处理,并且会 不发出未订阅的。

  2. 您可以再次调用onError,发射器将负责检查Observable 是否已取消订阅。

  3. 如前所述,应该没有Thread,但是对于资源清理,可以使用emitter.setCancellable方法。 (关闭流),这发生在您的代码运行的同一线程上。
  4. 之前回答过,Thread.interrput() 将通过 RxJava 的 dispose/unsubscribe 引发,资源清理应该转到 emitter.setCancellable 方法

【讨论】:

  • 感谢您的完美回答。这帮助我学到了很多东西,UdpObservable 中的代码现在更加简洁明了。
  • 感谢您提供此信息,@TmTron。很有帮助!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-06-04
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-04-04
  • 1970-01-01
相关资源
最近更新 更多