網站首頁 編程語言 正文
一、協程是如何創建的
launch、async 可以創建、啟動新的協程,那么協程到底是如何創建的?
runBlocking {
println(Thread.currentThread().name)
launch {
println(Thread.currentThread().name)
delay(100L)
}
Thread.sleep(1000L)
}
Log
main @coroutine#1
main @coroutine#2
Process finished with exit code 0
runBlocking{} 啟動了第一個協程,launch{} 啟動了第二個協程。
協程啟動的基礎API
public fun <T> (suspend () -> T).createCoroutine(
completion: Continuation<T>
): Continuation<Unit> =
SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
createCoroutine{}、startCoroutine{}就是 Kotlin 協程當中最基礎的兩個創建協程的 API。啟動協程有三種常見的方式:launch、runBlocking、async。它們其實屬于協程中間層提供的 API,而它們的底層都調用了“基礎層”的協程 API。
createCoroutine{}、startCoroutine{}是擴展函數,其擴展接收者類型是一個函數類型:suspend () -> T,代表了“無參數,返回值為 T 的掛起函數或者 Lambda”。而對于函數本身,它們兩個都接收一個 Continuation<T> 類型的參數,其中一個函數,還會返回一個 Continuation<Unit> 類型的返回值。
val block = suspend {
println("Hello")
delay(1000L)
println("World!")
"Result"
}
fun testLaunch2() {
val continuation = object : Continuation<String> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<String>) {
println("Result:" + result.getOrNull())
}
}
block.startCoroutine(continuation)
}
fun main() {
testLaunch2()
Thread.sleep(2000L)
}
Log
Hello
World!
Result:Result
Process finished with exit code 0
類型為suspend () -> T的函數或者Lambda 表達式可以用 block.startCoroutine() 來啟動協程了。
Continuation 有兩種用法,一種是在實現掛起函數的時候,用于傳遞掛起函數的執行結果;另一種是在調用掛起函數的時候,以匿名內部類的方式,用于接收掛起函數的執行結果。
使用 createCoroutine() 這個方法其實上面代碼的邏輯:
fun testLaunch3() {
val continuation = object : Continuation<String> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<String>) {
println("Result:" + result.getOrNull())
}
}
val coroutinue = block.createCoroutine(continuation)
coroutinue.resume(Unit)
}
val block = suspend {
println("Hello")
delay(1000L)
println("World!")
"Result"
}
fun main() {
testLaunch3()
Thread.sleep(2000L)
}
Log
Hello
World!
Result:Result
Process finished with exit code 0
createCoroutine() 創建一個協程,先不啟動。調用 resume() 才能啟動。createCoroutine()、startCoroutine() 的源代碼差別也并不大,只是前者沒有調用 resume(),而后者調用了 resume()。startCoroutine() 之所以可以創建并同時啟動協程的原因就在于,它在源碼中直接調用了 resume(Unit)。
將 startCoroutine()轉換為Java:
package com.example.myapplication.testcoroutinue;
import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.ContinuationKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.EmptyCoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.DelayKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Metadata(
mv = {1, 6, 0},
k = 2,
d1 = {"\u0000\u001e\n\u0000\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0010\u000e\n\u0002\u0010\u0000\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\b\u001a\u00020\t\u001a\u0006\u0010\n\u001a\u00020\t\",\u0010\u0000\u001a\u0018\b\u0001\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00030\u0002\u0012\u0006\u0012\u0004\u0018\u00010\u00040\u0001?\u0001\u0000¢\u0006\n\n\u0002\u0010\u0007\u001a\u0004\b\u0005\u0010\u0006\u0082\u0002\u0004\n\u0002\b\u0019¨\u0006\u000b"},
d2 = {"block", "Lkotlin/Function1;", "Lkotlin/coroutines/Continuation;", "", "", "getBlock", "()Lkotlin/jvm/functions/Function1;", "Lkotlin/jvm/functions/Function1;", "main", "", "testLaunch2", "My_Application.app.main"}
)
public final class TestCoroutinue888Kt {
// Kotlin 為 block 變量生成的靜態變量
@NotNull
private static final Function1 block;
public static final void main() {
testLaunch2();
Thread.sleep(2000L);
}
// $FF: synthetic method
public static void main(String[] var0) {
main();
}
// Kotlin 為 block 變量生成的靜態變量以及方法
@NotNull
public static final Function1 getBlock() {
return block;
}
public static final void testLaunch2() {
//continuation 變量對應的匿名內部類
<undefinedtype> continuation = new Continuation() {
@NotNull
public CoroutineContext getContext() {
return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
}
public void resumeWith(@NotNull Object result) {
String var2 = "Result:" + (String)(Result.isFailure-impl(result) ? null : result);
System.out.println(var2);
}
};
//block.startCoroutine(continuation) 轉換成了ContinuationKt.startCoroutine(block, (Continuation)continuation)
ContinuationKt.startCoroutine(block, (Continuation)continuation);
}
static {
//實現了 Continuation 接口
Function1 var0 = (Function1)(new Function1((Continuation)null) {
int label;
//invokeSuspend()為協程狀態機邏輯
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "Hello";
System.out.println(var2);
this.label = 1;
if (DelayKt.delay(1000L, this) == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "World!";
System.out.println(var2);
return "Result";
}
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
public final Object invoke(Object var1) {
return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
}
});
block = var0;
}
}
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
在 startCoroutine() 當中,首先會調用 createCoroutineUnintercepted() 方法。
public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit>
代碼中的 expect,一種聲明,由于 Kotlin 是面向多個平臺的,具體的實現,就需要在特定的平臺實現。
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
actual,代表了 createCoroutineUnintercepted() 在 JVM 平臺的實現。
createCoroutineUnintercepted() 是一個擴展函數,this代表了 block 變量。(this is BaseContinuationImpl) 條件為ture,就會調用 create(probeCompletion)。
public open fun create(completion: Continuation<*>): Continuation<Unit> {
throw UnsupportedOperationException("create(Continuation) has not been overridden")
}
在默認情況下,這個 create() 方法是會拋出異常的。
@NotNull
public final Continuation create(@NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function1 var2 = new <anonymous constructor>(completion);
return var2;
}
返回了Continuation 對象。
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
intercepted() 在JVM 實現如下:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
將 Continuation 強轉成了 ContinuationImpl,調用了它的 intercepted()。
ContinuationImpl 的源代碼:
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
}
通過 ContinuationInterceptor,對 Continuation 進行攔截,從而將程序的執行邏輯派發到特定的線程之上。
resume(Unit):
public fun <T> (suspend () -> T).startCoroutine(
completion: Continuation<T>
) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
resume(Unit),作用其實就相當于啟動了協程。
二、launch 是如何啟動協程的
fun main() {
testLaunch11()
Thread.sleep(2000L)
}
fun testLaunch11() {
val coroutineScope = CoroutineScope(Job())
coroutineScope.launch {
println("Hello")
delay(1000L)
println("World!")
}
}
Log
Hello
World!
Process finished with exit code 0
轉Java
package com.example.myapplication.testcoroutinue;
import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.DelayKt;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.JobKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@Metadata(
mv = {1, 6, 0},
k = 2,
d1 = {"\u0000\n\n\u0000\n\u0002\u0010\u0002\n\u0002\b\u0002\u001a\u0006\u0010\u0000\u001a\u00020\u0001\u001a\u0006\u0010\u0002\u001a\u00020\u0001¨\u0006\u0003"},
d2 = {"main", "", "testLaunch11", "My_Application.app.main"}
)
public final class TestCoroutinue999Kt {
public static final void main() {
testLaunch11();
Thread.sleep(2000L);
}
// $FF: synthetic method
public static void main(String[] var0) {
main();
}
public static final void testLaunch11() {
CoroutineScope coroutineScope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));
//對應 launch 當中的 Lambda。
BuildersKt.launch$default(coroutineScope, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
String var2;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
var2 = "Hello";
System.out.println(var2);
this.label = 1;
if (DelayKt.delay(1000L, this) == var3) {
return var3;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var2 = "World!";
System.out.println(var2);
return Unit.INSTANCE;
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
}), 3, (Object)null);
}
}
launch源碼
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//launch 會根據傳入的 CoroutineContext 創建出新的 Context。
val newContext = newCoroutineContext(context)
//launch 會根據傳入的啟動模式來創建對應的協程對象。這里有兩種,一種是標準的,一種是懶加載的。
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//啟動協程。
coroutine.start(start, coroutine, block)
return coroutine
}
coroutine.start() :
public abstract class AbstractCoroutine<in T>(
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
start(block, receiver, this)
}
}
AbstractCoroutine.kt 對應協程的抽象邏輯。AbstractCoroutine 的start() 方法,用于啟動協程。
public enum class CoroutineStart {
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this) {
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
}
}
start(block, receiver, this),進入 CoroutineStart.invoke()。
invoke() 方法當中,根據 launch 傳入的啟動模式,以不同的方式啟動協程。當啟動模式是 ATOMIC 的時候,就會調用 block.startCoroutine(completion)。startCoroutineUndispatched(completion) 和 startCoroutineCancellable(completion),只是在 startCoroutine() 的基礎上增加了一些額外的功能而已。前者代表啟動協程以后就不會被分發,后者代表啟動以后可以響應取消。
startCoroutineCancellable(completion)
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
startCoroutineCancellable() 的源代碼,會調用 createCoroutineUnintercepted(),然后調用 create(probeCompletion),然后最終會調用create() 方法。launch 這個 API,只是對協程的基礎元素 startCoroutine() 等方法進行了一些封裝而已。
原文鏈接:https://blog.csdn.net/zhangying1994/article/details/127880236
相關推薦
- 2023-12-12 InetSocketAddress的使用
- 2022-12-06 關于matplotlib及相關cmap參數的取值方式_python
- 2022-04-25 利用Python寫個摸魚監控進程_python
- 2022-09-19 Redis緩存穿透/擊穿工具類的封裝_Redis
- 2022-08-22 Android中關于Binder常見面試問題小結_Android
- 2022-11-20 CPython?垃圾收集器檢測循環引用詳解_python
- 2022-06-29 C#集合之自定義集合類_C#教程
- 2022-11-08 詳解Python中數據處理的方法總結及實現_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支