【问题标题】:Coroutine StateFlow stops emitting when CoroutineScope is cancelled当 CoroutineScope 被取消时,Coroutine StateFlow 停止发射
【发布时间】:2020-07-06 00:49:42
【问题描述】:

我有一个StateFlow 协程,它在我的应用程序的各个部分之间共享。当我cancelCoroutineScope 的下游收集器时,JobCancellationException 被传播到StateFlow,它停止为所有当前和未来的收集器发出值。

StateFlow:

val songsRelay: Flow<List<Song>> by lazy {
    MutableStateFlow<List<Song>?>(null).apply {
        CoroutineScope(Dispatchers.IO)
            .launch { songDataDao.getAll().distinctUntilChanged().collect { value = it } }
    }.filterNotNull()
}

我的代码中的典型“演示者”实现了以下基类:

abstract class BasePresenter<T : Any> : BaseContract.Presenter<T> {

    var view: T? = null

    private val job by lazy {
        Job()
    }

    private val coroutineScope by lazy { CoroutineScope( job + Dispatchers.Main) }

    override fun bindView(view: T) {
        this.view = view
    }

    override fun unbindView() {
        job.cancel()
        view = null
    }

    fun launch(block: suspend CoroutineScope.() -> Unit): Job {
        return coroutineScope.launch(block = block)
    }
}

BasePresenter 实现可能会调用 launch{ songsRelay.collect {...} } 当presenter解绑时,为了防止泄露,我取消了父job。任何时候收集songsRelayStateFlow 的演示者未绑定,StateFlow 本质上以JobCancellationException 终止,并且没有其他收集器/演示者可以从中收集值。

我注意到我可以改为调用job.cancelChildren(),这似乎可行(StateFlow 不能与JobCancellationException 一起完成)。但是后来我想知道如果我不能取消工作本身,那么声明父母job 有什么意义。我可以完全删除 job,然后调用 coroutineScope.coroutineContext.cancelChildren() 达到同样的效果。

如果我只是打电话给job.cancelChildren(),就够了吗?我觉得如果不打电话给coroutineScope.cancel()job.cancel(),我可能没有正确或完全清理我已经开始的任务。

我也不明白为什么JobCancellationException 在调用job.cancel() 时会向上传播。 job 不是这里的“父母”吗?为什么取消它会影响我的StateFlow

【问题讨论】:

  • 我玩了一下,做了一个小单元测试,试图重现你的行为。但我无法获得JobCancellationException。经过几年的 RxJava,我刚刚开始使用协程 + 流,所以也许我错过了一些东西。你能分享一个独立的例子来重现这个问题吗?
  • 是的,很可能这里还有其他东西在起作用。我会看看我是否可以在 Kotlin 操场上重现该问题

标签: android kotlin kotlin-coroutines


【解决方案1】:

更新:

你确定你的songRelay 真的被所有演示者取消了吗?我运行了这个测试并打印了“Song relay completed”,因为onCompletion 也捕获了下游异常。但是 Presenter 2 发出的值 2 就好了,在歌曲中继打印“完成”之后。如果我取消 Presenter 2,则会再次打印“Song relay completed”,并针对 Presenter 2 的作业显示 JobCancellationException。

我确实发现一个流实例如何为每个订阅的收集器发出一次很有趣。关于流量,我没有意识到这一点。

    val songsRelay: Flow<Int> by lazy {
        MutableStateFlow<Int?>(null).apply {
            CoroutineScope(Dispatchers.IO)
                    .launch {
                        flow {
                            emit(1)
                            delay(1000)
                            emit(2)
                            delay(1000)
                            emit(3)
                        }.onCompletion {
                            println("Dao completed")
                        }.collect { value = it }
                    }
        }.filterNotNull()
                .onCompletion { cause ->
                    println("Song relay completed: $cause")
                }
    }

    @Test
    fun test() = runBlocking {
        val job = Job()
        val presenterScope1 = CoroutineScope(job + Dispatchers.Unconfined)
        val presenterScope2 = CoroutineScope(Job() + Dispatchers.Unconfined)

        presenterScope1.launch {
            songsRelay.onCompletion { cause ->
                println("Presenter 1 Completed: $cause")
            }.collect {
                println("Presenter 1 emits: $it")
            }
        }

        presenterScope2.launch {
            songsRelay.collect {
                println("Presenter 2 emits: $it")
            }
        }

        presenterScope1.cancel()

        delay(2000)
        println("Done test")
    }

我认为您需要在 BasePresenter 中使用 SupervisorJob 而不是 Job。一般来说,使用Job 对整个 Presenter 来说都是一个错误,因为一个失败的协程会取消 Presenter 中的所有协程。一般不是你想要的。

【讨论】:

  • 您可能是对的 - SupervisorJob 可能更合适。但这并不能真正回答问题(并且用 SupervisorJob 替换 Job 并不能解决问题)
  • 更新了我的答案。我无法在测试中重现您所看到的内容。
  • 谢谢。我认为一定有别的东西在起作用,我还在调查。可能这个问题的前提是无效的
  • 我刚刚有机会回到这个话题。所以 - 不,所有演示者的流程都没有取消。我的复制测试有缺陷 - 我在解除绑定然后重新绑定同一个演示者,并且在第二次绑定时,流程停止收集 - 因为 cancel() 已被调用 - 这是预期的行为。哇!
【解决方案2】:

好的,所以问题是我在测试时做出了一些错误的假设。 StateFlow 行为正确,取消工作按预期工作。

我在想PresentersStateFlow 之间会停止发射值,但我实际上是在测试 Presenter 的同一个实例 - 所以它的 Job 已被取消,因此预计不会继续收集流量排放。

我还错误地将onCompletionStateFlow 中发出的CancellationException 消息理解为StateFlow 本身已被取消-实际上它只是说下游Collector/Job 已被取消.

我想出了一个更好的BasePresenter 实现,看起来像这样:

abstract class BasePresenter<T : Any> : BaseContract.Presenter<T>, CoroutineScope {

    var view: T? = null

    private var job = Job()

    override val coroutineContext: CoroutineContext
        get() = job + Dispatchers.Main

    override fun bindView(view: T) {
        if (job.isCancelled) {
            job = Job()
        }
        this.view = view
    }

    override fun unbindView() {
        job.cancel()
        view = null
    }
}

【讨论】:

  • 不错。不过,我仍然建议在这里使用 SupervisorJob。对于使用您的BasePresenter 的人,他们可能不会想到,当launch { getApi1 } 失败时,他们使用过launch { getApi2 } 的其他地方也会被取消。
  • 谢谢。是的,我自己为此使用了 SupervisorJob,我只是不想在问答中引入新概念。
  • 永远记住 StateFlow 永远不会完成。取消是不可能的:)
  • @HarryTimothy 所以如果我们运行多个并且不再使用它们,我们会浪费资源吗?那么对于只要应用程序还活着就需要数据的情况,它可能是理想的吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-18
  • 1970-01-01
  • 1970-01-01
  • 2021-11-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多