【问题标题】:How do Kotlin coroutines work internally?Kotlin 协程如何在内部工作?
【发布时间】:2019-04-30 18:44:03
【问题描述】:

Kotlin 如何在内部实现协程?

协程被称为线程的“轻量级版本”,据我了解,它们在内部使用线程来执行协程。

当我使用任何构建器函数启动协程时会发生什么?

这是我对运行这段代码的理解:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin 在开头有一个预定义的ThreadPool
  2. (A),Kotlin 开始在下一个可用的空闲线程(比如Thread01)中执行协程。
  3. (B),Kotlin 停止执行当前线程,并在下一个可用的空闲线程 (Thread02) 中启动挂起函数 loadData()
  4. (B) 执行后返回时,Kotlin 在下一个可用的空闲线程中Thread03)继续协程。
  5. (C)Thread03 上执行。
  6. (D)Thread03 停止。
  7. 1000 毫秒后,(E) 在下一个空闲线程上执行,比如Thread01

我理解正确吗?还是协程以不同的方式实现?


2021 年更新: Here's an excellent article 由 Manuel Vivo 撰写,补充了以下所有答案。

【问题讨论】:

标签: kotlin kotlin-coroutines


【解决方案1】:

协程与您描述的任何调度策略完全不同。协程基本上是suspend funs 的调用链。暂停完全由您控制:您只需致电suspendCoroutine。您将获得一个回调对象,因此您可以调用其resume 方法并返回您暂停的位置。

这里有一些代码,你可以看到暂停是一个非常直接和透明的机制,完全在你的控制之下:

import kotlin.coroutines.*
import kotlinx.coroutines.*

var continuation: Continuation<String>? = null

fun main(args: Array<String>) {
    val job = GlobalScope.launch(Dispatchers.Unconfined) {
        while (true) {
            println(suspendHere())
        }
    }
    continuation!!.resume("Resumed first time")
    continuation!!.resume("Resumed second time")
}

suspend fun suspendHere() = suspendCancellableCoroutine<String> {
    continuation = it
}

上面的所有代码都在同一个主线程上执行。根本没有多线程。

launch 的协程每次调用suspendHere() 时都会挂起。它将延续回调写入continuation 属性,然后您显式使用该延续来恢复协程。

代码使用Unconfined 协程调度器,它根本不分派到线程,它只是在您调用continuation.resume() 的地方运行协程代码。


考虑到这一点,让我们重新审视您的图表:

GlobalScope.launch {       <---- (A)
    val y = loadData()     <---- (B)  // suspend fun loadData() 
    println(y)             <---- (C)
    delay(1000)            <---- (D)
    println("completed")   <---- (E)
}
  1. Kotlin 在开头有一个预定义的ThreadPool

它可能有也可能没有线程池。 UI 调度程序与单个线程一起工作。

线程成为协程调度器目标的先决条件是有一个与之关联的并发队列,并且线程运行一个顶级循环,该循环从该队列中获取Runnable对象并执行它们。协程调度器只是将延续放在该队列中。

  1. (A),Kotlin 开始在下一个可用的空闲线程(比如Thread01)中执行协程。

它也可以是你调用launch的同一个线程。

  1. (B),Kotlin 停止执行当前线程,并在下一个可用的空闲线程 (Thread02) 中启动挂起函数 loadData()

Kotlin 不需要为了挂起协程而停止任何线程。事实上,协程的主要观点是线程不会启动或停止。线程的顶级循环将继续并选择另一个 runnable 来运行。

此外,您调用suspend fun 的事实本身没有任何意义。协程只有在显式调用suspendCoroutine 时才会挂起。该函数也可以简单地返回而不暂停。

但我们假设它确实调用了suspendCoroutine。在这种情况下,协程不再在任何线程上运行。它被暂停并且无法继续,直到某个代码在某处调用continuation.resume()。该代码可以在未来任何时间在任何线程上运行。

  1. (B) 执行后返回时,Kotlin 在下一个可用的空闲线程中Thread03)继续协程。

B 不会“执行后返回”,协程在其主体内继续执行。它可以在返回之前暂停和恢复任意次数。

  1. (C)Thread03 上执行。
  2. (D)Thread03 停止。
  3. 1000 毫秒后,(E) 在下一个空闲线程上执行,例如 Thread01

再一次,没有线程被停止。协程被挂起,并且通常特定于调度程序的机制用于安排其在 1000 毫秒后恢复。此时它将被添加到与调度程序关联的运行队列中。


为了具体起见,让我们看一些示例,说明调度协程需要什么样的代码。

Swing UI 调度程序:

EventQueue.invokeLater { continuation.resume(value) }

Android UI 调度程序:

mainHandler.post { continuation.resume(value) }

ExecutorService 调度器:

executor.submit { continuation.resume(value) } 

