網(wǎng)站首頁 編程語言 正文
一.異步冷數(shù)據(jù)流
在Kotlin協(xié)程:協(xié)程的基礎(chǔ)與使用中,通過使用協(xié)程中提供的flow方法可以創(chuàng)建一個(gè)Flow對(duì)象。這種方法得到的Flow對(duì)象實(shí)際上是一個(gè)異步冷數(shù)據(jù)流,代碼如下:
private suspend fun test() {
val flow = flow {
emit(1)
emit(2)
emit(3)
emit(4)
}
GlobalScope.launch {
// 觸發(fā)flow執(zhí)行
flow.collect {
Log.d("liduo", "test1: $it")
}
}
GlobalScope.launch {
// 再次觸發(fā)flow執(zhí)行
flow.collect {
Log.d("liduo", "test2: $it")
}
}
}
在上面的代碼中,通過調(diào)用flow方法,構(gòu)建了一個(gè)名為flow對(duì)象,并對(duì)flow對(duì)象異步執(zhí)行了兩次。每次都會(huì)打印出1、2、3、4,然后結(jié)束執(zhí)行。無論誰在前誰在后,無論執(zhí)行多少次,得到的結(jié)果都是相同的,這就是異步冷數(shù)據(jù)流的一個(gè)特點(diǎn)。
二.異步熱數(shù)據(jù)流
既然有冷數(shù)據(jù)流,那就一定有熱數(shù)據(jù)流。在協(xié)程中提供了MutableSharedFlow方法來創(chuàng)建異步熱數(shù)據(jù)流。相比于異步冷數(shù)據(jù)流,異步熱數(shù)據(jù)流一般在類似廣播訂閱的場(chǎng)景中使用。
1.異步熱數(shù)據(jù)流的設(shè)計(jì)
在異步熱數(shù)據(jù)流中,核心接口的繼承關(guān)系如下圖所示:
1)SharedFlow接口
SharedFlow接口繼承自Flow接口,代碼如下:
public interface SharedFlow<out T> : Flow<T> {
// 用于保存最近的已經(jīng)發(fā)送的數(shù)據(jù)
public val replayCache: List<T>
}
- replay緩存:每個(gè)SharedFlow類型的對(duì)象會(huì)將最新發(fā)射的數(shù)據(jù)保存到replayCache中,每一個(gè)新的訂閱者會(huì)先從replayCache中獲取數(shù)據(jù),然后再獲取最新發(fā)射的數(shù)據(jù)。
- 訂閱過程:在SharedFlow中,每個(gè)FlowCollecter類型的對(duì)象都被稱為訂閱者。調(diào)用SharedFlow類型對(duì)象的collect方法會(huì)觸發(fā)訂閱。正常情況下,訂閱不會(huì)自動(dòng)結(jié)束,但訂閱者可以取消訂閱,當(dāng)訂閱者所在的協(xié)程被取消時(shí),訂閱過程就會(huì)取消。
- 操作符使用:對(duì)于大部分終端操作符,比如:toList方法,當(dāng)對(duì)SharedFlow類型的對(duì)象使用這些操作符將永遠(yuǎn)不會(huì)結(jié)束或完成變換(toList用于將上游發(fā)射的所有數(shù)據(jù)保存到列表中,并返回列表)。對(duì)于部分用于截?cái)嗔鞯牟僮鞣热纾簍ake方法,當(dāng)對(duì)SharedFlow類型的對(duì)象使用這些操作符可以完成變換(take用于截取指定數(shù)量的上游流發(fā)射的數(shù)據(jù))。當(dāng)對(duì)SharedFlow類型的對(duì)象使用flowOn操作符、cancellable操作符,或使用指定參數(shù)為RENDEZVOUS的buffer操作符是無效的。
- SharedFlow并發(fā): SharedFlow中所有的方法都是線程安全的,并且可以在多協(xié)程并發(fā)的場(chǎng)景中使用且不必額外加鎖。
- 冷流轉(zhuǎn)換熱流:對(duì)于一個(gè)冷流,可以通過調(diào)用shareIn方法,轉(zhuǎn)換為一個(gè)熱流。
-
SharedFlow與BroadcastChannel的區(qū)別:從概念上講,SharedFlow與BroadcastChannel很相似,但二者也有很大的差別,推薦使用SharedFlow,SharedFlow設(shè)計(jì)的目的就是要在未來替代BroadcastChannel:
- SharedFlow更簡(jiǎn)單,不需要實(shí)現(xiàn)一堆與Channel相關(guān)的接口。
- SharedFlow支持配置replay緩存與緩存溢出策略。
- SharedFlow清楚地劃分了只讀的SharedFlow和可讀可寫的SharedFlow。
- SharedFlow不能關(guān)閉,也不能表示失敗,因此如果需要,所有的錯(cuò)誤與完成信號(hào)都應(yīng)該具體化。
2)MutableSharedFlow接口
MutableSharedFlow接口繼承自SharedFlow接口與FlowCollector接口,并在此基礎(chǔ)上定義了兩個(gè)方法與一個(gè)常量,代碼如下:
public interface MutableSharedFlow<T> : SharedFlow<T>, FlowCollector<T> {
// 該方法用于嘗試發(fā)射一個(gè)數(shù)據(jù),
// 當(dāng)返回true時(shí)表示發(fā)射成功,返回false時(shí),表示緩存空間不足,需要掛起。
public fun tryEmit(value: T): Boolean
// 該常量表示當(dāng)前SharedFlow的訂閱者的數(shù)量,
// 該常量是一個(gè)狀態(tài)流StateFlow,也是一個(gè)熱流,當(dāng)其中數(shù)值發(fā)生變化時(shí)會(huì)進(jìn)行回調(diào)通知
public val subscriptionCount: StateFlow<Int>
// 用于清空replayCache
// 在調(diào)用該方法之前老的訂閱者,可以繼續(xù)收到replaycache中的緩存數(shù)據(jù),
// 在調(diào)用該方法之后的新的訂閱者,只能收到emit方法發(fā)射的新數(shù)據(jù)
@ExperimentalCoroutinesApi
public fun resetReplayCache()
}
2.異步熱數(shù)據(jù)流的使用
1)MutableSharedFlow方法
在協(xié)程中,可以通過調(diào)用MutableSharedFlow方法創(chuàng)建一個(gè)MutableSharedFlow接口指向的對(duì)象,代碼如下:
public fun <T> MutableSharedFlow(
replay: Int = 0,
extraBufferCapacity: Int = 0,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
...
}
其中構(gòu)造方法中三個(gè)參數(shù)的含義如下:
- replay:表示新訂閱的接收者可以收到的最近已經(jīng)發(fā)射的數(shù)據(jù)的數(shù)量,默認(rèn)為0。
- extraBufferCapacity:表示除replay外,當(dāng)發(fā)射速度大于接收速度時(shí)數(shù)據(jù)可緩存的數(shù)量,默認(rèn)為0。
- onBufferOverflow:表示當(dāng)緩存已滿,數(shù)據(jù)即將溢出時(shí)的數(shù)據(jù)的處理策略,默認(rèn)為SUSPEND。
當(dāng)創(chuàng)建MutableSharedFlow類型的對(duì)象時(shí),可以通過參數(shù)replay確定SharedFlow接口中定義的replayCache的最大容量,通過參數(shù)extraBufferCapacity設(shè)置一個(gè)不包括replay大小的緩存數(shù)量。replayCache本質(zhì)上也是緩存的一部分,因此extraBufferCapacity與replay共同決定了緩存的大小。
對(duì)于處理數(shù)據(jù)慢的訂閱者,可以通過從緩存中獲取數(shù)據(jù),以此來避免發(fā)射者的掛起。緩存的數(shù)量大小決定了數(shù)據(jù)處理快的訂閱者與數(shù)據(jù)處理慢的訂閱者之間的延遲程度。
當(dāng)使用默認(rèn)的構(gòu)造方法創(chuàng)建MutableSharedFlow類型的對(duì)象時(shí),它的緩存數(shù)量為0。當(dāng)調(diào)用它的emit方法時(shí)會(huì)直接掛起,直到所有的訂閱者都處理完當(dāng)前emit方法發(fā)送的數(shù)據(jù),才會(huì)恢復(fù)emit方法的掛起。如果MutableSharedFlow類型的對(duì)象沒有訂閱者,則調(diào)用emit方法會(huì)直接返回。
2)使用示例
代碼如下:
private suspend fun test() {
// 創(chuàng)建一個(gè)熱流
val flow = MutableSharedFlow<Int>(2, 3, BufferOverflow.SUSPEND)
// 啟動(dòng)一個(gè)協(xié)程,發(fā)射數(shù)據(jù):1
// 由于有緩存,因此會(huì)被添加到緩存中,不會(huì)掛起
GlobalScope.launch {
flow.emit(1)
}
// 將MutableSharedFlow對(duì)象轉(zhuǎn)換為SharedFlow對(duì)象
// SharedFlow對(duì)象不能調(diào)用emit方法,因此只能用于接收
val onlyReadFlow = flow.asSharedFlow()
// 接收者1
// 啟動(dòng)一個(gè)新協(xié)程
GlobalScope.launch {
// 訂閱監(jiān)聽,當(dāng)collect方法觸發(fā)訂閱時(shí),會(huì)首先會(huì)調(diào)onSubscription方法
onlyReadFlow.onSubscription {
Log.d("liduozuishuai", "test0: ")
// 發(fā)射數(shù)據(jù):3
// 向下游發(fā)射數(shù)據(jù):3,其他接收者收不到
emit(3)
}.onEach {
// 處理接收的數(shù)據(jù)
Log.d("liduozuishuai", "test1: $it")
}.collect()
}
// 接收者2
// 啟動(dòng)一個(gè)新的協(xié)程
GlobalScope.launch {
// 觸發(fā)并處理接收的數(shù)據(jù)
onlyReadFlow.collect {
Log.d("liduozuishuai", "test2: $it")
}
}
// 發(fā)送數(shù)據(jù):2
GlobalScope.launch {
flow.emit(2)
}
}
對(duì)于上面的代碼,接收者1會(huì)依次打印出:3、1、2,接收者2會(huì)依次打印出1、2。
原文鏈接:https://juejin.cn/post/7144646576949395486
相關(guān)推薦
- 2022-07-09 docker 中進(jìn)程的信號(hào)
- 2022-09-27 Python?Matplotlib繪制扇形圖標(biāo)簽重疊問題解決過程_python
- 2022-09-29 C++Vector容器常用函數(shù)接口詳解_C 語言
- 2022-03-23 Unity3d實(shí)現(xiàn)無限循環(huán)滾動(dòng)背景_C#教程
- 2022-11-06 golang?RPC包原理和使用詳細(xì)介紹_Golang
- 2022-04-19 基于HarmonyOS 的ArkUI編寫的社區(qū)類app(一)----隱私服務(wù)條款界面
- 2022-04-04 微信小程序:返回上一頁,刷新頁面內(nèi)容
- 2022-09-03 Python?groupby函數(shù)圖文詳解_python
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支