網站首頁 編程語言 正文
一、select是什么
select——>用于選擇更快的結果。
基于場景理解
比如客戶端要查詢一個商品的詳情。兩個服務:緩存服務,速度快但信息可能是舊的;網絡服務,速度慢但信息一定是最新的。
如何實現上述邏輯:
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product? {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val cacheInfo = getCacheInfo(productId)
if (cacheInfo != null) {
updateUI(cacheInfo)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
val latestInfo = getNetworkInfo(productId)
if (latestInfo != null) {
updateUI(latestInfo)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 113
001 : 8.8
Time cost: 324
上述程序分為四步:第一步:查詢緩存信息;第二步:緩存服務返回信息,更新 UI;第三步:查詢網絡服務;第四步:網絡服務返回信息,更新 UI。
用戶可以第一時間看到商品的信息,雖然它暫時會展示舊的信息,但由于我們同時查詢了網絡服務,舊緩存信息也馬上會被替代成新的信息。但是可能存在一些問題:如果程序卡在了緩存服務,獲取網絡服務就會無法執行。是因為 getCacheInfo() 它是一個掛起函數,只有這個程序執行成功以后,才可以繼續執行后面的任務。能否做到:兩個掛起函數同時執行,誰返回的速度更快,就選擇哪個結果。答案是使用select。
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val product = select<Product?> {
async {
getCacheInfo(productId)
}.onAwait {
it
}
async {
getNetworkInfo(productId)
}.onAwait {
it
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 134
?
Process finished with exit code 0
由于緩存的服務更快,所以,select 確實幫我們選擇了更快的那個結果。我們的 select 可以在緩存服務出現問題的時候,靈活選擇網絡服務的結果。從而避免用戶等待太長的時間,得到糟糕的體驗。
在上述代碼中,用戶大概率是會展示舊的緩存信息。但實際場景下,我們是需要進一步更新最新信息的。
runBlocking {
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val startTime = System.currentTimeMillis()
val productId = "001"
val cacheDeferred = async {
getCacheInfo(productId)
}
val latestDeferred = async {
getNetworkInfo(productId)
}
val product = select<Product?> {
cacheDeferred.onAwait {
it.copy(isCache = true)
}
latestDeferred.onAwait {
it.copy(isCache = false)
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
if (product != null && product.isCache) {
val latest = latestDeferred.await() ?: return@runBlocking
updateUI(latest)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
001 : 8.9
Time cost: 124
001 : 8.8
Time cost: 228
?
Process finished with exit code 0
如果是多個服務的緩存場景呢?
runBlocking {
val startTime = System.currentTimeMillis()
val productId = "001"
suspend fun getCacheInfo(productId: String): Product {
delay(100L)
return Product(productId, 8.9)
}
suspend fun getCacheInfo2(productId: String): Product {
delay(50L)
return Product(productId, 8.85)
}
suspend fun getNetworkInfo(productId: String): Product {
delay(200L)
return Product(productId, 8.8)
}
fun updateUI(product: Product) {
println("${product.productId} : ${product.price}")
}
val cacheDeferred = async {
getCacheInfo(productId)
}
val cacheDeferred2 = async {
getCacheInfo2(productId)
}
val latestDeferred = async {
getNetworkInfo(productId)
}
val product = select<Product?> {
cacheDeferred.onAwait {
it.copy(isCache = true)
}
cacheDeferred2.onAwait {
it.copy(isCache = true)
}
latestDeferred.onAwait {
it.copy(isCache = true)
}
}
if (product != null) {
updateUI(product)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
if (product != null && product.isCache) {
val latest = latestDeferred.await()
updateUI(latest)
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
}
Log
?
001 : 8.85
Time cost: 79
001 : 8.8
Time cost: 229
?
Process finished with exit code 0
select 代碼模式,可以提升程序的整體響應速度。
二、select和Channel
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce {
send(1)
delay(200L)
send(2)
delay(200L)
send(3)
}
val channel2 = produce {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
channel1.consumeEach {
println(it)
}
channel2.consumeEach {
println(it)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
?
1
2
3
a
b
c
Time cost: 853
?
Process finished with exit code 0
上述代碼串行執行,可以使用select進行優化。
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce {
send(1)
delay(200L)
send(2)
delay(200L)
send(3)
}
val channel2 = produce {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<Int>,
channel2: ReceiveChannel<String>
): Any {
return select<Any> {
if (!channel1.isClosedForReceive) {
channel1.onReceive {
it.also {
println(it)
}
}
}
if (!channel2.isClosedForReceive) {
channel2.onReceive {
it.also {
println(it)
}
}
}
}
}
repeat(6) {
selectChannel(channel1, channel2)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
1
a
2
b
3
c
Time cost: 574
?
Process finished with exit code 0
從代碼執行結果可以發現程序的執行耗時有效減少。onReceive{} 是 Channel 在 select 當中的語法,當 Channel 當中有數據以后,它就會被回調,通過這個 Lambda,將結果傳出去。執行了 6 次 select,目的是要把兩個管道中的所有數據都消耗掉。
如果Channel1不生產數據了,程序會如何執行?
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(5000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceive {
it.also {
println(it)
}
}
channel2.onReceive {
it.also {
println(it)
}
}
}
repeat(3) {
selectChannel(channel1, channel2)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
Time cost: 570
?
Process finished with exit code 0
如果不知道Channel的個數,如何避免ClosedReceiveChannelException?
使用:onReceiveCatching{}
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(5000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceiveCatching {
it.getOrNull() ?: "channel1 is closed!"
}
channel2.onReceiveCatching {
it.getOrNull() ?: "channel2 is closed!"
}
}
repeat(6) {
val result = selectChannel(channel1, channel2)
println(result)
}
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 584
?
Process finished with exit code 0
得到所有結果以后,程序不會立即退出,因為 channel1 一直在 delay()。
所以我們需要在6次repeat之后將channel關閉。
runBlocking {
val startTime = System.currentTimeMillis()
val channel1 = produce<String> {
delay(15000L)
}
val channel2 = produce<String> {
delay(100L)
send("a")
delay(200L)
send("b")
delay(200L)
send("c")
}
suspend fun selectChannel(
channel1: ReceiveChannel<String>,
channel2: ReceiveChannel<String>
): String = select<String> {
channel1.onReceiveCatching {
it.getOrNull() ?: "channel1 is closed!"
}
channel2.onReceiveCatching {
it.getOrNull() ?: "channel2 is closed!"
}
}
repeat(6) {
val result = selectChannel(channel1, channel2)
println(result)
}
channel1.cancel()
channel2.cancel()
println("Time cost: ${System.currentTimeMillis() - startTime}")
}
Log
a
b
c
channel2 is closed!
channel2 is closed!
channel2 is closed!
Time cost: 612
?
Process finished with exit code 0
Deferred、Channel 的 API:
public interface Deferred : CoroutineContext.Element {
public suspend fun join()
public suspend fun await(): T
public val onJoin: SelectClause0
public val onAwait: SelectClause1<T>
}
public interface SendChannel<in E>
public suspend fun send(element: E)
public val onSend: SelectClause2<E, SendChannel<E>>
}
public interface ReceiveChannel<out E> {
public suspend fun receive(): E
public suspend fun receiveCatching(): ChannelResult<E>
public val onReceive: SelectClause1<E>
public val onReceiveCatching: SelectClause1<ChannelResult<E>>
}
當 select 與 Deferred 結合使用的時候,當并行的 Deferred 比較多的時候,你往往需要在得到一個最快的結果以后,去取消其他的 Deferred。
通過 async 并發執行協程,也可以借助 select 得到最快的結果。
runBlocking {
suspend fun <T> fastest(vararg deferreds: Deferred<T>): T = select {
fun cancelAll() = deferreds.forEach {
it.cancel()
}
for (deferred in deferreds) {
deferred.onAwait {
cancelAll()
it
}
}
}
val deferred1 = async {
delay(100L)
println("done1")
"result1"
}
val deferred2 = async {
delay(200L)
println("done2")
"result2"
}
val deferred3 = async {
delay(300L)
println("done3")
"result3"
}
val deferred4 = async {
delay(400L)
println("done4")
"result4"
}
val deferred5 = async {
delay(5000L)
println("done5")
"result5"
}
val fastest = fastest(deferred1, deferred2, deferred3, deferred4, deferred5)
println(fastest)
}
Log
?
done1
result1
?
Process finished with exit code 0
原文鏈接:https://blog.csdn.net/zhangying1994/article/details/127485681
相關推薦
- 2022-12-01 使用SDLocalize實現高效完成iOS多語言工作_IOS
- 2022-05-24 C#?連接本地數據庫的實現示例_C#教程
- 2022-12-07 C語言實現打印數字金字塔_C 語言
- 2023-03-26 react組件實例屬性props實例詳解_React
- 2022-03-14 C語言撲克牌游戲示例(c語言紙牌游戲)
- 2022-03-23 Android應用內懸浮窗Activity的簡單實現_Android
- 2021-10-12 shell實現Fisher–Yates?shuffle洗牌算法介紹_linux shell
- 2022-04-20 Xamarin渲染器移植到.NET?MAUI項目中_實用技巧
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支