日本免费高清视频-国产福利视频导航-黄色在线播放国产-天天操天天操天天操天天操|www.shdianci.com

學無先后,達者為師

網站首頁 編程語言 正文

Kotlin?Select協程多路復用的實現詳解_Android

作者:小魚人愛編程 ? 更新時間: 2022-11-05 編程語言

前言

協程通信三劍客:Channel、Select、Flow,本篇將會重點分析Select的使用及原理。

通過本篇文章,你將了解到:

  • Select 的引入
  • Select 的使用
  • Invoke函數 的妙用
  • Select 的原理
  • Select 注意事項

1. Select 的引入

多路數據的選擇

串行執行

如今的二維碼識別應用場景越來越廣了,早期應用比較廣泛的識別SDK如zxing、zbar,它們各有各的特點,也存在識別不出來的情況,為了將兩者優勢結合起來,我們想到的方法是同一份二維碼圖片分別給兩者進行識別。

如下:

    //從zxing 獲取二維碼信息
    suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String {
        //模擬耗時
        delay(2000)
        return "I'm fish"
    }
    //從zbar 獲取二維碼信息
    suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
        delay(1000)
        return "I'm fish"
    }
    fun testSelect() {
        runBlocking {
            var bitmap = null
            var starTime = System.currentTimeMillis()
            var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
            var qrcode2 = getQrcodeInfoFromZbar(bitmap)
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花費的時間:

qrcode1=I’m fish qrcode2=I’m fish useTime:3013 ms

當然這是串行的方式效率比較低,我們想到了用協程來優化它。

協程并行執行

如下:

    fun testSelect1() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        runBlocking {
            //掛起等待識別結果
            var qrcoe1 = deferredZxing.await()
            //掛起等待識別結果
            var qrcode2 = deferredZbar.await()
            println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
        }
    }

查看打印,最后花費的時間:

qrcode1=I’m fish qrcode2=I’m fish useTime:2084 ms

可以看出,花費時間明顯變少了。

與上個Demo 相比,雖然識別過程是放在協程里并行執行的,但是在等待識別結果卻是串行的。我們引入兩個識別庫的初衷是哪個識別快就用哪個的結果,為了達成這個目的,傳統的方式是:

同時監聽并記錄識別結果的返回。

同時監聽多路結果

如下:

    fun testSelect2() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        var isEnd = false
        var result: String? = null
        GlobalScope.launch {
            if (!isEnd) {
                //沒有結束,則繼續識別
                var resultTmp = deferredZxing.await()
                if (!isEnd) {
                    //識別沒有結束,說明自己是第一個返回結果的
                    result = resultTmp
                    println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    //標記識別結束
                    isEnd = true
                }
            }
        }
        GlobalScope.launch {
            if (!isEnd) {
                var resultTmp = deferredZbar.await()
                if (!isEnd) {
                    //識別沒有結束,說明自己是第一個返回結果的
                    result = resultTmp
                    println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
                    isEnd = true
                }
            }
        }
        //檢測是否有結果返回
        runBlocking {
            while (!isEnd) {
                delay(1)
            }
            println("recognize result:$result")
        }
    }

通過檢測isEnd 標記來判斷是否有某個模塊返回結果。

結果如下:

zbar recognize ok useTime:1070 ms

recognize result:I’m fish

由于模擬設定的zbar 解析速度快,因此每次都是采納的是zbar的結果,所花費的時間大幅減少了,該結果符合預期。

Select 閃亮登場

雖說上個Demo結果符合預期,但是多了很多額外的代碼、多引入了其它協程,并且需要子模塊對標記進行賦值(對"isEnd"進行賦值),沒有達到解耦的目的。我們希望子模塊的任務是單一且閉環的,如果能在一個函數里統一檢測結果的返回就好了。

Select 就是為了解決多路數據的選擇而生的。

