網站首頁 編程語言 正文
前置知識
Kotlin協程不是什么空中閣樓,Kotlin源代碼會被編譯成class字節碼文件,最終會運行到虛擬機中。所以從本質上講,Kotlin和Java是類似的,都是可以編譯產生class的語言,但最終還是會受到虛擬機的限制,它們的代碼最終會在虛擬機上的某個線程上被執行。
之前我們分析了launch的原理,但當時我們沒有去分析協程創建出來后是如何與線程產生關聯的,怎么被分發到具體的線程上執行的,本篇文章就帶大家分析一下。
要想搞懂Dispatchers,我們先來看一下Dispatchers、CoroutineDispatcher、ContinuationInterceptor、CoroutineContext之間的關系
public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler @JvmStatic public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher @JvmStatic public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined @JvmStatic public val IO: CoroutineDispatcher = DefaultIoScheduler } public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { } public interface ContinuationInterceptor : CoroutineContext.Element {} public interface Element : CoroutineContext {}
Dispatchers中存放的是協程調度器(它本身是一個單例),有我們平時常用的IO、Default、Main等。這些協程調度器都是CoroutineDispatcher的子類,這些協程調度器其實都是CoroutineContext。
demo
我們先來看一個關于launch的demo:
fun main() { val coroutineScope = CoroutineScope(Job()) coroutineScope.launch { println("Thread : ${Thread.currentThread().name}") } Thread.sleep(5000L) }
在生成CoroutineScope時,demo中沒有傳入相關的協程調度器,也就是Dispatchers。那這個launch會運行到哪個線程之上?
運行試一下:
Thread : DefaultDispatcher-worker-1
居然運行到了DefaultDispatcher-worker-1
線程上,這看起來明顯是Dispatchers.Default
協程調度器里面的線程。我明明沒傳Dispatchers相關的context,居然會運行到子線程上。說明運行到default線程是launch默認的。
它是怎么與default線程產生關聯的?打開源碼一探究竟:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { //代碼1 val newContext = newCoroutineContext(context) //代碼2 val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true) //代碼3 coroutine.start(start, coroutine, block) return coroutine }
- 將傳入的CoroutineContext構造出新的context
- 啟動模式,判斷是否為懶加載,如果是懶加載則構建懶加載協程對象,否則就是標準的
- 啟動協程
我們重點關注代碼1,這是與CoroutineContext相關的。
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext { //從父協程那里繼承過來的context+這次的context val combined = coroutineContext.foldCopiesForChildCoroutine() + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined //combined可以簡單的把它看成是一個map,它是CoroutineContext類型的 //如果當前context不等于Dispatchers.Default,而且從map里面取ContinuationInterceptor(用于攔截之后分發線程的)值為空,說明沒有傳入協程應該在哪個線程上運行的相關參數 return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
調用launch的時候,我們沒有傳入context,默認參數是EmptyCoroutineContext。這里的combined,它其實是CoroutineContext類型的,可以簡單的看成是map(其實不是,只是類似)。
通過combined[ContinuationInterceptor]可以將傳入的線程調度相關的參數給取出來,這里如果取出來為空,是給該context添加了一個Dispatchers.Default,然后把新的context返回出去了。所以launch默認情況下,會走到default線程去執行。
補充一點:CoroutineContext能夠通過+
連接是因為它內部有個public operator fun plus
函數。能夠通過combined[ContinuationInterceptor]這種方式訪問元素是因為有個public operator fun get
函數。
public interface CoroutineContext { /** * Returns the element with the given [key] from this context or `null`. */ public operator fun <E : Element> get(key: Key<E>): E? /** * Returns a context containing elements from this context and elements from other [context]. * The elements from this context with the same key as in the other one are dropped. */ public operator fun plus(context: CoroutineContext): CoroutineContext { ...... } }
startCoroutineCancellable
上面我們分析了launch默認情況下,context中會增加Dispatchers.Default的這個協程調度器,到時launch的Lambda會在default線程上執行,其中具體流程是怎么樣的,我們分析一下。
在之前的文章 Kotlin協程之launch原理 中我們分析過,launch默認情況下會最終執行到startCoroutineCancellable
函數。
public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { //構建ContinuationImpl createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) } public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) //走這里 create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
在Kotlin協程之launch原理 文章中,咱們分析過create(probeCompletion)這里創建出來的是launch的那個Lambda,編譯器會產生一個匿名內部類,它繼承自SuspendLambda,而SuspendLambda是繼承自ContinuationImpl。
所以 createCoroutineUnintercepted(completion)一開始構建出來的是一個ContinuationImpl,接下來需要去看它的intercepted()函數。
intercepted()函數
internal abstract class ContinuationImpl( completion: Continuation<Any?>?, private val _context: CoroutineContext? ) : BaseContinuationImpl(completion) { constructor(completion: Continuation<Any?>?) : this(completion, completion?.context) public override val context: CoroutineContext get() = _context!! @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } }
第一次走到intercepted()函數時,intercepted肯定是為null的,還沒初始化。此時會通過context[ContinuationInterceptor]取出Dispatcher對象,然后調用該Dispatcher對象的interceptContinuation()函數。這個Dispatcher對象在demo這里其實就是Dispatchers.Default。
public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler }
可以看到,Dispatchers.Default是一個CoroutineDispatcher對象,interceptContinuation()函數就在CoroutineDispatcher中。
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) } public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) { createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit)) }
這個方法非常簡單,就是新建并且返回了一個DispatchedContinuation對象,將this和continuation給傳入進去。這里的this是Dispatchers.Default。
所以,最終我們發現走完startCoroutineCancellable的前2步之后,也就是走完intercepted()之后,創建的是DispatchedContinuation對象,最后是調用的DispatchedContinuation的resumeCancellableWith函數。最后這步比較關鍵,這是真正將協程的具體執行邏輯放到線程上執行的部分。
internal class DispatchedContinuation<in T>( //這里傳入的dispatcher在demo中是Dispatchers.Default @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { inline fun resumeCancellableWith( result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)? ) { val state = result.toState(onCancellation) //代碼1 if (dispatcher.isDispatchNeeded(context)) { _state = state resumeMode = MODE_CANCELLABLE //代碼2 dispatcher.dispatch(context, this) } else { //代碼3 executeUnconfined(state, MODE_CANCELLABLE) { if (!resumeCancelled(state)) { resumeUndispatchedWith(result) } } } } } internal abstract class DispatchedTask<in T>( @JvmField public var resumeMode: Int ) : SchedulerTask() { ...... } internal actual typealias SchedulerTask = Task internal abstract class Task( @JvmField var submissionTime: Long, @JvmField var taskContext: TaskContext ) : Runnable { ...... } public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { public abstract fun dispatch(context: CoroutineContext, block: Runnable) public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true }
從DispatchedContinuation的繼承結構來看,它既是一個Continuation(通過委托給傳入的continuation參數),也是一個Runnable。
- 首先看代碼1:這個dispatcher在demo中其實是Dispatchers.Default ,然后調用它的isDispatchNeeded(),這個函數定義在CoroutineDispatcher中,默認就是返回true,只有Dispatchers.Unconfined返回false
- 代碼2:調用Dispatchers.Default的dispatch函數,將context和自己(DispatchedContinuation,也就是Runnable)傳過去了
- 代碼3:對應Dispatchers.Unconfined的情況,它的isDispatchNeeded()返回false
現在我們要分析代碼2之后的執行邏輯,也就是將context和Runnable傳入到dispatch函數之后是怎么執行的。按道理,看到Runnable,那可能這個與線程執行相關,應該離我們想要的答案不遠了。回到Dispatchers,我們發現Dispatchers.Default是DefaultScheduler類型的,那我們就去DefaultScheduler中或者其父類中去找dispatch函數。
DefaultScheduler中找dispatch函數
public actual object Dispatchers { @JvmStatic public actual val Default: CoroutineDispatcher = DefaultScheduler } internal object DefaultScheduler : SchedulerCoroutineDispatcher( CORE_POOL_SIZE, MAX_POOL_SIZE, IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME ) { ...... } internal open class SchedulerCoroutineDispatcher( private val corePoolSize: Int = CORE_POOL_SIZE, private val maxPoolSize: Int = MAX_POOL_SIZE, private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, private val schedulerName: String = "CoroutineScheduler", ) : ExecutorCoroutineDispatcher() { private var coroutineScheduler = createScheduler() private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName) override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block) }
最后發現dispatch函數在其父類SchedulerCoroutineDispatcher中,在這里構建了一個CoroutineScheduler,直接調用了CoroutineScheduler對象的dispatch,然后將Runnable(也就是上面的DispatchedContinuation對象)傳入。
Runnable傳入
internal class CoroutineScheduler( @JvmField val corePoolSize: Int, @JvmField val maxPoolSize: Int, @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS, @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME ) : Executor, Closeable { override fun execute(command: Runnable) = dispatch(command) fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) { trackTask() // this is needed for virtual time support //代碼1:構建Task,Task實現了Runnable接口 val task = createTask(block, taskContext) //代碼2:取當前線程轉為Worker對象,Worker是一個繼承自Thread的類 val currentWorker = currentWorker() //代碼3:嘗試將Task提交到本地隊列并根據結果執行相應的操作 val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch) if (notAdded != null) { //代碼4:notAdded不為null,則再將notAdded(Task)添加到全局隊列中 if (!addToGlobalQueue(notAdded)) { throw RejectedExecutionException("$schedulerName was terminated") } } val skipUnpark = tailDispatch && currentWorker != null // Checking 'task' instead of 'notAdded' is completely okay if (task.mode == TASK_NON_BLOCKING) { if (skipUnpark) return //代碼5: 創建Worker并開始執行該線程 signalCpuWork() } else { // Increment blocking tasks anyway signalBlockingWork(skipUnpark = skipUnpark) } } private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this } internal inner class Worker private constructor() : Thread() { ..... } }
觀察發現,原來CoroutineScheduler類實現了java.util.concurrent.Executor接口,同時實現了它的execute方法,這個方法也會調用dispatch()。
- 代碼1:首先是通過Runnable構建了一個Task,這個Task其實也是實現了Runnable接口,只是把傳入的Runnable包裝了一下
- 代碼2:將當前線程取出來轉換成Worker,當然第一次時,這個轉換不會成功,這個Worker是繼承自Thread的一個類
- 代碼3:將task提交到本地隊列中,這個本地隊列待會兒會在Worker這個線程執行時取出Task,并執行Task
- 代碼4:如果task提交到本地隊列的過程中沒有成功,那么會添加到全局隊列中,待會兒也會被Worker取出來Task并執行
- 代碼5:創建Worker線程,并開始執行
開始執行Worker線程之后,我們需要看一下這個線程的run方法執行的是啥,也就是它的具體執行邏輯。
Worker線程執行邏輯
internal inner class Worker private constructor() : Thread() { override fun run() = runWorker() private fun runWorker() { var rescanned = false while (!isTerminated && state != WorkerState.TERMINATED) { //代碼1 val task = findTask(mayHaveLocalTasks) if (task != null) { rescanned = false minDelayUntilStealableTaskNs = 0L //代碼2 executeTask(task) continue } else { mayHaveLocalTasks = false } if (minDelayUntilStealableTaskNs != 0L) { if (!rescanned) { rescanned = true } else { rescanned = false tryReleaseCpu(WorkerState.PARKING) interrupted() LockSupport.parkNanos(minDelayUntilStealableTaskNs) minDelayUntilStealableTaskNs = 0L } continue } tryPark() } tryReleaseCpu(WorkerState.TERMINATED) } fun findTask(scanLocalQueue: Boolean): Task? { if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue) // If we can't acquire a CPU permit -- attempt to find blocking task val task = if (scanLocalQueue) { localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull() } else { globalBlockingQueue.removeFirstOrNull() } return task ?: trySteal(blockingOnly = true) } private fun executeTask(task: Task) { val taskMode = task.mode idleReset(taskMode) beforeTask(taskMode) runSafely(task) afterTask(taskMode) } fun runSafely(task: Task) { try { task.run() } catch (e: Throwable) { val thread = Thread.currentThread() thread.uncaughtExceptionHandler.uncaughtException(thread, e) } finally { unTrackTask() } } }
run方法直接調用的runWorker(),在里面是一個while循環,不斷從隊列中取Task來執行。
- 代碼1:從本地隊列或者全局隊列中取出Task
- 代碼2:執行這個task,最終其實就是調用這個Runnable的run方法。
也就是說,在Worker這個線程中,執行了這個Runnable的run方法。還記得這個Runnable是誰么?它就是上面我們看過的DispatchedContinuation,這里的run方法執行的就是協程任務,那這塊具體的run方法的實現邏輯,我們應該到DispatchedContinuation中去找。
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation { ...... } internal abstract class DispatchedTask<in T>( @JvmField public var resumeMode: Int ) : SchedulerTask() { public final override fun run() { assert { resumeMode != MODE_UNINITIALIZED } // should have been set before dispatching val taskContext = this.taskContext var fatalException: Throwable? = null try { val delegate = delegate as DispatchedContinuation<T> val continuation = delegate.continuation withContinuationContext(continuation, delegate.countOrElement) { val context = continuation.context val state = takeState() // NOTE: Must take state in any case, even if cancelled val exception = getExceptionalResult(state) /* * Check whether continuation was originally resumed with an exception. * If so, it dominates cancellation, otherwise the original exception * will be silently lost. */ val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null //非空,且未處于active狀態 if (job != null && !job.isActive) { //開始之前,協程已經被取消,將具體的Exception傳出去 val cause = job.getCancellationException() cancelCompletedResult(state, cause) continuation.resumeWithStackTrace(cause) } else { //有異常,傳遞異常 if (exception != null) { continuation.resumeWithException(exception) } else { //代碼1 continuation.resume(getSuccessfulResult(state)) } } } } catch (e: Throwable) { // This instead of runCatching to have nicer stacktrace and debug experience fatalException = e } finally { val result = runCatching { taskContext.afterTask() } handleFatalException(fatalException, result.exceptionOrNull()) } } }
我們主要看一下代碼1處,調用了resume開啟協程。前面沒有異常,才開始啟動協程,這里才是真正的開始啟動協程,開始執行launch傳入的Lambda表達式。這個時候,協程的邏輯是在Worker這個線程上執行的了,切到某個線程上執行的邏輯已經完成了。
ps: rusume會走到BaseContinuationImpl的rusumeWith,然后走到launch傳入的Lambda匿名內部類的invokeSuspend方法,開始執行狀態機邏輯。前面的文章 Kotlin協程createCoroutine和startCoroutine原理 我們分析過這里,這里就只是簡單提一下。
到這里,Dispatchers的執行流程就算完了,前后都串起來了。
小結
Dispatchers是協程框架中與線程交互的關鍵。底層會有不同的線程池,Dispatchers.Default、IO,協程任務來了的時候會封裝成一個個的Runnable,丟到線程中執行,這些Runnable的run方法中執行的其實就是continuation.resume,也就是launch的Lambda生成的SuspendLambda匿名內部類,也就是開啟協程狀態機,開始協程的真正執行。
原文鏈接:https://juejin.cn/post/7127492385923137549
相關推薦
- 2022-06-06 詳細講解Docker虛擬化_docker
- 2022-10-20 詳解Go?語言如何通過測試保證質量_Golang
- 2024-02-17 通過AOP切面實現公共字段的自動填充
- 2022-01-12 修改node_modules的包
- 2022-06-07 使用Docker創建FTP服務器的過程解析_docker
- 2023-03-17 redis中hash數據結構及說明_Redis
- 2022-12-28 React組件實例三大核心屬性State?props?Refs詳解_React
- 2022-07-21 Pandas文件讀寫操作
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支