注册

少年,你可知 Kotlin 协程最初的样子?

如果有人问你,怎么开启一个 Kotlin 协程?你可能会说通过runBlocking/launch/async,回答没错,这几个函数都能开启协程。不过这次咱们换个角度分析,通过提取这几个函数的共性,看看他们内部是怎么开启一个协程的。
相信通过本篇,你将对协程原理有个深刻的认识。
文章目录:

1、suspend 关键字背后的原理
2、如何开启一个原始的协程?
3、协程调用以及整体流程
4、协程代码我为啥看不懂?

1、suspend 关键字背后的原理

suspend 修饰函数

普通的函数

fun launchEmpty(block: () -> Unit) {   
}

定义一个函数,形参为函数类型。
查看反编译结果:

public final class CoroutineRawKt {
public static final void launchEmpty(@NotNull Function0 block) {
}
}

可以看出,在JVM 平台函数类型参数最终是用匿名内部类表示的,而FunctionX(X=0~22) 是Kotlin 将函数类型映射为Java 的接口。
来看看Function0 的定义:

public interface Function0<out R> : Function<R> {
/** Invokes the function. */
public operator fun invoke(): R
}

有一个唯一的方法:invoke(),它没有任何参数。
可作如下调用:

fun launchEmpty(block: () -> Unit) {
block()//与block.invoke()等价
}
fun main(array: Array<String>) {
launchEmpty {
println("I am empty")
}
}

带suspend 的函数

以上写法大家都比较熟悉了,就是典型的高阶函数的定义和调用。
现在来改造一下函数类型的修饰符:

fun launchEmpty1(block: suspend () -> Unit) {
}

相较之前,加了"suspend"关键字。
老规矩,查看反编译结果:

public static final void launchEmpty1(@NotNull Function1 block) {
}

参数从Function0 变为了Function1:

/** A function that takes 1 argument. */
public interface Function1<in P1, out R> : Function<R> {
/** Invokes the function with the specified argument. */
public operator fun invoke(p1: P1): R
}

Function1 的invoke()函数多了一个入参。

也就是说,加了suspend 修饰后,函数会默认加个形参。

当我们调用suspend修饰的函数时:

image.png

意思是:

"suspend"修饰的函数只能在协程里被调用或者是在另一个被"suspend"修饰的函数里调用。

suspend 作用

何为挂起

suspend 意为挂起、阻塞的意思,与协程相关。
当suspend 修饰函数时,表明这个函数可能会被挂起,至于是否被挂起取决于该函数里是否有挂起动作。 比如:

suspend fun testSuspend() {
println("test suspend")
}

这样的写法没意义,因为函数没有实现挂起功能。
你可能会说,挂起需要切换线程,好嘛,换个写法:

suspend fun testSuspend() {
println("test suspend")
thread {
println("test suspend in thread")
}
}

然而并没啥用,编译器依然提示:

image.png

意思是可以不用suspend 修饰,没啥意义。

挂起于协程的意义

第一点
当函数被suspend 修饰时,表明协程执行到此可能会被挂起,若是被挂起那么意味着协程将无法再继续往下执行,直到条件满足恢复了协程的运行。

fun main(array: Array<String>) {
GlobalScope.launch {
println("before suspend")//①
testSuspend()//挂起函数②
println("after suspend")//③
}
}

执行到②时,协程被挂起,将不会执行③,直到协程被恢复后才会执行③。
注:关于协程挂起的生动理解&线程的挂起 下篇将着重分析。

第二点
如果将suspend 修饰的函数类型看做一个整体的话:

suspend () -> T

无参,返回值为泛型。
Kotlin 里定义了一些扩展函数,可用来开启协程。

第三点 suspend 修饰的函数类型,当调用者实现其函数体时,传入的实参将会继承自SuspendLambda(这块下个小结详细分析)。

2、如何开启一个原始的协程?

##launch/async/runBlocking 如何开启协程
纵观这几种主流的开启协程方式,它们最终都会调用到:

#CoroutineStart.kt
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}

无论走哪个分支,都是调用block的函数,而block 就是我们之前说的被suspend 修饰的函数。
以DEFAULT 为例startCoroutineUndispatched接下来会调用到IntrinsicsJvm.kt里的:

#IntrinsicsJvm.kt
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
)

该函数带了俩参数,其中的receiver 为接收者,而completion 为协程结束后调用的回调。
为了简单,我们可以省略掉receiver。
刚好IntrinsicsJvm.kt 里还有另一个函数:

#IntrinsicsJvm.kt
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit>

createCoroutineUnintercepted 为 (suspend () -> T) 类型的扩展函数,因此只要我们的变量为 (suspend () -> T)类型就可以调用createCoroutineUnintercepted(xx)函数。
查找该函数的使用之处,发现Continuation.kt 文件里不少扩展函数都调用了它。
如:

#Continuation.kt
//创建协程的函数
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

其中Continuation 为接口:

#Continuation.kt
interface Continuation<in T> {
//协程上下文
public val context: CoroutineContext
//恢复协程
public fun resumeWith(result: Result<T>)
}

