網(wǎng)站首頁 編程語言 正文
異步編程在 Rust 中的地位非常高,很多 crate 尤其是多IO操作的都使用了 async/await.
首先弄清楚異步編程的幾個(gè)基本概念:
Future
Future 代表一個(gè)可在未來某個(gè)時(shí)候獲取返回值的 task,為了獲取這個(gè) task 的執(zhí)行狀況,F(xiàn)uture 提供了一個(gè)函數(shù)用于判斷該 task 是否執(zhí)行返回。
trait Future { type Output; fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output>; }
poll 函數(shù)就是一個(gè) Future 用于檢查自己的 task 是否已經(jīng)完成,例如我可以創(chuàng)建一個(gè)與某個(gè) IP 建立 TCP 連接的 struct,在構(gòu)建時(shí)完成建立連接的工作,然后實(shí)現(xiàn) Future trait 時(shí)檢查連接是否已經(jīng)建立完成。根據(jù)建立情況返回 enum Poll 中的兩個(gè)元素之一:
- Poll::Pending: task 還在等待
- Poll::Ready(result): task 攜帶 result 返回
實(shí)際上,基于 async 定義的函數(shù)和代碼塊也會(huì)被編譯器編譯為 Future。但是 async 函數(shù)或代碼塊無法顯式地返回 Pending,因此一般只能完成一些簡單的調(diào)用其他 Future 的工作。復(fù)雜的異步過程通常還是交由實(shí)現(xiàn)了 Future trait 的類型完成。
Wake & Context
你可能會(huì)好奇上面 poll 函數(shù)簽名里的 cx 參數(shù)的作用,在 Rust 官方文檔的定義中,Context 暫時(shí)只用于獲取 Waker,而 Waker 的作用是用于提醒 executor 該 task 已經(jīng)準(zhǔn)備好運(yùn)行了。
為什么需要 executor ?
同樣以上面的建立 TCP 連接的例子來說,在網(wǎng)絡(luò)卡頓時(shí),進(jìn)行一次 poll 可能都沒有建立連接,如果沒有設(shè)置 timeout 之類的東西的話,就需要進(jìn)行多次 poll。這樣的 Future 多了以后,我們可能會(huì)想,不妨將所有的 Future 都存儲(chǔ)在一起,然后另起一個(gè)線程用于循環(huán)遍歷所有的 Future 是否已經(jīng) ready,如果 ready 則返回結(jié)果。這就是一個(gè)非常簡單的單線程 executor 的雛形。
也就是說,executor 是一個(gè)托管運(yùn)行 task 的工具,類似于多線程,多線程要成功運(yùn)行需要一個(gè)調(diào)度器進(jìn)行調(diào)度。但是多線程至少需要語言層面甚至操作系統(tǒng)層面的支持,而 executor,如果你翻看 Rust 的官方文檔的話,會(huì)發(fā)現(xiàn)沒有任何關(guān)于 executor 的實(shí)現(xiàn)。實(shí)際上,Rust 選擇將 executor 的實(shí)現(xiàn)交給第三方,自己只保留相關(guān)的交互接口(我在隔壁C++看了看,似乎也是一樣的做法,并沒有一個(gè)官方的 executor 實(shí)現(xiàn),我唯一所知的在語言層面提供支持的只有Golang 的 goroutine)。
什么是 waker ?
上面講述的輪詢所有的 Future 是否已經(jīng)完成實(shí)際是最低效的一種做法,當(dāng) Future 多了以后會(huì)帶來相當(dāng)多的 CPU 損耗。考慮到這點(diǎn),Rust 還提供了一種機(jī)制可以用于通知 executor 某個(gè) Future 是否應(yīng)該被輪詢,當(dāng)然這只是其中的一種解決方式,實(shí)際上 Waker 的 wake 函數(shù)可以被實(shí)現(xiàn)為任何邏輯,取決于 executor。
在我看來,Waker 的內(nèi)部定義相當(dāng)不簡潔,相當(dāng)不 Rust。Waker 內(nèi)部定義有一個(gè) RawWaker,RawWaker 包含一個(gè) RawWakerVTable,RawWakerVTable 定義了四個(gè)函數(shù)指針,executor 要實(shí)現(xiàn) Waker 就需要定義這四種類型的函數(shù)然后賦值給 RawWakerVTable。
struct Waker { waker: RawWaker } struct RawWaker { data: *const (), vtable: &'static RawWakerVTable } struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()) }
之所以沒有設(shè)計(jì)為 trait 形式,主要是 clone 函數(shù),受限于 Rust 的 trait object safety,trait 中的任何函數(shù)的參數(shù)或返回值如果包含 Self 且有 type bound Sized,則不符合 trait object safe 規(guī)范,這樣的 trait 可以被定義,可以被實(shí)現(xiàn),但是無法與 dyn 一起進(jìn)行動(dòng)態(tài)綁定。
而 clones 函數(shù)又是必須的,因?yàn)?future 可能還會(huì)接著調(diào)用 future 的 poll 方法,就需要再 clone 一個(gè) context 傳入。
或許可以用 Box<dyn Waker>
或者 Arc<dyn Waker>
之類的,但是這些都不比 raw pointer 靈活,所以最終 Rust 還是選擇定義一個(gè)包含函數(shù)指針的 struct。
async/await
這兩個(gè)關(guān)鍵字可以說是異步編程領(lǐng)域的標(biāo)志。,但在 Rust 中這兩個(gè)關(guān)鍵字只是起到語法糖的作用,并不是異步的核心。
async 用于快速創(chuàng)建 Future,不管是函數(shù)還是代碼塊或者lambda表達(dá)式,都可以在前面加上 async 關(guān)鍵字快速變成 Future。對(duì)于
async fn bar() { foo().await; }
編譯器會(huì)自動(dòng)生成類似下面的代碼
fn bar() -> impl Future { std::future::from_generator(move |mut _task_context| { let _t = { match std::future::IntoFuture::into_future(foo()) { mut __awaitee => loop { match unsafe { std::future::Future::poll( std::pin::Pin::new_unchecked(&mut __awaitee), std::future::get_context(_task_context), ) } { std::task::Poll::Ready { 0: result } => break result, std::task::Poll::Pending {} => {} } _task_context = (yield ()); }, }; }; _t }) }
Tips:上面的代碼可以在 Rust Playground 里面點(diǎn)生成 HIR 看到。
Executor
前面講到 wake 的時(shí)候,其實(shí)現(xiàn)與具體的 executor 相關(guān),但是我覺得如果不從 executor 的實(shí)現(xiàn)角度看一下比較難以理解,只能淺顯地知道 wake 是告訴 executor 準(zhǔn)備再 poll 一遍。
Rust 中我知道的 async runtime lib 就是 futures-rs 和 tokio,前者在 GitHub 上是 rust-lang 官方組織推出的 repo,而后者雖然不清楚是否有官方參與,但是功能明顯比前者豐富,據(jù)我所知使用異步的項(xiàng)目大部分都是使用 tokio。
我這里選擇更簡單的 futures-rs 講一下其 executor 的實(shí)現(xiàn),雖然其更加輕量但起碼也是官方推出的,有質(zhì)量保證。
Waker struct 到 ArcWake trait
futures-rs 還是將標(biāo)準(zhǔn)庫里面的 Waker 封裝成了 ArcWake trait,并且是 pub 的。和 raw pointer 打交道畢竟是 unsafe 的,與其滿篇的 unsafe 亂飛,不如將 unsafe 限制在一定的范圍內(nèi)。
Waker 本質(zhì)上是一個(gè)變量的指針(data)帶著四個(gè)函數(shù)指針的結(jié)構(gòu)體(RawWakerVTable),因此在定義函數(shù)指針時(shí)只需要將指針強(qiáng)轉(zhuǎn)成實(shí)現(xiàn)某個(gè) trait 的泛型,再調(diào)用該 trait 的對(duì)應(yīng)方法不就可以了。以 wake 函數(shù)為例:
trait Wake { fn wake(self) { Wake::wake_by_ref(&self); } fn wake_by_ref(&self); } unsafe fn wake<T: WakeTrait>(data: *const ()) {//對(duì)應(yīng)RawWakerVTable里的函數(shù)指針 let v = data.cast::<T>(); v.wake(); }
這樣就實(shí)現(xiàn)了 Waker struct 到 Waker trait 的轉(zhuǎn)換。盡管如此,我們還需要一個(gè)結(jié)構(gòu)體用來表示 Waker,滿足下列條件:
- 實(shí)現(xiàn) Deref trait,在引用時(shí)返回 &std::task::Waker
- 為了滿足 Rust 的 safety rules,需要手動(dòng)管理data的內(nèi)存,顯然某個(gè)實(shí)現(xiàn)了 Wake 的類型不會(huì)為了創(chuàng)建 waker 就交出自己的擁有權(quán),因此只能通過傳入的引用轉(zhuǎn)成指針來創(chuàng)建 ManuallyDrop 實(shí)例,并考慮到 Deref trait 和后續(xù)的 Context 創(chuàng)建,需要通過 PhantomData 來管理 lifetime annotation
從而創(chuàng)建 WakeRef 結(jié)構(gòu)體:
use std::mem::ManuallyDrop; use std::task::Waker; use std::marker::PhantomData; struct WakeRef<'a> { waker: ManuallyDrop<Waker>, _marker: PhantomData<&'a ()> }
如何根據(jù)引用創(chuàng)建 WakeRef 實(shí)例:
use std::task::{Waker, RawWaker}; fn get_waker<W: Wake>(wake: &W) -> WakeRef<'_> { let ptr = wake as *const _ as *const (); WakeRef { waker: ManuallyDrop::new(unsafe {Waker::from_raw(RawWaker::new(ptr, ...))}),//...省略的是創(chuàng)建RawWakerVTable的過程 _marker: PhantomData } }
實(shí)現(xiàn) Deref
use std::task::Waker; impl std::ops::Deref for WakeRef<'_> { type Target = Waker; fn deref(&self) -> &Waker { &self.waker } }
因此對(duì)于某個(gè)實(shí)現(xiàn) Wake 的類型來說,只需要傳入引用就可以用 Context::from_waker(&waker) 來創(chuàng)建 context 了。
在 futures-rs 中,由于涉及到多線程,所以上述的其實(shí)并不安全,需要將普通引用改成 Arc 用于在多線程之間傳遞,Wake trait 也變成了 ArcWake,
trait ArcWake: Send + Sync { fn wake(self: Arc<Self>) { Self::wake_by_ref(&self) } fn wake_by_ref(arc_self: &Arc<Self>); }
但是道理差不多。RawWakerVTable 的四個(gè)函數(shù)也與這個(gè)有關(guān),以 wake 函數(shù)為例:
unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) { let arc: Arc<T> = Arc::from_raw(data.cast::<T>()); ArcWake::wake(arc); }
FuturesUnordered
FuturesUnordered 是一個(gè) Future 的托管容器,其有一條鏈表維護(hù)所有的 Future,再通過一個(gè)隊(duì)列維護(hù)所有需要運(yùn)行的 Future(當(dāng)然這里都不是 collections 里面那種普通的鏈表和隊(duì)列,由于 FuturesUnordered 其實(shí)要與單線程和線程池 executor 共用,所以這兩個(gè)數(shù)據(jù)結(jié)構(gòu)其實(shí)還涉及很多原子化操作,在保證原子化且無鎖的前提下要設(shè)計(jì)一個(gè)鏈表還挺麻煩的)。
struct FuturesUnordered<Fut> { ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,//需要運(yùn)行的Future隊(duì)列 head_all: AtomicPtr<Task<Fut>>,//所有Future組成的鏈表 is_terminated: AtomicBool }
這里重點(diǎn)看 FuturesUnordered 如何實(shí)現(xiàn) Waker,F(xiàn)uturesUnordered 將 Future 看作一個(gè)個(gè) Task 。
struct Task<Fut> { future: UnsafeCell<Option<Fut>>, next_all: AtomicPtr<Task<Fut>>,//下一個(gè)Task節(jié)點(diǎn) len_all: UnsafeCell<usize>,//鏈表長度 next_ready_to_run: AtomicPtr<Task<Fut>>,//下一個(gè)要運(yùn)行的Task ready_to_run_queue: Weak<ReadyToRunQueue<Fut>>, queued: AtomicBool,//是否在Task鏈表內(nèi)(Task運(yùn)行時(shí)需要從鏈表上摘下) woken: AtomicBool//是否已經(jīng)調(diào)用wake函數(shù) }
為 Task 實(shí)現(xiàn) ArcWake
impl<Fut> ArcWake for Task<Fut> { fn wake_by_ref(arc_self: &Arc<Self>) { let inner = match arc_self.ready_to_run_queue.upgrade() { Some(inner) => inner, None => return, }; arc_self.woken.store(true, Relaxed); let prev = arc_self.queued.swap(true, SeqCst); if !prev { inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } }
當(dāng)一個(gè) Task 運(yùn)行(被poll)時(shí),其被從 FuturesUnordered 的 ready_to_run_queue 上摘下來,而在 wake 中又會(huì)重新放回去。因此,如果 Future 內(nèi)部調(diào)用了 wake,則 Task 會(huì)再被放到 ready_to_run_queue 上運(yùn)行,如果沒有則不會(huì)。
所以每個(gè) Future 使用的 context 其實(shí)是來自于 Task:
let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); future.poll(&mut cx);
FuturesUnordered 本身實(shí)現(xiàn)了 Stream trait
trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
FuturesUnordered 輪流 poll ready_to_run_queue 里面的 Future,根據(jù)返回結(jié)果返回:
- Poll::Pending: ready_to_run_queue 為空或所有 Future 已經(jīng) poll 了一遍
- Poll::Ready(Some(res)): 某個(gè) Future 返回 Ready(res)
- Poll::Ready(None): Task 鏈表為空,所有 Task 都已經(jīng)結(jié)束返回
值得注意的是,在第一種情況下,所有的 Future 都 poll 了一遍,F(xiàn)uturesUnordered 會(huì)調(diào)用一次 wake,告訴 executor FuturesUnordered 已經(jīng)運(yùn)行了一個(gè)輪回,wake 具體的實(shí)現(xiàn)則取決于 executor。
單線程 executor
單線程 executor 允許在單線程上復(fù)用任意數(shù)量的 task,官方建議盡量在多I/O、只需要在 I/O 操作之間完成很少的工作的場景下使用。
struct LocalPool { pool: FuturesUnordered<LocalFutureObj<'static, ()>>, incoming: Rc<Incoming> }
單線程 executor 將 Waker 的 wake 與線程的 wake 綁定,當(dāng)調(diào)用 wake 時(shí),如果 executor 線程處于 park(即阻塞) 狀態(tài),則 unpark 線程。
struct ThreadNotify { thread: std::thread::Thread, unparked: AtomicBool } impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc<Self>) { let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { arc_self.thread.unpark(); } } }
先看 LocalPool 如何定義 run 操作:
fn run_executor<T, F>(mut f: F) -> T where F: FnMut(&mut Context<'_>) -> Poll<T> { CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); let mut cx = Context::from_waker(&waker); loop { if let Poll::Ready(t) = f(&mut cx) {//f決定了executor的運(yùn)行方式,只要返回Ready就表明executor結(jié)束運(yùn)行。 return t; } while !thread_notify.unparked.swap(false, Ordering::Acquire) { thread::park(); } } }) }
從 FutureUnordered 的角度來看,在 poll 一遍之后,如果需要繼續(xù)運(yùn)行,則調(diào)用 wake,將 unparked token 置為 true,此時(shí)線程不會(huì)陷入阻塞;否則 executor 線程會(huì)主動(dòng)陷入阻塞。由于 FutureUnordered 和 executor 實(shí)際處于同一線程,因此此時(shí) executor 只能從其他線程 unpark。
這種設(shè)計(jì)節(jié)省了 CPU 資源,使得線程只在有 Future 需要 poll 時(shí)需要運(yùn)行,沒有則掛起,再有了就又可以繼續(xù)運(yùn)行。
線程池 executor
線程池顯然要比單線程 executor 更加復(fù)雜,隨便一想就想到其至少要實(shí)現(xiàn)以下幾點(diǎn):
- 新 spawn 一個(gè) Future,如何分配到某個(gè)線程
- 類似于單線程,在線程沒有被調(diào)用 wake 時(shí)主動(dòng)阻塞
對(duì)于第一點(diǎn),使用多生產(chǎn)者單消費(fèi)者管道 mpsc 進(jìn)行 Future 的分發(fā),實(shí)際的模型其實(shí)應(yīng)該是多消費(fèi)者單生產(chǎn)者,但是 Rust 并不提供這種管道,所以這里使用管道配合 mutex 使用。
struct PoolState { tx: Mutex<mpsc::Sender<Message>>, rx: Mutex<mpsc::Receiver<Message>>, cnt: AtomicUsize,//clone size size: usize//pool size }
將 PoolState 包在 Arc 下就變成了 ThreadPool
struct ThreadPool { state: Arc<PoolState> }
當(dāng) executor spawn 一個(gè)新的 future 時(shí),只需要將其封裝為一個(gè) Task,然后傳入管道:
fn spwan_obj_ok(&self, future: FutureObj<'static, ()>) { let task = Task { future, wake_handle: Arc::new(WakeHandl {exec: self.clone(), mutex: UnparkMutex::new()}), exec: self.clone() }; self.state.send(Message::Run(task)); }
ThreadPool 也有自定義的 Task:
struct Task { future: FutureObj<'static ()>, exec: ThreadPool, wake_handle: Arc<WakeHandle> } struct WakeHandle { mutex: UnparkMutex<Task>, exec: ThreadPool }
Task 主要分為以下狀態(tài):
- POLLING: 正在poll
- REPOLL: 正在 poll 的 Task 如果調(diào)用 wake 會(huì)變成 REPOLL 狀態(tài)
- WAITING: Task 正在等待
- COMPLETE:Task 已經(jīng)完成
為 Task 在不同狀態(tài)間的轉(zhuǎn)換,有些轉(zhuǎn)換是自動(dòng)的,比如 poll 返回 Ready 時(shí)自動(dòng)進(jìn)入 COMPLETE 狀態(tài),在 REPOLL 狀態(tài)會(huì)通過調(diào)用 wait 函數(shù)再次進(jìn)入 POLLING 狀態(tài)重復(fù)運(yùn)行一次 poll 函數(shù);有些轉(zhuǎn)換則需要調(diào)用函數(shù),比如從 WAITING 進(jìn)入 POLLING 需要調(diào)用 Task 的 run 函數(shù)才能運(yùn)行。poll 返回 Pending 時(shí)根據(jù) Future 是否調(diào)用 wake 函數(shù)分別進(jìn)入 REPOLL 和 WAITING 狀態(tài)。
impl Task { fn run(self) { let Self { mut future, wake_handle, mut exec } = self; let waker = waker_ref(&wake_handle); let mut cx = Context::from_waker(&waker); unsafe { wake_handle.mutex.start_poll(); loop { let res = future.poll_unpin(&mut cx); match res { Poll::Pending => {} Poll::Ready(()) => return wake_handle.mutex.complete(), } let task = Self { future, wake_handle: wake_handle.clone(), exec }; match wake_handle.mutex.wait(task) { Ok(()) => return, // we've waited Err(task) => { // someone's notified us future = task.future; exec = task.exec; } } } } } }
線程池 executor 和單線程 executor 對(duì)待 Pending 的方式,相同點(diǎn)在于如果 Future 沒有調(diào)用 wake,則放棄 Future,F(xiàn)uture 要運(yùn)行只能重新 spawn。不同點(diǎn):
- 線程池:如果 Future 調(diào)用 wake,所在的線程阻塞式調(diào)用 poll 直到返回 Ready 或者 Future 放棄調(diào)用 wake
- 單線程:調(diào)用 wake 不會(huì)立刻再屌用 poll,但加入到 ready_to_run_queue 里面在下一次循環(huán)中被 poll
總結(jié)
本文只是一篇介紹 Rust 異步編程的原理,并通過具體的倉庫稍微深挖一下實(shí)現(xiàn)的過程。具體的原因還是官方文檔的介紹非常模糊,以我來說,第一次看到 Waker 完全不知道怎么用,底層到底是干了什么,"Future be ready to run again" 又是什么意思。如果不稍微看一下 runtime lib 的源碼,有些東西很難理解。
本文只是簡單介紹了一個(gè) futures-rs 的實(shí)現(xiàn),executor 方面都忽略了很多細(xì)節(jié)。而 futures-rs 還有大量的擴(kuò)展代碼藏在 util 目錄下,但是這些東西一般看看文檔就知道大概做了什么,懂得異步的實(shí)現(xiàn)原理就知道大概是怎么實(shí)現(xiàn)的,如果實(shí)在不懂還是可以去看源碼。
原文鏈接:https://www.cnblogs.com/kaleidopink/archive/2022/09/05/16659468.html
相關(guān)推薦
- 2023-05-30 python中pip無法正確安裝或路徑出錯(cuò)的解決方案_python
- 2022-10-29 【npm 報(bào)錯(cuò) gyp info it worked if it ends with ok 大概率是
- 2023-11-15 latex表格添加注腳;對(duì)表格的內(nèi)容進(jìn)行注釋
- 2022-04-03 golang?歸并排序,快速排序,堆排序的實(shí)現(xiàn)_Golang
- 2022-12-05 關(guān)于EF的Code?First的使用以及踩坑記錄_實(shí)用技巧
- 2022-02-25 Servlet配置啟動(dòng)級(jí)別loadOnStartup注意事項(xiàng)
- 2022-08-31 .Net插件框架Managed?Extensibility?Framework簡介_實(shí)用技巧
- 2022-06-12 超詳細(xì)分析C語言動(dòng)態(tài)內(nèi)存管理問題_C 語言
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支