【问题标题】:FlatMap run on main thread even using Scheuler.io即使使用 Scheuler.io,FlatMap 也可以在主线程上运行
【发布时间】:2019-07-28 05:39:46
【问题描述】:

我已经测试过了,肯定是从主线程发出值会导致这个问题。但是,我想知道我是否有这个用例从主线程接收一些值以继续 Rx 流。应该怎么做才能使 flatMap 在不同于 main 的线程中运行。

class MainActivity : AppCompatActivity() {

    private lateinit var emitter: ObservableEmitter<String>

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        btnFlatMap.setOnClickListener {

            val obs = Observable.create<String> {
                emitter = it
                logThread("inside observable")

                // TODO: fetch some configuration from the internet or local db

                // TODO: then call startActivityForResult()
            }

            obs
                .flatMap {
                    logThread("flatMap, Banana")
                    Observable.just("$it, 1 Item")
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ next ->
                    logThread("onNext")
                }, { error ->
                    logThread("onError")
                }, {
                    logThread("onComplete")
                })
        }

        btnEmitter.setOnClickListener {
            // TODO: simulate that onActivityResult is called
            emitter.onNext("Banana")
            emitter.onComplete()
        }
    }

    private fun logThread(operation: String) {
        Log.e("THREAD", "$operation run at [${Thread.currentThread().name}]")
    } }

当前的 Logcat

在 [RxCachedThreadScheduler-1] 的 observable 内部运行

flatMap, Banana 在 [main] 运行

onNext 在 [main] 运行

onComplete 在 [main] 运行

预期的 Logcat

在 [RxCachedThreadScheduler-1] 的 observable 内部运行

flatMap, Banana 在 [RxCachedThreadScheduler-1] 运行

onNext 在 [main] 运行

onComplete 在 [main] 运行

【问题讨论】:

    标签: android rx-java2 rx-android


    【解决方案1】:

    添加一个额外的onserverOn(Schedulers.io()) 就可以了,告诉 Rx 将线程切换回工作线程

            obs
                .observeOn(Schedulers.io()) <------ additional 
                .flatMap {
                    logThread("flatMap, Banana")
                    Observable.just("$it, 1 Item")
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe({ next ->
                    logThread("onNext")
                }, { error ->
                    logThread("onError")
                }, {
                    logThread("onComplete")
                })
    

    由于主线程发出的数据,导致RX在订阅主线程时从subscribeOn()注册的工作线程(Schedulers.io)切换整个下游。为了将整个下游的执行线程再次更改(切换)为工作线程observeOn()就是为此目的。

    总之

    • observeOn,将下游更改为在特定线程上执行。

    • subscribeOn,将上游(根源)设置为在特定线程上执行。

    现在是 Logcat

    flatMap, Banana 在 [RxCachedThreadScheduler-1] 运行

    onNext 在 [main] 运行

    onComplete 在 [main] 运行

    【讨论】:

    • 你能解释一下你的答案吗?
    • @MidasLefko 关于这个问题,我已经测试并找到了flatMap 在主线程上执行的原因。这是因为我从主线程发出值。因此,它使用该线程制作流的其余部分。为了切换回工作线程,observeOn() 用于此目的。
    • 更多关于 observeOn 和 subscribeOn 的信息 proandroiddev.com/…
    • 又一个不错的blog.gojekengineering.com/…
    猜你喜欢
    • 2016-06-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-27
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多