【发布时间】:2016-01-02 20:53:03
【问题描述】:
我正在尝试让一个非常基本的基于 RxJava 的应用程序工作。我已经定义了以下 Observable 类,它从文件中读取并返回行:
public Observable<String> getObservable() throws IOException
{
return Observable.create(subscribe -> {
InputStream in = getClass().getResourceAsStream("/trial.txt");
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
String line = null;
try {
while((line = reader.readLine()) != null)
{
subscribe.onNext(line);
}
} catch (IOException e) {
subscribe.onError(e);
}
finally {
subscribe.onCompleted();
}
});
}
接下来我定义了订阅者代码:
public static void main(String[] args) throws IOException, InterruptedException {
Thread thread = new Thread(() ->
{
RxObserver observer = new RxObserver();
try {
observer.getObservable()
.observeOn(Schedulers.io())
.subscribe( x ->System.out.println(x),
t -> System.out.println(t),
() -> System.out.println("Completed"));
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
thread.join();
}
该文件有近 50000 条记录。运行应用程序时,我收到“rx.exceptions.MissingBackpressureException”。我浏览了一些文档,并按照建议,尝试在调用链中添加“.onBackpressureBuffer()”方法。但是我没有得到异常,但完成的调用也没有被解雇。
在我们有一个快速生成的 Observable 的情况下,处理这种情况的正确方法是什么?
【问题讨论】:
标签: java rx-java rx-android