網(wǎng)站首頁 編程語言 正文
引言
本文分析示例代碼如下:
launch(Dispatchers.Main) {
flow {
emit(1)
emit(2)
}.collect {
delay(1000)
withContext(Dispatchers.IO) {
Log.d("liduo", "$it")
}
Log.d("liduo", "$it")
}
}
一.Flow的創(chuàng)建
在協(xié)程中,可以通過flow方法創(chuàng)建一個Flow對象,一個Flow對象代表一個冷流。其中參數(shù)block是FlowCollector的擴展方法,并且可掛起。代碼入下:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
FlowCollector是一個接口,用于收集上游的流發(fā)出的值,代碼如下:
public interface FlowCollector<in T> {
// 可掛起,非線程安全
public suspend fun emit(value: T)
}
調用flow方法,會返回一個Flow接口指向的對象,代碼如下:
public interface Flow<out T> {
@InternalCoroutinesApi
public suspend fun collect(collector: FlowCollector<T>)
}
這里flow方法的返回對象是一個SafeFlow類型的對象。至此Flow就創(chuàng)建完畢了。
二.Flow的消費
在協(xié)程中,當需要消費流時,會調用collect方法,觸發(fā)流的消費,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
這里的collect方法不是Flow接口定義的方法,而是Flow的擴展方法,內部創(chuàng)建了一個匿名的FlowCollector對象,并且把action封裝到了FlowCollector對象的emit方法中,最后將FlowCollector對象作為參數(shù)傳入到了另一個collect方法,這個collect方法才是Flow接口定義的方法。
1.SafeFlow類
根據(jù)上面的分析,F(xiàn)low對象最后返回的是一個SafeFlow類型的對象。因此,這里調用的另一個collect方法,就是SafeFlow類中的collect方法,代碼如下:
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
collector.block()
}
}
SafeFlow類繼承自AbstractFlow類,類中重寫了collectSafely方法。因此調用的collect方法實際上是AbstractFlow類的方法。
2.AbstractFlow類
AbstractFlow類是一個抽象類,實現(xiàn)了Flow接口和CancellableFlow接口。實際上CancellableFlow接口繼承自Flow接口,因此AbstractFlow類只重寫了collect方法,代碼如下:
@FlowPreview
public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
// 核心方法
@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
// 創(chuàng)建SafeCollector對象,對collector進行包裹
val safeCollector = SafeCollector(collector, coroutineContext)
try {
// 調用collectSafely方法
collectSafely(safeCollector)
} finally {
// 釋放攔截的續(xù)體
safeCollector.releaseIntercepted()
}
}
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
}
collect方法內部調用了collectSafely方法,collectSafely方法在SafeFlow中被重寫。collectSafely方法中會調用flow中的block,并提供一個SafeCollector類的環(huán)境。
3. SafeCollector類
當flow方法中的代碼在執(zhí)行時,會調用emit方法發(fā)射數(shù)據(jù),這時由于block執(zhí)行在SafeCollector類的環(huán)境中,因此調用的emit方法是SafeCollector類的方法。
SafeCollector類實現(xiàn)了FlowCollector接口并且繼承自ContinuationImpl類,代碼如下:
internal actual class SafeCollector<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
...
// 保存上下文中元素數(shù)量,用于檢查上下文是否變化
@JvmField
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
// 保存上一次的上下文
private var lastEmissionContext: CoroutineContext? = null
// 執(zhí)行結束后的續(xù)體
private var completion: Continuation<Unit>? = null
// 協(xié)程上下文
override val context: CoroutineContext
get() = completion?.context ?: EmptyCoroutineContext
// 掛起的核心方法
override fun invokeSuspend(result: Result<Any?>): Any? {
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
completion?.resumeWith(result as Result<Unit>)
return COROUTINE_SUSPENDED
}
// 釋放攔截的續(xù)體
public actual override fun releaseIntercepted() {
super.releaseIntercepted()
}
// 發(fā)射數(shù)據(jù)
override suspend fun emit(value: T) {
// 獲取當前suspend方法續(xù)體
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 調用重載的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出現(xiàn)異常時,將異常封裝成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 拋出異常
throw e
}
}
}
// 重載的emit方法
private fun emit(uCont: Continuation<Unit>, value: T): Any? {
// 從續(xù)體中獲取上下文
val currentContext = uCont.context
// 保證當前協(xié)程的Job是active的
currentContext.ensureActive()
// 獲取上次的上下文
val previousContext = lastEmissionContext
// 如果前后上下文發(fā)生變化
if (previousContext !== currentContext) {
// 檢查上下文是否發(fā)生異常
checkContext(currentContext, previousContext, value)
}
// 保存續(xù)體
completion = uCont
// 調用emitFun方法,傳入collector,value,continuation
return emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
}
// 檢查上下文變化,防止并發(fā)
private fun checkContext(
currentContext: CoroutineContext,
previousContext: CoroutineContext?,
value: T
) {
// 如果上次執(zhí)行過程中發(fā)生了異常
if (previousContext is DownstreamExceptionElement) {
// 拋出異常
exceptionTransparencyViolated(previousContext, value)
}
// 檢查上下文是否發(fā)生變化,如果變化,則拋出異常
checkContext(currentContext)
lastEmissionContext = currentContext
}
// 用于拋出異常
private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
error("""
Flow exception transparency is violated:
Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
For a more detailed explanation, please refer to Flow documentation.
""".trimIndent())
}
}
emit方法最終會調用emitFun方法方法,代碼如下:
private val emitFun =
FlowCollector<Any?>::emit as Function3<FlowCollector<Any?>, Any?, Continuation<Unit>, Any?>
emitFun是一個lambda表達式,它將只有一個參數(shù)的emit方法轉換成三個參數(shù)的方法。emitFun方法在編譯時會被編譯器處理,反編譯后的代碼邏輯大致如下:
@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
InlineMarker.mark(0);
// 核心執(zhí)行
Object var10000 = p1.emit(p2, continuation);
InlineMarker.mark(2);
InlineMarker.mark(1);
return var10000;
}
可以看到,emitFun方法內部會調用FlowCollector類對象的emit方法,同時傳入value和continuation作為參數(shù)。
而這個FlowCollector類對象就是一開始的collect方法封裝的匿名類對象,代碼如下:
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
調用它的emit方法,會直接調用action的invoke方法,并傳入發(fā)射的數(shù)據(jù),流在這里被最終消費。
通過上面的分析,可以知道消費的過程是在emit方法中被調用的,如果在消費的過程,沒有發(fā)生掛起,那么emit方法執(zhí)行完畢后,會繼續(xù)執(zhí)行flow方法里剩下的代碼,而如果在消費的過程中發(fā)生了掛起,情況會稍有不同。
4.消費過程中的掛起
如果消費過程中發(fā)生掛起,那么emit方法會返回一個COROUTINE_SUSPENDED對象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED對象后,會掛起當前協(xié)程。代碼如下:
override suspend fun emit(value: T) {
// 獲取當前suspend方法續(xù)體
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
try {
// 調用重載的方法
emit(uCont, value)
} catch (e: Throwable) {
// 出現(xiàn)異常時,將異常封裝成上下文,保存到lastEmissionContext
lastEmissionContext = DownstreamExceptionElement(e)
// 拋出異常
throw e
}
}
}
當消費過程執(zhí)行完畢時,會通過傳入的續(xù)體喚起外部協(xié)程恢復掛起狀態(tài)。根據(jù)emitFun可以知道,這里傳入的續(xù)體為this,也就是當前的SafeCollector類對象,代碼如下:
emitFun(collector as FlowCollector<Any?>, value, this as Continuation<Unit>)
恢復掛起需要調用續(xù)體的resumeWith方法,上面提到SafeCollector類繼承自ContinuationImpl類,SafeCollector類中沒有重寫resumeWith方法,而ContinuationImpl類中也沒有重寫resumeWith方法,因此實際調用的是ContinuationImpl類的父類BaseContinuationImpl類的resumeWith方法。如下圖所示:
在Kotlin協(xié)程:創(chuàng)建、啟動、掛起、恢復中提到過,調用BaseContinuationImpl類的resumeWith方法,內部會調用invokeSuspend方法,而SafeCollector類重寫了invokeSuspend方法,代碼如下:
override fun invokeSuspend(result: Result<Any?>): Any? {
// 嘗試獲取異常
result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
// 如果沒有異常,則恢復flow方法續(xù)體的執(zhí)行
completion?.resumeWith(result as Result<Unit>)
// 返回掛起標識,這里掛起的是消費過程
return COROUTINE_SUSPENDED
}
在invokeSuspend方法中,會調用resumeWith方法恢復生產過程——flow方法的執(zhí)行,同時掛起消費過程的執(zhí)行。全部過程如下圖所示:
原文鏈接:https://juejin.cn/post/7137647612286468132
相關推薦
- 2022-03-14 命令刪除node_modules
- 2022-12-12 C語言中帶頭雙向循環(huán)鏈表基本操作的實現(xiàn)詳解_C 語言
- 2022-10-15 Tomcat架構設計及Servlet作用規(guī)范講解_Tomcat
- 2022-09-03 Redis實現(xiàn)Session共享與單點登錄_Redis
- 2023-03-18 pandas數(shù)據(jù)聚合與分組運算的實現(xiàn)_python
- 2022-11-21 Pandas數(shù)據(jù)分析之groupby函數(shù)用法實例詳解_python
- 2022-09-03 redis?主從哨兵模式實現(xiàn)一主二從_Redis
- 2022-06-16 gin框架中使用JWT的定義需求及解析_Golang
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學習環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結構-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支