注册

一文搞明白协程的挂起和恢复

协程是使用非阻塞式挂起的方式来实现线程运行的。那协程又是如何挂起和恢复的,这里面的概念又是什么,带着这些问题就让我们重新探究下协程的挂起和恢复。

我们先创建个协程:

override fun initView() {
lifecycleScope.launch {
val num = dealA()
dealB(num)
}
}

private suspend fun dealA():Int {
withContext(Dispatchers.IO) {
delay(3000)
}
return 1
}

private suspend fun dealB(num:Int) {
withContext(Dispatchers.IO) {
delay(1000)
}
}

可以看到写协程的时候要在函数前面加上suspend修饰,这也是常说的挂起函数,那挂起函数又是什么?

挂起函数

了解之前,我们先将上面的挂起函数dealA()反编译成 Java,简单的看看编译后是什么样的?(省略了后面会着重解释的一些代码,主要先看挂起函数的方法)

private final Object dealA(Continuation var1) {
......

Object $result = ((<undefinedtype>)$continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
Function2 var10001 = (Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
.......

return Unit.INSTANCE;
}
......
if (BuildersKt.withContext(var10000, var10001, (Continuation)$continuation) == var4) {
return var4;
}
break;
......
}

return Boxing.boxInt(1);
}

可以看到suspend经过反编译后,会出现Continuation类型的参数传进去,并且返回的是Object对象。

