【问题标题】:Rx Observable emitting values periodicallyRx Observable 定期发射值
【发布时间】:2014-07-03 15:00:03
【问题描述】:

我必须定期轮询一些 RESTful 端点以刷新我的 android 应用程序的数据。我还必须根据连接暂停和恢复它(如果手机离线,甚至不需要尝试)。我当前的解决方案正在运行,但它使用标准 Java 的 ScheduledExecutorService 来执行周期性任务,但我想留在 Rx 范式中。

这是我当前的代码,为简洁起见,省略了部分代码。

userProfileObservable = Observable.create(new Observable.OnSubscribe<UserProfile>() {
    @Override
    public void call(final Subscriber<? super UserProfile> subscriber) {
        final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        final Runnable runnable = new Runnable() {
            @Override
            public void run() {
                // making http request here
            }
        };
        final List<ScheduledFuture<?>> futures = new ArrayList<ScheduledFuture<?>>(1);
        networkStatusObservable.subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean networkAvailable) {
                if (!networkAvailable) {
                    pause();
                } else {
                    pause();                        
                    futures.add(scheduledExecutorService.scheduleWithFixedDelay(runnable, 0, SECOND_IN_MILLIS * SECONDS_TO_EXPIRE, TimeUnit.MILLISECONDS));
                }
            }

            private void pause() {
                for (ScheduledFuture<?> future : futures) {
                    future.cancel(true);
                }
                futures.clear();
            }
        });

        final Subscription subscription = new Subscription() {
            private boolean isUnsubscribed = false;

            @Override
            public void unsubscribe() {
                scheduledExecutorService.shutdownNow();
                isUnsubscribed = true;
            }

            @Override
            public boolean isUnsubscribed() {
                return isUnsubscribed;
            }
        };
        subscriber.add(subscription);
    }
}).multicast(BehaviorSubject.create()).refCount();

networkStatusObservable基本上是一个广播接收器,包裹在Observable&lt;Boolean&gt;中,表示手机已联网。

正如我所说,这个解决方案是有效的,但我想使用 Rx 方法进行定期轮询并发出新的UserProfiles,因为手动安排事情有很多问题,我想避免这些问题。我知道Observable.timerObservable.interval,但不知道如何将它们应用于此任务(我不确定是否需要使用它们)。

