日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Flow如何解決背壓問題的方法詳解_Android

作者:RainyJiang ? 更新時間: 2022-12-11 編程語言

前言

隨著時間的推移,越來越多的主流應用已經開始全面擁抱Kotlin,協程的引入,Flow的誕生,給予了開發很多便捷,作為協程與響應式編程結合的流式處理框架,一方面它簡單的數據轉換與操作符,沒有繁瑣的操作符處理,廣受大部分開發的青睞,另一方面它并沒有響應式編程帶來的背壓問題(BackPressure)的困擾;接下來,本文將會就Flow如何解決背壓問題進行探討

關于背壓(BackPressure)

背壓問題是什么

首先我們要明確背壓問題是什么,它是如何產生的?簡單來說,在一般的流處理框架中,消息的接收處理速度跟不上消息的發送速度,從而導致數據不匹配,造成積壓。如果不及時正確處理背壓問題,會導致一些嚴重的問題

  • 比如說,消息擁堵了,系統運行不暢從而導致崩潰
  • 比如說,資源被消耗殆盡,甚至會發生數據丟失的情況

如下圖所示,可以直觀了解背壓問題的產生,它在生產者的生產速率高于消費者的處理速率的情況下出現

定義背壓策略

既然知道了背壓問題我們已經知道是如何產生的,就要去嘗試如何正確處理它,大致意思是,如果你有一個流,你需要一個緩沖區,以防數據產生的速度快于消耗的速度,所以往往就會針對這個背壓策略進行些討論

  • 定義的中間緩沖區需要多大才比較合適?
  • 如果緩沖區數據已滿了,我們怎么樣處理新的事件?

對于以上問題,通過學習Flow里的背壓策略,相信可以很快就知道答案了

Flow的背壓機制

由于Flow是基于協程中使用的,它不需要一些巧妙設計的解決方案來明確處理背壓,在Flow中,不同于一些傳統的響應式框架,它的背壓管理是使用Kotlin掛起函數suspend實現的,看下源碼你會發現,它里面所有的函數方法都是使用suspend修飾符標記,這個修飾符就是為了暫停調度者的執行不阻塞線程。因此,Flow<T>在同一個協程中發射和收集時,如果收集器跟不上數據流,它可以簡單地暫停元素的發射,直到它準備好接收更多。看到這,是不是覺得有點難懂.......

簡單舉個例子,假設我們擁有一個烤箱,可以用來烤面包,由于烤箱容量的限制,一次只能烤4個面包,如果你試著一次烤8個面包,會大大加大烤箱的承載負荷,這已經遠遠超過了它的內存使用量,很有可能會因此燒掉你的面包。

模擬背壓問題

回顧下之前所說的,當我們消耗的速度比生產的速度慢的時候,就會產生背壓,下面用代碼來模擬下這個過程

首先先創建一個方法,用來每秒發送元素

fun currentTime() = System.currentTimeMillis()
fun threadName() = Thread.currentThread().name
var start: Long = 0
?
fun createEmitter(): Flow<Int> =
    (1..5)
        .asFlow()
        .onStart { start = currentTime() }
        .onEach {
            delay(1000L)
            print("Emit $it (${currentTime() - start}ms) ")
        }

接著需要收集元素,這里我們延遲3秒再接收元素, 延遲是為了夸大緩慢的消費者并創建一個超級慢的收集器。

fun main() {
    runBlocking {
        val time = measureTimeMillis {
            createEmitter().collect {
                print("\nCollect $it starts ${start - currentTime()}ms")
                delay(3000L)
                println("   Collect $it ends ${currentTime() - start}ms")
            }
        }
        print("\nCollected in $time ms")
    }
}

看下輸出結果,如下圖所示

這樣整個過程下來,大概需要20多秒才能結束,這里我們模擬了接收元素比發送元素慢的情況,因此就需要一個背壓機制,而這正是Flow本質中的,它并不需要另外的設計來解決背壓

背壓處理方式

