【问题标题】:How to stop emission from PublishSubject when buffer is empty缓冲区为空时如何停止从 PublishSubject 发射
【发布时间】:2019-06-26 07:26:00
【问题描述】:

我正在尝试使用 RxJava 创建一个顺序下载服务。 用户可以批量添加项目(20、30 等)或单个项目。这些项目将被添加到队列中,然后以 10 个为一组按顺序下载。 为此,我正在使用 PublishSubject:

PublishSubject<Int> pubSubject = PublishSubject.create();

它发出用户添加的项目(id),然后将缓冲区运算符应用于批处理项目。使用这些 id,项目在 flatMap 中下载并在订阅的 onNext 中返回。

  pubSubject.buffer(1, TimeUnit.SECONDS, 10)
            .observeOn(Schedulers.io())
            .flatMap { idsBatch -> downloadByIds(idsBatch) }
            .subscribe(
                /* onNext */ { apiResponse -> handleResponse() },
                /* onError */ { handleError(it) },
                /* onComplete*/ { hideProgressBar() }
             )

代码大部分都按预期工作。项目已成功批处理并下载,但即使在发出所有项目后,缓冲区仍会使用空列表调用 flatMap,并且永远不会调用 onComplete()。

我想知道当缓冲区中没有更多项目时,RxJava 中是否有任何方法或方式来获取 onComplete 回调。因为否则我的下载服务永远不会终止。

【问题讨论】:

    标签: android kotlin rx-java rx-java2 rx-android


    【解决方案1】:

    您可以使用takeWhile 操作:

    返回一个Observable,只要每个项目满足指定条件,就会发出源ObservableSource发出的项目,然后在不满足此条件时立即完成。

    pubSubject.buffer(1, TimeUnit.SECONDS, 10)
              .observeOn(Schedulers.io())
              .takeWhile { idsBatch -> idsBatch.isNotEmpty() }
              .flatMap { idsBatch -> downloadByIds(idsBatch) }
              .subscribe(
                  /* onNext */ { apiResponse -> handleResponse() },
                  /* onError */ { handleError(it) },
                  /* onComplete*/ { hideProgressBar() }
               )
    

    【讨论】:

    • 谢谢泰恩。这确实完成了链并在缓冲区列表为空但在 flatMap() 中仍有一个额外的 api 调用时收到 onComplete() 回调。
    • 你是对的。实际上 takeWhile 带有否定可能会更好。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-10-29
    • 2015-02-27
    • 2011-05-14
    • 1970-01-01
    • 1970-01-01
    • 2016-12-03
    相关资源
    最近更新 更多