【问题标题】:Why doesn't doOnNext() get called?为什么不调用 doOnNext() ?
【发布时间】:2017-04-29 03:18:18
【问题描述】:

onNext() 和 onCompleted() 都不会在下面为我的订阅者调用。我尝试通过 doOnNext()/doOnTerminate() 实现订阅者。我也试过doAfterTerminate()。我也尝试过明确定义订阅者,但 onNext() 和 onCompleted() 都没有被调用。

根据RxJS reduce doesn't continue,reduce() 没有终止,所以我尝试添加 take(1),但没有奏效。在同一个 stackoverflow 问题中,有人说问题可能是我的流永远不会关闭。除了 take(1),也许还有其他方法可以关闭流,但我对 ReactiveX 的理解还不够好。

根据Why is OnComplete not called in this code? (RxAndroid),可能是系列中的原始流没有终止。但我不明白为什么如果我调用我认为应该发出终止信号的 take(1) 会很重要。

基本上,为什么不执行以下行?

System.out.println("doOnNext map.size()=" + map.size());

即使下面的代码行被执行了 98 次:

map.put(e.getKey(), e.getValue());

JsonObjectObservableRequest.java

import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {

    public JsonObjectObservableRequest(int method, String url, JSONObject request) {
    jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(),     getResponseErrorListener());
    }

    private Response.Listener<JSONObject> getResponseListener() {
        return new Response.Listener<JSONObject>() {
            @Override
            public void onResponse(JSONObject response) {
                publishSubject.onNext(Observable.just(response));
            }
        };
    }

    private Response.ErrorListener getResponseErrorListener() {
        return new Response.ErrorListener() {
            @Override
            public void onErrorResponse(VolleyError error) {
                Observable<JSONObject> myError = Observable.error(error);
                publishSubject.onNext(myError);
            }
        };
    }

    public JsonObjectRequest getJsonObjectRequest() {
        return jsonObjectRequest;
    }

    public Observable<JSONObject> getObservable() {
    return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable<    JSONObject>>() {
            @Override
            public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
                return jsonObjectObservable;
            }
        });
    }
}

JsonObjectObservableRequest 调用代码

import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
    JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
    Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();

    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            .take(1)
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();

    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());

【问题讨论】:

  • 为什么你还在使用 volley,retrofit2 是新方法!
  • 谢谢@Remario。你怎么知道的?
  • 很好,它不那么冗长,速度更快(okHttp3)并使用java注释处理器。我的意思是现在每个 android 开发者都在切换。
  • 请仔细阅读,您会喜欢的!
  • 更好的理由是Retrofit2直接支持直接提供Observable的调用适配器

标签: android rx-java rx-android reactivex


【解决方案1】:

您正确识别了问题,您没有在源网络Observable 上调用onCompleted()(在JsonObjectObservableRequest 类),而take(1) 也没有帮助,因为您将它放在之前reduce
正如您可能理解的那样,reduce 必须以有限的发射次数对Observable 进行操作,因为它会在所有项目都已发射时发射累积的项目,因此它依赖于onCompleted() 事件来了解观察到的流何时结束。

  • 我认为在你的情况下,最好的办法是改变JsonObjectObservableRequest的操作方式,你不需要Subject,你可以使用创建方法来包装回调(你可以看到我的答案here,阅读更多关于回调世界与 RxJava 之间的桥接here
  • 此外,您不需要先发出 Observable 的东西,然后将 flatmap 它发送到项目,您可以简单地使用 onNext() 发出项目并使用 onError() 发出错误,实际上你是隐藏错误并将它们转换为发射,这可能会使下游的错误更难处理。
  • 您应该在致电onNext() 后致电onCompleted()onResponse() 以表示完成。如果发生错误,onError 将通知流终止。
  • 另一个问题是取消似乎没有以这种方式处理的请求,您可以阅读源代码以了解在包装 callbacl 调用时如何完成。

【讨论】:

    【解决方案2】:

    我找到了一种方法来调用 doOnTerminate() 和 doOnNext()。它将 take(1) 语句放在流处理中更高的位置。我根据@yosriz 的评论得到了这个想法。我希望我明白为什么这能解决问题,但我不明白,因为我认为终止信号会被传播,但我想我需要了解更多关于 ReactiveX / 反应式函数式编程 / 等的知识。

    JsonObjectObservableRequest 调用代码

    import com.android.volley.Request;
    import com.android.volley.RequestQueue;
    ...
        JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
        Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
    
    jsonObjectObservable
            .map(json -> {
                try {
                    return NetworkAccountIdDatasource.parseIdJSON(json);
                } catch (JSONException e) {
                    e.printStackTrace();
                    return null;
                }
            })
            .take(1) // SOLUTION: MOVED THIS UP FROM BELOW reduce()
            .flatMapIterable(x -> x)
            .map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
            .reduce(new HashMap<String, String>(), (map, e) -> {
                map.put(e.getKey(), e.getValue());
                return map;
            })
            // .take(1) // SOLUTION: MOVE THIS ABOVE flatMapIterable() 
            .doOnNext(map -> {
                System.out.println("doOnNext map.size()=" + map.size());
            })
            .doOnTerminate(() -> {
                System.out.println("doOnTerminate");
            })
            .subscribe();
    
    final RequestQueue queue = Volley.newRequestQueue(context);
    queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
    

    此外,如果我将 take(1) 作为第一个操作,在第一个 map() 操作(调用 parseIdJSON() 的那个)之前,它也可以工作。但是,如果我将 take(1) 作为 reduce() 之前的操作,它只能部分起作用;好事是 doOnNext() 被调用,但坏事是 map.size() 在我的数据集应该为 98 时始终为 1。

    【讨论】:

    • 您是否仔细阅读了我的回答?您没有在 JsonObjectObservableRequest 上调用 onCompleted,您的 Observable 永远不会结束(完成)= reduce 无法工作(只是因为它永远不会获得完成事件)
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-06-16
    • 2016-07-15
    • 2020-03-28
    • 2019-03-18
    • 2011-05-04
    • 2019-11-04
    相关资源
    最近更新 更多