注:本文代码基于Kotlin 1.9.0

前言

一般而言,Kotlin的协程实现分为两个层次:

  • 基础设施层:标准库的协程API,主要对协程提供了概念和语义上最基本的支持,API所在包名的前缀为kotlin.coroutines.*。基于此,本文主要内容就是解读Kotlin协程启动源码。

  • 业务框架层:协程的上层框架支持,API所在包名的前缀为kotlinx.coroutines.*

整体上,将通过Kotlin协程的基础设施创建的协程称为简单协程,将基于简单协程实现的各种业务层进行封装之后得到的协程称为复合协程

一、协程的创建

Kotlin当中创建一个简单协程不是什么难事,如下代码所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
val continuation = suspend {                      
    val x: Int = 88                               
    123 + x                                        
}.createCoroutine(object : Continuation<Int> {    
    override val context: CoroutineContext        
        get() = EmptyCoroutineContext             

    override fun resumeWith(result: Result<Int>) {
        println("$result")                        
    }                                             
})                                                                                                                        

标准库中提供了一个createCoroutine方法,我们可以通过它来创建协程,不过这个协程并不会立即执行,我们先来看看它的声明:

1
2
3
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit>

这里解释下上面的声明:

1、suspend ()->TcreateCoroutine方法的ReceiverReceiver是一个被suspend关键字修饰的挂起函数,这也是协程的执行体,我们不妨称它为协程体。这里补充一条Kotlin基础知识:Kotlin扩展的本质就是定义了一个函数,当程序用对象调用扩展方法时,Kotlin在编译过程中会执行静态解析,将调用扩展函数的调用者作为函数的第一个参数传入,也就是说,suspend ()->T会作为createCoroutine方法的第一个参数传入,对于本文后面的内容来说,这是一个伏笔,叫它伏笔1吧;

2、参数completion会在协程执行完成后调用,实际上就是协程的完成回调;

3、返回值是一个Continuation对象,由于现在协程仅仅被创建出来,因此需要通过这个值在之后触发协程的启动。

二、协程的启动

调用continuationresume方法之后,协程体会立即开始执行,如下代码所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val continuation = suspend {                      
    val x: Int = 88                               
    123 + x                                        
}.createCoroutine(object : Continuation<Int> {    
    override val context: CoroutineContext        
        get() = EmptyCoroutineContext             

    override fun resumeWith(result: Result<Int>) {
        println("$result")                        
    }                                             
})                                               
continuation.resume(Unit)                         

此时要抛出本文要重点解决的问题:为什么调用continuation.resume(Unit)就会触发协程体的执行呢?

如果按一般思路去理解,上面代码返回的变量continuation就是问题的突破口,只要找出continuation变量的真身,就能定位到resume方法的具体实现,从而找出触发协程体执行的所在。

OK,一步步来,我们先看suspend方法的源码:

1
public inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

suspend方法比较简单,把传入的挂起函数原封不动地返回,但是要注意:suspend方法是一个内联方法,而参数block是非内联的,意味着block在编译后会生成一个函数对象(匿名内部类),这里是埋下伏笔2。

createCoroutine方法的声明我们前面讲过了,现在完整看下createCoroutine方法的源码:

1
2
3
4
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

由源码可知,这里调用了2个参数的构造方法创建了SafeContinuation对象,其中第1个参数先不管,后面会讲到,第2个参数为COROUTINE_SUSPENDED。知道了continuation变量的实例对象是SafeContinuation,也就是说可以推测下,调用continuation变量的resume方法实际上是调用了SafeContinuationresume方法,继续跟踪SafeContinuation的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
internal expect class SafeContinuation<in T> : Continuation<T> {
    internal constructor(delegate: Continuation<T>, initialResult: Any?)

    @PublishedApi
    internal constructor(delegate: Continuation<T>)

    @PublishedApi
    internal fun getOrThrow(): Any?

    override val context: CoroutineContext
    override fun resumeWith(result: Result<T>): Unit
}

public interface Continuation<in T> {

    public val context: CoroutineContext

