網站首頁 編程語言 正文
引言
本文分析示例代碼如下:
launch(Dispatchers.Main) {
flow {
emit(1)
emit(2)
}.collect {
delay(1000)
withContext(Dispatchers.IO) {
Log.d("liduo", "$it")
}
Log.d("liduo", "$it")
}
}
一.Flow的創建
在協程中,可以通過flow方法創建一個Flow對象,一個Flow對象代表一個冷流。其中參數block是FlowCollector的擴展方法,并且可掛起。代碼入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一個接口,用于收集上游的流發出的值,代碼如下:
public interface FlowCollector<in T> {
// 可掛起,非線程安全
public suspend fun emit(value: T)
}
調用flow方法,會返回一個Flow接口指向的對象,代碼如下:
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
這里flow方法的返回對象是一個SafeFlow類型的對象。至此Flow就創建完畢了。
二.Flow的消費
在協程中,當需要消費流時,會調用collect方法,觸發流的消費,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
這里的collect方法不是Flow接口定義的方法,而是Flow的擴展方法,內部創建了一個匿名的FlowCollector對象,并且把action封裝到了FlowCollector對象的emit方法中,最后將FlowCollector對象作為參數傳入到了另一個collect方法,這個collect方法才是Flow接口定義的方法。
1.SafeFlow類
根據上面的分析,Flow對象最后返回的是一個SafeFlow類型的對象。因此,這里調用的另一個collect方法,就是SafeFlow類中的collect方法,代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow類繼承自AbstractFlow類,類中重寫了collectSafely方法。因此調用的collect方法實際上是AbstractFlow類的方法。
2.AbstractFlow類
AbstractFlow類是一個抽象類,實現了Flow接口和CancellableFlow接口。實際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫了collect方法,代碼如下:
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
// 核心方法
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 創建SafeCollector對象,對collector進行包裹
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 調用collectSafely方法
collectSafely(safeCollector)
} finally {
// 釋放攔截的續體
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
collect方法內部調用了collectSafely方法,collectSafely方法在SafeFlow中被重寫。collectSafely方法中會調用flow中的block,并提供一個SafeCollector類的環境。
3. SafeCollector類
當flow方法中的代碼在執行時,會調用emit方法發射數據,這時由于block執行在SafeCollector類的環境中,因此調用的emit方法是SafeCollector類的方法。
SafeCollector類實現了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
...
// 保存上下文中元素數量,用于檢查上下文是否變化
@JvmField
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
// 保存上一次的上下文
private var lastEmissionContext: CoroutineContext? = null
// 執行結束后的續體
private var completion: Continuation<Unit>? = null
// 協程上下文
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
// 掛起的核心方法
override fun invokeSuspend(result: Result<Any?>): Any? {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
// 釋放攔截的續體
public actual override fun releaseIntercepted() {
super.releaseIntercepted()
}
// 發射數據
override suspend fun emit(value: T) {
// 獲取當前suspend方法續體
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 調用重載的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出現異常時,將異常封裝成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 拋出異常
throw e
}
}
}
// 重載的emit方法
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
// 從續體中獲取上下文
val currentContext = uCont.context
// 保證當前協程的Job是active的
currentContext.ensureActive()
// 獲取上次的上下文
val previousContext = lastEmissionContext
// 如果前后上下文發生變化
if (previousContext !== currentContext) {
// 檢查上下文是否發生異常
checkContext(currentContext, previousContext, value)
}
// 保存續體
completion = uCont
// 調用emitFun方法,傳入collector,value,continuation
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
// 檢查上下文變化,防止并發
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
// 如果上次執行過程中發生了異常
if (previousContext is DownstreamExceptionElement) {
// 拋出異常
exceptionTransparencyViolated(previousContext, value)
}
// 檢查上下文是否發生變化,如果變化,則拋出異常
checkContext(currentContext)
lastEmissionContext = currentContext
}
// 用于拋出異常
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
}
emit方法最終會調用emitFun方法方法,代碼如下:
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一個lambda表達式,它將只有一個參數的emit方法轉換成三個參數的方法。emitFun方法在編譯時會被編譯器處理,反編譯后的代碼邏輯大致如下:
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
InlineMarker.mark(0);
// 核心執行
Object var10000 = p1.emit(p2, continuation);
InlineMarker.mark(2);
InlineMarker.mark(1);
return var10000;
}
可以看到,emitFun方法內部會調用FlowCollector類對象的emit方法,同時傳入value和continuation作為參數。
而這個FlowCollector類對象就是一開始的collect方法封裝的匿名類對象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
調用它的emit方法,會直接調用action的invoke方法,并傳入發射的數據,流在這里被最終消費。
通過上面的分析,可以知道消費的過程是在emit方法中被調用的,如果在消費的過程,沒有發生掛起,那么emit方法執行完畢后,會繼續執行flow方法里剩下的代碼,而如果在消費的過程中發生了掛起,情況會稍有不同。
4.消費過程中的掛起
如果消費過程中發生掛起,那么emit方法會返回一個COROUTINE_SUSPENDED對象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對象后,會掛起當前協程。代碼如下:
override suspend fun emit(value: T) {
// 獲取當前suspend方法續體
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 調用重載的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出現異常時,將異常封裝成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 拋出異常
throw e
}
}
}
當消費過程執行完畢時,會通過傳入的續體喚起外部協程恢復掛起狀態。根據emitFun可以知道,這里傳入的續體為this,也就是當前的SafeCollector類對象,代碼如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢復掛起需要調用續體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒有重寫resumeWith方法,而ContinuationImpl類中也沒有重寫resumeWith方法,因此實際調用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:
在Kotlin協程:創建、啟動、掛起、恢復中提到過,調用BaseContinuationImpl類的resumeWith方法,內部會調用invokeSuspend方法,而SafeCollector類重寫了invokeSuspend方法,代碼如下:
override fun invokeSuspend(result: Result<Any?>): Any? {
// 嘗試獲取異常
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
// 如果沒有異常,則恢復flow方法續體的執行
completion?.resumeWith(result as Result<Unit>)
// 返回掛起標識,這里掛起的是消費過程
return COROUTINE_SUSPENDED
}
在invokeSuspend方法中,會調用resumeWith方法恢復生產過程——flow方法的執行,同時掛起消費過程的執行。全部過程如下圖所示:
原文鏈接:https://juejin.cn/post/7137647612286468132
相關推薦
- 2022-06-07 詳解ASP.NET?Core高性能服務器HTTP.SYS_實用技巧
- 2022-04-03 Django+Nginx+uWSGI?定時任務的實現方法_python
- 2022-03-14 flutter The argument type ‘String?‘ can‘t be assig
- 2022-10-05 C++淺析數據在內存中如何存儲_C 語言
- 2022-04-30 python?format格式化和數字格式化_python
- 2022-12-03 init?output?stream初始化輸出流源碼分析_Android
- 2022-08-04 GoFrame框架gcache的緩存控制淘汰策略實踐示例_Golang
- 2023-05-10 python中使用numpy包的向量矩陣相乘np.dot和np.matmul實現_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支