網站首頁 編程語言 正文
正文
在協程啟動模式中已經知道async
是可以返回結果的,但是只返回一個,那么在復雜場景下就會不夠用了,所以Channel
就出現了。
1.認識Channel
Channel
的意思是管道、通道,用圖表示如下:
Channel
的左邊是發送方,右邊是接收方,中間則是消息,那么代碼表示就是下面這樣:
fun main() { channelTest() } fun channelTest() = runBlocking { val channel = Channel<Int>() //關鍵點① launch { for (i in 1..3) { channel.send(i) //關鍵點② logX("send: $i") } } launch { for (i in channel) { //關鍵點③ logX("receiver: $i") } } logX("end") } //輸出結果: //================================ //end //Thread:main @coroutine#1 //================================ //================================ //receiver: 1 //Thread:main @coroutine#3 //================================ //================================ //send: 1 //Thread:main @coroutine#2 //================================ //================================ //send: 2 //Thread:main @coroutine#2 //================================ //================================ //receiver: 2 //Thread:main @coroutine#3 //================================ //================================ //receiver: 3 //Thread:main @coroutine#3 //================================ //================================ //send: 3 //Thread:main @coroutine#2 //================================
上面的代碼中啟動了兩個協程,一個發送,一個接收,還有幾個關鍵點:
- 關鍵點①:通過
Channel
創建一個管道,其中泛型Int
表示發送的數據類型; - 關鍵點②:啟動一個協程通過
send
發送數據,send
是一個掛起函數; - 關鍵點③:啟動一個協程遍歷
channel
打印出接收到的消息。
那么這里還有一個問題,在執行完上述代碼后程序并沒有終止,那要如何終止程序呢?
很簡單,在發送完所有消息后調用close
方法即可。
launch { for (i in 1..3) { channel.send(i) //關鍵點② logX("send: $i") } // 修改點 // ↓ channel.close() }
Channel
也是一種協程資源,用完后如果不關閉那么這個資源就會一直被占用。
public fun <E> Channel( capacity: Int = RENDEZVOUS, onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement: ((E) -> Unit)? = null ): Channel<E> = when (capacity) { RENDEZVOUS -> { ... } CONFLATED -> { ... } UNLIMITED -> { ... } BUFFERED -> { ... } else -> { ... } }
Channel
中有三個參數:
-
capacity
: 代表管道的容量,默認值為RENDEZVOUS
,代表容量為0,除此之外還有三個類型:
- CONFLATED:代表容量為1,新的數據會替代舊的數據;
- UNLIMITED:代表無限容量;
-
BUFFERED:代表具備一定緩存的容量,默認情況下是64,具體容量由VM參數
kotlinx.coroutines.channels.defaultBuffer
決定。 -
onBufferOverflow
: 代表緩沖策略,也就是當緩沖的容量滿了之后要怎么做。默認值為SUSPEND
,表示在緩沖區溢出時掛起。除此之外還有兩個類型:
- DROP_OLDEST:在緩沖區溢出時刪除最舊的值,向緩沖區添加新值,不要掛起;
- DROP_LATEST:在緩沖區溢出時,立即刪除正在添加到緩沖區的最新值(以便緩沖區內容保持不變),不要掛起。
-
onUndeliveredElement
: 它相當于一個異常處理回調。當管道中的某些數據沒有被成功接收的時候,這個回調就會被調用
現在寫個案例看一下capacity
在其他類型下的區別
/** * Channel.CONFLATED */ fun channelTest() = runBlocking { val channel = Channel<Int>(Channel.CONFLATED) launch { for (i in 1..4) { channel.send(i) println("send: $i") } channel.close() } launch { for (i in channel) { println("receiver: $i") } } println("end") } //輸出結果: //end //send: 1 //send: 2 //send: 3 //send: 4 //receiver: 4 /** * Channel.UNLIMITED */ fun channelTest() = runBlocking { val channel = Channel<Int>(Channel.UNLIMITED) launch { for (i in 1..4) { channel.send(i) println("send: $i") } channel.close() } launch { for (i in channel) { println("receiver: $i") } } println("end") } //輸出結果: //end //send: 1 //send: 2 //send: 3 //send: 4 //receiver: 1 //receiver: 2 //receiver: 3 //receiver: 4 /** * Channel.BUFFERED */ fun channelTest() = runBlocking { val channel = Channel<Int>(Channel.BUFFERED) launch { for (i in 1..4) { channel.send(i) println("send: $i") } channel.close() } launch { for (i in channel) { println("receiver: $i") } } println("end") } //輸出結果: //end //send: 1 //send: 2 //send: 3 //send: 4 //receiver: 1 //receiver: 2 //receiver: 3 //receiver: 4
再看一下onBufferOverflow
在其他類型下的區別
/** * capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST * 緩沖區設置為3,緩沖區溢出時刪除最舊的值,向緩沖區添加新值 */ fun channelTest() = runBlocking { val channel = Channel<Int>( capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST ) launch { for (i in 1..4) { channel.send(i) println("send: $i") } channel.close() } launch { for (i in channel) { println("receiver: $i") } } println("end") } //輸出結果: //end //send: 1 //send: 2 //send: 3 //send: 4 //receiver: 2 //receiver: 3 //receiver: 4 /** * capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST * 緩沖區設置為3,緩沖區溢出時立即刪除正在添加到緩沖區的最新值 */ fun channelTest() = runBlocking { val channel = Channel<Int>( capacity = 3, onBufferOverflow = BufferOverflow.DROP_LATEST ) launch { for (i in 1..4) { channel.send(i) println("send: $i") } channel.close() } launch { for (i in channel) { println("receiver: $i") } } println("end") } //輸出結果: //end //send: 1 //send: 2 //send: 3 //send: 4 //receiver: 1 //receiver: 2 //receiver: 3
再看一下onUndeliveredElement
要如何使用
/** * capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement * 緩沖區設置為2,緩沖區溢出時立即刪除正在添加到緩沖區的最新值 * 接收一個數據后取消接收其他數據 */ fun channelTest() = runBlocking { val channel = Channel<Int>( capacity = 2, onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement = { println("onUndeliveredElement: $it") } ) launch { for (i in 1..4) { channel.send(i) println("send: $i") } } println("receive:${channel.receive()}") channel.cancel() } //輸出結果: //send: 1 //send: 2 //send: 3 //send: 4 //receive:1 //onUndeliveredElement: 2 //onUndeliveredElement: 3
上面的代碼容量設置為2,緩沖策略是刪除正在添加到緩沖區的最新值,接收一個數據后立即取消接收其他數據,也就是說接收到了【send: 1】的數據【receive:1】,【send: 4】的數據被緩沖策略刪除了,由于接收消息的同道已經被取消了那么【send: 2】和【send: 3】的數據就只能在異常中被處理,從輸出結果就可以看到。
從上面的代碼示例可以總結出它的應用場景:接收方很關心數據是否被消費,例如企業微信、釘釘的消息是否已讀的狀態,對于異常處理那塊的場景就像是發送消息過程中消息沒有被發送出去,那么接收方就無法接受到這個消息。
2.Channel使用中的細節
前面在使用Channel
時為了讓程序終止在發送完成后調用了channel.close()
,但是這個很容易被忘記,忘記添加就會造成程序無法終止的問題,那么Produce
就誕生了,它是一個高階函數。
fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce { for (i in 1..4) { send(i) } } launch { for (i in channel) { println("receive: $i") } } println("end") } //輸出結果: //end //receive: 1 //receive: 2 //receive: 3 //receive: 4 //Process finished with exit code 0
可以看到沒有加入close
代碼就可以正常結束,上面發送了4條數據,那么我要是接收5條數據會不會有什么問題?
fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce { for (i in 1..4) { send(i) } } println("receive: ${channel.receive()}") println("receive: ${channel.receive()}") println("receive: ${channel.receive()}") println("receive: ${channel.receive()}") println("receive: ${channel.receive()}") println("end") } //輸出結果: //receive: 1 //receive: 2 //receive: 3 //receive: 4 //ClosedReceiveChannelException: Channel was closed
可以看到當我接收第5條數據的時候報出channel
被關閉的提示,也就是說produce
確實會在消息發送完畢后關閉通道。
業務開發中有可能我們確實需要對channel
發送的消息進行單獨處理,那么也許并不知道具體發送了幾條數據,如果接收數據數量超過發送數據數量就會出現錯誤,那有沒有像isClose
這類的方法可以在接收前判斷是否被關閉呢?有的,在Channel
中還有兩個變量:
//如果該通道已通過調用[close]關閉,則返回' true '。這意味著調用[send]將導致異常。 public val isClosedForSend: Boolean //如果通過在SendChannel端調用close關閉了此通道, //并且已經接收到以前發送的所有項目,則返回true。 public val isClosedForReceive: Boolean
那么安全的調用channel.receive()
接收就可以這么寫
fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce(capacity = 3) { (1..4).forEach { send(it) println("Send $it") } } while (!channel.isClosedForReceive) { println("receive: ${channel.receive()}") } println("end") } //輸出結果: //Send 1 //Send 2 //Send 3 //Send 4 //receive: 1 //receive: 2 //receive: 3 //receive: 4 //end
但是這里會有一個問題,不定義capacity
的數量
fun produceTest() = runBlocking { // 變化在這里 // ↓ val channel: ReceiveChannel<Int> = produce { (1..4).forEach { send(it) println("Send $it") } } while (!channel.isClosedForReceive) { println("receive: ${channel.receive()}") } println("end") } //輸出結果: //Send 1 //receive: 1 //receive: 2 //Send 2 //Send 3 //receive: 3 //receive: 4 //Send 4 // //ClosedReceiveChannelException: Channel was closed
可以看到send
發送的數據全部都被接收了,但是還是報出channel
被關閉的錯誤,原因在注釋中已經寫明:如果通過在SendChannel端調用close關閉了此通道,并且已經接收到以前發送的所有項目,則返回true。
這意味著調用receive將導致closereceivechannelexception。 所以channel.receive()
要慎用。可以用channel.consumeEach
代替
fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce { (1..4).forEach { send(it) println("Send $it") } } //變化在這里 channel.consumeEach { println("receive: $it") } println("end") } //輸出結果: //Send 1 //receive: 1 //receive: 2 //Send 2 //Send 3 //receive: 3 //receive: 4 //Send 4 //end
3.Channe的特點
Channel
主要你用來傳遞數據流的,這個數據流指的是多個數據組合形成別的流,與它形成鮮明對比的是async
、掛起函數。
數據流的傳輸,有發送就有接收,而Channel
是完全符合這一點的。發送與接收存在兩種情況:
- 數據流的發送了但是還沒有被接收,沒有接收則不再進行發送消息,例如文件的傳輸;
- 數據流的發送了不管有沒有被接收,都要繼續發送消息,例如微信聊天。
Channel
符合第二個結論,無論發送的數據是否被消費或者說被接收,Channel
都會進行工作。我們來證明一下這個結論。
/** * 消息容量為10,發送4條數據 * 無論消息是否被接收都會吧消息發送完畢 */ fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce(capacity = 10) { (1..4).forEach { send(it) println("Send $it") } } println("end") } //輸出結果: //end //Send 1 //Send 2 //Send 3 //Send 4 /** * 消息容量改為默認,默認值時0,發送4條數據 * Channel依舊是在工作的,只是說在調用send方法的時候 * 接收方還沒有準備完畢且容量為0,所以會被掛起,程序一直無法退出 */ fun produceTest() = runBlocking { val channel: ReceiveChannel<Int> = produce { (1..4).forEach { send(it) println("Send $it") } } println("end") } //輸出結果: //end //程序沒有結束
通過上面的代碼引出一個結論:Channel
是“熱” 的。不管接收方是否存在,Channel
是一定會工作的。類似于自來水廠向像居民提供水源,發電廠向居民提供電能。
原文鏈接:https://juejin.cn/post/7173833391165407268
相關推薦
- 2023-05-30 關于keras中卷積層Conv2D的學習記錄_python
- 2022-10-24 C#?Winform實現自定義漂亮的通知效果_C#教程
- 2023-07-08 qt修改默認構建路徑
- 2022-06-16 golang?validator庫參數校驗實用技巧干貨_Golang
- 2022-08-17 R語言學習VennDiagram包繪制韋恩圖示例_R語言
- 2021-12-19 C/C++?Qt?TabWidget?實現多窗體創建詳解_C 語言
- 2023-03-02 SQLServer?清理日志的實現_MsSql
- 2022-09-16 numpy.reshape(-1,1)的具體使用_python
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支