    public fun resumeWith(result: Result<T>)
}

通过观察SafeContinuation的源码,可以得到3个结论:

1、SafeContinuation类是expect关键字修饰,意味着它是一个跨平台类,也就是说不同平台该类有不同的实现。这里先说明,后面我们只看Kotlin平台下的具体实现;

2、SafeContinuation类实现了接口Continuation

3、SafeContinuation类中并没有resume方法,然而我们先前推测过,调用continuationresume方法实际上是调用了SafeContinuationresume方法,很显然这是错的。

看到这里或许已经一头雾水了,但是先不要着急哈,继续寻找突破口,先看下continuation变量的resume方法的源码:

1
2
public inline fun <T> Continuation<T>.resume(value: T): Unit =
    resumeWith(Result.success(value))

由源码可知,resume方法是接口Continuation的一个扩展方法,其内部调用了ContinuationresumeWith方法,结合前面说的,SafeContinuation实现了接口Continuation,那么最终调用的就是SafeContinuationresumeWith方法了。

但是SafeContinuation是一个跨平台类,要先找到它的具体实现,在Kotlin平台上的具体实现位置为:

SafeContinuation具体实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
internal actual class SafeContinuation<in T>
internal actual constructor(
    private val delegate: Continuation<T>,
    initialResult: Any?
) : Continuation<T>, CoroutineStackFrame {
    @PublishedApi
    internal actual constructor(delegate: Continuation<T>) : this(delegate, UNDECIDED)

    public actual override fun resumeWith(result: Result<T>) {
        while (true) { // lock-free loop
            val cur = this.result // atomic read
            when {
                cur === UNDECIDED -> if (RESULT.compareAndSet(this, UNDECIDED, result.value)) return
                cur === COROUTINE_SUSPENDED -> if (RESULT.compareAndSet(this, COROUTINE_SUSPENDED, RESUMED)) {
                    delegate.resumeWith(result)
                    return
                }
                else -> throw IllegalStateException("Already resumed")
            }
        }
    }
}

先前讲过,创建SafeContinuation对象时传入的第2个参数为COROUTINE_SUSPENDED,那么可以知道,SafeContinuationresumeWith方法执行后实际上调用的是delegate变量的resumeWith方法,于是乎,现在可以从找continuation变量的真身转为找delegate变量的真身了,因为SafeContinuation只是个包装马甲,真正做事情的是delegate变量,那么delegate变量又是什么了?

SafeContinuation对象被创建是通过调用2个参数的构造方法,其中delegate变量是第1个参数待传入,我们可以回去跟踪下SafeContinuation被创建时的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit>

public expect fun <T> Continuation<T>.intercepted(): Continuation<T>

由源码可知,delegate变量就是这么长的一串东西:

1
createCoroutineUnintercepted(completion).intercepted()

而且发现createCoroutineUnintercepted方法和intercepted方法都是跨平台方法,要先找到它们的具体实现,在Kotlin平台上的具体实现位置为:

createCoroutineUnintercepted方法和intercepted方法的具体实现代码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)       // ①
    return if (this is BaseContinuationImpl)
        create(probeCompletion)                                   // ②
    else
        createCoroutineFromSuspendFunction(probeCompletion) {     // ③
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
        object : RestrictedContinuationImpl(completion as Continuation<Any?>) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended
                    }
                    else -> error("This coroutine had already completed")
                }
        }
    else
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this           // ④

这里解释下上面的标号地方:

标号①、probeCoroutineCreated方法如下:

1
2
3
internal fun <T> probeCoroutineCreated(completion: Continuation<T>): Continuation<T> {
    return completion
}

代码很简单,就是把传入的completion原封不动的返回。

标号②、如果suspend () -> TBaseContinuationImpl的子类,那么调用BaseContinuationImpl类的create方法,并返回Continuation<Unit>,该方法如下:

1
2
3
4
5
6
7
8
internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

    public open fun create(completion: Continuation<*>): Continuation<Unit> {
        throw UnsupportedOperationException("create(Continuation) has not been overridden")
    }  
}