【问题讨论】:

    标签: android rx-java


    【解决方案1】:

    在这个 GitHub 问题上有一些方法可能对您有所帮助。

    https://github.com/ReactiveX/RxJava/issues/448

    三个实现分别是:


    Observable.interval

    Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
            .flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
                public Observable<Notification<AppState>> call(Long seconds) {
                    return lyftApi.updateAppState(params).materialize(); } });
    

    Scheduler.schedulePeriodically

    Observable.create({ observer ->
            Schedulers.newThread().schedulePeriodically({
                observer.onNext("application-state-from-network");
            }, 0, 1000, TimeUnit.MILLISECONDS);
        }).take(10).subscribe({ v -> println(v) });
    

    Manual Recursion

    Observable.create(new OnSubscribeFunc<String>() {
            @Override
            public Subscription onSubscribe(final Observer<? super String> o) {
                return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
                    @Override
                    public Subscription call(Scheduler inner, Long t2) {
                        o.onNext("data-from-polling");
                        return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
                    }
                });
            }
        }).toBlockingObservable().forEach(new Action1<String>() {
            @Override
            public void call(String v) {
                System.out.println("output: " + v);
            }
        });
    

    结论是手动递归是可行的方法,因为它会等到操作完成后再安排下一次执行。

    【讨论】:

    • 手动递归确实是一种有趣的技术,虽然目前还不清楚它如何与非阻塞 HTTP 客户端一起工作。
    • 这些示例基于什么版本的 RxJava? OnSubscribeFunc 在 rxjava 1.0.4 中不存在。
    • 正如问题所说,这个例子是从 2013 年的 Github issue 中提取的。无法分辨使用的是什么外翻。
    • 在第一种方法中,如何在内部类中访问参数?在第二种方法中,您正在使用 newThread() ,这将不必要地创建一个新线程,并且您将无法使用自己的线程池。在第三种方法中,您将 Observable 转换为 Blocking,这意味着将失去使用 RX 的优势。
    【解决方案2】:

    其中一个选项是使用 Observable.interval 并在发出间隔时检查用户状态:

         Observable<Long> interval = Observable.interval(1, TimeUnit.SECONDS);
    
        //pulling the user data
        Observable<Observable<String>> userObservable = interval.map(new Func1<Long, Observable<String>>() {
            Random random = new Random();
            @Override
            public Observable<String> call(Long tick) {
                //here you are pulling user data; you should do it asynchronously - rx way - because the interval is using Schedulers.computation which is not best suited for doing io operations
                switch(random.nextInt(10)){
                    case 0://suppose this is for cases when network in  not available or exception happens
                        return Observable.<String>just(null);
                    case 1:
                    case 2:
                        return Observable.just("Alice");
                    default:
                        return Observable.just("Bob");
                }
            }
        });
    
        Observable<String> flatUsers = userObservable.flatMap(new Func1<Observable<String>, Observable<? extends String>>() {
            @Override
            public Observable<? extends String> call(Observable<String> stringObservable) {
                return stringObservable;
            }
        });
    
        //filter valid data
        Observable<String> usersWithoutErrors = flatUsers.filter(new Func1<String, Boolean>() {
            @Override
            public Boolean call(String s) {
                return s != null;
            }
        });
    
        //publish only changes
        Observable<String> uniqueUsers = usersWithoutErrors.distinctUntilChanged();
    

    如果您的 networkStatusObservable 发出事件的频率至少与您检查用户数据所需的频率一样,那么您可以更简单地进行操作

     networkStatusObservable.sample(1,TimeUnit.Seconds).filter(/*the best is to filter only connected state */).map(/*now start pulling the user data*/)
    

    最后,您可以创建使用调度程序定期发出用户状态的 observable - 请参阅 Schedulers documentation 了解最适合您的调度程序:

    public abstract class ScheduledOnSubscribe<T> implements Observable.OnSubscribe<T>{
        private final Scheduler scheduler;
        private final long initialDelay;
        private final long period;
        private final TimeUnit unit;
    
        public ScheduledOnSubscribe(Scheduler scheduler, long initialDelay, long period, TimeUnit unit) {
            this.scheduler = scheduler;
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
        }
    
        abstract T next() throws Exception;
    
    
        @Override
        public void call(final Subscriber<? super T> subscriber) {
            final Scheduler.Worker worker = scheduler.createWorker();
            subscriber.add(worker);
            worker.schedulePeriodically(new Action0() {
                @Override
                public void call() {
                    try {
                        subscriber.onNext(next());
                    } catch (Throwable e) {
                        try {
                            subscriber.onError(e);
                        } finally {
                            worker.unsubscribe();
                        }
                    }
                }
    
            }, initialDelay, period, unit);
        }
    }
    
    //And here is the sample usage
     Observable<String> usersObservable = Observable.create(new ScheduledOnSubscribe(Schedulers.io(), 1, 1, TimeUnit.SECONDS ){
            Random random = new Random();
            @Override
            String next() throws Exception {
                //if you use Schedulers.io, you can call the remote service synchronously
                switch(random.nextInt(10)){
                    case 0:
                        return null;
                    case 1:
                    case 2:
                        return "Alice";
                    default:
                        return "Bob";
                }
            }
        });
    

    【讨论】:

    • 嘿,感谢您抽出宝贵的时间来写这篇文章。不幸的是,我没有时间检查您的解决方案是否有效,因为我已经实施了自己的解决方案,我将其作为另一个答案发布。我的解决方案已经过测试并且正在运行。
    【解决方案3】:

    简短的回答。 RxJava2:

    Observable.interval(initialDelay, unitAmount, timeUnit)
                .subscribe(value -> {
                    // code for periodic execution
                });
    

    根据需要选择initialDelay、unitAmount和TimeUnit。

    示例:0、1、TimeUnit.MINUTES。

    【讨论】:

      【解决方案4】:

      有一个更简单的方法是使用interval()。我已经测试了这段代码并且它有效。 但首先,您应该将要定期执行的作业封装在 Action1 的子类中。

      class Act<T> implements Action1<T> {
           public Service service;
           public String data;
           public void call(T t){
               service.log(data); //the periodic job
           }
      }
      

      (为简洁起见,我将字段公开,但这是不可取的)。现在您可以通过以下方式安排它:

      Act<Long> act=new Act<>();
      act.data="dummy data";
      act.service=this;
      Observable.interval(0l, period, TimeUnit.SECONDS).subscribeOn(Schedulers.from(Executors.newFixedThreadPool(10))).subscribe((Action1<Long>)act);
      

      与其他答案中给出的方法不同,这不会在任何地方阻塞您的线程。这种方法允许我们将变量作为一种可变存储在 Action 中传递,这在后续调用中可能很方便。此外,通过这种方式,您可以在自己的线程池中订阅您的调用。

      【讨论】:

        【解决方案5】:

        好的,我将发布我自己的解决方案,也许有人会从中受益。我只会发布与问题相关的部分,省略 HTTP 和缓存内容。这是我的做法:

        private ConnectableObservable<Long> createNetworkBoundHeartbeatObservable(final Observable<Boolean> networkStatusObservable,
                                                                                  final Observable<Boolean> pauseResumeObservable) {
        
            final Observable<Boolean> pausableHeartbeatObservable = Observable.combineLatest(networkStatusObservable, pauseResumeObservable,
                    new Func2<Boolean, Boolean, Boolean>() {
                        @Override
                        public Boolean call(Boolean networkAvailable, Boolean mustPause) {
                            return mustPause && networkAvailable;
                        }
                    }
            ).distinctUntilChanged();
        
            final Observable<Boolean> hasToResumeObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean networkAvailable) {
                    return networkAvailable;
                }
            });
            final Observable<Boolean> hasToStopObservable = pausableHeartbeatObservable.filter(new Func1<Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean networkAvailable) {
                    return !networkAvailable;
                }
            });
        
        
            return pausableHeartbeatObservable.concatMap(new Func1<Boolean, Observable<Long>>() {
                @Override
                public Observable<Long> call(Boolean shouldResumeRequests) {
                    if (shouldResumeRequests) {
                        long timeToUpdate;
                        final Date oldestModifiedExpiresAt = cache.oldestModifiedExpiresAt();
                        timeToUpdate = Math.max(0, oldestModifiedExpiresAt.getTime() - System.currentTimeMillis());
                        Log.d(TAG, String.format("Have to restart updates, %d seconds till next update", timeToUpdate / SECOND_IN_MILLIS));
                        return Observable
                                .timer(timeToUpdate, SECONDS_TO_EXPIRE * SECOND_IN_MILLIS, TimeUnit.MILLISECONDS)
                                .takeUntil(hasToStopObservable);
                    } else {
                        Log.d(TAG, "Have to pause updates");
                        return Observable.<Long>never().takeUntil(hasToResumeObservable);
                    }
                }
            }).multicast(PublishSubject.<Long>create());
        }
        

        如您所见,暂停或恢复更新的条件变得更加复杂,添加了一个新的 Observable 以支持应用程序进入后台时暂停。

        那么解决方案的核心是concatMap 操作,它依次发出Observables(因此是concatMap,而不是flatMap,请参见这个问题:What is the difference between concatMap and flatMap in RxJava)。它会发出intervalnever Observables,具体取决于更新应该继续还是暂停。然后每个发出的 ObservabletakenUntil '相反' Observable 发出新值。

        ConnectableObservable 被返回是因为创建的Observable 很热,并且所有预期的订阅者都必须在它开始发出某些东西之前订阅它,否则初始事件可能会丢失。我稍后会打电话给connect

        如果有的话,我会根据投票接受我的答案或其他答案。

        【讨论】:

        • 你会在哪里进行异步网络调用?您能否详细说明您的示例以及如何使用它?谢谢!
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-09-21
        相关资源
        最近更新 更多