網(wǎng)站首頁(yè) 編程語(yǔ)言 正文
正文
Kotlin協(xié)程中的Flow主要用于處理復(fù)雜的異步數(shù)據(jù),以一種”流“的方式,從上到下依次處理,和RxJava的處理方式類(lèi)型,但是比后者更加強(qiáng)大。
Flow基本概念
Flow中基本上有三個(gè)概念,即 發(fā)送方,處理中間層,接收方,可以類(lèi)比水利發(fā)電站中的上游,發(fā)電站,下游的概念, 數(shù)據(jù)從上游開(kāi)始發(fā)送”流淌“至中間站被”處理“了一下,又流淌到了下游。
示例代碼如下
flow { // 發(fā)送方、上游
emit(1) // 掛起函數(shù),發(fā)送數(shù)據(jù)
emit(2)
emit(3)
emit(4)
emit(5)
}
.filter { it > 2 } // 中轉(zhuǎn)站,處理數(shù)據(jù)
.map { it * 2 }
.take(2)
.collect{ // 接收方,下游
println(it)
}
輸出內(nèi)容:
6
8
通過(guò)上面代碼我們可以看到,基于一種鏈?zhǔn)秸{(diào)用api的方式,流式的進(jìn)行處理數(shù)據(jù)還是很棒的,接下來(lái)具體看一下上面的組成:
- flow{},是個(gè)高階函數(shù),主要用于創(chuàng)建一個(gè)新的Flow。在其Lambda函數(shù)內(nèi)部使用了emit()掛起函數(shù)進(jìn)行發(fā)送數(shù)據(jù)。
- filter{}、map{}、take{},屬于中間處理層,也是中間數(shù)據(jù)處理的操作符,F(xiàn)low最大的優(yōu)勢(shì),就是它的操作符跟集合操作符高度一致。只要會(huì)用List、Sequence,那么就可以快速上手 Flow 的操作符。
- collect{},下游接收方,也成為終止操作符,它的作用其實(shí)只有一個(gè):終止Flow數(shù)據(jù)流,并且接收這些數(shù)據(jù)。
其他創(chuàng)建Flow的方式還是flowOf()函數(shù),示例代碼如下
fun main() = runBlocking{aassssssssaaaaaaaas
flowOf(1,2,3,4,5).filter { it > 2 }
.map { it * 2 }
.take(2)
.collect{
println("flowof: $it")
}
}
我們?cè)诳匆幌耹ist集合的操作示例
listOf(1,2,3,4,5).filter { it > 2 }
.map { it * 2 }
.take(2)
.forEach{
println("listof: $it")
}
通過(guò)以上對(duì)比發(fā)現(xiàn),兩者的基本操作幾乎一致,Kotlin也提供了兩者相互轉(zhuǎn)換的API,F(xiàn)low.toList()、List.asFlow()這兩個(gè)擴(kuò)展函數(shù),讓數(shù)據(jù)在 List、Flow 之間來(lái)回轉(zhuǎn)換,示例代碼如下:
//flow 轉(zhuǎn)list
flowOf(1,2,3)
.toList()
.filter { it > 1 }
.map { it * 2 }
.take(2)
.forEach{
println(it)
}
// list 轉(zhuǎn) flow
listOf(1,2,3).asFlow()
.filter { it > 2 }
.map { it * 2 }
.take(2)
.collect{
println(it)
}
Flow生命周期
雖然從上面操作看和集合類(lèi)型,但是Flow還是有些特殊操作符的,畢竟它是協(xié)程的一部分,和Channel不同,F(xiàn)low是有生命周期的,只是以操作符的形式回調(diào)而已,比如onStart、onCompletion這兩個(gè)中間操作符。
flowOf(1,2,3,4,5,6)
.filter {
println("filter: $it")
it > 3
}
.map {
println("map: $it")
it * 2
}
.take(2)
.onStart { println("onStart") }
.collect{
println("collect: $it")
}
輸出內(nèi)容:
onStart
filter: 1
filter: 2
filter: 3
filter: 4
map: 4
collect: 8
filter: 5
map: 5
collect: 10
我們可以看到onStart,它的作用是注冊(cè)一個(gè)監(jiān)聽(tīng)事件:當(dāng) flow 啟動(dòng)以后,它就會(huì)被回調(diào)。
和filter、map、take這些中間操作符不同,他們的順序會(huì)影響數(shù)據(jù)的處理結(jié)果,這也很好理解;onStart和位置沒(méi)有關(guān)系,它本質(zhì)上是一個(gè)回調(diào),不是一個(gè)數(shù)據(jù)處理的中間站。同樣的還有數(shù)據(jù)處理完成的回調(diào)onCompletion。
flowOf(1,2,3,4,5,6)
.filter {
println("filter: $it")
it > 3
}
.map {
println("map: $it")
it * 2
}
.take(2)
.onStart { println("onStart") }
.onCompletion { println("onCompletion") }
.collect{
println("collect: $it")
}
Flow中onCompletion{} 在面對(duì)以下三種情況時(shí)都會(huì)進(jìn)行回調(diào):
- 1,F(xiàn)low 正常執(zhí)行完畢
- 2,F(xiàn)low 當(dāng)中出現(xiàn)異常
- 3,F(xiàn)low 被取消。
處理異常
在數(shù)據(jù)流的處理過(guò)程中,很難保證不出現(xiàn)問(wèn)題,那么出現(xiàn)異常之后再該怎么處理呢?
- 對(duì)于發(fā)生在上游、中間操作這兩個(gè)階段的異常,我們可以直接使用 catch 這個(gè)操作符來(lái)進(jìn)行捕獲和進(jìn)一步處理。
- 對(duì)于發(fā)生在下游,使用try-catch,把collect{}當(dāng)中可能出現(xiàn)問(wèn)題的代碼包裹起來(lái)進(jìn)行捕獲處理。
上游或者中間異常使用catch
fun main() = runBlocking{
val flow = flow {
emit(1)
emit(2)
throw IllegalStateException()
emit(3)
}
flow.map { it * 2 }
.catch { println("catch: $it") }
.collect{
println("collect: $it")
}
}
輸出:
collect: 2
collect: 4
catch: java.lang.IllegalStateException
catch 這個(gè)操作符的作用是和它的位置強(qiáng)相關(guān)的,catch 的作用域,僅限于catch的上游。換句話說(shuō),發(fā)生在 catch 上游的異常,才會(huì)被捕獲,發(fā)生在 catch 下游的異常,則不會(huì)被捕獲。
val flow = flow {
emit(1)
emit(2)
throw IllegalStateException()
emit(3)
}
flow.map { it * 2 }
.catch { println("catch: $it") }
.filter { it / 0 > 1 } // catch之后發(fā)生異常
.collect{
println("collect: $it")
}
輸出內(nèi)容:
Exception in thread "main" java.lang.ArithmeticException: / by zero
下游使用try-catch
flowOf(1,2,3)
.onCompletion { println("onCompletion $it") }
.collect{
try {
println("collect: $it")
throw IllegalStateException();
}catch (e: Exception){
println("catch $e")
}
}
輸出:
collect: 1
catch java.lang.IllegalStateException
collect: 2
catch java.lang.IllegalStateException
collect: 3
catch java.lang.IllegalStateException
onCompletion null
切換執(zhí)行線程
Flow適合處理復(fù)雜的異步任務(wù),大多數(shù)情況下耗時(shí)任務(wù)放在子線程或線程池中處理,對(duì)于UI任務(wù)放在主線程中進(jìn)行。
在Flow中可以使用flowOn操作符實(shí)現(xiàn)上述場(chǎng)景中的線程切換。
flowOf(1,2,3,4,5)
.filter {
logX("filter: $it")
it > 2 }
.flowOn(Dispatchers.IO) // 切換線程
.collect{
logX("collect: $it")
}
輸出內(nèi)容:
================================
filter: 1
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 2
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 3
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 4
Thread:DefaultDispatcher-worker-1
================================
================================
filter: 5
Thread:DefaultDispatcher-worker-1
================================
================================
collect: 3
Thread:main
================================
================================
collect: 4
Thread:main
================================
================================
collect: 5
Thread:main
================================
flowOn 操作符也是和它的位置強(qiáng)相關(guān)的。作用域限于它的上游。在上面的代碼中,flowOn 的上游,就是 flowOf{}、filter{} 當(dāng)中的代碼,所以,它們的代碼全都運(yùn)行在 DefaultDispatcher 這個(gè)線程池當(dāng)中。只有collect{}當(dāng)中的代碼是運(yùn)行在 main 線程當(dāng)中的。
終止操作符
Flow 里面,最常見(jiàn)的終止操作符就是collect。除此之外,還有一些從集合中借鑒過(guò)來(lái)的操作符,也是Flow的終止操作符。比如 first()、single()、fold{}、reduce{},本質(zhì)上來(lái)說(shuō)說(shuō)當(dāng)我們嘗試將 Flow 轉(zhuǎn)換成集合的時(shí)候,已經(jīng)不屬于Flow的API,也不屬于協(xié)程的范疇了,它本身也就意味著 Flow 數(shù)據(jù)流的終止。
"冷的數(shù)據(jù)流"從何而來(lái)
在上面文章《Kotlin協(xié)程Channel淺析》中,我們認(rèn)識(shí)到Channel是”熱數(shù)據(jù)流“,隨時(shí)準(zhǔn)備好,隨用隨取,就像海底撈里的服務(wù)員。
現(xiàn)在我們看下Flow和Channel的區(qū)別
val flow = flow {
(1..4).forEach{
println("Flow發(fā)送前:$it")
emit(it)
println("Flow發(fā)送后: $it")
}
}
val channel: ReceiveChannel<Int> = produce {
(1..4).forEach{
println("Channel發(fā)送前: $it")
send(it)
println("Channel發(fā)送后: $it")
}
}
輸出內(nèi)容:
Channel發(fā)送前: 1
Flow中的邏輯并未執(zhí)行,因此我們可以這樣類(lèi)比,Channel之所以被認(rèn)為是“熱”的原因,是因?yàn)椴还苡袥](méi)有接收方,發(fā)送方都會(huì)工作。那么對(duì)應(yīng)的,F(xiàn)low被認(rèn)為是“冷”的原因,就是因?yàn)橹挥姓{(diào)用終止操作符之后,F(xiàn)low才會(huì)開(kāi)始工作。
除此之外,F(xiàn)low一次處理一條數(shù)據(jù),是個(gè)”懶家伙“。
val flow = flow {
(3..6).forEach {
println("Flow發(fā)送前:$it")
emit(it)
println("Flow發(fā)送后: $it")
}
}.filter {
println("filter: $it")
it > 3
}.map {
println("map: $it")
it * 2
}.collect {
println("結(jié)果collect: $it")
}
輸出內(nèi)容:
Flow發(fā)送前:3
filter: 3
Flow發(fā)送后: 3
Flow發(fā)送前:4
filter: 4
map: 4
結(jié)果collect: 8
Flow發(fā)送后: 4
Flow發(fā)送前:5
filter: 5
map: 5
結(jié)果collect: 10
Flow發(fā)送后: 5
Flow發(fā)送前:6
filter: 6
map: 6
結(jié)果collect: 12
Flow發(fā)送后: 6
相比于滿(mǎn)面春風(fēng),熱情服務(wù)的Channel,Flow更像個(gè)冷漠的家伙,你不找他,他不搭理你。
- Channel,響應(yīng)速度快,但數(shù)據(jù)可能是舊的,占用資源
- Flow,響應(yīng)速度慢,但數(shù)據(jù)是最新的,節(jié)省資源
Flow也可以是”熱“的,你知道嗎?
原文鏈接:https://juejin.cn/post/7170512939445649444
相關(guān)推薦
- 2023-02-27 c++數(shù)組排序的5種方法實(shí)例代碼_C 語(yǔ)言
- 2022-06-21 C語(yǔ)言容易被忽視的函數(shù)設(shè)計(jì)原則基礎(chǔ)_C 語(yǔ)言
- 2022-12-24 Qt實(shí)現(xiàn)給窗口繪制陰影的示例代碼_C 語(yǔ)言
- 2022-06-15 golang?Gin上傳文件返回前端及中間件實(shí)現(xiàn)示例_Golang
- 2022-06-17 Ruby3多線程并行Ractor使用方法詳解_ruby專(zhuān)題
- 2022-10-03 Golang?Http請(qǐng)求返回結(jié)果處理_Golang
- 2023-07-25 springmvc全局異常處理
- 2022-04-25 ASP.NET?Core?MVC中過(guò)濾器工作原理介紹_實(shí)用技巧
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過(guò)濾器
- Spring Security概述快速入門(mén)
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡(jiǎn)單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支