BaseContinuationImpl类的create方法默认没有实现,估计是其子类实现了。

标号③、如果suspend () -> T不是BaseContinuationImpl的子类,那么调用createCoroutineFromSuspendFunction方法,并返回Continuation<Unit>,其方法闭包中会将suspend () -> T强制转换为Function1接口,并调用Function1接口的invoke方法。

解释下createCoroutineFromSuspendFunction方法的实现:

createCoroutineFromSuspendFunction方法内部会先判断context变量是否为EmptyCoroutineContext对象,如果是的话则返回RestrictedContinuationImpl对象,否则就返回ContinuationImpl对象。

标号④、将调用Continuation接口扩展方法intercepted的调用者强制转换为ContinuationImpl对象,如果转换成功,则调用ContinuationImplintercepted方法,否则返回调用者本身。

OK,上面分析那么多,发现有2个名字比较显眼:ContinuationImplBaseContinuationImpl,可以认为它们和delegate的构建肯定有关系,到这里就不往下看源码了,因为看不懂了,啊哈哈~。

既然按一般思路去跟踪源码无法定位到问题,那么只能反编译源码了,去看看编译器是否在编译后添加了“魔法”,回顾下先前协程启动的源码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
val continuation = suspend {                       
    val x: Int = 88                                
    123 + x                                         
}.createCoroutine(object : Continuation<Int> {     
    override val context: CoroutineContext         
        get() = EmptyCoroutineContext              

    override fun resumeWith(result: Result<Int>) { 
        println("$result")                         
    }                                              
})                                                 
continuation.resume(Unit)                          

运行过上面代码后,点击IDEATools->Kotlin->Show Kotlin ByteCode来查看字节码:

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
// ================com/pengmj/kotlincoroutine/SampleKt.class =================
// class version 52.0 (52)
// access flags 0x31
public final class com/pengmj/kotlincoroutine/SampleKt {


