網站首頁 編程語言 正文
示例
代碼如下:
launch(Dispatchers.Main) {
// 第一部分
flow {
emit(1)
throw NullPointerException("e")
}.catch {
Log.d("liduo", "onCreate1: $it")
}.collect {
Log.d("liudo", "onCreate2: $it")
}
// 第二部分
flow {
emit(1)
}.onCompletion {
Log.d("liduo", "onCreate3: $it")
}.collect {
Log.d("liudo", "onCreate4: $it")
}
// 第三部分
flow {
emit(1)
throw NullPointerException("e")
}.retryWhen { cause, attempt ->
cause !is NullPointerException && attempt <= 2
}.collect {
Log.d("liudo", "onCreate5: $it")
}
}
一.catch方法
catch方法用于捕獲上游流產生的異常,代碼如下:
public fun <T> Flow<T>.catch(action: suspend FlowCollector<T>.(cause: Throwable) -> Unit): Flow<T> =
flow { // 創建Flow對象
// 觸發上游流的執行,并捕獲異常
val exception = catchImpl(this)
// 捕獲到異常,則回調action處理
if (exception != null) action(exception)
}
catch方法是Flow接口的擴展方法,并返回一個Flow類型的對象。在catch方法中,調用flow方法創建了一個Flow對象。
catch方法核心是通過catchImpl方法實現異常的捕獲,如果成功捕獲到異常,則回調參數action處理。這里參數action是FlowCollector接口的擴展方法,因此可以繼續調用emit方法,向下游發送值。
catchImpl方法
當下游調用collect方法時,會觸發catch方法創建的Flow對象的執行,并調用catchImpl方法來處理,代碼如下:
internal suspend fun <T> Flow<T>.catchImpl(
collector: FlowCollector<T>
): Throwable? {
// 保存下游流執行拋出的異常
var fromDownstream: Throwable? = null
try {
// 觸發上游流的執行
collect {
try {
// 將上游流發送的值作為參數,觸發下游流執行
collector.emit(it)
} catch (e: Throwable) { // 如果下游流在執行中發生異常,保存并拋出
fromDownstream = e
throw e
}
}
} catch (e: Throwable) { // 這里捕獲的異常,可能為上游流的異常——collect方法,
// 也可能為下游流的異常——emit方法
// 如果異常是下游流產生的異常,或者是協程取消時拋出的異常
if (e.isSameExceptionAs(fromDownstream) || e.isCancellationCause(coroutineContext)) {
throw e // 再次拋出,交給下游處理
} else { // 如果是上游流的異常且不為協程取消異常
return e // 成功捕獲
}
}
// 未捕獲到異常,返回
return null
}
catchImpl方法是Flow接口的擴展方法,因此在調用collect方法時,會觸發上游流的執行。catchImpl方法的核心在于:將上游發出的值傳遞給下游處理,并對這一過程進行了異常捕獲操作。
二. onCompletion方法
onCompletion方法用于在上游的流全部執行完畢后最后執行,代碼如下:
public fun <T> Flow<T>.onCompletion(
action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T> = unsafeFlow { // 創建一個Flow對象
try {
// 觸發上游流的執行
// this表示下游的FlowCollector
collect(this)
} catch (e: Throwable) {// 如果下游發生異常
// 將異常封裝成ThrowingCollector類型的FlowCollector,并回調參數action,
ThrowingCollector(e).invokeSafely(action, e)
// 拋出異常
throw e
}
// 如果正常執行結束,會走到這里
val sc = SafeCollector(this, currentCoroutineContext())
try {
// 回調執行參數action
sc.action(null)
} finally {
sc.releaseIntercepted()
}
}
onCompletion方法是Flow接口的擴展方法,因此在調用collect方法時,會觸發上游流的執行。同時,傳入this作為參數,this表示下游流調用collect方法時,傳給unsafeFlow方法創建的Flow對象的類型為FlowCollector的對象。onCompletion方法的核心在于:將自身創建的Flow對象作為上游與下游的連接容器,只有當流全部執行完畢或執行過程中發生異常,collect方法才可以執行完成,繼續向下執行。
1.unsafeFlow方法
unsafeFlow方法用于創建一個類型為Flow對象,與之前在Kotlin協程:Flow基礎原理提到過的SafeFlow類相比,unsafeFlow方法創建的Flow對象不會對執行的上下文進行檢查,代碼如下:
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
// 返回一個匿名內部類
return object : Flow<T> {
// 回調collect方法是直接執行block
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
雖然onCompletion方法內部使用unsafeFlow方法創建Flow對象,但卻使用了SafeCollector類。根據之前在Kotlin協程:Flow基礎原理提到的,調用SafeCollector類的emit方法時,會對上下文進行檢查。因此實際效果與使用SafeFlow類效果相同。
2.ThrowingCollector類
ThrowingCollector類也是一種FlowCollector,用于包裹異常。當調用它的emit方法時,會拋出包裹的異常,代碼如下:
private class ThrowingCollector(private val e: Throwable) : FlowCollector<Any?> {
override suspend fun emit(value: Any?) {
// 拋出異常
throw e
}
}
為什么要重新創建ThrowingCollector對象,而不使用下游的FlowCollector對象呢?
為了防止當下游的流執行失敗時,onCompletion方法的action參數執行時調用emit方法發送數據,這樣會導致onCompletion方法作為在“finially代碼塊”使用時不是最后執行的方法。
onCompletion方法搭配與catch方法,實現try-catch-finially代碼塊的效果。
三. retryWhen方法
retryWhen方法與catch方法類似,都可以用于捕獲上游流產生的異常。但兩者不同之處在于,retryWhen方法還可以根據“異常類型”和“重試次數”來決定是否要再次觸發上游流的執行,而且當retryWhen方法不打算再次觸發上游流的執行時,捕獲的異常會被拋出,代碼如下:
// 參數cause表示捕獲到的異常
// 參數attempt表示重試的次數
// 參數predicate返回true表示重新觸發上游流的執行
public fun <T> Flow<T>.retryWhen(predicate: suspend FlowCollector<T>.(cause: Throwable, attempt: Long) -> Boolean): Flow<T> =
// 創建一個Flow對象
flow {
// 記錄重試次數
var attempt = 0L
// 表示是否重新觸發
var shallRetry: Boolean
do {
// 復位成false
shallRetry = false
// 觸發上游流的執行,并捕獲異常
val cause = catchImpl(this)
// 如果捕獲到異常
if (cause != null) {
// 用戶判斷,是否要重新觸發
if (predicate(cause, attempt)) {
// 表示要重新觸發
shallRetry = true
// 重試次數加1
attempt++
} else { // 如果用戶不需要重新觸發
// 則拋出異常
throw cause
}
}
// 判斷是否重新觸發
} while (shallRetry)
}
retryWhen方法是Flow接口的擴展方法。retryWhen方法的核心通過catchImpl方法實現對上游流的觸發及異常捕獲,并加入了由用戶判斷的重試邏輯實現。
原文鏈接:https://blog.csdn.net/LeeDuoZuiShuai/article/details/126828155
相關推薦
- 2023-04-04 Golang利用casbin實現權限驗證詳解_Golang
- 2023-07-22 垃圾回收的核心知識點解析
- 2021-12-10 Ubuntu環境下mongodb安裝配置詳細步驟_MongoDB
- 2022-12-03 FFmpeg?Principle分析Out?put?File?數據結構_Android
- 2022-12-06 Python?list?append方法之給列表追加元素_python
- 2022-05-17 基于Python編寫簡易文字語音轉換器_python
- 2022-08-13 windows上使用docker搭建kafka
- 2022-12-03 Sql?Server中通過sql命令獲取cpu占用及產生鎖的sql_MsSql
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支