【问题标题】:My subscriber's onNext and onComplete functions do not run when I call onNext within my FlowableOnSubscribe class当我在 FlowableOnSubscribe 类中调用 onNext 时,我的订阅者的 onNext 和 onComplete 函数不运行
【发布时间】:2017-02-24 00:00:57
【问题描述】:

在一个使用 RxJava 2 的 Android 项目中,我在初始活动的 onCreate 中创建了一个类似这样的 Flowable

Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
        .doOnComplete(new MyAction())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new MySubscriber());

FlowableOnSubscribe 的实现是:

public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
    public static final String TAG = "XX MyFlOnSub1";

    @Override
    public void subscribe(FlowableEmitter<String> emitter) {
        Log.i(TAG, "subscribe");

        emitter.onNext("hello");
        emitter.onComplete();
    }
}

这是订阅者的实现:

public class MySubscriber implements Subscriber<String> {
    public static final String TAG = "XX MySubscriber";

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
    }

    @Override
    public void onComplete() {
        Log.i(TAG, "onComplete");
    }

    @Override
    public void onError(Throwable e) {
        Log.i(TAG, "onError");
    }

    @Override
    public void onNext(String s) {
        Log.i(TAG, "onNext: " + s);
    }
}

而动作实现是:

public class MyAction implements Action {
    public static final String TAG = "XX MyAction";

    @Override
    public void run() {
        Log.i(TAG, "run");
    }
}

在我的输出中,我期待来自onNext 的日志语句,但我没有看到。相反,这是我的全部输出:

02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run

这表明onNext 永远不会运行,onComplete 甚至也不会运行。但是MyAction 运行成功。

当我注释掉对 onNext 的调用时会发生以下情况:

02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete

在这种情况下onNext当然不会运行,但至少onComplete会运行。

我希望看到onComplete 在这两种情况下都会运行,而onNext 在我调用emitter.onNext 时会运行。我在这里做错了什么?

【问题讨论】:

    标签: java android rx-java rx-android


    【解决方案1】:

    您需要手动发出请求,否则直接扩展Subscriber时不会发出任何数据:

    @Override
    public void onSubscribe(Subscription s) {
        Log.i(TAG, "onSubscribe");
        s.request(Long.MAX_VALUE);
    }
    

    或者,您可以扩展 DisposableSubscriberResourceSubscriber

    【讨论】:

    • 这似乎是我的问题。当我从使用 RxJava 1 的 Observer 切换到 RxJava 2 的 Subscriber 时,我不知道我必须在 Subscription 上调用 request 才能真正获取事件。谢谢。
    【解决方案2】:

    你是如何测试它的?您的主线程是否可能在 Observable 有机会发出结果之前退出,因为您正在使用 Schedulers.IO 订阅线程。此外,您的 observeOn 不会做任何事情,因为它仅用于进一步的下游操作员。

    【讨论】:

    • 我不确定observeOn 的下游运营商指的是什么。我一直在通过在平板电脑上运行 Android 应用程序并在 Android Monitor 中查看 logcat 中的输出来对此进行测试。我想我对 Android 线程不是很熟悉。我尝试将Schedulers.io() 用于observeOnsubscribeOn,没有任何变化。为两者设置AndroidSchedulers.mainThread() 也没有效果。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多