日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Kotlin協程Channel特點及使用細節詳解_Android

作者:無糖可樂愛好者 ? 更新時間: 2023-01-05 編程語言

正文

在協程啟動模式中已經知道async是可以返回結果的,但是只返回一個,那么在復雜場景下就會不夠用了,所以Channel就出現了。

1.認識Channel

Channel的意思是管道、通道,用圖表示如下:

Channel的左邊是發送方,右邊是接收方,中間則是消息,那么代碼表示就是下面這樣:

fun main() {
    channelTest()
}
fun channelTest() = runBlocking {
    val channel = Channel<Int>()            //關鍵點①
    launch {
        for (i in 1..3) {
            channel.send(i)                 //關鍵點②
            logX("send: $i")
        }
    }
    launch {
        for (i in channel) {                //關鍵點③
            logX("receiver: $i")
        }
    }
    logX("end")
}
//輸出結果:
//================================
//end 
//Thread:main @coroutine#1
//================================
//================================
//receiver: 1 
//Thread:main @coroutine#3
//================================
//================================
//send: 1 
//Thread:main @coroutine#2
//================================
//================================
//send: 2 
//Thread:main @coroutine#2
//================================
//================================
//receiver: 2 
//Thread:main @coroutine#3
//================================
//================================
//receiver: 3 
//Thread:main @coroutine#3
//================================
//================================
//send: 3 
//Thread:main @coroutine#2
//================================

上面的代碼中啟動了兩個協程,一個發送,一個接收,還有幾個關鍵點:

  • 關鍵點①:通過Channel創建一個管道,其中泛型Int表示發送的數據類型;
  • 關鍵點②:啟動一個協程通過send發送數據,send是一個掛起函數;
  • 關鍵點③:啟動一個協程遍歷channel打印出接收到的消息。

那么這里還有一個問題,在執行完上述代碼后程序并沒有終止,那要如何終止程序呢?

很簡單,在發送完所有消息后調用close方法即可。

launch {
        for (i in 1..3) {
            channel.send(i)                 //關鍵點②
            logX("send: $i")
        }
//			修改點
//			  ↓
        channel.close()
    }

Channel也是一種協程資源,用完后如果不關閉那么這個資源就會一直被占用。

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
             ...
        }
        CONFLATED -> {
           ...
        }
        UNLIMITED -> {
            ...
        }
        BUFFERED -> { 
            ...
        }
        else -> {
            ...
        }
    }

Channel中有三個參數:

  • capacity 代表管道的容量,默認值為RENDEZVOUS,代表容量為0,除此之外還有三個類型:
  • CONFLATED:代表容量為1,新的數據會替代舊的數據;
  • UNLIMITED:代表無限容量;
  • BUFFERED:代表具備一定緩存的容量,默認情況下是64,具體容量由VM參數kotlinx.coroutines.channels.defaultBuffer決定。
  • onBufferOverflow 代表緩沖策略,也就是當緩沖的容量滿了之后要怎么做。默認值為SUSPEND,表示在緩沖區溢出時掛起。除此之外還有兩個類型:
  • DROP_OLDEST:在緩沖區溢出時刪除最舊的值,向緩沖區添加新值,不要掛起;

  • DROP_LATEST:在緩沖區溢出時,立即刪除正在添加到緩沖區的最新值(以便緩沖區內容保持不變),不要掛起。

  • onUndeliveredElement 它相當于一個異常處理回調。當管道中的某些數據沒有被成功接收的時候,這個回調就會被調用

現在寫個案例看一下capacity在其他類型下的區別

/**
 * Channel.CONFLATED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.CONFLATED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 4
/**
 * Channel.UNLIMITED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.UNLIMITED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * Channel.BUFFERED
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(Channel.BUFFERED)
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3
//receiver: 4

再看一下onBufferOverflow在其他類型下的區別

/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_OLDEST
 * 緩沖區設置為3,緩沖區溢出時刪除最舊的值,向緩沖區添加新值
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 2
//receiver: 3
//receiver: 4
/**
 * capacity = 3,onBufferOverflow = BufferOverflow.DROP_LATEST
 * 緩沖區設置為3,緩沖區溢出時立即刪除正在添加到緩沖區的最新值
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 3,
        onBufferOverflow = BufferOverflow.DROP_LATEST
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
        channel.close()
    }
    launch {
        for (i in channel) {
            println("receiver: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//send: 1
//send: 2
//send: 3
//send: 4
//receiver: 1
//receiver: 2
//receiver: 3

再看一下onUndeliveredElement要如何使用

/**
 * capacity = 2,onBufferOverflow = BufferOverflow.DROP_LATEST, onUndeliveredElement
 * 緩沖區設置為2,緩沖區溢出時立即刪除正在添加到緩沖區的最新值
 * 接收一個數據后取消接收其他數據
 */
fun channelTest() = runBlocking {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_LATEST,
        onUndeliveredElement = {
            println("onUndeliveredElement: $it")
        }
    )
    launch {
        for (i in 1..4) {
            channel.send(i)
            println("send: $i")
        }
    }
    println("receive:${channel.receive()}")
    channel.cancel()
}
//輸出結果:
//send: 1
//send: 2
//send: 3
//send: 4
//receive:1
//onUndeliveredElement: 2
//onUndeliveredElement: 3