  // access flags 0x19
  public final static main()V
   L0
    LINENUMBER 6 L0
   L1
    LINENUMBER 9 L1
   L2
    LINENUMBER 6 L2
    NEW com/pengmj/kotlincoroutine/SampleKt$main$continuation$1
    DUP
    ACONST_NULL
    INVOKESPECIAL com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.<init> (Lkotlin/coroutines/Continuation;)V
    CHECKCAST kotlin/jvm/functions/Function1
    ASTORE 1
   L3
    ALOAD 1
   L4
    LINENUMBER 9 L4
    NEW com/pengmj/kotlincoroutine/SampleKt$main$continuation$2
    DUP
    INVOKESPECIAL com/pengmj/kotlincoroutine/SampleKt$main$continuation$2.<init> ()V
    CHECKCAST kotlin/coroutines/Continuation
    INVOKESTATIC kotlin/coroutines/ContinuationKt.createCoroutine (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
   L5
    LINENUMBER 6 L5
    ASTORE 0
   L6
    LINENUMBER 17 L6
    ALOAD 0
    ASTORE 1
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    ASTORE 2
   L7
    ALOAD 1
    GETSTATIC kotlin/Result.Companion : Lkotlin/Result$Companion;
    POP
    ALOAD 2
    INVOKESTATIC kotlin/Result.constructor-impl (Ljava/lang/Object;)Ljava/lang/Object;
   L8
    INVOKEINTERFACE kotlin/coroutines/Continuation.resumeWith (Ljava/lang/Object;)V (itf)
   L9
    LINENUMBER 18 L9
    RETURN
   L10
    LOCALVARIABLE continuation Lkotlin/coroutines/Continuation; L6 L10 0
    MAXSTACK = 3
    MAXLOCALS = 3

  // access flags 0x1009
  public static synthetic main([Ljava/lang/String;)V
    INVOKESTATIC com/pengmj/kotlincoroutine/SampleKt.main ()V
    RETURN
    MAXSTACK = 0
    MAXLOCALS = 1

  @Lkotlin/Metadata;(mv={1, 9, 0}, k=2, d1={"\u0000\u0008\n\u0000\n\u0002\u0010\u0002\n\u0000\u001a\u0006\u0010\u0000\u001a\u00020\u0001\u00a8\u0006\u0002"}, d2={"main", "", "Kotlin"})
  // access flags 0x18
  final static INNERCLASS com/pengmj/kotlincoroutine/SampleKt$main$continuation$1 null null
  // access flags 0x19
  public final static INNERCLASS com/pengmj/kotlincoroutine/SampleKt$main$continuation$2 null null
  // compiled from: sample.kt
}


// ================com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.class =================
// class version 52.0 (52)
// access flags 0x30
// signature Lkotlin/coroutines/jvm/internal/SuspendLambda;Lkotlin/jvm/functions/Function1<Lkotlin/coroutines/Continuation<-Ljava/lang/Integer;>;Ljava/lang/Object;>;
// declaration: com/pengmj/kotlincoroutine/SampleKt$main$continuation$1 extends kotlin.coroutines.jvm.internal.SuspendLambda implements kotlin.jvm.functions.Function1<kotlin.coroutines.Continuation<? super java.lang.Integer>, java.lang.Object>
final class com/pengmj/kotlincoroutine/SampleKt$main$continuation$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function1 {


  // access flags 0x11
  public final invokeSuspend(Ljava/lang/Object;)Ljava/lang/Object;
  @Lorg/jetbrains/annotations/Nullable;() // invisible
    // annotable parameter count: 1 (visible)
    // annotable parameter count: 1 (invisible)
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 0
    INVOKESTATIC kotlin/coroutines/intrinsics/IntrinsicsKt.getCOROUTINE_SUSPENDED ()Ljava/lang/Object;
   L0
    LINENUMBER 6 L0
    ASTORE 3
    ALOAD 0
    GETFIELD com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.label : I
    TABLESWITCH
      0: L1
      default: L2
   L1
    ALOAD 1
    INVOKESTATIC kotlin/ResultKt.throwOnFailure (Ljava/lang/Object;)V
   L3
    LINENUMBER 7 L3
    BIPUSH 88
    ISTORE 2
   L4
    LINENUMBER 8 L4
    BIPUSH 123
    ILOAD 2
    IADD
    INVOKESTATIC kotlin/coroutines/jvm/internal/Boxing.boxInt (I)Ljava/lang/Integer;
   L5
    ARETURN
   L2
    LINENUMBER 6 L2
    NEW java/lang/IllegalStateException
    DUP
    LDC "call to 'resume' before 'invoke' with coroutine"
    INVOKESPECIAL java/lang/IllegalStateException.<init> (Ljava/lang/String;)V
    ATHROW
    LOCALVARIABLE x I L4 L5 2
    LOCALVARIABLE this Lcom/pengmj/kotlincoroutine/SampleKt$main$continuation$1; L3 L2 0
    LOCALVARIABLE $result Ljava/lang/Object; L3 L2 1
    MAXSTACK = 3
    MAXLOCALS = 4

  @Lkotlin/coroutines/jvm/internal/DebugMetadata;(f="sample.kt", l={}, i={}, s={}, n={}, m="invokeSuspend", c="com.pengmj.kotlincoroutine.SampleKt$main$continuation$1")

  // access flags 0x0
  <init>(Lkotlin/coroutines/Continuation;)V
    ALOAD 0
    ICONST_1
    ALOAD 1
    INVOKESPECIAL kotlin/coroutines/jvm/internal/SuspendLambda.<init> (ILkotlin/coroutines/Continuation;)V
    RETURN
    MAXSTACK = 3
    MAXLOCALS = 2

  // access flags 0x0
  I label

  // access flags 0x11
  // signature (Lkotlin/coroutines/Continuation<*>;)Lkotlin/coroutines/Continuation<Lkotlin/Unit;>;
  // declaration: kotlin.coroutines.Continuation<kotlin.Unit> create(kotlin.coroutines.Continuation<?>)
  public final create(Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
  @Lorg/jetbrains/annotations/NotNull;() // invisible
    // annotable parameter count: 1 (visible)
    // annotable parameter count: 1 (invisible)
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 0
   L0
    ALOAD 1
    LDC "completion"
    INVOKESTATIC kotlin/jvm/internal/Intrinsics.checkNotNullParameter (Ljava/lang/Object;Ljava/lang/String;)V
    NEW com/pengmj/kotlincoroutine/SampleKt$main$continuation$1
    DUP
    ALOAD 1
    INVOKESPECIAL com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.<init> (Lkotlin/coroutines/Continuation;)V
    ASTORE 2
    ALOAD 2
    ARETURN
   L1
    LOCALVARIABLE this Lkotlin/coroutines/jvm/internal/BaseContinuationImpl; L0 L1 0
    LOCALVARIABLE completion Lkotlin/coroutines/Continuation; L0 L1 1
    MAXSTACK = 3
    MAXLOCALS = 3

  // access flags 0x11
  public final invoke(Ljava/lang/Object;)Ljava/lang/Object;
    ALOAD 0
    ALOAD 1
    CHECKCAST kotlin/coroutines/Continuation
    INVOKEVIRTUAL com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.create (Lkotlin/coroutines/Continuation;)Lkotlin/coroutines/Continuation;
    CHECKCAST com/pengmj/kotlincoroutine/SampleKt$main$continuation$1
    GETSTATIC kotlin/Unit.INSTANCE : Lkotlin/Unit;
    INVOKEVIRTUAL com/pengmj/kotlincoroutine/SampleKt$main$continuation$1.invokeSuspend (Ljava/lang/Object;)Ljava/lang/Object;
    ARETURN
    MAXSTACK = 2
    MAXLOCALS = 2

  @Lkotlin/Metadata;(mv={1, 9, 0}, k=3, d1={"\u0000\n\n\u0000\n\u0002\u0010\u0008\n\u0002\u0008\u0002\u0010\u0000\u001a\u00020\u0001H\u008a@\u00a2\u0006\u0004\u0008\u0002\u0010\u0003"}, d2={"<anonymous>", "", "invoke", "(Ljava/lang/Object;)Ljava/lang/Object;"})
  OUTERCLASS com/pengmj/kotlincoroutine/SampleKt main ()V
  // access flags 0x18
  final static INNERCLASS com/pengmj/kotlincoroutine/SampleKt$main$continuation$1 null null
  // compiled from: sample.kt
}


// ================com/pengmj/kotlincoroutine/SampleKt$main$continuation$2.class =================
// class version 52.0 (52)
// access flags 0x31
// signature Ljava/lang/Object;Lkotlin/coroutines/Continuation<Ljava/lang/Integer;>;
// declaration: com/pengmj/kotlincoroutine/SampleKt$main$continuation$2 implements kotlin.coroutines.Continuation<java.lang.Integer>
public final class com/pengmj/kotlincoroutine/SampleKt$main$continuation$2 implements kotlin/coroutines/Continuation {

  OUTERCLASS com/pengmj/kotlincoroutine/SampleKt main ()V

  // access flags 0x1
  public getContext()Lkotlin/coroutines/CoroutineContext;
  @Lorg/jetbrains/annotations/NotNull;() // invisible
   L0
    LINENUMBER 11 L0
    GETSTATIC kotlin/coroutines/EmptyCoroutineContext.INSTANCE : Lkotlin/coroutines/EmptyCoroutineContext;
    CHECKCAST kotlin/coroutines/CoroutineContext
    ARETURN
   L1
    LOCALVARIABLE this Lcom/pengmj/kotlincoroutine/SampleKt$main$continuation$2; L0 L1 0
    MAXSTACK = 1
    MAXLOCALS = 1

  // access flags 0x1
  public resumeWith(Ljava/lang/Object;)V
    // annotable parameter count: 1 (visible)
    // annotable parameter count: 1 (invisible)
    @Lorg/jetbrains/annotations/NotNull;() // invisible, parameter 0
   L0
    LINENUMBER 14 L0
    ALOAD 1
    INVOKESTATIC kotlin/Result.toString-impl (Ljava/lang/Object;)Ljava/lang/String;
    ASTORE 2
   L1
    GETSTATIC java/lang/System.out : Ljava/io/PrintStream;
    ALOAD 2
    INVOKEVIRTUAL java/io/PrintStream.println (Ljava/lang/Object;)V
   L2
   L3
    LINENUMBER 15 L3
    RETURN
   L4
    LOCALVARIABLE this Lcom/pengmj/kotlincoroutine/SampleKt$main$continuation$2; L0 L4 0
    LOCALVARIABLE result Ljava/lang/Object; L0 L4 1
    MAXSTACK = 2
    MAXLOCALS = 3

  // access flags 0x0
  <init>()V
   L0
    LINENUMBER 9 L0
    ALOAD 0
    INVOKESPECIAL java/lang/Object.<init> ()V
    RETURN
   L1
    LOCALVARIABLE this Lcom/pengmj/kotlincoroutine/SampleKt$main$continuation$2; L0 L1 0
    MAXSTACK = 1
    MAXLOCALS = 1

  @Lkotlin/Metadata;(mv={1, 9, 0}, k=1, d1={"\u0000%\n\u0000\n\u0002\u0018\u0002\n\u0002\u0010\u0008\n\u0000\n\u0002\u0018\u0002\n\u0002\u0008\u0003\n\u0002\u0010\u0002\n\u0000\n\u0002\u0018\u0002\n\u0002\u0008\u0002*\u0001\u0000\u0008\n\u0018\u00002\u0008\u0012\u0004\u0012\u00020\u00020\u0001J\u001e\u0010\u0007\u001a\u00020\u00082\u000c\u0010\u0009\u001a\u0008\u0012\u0004\u0012\u00020\u00020\nH\u0016\u00f8\u0001\u0000\u00a2\u0006\u0002\u0010\u000bR\u0014\u0010\u0003\u001a\u00020\u00048VX\u0096\u0004\u00a2\u0006\u0006\u001a\u0004\u0008\u0005\u0010\u0006\u0082\u0002\u0004\n\u0002\u0008\u0019\u00a8\u0006\u000c"}, d2={"com/pengmj/kotlincoroutine/SampleKt$main$continuation$2", "Lkotlin/coroutines/Continuation;", "", "context", "Lkotlin/coroutines/CoroutineContext;", "getContext", "()Lkotlin/coroutines/CoroutineContext;", "resumeWith", "", "result", "Lkotlin/Result;", "(Ljava/lang/Object;)V", "Kotlin"})
  // access flags 0x19
  public final static INNERCLASS com/pengmj/kotlincoroutine/SampleKt$main$continuation$2 null null
  // compiled from: sample.kt
}


// ================META-INF/Kotlin.kotlin_module =================
    
&
com.pengmj.kotlincoroutineSampleKt"*

在上面一大片字节码中,我们着重看这几行:

1
2
3
4
5
6
7
NEW com/pengmj/kotlincoroutine/SampleKt$main$continuation$1

NEW com/pengmj/kotlincoroutine/SampleKt$main$continuation$2

final class com/pengmj/kotlincoroutine/SampleKt$main$continuation$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function1 {

}

由上面字节码可知:

编译器帮我们创建了2个匿名内部类SampleKt$main$continuation$1SampleKt$main$continuation$2,它们的类名是<FileName>Kt$<FunctionName>$continuation$1这样的形式,其中<FileName>和<FunctionName>指代的是代码所在的文件名和函数名。

这里会有个疑问:这些匿名内部类是哪来的?

第一个匿名内部类SampleKt$main$continuation$1,它就是我们的协程体,那个用以创建协程的suspend方法传入的Lambda表达式,即suspend ()->R,也对应了前面说过的埋下伏笔2;

并且SampleKt$main$continuation$1继承了抽象类SuspendLambda,还实现了Function1接口,继续追踪SuspendLambda时,发现它间接实现了Continuation接口,与先前提到的ContinuationImplBaseContinuationImpl存在继承关系,如下图所示:

第二个匿名内部类SampleKt$main$continuation$2,从它的字节码中可以看出它有getContext方法、resumeWith方法以及无参构造方法,可以猜到它就是createCoroutine方法传入的对象表达式,学过Kotlin基础的应该明白,对象表达式其实是增强版的匿名内部类。

到目前为止还看不出什么,还需要把字节码再转译一下,点击Kotlin ByteCode面板上的Decompile按钮,转译后的源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Metadata(
   mv = {1, 9, 0},
   k = 2,
   d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u001a\u0006\u0010\u0000\u001a\u00020\u0001¨\u0006\u0002"},
   d2 = {"main", "", "Kotlin"}
)
public final class SampleKt {
   public static final void main() {
      Function1 var1 = (Function1)(new Function1((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object var1) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure(var1);
                  int x = 88;
                  return Boxing.boxInt(123 + x);
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
         }

         @NotNull
         public final Continuation create(@NotNull Continuation completion) {    // ②
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }

         public final Object invoke(Object var1) {
            return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
         }
      });
      Continuation continuation = ContinuationKt.createCoroutine(var1, (Continuation)(new Continuation() {     // ①
         @NotNull
         public CoroutineContext getContext() {
            return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
         }

         public void resumeWith(@NotNull Object result) {
            String var2 = Result.toString-impl(result);
            System.out.println(var2);
         }
      }));
      Unit var2 = Unit.INSTANCE;
      Result.Companion var10001 = Result.Companion;
      continuation.resumeWith(Result.constructor-impl(var2));
   }

   // $FF: synthetic method
   public static void main(String[] var0) {
      main();
   }
}

这里解释下上面的标号地方:

标号①、调用了ContinuationKtcreateCoroutine方法,第一个参数传入var1变量,而var1变量的类型是Function1,对应的Koltin实际代码为:

1
2
3
4
public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

前面提过埋下伏笔1,suspend ()->T会作为createCoroutine方法的第一个参数传入,此时可以推断出,这里的var1就是第一个匿名内部类SampleKt$main$continuation$1的实例;createCoroutine方法第二个参数则是completion回调,也是前面说的第二个匿名内部类SampleKt$main$continuation$2

标号②、还记得前面说的createCoroutineUnintercepted方法和intercepted方法的具体实现吗?

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)       
    return if (this is BaseContinuationImpl)
        create(probeCompletion)          // ①                        
    else
        createCoroutineFromSuspendFunction(probeCompletion) {     
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

因为SampleKt$main$continuation$1BaseContinuationImpl的子类,所以会执行上面①处的create方法,此处调用的是BaseContinuationImpl类的create方法,但是BaseContinuationImpl类中没有具体实现create方法,所以由子类SampleKt$main$continuation$1来实现,如下代码所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
public final class SampleKt {
   public static final void main() {
      Function1 var1 = (Function1)(new Function1((Continuation)null) {

         @NotNull
         public final Continuation create(@NotNull Continuation completion) {    // ②
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }

      });
}

发现在create方法中又创建了一个匿名内部类对象var2,类型为Function1,那么可以知道createCoroutineUnintercepted方法返回的就是新的SampleKt$main$continuation$1对象。

有一个疑问:var2和外部new Function1创建的匿名内部类对象var1有什么区别和关联呢?

先声明下,从Kotlin代码反编译而来的Java代码可能会出现不符合Java代码执行逻辑的情况。外部创建的匿名内部类对象var1主要是用来帮助启动协程,因此completion传入为null;而create方法中创建的匿名内部类对象var2主要是用来管理协程的状态。

如果你看到本文的后续内容后,可能会回过头来问,为什么var2可以调用var1中的invokeSuspend方法,它们不是两个不同的对象吗?

是的,没错,它们的确是两个不同的对象,但是因为反编译后的源码不一定能准确的表达完整Java代码执行逻辑,这里我再手动转译一下,应该就能看明白了:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public final class SampleKt {

  final class SampleKt$main$continuation$1 extends SuspendLambda implements Function1 {

    public SampleKt$main$continuation$1(Continuation completion){
        super(completion)
    }

    int label;                                                                                           

    @Nullable                                                                                            
    public final Object invokeSuspend(@NotNull Object var1) {                                            
       Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();                                              
       switch (this.label) {                                                                             
          case 0:                                                                                        
             ResultKt.throwOnFailure(var1);                                                              
             int x = 88;                                                                                 
             return Boxing.boxInt(123 + x);                                                              
          default:                                                                                       
             throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");         
       }                                                                                                 
    }                                                                                                    

    @NotNull                                                                                             
    public final Continuation create(@NotNull Continuation completion) {                                 
       Intrinsics.checkNotNullParameter(completion, "completion");                                       
       Function1 var2 = new SampleKt$main$continuation$1(completion);                                         
       return var2;                                                                                      
    }                                                                                                    

    public final Object invoke(Object var1) {                                                            
       return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);           
    }                                                                                                             
  }

   public static final void main() {
      SampleKt$main$continuation$1 var1 = new SampleKt$main$continuation$1(null);
      Continuation continuation = ContinuationKt.createCoroutine(var1, (Continuation)(new Continuation() {
         @NotNull
         public CoroutineContext getContext() {
            return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
         }

         public void resumeWith(@NotNull Object result) {
            String var2 = Result.toString-impl(result);
            System.out.println(var2);
         }
      }));
      Unit var2 = Unit.INSTANCE;
      Result.Companion var10001 = Result.Companion;
      continuation.resumeWith(Result.constructor-impl(var2));      
   }
}  

还记得我们最初的那个任务吗?就是找到delegate变量的真身,我们已经知道delegate变量就是这么长的一串东西:

1
createCoroutineUnintercepted(completion).intercepted()

createCoroutineUnintercepted方法已经分析了,返回的就是新的SampleKt$main$continuation$1对象,那么继续看intercepted方法:

1
2
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this 

很明显,SampleKt$main$continuation$1也是ContinuationImpl的子类,因而这里调用了ContinuationImplintercepted方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }  
}

