網站首頁 編程語言 正文
結論先行
Kotlin協程中的Channel用于處理多個數據組合的流,隨用隨取,時刻準備著,就像自來水一樣,打開開關就有水了。
Channel使用示例
fun main() = runBlocking {
logX("開始")
val channel = Channel<Int> { }
launch {
(1..3).forEach{
channel.send(it)
logX("發送數據: $it")
}
// 關閉channel, 節省資源
channel.close()
}
launch {
for (i in channel){
logX("接收數據: $i")
}
}
logX("結束")
}
示例代碼 使用Channel創建了一組int類型的數據流,通過send發送數據,并通過for循環取出channel中的數據,最后channel是一種協程資源,使用結束后應該及時調用close方法關閉,以免浪費不必要的資源。
Channel的源碼
public fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
when (capacity) {
RENDEZVOUS -> {}
CONFLATED -> {}
UNLIMITED -> {}
else -> {}
}
可以看到Channel的構造函數包含了三個參數,分別是capacity、onBufferOverflow、onUndeliveredElement.
首先看capacity,這個參數代表了管道的容量,默認參數是RENDEZVOUS,取值是0,還有其他一些值:
- UNLIMITED: Int = Int.MAX_VALUE,沒有限量
- CONFLATED: 容量為1,新的覆蓋舊的值
- BUFFERED: 添加緩沖容量,默認值是64,可以通過修改VM參數:kotlinx.coroutines.channels.defaultBuffer,進行修改
接下來看onBufferOverflow, 顧名思義就是管道容量滿了,怎么辦?默認是掛起,也就是suspend,一共有三種分別是:
SUSPNED、DROP_OLDEST以及DROP_LATEST
public enum class BufferOverflow {
/**
* Suspend on buffer overflow.
*/
SUSPEND,
/**
* Drop **the oldest** value in the buffer on overflow, add the new value to the buffer, do not suspend.
*/
DROP_OLDEST,
/**
* Drop **the latest** value that is being added to the buffer right now on buffer overflow
* (so that buffer contents stay the same), do not suspend.
*/
DROP_LATEST
}
- SUSPEND,當管道的容量滿了以后,如果發送方還要繼續發送,我們就會掛起當前的 send() 方法。由于它是一個掛起函數,所以我們可以以非阻塞的方式,將發送方的執行流程掛起,等管道中有了空閑位置以后再恢復,有點像生產者-消費者模型
- DROP_OLDEST,顧名思義,就是丟棄最舊的那條數據,然后發送新的數據,有點像LRU算法。
- DROP_LATEST,丟棄最新的那條數據。這里要注意,這個動作的含義是丟棄當前正準備發送的那條數據,而管道中的內容將維持不變。
最后一個參數是onUndeliveredElement,從名字看像是沒有投遞成功的回調,也確實如此,當管道中某些數據沒有成功接收時,這個就會被調用。
綜合這個參數使用一下
fun main() = runBlocking {
println("開始")
val channel = Channel<Int>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("發送數據: $it")
}
// 關閉channel, 節省資源
channel.close()
}
launch {
for (i in channel){
println("接收數據: $i")
}
}
println("結束")
}
輸出結果如下:
開始
結束
發送數據: 1
發送數據: 2
發送數據: 3
接收數據: 2
接收數據: 3
安全的從Channel中取數據
先看一個例子
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("發送: $it")
}
}
while (!channel.isClosedForReceive){
val i = channel.receive();
println("接收: $i")
}
輸出報錯信息:
Exception in thread "main" kotlinx.coroutines.channels.ClosedReceiveChannelException: Channel was closed
可以看到使用isClosedForReceive判斷是否關閉再使用receive方法接收數據,依然會報錯,所以不推薦使用這種方式。
推薦使用上面for循環的方式取數據,還有kotlin推薦的consumeEach方式,看一下示例代碼
val channel: ReceiveChannel<Int> = produce {
(1..100).forEach{
send(it)
println("發送: $it")
}
}
channel.consumeEach {
println("接收:$it")
}
所以,當我們想要獲取Channel當中的數據時,我們盡量使用 for 循環,或者是channel.consumeEach {},不要直接調用channel.receive()。
熱的數據流從何而來
先看一下代碼
println("開始")
val channel = Channel<Int>(capacity = 3, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
println("onUndeliveredElement = $it")
}
launch {
(1..3).forEach{
channel.send(it)
println("發送數據: $it")
}
}
println("結束")
}
輸出:
開始
結束
發送數據: 1
發送數據: 2
發送數據: 3
可以看到上述代碼中并沒有 取channel中的數據,但是發送的代碼正常執行了,這種“不管有沒有接收方,發送方都會工作”的模式,就是我們將其認定為“熱”的原因。
舉個例子,就像去海底撈吃火鍋一樣,你不需要主動要求服務員加水,服務員看到你的杯子中水少了,會自動給你添加,你只管拿起水杯喝水就行了。
總的來說,不管接收方是否存在,Channel 的發送方一定會工作。
Channel能力的來源
通過源碼可以看到Channel只是一個接口,它的能力來源于SendChannel和ReceiveChannel,一個發送管道,一個接收管道,相當于做了一個組合。
這也是一種良好的設計思想,“對讀取開放,對寫入封閉”的開閉原則。
原文鏈接:https://blog.csdn.net/wayne214/article/details/127998494
相關推薦
- 2023-06-18 Go語言實現關閉http請求的方式總結_Golang
- 2022-07-16 淺談常見的加密算法
- 2022-08-01 C語言深入探索遞歸的特點_C 語言
- 2022-10-04 Nginx?502?bad?gateway錯誤解決的九種方案及原因_nginx
- 2022-09-05 基于React?Context實現一個簡單的狀態管理的示例代碼_React
- 2022-04-24 C語言中的時間函數clock()和time()你都了解嗎_C 語言
- 2022-09-28 k8s證書有效期時間修改的方法詳解_云其它
- 2022-03-24 C++關于指針,繼承和多態介紹_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支