來看看它是怎么解決該問題的:

    fun testSelect3() {
        var bitmap = null;
        var starTime = System.currentTimeMillis()
        var deferredZxing = GlobalScope.async {
            getQrcodeInfoFromZxing(bitmap)
        }
        var deferredZbar = GlobalScope.async {
            getQrcodeInfoFromZbar(bitmap)
        }
        runBlocking {
            //通過select 監聽zxing、zbar 結果返回
            var result = select<String> {
                //監聽zxing
                deferredZxing.onAwait {value->
                    //value 為deferredZxing 識別的結果
                    "zxing result $value"
                }
                //監聽zbar
                deferredZbar.onAwait { value->
                    "zbar result $value"
                }
            }
            //運行到此,說明已經有結果返回
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

結果如下:

result from zbar result I’m fish useTime:1079

符合預期,同時可以看出:相比上個Demo,這樣寫簡潔了許多。

2. Select 的使用

除了可以監聽async的結果,Select 還可以監聽Channel的發送方/接收方 數據,我們以監聽接收方數據為例:

    fun testSelect4() {
        runBlocking {
            var bitmap = null;
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //生產數據
                var result = getQrcodeInfoFromZxing(bitmap)
                //發送數據
                send(result)
            }
            var receiveChannelZbar = produce {
                var result = getQrcodeInfoFromZbar(bitmap)
                send(result)
            }
            var result = select<String> {
                //監聽是否有數據發送過來
                receiveChannelZxing.onReceive {
                    value->"zxing result $value"
                }
                receiveChannelZbar.onReceive {
                        value->"zbar result $value"
                }
            }
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

結果如下:

result from zbar result I’m fish useTime:1028

不論是async還是Channel,Select 都可以監聽它們的數據,從而形成多路復用的效果。

在監聽協程里調用select 表達式,表達式{}內聲明需要監聽的協程的數據,對于select 來說有兩種場景:

  • 沒有數據,則select 掛起協程并等待直到其它協程數據準備完成后再次恢復select 所在的協程。
  • 有數據,則select 正常執行并返回獲取的數據。

3. Invoke函數的妙用

在分析Select 原理之前,需要弄明白invoke函數的原理。

對于Kotlin 類來說,都可以重寫其invoke函數。

    operator fun invoke():String {
        return "I'm fish"
    }

如上,重寫了SelectDemo里的invoke函數,和普通成員函數一樣,我們可以通過對象調用它。

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo.invoke()
    println("result:$result")
}

當然,可以進一步簡化:

fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo()
    println("result:$result")
}

這里涉及到了kotlin的語法糖:對象居然可以像函數一樣調用。

作為函數,invoke 當然也可以接收高階函數作為參數:

    operator fun invoke(block: (Int) -> String): String {
        return block(3)
    }
fun main(args: Array<String>) {
    var selectDemo = SelectDemo()
    var result = selectDemo { age ->
        when (age) {
            3 -> "I'm fish3"
            4 -> "I'm fish4"
            else -> "error"
        }
    }
    println("result:$result")
}

因此,當看到對象作為函數調用時,實際上調用的是invoke函數,具體的邏輯需要查看其invoke函數的實現。

4. Select 的原理

上篇分析過Channel,因此本篇趁熱打鐵,通過Select 監聽Channel數據的變化來分析其原理,為方便講解,我們先以監聽一個Channel的為例。

先從select 表達式本身入手。

    fun testSelect5() {
        runBlocking {
            var starTime = System.currentTimeMillis()
            var receiveChannelZxing = produce {
                //發送數據
                send("I'm fish")
            }
            //確保channel 數據已經send
            delay(1000)
            var result = select<String> {
                //監聽是否有數據發送過來
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }
            println("result from $result useTime:${System.currentTimeMillis() - starTime}")
        }
    }

select 是掛起函數,因此協程運行到此有可能被掛起。

#Select.kt
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
    //...
    return suspendCoroutineUninterceptedOrReturn { uCont ->
        //傳入父協程體
        val scope = SelectBuilderImpl(uCont)
        try {
            //執行builder
            builder(scope)
        } catch (e: Throwable) {
            scope.handleBuilderException(e)
        }
        //通過返回值判斷是否需要掛起協程
        scope.getResult()
    }
}

重點看builder(scope),builder 是高階函數,實際上就是執行了select花括號里的內容,而它里面就是監聽數據是否返回。

receiveChannelZxing.onReceive

剛開始看的時候勢必以為onReceive是個函數,然而它是ReceiveChannel 里的成員變量:

#Channel.kt
    public val onReceive: SelectClause1<E>

通過上一節的分析可知,關鍵是要找到SelectClause1 的invoke的實現。

#Select.kt
public interface SelectBuilder<in R> {
    //block 有個入參
    //聲明了SelectClause1的擴展函數invoke
    public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
}
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
    //SelectBuilderImpl 實現了 SelectClause1 的invoke函數
    registerSelectClause1(this@SelectBuilderImpl, block)
}

再看onReceive 的賦值:

#AbstractChannel.kt
final override val onReceive: SelectClause1<E>
    get() = object : SelectClause1<E> {
        @Suppress("UNCHECKED_CAST")
        override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
            registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
        }
    }

因此,簡單總結調用棧如下:

當調用receiveChannelZxing.onReceive{},實際上調用了SelectClause1.invoke(),而它里面又調用了SelectClause1.registerSelectClause1(),最終調用了AbstractChannel.registerSelectReceiveMode。

AbstractChannel. registerSelectReceiveMode

#AbstractChannel.kt
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
    while (true) {
        //如果已經有結果了,則直接返回------->①
        if (select.isSelected) return
        if (isEmptyImpl) {
            //沒有發送者在等待,則入隊等待,并返回 ------->②
            if (enqueueReceiveSelect(select, block, receiveMode)) return
        } else {
            //直接取出值------->③
            val pollResult = pollSelectInternal(select)
            when {
                pollResult === ALREADY_SELECTED -> return
                pollResult === POLL_FAILED -> {} // retry
                pollResult === RETRY_ATOMIC -> {} // retry
                //調用block------->④
                else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
            }
        }
    }
}

分為4個點,接著來一一分析。

①select 同時監聽多個值,若是有1個符合要求的數據返回了,那么該isSelected 標記為true,當檢測到該標記為true時直接退出。

結合之前的Demo,zbar 已經識別出結果了,當select 檢測zxing的結果時直接返回。

②:

#AbstractChannel.kt
private fun <R> enqueueReceiveSelect(
    select: SelectInstance<R>,
    block: suspend (Any?) -> R,
    receiveMode: Int
): Boolean {
    //構造為Node元素
    val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode)
    //添加到Channel隊列里
    val result = enqueueReceive(node)
    if (result) select.disposeOnSelect(node)
    return result
}

當select 時,發現Channel里沒有數據,說明Channel還沒有開始send,因此構造了Node(ReceiveSelect)加入到Channel queue里。當send數據時,會查找queue里是否有接收者等待,若有則調用Node(ReceiveSelect.completeResumeReceive):

#AbstractChannel.kt
        override fun completeResumeReceive(value: E) {
            block.startCoroutineCancellable(
                if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
                select.completion,
                resumeOnCancellationFun(value)
            )
        }

block 被調度執行,最后會恢復select 協程的執行。

③取出數據,并嘗試恢復send協程。

④在③的基礎上,拿到數據后,直接執行block(此時并沒有切換線程進行調度)。

小結一下select 原理:

可以看出:

select 本身執行并不耗時,若最終沒有數據返回則掛起等待,若是有數據返回則不會掛起協程。

我們從頭再捋一下select 配合Channel 的原理:

雖然以Channel為例講解了select 原理,實際上async等結合select 原理大致差不多,重點都是利用了協程的掛起/恢復做文章。

5. Select注意事項

如果select有多個數據同時到達,select 默認會選擇第一個數據,若想要隨機選擇數據,可做如下處理:

            var result = selectUnbiased<String> {
                //監聽是否有數據發送過來
                receiveChannelZxing.onReceive { value ->
                    "zxing result $value"
                }
            }

想要知道select 還可以監聽哪些數據,可查看該數據是否實現了SelectClauseX(X 表示0、1、2)。

以上即為Select 的原理及其使用,下篇將會進入協程的精華部分:Flow的運用,該部分內容較多,可能會分幾篇分析,敬請期待。

本文基于Kotlin 1.5.3,文中完整Demo傳送門

原文鏈接:https://blog.csdn.net/wekajava/article/details/126808287

欄目分類
最近更新