Continuation 接口很重要,协程里大部分的类都实现了该接口,通常直译过来为:"续体"。

创建完成后,还需要开启协程函数:

#Continuation.kt
//启动协程的函数
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))

简单创建/调用协程

协程创建

由上分析可知,Continuation.kt 里有我们开启协程所需要的一些基本信息,接着来看看如何调用上述函数。

fun <T> launchFish(block: suspend () -> T) {
//创建协程,返回值为SafeContinuation(实现了Continuation 接口)
//入参为Continuation 类型,参数名为completion,顾名思义就是
//协程结束后(正常返回&抛出异常)将会调用它。
var coroutine = block.createCoroutine(object : Continuation<T> {
override val context: CoroutineContext
get() = EmptyCoroutineContext

//协程结束后调用该函数
override fun resumeWith(result: Result<T>) {
println("result:$result")
}
})
//开启协程
coroutine.resume(Unit)
}

定义了函数launchFish,该函数唯一的参数为函数类型参数,被suspend 修饰,而(suspend () -> T)定义一系列扩展函数,createCoroutine 为其中之一,因此block 可以调用createCoroutine。
createCoroutine 返回类型为SafeContinuation,通过SafeContinuation.resume()开启协程。

协程调用

fun main(array: Array<String>) {
launchFish {
println("I am coroutine")
}
}

打印结果:

image.png

3、协程调用以及整体流程

协程调用背后的玄机

反编译初窥门径

看到上面的打印大家可能比较晕,"println("I am coroutine")"是咋就被调用的?没看到有调用它的地方啊。
launchFish(block) 接收的是函数类型,当调用launchFish 时,在闭包里实现该函数的函数体即可,我们知道函数类型最终会替换为匿名内部类。
因为kotlin 有不少语法糖,无法一下子直击本质,老规矩,反编译看看结果:

    public static final void main(@NotNull String[] array) {
launchFish((Function1)(new Function1((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object var1) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
//闭包里的内容
String var2 = "I am coroutine";
boolean var3 = false;
//打印
System.out.println(var2);
return Unit.INSTANCE;
}
}

@NotNull
public final Continuation create(@NotNull Continuation completion) {
//创建一个Continuation,可以认为是续体
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}

public final Object invoke(Object var1) {
//Function1 接口里的方法
return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
}));
}

为了更直观,删除了一些不必要的信息。
看到这,你发现了什么?通常传入函数类型的实参最后将会被编译为对应的匿名内部类,此时应该编译为Function1, 实现其唯一的函数:invoke(xx),而我们发现实际上还多了两个函数:invokeSuspend(xx)与create(xx)
我们有理由相信,invokeSuspend(xx)函数一定在某个地方被调用了,原因是:闭包里打印的字符串:"I am coroutine" 只在该函数里实现,而我们测试的结果是这个打印执行了。
还记得我们上面说的suspend 意义的第三点吗?

suspend 修饰的函数类型,其实参是匿名内部类,继承自抽象类:SuspendLambda。

也就是说invokeSuspend(xx)与create(xx) 的定义很有可能来自SuspendLambda,我们接着来分析它。

SuspendLambda 关系链

#ContinuationImpl.kt
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
...
}

该类本身并没有太多内容,此处继承了ContinuationImpl类,查看该类也没啥特殊的,继续往上查找,找到BaseContinuationImpl类,在里面发现了线索:

#ContinuationImpl.kt
internal abstract class BaseContinuationImpl(
val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
open fun create(completion: Continuation<*>): Continuation<Unit> {
}
}

终于看到了眼熟的:invokeSuspend(xx)与create(xx)。
我们再回过头来捋一下类之间关系:

image.png

闭包生成的匿名内部类:

  • 实现了Function1 接口,并实现了该接口里的invoke函数。
  • 继承了SuspendLambda,并重写了invokeSuspend函数和create函数。

你可能会说还不够直观,那好,继续改写一下:

    class MyAnonymous extends SuspendLambda implements Function1 {
int label;
public final Object invokeSuspend(@NotNull Object var1) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
String var2 = "I am coroutine";
boolean var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
}
}
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
public final Object invoke(Object var1) {
return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
}

public static final void launchFish(@NotNull MyAnonymous block) {
Continuation coroutine = ContinuationKt.createCoroutine(block, (new Continuation() {
@NotNull
public CoroutineContext getContext() {
return (CoroutineContext) EmptyCoroutineContext.INSTANCE;
}

public void resumeWith(@NotNull Object result) {
String var2 = "result:" + Result.toString-impl(result);
boolean var3 = false;
System.out.println(var2);
}
}));
//开启
coroutine.resumeWith(Result.constructor-impl(var3));
}

public static final void main(@NotNull String[] array) {
MyAnonymous myAnonymous = new MyAnonymous();
launchFish(myAnonymous);
}

这么看就比较清晰了,此处我们单独声明了一个MyAnonymous类,并构造对象传递给launchFish函数。

闭包的执行

既然匿名类的构造清晰了,接下来分析闭包是如何被执行的,也就是查找invokeSuspend(xx)函数是怎么被调用的?
将目光转移到launchFish 函数本身。

