【问题标题】:Requerying and updating subscribers with RxJava2使用 RxJava2 重新查询和更新订阅者
【发布时间】:2017-05-04 19:56:19
【问题描述】:

我的应用中有一个由改造服务支持的数据层。 (到目前为止,只有网络上的持久性。当我进一步开发时,我将添加离线优先的本地存储)

当我调用服务器时,Retrofit 会向我返回 Observable<List<Item>>。这很好用。在我的订阅者中,我在订阅时会收到列表,然后可以使用 Items 填充我的 UI。

我遇到的问题是:如果列表被修改(通过某种外部机制),我如何让 observable 重新查询改造服务并发出新的项目列表。我知道数据已过时,但我不确定如何启动重新查询。

这是我的DataManager的精简版

class DataManager {

    // Retrofit
    RetrofitItemsService itemsService;

    // The observalble provided by retrofit
    Observable<List<Item>> itemsObservable;

    //ctor
    public DataManager(RetrofitItemsService itemsService) {
        this.itemsService = itemsService;
    }

    /* Creates and stores an observable if one has not been created yet.
     * Returns the observable so that it can be subscribed to by the function caller
     */
    public Observable<List<Item>> getItems(){
        if(itemsObservable == null){
            itemsObservable = itemsService.getItems();
        }

        return itemsObservable;
    }

    /* Adds a new Item to the list.
     */
    public Completable addItem(Item item){
        Completable call = itemsService.addItem(item);

        call.subscribe(()->{
            /*
             < < < Here > > >
             If someone has previously called getItems before this item was added, they now have stale data.

             How can I call something like:

             itemsObservable.refreshAllSubscribers()
            */
        });

        return call;
    }
}

【问题讨论】:

    标签: android retrofit rx-java reactive-programming rx-java2


    【解决方案1】:

    您在这里解决的问题在于 hotcold 可观察到的区别。您可以在 Google 上搜索很多很棒的文章,其中详细描述了这些差异,所以让我只描述基础知识。

    Cold observable 为 每个 订阅者创建一个新的生产者。这意味着,当两个不同的订阅者订阅同一个冷可观察对象时,他们每个人都会收到这些排放的不同实例。它们可能(!)是相等的,但它们永远是不同的对象。在此处应用于您的案例,每个订阅者都有自己的生产者,该生产者向服务器请求数据并将其传递到流中。每个订阅者都会收到来自其自己的生产者的数据。

    Hot observable 与其所有观察者共享一个生产者。例如,如果生产者正在迭代对象集合,则在发射过程中加入第二个订阅者意味着它将仅获得之后发射的项目(如果它没有通过像 replay 这样的运算符进行修改)。任何订阅者收到的每个对象在所有观察者中都是相同的实例,因为它来自单个生产者。

    因此,从外观上看,您需要有一个 hot observable 来分发您的数据,这样当您知道它不再有效时,您只需通过这个 hot observable 发出一次,每个观察者都会对更新感到满意。

    幸运的是,将可观察到的冷态转变为热态通常没什么大不了的。您可以创建自己的生产者来模仿这种行为,使用流行的运算符之一,例如share,或者您可以只转换流,使其表现得像一个。

    我建议使用 PublishSubject 刷新数据并将其与原始冷可观察数据合并,如下所示:

    class DataManager {
    
        .....
    
        PublishSubject<Boolean> refreshSubject = PublishSubject.create();
    
        // The observable for retrieving always fresh data
        Observable<List<Item>> itemsObservable;
    
        //ctor
        public DataManager(RetrofitItemsService itemsService) {
            this.itemsService = itemsService;
            itemsObservable = itemsService.getItems()
                                  .mergeWith(refreshSubject.flatMap(refresh -> itemsService.getItems()))
        }
    
    
        public Observable<List<Item>> getItems(){
            return itemsObservable;
        }
    
        /* Adds a new Item to the list.
         */
        public Completable addItem(Item item){
            Completable call = itemsService.addItem(item);
    
            call.subscribe(()->{
                refreshSubject.onNext(true);
            });
    
            return call;
        }
    }
    

    【讨论】:

      【解决方案2】:

      我猜 itemsService.getItems() 返回单个元素 Observable 因此消费者必须重新订阅才能获得新数据,并且他们会得到它,因为 Retrofit Observables 也是延迟/惰性的。

      您可以通过PublishSubject 的帮助拥有一个单独的“长”Observable,您可以在数据更改时触发它:

      final Subject<Object> onItemsChanged = PublishSubject.create().toSerialized();
      
      public Observable<Object> itemsChanged() {
          return onItemsChanged;
      }
      
      public Completable addItem(Item item){
          Completable call = itemsService.addItem(item);
      
          // prevent triggering the addItem multiple times
          // Needs RxJava 2 Extensions library for now
          // as there is no Completable.cache() or equivalent in 2.0.3
          CompletableSubject cs = CompletableSubject.create();
      
          call.doOnComplete(() -> onItemsChanged.onNext("changed"))
          .subscribe(cs);
      
          return cs;
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2021-03-03
        • 1970-01-01
        • 2011-08-21
        • 1970-01-01
        • 1970-01-01
        • 2022-06-15
        • 2019-05-19
        相关资源
        最近更新 更多