是不是对Continuation是什么很好奇,这也是协程的核心部分`:

public interface Continuation<in T> {
//对应于这个延续的协程的上下文
public val context: CoroutineContext

//继续执行相应的协程,传递一个成功或失败的 [result] 作为最后一个暂停点的返回值。
public fun resumeWith(result: Result<T>)
}

从定义上可以看出Continuation其实就是一个带有泛型参数的callback,而resumeWith也就相当于onSuccess的成功回调,来恢复执行后面的代码,除这个之外,还有一个ContineContext,它就是协程的上下文。

回到dealA方法中,当执行到withContext方法的时候,会返回CoroutineSingletons.COROUTINE_SUSPENDED,表示函数被挂起了,到这里你是不是觉得就结束了,其实还没有。

在查看过程中是不是看到有个invokeSuspend的回调方法还没有被调用,这又是在什么时候会被触发的?

那我们就从刚才执行到的withContext那里进一步查看,写过协程的都知道这就是用来切换线程:

public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- 新上下文与旧上下文相同
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 新的调度程序与旧的调度程序相同
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 上下文有变化,所以这个线程需要更新
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- 使用新的调度程序
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}

withContext方法中,传入了两个参数,一个是协程的上下文,另一个就是协程里的代码。可以看到不管新的调度和旧的调度一样最后都是会调用startCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}

而在startCoroutineCancellable方法中,创建了Coroutination,之后会调用resumeCancelableWith方法:

public fun <T> Continuation<T>.resumeCancellableWith(
result: Result<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
}

在这里是不是看到了我们之前提到过的resumeWith方法,之前也解释了下它就相当于一个回调。然后我们再来看下它的具体实现,是在ContinuationImpl类中:

public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {

probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
val outcome = invokeSuspend(param)
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
.......
}
}
}

resumeWith方法中执行到了我们一直在找的invokeSuspend,通过这个方法将result回调了出去,并判断当前是不是COROUTINE_SUSPENDED(挂起),是挂起直接退出,去执行上面说到的invokeSuspend里面的内容。

在这里我们了解到invokeSuspend是由resumeWith所触发的,那接下来我们看看真正的挂起和恢复如何被执行的。

协程的启动

了解挂起和恢复的过程,要从协程的启动执行开始,我们还是跟刚才一样反编译启动协程的代码:

 BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
......
}

@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}

public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);

在协程启动的反编译代码我们又看到了ininvokeSuspend方法,这个方法又是在最下面创建了Continuation,之后在invoke中被调用,更多的信息是看不出来了。我们还是回到launch源码内部里面去寻找答案。

public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

//coroutine.start
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}

当查看到start这里的时候,你会发现跟进不下去了。那我们就换种方法,还是将这个类反编译下,你会看到变成了这样:

 public final void start(@NotNull CoroutineStart start, Object receiver, @NotNull Function2 block) {
Intrinsics.checkNotNullParameter(start, "start");
Intrinsics.checkNotNullParameter(block, "block");
start.invoke(block, receiver, (Continuation)this);
}

//使用此协程启动策略将带有接收器的相应块作为协程启动
@InternalCoroutinesApi
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}

在这里又看到了我们熟悉的startCoroutineCancellable,由于默认值为CoroutineStart.DEFAULT,所以该方法会被调用。后面会怎么调用,应该很清楚了,最后会一路调用到invokeSuspend方法,所以这时候就会执行到suspend{}代码块里面,协程启动!

协程的挂起

调用到invokeinvokeSuspend函数里面的代码的时候,我们单拎出出来看下:

//launch
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
ButtonTextActivity var4;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var4 = ButtonTextActivity.this;
this.label = 1;
var10000 = var4.dealA(this);
if (var10000 == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
ResultKt.throwOnFailure($result);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

int num = ((Number)var10000).intValue();
var4 = ButtonTextActivity.this;
this.label = 2;
if (var4.dealB(num, this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}
}

这里涉及到了label状态机的分析,当label为0时,会调用case为0下面的代码。在里面label被设置为了1,又调用了var4.dealA(this)这个挂起函数,从前面挂起函数的分析知道其会返回COROUTINE_SUSPENDED标志,所以var10000也就会得到COROUTINE_SUSPENDED标志,此时会被判断相等,协程会被挂起。

协程的恢复

挂起后就要恢复了。在前面执行到的dealA方法中,在withContext的时候会触发dealA中的invokeSuspend方法。此时label被设置为1,所以会被调用到case为1的代码:

//dealA 
public final Object invokeSuspend(@NotNull Object $result) {
Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
this.label = 1;
if (DelayKt.delay(3000L, this) == var2) {
return var2;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}

return Unit.INSTANCE;
}

return Boxing.boxInt(1);

执行了ResultKt.throwOnFailure($result),最后返回int的值。同时launch中的invokeSuspend也被执行,上面已经将label设置为1,这里就会执行到case 1下的代码:

//launch  invokeSuspend
switch(this.label) {
......
case 1:
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
}

int num = ((Number)var10000).intValue();
var4 = ButtonTextActivity.this;
this.label = 2;
if (var4.dealB(num, this) == var3) {
return var3;
} else {
return Unit.INSTANCE;
}

对结果进行了失败处理,此时var10000也就是刚刚得到的int值,接着执行suspend剩余的代码,在下面将lable设置为了2,开始执行dealB的方法。

dealB方法中,跟之前分析的步骤一样,也会回到invokeSuspend中:

private final Object dealB(int num, Continuation $completion) {
Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
int label;

@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
......

return Unit.INSTANCE;
}

......
return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
}

最后当没有挂起函数的时候,会返回Unit.INSTANCE,结束协程执行。

小结

协程通过suspend来标识挂起点,但真正的挂起点还需要通过是否返回COROUTINE_SUSPENDED来判断,而代码体现是通过状态机来处理协程的挂起与恢复。

在挂起和恢复的过程中,当判断挂起函数到返回值是COROUTINE_SUSPENDED标志时,会挂起,在需要挂起的时候,状态机会把之前的结果以成员变量的方式保存在 continuation 中。在挂起函数恢复的时候,会调用Continuation的resumeWith方法,继而触发invokeSuspend。根据保存在Continuation中的label,进入不同的 分支恢复之前保存的状态,进入下一个状态。

在挂起的时候并不会阻塞当前的线程,是因为挂起是在invokeSuspend方法中return出去的,而invokeSuspend之外的函数当然还是会继续执行。


作者:罗恩不带土
链接:https://juejin.cn/post/7103311646591811598
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册