createCoroutine()
先看createCoroutine()函数调用,直接上代码:

#Continuation.kt
fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
//返回SafeContinuation 对象
//SafeContinuation 构造函数需要2个参数,一个是delegate,另一个是协程状态
//此处默认是挂起
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

#IntrinsicsJvm.kt
actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//此处的this 即为匿名内部类对象 MyAnonymous,它间接继承了BaseContinuationImpl
//调用MyAnonymous 重写的create 函数
//create 函数里new 新的MyAnonymous 对象
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}

#IntrinsicsJvm.kt
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
//判断是否是ContinuationImpl 类型的Continuation
//我们的demo里是true,因此会继续尝试调用拦截器
(this as? ContinuationImpl)?.intercepted() ?: this

#ContinuationImpl.kt
public fun intercepted(): Continuation<Any?> =
//查看是否已经有拦截器,如果没有,则从上下文里找,上下文没有,则用自身,最后赋值。
//在我们的demo里上下文里没有,用的是自身
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }

最后得出的Continuation 赋值给SafeContinuation 的成员变量:delegate。
至此,SafeContinuation 对象已经构造完毕,接着继续看如何用它开启协程。

再看 resume()

#SafeContinuationJvm.kt
actual override fun resumeWith(result: Result<T>) {
while (true) { // lock-free loop
val cur = this.result // atomic read
when {
//初始化状态为UNDECIDED,因此直接return
cur === CoroutineSingletons.UNDECIDED -> if (SafeContinuation.RESULT.compareAndSet(this,
CoroutineSingletons.UNDECIDED, result.value)) return
//如果是挂起,将它变为恢复状态,并调用恢复函数
//demo 里初始化状态为COROUTINE_SUSPENDED,因此会走到这
cur === COROUTINE_SUSPENDED -> if (SafeContinuation.RESULT.compareAndSet(this, COROUTINE_SUSPENDED,
CoroutineSingletons.RESUMED)) {
//delegate 为之前创建的Continuation,demo 里因为没有拦截,因此为MyAnonymous
delegate.resumeWith(result)
return
}
else -> throw IllegalStateException("Already resumed")
}
}
}

#ContinuationImpl.kotlin
#BaseContinuationImpl类的成员函数
override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
//invokeSuspend 即为MyAnonymous 里的方法
val outcome = invokeSuspend(param)
//如果返回值是挂起状态,则函数直接退出
if (outcome === kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED) return
kotlin.Result.success(outcome)
} catch (exception: Throwable) {
kotlin.Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
//执行到这,最终执行外层的completion,在demo里会输出"result:$result"
completion.resumeWith(outcome)
return
}
}
}
}

最后再回头看 invokeSuspend

         public final Object invokeSuspend(@NotNull Object var1) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure(var1);
String var2 = "I am coroutine";
boolean var3 = false;
System.out.println(var2);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
}

你兴许已经发现了,此处的返回值永远是Unit.INSTANCE啊,那么协程永远不会挂起。
没有挂起功能的协程就是鸡肋...
没错,咱们的demo里实现的是一个无法挂起的协程,回到最初的launchFish()的调用:

    launchFish {
println("I am coroutine")
}
}

因为闭包里只有一个打印语句,根本没有挂起函数,当然就没有挂起的说法了。

协程调用整体流程

上面花很多篇幅去分析协程的调用,其实就是为了从kotlin 的简洁里脱离出来,从而真正了解其背后的原理。
Demo里的协程构造比较原始,相较于launch/async 等启动方式,它没有上下文、没有线程调度,但并不妨碍我们通过它去了解协程的运作。当我们了解了其运作的核心,到时候再去看launch/async/runBlocking 就非常容易了,毕竟它们都是提供给开发者更方便操作协程的工具,是在原始携程的基础上演变的。
协程创建调用栈简易图:

image.png

4、协程代码我为啥看不懂?

之前有一些小伙伴跟我反馈说:"小鱼人,我尝试去看协程源码,感觉找不到入口,又或是跟着源码跟到一半就断了... 你是咋阅读的啊?"
有一说一,协程源码确实不太好懂,若要比较顺畅读懂源码,根据个人经验可能需要以下前置条件:

1、kotlin 语法基础,这是必须的。
2、高阶函数&扩展函数。
3、平台代码差异,有一些类、函数是与平台相关,需要定位到具体平台,比如SafeContinuation,找到Java 平台的文件:SafeContinuationJvm.kt。
4、断点调试时,有些单步断点不会进入,需要指定运行到的位置。
5、有些代码是编译时期构造的,需要对照反编译结果查看。
6、还有些代码是没有源码的,可能是ASM插入的,此时只能靠肉眼理解了。

如果你对kotlin 基础/高阶函数 等有疑惑,请查看之前的文章。

本篇仅仅构造了一个简陋的协程,协程的最重要的挂起/恢复并没有涉及,下篇将会着重分析如何构造一个挂起函数,以及协程到底是怎么挂起的。

本文基于Kotlin 1.5.3,文中完整Demo请点击


作者:小鱼人爱编程
链接:https://juejin.cn/post/7109410972653060109
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册