【问题标题】:Why does this observable emit only one value为什么这个 observable 只发出一个值
【发布时间】:2014-10-30 01:17:49
【问题描述】:

下面的代码基于@a.bertucci 在此处Emit objects for drawing in the UI in a regular interval using RxJava on Android 提供的示例,其中我使用计时器压缩了一个 Observable。当我通过调用 processDelayedItems() 触发订阅时,压缩后的 Observable 中的代码 [A] 只执行一次,并且将一项发送到 [B]。我本来希望代码 [A] 在触发后连续运行,并每 1500 毫秒持续发射项目,但它显然只在这里运行一次。

private static void processDelayedItems() {

    Observable.zip(
            Observable.create(new Observable.OnSubscribe<Object>() {

                @Override public void call(Subscriber<? super Object> subscriber) {
                    // [A] this code is only called once
                    subscriber.OnNext(o)
                }

            }),
            Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() {
                @Override public Object call(Object entity, Long aLong) {
                    return entity;
                }
            }
    )
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {

                @Override public void call(Object entity) {
                    // ... and accordingly one item is emitted [B]
                }

            }, new Action1<Throwable>() {

                @Override public void call(Throwable throwable) {
                    throwable.printStackTrace();
                }

            }, new Action0() {

                @Override public void call() {

                }

            });

}
  1. 谁能看到我在这里遇到的问题?是不是我需要从函数外部引用 Observable 才能让它存活更多时间?它是由 GC (Android) 收集的吗?函数是静态的有问题吗?

  2. Observable 的生存时间规则是什么?是否有任何最佳实践应该如何引用运行时间更长的 Observable 以及它们是否可以是静态的?在我的测试中,我注意到这并不重要,但也许在这里,当涉及计时器时。

--

更正的代码[尚未工作]:

  • 添加了重复()

    Observable.zip(
            Observable.create(new Observable.OnSubscribe<Object>() {
    
                @Override public void call(Subscriber<? super Object> subscriber) {
                    // [A] this code is only called once
                    subscriber.OnNext(o);
                    subscriber.OnCompleted();
                }
    
            }).repeat(Schedulers.newThread()),
            Observable.timer(1500, 1500, TimeUnit.MILLISECONDS), new Func2<Object, Long, Object>() {
                @Override public Object call(Object entity, Long aLong) {
                    return entity;
                }
            }
    )
    .subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Action1<Object>() {
    
                @Override public void call(Object entity) {
                    // ... and accordingly one item is emitted [B]
                }
    
            }, new Action1<Throwable>() {
    
                @Override public void call(Throwable throwable) {
                    throwable.printStackTrace();
                }
    
            }, new Action0() {
    
                @Override public void call() {
    
                }
    
            });
    

【问题讨论】:

    标签: android rx-java


    【解决方案1】:

    你需要repeat 来生成一个无限的 Observable。例如,

        Observable.create(new Observable.OnSubscribe<Object>() {
    
            @Override public void call(Subscriber<? super Object> subscriber) {
                // [A] this code is only called once
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(o);
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            }
    
        }).repeat(Schedulers.newThread());
    

    我是否需要从函数外部引用 Observable 才能使其存活更长时间?它是由 GC (Android) 收集的吗?函数是静态的有问题吗?

    由于您使用了Schedulers.newThread()timer,因此会有一些其他线程引用了您的 Observable。你不需要更多的工作。

    Observable 的生存时间规则是什么?是否有任何最佳实践应该如何引用运行时间更长的 Observable 以及它们是否可以是静态的?在我的测试中,我注意到这并不重要,但也许在这里,当涉及计时器时。

    你是对的。没关系。

    【讨论】:

    • 感谢您的详细回复。这是有道理的,因为 Y 组合器会在两侧寻找可观察对象,如果一侧没有可压缩的东西,则不会发出任何内容。
    • 我刚刚测试过——不知道为什么——Observable 仍然只发射一次。我更正的代码在我原来的帖子下面。即使我不这么认为,请检查我是否已将 repeat() 添加到正确的 Observable 中。
    • 另一个问题:有没有办法以不同的方式实现相同的目标?就像不使用 zip 一样,让计时器/间隔 Observable 定期发出间隔,然后触发另一个 Observable 在计时器/间隔 Oberservable 后面连续切换?
    • 您忘记在您的subscriber.onNext(o); 之后添加subscriber.onCompleted();。如果原始 Observable 没有先完成,Repeat 将无法开始重复。
    • 对不起,我看到你有这个,但我没有意识到我必须调用 OnCompleted() 才能再次启动 Observable。在我调用 OnCompleted() 之后,Observable 会定期发出,但它不再尊重计时器,例如if 尽可能快地发射。我已经更新了上面的代码。
    【解决方案2】:

    关于您的评论,为简单起见,您可以这样做,

    Observable.timer(1500, 1500, TimeUnit.MILLISECONDS)
            .flatMap(new Func1<Long, Observable<Object>>() {
                @Override
                public Observable<Object> call(Long aLong) {
                    String o = "0";
                    return Observable.from(o);
                }
            })
            .subscribe(new Action1<Object>() {
                @Override
                public void call(Object aLong) {
                    System.out.println(aLong);
                }
            });
    

    在这里,您仍然可以获得计时器的好处,而无需在顶部添加拉链/重复。它仍然有点冗长,但它有点简单。

    【讨论】:

    • 这看起来好多了。当我使用 Observable.from(o);其中 o 是 Object 类型,它告诉我“来自”已贬值。所以我把它改成了 Observable.from(new Object[] { entity }); - 以防万一有人读到这个。
    • 使用just 代替已弃用的from
    • @Miguel Lavigne,为什么不是map
    • @zsxwing,我使用 flatMap 是因为使用 Observable 作为入口点的问题是数据 Observable。我没有假设 Observable 是如何被使用的。如果他愿意,他可以使用 flatMap 从另一个类中获取 Observable。另一方面,如果 Object 是内联返回的,那么 map 也可以工作。
    • 感谢两位的解决方案。我在这里做了这个,它做了我需要做的事情。我仍然不明白为什么其他解决方案不能正常工作,但它可能与实现细节有关。 - 我没有将任何标记为正确,因为这对我有用,但另一个更接近我原来的问题。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-03-21
    • 2012-04-04
    • 1970-01-01
    • 2021-08-14
    • 2022-01-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多