【发布时间】:2017-02-10 20:57:13
【问题描述】:
我使用rxjava 来并行处理两个使用Observable.zip 的请求。我想做的是,在一个observable say response 我得到一个响应,而在另一个observable say diff 我试图得到响应并将这种差异保存在数据库中。问题是我不确定如何满足我的要求,因为如果response observable 得到响应,diff observable 没有完成
这就是我正在做的......
public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){
Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable();
Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId);
Observable<ServiceResponse> responseObservable = Observable.zip(
subInfoDummyObservable,
reObservable,
new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() {
@Override
public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) {
return serviceResponse;
}
}
);
ServiceResponse serviceResponse = responseObservable.toBlocking().single();
return serviceResponse;
}
Observable<ServiceResponse> getDummyResonseGenericObservable() {
return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable", () -> new ServiceResponse(),(t) -> {return null;} );
}
Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) {
return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable", () -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;} );
}
public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) {
Observable<String> service1ResponseObservable = getService1GenericObservable(prodId);
Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId);
Observable<ServicesDiff> observable = Observable.zip(
service1ResponseObservable, service2ResponseObservable,
new Func2<String, ServiceResponse, ServicesDiff>() {
@Override
public ServicesDiff call(String service1Response, ServiceResponse service2Response) {
return aggregate(service1Response, service2Response); // never reaches this point**********
}
}
);
ServicesDiff response = observable.toBlocking().single();
return response;
}
我在aggregate 方法中将差异插入数据库,但它根本没有达到aggregate。请让我知道我在这里做错了什么?谢谢。
【问题讨论】:
-
您的代码示例对我来说不是很清楚,问题所在的 getBothServiceResponses 方法与其余代码之间的关系是什么?您在 getBothServiceResponses 压缩的两个 observable 是什么?
-
我同意,代码需要澄清。前三个方法永远不会被调用,所以我们不知道第四个方法
getBothServiceResponses()中的 observables 是什么样子的,你的问题出在哪里
标签: java asynchronous rx-java observable