上面代码说明如果设置了协程的拦截器,那么就从上下文中去找,如果上下文中也没有,则用自身SampleKt$main$continuation$1,最后赋值。很显然,本例是没有设置协程的拦截器,那么intercepted方法返回的就是SampleKt$main$continuation$1,所以delegate变量的真身就是SampleKt$main$continuation$1

接下来看看delegate变量调用了resumeWith方法的流程,因为SampleKt$main$continuation$1中没有resumeWith方法,再根据继承关系,我们看BaseContinuationImpl类的resumeWith方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
internal abstract class BaseContinuationImpl(
    // This is `public val` so that it is private on JVM and cannot be modified by untrusted code, yet
    // it has a public getter (since even untrusted code is allowed to inspect its call stack).
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {

    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?

}

上面代码的重点是调用了invokeSuspend方法,这是个抽象方法,由子类SampleKt$main$continuation$1实现:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Metadata(
   mv = {1, 9, 0},
   k = 2,
   d1 = {"\u0000\b\n\u0000\n\u0002\u0010\u0002\n\u0000\u001a\u0006\u0010\u0000\u001a\u00020\u0001¨\u0006\u0002"},
   d2 = {"main", "", "Kotlin"}
)
public final class SampleKt {
   public static final void main() {
      Function1 var1 = (Function1)(new Function1((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object var1) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch (this.label) {
               case 0:
                  ResultKt.throwOnFailure(var1);
                  int x = 88;
                  return Boxing.boxInt(123 + x);
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
         }
      }
   } 
}

这里invokeSuspend方法通过状态机来执行协程体中的逻辑,最终返回处理结果Boxing.boxInt(123 + x),很显然,invokeSuspend方法其实就是suspend {}闭包中的处理逻辑。

回到BaseContinuationImpl类的resumeWith方法,看下这2行代码:

1
2
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return

如果invokeSuspend方法返回值是挂起状态COROUTINE_SUSPENDED,则resumeWith方法直接退出,否则就通过调用completion.resumeWith(outcome)传递给外部。

至此,协程启动流程就讲解完成了。