日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Kotlin協程之Flow基礎原理示例解析_Android

作者:李蕭蝶 ? 更新時間: 2022-10-27 編程語言

引言

本文分析示例代碼如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.IO) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的創建

在協程中,可以通過flow方法創建一個Flow對象,一個Flow對象代表一個冷流。其中參數block是FlowCollector的擴展方法,并且可掛起。代碼入下:

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

FlowCollector是一個接口,用于收集上游的流發出的值,代碼如下:

public interface FlowCollector<in T> {
    // 可掛起,非線程安全
    public suspend fun emit(value: T)
}

調用flow方法,會返回一個Flow接口指向的對象,代碼如下:

public interface Flow<out T> {
   
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector<T>)
}

這里flow方法的返回對象是一個SafeFlow類型的對象。至此Flow就創建完畢了。

二.Flow的消費

在協程中,當需要消費流時,會調用collect方法,觸發流的消費,代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

這里的collect方法不是Flow接口定義的方法,而是Flow的擴展方法,內部創建了一個匿名的FlowCollector對象,并且把action封裝到了FlowCollector對象的emit方法中,最后將FlowCollector對象作為參數傳入到了另一個collect方法,這個collect方法才是Flow接口定義的方法。

1.SafeFlow類

根據上面的分析,Flow對象最后返回的是一個SafeFlow類型的對象。因此,這里調用的另一個collect方法,就是SafeFlow類中的collect方法,代碼如下:

private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        collector.block()
    }
}

SafeFlow類繼承自AbstractFlow類,類中重寫了collectSafely方法。因此調用的collect方法實際上是AbstractFlow類的方法。

2.AbstractFlow類

AbstractFlow類是一個抽象類,實現了Flow接口和CancellableFlow接口。實際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫了collect方法,代碼如下:

@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector<T>) {
        // 創建SafeCollector對象,對collector進行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 調用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 釋放攔截的續體
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}

collect方法內部調用了collectSafely方法,collectSafely方法在SafeFlow中被重寫。collectSafely方法中會調用flow中的block,并提供一個SafeCollector類的環境。

3. SafeCollector類

當flow方法中的代碼在執行時,會調用emit方法發射數據,這時由于block執行在SafeCollector類的環境中,因此調用的emit方法是SafeCollector類的方法。

SafeCollector類實現了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:

internal actual class SafeCollector<T> actual constructor(
    @JvmField internal actual val collector: FlowCollector<T>,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    
    ...
    // 保存上下文中元素數量,用于檢查上下文是否變化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 執行結束后的續體
    private var completion: Continuation<Unit>? = null

    // 協程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 掛起的核心方法
    override fun invokeSuspend(result: Result<Any?>): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result<Unit>)
        return COROUTINE_SUSPENDED
    }

    // 釋放攔截的續體
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 發射數據
    override suspend fun emit(value: T) {
        // 獲取當前suspend方法續體
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 調用重載的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出現異常時,將異常封裝成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 拋出異常
                throw e
            }
        }
    }

    // 重載的emit方法
    private fun emit(uCont: Continuation<Unit>, value: T): Any? {
        // 從續體中獲取上下文
        val currentContext = uCont.context
        // 保證當前協程的Job是active的
        currentContext.ensureActive()
        // 獲取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文發生變化
        if (previousContext !== currentContext) {
            // 檢查上下文是否發生異常
            checkContext(currentContext, previousContext, value)
        }
        // 保存續體
        completion = uCont
        // 調用emitFun方法,傳入collector,value,continuation
        return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
    }

    // 檢查上下文變化,防止并發
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次執行過程中發生了異常
        if (previousContext is DownstreamExceptionElement) {
            // 拋出異常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 檢查上下文是否發生變化,如果變化,則拋出異常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    
    // 用于拋出異常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最終會調用emitFun方法方法,代碼如下:

private val emitFun =
    FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>

emitFun是一個lambda表達式,它將只有一個參數的emit方法轉換成三個參數的方法。emitFun方法在編譯時會被編譯器處理,反編譯后的代碼邏輯大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心執行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法內部會調用FlowCollector類對象的emit方法,同時傳入value和continuation作為參數。

而這個FlowCollector類對象就是一開始的collect方法封裝的匿名類對象,代碼如下:

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> {
        override suspend fun emit(value: T) = action(value)
    })

調用它的emit方法,會直接調用action的invoke方法,并傳入發射的數據,流在這里被最終消費。

通過上面的分析,可以知道消費的過程是在emit方法中被調用的,如果在消費的過程,沒有發生掛起,那么emit方法執行完畢后,會繼續執行flow方法里剩下的代碼,而如果在消費的過程中發生了掛起,情況會稍有不同。

4.消費過程中的掛起

如果消費過程中發生掛起,那么emit方法會返回一個COROUTINE_SUSPENDED對象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對象后,會掛起當前協程。代碼如下:

override suspend fun emit(value: T) {
    // 獲取當前suspend方法續體
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 調用重載的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出現異常時,將異常封裝成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 拋出異常
            throw e
        }
    }
}

當消費過程執行完畢時,會通過傳入的續體喚起外部協程恢復掛起狀態。根據emitFun可以知道,這里傳入的續體為this,也就是當前的SafeCollector類對象,代碼如下:

emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)

恢復掛起需要調用續體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒有重寫resumeWith方法,而ContinuationImpl類中也沒有重寫resumeWith方法,因此實際調用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:

在Kotlin協程:創建、啟動、掛起、恢復中提到過,調用BaseContinuationImpl類的resumeWith方法,內部會調用invokeSuspend方法,而SafeCollector類重寫了invokeSuspend方法,代碼如下:

override fun invokeSuspend(result: Result<Any?>): Any? {
    // 嘗試獲取異常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果沒有異常,則恢復flow方法續體的執行
    completion?.resumeWith(result as Result<Unit>)
    // 返回掛起標識,這里掛起的是消費過程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,會調用resumeWith方法恢復生產過程——flow方法的執行,同時掛起消費過程的執行。全部過程如下圖所示:

原文鏈接:https://juejin.cn/post/7137647612286468132

欄目分類
最近更新