上面的代碼容量設置為2,緩沖策略是刪除正在添加到緩沖區的最新值,接收一個數據后立即取消接收其他數據,也就是說接收到了【send: 1】的數據【receive:1】,【send: 4】的數據被緩沖策略刪除了,由于接收消息的同道已經被取消了那么【send: 2】和【send: 3】的數據就只能在異常中被處理,從輸出結果就可以看到。

從上面的代碼示例可以總結出它的應用場景:接收方很關心數據是否被消費,例如企業微信、釘釘的消息是否已讀的狀態,對于異常處理那塊的場景就像是發送消息過程中消息沒有被發送出去,那么接收方就無法接受到這個消息。

2.Channel使用中的細節

前面在使用Channel時為了讓程序終止在發送完成后調用了channel.close(),但是這個很容易被忘記,忘記添加就會造成程序無法終止的問題,那么Produce就誕生了,它是一個高階函數。

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        for (i in 1..4) {
            send(i)
        }
    }
    launch {
        for (i in channel) {
            println("receive: $i")
        }
    }
    println("end")
}
//輸出結果:
//end
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//Process finished with exit code 0

可以看到沒有加入close代碼就可以正常結束,上面發送了4條數據,那么我要是接收5條數據會不會有什么問題?

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        for (i in 1..4) {
            send(i)
        }
    }
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("receive: ${channel.receive()}")
    println("end")
}
//輸出結果:
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//ClosedReceiveChannelException: Channel was closed

可以看到當我接收第5條數據的時候報出channel被關閉的提示,也就是說produce確實會在消息發送完畢后關閉通道。

業務開發中有可能我們確實需要對channel發送的消息進行單獨處理,那么也許并不知道具體發送了幾條數據,如果接收數據數量超過發送數據數量就會出現錯誤,那有沒有像isClose這類的方法可以在接收前判斷是否被關閉呢?有的,在Channel中還有兩個變量:

//如果該通道已通過調用[close]關閉,則返回' true '。這意味著調用[send]將導致異常。
public val isClosedForSend: Boolean
//如果通過在SendChannel端調用close關閉了此通道,
//并且已經接收到以前發送的所有項目,則返回true。
public val isClosedForReceive: Boolean

那么安全的調用channel.receive()接收就可以這么寫

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce(capacity = 3) {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    while (!channel.isClosedForReceive) {
        println("receive: ${channel.receive()}")
    }
    println("end")
}
//輸出結果:
//Send 1
//Send 2
//Send 3
//Send 4
//receive: 1
//receive: 2
//receive: 3
//receive: 4
//end

但是這里會有一個問題,不定義capacity的數量

fun produceTest() = runBlocking {
    //										變化在這里
    //											↓
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    while (!channel.isClosedForReceive) {
        println("receive: ${channel.receive()}")
    }
    println("end")
}
//輸出結果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//
//ClosedReceiveChannelException: Channel was closed

可以看到send發送的數據全部都被接收了,但是還是報出channel被關閉的錯誤,原因在注釋中已經寫明:如果通過在SendChannel端調用close關閉了此通道,并且已經接收到以前發送的所有項目,則返回true。

這意味著調用receive將導致closereceivechannelexception。 所以channel.receive()要慎用。可以用channel.consumeEach代替

fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    //變化在這里    
    channel.consumeEach {
        println("receive: $it")
    }
    println("end")
}
//輸出結果:
//Send 1
//receive: 1
//receive: 2
//Send 2
//Send 3
//receive: 3
//receive: 4
//Send 4
//end

3.Channe的特點

Channel主要你用來傳遞數據流的,這個數據流指的是多個數據組合形成別的流,與它形成鮮明對比的是async、掛起函數。

數據流的傳輸,有發送就有接收,而Channel是完全符合這一點的。發送與接收存在兩種情況:

  • 數據流的發送了但是還沒有被接收,沒有接收則不再進行發送消息,例如文件的傳輸;
  • 數據流的發送了不管有沒有被接收,都要繼續發送消息,例如微信聊天。

Channel符合第二個結論,無論發送的數據是否被消費或者說被接收,Channel都會進行工作。我們來證明一下這個結論。

/**
 * 消息容量為10,發送4條數據
 * 無論消息是否被接收都會吧消息發送完畢
 */
fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce(capacity = 10) {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    println("end")
}
//輸出結果:
//end
//Send 1
//Send 2
//Send 3
//Send 4
/**
 * 消息容量改為默認,默認值時0,發送4條數據
 * Channel依舊是在工作的,只是說在調用send方法的時候
 * 接收方還沒有準備完畢且容量為0,所以會被掛起,程序一直無法退出
 */
fun produceTest() = runBlocking {
    val channel: ReceiveChannel<Int> = produce {
        (1..4).forEach {
            send(it)
            println("Send $it")
        }
    }
    println("end")
}
//輸出結果:
//end
//程序沒有結束

通過上面的代碼引出一個結論:Channel是“熱” 的。不管接收方是否存在,Channel是一定會工作的。類似于自來水廠向像居民提供水源,發電廠向居民提供電能。

原文鏈接:https://juejin.cn/post/7173833391165407268

欄目分類
最近更新