【问题标题】:Cancelling the collect of the Kotlin flow取消 Kotlin 流的收集
【发布时间】:2021-12-24 09:18:03
【问题描述】:

我有一个具有不同状态的父类,这个父类有一个子类列表,每个子类都有不同的状态。我想收集它们中的每一个并取消达到Terminated 状态的那个。类似的东西:

coroutineScope.launch(Dispatcher.IO) {
   parent.parentState.collect {
      if(it is ParentState.Normal){
         it.children.forEach{ child ->
             coroutineScope.launch(Dispatcher.IO){
                child.childState.collect{
                    if(it is ChildState.Terminated){
                       //when this line executed all the collectors stop until I change the states for each one of them..
                       this.coroutineContext.job.cancel()
                    } else{
                       // Do something else for any other state...
                    }
                }
             }

         }
      }
   }
}

但是当我这样做时,我正在收集的所有孩子都会停止收集,但如果我更改了每个孩子的状态,它会再次开始收集,而在取消其中一个之前情况并非如此。

所以我的问题是为什么在取消其中一位收藏家的工作时会出现这样的行为?

还有没有更好的“反应式”来写这个?

【问题讨论】:

  • 你的代码 sn-p 中的all the collectors stop 是什么意思?该流只有一个收集器。您想取消对达到终止状态的孩子的收集,而其他孩子的流程应该继续,这是您想要的吗?
  • 没错。其他孩子应该继续。

标签: android kotlin kotlin-coroutines kotlin-flow


【解决方案1】:

我完全同意您可以使用SupervisorJob 来处理您的问题。

但在我看来,不需要那么多的子作业。一个子作业可以解决你遇到的问题。在您的代码中,将创建一个大小为children Collection 的子协程。虽然协程很轻量级,但我认为是不必要的开销。

您可以在每个Flow collect 之前将List<Flow<T>> 完全转换为Flow<List<T>>。之后,只有转换后的单个Flow可以是collect

我是这样处理的:

inline fun <reified T> List<Flow<T>>.flattenFlow(): Flow<List<T>> = combine(this@flattenFlow) {
    it.toList()
}

coroutineScope.launch(Dispatcher.IO) {
    parent.parentState.collect {
        if (it is ParentState.Normal) {
            it.flattenFlow().collect {childStateList ->
                childStateList.onEach {childState ->
                    if (childState is ChildState.Terminated) {
                        // Do something when state in Terminated..
                    } else {
                        // Do something else for any other state...
                    }
                }
            }
        }
    }
}

【讨论】:

    【解决方案2】:

    默认情况下,协程作用域在CoroutineContext 中使用Job()Job() 将取消协程执行或任何正在运行的子程序被取消或抛出异常。

    要保持其他子执行保持活动状态,您可以使用特殊的Job,即SupervisorJob()

    CoroutineScope(SupervisorJob() + Dispatchers.IO)
    

    另外,让我们为每个孩子创建一个全新的范围

    val childScope = CoroutineScope(Dispatchers.IO)
    childScope.launch {
        child.childState.collect {
            ....
        }
    }
    

    您应该取消那些childScope 而不是coroutineContext

    因此,您的代码将如下所示

    val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
    scope.launch {
       parent.parentState.collect {
          if(it is ParentState.Normal){
             it.children.forEach{ child ->
                 val childScope = CoroutineScope(Dispatchers.IO)
                 childScope.launch(Dispatcher.IO){
                    child.childState.collect{
                        if(it is ChildState.Terminated){
                           //when this line executed all the collectors stop until I change the states for each one of them..
                           childScope.cancel()
                        } else{
                           // Do something else for any other state...
                        }
                    }
                 }
    
             }
          }
       }
    }
    

    【讨论】:

    • 我认为这行不通。为什么要在父作用域中添加主管作业?
    • 我尝试了你的方法,但我仍然有同样的问题,所有的孩子都会停止,直到我改变孩子的状态。
    • 还有其他父作用域启动这个协程吗?
    • 没有。我尝试了很多东西,但对我没有用,我真的很困惑。
    • 如何为每个孩子使用新范围,让我更新答案
    猜你喜欢
    • 1970-01-01
    • 2020-09-14
    • 2021-11-07
    • 2021-01-04
    • 1970-01-01
    • 2021-09-30
    • 2022-09-22
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多