網站首頁 編程語言 正文
前言
協程通信三劍客:Channel、Select、Flow,本篇將會重點分析Select的使用及原理。
通過本篇文章,你將了解到:
- Select 的引入
- Select 的使用
- Invoke函數 的妙用
- Select 的原理
- Select 注意事項
1. Select 的引入
多路數據的選擇
串行執行
如今的二維碼識別應用場景越來越廣了,早期應用比較廣泛的識別SDK如zxing、zbar,它們各有各的特點,也存在識別不出來的情況,為了將兩者優勢結合起來,我們想到的方法是同一份二維碼圖片分別給兩者進行識別。
如下:
//從zxing 獲取二維碼信息
suspend fun getQrcodeInfoFromZxing(bitmap: Bitmap?): String {
//模擬耗時
delay(2000)
return "I'm fish"
}
//從zbar 獲取二維碼信息
suspend fun getQrcodeInfoFromZbar(bitmap: Bitmap?): String {
delay(1000)
return "I'm fish"
}
fun testSelect() {
runBlocking {
var bitmap = null
var starTime = System.currentTimeMillis()
var qrcoe1 = getQrcodeInfoFromZxing(bitmap)
var qrcode2 = getQrcodeInfoFromZbar(bitmap)
println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
}
}
查看打印,最后花費的時間:
qrcode1=I’m fish qrcode2=I’m fish useTime:3013 ms
當然這是串行的方式效率比較低,我們想到了用協程來優化它。
協程并行執行
如下:
fun testSelect1() {
var bitmap = null;
var starTime = System.currentTimeMillis()
var deferredZxing = GlobalScope.async {
getQrcodeInfoFromZxing(bitmap)
}
var deferredZbar = GlobalScope.async {
getQrcodeInfoFromZbar(bitmap)
}
runBlocking {
//掛起等待識別結果
var qrcoe1 = deferredZxing.await()
//掛起等待識別結果
var qrcode2 = deferredZbar.await()
println("qrcode1=$qrcoe1 qrcode2=$qrcode2 useTime:${System.currentTimeMillis() - starTime} ms")
}
}
查看打印,最后花費的時間:
qrcode1=I’m fish qrcode2=I’m fish useTime:2084 ms
可以看出,花費時間明顯變少了。
與上個Demo 相比,雖然識別過程是放在協程里并行執行的,但是在等待識別結果卻是串行的。我們引入兩個識別庫的初衷是哪個識別快就用哪個的結果,為了達成這個目的,傳統的方式是:
同時監聽并記錄識別結果的返回。
同時監聽多路結果
如下:
fun testSelect2() {
var bitmap = null;
var starTime = System.currentTimeMillis()
var deferredZxing = GlobalScope.async {
getQrcodeInfoFromZxing(bitmap)
}
var deferredZbar = GlobalScope.async {
getQrcodeInfoFromZbar(bitmap)
}
var isEnd = false
var result: String? = null
GlobalScope.launch {
if (!isEnd) {
//沒有結束,則繼續識別
var resultTmp = deferredZxing.await()
if (!isEnd) {
//識別沒有結束,說明自己是第一個返回結果的
result = resultTmp
println("zxing recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
//標記識別結束
isEnd = true
}
}
}
GlobalScope.launch {
if (!isEnd) {
var resultTmp = deferredZbar.await()
if (!isEnd) {
//識別沒有結束,說明自己是第一個返回結果的
result = resultTmp
println("zbar recognize ok useTime:${System.currentTimeMillis() - starTime} ms")
isEnd = true
}
}
}
//檢測是否有結果返回
runBlocking {
while (!isEnd) {
delay(1)
}
println("recognize result:$result")
}
}
通過檢測isEnd 標記來判斷是否有某個模塊返回結果。
結果如下:
zbar recognize ok useTime:1070 ms
recognize result:I’m fish
由于模擬設定的zbar 解析速度快,因此每次都是采納的是zbar的結果,所花費的時間大幅減少了,該結果符合預期。
Select 閃亮登場
雖說上個Demo結果符合預期,但是多了很多額外的代碼、多引入了其它協程,并且需要子模塊對標記進行賦值(對"isEnd"進行賦值),沒有達到解耦的目的。我們希望子模塊的任務是單一且閉環的,如果能在一個函數里統一檢測結果的返回就好了。
Select 就是為了解決多路數據的選擇而生的。
來看看它是怎么解決該問題的:
fun testSelect3() {
var bitmap = null;
var starTime = System.currentTimeMillis()
var deferredZxing = GlobalScope.async {
getQrcodeInfoFromZxing(bitmap)
}
var deferredZbar = GlobalScope.async {
getQrcodeInfoFromZbar(bitmap)
}
runBlocking {
//通過select 監聽zxing、zbar 結果返回
var result = select<String> {
//監聽zxing
deferredZxing.onAwait {value->
//value 為deferredZxing 識別的結果
"zxing result $value"
}
//監聽zbar
deferredZbar.onAwait { value->
"zbar result $value"
}
}
//運行到此,說明已經有結果返回
println("result from $result useTime:${System.currentTimeMillis() - starTime}")
}
}
結果如下:
result from zbar result I’m fish useTime:1079
符合預期,同時可以看出:相比上個Demo,這樣寫簡潔了許多。
2. Select 的使用
除了可以監聽async的結果,Select 還可以監聽Channel的發送方/接收方 數據,我們以監聽接收方數據為例:
fun testSelect4() {
runBlocking {
var bitmap = null;
var starTime = System.currentTimeMillis()
var receiveChannelZxing = produce {
//生產數據
var result = getQrcodeInfoFromZxing(bitmap)
//發送數據
send(result)
}
var receiveChannelZbar = produce {
var result = getQrcodeInfoFromZbar(bitmap)
send(result)
}
var result = select<String> {
//監聽是否有數據發送過來
receiveChannelZxing.onReceive {
value->"zxing result $value"
}
receiveChannelZbar.onReceive {
value->"zbar result $value"
}
}
println("result from $result useTime:${System.currentTimeMillis() - starTime}")
}
}
結果如下:
result from zbar result I’m fish useTime:1028
不論是async還是Channel,Select 都可以監聽它們的數據,從而形成多路復用的效果。
在監聽協程里調用select 表達式,表達式{}內聲明需要監聽的協程的數據,對于select 來說有兩種場景:
- 沒有數據,則select 掛起協程并等待直到其它協程數據準備完成后再次恢復select 所在的協程。
- 有數據,則select 正常執行并返回獲取的數據。
3. Invoke函數的妙用
在分析Select 原理之前,需要弄明白invoke函數的原理。
對于Kotlin 類來說,都可以重寫其invoke函數。
operator fun invoke():String {
return "I'm fish"
}
如上,重寫了SelectDemo里的invoke函數,和普通成員函數一樣,我們可以通過對象調用它。
fun main(args: Array<String>) {
var selectDemo = SelectDemo()
var result = selectDemo.invoke()
println("result:$result")
}
當然,可以進一步簡化:
fun main(args: Array<String>) {
var selectDemo = SelectDemo()
var result = selectDemo()
println("result:$result")
}
這里涉及到了kotlin的語法糖:對象居然可以像函數一樣調用。
作為函數,invoke 當然也可以接收高階函數作為參數:
operator fun invoke(block: (Int) -> String): String {
return block(3)
}
fun main(args: Array<String>) {
var selectDemo = SelectDemo()
var result = selectDemo { age ->
when (age) {
3 -> "I'm fish3"
4 -> "I'm fish4"
else -> "error"
}
}
println("result:$result")
}
因此,當看到對象作為函數調用時,實際上調用的是invoke函數,具體的邏輯需要查看其invoke函數的實現。
4. Select 的原理
上篇分析過Channel,因此本篇趁熱打鐵,通過Select 監聽Channel數據的變化來分析其原理,為方便講解,我們先以監聽一個Channel的為例。
先從select 表達式本身入手。
fun testSelect5() {
runBlocking {
var starTime = System.currentTimeMillis()
var receiveChannelZxing = produce {
//發送數據
send("I'm fish")
}
//確保channel 數據已經send
delay(1000)
var result = select<String> {
//監聽是否有數據發送過來
receiveChannelZxing.onReceive { value ->
"zxing result $value"
}
}
println("result from $result useTime:${System.currentTimeMillis() - starTime}")
}
}
select 是掛起函數,因此協程運行到此有可能被掛起。
#Select.kt
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
//...
return suspendCoroutineUninterceptedOrReturn { uCont ->
//傳入父協程體
val scope = SelectBuilderImpl(uCont)
try {
//執行builder
builder(scope)
} catch (e: Throwable) {
scope.handleBuilderException(e)
}
//通過返回值判斷是否需要掛起協程
scope.getResult()
}
}
重點看builder(scope),builder 是高階函數,實際上就是執行了select花括號里的內容,而它里面就是監聽數據是否返回。
receiveChannelZxing.onReceive
剛開始看的時候勢必以為onReceive是個函數,然而它是ReceiveChannel 里的成員變量:
#Channel.kt
public val onReceive: SelectClause1<E>
通過上一節的分析可知,關鍵是要找到SelectClause1 的invoke的實現。
#Select.kt
public interface SelectBuilder<in R> {
//block 有個入參
//聲明了SelectClause1的擴展函數invoke
public operator fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R)
}
override fun <Q> SelectClause1<Q>.invoke(block: suspend (Q) -> R) {
//SelectBuilderImpl 實現了 SelectClause1 的invoke函數
registerSelectClause1(this@SelectBuilderImpl, block)
}
再看onReceive 的賦值:
#AbstractChannel.kt
final override val onReceive: SelectClause1<E>
get() = object : SelectClause1<E> {
@Suppress("UNCHECKED_CAST")
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (E) -> R) {
registerSelectReceiveMode(select, RECEIVE_THROWS_ON_CLOSE, block as suspend (Any?) -> R)
}
}
因此,簡單總結調用棧如下:
當調用receiveChannelZxing.onReceive{},實際上調用了SelectClause1.invoke(),而它里面又調用了SelectClause1.registerSelectClause1(),最終調用了AbstractChannel.registerSelectReceiveMode。
AbstractChannel. registerSelectReceiveMode
#AbstractChannel.kt
private fun <R> registerSelectReceiveMode(select: SelectInstance<R>, receiveMode: Int, block: suspend (Any?) -> R) {
while (true) {
//如果已經有結果了,則直接返回------->①
if (select.isSelected) return
if (isEmptyImpl) {
//沒有發送者在等待,則入隊等待,并返回 ------->②
if (enqueueReceiveSelect(select, block, receiveMode)) return
} else {
//直接取出值------->③
val pollResult = pollSelectInternal(select)
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult === RETRY_ATOMIC -> {} // retry
//調用block------->④
else -> block.tryStartBlockUnintercepted(select, receiveMode, pollResult)
}
}
}
}
分為4個點,接著來一一分析。
①select 同時監聽多個值,若是有1個符合要求的數據返回了,那么該isSelected 標記為true,當檢測到該標記為true時直接退出。
結合之前的Demo,zbar 已經識別出結果了,當select 檢測zxing的結果時直接返回。
②:
#AbstractChannel.kt
private fun <R> enqueueReceiveSelect(
select: SelectInstance<R>,
block: suspend (Any?) -> R,
receiveMode: Int
): Boolean {
//構造為Node元素
val node = AbstractChannel.ReceiveSelect(this, select, block, receiveMode)
//添加到Channel隊列里
val result = enqueueReceive(node)
if (result) select.disposeOnSelect(node)
return result
}
當select 時,發現Channel里沒有數據,說明Channel還沒有開始send,因此構造了Node(ReceiveSelect)加入到Channel queue里。當send數據時,會查找queue里是否有接收者等待,若有則調用Node(ReceiveSelect.completeResumeReceive):
#AbstractChannel.kt
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
if (receiveMode == RECEIVE_RESULT) ChannelResult.success(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
}
block 被調度執行,最后會恢復select 協程的執行。
③取出數據,并嘗試恢復send協程。
④在③的基礎上,拿到數據后,直接執行block(此時并沒有切換線程進行調度)。
小結一下select 原理:
可以看出:
select 本身執行并不耗時,若最終沒有數據返回則掛起等待,若是有數據返回則不會掛起協程。
我們從頭再捋一下select 配合Channel 的原理:
雖然以Channel為例講解了select 原理,實際上async等結合select 原理大致差不多,重點都是利用了協程的掛起/恢復做文章。
5. Select注意事項
如果select有多個數據同時到達,select 默認會選擇第一個數據,若想要隨機選擇數據,可做如下處理:
var result = selectUnbiased<String> {
//監聽是否有數據發送過來
receiveChannelZxing.onReceive { value ->
"zxing result $value"
}
}
想要知道select 還可以監聽哪些數據,可查看該數據是否實現了SelectClauseX(X 表示0、1、2)。
以上即為Select 的原理及其使用,下篇將會進入協程的精華部分:Flow的運用,該部分內容較多,可能會分幾篇分析,敬請期待。
本文基于Kotlin 1.5.3,文中完整Demo傳送門
原文鏈接:https://blog.csdn.net/wekajava/article/details/126808287
相關推薦
- 2022-12-08 Python如何遍歷numpy數組_python
- 2022-04-18 C#實現在窗體上的統計圖效果_C#教程
- 2023-02-28 ts之 Mixin混入(ts對象的混入、類的混入)
- 2022-10-17 React?中的?useContext使用方法_React
- 2022-03-17 C#表達式樹Expression動態創建表達式_C#教程
- 2022-11-10 C++在多線程中使用condition_variable實現wait_C 語言
- 2022-07-14 Android實現手勢劃定區域裁剪圖片_Android
- 2023-06-05 C++頭文件和cpp文件的原理分析_C 語言
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支