【问题标题】:RxJava not calling subscribe() but still working. How is that possible?RxJava 没有调用 subscribe() 但仍在工作。这怎么可能?
【发布时间】:2018-11-29 20:33:39
【问题描述】:

我正在使用 RxJava 和 Retrofit 来使用不同的端点。我正在使用几个微服务,它们都使用 RxJava 和 Retrofit 来使用其他服务。

我没有使用 Observables 的经验,因此我正在查看 Internet 上的一些示例以了解如何使用它并自己创建一些服务。我看到总是使用方法 subscribe() 。像这样的:

    @Setter
    @Getter
    private MovieDetail movieDetail;

    public Observable<Movies> observe() {
    allMoviesClientRetrofit
                        .getMovies()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.computation())
            .subscribe(new Observer<Movies>() {

                @Override
                public void onCompleted() {

                    }

                @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onNext(Movies movies) {
                    allMovies = movies;

            });

在我的工作服务中,我到处搜索,从未使用过 subscribe(),但一切正常。这怎么可能?

如您所见,在该示例中,我需要返回一个 Observable 以使我的个人代码与我的工作保持一致,但是如果我使用 subscribe() 方法,它会返回一个 Subscription 对象,这不起作用.

这是我工作代码的一部分,您可以在其中看到 subscribe() 从未被调用,但它可以工作

@GetMapping(
        value = "/something",
        produces = MediaType.APPLICATION_JSON_UTF8_VALUE
)
public Single<ResponseEntity<Something>> getSomething() {


    return retrieveSomethingFactory
                    .observe()
                    .toSingle()
                    .map(something -> {
                        return ResponseEntity
                                .status(httpStatus)
                                .body(something);
                    });


class retrieveSomethingFactoryImpl implements retrieveSomethingFactory

@Override
public Observable<Something> observe() {
    return Observable
        .defer(() -> {
            Observable<Something1> something1 = retrieveSomething1Factory
                        .call(link) // Retrofit call
                        .observe()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.computation());

            Observable<Something2> something2 = retrieveSomething1Factory
                        .call(link) // Retrofit call
                        .observe()
                        .subscribeOn(Schedulers.io())
                        .observeOn(Schedulers.computation()); 

            return Observable
                        .zip(something1, something2.toList(), (something1, something2) -> {

                            ....
                            ....
                            ....        

                            return something;
                        });                                                       

谢谢

【问题讨论】:

    标签: java-8 rx-java2


    【解决方案1】:

    从您的retrieveSomethingFactory.observe() 调用返回的 Observable 似乎是一个热门的 observable,这意味着无论是否订阅,它都会发出项目。你可以阅读一篇关于冷热观测值的好文章here

    【讨论】:

    • 对不起,我不知道如何知道它是否是一个热门的 observable。你认为是为什么?
    • 在我链接的文章中都有描述。但基本上检查retrieveSomethingFactoryImpl 并查看它是否返回ConnectableObservableSubject。这些类包含即使没有订阅(观察者)也会发出项目的逻辑。
    • 感谢您的回答。实际上我更深入地研究了代码,我可以看到使用了一个冷的 Observable。他们实现了 Observable.Operator,覆盖方法 call() 并在此方法内部返回一个新订阅者,其中覆盖方法 onCompleted()、onError() 和 onError(),所以我现在了解它是如何工作的。谢谢
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-05-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多