【问题标题】:RXJava2 block stream until condition is satisfiedRXJava2 阻塞流直到条件满足
【发布时间】:2019-06-19 13:28:35
【问题描述】:

我有一个要上传的文件队列,这些文件由 Observable 发出,我只想在有 WIFI 连接时上传文件。我有一个解决方案,它只会在建立 WIFI 连接时开始发出队列,但理想情况下,我想在发出每个队列项之前进行检查(然后在此时阻塞,直到建立 WIFI 连接)。这样队列上传过程中如果Wifi断开,队列会阻塞正在上传的项目,并在重新建立时恢复。

这是我目前所拥有的:

ReactiveNetwork.observeNetworkConnectivity(application)
    .filter(ConnectivityPredicate.hasState(NetworkInfo.State.CONNECTED))
    .filter(ConnectivityPredicate.hasType(ConnectivityManager.TYPE_WIFI))
    .take(1)
    .toFlowable(BackpressureStrategy.BUFFER)
    .flatMap { queueItemAvailable() }
    .flatMap { audioSampleUploader.uploadFile(it) }
    .subscribeOn(Schedulers.single())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe { // SOMETHING HERE }

所以我想移动

queueItemAvailable()

到顶层,然后对要上传的每个项目进行(阻塞)网络连接检查,但我的 RxJava 相当生疏,所以不知道如何做到这一点。

非常感谢任何智慧?

【问题讨论】:

    标签: java android kotlin rx-java rx-java2


    【解决方案1】:

    好的,你应该尝试结合两个 observables,一个用于上传项目,一个用于检查 wifi 是否可用,我认为你应该将整个网络连接检查移到另一种方法并返回

    ReactiveNetwork.observeNetworkConnectivity(application) .flatMap { queueItemAvailable() } .withLatestFrom {checkForWifiConnection(), BiFunction<QueueResult, Boolean, Pair< Boolean, QueueResult>> { wifiAvailable, queueResult -> Pair(queueResult, wifiAvailable) }).filter { it.first } .flatMap { audioSampleUploader.uploadFile(it) } .subscribeOn(Schedulers.single()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { // SOMETHING HERE }

    请原谅代码格式,但想法是组合 Observables 并返回一对,并且仅在 wifi 连接可用时发出值。我相信你也可以使用zipwith 操作符来组合observables。

    【讨论】:

    • 你能否给我一个提示,告诉我在这种情况下如何使用它,因为这对我有什么帮助并不明显。
    • withLatestFrom 将等到所有可观察对象、上游可观察对象(调用queueItemAvailable 的源)和checkforWifiFunction 发出一个项目。一旦两个源 observables 都发出,您可以使用 withLatestFrom 中的 BiFunction 对两个函数的结果进行实际操作。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-11-10
    • 2014-10-18
    • 2021-12-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-01-23
    相关资源
    最近更新 更多