<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結構包含了兩個欄位,state 和 sema:
type Mutex struct { state int32 sema uint32 }
這個 sema 就是 semaphore 號誌的意思。Golang 協程之間的搶鎖,實際上爭搶給Locked
賦值的權利,能給 Locked
置為1,就說明搶鎖成功。搶不到就阻塞等待 sema
號誌,一旦持有鎖的協程解鎖,那麼等待的協程會依次被喚醒。
有意思的是,雖然 semaphore 在鎖的實現中起到了至關重要的作用,Golang 對號誌的實現卻是隱藏在 runtime 中,並沒有包含到標準庫裡來,在 src 原始碼中我們可以看到底層依賴的號誌相關函數。
// defined in package runtime // Semacquire waits until *s > 0 and then atomically decrements it. // It is intended as a simple sleep primitive for use by the synchronization // library and should not be used directly. func runtime_Semacquire(s *uint32) // Semrelease atomically increments *s and notifies a waiting goroutine // if one is blocked in Semacquire. // It is intended as a simple wakeup primitive for use by the synchronization // library and should not be used directly. // If handoff is true, pass count directly to the first waiter. // skipframes is the number of frames to omit during tracing, counting from // runtime_Semrelease's caller. func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
兩個原子操作,一個 acquire,一個 release,其實就代表了對資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個並行控制的能力,而對號誌的管理又是 Mutex 的基礎。
雖然原始碼看不到,但 Golang 其實在擴充套件庫 golang.org/x/sync/semaphore
也提供了一套號誌的實現,我們可以由此來參考一下,理解 semaphore 的實現思路。
在看原始碼之前,我們先理清楚【號誌】設計背後的場景和原理。
號誌的概念是荷蘭電腦科學家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應用在不同的作業系統中。在系統中,會給每一個程序一個號誌,代表每個程序目前的狀態。未得到控制權的程序,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。
在 Mutex 依賴的號誌機制中我們可以看到,這裡本質就是依賴 sema 一個 uint32 的變數 + 原子操作來實現並行控制能力。當 goroutine 完成對號誌等待時,該變數 -1,當 goroutine 完成對號誌的釋放時,該變數 +1。
如果一個新的 goroutine 發現號志不大於 0,說明資源暫時沒有,就得阻塞等待。直到號誌 > 0,此時的語意是有新的資源,該goroutine就會結束等待,完成對號誌的 -1 並返回。注意我們上面有提到,runtime 支援的兩個方法都是原子性的,不用擔心兩個同時在等待的 goroutine 同時搶佔同一份資源。
典型的號誌場景是【圖書館借書】。假設學校圖書館某熱門書籍現在只有 100 本存貨,但是上萬學生都想借閱,怎麼辦?
直接買一萬本書是非常簡單粗暴的解法,但資源有限,這不是長久之計。
常見的解決方案很簡單:學生們先登記,一個一個來。我們先給 100 個同學發出,剩下的你們繼續等,等到什麼時候借書的同學看完了,把書還回來了,就給排隊等待的同學們發放。同時,為了避免超發,每發一個,都需要在維護的記錄裡將【餘量】減去 1,每還回來一個,就把【餘量】加上 1。
runtime_Semacquire 就是排隊等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。
另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質上只是一種場景,事實上就算是圖書館借書,也完全有可能出現一個人同時借了兩本一模一樣的書。所以,號誌的設計需要支援 N 個資源的獲取和釋放。
所以,我們對於 acquire 和 release 兩種操作的語意如下:
這裡我們結合golang.org/x/sync/semaphore
原始碼來看看怎樣設計出來我們上面提到的號誌結構。
// NewWeighted creates a new weighted semaphore with the given // maximum combined weight for concurrent access. func NewWeighted(n int64) *Weighted { w := &Weighted{size: n} return w } // Weighted provides a way to bound concurrent access to a resource. // The callers can request access with a given weight. type Weighted struct { size int64 // 最大資源數 cur int64 // 當前已被使用的資源 mu sync.Mutex waiters list.List // 等待佇列 }
有意思的是,雖然包名是 semaphore,但是擴充套件庫裡真正給【號誌結構體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個數 n(對應 size),就可以生成一個 Weighted 號誌結構。
Weighted 提供了三個方法來實現對號誌機制的支援:
對應上面我們提到的 acquire 語意,注意我們提到過,抽象的來講,acquire 成功與否其實不太看返回值,而是隻要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。
但在 Golang 實現當中,我們肯定不希望,如果發生了異常 case,導致一直阻塞在這裡。所以你可以看到 Acquire 的入參裡有個 context.Context,借用 context 的上下文控制能力,你可以對此進行 cancel, 可以設定 timeout 等待超時,就能對 acquire 行為進行更多約束。
所以,acquire 之後我們仍然需要檢查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經不合法了。
跟上面提到的 release 語意完全一致,傳入你要釋放的資源數 n,保證原子性地增加號誌。
這裡其實跟 sync 包中的各類 TryXXX 函數定位很像。並行的機制中大都包含 fast path 和 slow path,比如首個 goroutine 先來 acquire,那麼一定是能拿到的,後續再來請求的 goroutine 由於慢了一步,只能走 slow path 進行等待,自旋等操作。sync 包中絕大部分精華,都在於 slow path 的處理。fast path 大多是一個基於 atomic 包的原子操作,比如 CAS 就可以解決。
TryAcquire 跟 Acquire 的區別在於,雖然也是要資源,但是不等待。有了我就獲取,就減號誌,返回 trye。但是如果目前還沒有,我不會阻塞在這裡,而是直接返回 false。
下面我們逐個方法看看,Weighted 是怎樣實現的。
// Acquire acquires the semaphore with a weight of n, blocking until resources // are available or ctx is done. On success, returns nil. On failure, returns // ctx.Err() and leaves the semaphore unchanged. // // If ctx is already done, Acquire may still succeed without blocking. func (s *Weighted) Acquire(ctx context.Context, n int64) error { s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil } if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() } ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock() select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil } }
在閱讀之前回憶一下上面 Weighted 結構的定義,注意 Weighted 並沒有維護一個變數用來表示【當前剩餘的資源】,這一點是通過 size(初始化的時候設定,表示總資源數)減去 cur(當前已被使用的資源),二者作差得到的。
我們來拆解一下上面這段程式碼:
第一步:這是常規意義上的 fast path
s.mu.Lock() if s.size-s.cur >= n && s.waiters.Len() == 0 { s.cur += n s.mu.Unlock() return nil }
第二步:針對特定場景做提前剪枝
if n > s.size { // Don't make other Acquire calls block on one that's doomed to fail. s.mu.Unlock() <-ctx.Done() return ctx.Err() }
如果請求的資源數量,甚至都大於資源總數量了,說明這個協程心裡沒數。。。。就算我現在把所有初始化的資源都拿回來,也喂不飽你呀!!!那能怎麼辦,我就不煩勞後面流程處理了,直接等你的 context 什麼時候 Done,給你返回 context 的錯誤就行了,同時先解個鎖,別耽誤別的 goroutine 拿資源。
第三步:資源是夠的,只是現在沒有,那就把當前goroutine加到排隊的隊伍裡
ready := make(chan struct{}) w := waiter{n: n, ready: ready} elem := s.waiters.PushBack(w) s.mu.Unlock()
這裡 ready 結構是個空結構體的 channel,僅僅是為了實現協程間通訊,通知什麼時候資源 ready,建立一個屬於這個 goroutine 的 waiter,然後塞到 Weighted 結構的等待佇列 waiters 裡。
搞定了以後直接解鎖,因為你已經來排隊了,手續處理完成,以後的路有別的通知機制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊。
第四步:排隊等待
select { case <-ctx.Done(): err := ctx.Err() s.mu.Lock() select { case <-ready: // Acquired the semaphore after we were canceled. Rather than trying to // fix up the queue, just pretend we didn't notice the cancelation. err = nil default: isFront := s.waiters.Front() == elem s.waiters.Remove(elem) // If we're at the front and there're extra tokens left, notify other waiters. if isFront && s.size > s.cur { s.notifyWaiters() } } s.mu.Unlock() return err case <-ready: return nil }
一個 select 語句,只看兩種情況:1. 這個 goroutine 的 context 超時了;2. 拿到了資源,皆大歡喜。
重點在於 ctx.Done 分支裡 default 的處理。我們可以看到,如果是超時了,此時還沒拿到資源,首先會把當前 goroutine 從 waiters 等待佇列裡移除(合情合理,你既然因為自己的原因做不了主,沒法繼續等待了,就別耽誤別人事了)。
然後接著判斷,若這個 goroutine 同時也是排在最前的 goroutine,而且恰好現在有資源了,就趕緊通知隊裡的 goroutine 們,夥計們,現在有資源了,趕緊來拿。我們來看看這個 notifyWaiters 幹了什麼:
func (s *Weighted) notifyWaiters() { for { next := s.waiters.Front() if next == nil { break // No more waiters blocked. } w := next.Value.(waiter) if s.size-s.cur < w.n { // Not enough tokens for the next waiter. We could keep going (to try to // find a waiter with a smaller request), but under load that could cause // starvation for large requests; instead, we leave all remaining waiters // blocked. // // Consider a semaphore used as a read-write lock, with N tokens, N // readers, and one writer. Each reader can Acquire(1) to obtain a read // lock. The writer can Acquire(N) to obtain a write lock, excluding all // of the readers. If we allow the readers to jump ahead in the queue, // the writer will starve — there is always one token available for every // reader. break } s.cur += w.n s.waiters.Remove(next) close(w.ready) } }
其實很簡單,遍歷 waiters 這個等待佇列,拿到排隊最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變數,資源給你,然後把你從等待佇列裡移出去,再 close ready 那個goroutine 就行,算是通知一下。
重點部分在於,如果資源不夠怎麼辦?
想象一下現在的處境,Weighted 這個 semaphore 的確有資源,而目前要處理的這個 goroutine 的的確確就是排隊最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現在你要的數量,比我手上有的少。。。。
很無奈,那怎麼辦呢?
無非兩種解法:
擴充套件庫實際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這裡的考慮在註釋裡有提到,如果直接繼續看後面的 goroutine 夠不夠,優先滿足後面的,在一些情況下會餓死有大資源要求的 goroutine,設計上不希望這樣的情況發生。
簡單說:要的多不是錯,既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權利。
// Release releases the semaphore with a weight of n. func (s *Weighted) Release(n int64) { s.mu.Lock() s.cur -= n if s.cur < 0 { s.mu.Unlock() panic("semaphore: released more than held") } s.notifyWaiters() s.mu.Unlock() }
Release 這裡的實現非常簡單,一把鎖保障不出現並行,然後將 cur 減去 n 即可,說明此時又有 n 個資源回到了貨倉。然後和上面 Acquire 一樣,呼叫 notifyWaiters,叫排隊第一的哥們(哦不,是 goroutine)來領東西了。
// TryAcquire acquires the semaphore with a weight of n without blocking. // On success, returns true. On failure, returns false and leaves the semaphore unchanged. func (s *Weighted) TryAcquire(n int64) bool { s.mu.Lock() success := s.size-s.cur >= n && s.waiters.Len() == 0 if success { s.cur += n } s.mu.Unlock() return success }
其實就是 Acquire 方法的 fast path,只是返回了個 bool,標識是否獲取成功。
今天我們瞭解了擴充套件庫 semaphore 對於號誌的封裝實現,整體程式碼加上註釋也才 100 多行,是非常好的學習材料,建議大家有空了對著原始碼再過一遍。Acquire 和 Release 的實現都很符合直覺。
其實,我們使用 buffered channel 其實也可以模擬出來 n 個號誌的效果,但就不具備 semaphore Weighted 這套實現裡面,一次獲取多個資源的能力了。
以上就是Golang號誌設計實現範例詳解的詳細內容,更多關於Go號誌設計的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45