<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
非同步程式設計在 Rust 中的地位非常高,很多 crate 尤其是多IO操作的都使用了 async/await.
首先弄清楚非同步程式設計的幾個基本概念:
Future 代表一個可在未來某個時候獲取返回值的 task,為了獲取這個 task 的執行狀況,Future 提供了一個函數用於判斷該 task 是否執行返回。
trait Future { type Output; fn poll(self: Pin<&mut self>, cx: &mut Context<'_>) -> Poll<Self::Output>; }
poll 函數就是一個 Future 用於檢查自己的 task 是否已經完成,例如我可以建立一個與某個 IP 建立 TCP 連線的 struct,在構建時完成建立連線的工作,然後實現 Future trait 時檢查連線是否已經建立完成。根據建立情況返回 enum Poll 中的兩個元素之一:
實際上,基於 async 定義的函數和程式碼塊也會被編譯器編譯為 Future。但是 async 函數或程式碼塊無法顯式地返回 Pending,因此一般只能完成一些簡單的呼叫其他 Future 的工作。複雜的非同步過程通常還是交由實現了 Future trait 的型別完成。
你可能會好奇上面 poll 函數簽名裡的 cx 引數的作用,在 Rust 官方檔案的定義中,Context 暫時只用於獲取 Waker,而 Waker 的作用是用於提醒 executor 該 task 已經準備好執行了。
同樣以上面的建立 TCP 連線的例子來說,在網路卡頓時,進行一次 poll 可能都沒有建立連線,如果沒有設定 timeout 之類的東西的話,就需要進行多次 poll。這樣的 Future 多了以後,我們可能會想,不妨將所有的 Future 都儲存在一起,然後另起一個執行緒用於迴圈遍歷所有的 Future 是否已經 ready,如果 ready 則返回結果。這就是一個非常簡單的單執行緒 executor 的雛形。
也就是說,executor 是一個託管執行 task 的工具,類似於多執行緒,多執行緒要成功執行需要一個排程器進行排程。但是多執行緒至少需要語言層面甚至作業系統層面的支援,而 executor,如果你翻看 Rust 的官方檔案的話,會發現沒有任何關於 executor 的實現。實際上,Rust 選擇將 executor 的實現交給第三方,自己只保留相關的互動介面(我在隔壁C++看了看,似乎也是一樣的做法,並沒有一個官方的 executor 實現,我唯一所知的在語言層面提供支援的只有Golang 的 goroutine)。
上面講述的輪詢所有的 Future 是否已經完成實際是最低效的一種做法,當 Future 多了以後會帶來相當多的 CPU 損耗。考慮到這點,Rust 還提供了一種機制可以用於通知 executor 某個 Future 是否應該被輪詢,當然這只是其中的一種解決方式,實際上 Waker 的 wake 函數可以被實現為任何邏輯,取決於 executor。
在我看來,Waker 的內部定義相當不簡潔,相當不 Rust。Waker 內部定義有一個 RawWaker,RawWaker 包含一個 RawWakerVTable,RawWakerVTable 定義了四個函數指標,executor 要實現 Waker 就需要定義這四種型別的函數然後賦值給 RawWakerVTable。
struct Waker { waker: RawWaker } struct RawWaker { data: *const (), vtable: &'static RawWakerVTable } struct RawWakerVTable { clone: unsafe fn(*const ()) -> RawWaker, wake: unsafe fn(*const ()), wake_by_ref: unsafe fn(*const ()), drop: unsafe fn(*const ()) }
之所以沒有設計為 trait 形式,主要是 clone 函數,受限於 Rust 的 trait object safety,trait 中的任何函數的引數或返回值如果包含 Self 且有 type bound Sized,則不符合 trait object safe 規範,這樣的 trait 可以被定義,可以被實現,但是無法與 dyn 一起進行動態繫結。
而 clones 函數又是必須的,因為 future 可能還會接著呼叫 future 的 poll 方法,就需要再 clone 一個 context 傳入。
或許可以用 Box<dyn Waker>
或者 Arc<dyn Waker>
之類的,但是這些都不比 raw pointer 靈活,所以最終 Rust 還是選擇定義一個包含函數指標的 struct。
這兩個關鍵字可以說是非同步程式設計領域的標誌。,但在 Rust 中這兩個關鍵字只是起到語法糖的作用,並不是非同步的核心。
async 用於快速建立 Future,不管是函數還是程式碼塊或者lambda表示式,都可以在前面加上 async 關鍵字快速變成 Future。對於
async fn bar() { foo().await; }
編譯器會自動生成類似下面的程式碼
fn bar() -> impl Future { std::future::from_generator(move |mut _task_context| { let _t = { match std::future::IntoFuture::into_future(foo()) { mut __awaitee => loop { match unsafe { std::future::Future::poll( std::pin::Pin::new_unchecked(&mut __awaitee), std::future::get_context(_task_context), ) } { std::task::Poll::Ready { 0: result } => break result, std::task::Poll::Pending {} => {} } _task_context = (yield ()); }, }; }; _t }) }
Tips:上面的程式碼可以在 Rust Playground 裡面點生成 HIR 看到。
前面講到 wake 的時候,其實現與具體的 executor 相關,但是我覺得如果不從 executor 的實現角度看一下比較難以理解,只能淺顯地知道 wake 是告訴 executor 準備再 poll 一遍。
Rust 中我知道的 async runtime lib 就是 futures-rs 和 tokio,前者在 GitHub 上是 rust-lang 官方組織推出的 repo,而後者雖然不清楚是否有官方參與,但是功能明顯比前者豐富,據我所知使用非同步的專案大部分都是使用 tokio。
我這裡選擇更簡單的 futures-rs 講一下其 executor 的實現,雖然其更加輕量但起碼也是官方推出的,有質量保證。
futures-rs 還是將標準庫裡面的 Waker 封裝成了 ArcWake trait,並且是 pub 的。和 raw pointer 打交道畢竟是 unsafe 的,與其滿篇的 unsafe 亂飛,不如將 unsafe 限制在一定的範圍內。
Waker 本質上是一個變數的指標(data)帶著四個函數指標的結構體(RawWakerVTable),因此在定義函數指標時只需要將指標強轉成實現某個 trait 的泛型,再呼叫該 trait 的對應方法不就可以了。以 wake 函數為例:
trait Wake { fn wake(self) { Wake::wake_by_ref(&self); } fn wake_by_ref(&self); } unsafe fn wake<T: WakeTrait>(data: *const ()) {//對應RawWakerVTable裡的函數指標 let v = data.cast::<T>(); v.wake(); }
這樣就實現了 Waker struct 到 Waker trait 的轉換。儘管如此,我們還需要一個結構體用來表示 Waker,滿足下列條件:
從而建立 WakeRef 結構體:
use std::mem::ManuallyDrop; use std::task::Waker; use std::marker::PhantomData; struct WakeRef<'a> { waker: ManuallyDrop<Waker>, _marker: PhantomData<&'a ()> }
如何根據參照建立 WakeRef 範例:
use std::task::{Waker, RawWaker}; fn get_waker<W: Wake>(wake: &W) -> WakeRef<'_> { let ptr = wake as *const _ as *const (); WakeRef { waker: ManuallyDrop::new(unsafe {Waker::from_raw(RawWaker::new(ptr, ...))}),//...省略的是建立RawWakerVTable的過程 _marker: PhantomData } }
實現 Deref
use std::task::Waker; impl std::ops::Deref for WakeRef<'_> { type Target = Waker; fn deref(&self) -> &Waker { &self.waker } }
因此對於某個實現 Wake 的型別來說,只需要傳入參照就可以用 Context::from_waker(&waker) 來建立 context 了。
在 futures-rs 中,由於涉及到多執行緒,所以上述的其實並不安全,需要將普通參照改成 Arc 用於在多執行緒之間傳遞,Wake trait 也變成了 ArcWake,
trait ArcWake: Send + Sync { fn wake(self: Arc<Self>) { Self::wake_by_ref(&self) } fn wake_by_ref(arc_self: &Arc<Self>); }
但是道理差不多。RawWakerVTable 的四個函數也與這個有關,以 wake 函數為例:
unsafe fn wake_arc_raw<T: ArcWake>(data: *const ()) { let arc: Arc<T> = Arc::from_raw(data.cast::<T>()); ArcWake::wake(arc); }
FuturesUnordered 是一個 Future 的託管容器,其有一條連結串列維護所有的 Future,再通過一個佇列維護所有需要執行的 Future(當然這裡都不是 collections 裡面那種普通的連結串列和佇列,由於 FuturesUnordered 其實要與單執行緒和執行緒池 executor 共用,所以這兩個資料結構其實還涉及很多原子化操作,在保證原子化且無鎖的前提下要設計一個連結串列還挺麻煩的)。
struct FuturesUnordered<Fut> { ready_to_run_queue: Arc<ReadyToRunQueue<Fut>>,//需要執行的Future佇列 head_all: AtomicPtr<Task<Fut>>,//所有Future組成的連結串列 is_terminated: AtomicBool }
這裡重點看 FuturesUnordered 如何實現 Waker,FuturesUnordered 將 Future 看作一個個 Task 。
struct Task<Fut> { future: UnsafeCell<Option<Fut>>, next_all: AtomicPtr<Task<Fut>>,//下一個Task節點 len_all: UnsafeCell<usize>,//連結串列長度 next_ready_to_run: AtomicPtr<Task<Fut>>,//下一個要執行的Task ready_to_run_queue: Weak<ReadyToRunQueue<Fut>>, queued: AtomicBool,//是否在Task連結串列內(Task執行時需要從連結串列上摘下) woken: AtomicBool//是否已經呼叫wake函數 }
為 Task 實現 ArcWake
impl<Fut> ArcWake for Task<Fut> { fn wake_by_ref(arc_self: &Arc<Self>) { let inner = match arc_self.ready_to_run_queue.upgrade() { Some(inner) => inner, None => return, }; arc_self.woken.store(true, Relaxed); let prev = arc_self.queued.swap(true, SeqCst); if !prev { inner.enqueue(Arc::as_ptr(arc_self)); inner.waker.wake(); } } }
當一個 Task 執行(被poll)時,其被從 FuturesUnordered 的 ready_to_run_queue 上摘下來,而在 wake 中又會重新放回去。因此,如果 Future 內部呼叫了 wake,則 Task 會再被放到 ready_to_run_queue 上執行,如果沒有則不會。
所以每個 Future 使用的 context 其實是來自於 Task:
let waker = Task::waker_ref(task); let mut cx = Context::from_waker(&waker); future.poll(&mut cx);
FuturesUnordered 本身實現了 Stream trait
trait Stream { type Item; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>; }
FuturesUnordered 輪流 poll ready_to_run_queue 裡面的 Future,根據返回結果返回:
值得注意的是,在第一種情況下,所有的 Future 都 poll 了一遍,FuturesUnordered 會呼叫一次 wake,告訴 executor FuturesUnordered 已經執行了一個輪迴,wake 具體的實現則取決於 executor。
單執行緒 executor 允許在單執行緒上覆用任意數量的 task,官方建議儘量在多I/O、只需要在 I/O 操作之間完成很少的工作的場景下使用。
struct LocalPool { pool: FuturesUnordered<LocalFutureObj<'static, ()>>, incoming: Rc<Incoming> }
單執行緒 executor 將 Waker 的 wake 與執行緒的 wake 繫結,當呼叫 wake 時,如果 executor 執行緒處於 park(即阻塞) 狀態,則 unpark 執行緒。
struct ThreadNotify { thread: std::thread::Thread, unparked: AtomicBool } impl ArcWake for ThreadNotify { fn wake_by_ref(arc_self: &Arc<Self>) { let unparked = arc_self.unparked.swap(true, Ordering::Release); if !unparked { arc_self.thread.unpark(); } } }
先看 LocalPool 如何定義 run 操作:
fn run_executor<T, F>(mut f: F) -> T where F: FnMut(&mut Context<'_>) -> Poll<T> { CURRENT_THREAD_NOTIFY.with(|thread_notify| { let waker = waker_ref(thread_notify); let mut cx = Context::from_waker(&waker); loop { if let Poll::Ready(t) = f(&mut cx) {//f決定了executor的執行方式,只要返回Ready就表明executor結束執行。 return t; } while !thread_notify.unparked.swap(false, Ordering::Acquire) { thread::park(); } } }) }
從 FutureUnordered 的角度來看,在 poll 一遍之後,如果需要繼續執行,則呼叫 wake,將 unparked token 置為 true,此時執行緒不會陷入阻塞;否則 executor 執行緒會主動陷入阻塞。由於 FutureUnordered 和 executor 實際處於同一執行緒,因此此時 executor 只能從其他執行緒 unpark。
這種設計節省了 CPU 資源,使得執行緒只在有 Future 需要 poll 時需要執行,沒有則掛起,再有了就又可以繼續執行。
執行緒池顯然要比單執行緒 executor 更加複雜,隨便一想就想到其至少要實現以下幾點:
對於第一點,使用多生產者單消費者管道 mpsc 進行 Future 的分發,實際的模型其實應該是多消費者單生產者,但是 Rust 並不提供這種管道,所以這裡使用管道配合 mutex 使用。
struct PoolState { tx: Mutex<mpsc::Sender<Message>>, rx: Mutex<mpsc::Receiver<Message>>, cnt: AtomicUsize,//clone size size: usize//pool size }
將 PoolState 包在 Arc 下就變成了 ThreadPool
struct ThreadPool { state: Arc<PoolState> }
當 executor spawn 一個新的 future 時,只需要將其封裝為一個 Task,然後傳入管道:
fn spwan_obj_ok(&self, future: FutureObj<'static, ()>) { let task = Task { future, wake_handle: Arc::new(WakeHandl {exec: self.clone(), mutex: UnparkMutex::new()}), exec: self.clone() }; self.state.send(Message::Run(task)); }
ThreadPool 也有自定義的 Task:
struct Task { future: FutureObj<'static ()>, exec: ThreadPool, wake_handle: Arc<WakeHandle> } struct WakeHandle { mutex: UnparkMutex<Task>, exec: ThreadPool }
Task 主要分為以下狀態:
為 Task 在不同狀態間的轉換,有些轉換是自動的,比如 poll 返回 Ready 時自動進入 COMPLETE 狀態,在 REPOLL 狀態會通過呼叫 wait 函數再次進入 POLLING 狀態重複執行一次 poll 函數;有些轉換則需要呼叫函數,比如從 WAITING 進入 POLLING 需要呼叫 Task 的 run 函數才能執行。poll 返回 Pending 時根據 Future 是否呼叫 wake 函數分別進入 REPOLL 和 WAITING 狀態。
impl Task { fn run(self) { let Self { mut future, wake_handle, mut exec } = self; let waker = waker_ref(&wake_handle); let mut cx = Context::from_waker(&waker); unsafe { wake_handle.mutex.start_poll(); loop { let res = future.poll_unpin(&mut cx); match res { Poll::Pending => {} Poll::Ready(()) => return wake_handle.mutex.complete(), } let task = Self { future, wake_handle: wake_handle.clone(), exec }; match wake_handle.mutex.wait(task) { Ok(()) => return, // we've waited Err(task) => { // someone's notified us future = task.future; exec = task.exec; } } } } } }
執行緒池 executor 和單執行緒 executor 對待 Pending 的方式,相同點在於如果 Future 沒有呼叫 wake,則放棄 Future,Future 要執行只能重新 spawn。不同點:
本文只是一篇介紹 Rust 非同步程式設計的原理,並通過具體的倉庫稍微深挖一下實現的過程。具體的原因還是官方檔案的介紹非常模糊,以我來說,第一次看到 Waker 完全不知道怎麼用,底層到底是幹了什麼,"Future be ready to run again" 又是什麼意思。如果不稍微看一下 runtime lib 的原始碼,有些東西很難理解。
本文只是簡單介紹了一個 futures-rs 的實現,executor 方面都忽略了很多細節。而 futures-rs 還有大量的擴充套件程式碼藏在 util 目錄下,但是這些東西一般看看檔案就知道大概做了什麼,懂得非同步的實現原理就知道大概是怎麼實現的,如果實在不懂還是可以去看原始碼。
到此這篇關於Rust 如何實現 async/await的文章就介紹到這了,更多相關Rust 實現 async await內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45