创建协程环境
- runBlocking 阻塞当前线程直到协程执行结束
- launch 不会阻塞当前线程,返回值类型为Job,拿不到协程体的返回值
- async 不会阻塞当前线程,返回值为Deferred,调用await()可以拿到协程体的返回值
协程启动模式
- CoroutineStart.DEFAULT 立即进入待调度状态,一旦调度器 OK 就可以开始执行
- CoroutineStart.LAZY 不会进入执行状态,直到我们需要它执行的时候。比如:Job.start、Job.join、async.await()
- CoroutineStart.ATOMIC 和DEFAULT类似,且在第一个挂起点前不能被取消,只有涉及 cancel 的时候才有意义
- CoroutineStart.UNDISPATCHED 协程在这种模式下会直接开始在当前线程下执行,直到第一个挂起点,这听起来有点儿像前面的 ATOMIC,不同之处在于 UNDISPATCHED 不经过任何调度器即开始执行协程体。当然遇到挂起点之后的执行就取决于挂起点本身的逻辑以及上下文当中的调度器了。
原理
定义一个suspend函数
// 一、空函数
suspend fun test() {
}
// 反编译结果:自动添加Continuation参数,返回值改成COROUTINE_SUSPENDED,代表挂起
public final Object test(@NotNull Continuation $completion) {
// 返回值为COROUTINE_SUSPENDED
return Unit.INSTANCE;
}
// 二、带返回值的函数
suspend fun test(): String {
return "HelloWorld"
}
// 反编译结果,和上面的区别就是,返回值不是COROUTINE_SUSPENDED了,代表这个函数虽然标记为suspend但是不需要挂起
public final Object test(@NotNull Continuation $completion) {
return "HelloWorld";
}
// 三、带返回值的函数+调用其他挂起函数
suspend fun test(): String {
delay(1000) // delay是一个suspend挂起函数
return "HelloWorld"
}
// 反编译结果:用状态机实现程序的挂起和恢复,label标签控制程序的执行流程
public final Object test(@NotNull Continuation var1) {
Object $continuation;
label20: {
if (var1 instanceof <undefinedtype>) {
$continuation = (<undefinedtype>)var1;
if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
break label20;
}
}
$continuation = new ContinuationImpl(var1) {
// $FF: synthetic field
Object result;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
this.result = $result;
this.label |= Integer.MIN_VALUE;
return MainActivity.this.test(this);
}
};
}
Object $result = ((<undefinedtype>)$continuation).result;
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (((<undefinedtype>)$continuation).label) {
case 0:
ResultKt.throwOnFailure($result);
((<undefinedtype>)$continuation).label = 1;
if (DelayKt.delay(1000L, (Continuation)$continuation) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
return "HelloWorld";
}
我们在调用runBlocking或launch函数的时候,那个大括号其实就是一个suspend{}
函数
先来看一个最简单的例子
// 创建一个协程环境
GlobalScope.launch {
}
原始的launch函数只有3个参数
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
)
反编译后的launch函数拥有6个参数,多出来的3个参数是编译器自动帮我们加的
BuildersKt.launch$default(
(CoroutineScope)GlobalScope.INSTANCE,
(CoroutineContext)null,
(CoroutineStart)null,
(Function2)(new Function2((Continuation)null) {
...
}),
3,
(Object)null);
以下是所有参数的含义:
- (CoroutineScope)GlobalScope.INSTANCE:这是第一个参数,它表示协程的作用域(scope)。GlobalScope.INSTANCE 是 GlobalScope 类的单例实例,表示全局的协程作用域。
- (CoroutineContext)null:这是第二个参数,它表示协程的上下文(context)。在这里,将上下文设置为 null,表示使用默认的上下文配置。
- (CoroutineStart)null:这是第三个参数,它表示协程的启动选项。在这里,将启动选项设置为 null,表示使用默认的启动选项。
- (Function2):这是第四个参数,它是一个函数对象,用于协程的执行逻辑。在这里,使用了匿名内部类的方式创建了一个 Function2 对象,该对象具有 invokeSuspend 和 create 方法,用于协程的执行和创建。
- 3:这是第五个参数,它表示协程的调度器(dispatcher)。在这里,将调度器设置为 3,表示使用默认的调度器。
- (Object)null:这是第六个参数,它表示协程的初始值。在这里,将初始值设置为 null。
重点关注第四个参数,协程体,它是一个SuspendLambda对象,继承自ContinuationImpl类->BaseContinuationImpl类->Continuation接口。协程的本质是状态机,用label控制着程序的挂起和恢复,每个协程体只维护自己的状态。ContinuationImpl负责拦截Continuation和释放Continuation,BaseContinuationImpl主要实现了resumeWith方法,resumeWith中调用invokeSuspend来恢复函数的执行。
那么,协程的代码运行流程到底是怎么样的呢?
为了分析这个问题,完善一下最开始写的代码,并查看反编译的结果
GlobalScope.launch {
println("开始")
delay(1000)
println("结束")
}
反编译后:
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) { //$result是上个挂起函数的返回值
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
switch (this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "开始";
System.out.println(var2);
this.label = 1;
// this是SuspendLambda对象,也是Continuation对象,把this传递给delay函数,这样当delay执行完后,可以恢复运行
if (DelayKt.delay(1000L, this) == var3) {
// 遇到return,程序挂起。当再次调用invokeSuspend时,会执行 case=1 逻辑
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "结束";
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
// 这里的var3应该是SuspendLambda对象,或者可以说是Continuation对象
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
再来看一下launch函数的源码实现
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
... (省略其他代码)
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
}
public enum class CoroutineStart {
... (省略其他代码)
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
}
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
// 调用create方法创建Continuation实例,然后添加拦截器,然后执行resumeWith(),resumeWith方法在SuspendLambda中已经实现了
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
调度器
- Dispatchers.DEFAULT 子线程,CPU密集型
- Dispatchers.IO 子线程,IO密集型
- Dispatchers.Main post到UI主线程,加入到handler队列
- Dispatchers.Main.immediate 如果当前正在主线程,直接执行,不用加入到handler队列
- Dispatchers.Unconfined 它不会改变任何的线程。 当它启动时,它在启动它的线程上运行;如果它被恢复,它将在恢复它的线程上运行。
调度器实现原理
Main调度器
Main的调度方法就是将所有的任务通过handler.post()抛到了主线程执行。涉及到HandlerContext类
DEFAULT和IO调度器
基于线程池
Unconfined
Unconfined实现十分简单,isDispatchNeeded()返回值改成false。