網站首頁 編程語言 正文
一.異步冷數據流
在Kotlin協程:協程的基礎與使用中,通過使用協程中提供的flow方法可以創建一個Flow對象。這種方法得到的Flow對象實際上是一個異步冷數據流,代碼如下:
private suspend fun test() {
val flow = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
GlobalScope.launch {
// 觸發flow執行
flow.collect {
Log.d("liduo", "test1: $it")
}
}
GlobalScope.launch {
// 再次觸發flow執行
flow.collect {
Log.d("liduo", "test2: $it")
}
}
}
在上面的代碼中,通過調用flow方法,構建了一個名為flow對象,并對flow對象異步執行了兩次。每次都會打印出1、2、3、4,然后結束執行。無論誰在前誰在后,無論執行多少次,得到的結果都是相同的,這就是異步冷數據流的一個特點。
二.異步熱數據流
既然有冷數據流,那就一定有熱數據流。在協程中提供了MutableSharedFlow方法來創建異步熱數據流。相比于異步冷數據流,異步熱數據流一般在類似廣播訂閱的場景中使用。
1.異步熱數據流的設計
在異步熱數據流中,核心接口的繼承關系如下圖所示:
1)SharedFlow接口
SharedFlow接口繼承自Flow接口,代碼如下:
public interface SharedFlow<out T> : Flow<T> {
// 用于保存最近的已經發送的數據
public val replayCache: List<T>
}
- replay緩存:每個SharedFlow類型的對象會將最新發射的數據保存到replayCache中,每一個新的訂閱者會先從replayCache中獲取數據,然后再獲取最新發射的數據。
- 訂閱過程:在SharedFlow中,每個FlowCollecter類型的對象都被稱為訂閱者。調用SharedFlow類型對象的collect方法會觸發訂閱。正常情況下,訂閱不會自動結束,但訂閱者可以取消訂閱,當訂閱者所在的協程被取消時,訂閱過程就會取消。
- 操作符使用:對于大部分終端操作符,比如:toList方法,當對SharedFlow類型的對象使用這些操作符將永遠不會結束或完成變換(toList用于將上游發射的所有數據保存到列表中,并返回列表)。對于部分用于截斷流的操作符,比如:take方法,當對SharedFlow類型的對象使用這些操作符可以完成變換(take用于截取指定數量的上游流發射的數據)。當對SharedFlow類型的對象使用flowOn操作符、cancellable操作符,或使用指定參數為RENDEZVOUS的buffer操作符是無效的。
- SharedFlow并發: SharedFlow中所有的方法都是線程安全的,并且可以在多協程并發的場景中使用且不必額外加鎖。
- 冷流轉換熱流:對于一個冷流,可以通過調用shareIn方法,轉換為一個熱流。
-
SharedFlow與BroadcastChannel的區別:從概念上講,SharedFlow與BroadcastChannel很相似,但二者也有很大的差別,推薦使用SharedFlow,SharedFlow設計的目的就是要在未來替代BroadcastChannel:
- SharedFlow更簡單,不需要實現一堆與Channel相關的接口。
- SharedFlow支持配置replay緩存與緩存溢出策略。
- SharedFlow清楚地劃分了只讀的SharedFlow和可讀可寫的SharedFlow。
- SharedFlow不能關閉,也不能表示失敗,因此如果需要,所有的錯誤與完成信號都應該具體化。
2)MutableSharedFlow接口
MutableSharedFlow接口繼承自SharedFlow接口與FlowCollector接口,并在此基礎上定義了兩個方法與一個常量,代碼如下:
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
// 該方法用于嘗試發射一個數據,
// 當返回true時表示發射成功,返回false時,表示緩存空間不足,需要掛起。
public fun tryEmit(value: T): Boolean
// 該常量表示當前SharedFlow的訂閱者的數量,
// 該常量是一個狀態流StateFlow,也是一個熱流,當其中數值發生變化時會進行回調通知
public val subscriptionCount: StateFlow<Int>
// 用于清空replayCache
// 在調用該方法之前老的訂閱者,可以繼續收到replaycache中的緩存數據,
// 在調用該方法之后的新的訂閱者,只能收到emit方法發射的新數據
@ExperimentalCoroutinesApi
public fun resetReplayCache()
}
2.異步熱數據流的使用
1)MutableSharedFlow方法
在協程中,可以通過調用MutableSharedFlow方法創建一個MutableSharedFlow接口指向的對象,代碼如下:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
其中構造方法中三個參數的含義如下:
- replay:表示新訂閱的接收者可以收到的最近已經發射的數據的數量,默認為0。
- extraBufferCapacity:表示除replay外,當發射速度大于接收速度時數據可緩存的數量,默認為0。
- onBufferOverflow:表示當緩存已滿,數據即將溢出時的數據的處理策略,默認為SUSPEND。
當創建MutableSharedFlow類型的對象時,可以通過參數replay確定SharedFlow接口中定義的replayCache的最大容量,通過參數extraBufferCapacity設置一個不包括replay大小的緩存數量。replayCache本質上也是緩存的一部分,因此extraBufferCapacity與replay共同決定了緩存的大小。
對于處理數據慢的訂閱者,可以通過從緩存中獲取數據,以此來避免發射者的掛起。緩存的數量大小決定了數據處理快的訂閱者與數據處理慢的訂閱者之間的延遲程度。
當使用默認的構造方法創建MutableSharedFlow類型的對象時,它的緩存數量為0。當調用它的emit方法時會直接掛起,直到所有的訂閱者都處理完當前emit方法發送的數據,才會恢復emit方法的掛起。如果MutableSharedFlow類型的對象沒有訂閱者,則調用emit方法會直接返回。
2)使用示例
代碼如下:
private suspend fun test() {
// 創建一個熱流
val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)
// 啟動一個協程,發射數據:1
// 由于有緩存,因此會被添加到緩存中,不會掛起
GlobalScope.launch {
flow.emit(1)
}
// 將MutableSharedFlow對象轉換為SharedFlow對象
// SharedFlow對象不能調用emit方法,因此只能用于接收
val onlyReadFlow = flow.asSharedFlow()
// 接收者1
// 啟動一個新協程
GlobalScope.launch {
// 訂閱監聽,當collect方法觸發訂閱時,會首先會調onSubscription方法
onlyReadFlow.onSubscription {
Log.d("liduozuishuai", "test0: ")
// 發射數據:3
// 向下游發射數據:3,其他接收者收不到
emit(3)
}.onEach {
// 處理接收的數據
Log.d("liduozuishuai", "test1: $it")
}.collect()
}
// 接收者2
// 啟動一個新的協程
GlobalScope.launch {
// 觸發并處理接收的數據
onlyReadFlow.collect {
Log.d("liduozuishuai", "test2: $it")
}
}
// 發送數據:2
GlobalScope.launch {
flow.emit(2)
}
}
對于上面的代碼,接收者1會依次打印出:3、1、2,接收者2會依次打印出1、2。
原文鏈接:https://juejin.cn/post/7144646576949395486
相關推薦
- 2022-09-16 Pandas缺失值刪除df.dropna()的使用_python
- 2022-11-09 React的特征單向數據流學習_React
- 2022-03-16 react實現todolist的增刪改查詳解_React
- 2022-07-04 一文搞懂???????python可迭代對象,迭代器,生成器,協程_python
- 2022-09-09 Nginx配置解決NetCore的跨域問題_nginx
- 2022-08-18 python上下文管理器使用場景及異常處理_python
- 2022-04-27 C語言的常量,字符串,轉義字符,注釋你都了解嗎_C 語言
- 2022-10-04 C語言零基礎徹底掌握預處理上篇_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支