【问题标题】:RxJava Add new objects into streamRxJava 将新对象添加到流中
【发布时间】:2015-04-13 15:52:31
【问题描述】:

我正在尝试将新对象注入到可能存在的流中。

假设我有这个:

ItemUtils.java

public static void processItems(List<Item> items) {
    Observable.from(items)
        .subscribeOn(Schedulers.io())
        .flatMap(ItemUtils::doSomeHeavyProcessing)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
}

在另一个班级:

List<Item> items = new ArrayList<>();

public void onClick() {
    processItems(list);
}

是否可以在不耐烦的用户每次按下按钮时将项目注入到一个唯一的项目流中?

【问题讨论】:

    标签: java concurrency rx-java rx-android


    【解决方案1】:

    如果 List items 是一种队列,用户可以在 processItems 已经启动时添加更多 Items,你应该使用Subjects

    Queue<Item> items = new LinkedList<>();
    PublishSubject<Item> subject = PublishSubject.create(); 
    
    ...
    //subscribe once and supply new items in onClick
    subject.subscribeOn(Schedulers.io())
        .flatMap(ItemUtils::doSomeHeavyProcessing)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();
    
    ...
    public void onClick() {
        while(!queue.isEmpty()){
            subject.onNext(queue.remove());
        }
    }
    

    【讨论】:

      【解决方案2】:

      有可能,但这个问题缺乏精确性,所以这个答案很笼统。

      基本思想是使用 [concatMap](http://reactivex.io/RxJava/javadoc/rx/Observable.html#concat(rx.Observable, rx.Observable)) 或此运算符的变体。

      因此,例如,取决于代码真正需要做什么:

      public static void processItems(List<Item> items) {
          Observable<ProcessdItem> processedItems = Observable.from(items)
              .subscribeOn(Schedulers.io())
              .flatMap(ItemUtils::doSomeHeavyProcessing)
              .observeOn(AndroidSchedulers.mainThread());
      
          Observable.concat(
              Observable.just(ProcessedItems.preProcessed()),
              processedItems
          ).subscribe();
      }
      

      或者在嵌套的 observable 中。

      public static void processItems(List<Item> items) {
          Observable.from(items)
              .subscribeOn(Schedulers.io())
              .flatMap(i ->
                  Observable.just(ProcessedItems.preProcessed())
                      .concatWith(ItemUtils::doSomeHeavyProcessing))
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe();
      }
      

      【讨论】:

        猜你喜欢
        • 2022-01-16
        • 1970-01-01
        • 2021-09-06
        • 2016-01-14
        • 1970-01-01
        • 2021-04-15
        • 2015-02-07
        • 2018-02-08
        • 2013-10-10
        相关资源
        最近更新 更多