網站首頁 編程語言 正文
前言
隨著時間的推移,越來越多的主流應用已經開始全面擁抱Kotlin
,協程的引入,Flow
的誕生,給予了開發很多便捷,作為協程與響應式編程結合的流式處理框架,一方面它簡單的數據轉換與操作符,沒有繁瑣的操作符處理,廣受大部分開發的青睞,另一方面它并沒有響應式編程帶來的背壓問題(BackPressure
)的困擾;接下來,本文將會就Flow如何解決背壓問題進行探討
關于背壓(BackPressure)
背壓問題是什么
首先我們要明確背壓問題是什么,它是如何產生的?簡單來說,在一般的流處理框架中,消息的接收處理速度跟不上消息的發送速度,從而導致數據不匹配,造成積壓。如果不及時正確處理背壓問題,會導致一些嚴重的問題
- 比如說,消息擁堵了,系統運行不暢從而導致崩潰
- 比如說,資源被消耗殆盡,甚至會發生數據丟失的情況
如下圖所示,可以直觀了解背壓問題的產生,它在生產者的生產速率高于消費者的處理速率的情況下出現
定義背壓策略
既然知道了背壓問題我們已經知道是如何產生的,就要去嘗試如何正確處理它,大致意思是,如果你有一個流,你需要一個緩沖區,以防數據產生的速度快于消耗的速度,所以往往就會針對這個背壓策略進行些討論
- 定義的中間緩沖區需要多大才比較合適?
- 如果緩沖區數據已滿了,我們怎么樣處理新的事件?
對于以上問題,通過學習Flow
里的背壓策略,相信可以很快就知道答案了
Flow的背壓機制
由于Flow
是基于協程中使用的,它不需要一些巧妙設計的解決方案來明確處理背壓,在Flow
中,不同于一些傳統的響應式框架,它的背壓管理是使用Kotlin
掛起函數suspend
實現的,看下源碼你會發現,它里面所有的函數方法都是使用suspend
修飾符標記,這個修飾符就是為了暫停調度者的執行不阻塞線程。因此,Flow<T>
在同一個協程中發射和收集時,如果收集器跟不上數據流,它可以簡單地暫停元素的發射,直到它準備好接收更多。看到這,是不是覺得有點難懂.......
簡單舉個例子,假設我們擁有一個烤箱,可以用來烤面包,由于烤箱容量的限制,一次只能烤4個面包,如果你試著一次烤8個面包,會大大加大烤箱的承載負荷,這已經遠遠超過了它的內存使用量,很有可能會因此燒掉你的面包。
模擬背壓問題
回顧下之前所說的,當我們消耗的速度比生產的速度慢的時候,就會產生背壓,下面用代碼來模擬下這個過程
首先先創建一個方法,用來每秒發送元素
fun currentTime() = System.currentTimeMillis() fun threadName() = Thread.currentThread().name var start: Long = 0 ? fun createEmitter(): Flow<Int> = (1..5) .asFlow() .onStart { start = currentTime() } .onEach { delay(1000L) print("Emit $it (${currentTime() - start}ms) ") }
接著需要收集元素,這里我們延遲3秒再接收元素, 延遲是為了夸大緩慢的消費者并創建一個超級慢的收集器。
fun main() { runBlocking { val time = measureTimeMillis { createEmitter().collect { print("\nCollect $it starts ${start - currentTime()}ms") delay(3000L) println(" Collect $it ends ${currentTime() - start}ms") } } print("\nCollected in $time ms") } }
看下輸出結果,如下圖所示
這樣整個過程下來,大概需要20多秒才能結束,這里我們模擬了接收元素比發送元素慢的情況,因此就需要一個背壓機制,而這正是Flow本質中的,它并不需要另外的設計來解決背壓
背壓處理方式
使用buffer進行緩存收集
為了使緩沖和背壓處理正常工作,我們需要在單獨的協程中運行收集器。這就是.buffer()
操作符進來的地方,它是將所有發出的項目發送Channel
到在單獨的協程中運行的收集器。
public fun <T> Flow<T>.buffer( capacity: Int = BUFFERED, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND ): Flow<T>
它還為我們提供了緩沖功能,我們可以指定capacity
我們的緩沖區和處理策略onBufferOverflow
,所以當Buffer
溢出的時候,它為我們提供了三個選項
enum BufferOverflow { SUSPEND, DROP_OLDEST, DROP_LATEST }
- 默認使用
SUSPEND
:會將當前協程掛起,直到緩沖區中的數據被消費了 -
DROP_OLDEST
:它會丟棄最老的數據 -
DROP_LATEST
: 它會丟棄最新的數據
好的,我們回到上文所展示的模擬示例,這時候我們可以加入緩沖收集buffer
,不指定任何參數,這樣默認就是使用SUSPEND
,它會將當前協程進行掛起
此時當收集器繁忙的時候,程序就開始緩沖,并在第一次收集方法調用結束的時候,兩次發射后再次開始收集,此時流程的耗時時長縮短到大約16秒就可以執行完畢,如下圖所示輸出結果
使用conflate解決
conflate
操作符于Channel
中的Conflate
模式是一直的,新數據會直接覆蓋掉舊數據,它不設緩沖區,也就是緩沖區大小為 0,丟棄舊數據,也就是采取 DROP_OLDEST
策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST)
,可以看下它的源碼可以佐證我們的判斷
public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)
在某些情況下,由于根本原因是解決生產消費速率不匹配的問題,我們需要做一些取舍的操作,conflate
將丟棄掉舊數據,只有在收集器空閑之前發出的最后一個元素才被收集,將上文的模擬實例改為conflate
執行,你會發現我們直接丟棄掉了2和4,或者說新的數據直接覆蓋掉了它們,整個流程只需要10秒左右就執行完成了
使用collectLatest解決
通過官方介紹,我們知道collectLatest
作用在于當原始流發出一個新的值的時候,前一個值的處理將被取消,也就是不會被接收, 和conflate
的區別在于它不會用新的數據覆蓋,而是每一個都會被處理,只不過如果前一個還沒被處理完后一個就來了的話,處理前一個數據的邏輯就會被取消
suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)
還是上文的模擬實例,這里我們使用collectLatest
看下輸出結果:
這樣也是有副作用的,如果每個更新都非常重要,例如一些視圖,狀態刷新,這個時候就不必要用collectLatest
; 當然如果有些更新可以無損失的覆蓋,例如數據庫刷新,就可以使用到collectLatest
,具體詳細的使用場景,還需要靠開發者自己去衡量選擇使用
小結
對于Flow
可以說不需要額外提供什么巧妙的方式解決背壓問題,Flow
的本質,亦或者說Kotlin
協程本身就已經提供了相應的解決方案;開發者只需要在不同的場景中選擇正確的背壓策略即可。總的來說,它們都是通過使用Kotlin
掛起函數suspend
,當流的收集器不堪重負時,它可以簡單地暫停發射器,然后在準備好接受更多元素時恢復它。
關于掛起函數suspend
這里就不過多贅述了,只需要明白的一點是它與傳統的基于線程的同步數據管道中背壓管理非常相似,無非就是,緩慢的消費者通過阻塞生產者的線程自動向生產者施加背壓,簡單來說,suspend
通過透明地管理跨線程的背壓而不阻塞它們,將其超越單個線程并進入異步編程領域。
原文鏈接:https://juejin.cn/post/7165380647304282126
相關推薦
- 2022-03-15 eclipse文件上傳錯誤:the request doesn‘t contain a multip
- 2022-11-30 React中常見的TypeScript定義實戰教程_React
- 2022-06-06 解決:Access denied for user ‘root‘@‘localhost‘ (usin
- 2022-02-26 Assert.assertEquals()方法參數詳解_Android
- 2022-08-03 GoFrame框架garray并發安全數組使用開箱體驗_Golang
- 2022-04-08 WPF綁定Binding用法_基礎應用
- 2023-01-13 C#實現動態圖標閃爍顯示的示例代碼_C#教程
- 2023-02-10 一文教會你用Python實現pdf轉word_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支