【讨论】:

  • 我猜你的意思是当你使用withContext(Dispatchers.IO) { ... work ... }时。您将使用它来获得 非暂停 乐趣,以避免它阻塞您当前的线程。运行时将暂停您当前的协程并在后台线程上恢复它。当withContext() 块完成后,它将再次挂起并在您的初始线程上恢复。所以它会一直是同一个协程,只是从一个线程跳到另一个线程(就像一个线程可以从一个核心跳到另一个核心)。
  • 所以基本上一个协程是许多Continuations的集合,而Continuation就像一个回调 [幕后],由完成工作后的其他暂停功能。 Dispatcher 负责调度这些Continuations,方法是将它们放入并发队列(mainIO,基于开发人员的偏好)。对吗?
  • Continuation 是一个回调在底层,它的客户使用它。在内部,它是一个优化的实现,允许重用相同的回调对象以从函数体内的任何挂起点恢复。协程与模仿调用堆栈的延续的链表相关联。一旦suspend fun 完成,它就会继续调用它的调用者并返回结果。这就是“续传风格”的精髓。
  • 我还想澄清一点。假设没有线程被停止和重新启动,那么总是可能很少有线程启动和运行,它们会不断检查并发队列以在它们处于 READY 状态时运行一件工作,对吗?
  • 给定的事件循环实现可以使用不同的策略。两个基本的机制是指数退避,在这种情况下您会逐渐休眠(但始终是有限的时间),以及wait-notify 机制,您会在这种机制下进入休眠状态,直到生产者线程向您发送信号。
【解决方案2】:

协程通过在可能的恢复点上创建切换来工作:

class MyClass$Coroutine extends CoroutineImpl {
    public Object doResume(Object o, Throwable t) {
        switch(super.state) {
        default:
                throw new IllegalStateException("call to \"resume\" before \"invoke\" with coroutine");
        case 0:  {
             // code before first suspension
             state = 1; // or something else depending on your branching
             break;
        }
        case 1: {
            ...
        }
        }
        return null;
    }
}

执行此协程的结果代码随后创建该实例并在每次需要恢复执行时调用doResume() 函数,处理方式取决于用于执行的调度程序。

这是一个简单协程的示例编译:

launch {
    println("Before")
    delay(1000)
    println("After")
}

编译成这个字节码

private kotlinx.coroutines.experimental.CoroutineScope p$;

public final java.lang.Object doResume(java.lang.Object, java.lang.Throwable);
Code:
   0: invokestatic  #18                 // Method kotlin/coroutines/experimental/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED:()Ljava/lang/Object;
   3: astore        5
   5: aload_0
   6: getfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
   9: tableswitch   { // 0 to 1
                 0: 32
                 1: 77
           default: 102
      }
  32: aload_2
  33: dup
  34: ifnull        38
  37: athrow
  38: pop
  39: aload_0
  40: getfield      #24                 // Field p$:Lkotlinx/coroutines/experimental/CoroutineScope;
  43: astore_3
  44: ldc           #26                 // String Before
  46: astore        4
  48: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  51: aload         4
  53: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  56: sipush        1000
  59: aload_0
  60: aload_0
  61: iconst_1
  62: putfield      #22                 // Field kotlin/coroutines/experimental/jvm/internal/CoroutineImpl.label:I
  65: invokestatic  #44                 // Method kotlinx/coroutines/experimental/DelayKt.delay:(ILkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
  68: dup
  69: aload         5
  71: if_acmpne     85
  74: aload         5
  76: areturn
  77: aload_2
  78: dup
  79: ifnull        83
  82: athrow
  83: pop
  84: aload_1
  85: pop
  86: ldc           #46                 // String After
  88: astore        4
  90: getstatic     #32                 // Field java/lang/System.out:Ljava/io/PrintStream;
  93: aload         4
  95: invokevirtual #38                 // Method java/io/PrintStream.println:(Ljava/lang/Object;)V
  98: getstatic     #52                 // Field kotlin/Unit.INSTANCE:Lkotlin/Unit;
 101: areturn
 102: new           #54                 // class java/lang/IllegalStateException
 105: dup
 106: ldc           #56                 // String call to \'resume\' before \'invoke\' with coroutine
 108: invokespecial #60                 // Method java/lang/IllegalStateException."<init>":(Ljava/lang/String;)V
 111: athrow

我用 kotlinc 1.2.41 编译了这个

从 32 到 76 是打印 Before 并调用暂停的 delay(1000) 的代码。

77到101是打印After的代码。

从 102 到 111 是对非法恢复状态的错误处理,如开关表中的 default 标签所示。

总而言之,kotlin 中的协程只是由一些调度程序控制的状态机。

【讨论】:

  • 这实质上意味着一个协程在内部被划分为若干个switch的case。继续将像executeCase(1)executeCase(2)executeCase(N) 一样执行。对吗?
  • 状态由doResume 方法通过修改它继承的CoroutineImpl 中的一个字段来更新。然后控制权返回给调用者(调度程序)并在稍后恢复,也许它首先执行其他操作或立即恢复。
猜你喜欢
  • 2018-06-03
  • 2020-03-19
  • 2020-09-15
  • 2021-08-01
  • 2021-10-27
  • 1970-01-01
  • 2020-10-08
  • 2019-08-08
  • 1970-01-01
相关资源
最近更新 更多