【问题标题】:RxJava and rx.exceptions.MissingBackpressureException exceptionRxJava 和 rx.exceptions.MissingBackpressureException 异常
【发布时间】:2016-01-02 20:53:03
【问题描述】:

我正在尝试让一个非常基本的基于 RxJava 的应用程序工作。我已经定义了以下 Observable 类,它从文件中读取并返回行:

public Observable<String> getObservable() throws IOException
    {
        return Observable.create(subscribe -> {
            InputStream in = getClass().getResourceAsStream("/trial.txt");
            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
            String line = null;
            try {
                while((line = reader.readLine()) != null)
                {
                    subscribe.onNext(line);
                }
            } catch (IOException e) {
                subscribe.onError(e);
            }
            finally {
                subscribe.onCompleted();
            }
        });
    }

接下来我定义了订阅者代码:

public static void main(String[] args) throws IOException, InterruptedException {
        Thread thread = new Thread(() ->
        {
            RxObserver observer = new RxObserver();
            try {
                observer.getObservable()
                        .observeOn(Schedulers.io())
                        .subscribe( x ->System.out.println(x),
                                    t -> System.out.println(t),
                                    () -> System.out.println("Completed"));

            } catch (IOException e) {
                e.printStackTrace();
            }
        });

        thread.start();
        thread.join();
    }

该文件有近 50000 条记录。运行应用程序时,我收到“rx.exceptions.MissingBackpressureException”。我浏览了一些文档,并按照建议,尝试在调用链中添加“.onBackpressureBuffer()”方法。但是我没有得到异常,但完成的调用也没有被解雇。

在我们有一个快速生成的 Observable 的情况下,处理这种情况的正确方法是什么?

【问题讨论】:

    标签: java rx-java rx-android


    【解决方案1】:

    第一个问题是您的 readLine 逻辑忽略了背压。您可以在 observeOn 之前申请 onBackpressureBuffer() 以开始,但最近添加了 SyncOnSubscribe 可以让您一一生成值并处理背压:

    SyncOnSubscribe.createSingleState(() => {
        try {
            InputStream in = getClass().getResourceAsStream("/trial.txt");
            return new BufferedReader(new InputStreamReader(in));
        } catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    },
    (s, o) -> {
        try {
            String line = s.readLine();
            if (line == null) {
                o.onCompleted();
            } else {
                o.onNext(line);
            }
        } catch (IOException ex) {
            s.onError(ex);
        }
    },
    s -> {
        try {
           s.close();
        } catch (IOException ex) {
        }
    });
    

    第二个问题是您的线程将在 io 线程上的所有元素都已交付之前完成,因此主程序退出。删除observeOn,添加.toBlocking 或使用CountDownLatch

    RxObserver observer = new RxObserver();
    try {
    
        CountDownLatch cdl = new CountDownLatch(1);
    
        observer.getObservable()
               .observeOn(Schedulers.io())
               .subscribe( x ->System.out.println(x),
                           t -> { System.out.println(t); cdl.countDown(); },
                           () -> { System.out.println("Completed"); cdl.countDown(); });
    
        cdl.await();
     } catch (IOException | InterruptedException e) {
         e.printStackTrace();
     }
    

    【讨论】:

    • SyncOnSubscribe 是最新的 RxJava 包的一部分吗?我有 1.0.8 版本的 RxJava 框架?
    • 知道了。包括最新版本 1.1.0,它包括 SyncOnSubscribe 类。
    【解决方案2】:

    这里的问题是 observeOn 运算符,因为每个 Observer 的 onNext() 调用都计划在单独的线程上调用,所以您的 Observable 会在循环中不断产生这些预定调用,而不管订阅者如何 (observeOn)容量。

    如果你保持同步,在订阅者完成前一个元素之前,Observable 不会发出下一个元素,因为这一切都在一个线程上完成,你将不再有背压问题。

    如果您仍想使用 observeOn,则必须在 Observable 的 OnSubscribe#call 方法中实现背压逻辑

    【讨论】:

    • 这已经很老了,但是摆脱了 observeOn 为我解决了背压,您能否详细说明为什么会这样?我能感觉到一个 observable 应该能够对多个线程做同样的事情
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-07-28
    • 2018-05-27
    • 2023-03-22
    • 1970-01-01
    • 2014-08-06
    • 2017-07-10
    • 1970-01-01
    相关资源
    最近更新 更多