【问题标题】:RxKotlin - Wrong subscribeOn, observeOn thread changing for Subject out of Activity?RxKotlin - 错误的订阅,observeOn 线程更改主题的活动?
【发布时间】:2018-08-10 12:44:07
【问题描述】:

我有一个在随机时刻生成不同字符串的对象,我需要订阅这个生成器来获取这些字符串并将它们提供给 ui(也许它将是不同活动中的多个订阅者)。 假设,我得到以下代码:

生成器:

class Generator {

    private var stringToGenerate = ""

    var subject: BehaviorSubject<String> = BehaviorSubject.create<String>()

    init {
        //seems like these instructions are skipped
        subject
                .subscribeOn(AndroidSchedulers.mainThread())
                .doOnNext { t -> Log.i("subject doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .observeOn(AndroidSchedulers.mainThread())
                .map { _ -> Log.i("subject map", Thread.currentThread().name + " " + Thread.currentThread().id) }

        //imitation of async creating of strings in separate thread
        timer("timerThread", false, 2000L, 2000L) {
            stringToGenerate = System.currentTimeMillis().toString()
            subject.onNext(stringToGenerate)
        }
    }
}

必须使用生成的字符串的活动之一:

class TestActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        setContentView(R.layout.activity_test)

        val wrongThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("wrongThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val generator = Generator()
        generator.subject.subscribe(wrongThreadObserver)

        //for correct work illustration
        val correctThreadObserver = object : Observer<String> {
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
                Log.i("correctThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
            }

            override fun onError(e: Throwable) {
            }
        }

        val mainThreadSubject = BehaviorSubject.create<String>()
        mainThreadSubject
                .doOnNext { obj -> Log.i("correctThread doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(correctThreadObserver)
        mainThreadSubject.onNext("test thread")
        val handler = Handler()
        handler.postDelayed({ mainThreadSubject.onNext("test thread 2") }, 1000)
        handler.postDelayed({ mainThreadSubject.onNext("test thread 3") }, 2000)
    }
}

在这种情况下,correctThreadObserver,仅在活动中创建,工作正常,但 wrongThreadObserver 保持在计时器线程中工作,似乎它忽略了指令 subscribeOn,ObserveOn , doOnNext 在生成器中,无论这些指令在哪里调用 - 在 init 中,在计时器线程中,在活动中通过从生成器获取对象 - wrongThreadObserver 仍然在计时器线程中工作。 所以日志是:

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/correctThread doOnNext: main 2

I/correctThreadObserver: RxNewThreadScheduler-1 941

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

I/wrongThreadObserver: timerThread 937

没有doOnNext 并且没有wrongThreadObserver 的主线程 我做错了什么?

【问题讨论】:

  • observeOn替换subscribeOn,我认为它应该可以工作

标签: android multithreading kotlin-android-extensions rx-kotlin rx-kotlin2


【解决方案1】:

我找到以下解决方案: 我们必须订阅Observable,而不是Subject,所以我们必须使用方法observeOn()的结果,而不是对象“主题”本身。 如果需要多次订阅,我们可以将observeOn的结果缓存在单独的变量中:

//in Generator
.......
var observableInSeparateVar = subject.observeOn(AndroidSchedulers.mainThread())
.......

//in TestActivity
.......
generator.observableInSeparateVar.subscribe(wrongThreadObserver)
.......

我们也可以在此之后调用observeOn()进行主题和订阅:

//in TestActivity
.......
generator.subject
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(wrongThreadObserver)

【讨论】:

    猜你喜欢
    • 2017-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-07-27
    • 1970-01-01
    • 1970-01-01
    • 2021-10-05
    • 1970-01-01
    相关资源
    最近更新 更多