網(wǎng)站首頁 編程語言 正文
聊一聊kotlin協(xié)程“低級”api
Kotlin協(xié)程已經(jīng)出來很久了,相信大家都有不同程度的用上了,由于最近處理的需求有遇到協(xié)程相關(guān),因此今天來聊一Kotlin協(xié)程的“低級”api,首先低級api并不是它真的很“低級”,而是kotlin協(xié)程庫中的基礎(chǔ)api,我們一般開發(fā)用的,其實都是通過低級api進行封裝的高級函數(shù),本章會通過低級api的組合,實現(xiàn)一個自定義的async await 函數(shù)(下文也會介紹kotlin 高級api的async await),涉及的低級api有startCoroutine ,ContinuationInterceptor 等
startCoroutine
我們知道,一個suspend關(guān)鍵字修飾的函數(shù),只能在協(xié)程體中執(zhí)行,伴隨著suspend 關(guān)鍵字,kotlin coroutine common庫(平臺無關(guān))也提供出來一個api,用于直接通過suspend 修飾的函數(shù)直接啟動一個協(xié)程,它就是startCoroutine
@SinceKotlin("1.3")
@Suppress("UNCHECKED_CAST")
public fun <R, T> (suspend R.() -> T).startCoroutine(
作為Receiver
receiver: R,
當前協(xié)程結(jié)束時的回調(diào)
completion: Continuation<T>
) {
createCoroutineUnintercepted(receiver, completion).intercepted().resume(Unit)
}
可以看到,它的Receiver是(suspend R.() -> T),即是一個suspend修飾的函數(shù),那么這個有什么作用呢?我們知道,在普通函數(shù)中無法調(diào)起suspend函數(shù)(因為普通函數(shù)沒有隱含的Continuation對象,這里我們不在這章講,可以參考kotlin協(xié)程的資料)
但是普通函數(shù)是可以調(diào)起一個以suspend函數(shù)作為Receiver的函數(shù)(本質(zhì)也是一個普通函數(shù))
其中startCoroutine就是其中一個,本質(zhì)就是我們直接從外部提供了一個Continuation,同時調(diào)用了resume方法,去進入到了協(xié)程的世界
startCoroutine實現(xiàn)
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
這個原理我們就不細講下去原理,之前也有寫過相關(guān)的文章。通過這種調(diào)用,我們其實就可以實現(xiàn)在普通的函數(shù)環(huán)境,開啟一個協(xié)程環(huán)境(即帶有了Continuation),進而調(diào)用其他的suspend函數(shù)。
ContinuationInterceptor
我們都知道攔截器的概念,那么kotlin協(xié)程也有,就是ContinuationInterceptor,它提供以AOP的方式,讓外部在resume(協(xié)程恢復(fù))前后進行自定義的攔截操作,比如高級api中的Diapatcher就是。當然什么是resume協(xié)程恢復(fù)呢,可能讀者有點懵,我們還是以上圖中出現(xiàn)的mySuspendFunc舉例子
mySuspendFunc是一個suspned函數(shù)
::mySuspendFunc.startCoroutine(object : Continuation<Unit> {
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
}
})
它其實等價于
val continuation = ::mySuspendFunc.createCoroutine(object :Continuation<Unit>{
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
Log.e("hello","當前協(xié)程執(zhí)行完成的回調(diào)")
}
})
continuation.resume(Unit)
startCoroutine方法就相當于創(chuàng)建了一個Continuation對象,并調(diào)用了resume。創(chuàng)建Continuation可通過createCoroutine方法,返回一個Continuation,如果我們不調(diào)用resume方法,那么它其實什么也不會執(zhí)行,只有調(diào)用了resume等執(zhí)行方法之后,才會執(zhí)行到后續(xù)的協(xié)程體(這個也是協(xié)程內(nèi)部實現(xiàn),感興趣可以看看之前文章)
而我們的攔截器,就相當于在continuation.resume前后,可以添加自己的邏輯。我們可以通過繼承ContinuationInterceptor,實現(xiàn)自己的攔截器邏輯,其中需要復(fù)寫的方法是interceptContinuation方法,用于返回一個自己定義的Continuation對象,而我們可以在這個Continuation的resumeWith方法里面(當調(diào)用了resume之后,會執(zhí)行到resumeWith方法),進行前后打印/其他自定義操作(比如切換線程)
class ClassInterceptor() :ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>):Continuation<T> by continuation{
override fun resumeWith(result: Result<T>) {
Log.e("hello","MyContinuation start ${result.getOrThrow()}")
continuation.resumeWith(result)
Log.e("hello","MyContinuation end ")
}
}
其中的key是ContinuationInterceptor,協(xié)程內(nèi)部會在每次協(xié)程恢復(fù)的時候,通過coroutineContext取出key為ContinuationInterceptor的攔截器,進行攔截調(diào)用,當然這也是kotlin協(xié)程內(nèi)部實現(xiàn),這里簡單提一下。
實戰(zhàn)
kotlin協(xié)程api中的 async await
我們來看一下kotlin Coroutine 的高級api async await用法
CoroutineScope(Dispatchers.Main).launch {
val block = async(Dispatchers.IO) {
// 阻塞的事項
}
// 處理其他主線程的事務(wù)
// 此時必須需要async的結(jié)果時,則可通過await()進行獲取
val result = block.await()
}
我們可以通過async方法,在其他線程中處理其他阻塞事務(wù),當主線程必須要用async的結(jié)果的時候,就可以通過await等待,這里如果結(jié)果返回了,則直接獲取值,否則就等待async執(zhí)行完成。這是Coroutine提供給我們的高級api,能夠?qū)⑷蝿?wù)簡單分層而不需要過多的回調(diào)處理。
通過startCoroutine與ContinuationInterceptor實現(xiàn)自定義的 async await
我們可以參考其他語言的async,或者Dart的異步方法調(diào)用,都有類似這種方式進行線程調(diào)用
async {
val result = await {
suspend 函數(shù)
}
消費result
}
await在async作用域里面,同時獲取到result后再進行消費,async可以直接在普通函數(shù)調(diào)用,而不需要在協(xié)程體內(nèi),下面我們來實現(xiàn)一下這個做法。
首先我們想要限定await函數(shù)只能在async的作用域才能使用,那么首先我們就要定義出來一個Receiver,我們可以在Receiver里面定義出自己想要暴露的方法
interface AsyncScope {
fun myFunc(){
}
}
fun async(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend AsyncScope.() -> Unit
) {
// 這個有兩個作用 1.充當receiver 2.completion,接收回調(diào)
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
注意這個類,resumeWith 只會跟startCoroutine的這個協(xié)程綁定關(guān)系,跟await的協(xié)程沒有關(guān)系
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
// 這個是干嘛的 == > 完成的回調(diào)
Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}
}
上面我們定義出來一個async函數(shù),同時定義出來了一個AsyncStub的類,它有兩個用處,第一個是為了充當Receiver,用于規(guī)范后續(xù)的await函數(shù)只能在這個Receiver作用域中調(diào)用,第二個作用是startCoroutine函數(shù)必須要傳入一個參數(shù)completion,是為了收到當前協(xié)程結(jié)束的回調(diào)resumeWith中可以得到當前協(xié)程體結(jié)束回調(diào)的信息
await方法里面
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
// 自定義的Receiver函數(shù)
myFunc()
Thread{
切換線程執(zhí)行await中的方法
it.resumeWith(Result.success(block()))
}.start()
}
在await中,其實是一個擴展函數(shù),我們可以調(diào)用任何在AsyncScope中定義的方法,同時這里我們模擬了一下線程切換的操作(Dispatcher的實現(xiàn),這里不采用Dispatcher就是想讓大家知道其實Dispatcher.IO也是這樣實現(xiàn)的),在子線程中調(diào)用it.resumeWith(Result.success(block())),用于返回所需要的信息
通過上面定的方法,我們可以實現(xiàn)
async {
val result = await {
suspend 函數(shù)
}
消費result
}
public interface ContinuationInterceptor : CoroutineContext.Element
//而CoroutineContext.Element又是繼承于CoroutineContext
CoroutineContext.Element:CoroutineContext
而我們的攔截器,正是CoroutineContext的子類,我們把上文的ClassInterceptor修改一下
class ClassInterceptor() : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
Continuation<T> by continuation {
private val handler = Handler(Looper.getMainLooper())
override fun resumeWith(result: Result<T>) {
Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
handler.post {
continuation.resumeWith(Result.success(自定義內(nèi)容))
}
Log.e("hello", "MyContinuation end ")
}
}
同時把async默認參數(shù)CoroutineContext實現(xiàn)一下即可
fun async(
context: CoroutineContext = ClassInterceptor(),
block: suspend AsyncScope.() -> Unit
) {
// 這個有兩個作用 1.充當receiver 2.completion,接收回調(diào)
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
此后我們就可以直接通過,完美實現(xiàn)了一個類js協(xié)程的調(diào)用,同時具備了自動切換線程的能力
async {
val result = await {
test()
}
Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}")
}
結(jié)果
? E ?start?
? E ?MyContinuation start kotlin.Unit
? E ?MyContinuation end?
? E ?end?
? E ?執(zhí)行阻塞函數(shù) test 1923
? E ?MyContinuation start 自定義內(nèi)容數(shù)值
? E ?MyContinuation end?
? E ?result is 自定義內(nèi)容的數(shù)值 ? true
? E ?AsyncStub resumeWith 2 kotlin.Unit
最后,這里需要注意的是,為什么攔截器回調(diào)了兩次,因為我們async的時候開啟了一個協(xié)程,同時await的時候也開啟了一個,因此是兩個。AsyncStub只回調(diào)了一次,是因為AsyncStub被當作complete參數(shù)傳入了async開啟的協(xié)程block.startCoroutine,因此只是async中的協(xié)程結(jié)束才會被回調(diào)。
本章代碼
class ClassInterceptor() : ContinuationInterceptor {
override val key = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
MyContinuation(continuation)
}
class MyContinuation<T>(private val continuation: Continuation<T>) :
Continuation<T> by continuation {
private val handler = Handler(Looper.getMainLooper())
override fun resumeWith(result: Result<T>) {
Log.e("hello", "MyContinuation start ${result.getOrThrow()}")
handler.post {
continuation.resumeWith(Result.success(6 as T))
}
Log.e("hello", "MyContinuation end ")
}
}
interface AsyncScope {
fun myFunc(){
}
}
fun async(
context: CoroutineContext = ClassInterceptor(),
block: suspend AsyncScope.() -> Unit
) {
// 這個有兩個作用 1.充當receiver 2.completion,接收回調(diào)
val completion = AsyncStub(context)
block.startCoroutine(completion, completion)
}
class AsyncStub(override val context: CoroutineContext = EmptyCoroutineContext) :
Continuation<Unit>, AsyncScope {
override fun resumeWith(result: Result<Unit>) {
// 這個是干嘛的 == > 完成的回調(diào)
Log.e("hello","AsyncStub resumeWith ${Thread.currentThread().id} ${result.getOrThrow()}")
}
}
suspend fun<T> AsyncScope.await(block:() -> T) = suspendCoroutine<T> {
myFunc()
Thread{
it.resumeWith(Result.success(block()))
}.start()
}
?模擬阻塞
fun test(): Int {
Thread.sleep(5000)
Log.e("hello", "執(zhí)行阻塞函數(shù) test ${Thread.currentThread().id}")
return 5
}
async {
val result = await {
test()
}
Log.e("hello", "result is $result ${Looper.myLooper() == Looper.getMainLooper()}")
}
最后
我們通過協(xié)程的低級api,實現(xiàn)了一個與官方庫不同版本的async await,同時也希望通過對低級api的設(shè)計,也能對Coroutine官方庫的高級api的實現(xiàn)有一定的了解。
原文鏈接:https://juejin.cn/post/7172813333148958728
相關(guān)推薦
- 2022-06-17 如何使用Python?VTK繪制線條_python
- 2022-05-20 關(guān)于Mybatis-plus中PaginationInterceptor分頁攔截器過時問題解決辦法
- 2022-05-12 基于nginx反向代理獲取用戶真實Ip地址詳解_nginx
- 2021-12-01 C語言system函數(shù)使用方法詳解_C 語言
- 2022-08-22 .Net彈性和瞬態(tài)故障處理庫Polly實現(xiàn)執(zhí)行策略_實用技巧
- 2022-08-04 詳解C++中賦值,關(guān)系,函數(shù)調(diào)用運算符重載的實現(xiàn)_C 語言
- 2022-10-26 Android?Framework層獲取及處理按鍵事件流程_Android
- 2022-05-06 SQL查看表字段信息如:字段名、字段類型、字段精度、字段大小、索引、主鍵等
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動態(tài)字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支