使用buffer進行緩存收集

為了使緩沖和背壓處理正常工作,我們需要在單獨的協程中運行收集器。這就是.buffer()操作符進來的地方,它是將所有發出的項目發送Channel到在單獨的協程中運行的收集器。

public fun <T> Flow<T>.buffer(
    capacity: Int = BUFFERED, 
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND
): Flow<T>

它還為我們提供了緩沖功能,我們可以指定capacity我們的緩沖區和處理策略onBufferOverflow,所以當Buffer溢出的時候,它為我們提供了三個選項

enum BufferOverflow { 
   SUSPEND,
   DROP_OLDEST,
   DROP_LATEST
 }
  • 默認使用SUSPEND:會將當前協程掛起,直到緩沖區中的數據被消費了
  • DROP_OLDEST:它會丟棄最老的數據
  • DROP_LATEST: 它會丟棄最新的數據

好的,我們回到上文所展示的模擬示例,這時候我們可以加入緩沖收集buffer,不指定任何參數,這樣默認就是使用SUSPEND,它會將當前協程進行掛起

此時當收集器繁忙的時候,程序就開始緩沖,并在第一次收集方法調用結束的時候,兩次發射后再次開始收集,此時流程的耗時時長縮短到大約16秒就可以執行完畢,如下圖所示輸出結果

使用conflate解決

conflate操作符于Channel中的Conflate模式是一直的,新數據會直接覆蓋掉舊數據,它不設緩沖區,也就是緩沖區大小為 0,丟棄舊數據,也就是采取 DROP_OLDEST 策略,那么不就等于buffer(0,BufferOverflow.DROP_OLDEST),可以看下它的源碼可以佐證我們的判斷

public fun <T> Flow<T>.conflate(): Flow<T> = buffer(CONFLATED)

在某些情況下,由于根本原因是解決生產消費速率不匹配的問題,我們需要做一些取舍的操作,conflate將丟棄掉舊數據,只有在收集器空閑之前發出的最后一個元素才被收集,將上文的模擬實例改為conflate執行,你會發現我們直接丟棄掉了2和4,或者說新的數據直接覆蓋掉了它們,整個流程只需要10秒左右就執行完成了

使用collectLatest解決

通過官方介紹,我們知道collectLatest作用在于當原始流發出一個新的值的時候,前一個值的處理將被取消,也就是不會被接收, 和conflate的區別在于它不會用新的數據覆蓋,而是每一個都會被處理,只不過如果前一個還沒被處理完后一個就來了的話,處理前一個數據的邏輯就會被取消

suspend fun <T> Flow<T>.collectLatest(action: suspend (T) -> Unit)

還是上文的模擬實例,這里我們使用collectLatest看下輸出結果:

這樣也是有副作用的,如果每個更新都非常重要,例如一些視圖,狀態刷新,這個時候就不必要用collectLatest ; 當然如果有些更新可以無損失的覆蓋,例如數據庫刷新,就可以使用到collectLatest,具體詳細的使用場景,還需要靠開發者自己去衡量選擇使用

小結

對于Flow可以說不需要額外提供什么巧妙的方式解決背壓問題,Flow的本質,亦或者說Kotlin協程本身就已經提供了相應的解決方案;開發者只需要在不同的場景中選擇正確的背壓策略即可。總的來說,它們都是通過使用Kotlin掛起函數suspend,當流的收集器不堪重負時,它可以簡單地暫停發射器,然后在準備好接受更多元素時恢復它。

關于掛起函數suspend這里就不過多贅述了,只需要明白的一點是它與傳統的基于線程的同步數據管道中背壓管理非常相似,無非就是,緩慢的消費者通過阻塞生產者的線程自動向生產者施加背壓,簡單來說,suspend通過透明地管理跨線程的背壓而不阻塞它們,將其超越單個線程并進入異步編程領域。

原文鏈接:https://juejin.cn/post/7165380647304282126

欄目分類
最近更新