【问题标题】:rxjava - getting response and inserting diff parallellyrxjava - 获取响应并并行插入差异
【发布时间】:2017-02-10 20:57:13
【问题描述】:

我使用rxjava 来并行处理两个使用Observable.zip 的请求。我想做的是,在一个observable say response 我得到一个响应,而在另一个observable say diff 我试图得到响应并将这种差异保存在数据库中。问题是我不确定如何满足我的要求,因为如果response observable 得到响应,diff observable 没有完成

这就是我正在做的......

public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){
    Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable();
    Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId);

    Observable<ServiceResponse> responseObservable = Observable.zip(
            subInfoDummyObservable,
            reObservable,
            new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() {
                @Override
                public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) {
                    return serviceResponse;
                }
            }
    );

    ServiceResponse serviceResponse = responseObservable.toBlocking().single();

    return serviceResponse;
}

Observable<ServiceResponse> getDummyResonseGenericObservable() {
    return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable", () -> new ServiceResponse(),(t) -> {return null;} );
}

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) {
    return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable", () -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;} );
}

public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) {
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId);
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId);

    Observable<ServicesDiff> observable = Observable.zip(
            service1ResponseObservable, service2ResponseObservable,
            new Func2<String, ServiceResponse, ServicesDiff>() {
                @Override
                public ServicesDiff call(String service1Response, ServiceResponse service2Response) {
                    return aggregate(service1Response, service2Response); // never reaches this point**********
                }
            }
    );
    ServicesDiff response = observable.toBlocking().single();

    return response;
}

我在aggregate 方法中将差异插入数据库,但它根本没有达到aggregate。请让我知道我在这里做错了什么?谢谢。

【问题讨论】:

  • 您的代码示例对我来说不是很清楚,问题所在的 getBothServiceResponses 方法与其余代码之间的关系是什么?您在 getBothServiceResponses 压缩的两个 observable 是什么?
  • 我同意,代码需要澄清。前三个方法永远不会被调用,所以我们不知道第四个方法 getBothServiceResponses() 中的 observables 是什么样子的,你的问题出在哪里

标签: java asynchronous rx-java observable


【解决方案1】:

Observable 是如何消费数据的描述。在您的代码示例中,您没有订阅,也没有实际使用数据。您刚刚描述了如何请求,但是缺少订阅部分,即触发请求的部分。

所以,如果我重写一点你的代码:

class Aggregate {
    Aggregate(String reponse, ServicesDiff diff) {
        ...
    }
}

Observable<String> getService1GenericObservable(String prodId) {
    ...
}

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) {
    ...
}

public Observable<Aggregate> getBothServiceResponses(ServiceRequest serviceRequest, String prodId) {
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId);
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId);

    return Observable<Aggregate> observable = Observable.zip(
            service1ResponseObservable, service2ResponseObservable,
            new Func2<String, ServiceResponse, ServicesDiff>() {
                @Override
                public ServicesDiff call(String service1Response, ServiceResponse service2Response) {
                    return aggregate(service1Response, service2Response);
                }
            }
    );
}

您只需执行此操作即可访问结果聚合:

getBothServiceResponses(serviceRequest, prodId).subscribe(...)

【讨论】:

    猜你喜欢
    • 2014-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-10-06
    • 1970-01-01
    • 1970-01-01
    • 2018-02-11
    • 2018-02-27
    相关资源
    最近更新 更多