【发布时间】:2017-04-29 03:18:18
【问题描述】:
onNext() 和 onCompleted() 都不会在下面为我的订阅者调用。我尝试通过 doOnNext()/doOnTerminate() 实现订阅者。我也试过doAfterTerminate()。我也尝试过明确定义订阅者,但 onNext() 和 onCompleted() 都没有被调用。
根据RxJS reduce doesn't continue,reduce() 没有终止,所以我尝试添加 take(1),但没有奏效。在同一个 stackoverflow 问题中,有人说问题可能是我的流永远不会关闭。除了 take(1),也许还有其他方法可以关闭流,但我对 ReactiveX 的理解还不够好。
根据Why is OnComplete not called in this code? (RxAndroid),可能是系列中的原始流没有终止。但我不明白为什么如果我调用我认为应该发出终止信号的 take(1) 会很重要。
基本上,为什么不执行以下行?
System.out.println("doOnNext map.size()=" + map.size());
即使下面的代码行被执行了 98 次:
map.put(e.getKey(), e.getValue());
JsonObjectObservableRequest.java
import com.android.volley.toolbox.JsonObjectRequest;
...
public class JsonObjectObservableRequest {
public JsonObjectObservableRequest(int method, String url, JSONObject request) {
jsonObjectRequest = new JsonObjectRequest(method, url, request, getResponseListener(), getResponseErrorListener());
}
private Response.Listener<JSONObject> getResponseListener() {
return new Response.Listener<JSONObject>() {
@Override
public void onResponse(JSONObject response) {
publishSubject.onNext(Observable.just(response));
}
};
}
private Response.ErrorListener getResponseErrorListener() {
return new Response.ErrorListener() {
@Override
public void onErrorResponse(VolleyError error) {
Observable<JSONObject> myError = Observable.error(error);
publishSubject.onNext(myError);
}
};
}
public JsonObjectRequest getJsonObjectRequest() {
return jsonObjectRequest;
}
public Observable<JSONObject> getObservable() {
return publishSubject.flatMap(new Func1<Observable<JSONObject>, Observable< JSONObject>>() {
@Override
public Observable<JSONObject> call(Observable<JSONObject> jsonObjectObservable) {
return jsonObjectObservable;
}
});
}
}
JsonObjectObservableRequest 调用代码
import com.android.volley.Request;
import com.android.volley.RequestQueue;
...
JsonObjectObservableRequest jsonObjectObservableRequest = new JsonObjectObservableRequest(Request.Method.GET, idURLString, null, keyId, key);
Observable<JSONObject> jsonObjectObservable = jsonObjectObservableRequest.getObservable();
jsonObjectObservable
.map(json -> {
try {
return NetworkAccountIdDatasource.parseIdJSON(json);
} catch (JSONException e) {
e.printStackTrace();
return null;
}
})
.flatMapIterable(x -> x)
.map(s -> new AbstractMap.SimpleEntry<String, String>("Name of " + s, "short id for " + s.substring(4)))
.reduce(new HashMap<String, String>(), (map, e) -> {
map.put(e.getKey(), e.getValue());
return map;
})
.take(1)
.doOnNext(map -> {
System.out.println("doOnNext map.size()=" + map.size());
})
.doOnTerminate(() -> {
System.out.println("doOnTerminate");
})
.subscribe();
final RequestQueue queue = Volley.newRequestQueue(context);
queue.add(jsonObjectObservableRequest.getJsonObjectRequest());
【问题讨论】:
-
为什么你还在使用 volley,retrofit2 是新方法!
-
谢谢@Remario。你怎么知道的?
-
很好,它不那么冗长,速度更快(okHttp3)并使用java注释处理器。我的意思是现在每个 android 开发者都在切换。
-
请仔细阅读,您会喜欢的!
-
更好的理由是Retrofit2直接支持直接提供Observable的调用适配器
标签: android rx-java rx-android reactivex