網站首頁 編程語言 正文
一.線程的橋接
1.runBlocking方法
runBlocking方法用于在線程中去執(zhí)行suspend方法,代碼如下:
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
// 通知編譯器,block只執(zhí)行一次
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// 獲取當前線程
val currentThread = Thread.currentThread()
// 獲取上下文中的攔截器
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
// 如果攔截器為空,代表無法進行調度
if (contextInterceptor == null) {
// 從線程中獲取EventLoop,獲取失敗則創(chuàng)建一個新的
eventLoop = ThreadLocalEventLoop.eventLoop
// 添加到上下文中
// newContext = EmptyCoroutineContext + context + eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {// 如果有攔截器
// 嘗試將當前攔截器轉換成EventLoop,
// 如果轉換成功,則判斷是否允許可以在上下文中使用
// 如果轉換失敗或不允許,則創(chuàng)建一個新的
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
// 計算新的上下文
// 這里沒有把EventLoop加到上下文,因為加入后會覆蓋攔截器
newContext = GlobalScope.newCoroutineContext(context)
}
// 創(chuàng)建一個協程
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
// 啟動協程
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
// 分發(fā)任務
return coroutine.joinBlocking()
}
2.BlockingCoroutine類
在runBlocking方法中,最終創(chuàng)建了一個類型為BlockingCoroutine的對象。BlockingCoroutine類繼承自AbstractCoroutine類,代碼如下:
// 繼承了AbstractCoroutine
private class BlockingCoroutine<T>(
parentContext: CoroutineContext,
private val blockedThread: Thread,
private val eventLoop: EventLoop?
) : AbstractCoroutine<T>(parentContext, true) {
// 該協程是一個作用域協程
override val isScopedCoroutine: Boolean get() = true
override fun afterCompletion(state: Any?) {
// 如果當前線程不是阻塞線程
if (Thread.currentThread() != blockedThread)
// 喚醒阻塞線程
LockSupport.unpark(blockedThread)
}
@Suppress("UNCHECKED_CAST")
fun joinBlocking(): T {
registerTimeLoopThread()
try {
// 注冊使用EventLoop
eventLoop?.incrementUseCount()
try {
// 死循環(huán)
while (true) {
@Suppress("DEPRECATION")
// 如果線程當前中斷,則拋出異常,同時取消當前協程
if (Thread.interrupted()) throw InterruptedException().also { cancelCoroutine(it) }
// 分發(fā)執(zhí)行任務,同時獲取等待時間
val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
// 如果任務執(zhí)行結束,則退出循環(huán)
if (isCompleted) break
// 休眠指定的等待時間
parkNanos(this, parkNanos)
}
} finally { // paranoia
// 注冊不使用EventLoop
eventLoop?.decrementUseCount()
}
} finally { // paranoia
unregisterTimeLoopThread()
}
// 獲取執(zhí)行的結果
val state = this.state.unboxState()
// 如果執(zhí)行過程中取消,則拋出異常
(state as? CompletedExceptionally)?.let { throw it.cause }
// 返回結果
return state as T
}
}
BlockingCoroutine類重寫了變量isScopedCoroutine為true。
isScopedCoroutine表示當前協程是否為作用域協程,該變量用在cancelParent方法中。對于一個作用域協程,當它的子協程在運行過程中拋出異常時,子協程調用cancelParent方法不會導致作用域協程取消,而是直接返回true。當子協程執(zhí)行完畢,作用域協程獲取結果時,如果發(fā)現子協程返回的結果為異常,則會再次拋出。
相比于一般協程,作用域協程不相信子協程在執(zhí)行過程中取消通知,而是在執(zhí)行完畢后親自檢查結果是否為異常,達到一種“耳聽為虛,眼見為實”的效果。
joinBlocking方法通過循環(huán)在當前線程上對EventLoop進行任務分發(fā)來實現線程的阻塞。當任務發(fā)生異常或執(zhí)行完畢后,會回調重寫的afterCompletion方法,喚起線程繼續(xù)循環(huán),當在循環(huán)中檢測到isCompleted標志位為true時,會跳出循環(huán),恢復線程執(zhí)行。
二.線程的切換
1.withContext方法
withContext方法用于在協程中切換線程去執(zhí)行其他任務,該方法被suspend關鍵字修飾,因此會引起協程的掛起,代碼如下:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
// 通知編譯器,block只執(zhí)行一次
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// 直接掛起,獲取續(xù)體
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 從續(xù)體中獲取上下文
val oldContext = uCont.context
// 計算新的上下文
val newContext = oldContext + context
// 檢查任務是否執(zhí)行完畢或取消
newContext.checkCompletion()
// 如果前后兩次的上下文完全相同,說明不需要切換,只需要執(zhí)行即可
if (newContext === oldContext) {
// 創(chuàng)建續(xù)體的協程
val coroutine = ScopeCoroutine(newContext, uCont)
// 執(zhí)行block
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// 攔截器相同,但是上下文中增加了其他的元素
// 這里也是在同一個線程上執(zhí)行,但是其中增加的元素只在執(zhí)行當前的block中使用
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
// 創(chuàng)建續(xù)體的協程
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 將當前線程ThreadLocal中的對象更新成newContext上下文對應的對象
withCoroutineContext(newContext, null) {
// 執(zhí)行block
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// 走到這里,說明要切換線程執(zhí)行block任務
val coroutine = DispatchedCoroutine(newContext, uCont)
// 啟動父協程
coroutine.initParentJob()
// 啟動協程
block.startCoroutineCancellable(coroutine, coroutine)
// 獲取結果
coroutine.getResult()
}
}
通過對上面代碼的分析,可以發(fā)現withContext根據上下文的不同進行了三種分類,創(chuàng)建不同的協程并通過不同的方式去執(zhí)行block。如下表所示:
協程上下文變化 | 協程類型 | 啟動方式 |
---|---|---|
完全相同 | ScopeCoroutine | startUndispatchedOrReturn |
攔截器相同 | UndispatchedCoroutine | startUndispatchedOrReturn |
攔截器不同 | DispatchedCoroutine | startCoroutineCancellable |
接下來,將對不同情況下協程的啟動與執(zhí)行進行分析。
2.startUndispatchedOrReturn方法
startUndispatchedOrReturn方法用于在相同的上下文環(huán)境中啟動協程,代碼如下:
internal fun <T, R> ScopeCoroutine<T>.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? {
// 初始化并綁定父協程
initParentJob()
// 獲取并處理執(zhí)行結果
return undispatchedResult({ true }) {
// 啟動協程
block.startCoroutineUninterceptedOrReturn(receiver, this)
}
}
private inline fun <T> ScopeCoroutine<T>.undispatchedResult(
shouldThrow: (Throwable) -> Boolean,
startBlock: () -> Any?
): Any? {
// 啟動協程,獲取結果,
val result = try {
startBlock()
} catch (e: Throwable) {
// 產生異常,則按照取消處理
CompletedExceptionally(e)
}
// 如果結果為掛起,則通知外部掛起
if (result === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
// 結束任務執(zhí)行,獲取最終狀態(tài)
val state = makeCompletingOnce(result)
// 如果需要等待子協程的結束,則通知外部掛起
if (state === COMPLETING_WAITING_CHILDREN) return COROUTINE_SUSPENDED
// 如果執(zhí)最終為異常狀態(tài)
return if (state is CompletedExceptionally) {
when {
// 通過參數判斷是否拋出
shouldThrow(state.cause) -> throw recoverStackTrace(state.cause, uCont)
// 執(zhí)行結果為異常
result is CompletedExceptionally -> throw recoverStackTrace(result.cause, uCont)
// 結果不為異常,則返回
else -> result
}
} else {
// 對最終狀態(tài)進行拆箱,返回最終結果
state.unboxState()
}
}
// JobSupport中提供了下面的類和方法,當協程進入完成狀態(tài)時,會對狀態(tài)進行裝箱。
// 包裝類
private class IncompleteStateBox(@JvmField val state: Incomplete)
// 裝箱
internal fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
// 拆箱
internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
在startUndispatchedOrReturn方法中,通過調用block的startCoroutineUninterceptedOrReturn方法啟動協程,獲取最終結果,并對結果進行異常處理。
接下來,將分析startCoroutineUninterceptedOrReturn方法如何啟動協程,代碼如下:
@SinceKotlin("1.3")
@InlineOnly
public actual inline fun <R, T> (suspend R.() -> T).startCoroutineUninterceptedOrReturn(
receiver: R,
completion: Continuation<T>
): Any? = (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, completion)
這里,直接找到最終的actual方法,可以發(fā)現該方法沒有創(chuàng)建狀態(tài)機,而是直接執(zhí)行了block。這個方法被設計用在suspendCoroutineUninterceptedOrReturn方法中,來恢復掛起協程的執(zhí)行。
至此,可以知道startUndispatchedOrReturn方法實際上就是在同一個協程中執(zhí)行了block。
3.ScopeCoroutine類
在withContext方法中,當上下文相同時,會創(chuàng)建一個類型為ScopeCoroutine的對象。ScopeCoroutine類代表一個標準的作用域協程,代碼如下:
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T>
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
final override val callerFrame: CoroutineStackFrame? get() = uCont as CoroutineStackFrame?
final override fun getStackTraceElement(): StackTraceElement? = null
// 作用域協程
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentContext[Job]
// 該方法會在協程異常或取消時調用
override fun afterCompletion(state: Any?) {
// 進行攔截,切換線程,恢復執(zhí)行
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
// 該方法會在將其掛起的方法執(zhí)行完畢后回調
override fun afterResume(state: Any?) {
// 直接恢復續(xù)體的執(zhí)行
uCont.resumeWith(recoverResult(state, uCont))
}
}
ScopeCoroutine類重寫了afterCompletion和afterResume兩個方法,afterCompletion方法用于在協程取消時被回調。afterResume方法用于在掛起恢復時被回調。
根據上面的分析,當發(fā)生異常時,afterCompletion方法可能在其他的協程上下文中被調用,因此會調用攔截器切換回原本的線程中。而afterResume方法由于已經在正確的上下文環(huán)境中,因此可以直接恢復執(zhí)行。
4.UndispatchedCoroutine類
在withContext方法中,當上下文不同,但調度器相同時,會創(chuàng)建一個類型為UndispatchedCoroutine的對象。UndispatchedCoroutine類繼承自ScopeCoroutine類,重寫了afterResume方法,代碼如下:
private class UndispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun afterResume(state: Any?) {
val result = recoverResult(state, uCont)
// 將當前線程ThreadLocal中的對象更新成uCont.context上下文對應的對象
withCoroutineContext(uCont.context, null) {
// 恢復執(zhí)行
uCont.resumeWith(result)
}
}
}
與父類ScopeCoroutine的afterResume方法相比,UndispatchedCoroutine類在afterResume方法中對協程上下文進行了更新,然后再恢復執(zhí)行。
- withCoroutineContext
withCoroutineContext方法用于當一個線程中執(zhí)行多個協程時,保存和恢復ThreadLocal類中的對象。
通過withContext方法的代碼可以知道,當上下文不同但調度器相同時,在執(zhí)行之前會通過withCoroutineContext方法將ThreadLocal中的對象更新成newContext對應的對象。在執(zhí)行結束后,又將ThradLocal中的對象更新成原本續(xù)體的上下文context對應的對象。代碼如下:
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
// 將線程上下文更新新的上下文,并返回老的上下文
val oldValue = updateThreadContext(context, countOrElement)
try {
// 在新的上下文環(huán)境中執(zhí)行
return block()
} finally {
// 執(zhí)行結束恢復老的上下文
restoreThreadContext(context, oldValue)
}
}
協程中有一類上下文元素是ThreadContextElement,ThreadContextElement是一個接口,具體的實現類有CoroutineId類和ThreadLocalElement類。其中,CoroutineId類用來修改線程的名字。ThreadLocalElement類用來保存和恢復ThreadLocal類中的對象,withCoroutineContext方法內部的updateThreadContext方法與restoreThreadContext方法正是通過ThreadLocalElement類實現的。ThreadContextElement接口的代碼如下:
public interface ThreadContextElement<S> : CoroutineContext.Element {
// 用于更新新的上下文,并且返回老的上下文
public fun updateThreadContext(context: CoroutineContext): S
// 重新恢復當前線程的上下文,
// 其中oldStart來自updateThreadContext方法的返回值
public fun restoreThreadContext(context: CoroutineContext, oldState: S)
}
當調用updateThreadContext方法時,會返回一個代表當前狀態(tài)的對象。當調用restoreThreadContext方法時,又需要傳入一個代表狀態(tài)的對象作為參數,來恢復之前的狀態(tài)。因此,這就需要對updateThreadContext方法的返回值進行保存。
當協程上下文中只有一個ThreadContextElement接口指向的對象時,保存在變量中即可。而如果協程上下文中有多個ThreadContextElement接口指向的對象,這時就需要一個專門的類來對這些對象進行管理,這個類就是ThreadState類,他們之間的對應關系如下圖所示:
withCoroutineContext方法執(zhí)行圖:
5.DispatchedCoroutine類
在withContext方法中,當需要切換線程時,會創(chuàng)建一個類型為DispatchedCoroutine的對象。DispatchedCoroutine類繼承自ScopeCoroutine類,代碼如下:
// 狀態(tài)機狀態(tài)
private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2
private class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
// 初始狀態(tài)
private val _decision = atomic(UNDECIDED)
// 嘗試掛起
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
// 嘗試恢復
private fun tryResume(): Boolean {
_decision.loop { decision ->
when (decision) {
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
SUSPENDED -> return false
else -> error("Already resumed")
}
}
}
override fun afterCompletion(state: Any?) {
// 通過afterResume方法實現
afterResume(state)
}
override fun afterResume(state: Any?) {
// 如果沒有掛起,則返回
if (tryResume()) return
// 進行攔截,切換線程,恢復執(zhí)行
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
// 獲取最終結果
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
}
DispatchedCoroutine類中使用了一個狀態(tài)機模型,這個狀態(tài)機與在Kotlin協程:生命周期原理中分析CancellableContinuationImpl類中的狀態(tài)機相同,獲取結果的邏輯也與CancellableContinuationImpl類相同。
這里最重要的是DispatchedCoroutine類重寫了afterCompletion和afterResume方法,并且回調這兩個方法都會進行線程的切換。
6.總結
? | ScopeCoroutine類 | UndispatchedCoroutine類 | DispatchedCoroutine類 |
---|---|---|---|
afterCompletion方法 | 切線程 | 切線程 | 切線程 |
afterResume方法 | 不切線程 | 不切線程。更新ThreadLocal | 切線程 |
原文鏈接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126322545
相關推薦
- 2022-11-18 Python?數據清洗刪除缺失值替換缺失值詳情_python
- 2022-03-31 詳解C語言實現猜數字游戲_C 語言
- 2022-10-23 C#優(yōu)雅的實現INotifyPropertyChanged接口_C#教程
- 2022-07-14 使用react-activation實現keepAlive支持返回傳參_React
- 2022-06-02 Kubernetes部署實例并配置Deployment、網絡映射、副本集_云和虛擬化
- 2022-04-09 一起來了解python的運算符_python
- 2022-09-05 用兩個隊列模擬一個棧
- 2022-10-23 C#中的yield關鍵字詳解_C#教程
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支