網站首頁 編程語言 正文
Kotlin中SharedFlow的使用 VS StateFlow
SharedFlow 是繼承于 Flow ,同時它是 StateFlow 的父類,它們都是是熱流,先說一下冷流與熱流的概念。
- 冷流 :只有訂閱者訂閱時,才開始執行發射數據流的代碼。并且冷流和訂閱者只能是一對一的關系,當有多個不同的訂閱者時,消息是重新完整發送的。也就是說對冷流而言,有多個訂閱者的時候,他們各自的事件是獨立的。
- 熱流:無論有沒有訂閱者訂閱,事件始終都會發生。當 熱流有多個訂閱者時,熱流與訂閱者們的關系是一對多的關系,可以與多個訂閱者共享信息。
SharedFlow的特點
- SharedFlow沒有默認值
- SharedFlow可以保存舊的數據,根據配置可以將舊的數據回播給新的訂閱者
- SharedFlow使用emit/tryEmit發射數據,StateFlow內部其實都是調用的setValue。
- SharedFlow會掛起直到所有的訂閱者處理完成。
為什么我先講的 StateFlow ,而不是SharedFlow,是因為 StateFlow 是 繼承 SharedFlow 實現,是在其基礎的場景化實現,我們可以把 StateFlow 理解為是 SharedFlow 的 “青春版”。并不是它更輕量,而是它使用更簡單。
我們舉例看看怎么使用 SharedFlow,看看它與 StateFlow的區別。
既然 StateFlow 是 繼承 SharedFlow 實現,那么StateFlow
一、SharedFlow的使用
方式一,我們自己 new 出來
public fun <T> MutableSharedFlow(
// 重放數據個數
replay: Int = 0,
// 額外緩存容量
extraBufferCapacity: Int = 0,
// 緩存溢出策略
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): MutableSharedFlow<T> {
val bufferCapacity0 = replay + extraBufferCapacity
val bufferCapacity = if (bufferCapacity0 < 0) Int.MAX_VALUE else bufferCapacity0 // coerce to MAX_VALUE on overflow
return SharedFlowImpl(replay, bufferCapacity, onBufferOverflow)
}
public enum class BufferOverflow {
// 掛起
SUSPEND,
// 丟棄最早的一個
DROP_OLDEST,
// 丟棄最近的一個
DROP_LATEST
}
舉例說明
@HiltViewModel
class Demo4ViewModel @Inject constructor(
val savedState: SavedStateHandle
) : BaseViewModel() {
private val _sharedFlow = MutableSharedFlow<String>(replay = 1, onBufferOverflow = BufferOverflow.SUSPEND)
val sharedFlow: SharedFlow<String> = _sharedFlow
fun changeSearch(keyword: String) {
_sharedFlow.tryEmit(keyword)
}
}
在Activity中我們就可以像類似 LiveData 一樣的使用 SharedFlow
private fun testflow() {
mViewModel.changeSearch("key")
}
override fun startObserve() {
mViewModel.sharedFlow.collect {
YYLogUtils.w("value $it")
}
}
方式二,通過一個 冷流 Flow 轉換為 sharedFlow
class NewsRemoteDataSource(...,
private val externalScope: CoroutineScope,
) {
val latestNews: Flow<List<ArticleHeadline>> = flow {
...
}.shareIn(
externalScope,
replay = 1,
started = SharingStarted.WhileSubscribed() // 啟動政策
)
}
幾個重要參數的說明如下
- scope 共享開始時所在的協程作用域范圍
- started 控制共享的開始和結束的策略
- replay 為0 代表不重放,也就是沒有粘性,為1 代表重放最新的一個數據
scope 和 replay 不需要過多解釋,主要介紹下 started: SharingStarted 啟動策略,分為三種:
Eagerly(熱啟動式): 立即啟動數據流,并保持數據流(直到 scope 指定的作用域結束);
Lazily(懶啟動式): 在首個訂閱者注冊時啟動,并保持數據流(直到 scope 指定的作用域結束);
WhileSubscribed(): 在首個訂閱者注冊時啟動,并保持數據流直到在最后一個訂閱者注銷時結束(或直到 scope 指定的作用域結束)。
使用示例:
val sharedFlow = flowOf(1, 2, 3).shareIn(
scope = lifecycleScope,
// started = WhileSubscribed(5000, 1000),
// started = Eagerly,
started = Lazily,
replay = 0
)
lifecycleScope.launch {
sharedFlow.collect {
YYLogUtils.w("shared-value $it")
}
}
打印結果:
創建的幾種方式基本和StateFlow類似,那么它們之間有什么區別?
二、SharedFlow、StateFlow、LiveData的對比
我們直接舉例,實現 LiveData 的功能。我們看看 LiveData StateFlow SharedFlow 實現同樣的效果如何操作
@HiltViewModel
class Demo4ViewModel @Inject constructor(
val savedState: SavedStateHandle
) : BaseViewModel() {
private val _searchLD = MutableLiveData<String>()
val searchLD: LiveData<String> = _searchLD
private val _searchFlow = MutableStateFlow("")
val searchFlow: StateFlow<String> = _searchFlow
private val _sharedFlow = MutableSharedFlow<String>(replay = 1, onBufferOverflow = BufferOverflow.SUSPEND)
val sharedFlow: SharedFlow<String> = _sharedFlow
fun changeSearch(keyword: String) {
_sharedFlow.tryEmit(keyword)
_searchFlow.value = keyword
_searchLD.value = keyword
}
}
打印的結果:
可以看到 SharedFlow 通過設置之后是可以達到 LiveData 和 StateFlow 的效果的。
SharedFlow對比StateFlow的優勢,不需要設置默認值,沒有默認值的發送。
SharedFlow對比StateFlow的劣勢,不能自由取值,這是致命的。
例如下面的代碼,StateFlow 我可以在代碼的任意地方取值,但是 SharedFlow 只能接收流,不能自由取值。
所以,我們一般才說 StateFlow 平替 LiveData,雖然 SharedFlow 可以通過 參數的方式達到一部分 LiveData 的效果,但是痛點更明顯。
另外需要說明的是 StateFlow 與 SharedFlow 這么設置是去重的,也就是說如果點擊登錄按鈕之后登錄失敗報告密碼錯誤,然后再次點擊登錄按鈕,就不會彈出吐司了。
這不符合我們的業務場景啊,如果按照 StateFlow 平替 LiveData 的原則,我們還需要改用 Channel 的方式才行 (畢竟SharedFlow不能自由取值真的不適合這個場景)。
@HiltViewModel
class Demo4ViewModel @Inject constructor(
val savedState: SavedStateHandle
) : BaseViewModel() {
val channel = Channel<String>(Channel.CONFLATED)
private val _searchLD = MutableLiveData<String>()
val searchLD: LiveData<String> = _searchLD
private val _searchFlow = MutableStateFlow("")
val searchFlow: StateFlow<String> = _searchFlow
private val _sharedFlow = MutableSharedFlow<String>(replay = 1, onBufferOverflow = BufferOverflow.SUSPEND)
val sharedFlow: SharedFlow<String> = _sharedFlow
fun changeSearch(keyword: String) {
_sharedFlow.tryEmit(keyword)
_searchFlow.value = keyword
_searchLD.value = keyword
channel.trySend(keyword)
}
}
private fun testflow() {
mViewModel.changeSearch("1234")
}
override fun startObserve() {
mViewModel.searchLD.observe(this) {
YYLogUtils.w("value $it")
}
lifecycleScope.launch {
mViewModel.sharedFlow.collect {
YYLogUtils.w("shared-value1 $it")
}
}
lifecycleScope.launch {
mViewModel.channel.consumeAsFlow().collect {
YYLogUtils.w("shared-value2 $it")
}
}
lifecycleScope.launchWhenCreated {
mViewModel.searchFlow.collect {
YYLogUtils.w("state-value $it")
}
}
}
我們加入了使用 Channel 的方式,前文我們講過 Channel 是協程中的通信通道,我們這邊發送那一邊轉為Flow來collect。打印結果如下:
好麻煩哦,這還不如LiveData呢,所以大家知道 StateFlow 與 LiveData 的優缺點之后,按需選擇即可。
三、SharedFlow 的粘性設置與事件總線
可以看到雖然 SharedFlow 不能平替 LiveData ,但是它在事件的發送與接收相關的配置與使用到時得天獨厚,我們常用于事件總線的實現,例如SharedFlowBus,用于替代 EventBus
object FlowBus {
private val busMap = mutableMapOf<String, EventBus<*>>()
private val busStickMap = mutableMapOf<String, StickEventBus<*>>()
@Synchronized
fun <T> with(key: String): EventBus<T> {
var eventBus = busMap[key]
if (eventBus == null) {
eventBus = EventBus<T>(key)
busMap[key] = eventBus
}
return eventBus as EventBus<T>
}
@Synchronized
fun <T> withStick(key: String): StickEventBus<T> {
var eventBus = busStickMap[key]
if (eventBus == null) {
eventBus = StickEventBus<T>(key)
busStickMap[key] = eventBus
}
return eventBus as StickEventBus<T>
}
//真正實現類
open class EventBus<T>(private val key: String) : LifecycleObserver {
//私有對象用于發送消息
private val _events: MutableSharedFlow<T> by lazy {
obtainEvent()
}
//暴露的公有對象用于接收消息
val events = _events.asSharedFlow()
open fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(0, 1, BufferOverflow.DROP_OLDEST)
//主線程接收數據
fun register(lifecycleOwner: LifecycleOwner, action: (t: T) -> Unit) {
lifecycleOwner.lifecycle.addObserver(this)
lifecycleOwner.lifecycleScope.launch {
events.collect {
try {
action(it)
} catch (e: Exception) {
e.printStackTrace()
YYLogUtils.e("FlowBus - Error:$e")
}
}
}
}
//協程中發送數據
suspend fun post(event: T) {
_events.emit(event)
}
//主線程發送數據
fun post(scope: CoroutineScope, event: T) {
scope.launch {
_events.emit(event)
}
}
//自動銷毀
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun onDestroy() {
YYLogUtils.w("FlowBus - 自動onDestroy")
val subscriptCount = _events.subscriptionCount.value
if (subscriptCount <= 0)
busMap.remove(key)
}
}
class StickEventBus<T>(key: String) : EventBus<T>(key) {
override fun obtainEvent(): MutableSharedFlow<T> = MutableSharedFlow(1, 1, BufferOverflow.DROP_OLDEST)
}
}
發送與接收消息
// 主線程-發送消息
FlowBus.with<String>("test-key-01").post(this@Demo11OneFragment2.lifecycleScope, "Test Flow Bus Message")
// 接收消息
FlowBus.with<String>("test-key-01").register(this) {
LogUtils.w("收到FlowBus消息 - " + it)
}
發送粘性消息
FlowBus.withStick<String>("test-key-02").post(lifecycleScope, "Test Stick Message")
// 接收粘性消息
FlowBus.withStick<String>("test-key-02").register(this){
LogUtils.w("收到粘性消息:$it")
}
看源碼就知道粘性的實現就得益于 SharedFlow 的構造參數
replay的設置 ,代表重放的數據個數
replay 為0 代表不重放,也就是沒有粘性
replay 為1 代表重放最新的一個數據,后來的接收器能接受1個最新數據。
replay 為2 代表重放最新的兩個數據,后來的接收器能接受2個最新數據。
我們知道Flow的操作符有針對背壓的處理,那么 SharedFlow 內部還對背壓做了快速處理。我們只需要通過參數快速設置即可實現。
extraBufferCapacity的設置,額外數據的緩存
當上游事件發送過快,而消費太慢的情況,這種情況下,就需要使用緩存池,把未消費的數據存下來。
緩沖池容量 = replay + extraBufferCapacity
如果總量為 0 ,就 Int.MAX_VALUE
onBufferOverflow的設置
如果指定了有限的緩存容量,那么超過容量以后怎么辦?
BufferOverflow.SUSPEND : 超過就掛起,默認實現
BufferOverflow.DROP_OLDEST : 丟棄最老的數據
BufferOverflow.DROP_LATEST : 丟棄最新的數據
總結
StateFlow 更加簡便特定的場景使用,而 SharedFlow 更加的靈活,他們兩者的側重點也不同。
SharedFlow 基于緩存的處理可以實現一些特定的需求,如當發生訂閱時,我需要將過去已經更新的N個值,同步給新的訂閱者。比如有多個新的訂閱者都想訂閱這些改動的值。都可以使用 SharedFlow 來實現
而關于 SharedFlow、StateFlow、LiveData的對比,個人的結論是:根據不同的場景 LiveData StateFlow SharedFlow 都有自己特定的使用場景,誰也無法真的完全平替誰。誰也不是誰的超集,都有它們各自的有點和缺點,并不能完美覆蓋所有場景,所以根據使用的場景不同按需選擇即可。
關于StateFlow 與 SharedFlow 的實戰,后面會總結一期。
原文鏈接:https://juejin.cn/post/7127454075666300965
相關推薦
- 2022-07-22 CSS3:盒陰影、邊界圖片、指定每一個圓角、背景、過度、動畫、
- 2022-07-18 async+await:發送Ajax請求
- 2022-10-31 Kotlin類與屬性及構造函數的使用詳解_Android
- 2023-07-04 Linux直接創建SSH無密碼連接
- 2023-02-14 詳解Go語言如何利用高階函數寫出優雅的代碼_Golang
- 2022-07-15 Python打印數據類型的全過程_python
- 2022-06-25 JQuery選擇器用法詳解_jquery
- 2022-09-29 C#/VB.NET中從?PDF?文檔中提取所有表格_C#教程
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支