首頁 > 軟體

Golang號誌設計實現範例詳解

2022-08-02 14:04:43

開篇

在我們此前的文章 Golang Mutex 原理解析 中曾提到過,Mutex 的底層結構包含了兩個欄位,state 和 sema:

type Mutex struct {
    state int32 
    sema  uint32
}
  • state 代表互斥鎖的狀態,比如是否被鎖定;
  • sema 表示號誌,協程阻塞會等待該號誌,解鎖的協程釋放號誌從而喚醒等待號誌的協程。

這個 sema 就是 semaphore 號誌的意思。Golang 協程之間的搶鎖,實際上爭搶給Locked賦值的權利,能給 Locked 置為1,就說明搶鎖成功。搶不到就阻塞等待 sema 號誌,一旦持有鎖的協程解鎖,那麼等待的協程會依次被喚醒。

有意思的是,雖然 semaphore 在鎖的實現中起到了至關重要的作用,Golang 對號誌的實現卻是隱藏在 runtime 中,並沒有包含到標準庫裡來,在 src 原始碼中我們可以看到底層依賴的號誌相關函數。

// defined in package runtime
// Semacquire waits until *s > 0 and then atomically decrements it.
// It is intended as a simple sleep primitive for use by the synchronization
// library and should not be used directly.
func runtime_Semacquire(s *uint32)
// Semrelease atomically increments *s and notifies a waiting goroutine
// if one is blocked in Semacquire.
// It is intended as a simple wakeup primitive for use by the synchronization
// library and should not be used directly.
// If handoff is true, pass count directly to the first waiter.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_Semrelease's caller.
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)
  • runtime_Semacquire:阻塞等待直到 s 大於 0,然後立刻將 s 減去 1【原子操作】;
  • runtime_Semrelease:將 s 增加 1,然後通知一個阻塞在 runtime_Semacquire 的 goroutine【原子操作】。

兩個原子操作,一個 acquire,一個 release,其實就代表了對資源的獲取和釋放。Mutex 作為 sync 包的核心,支撐了 RWMutex,channel,singleflight 等多個並行控制的能力,而對號誌的管理又是 Mutex 的基礎。

雖然原始碼看不到,但 Golang 其實在擴充套件庫 golang.org/x/sync/semaphore 也提供了一套號誌的實現,我們可以由此來參考一下,理解 semaphore 的實現思路。

號誌

在看原始碼之前,我們先理清楚【號誌】設計背後的場景和原理。

號誌的概念是荷蘭電腦科學家 Edsger Dijkstra 在 1963 年左右提出來的,廣泛應用在不同的作業系統中。在系統中,會給每一個程序一個號誌,代表每個程序目前的狀態。未得到控制權的程序,會在特定的地方被迫停下來,等待可以繼續進行的訊號到來。

在 Mutex 依賴的號誌機制中我們可以看到,這裡本質就是依賴 sema 一個 uint32 的變數 + 原子操作來實現並行控制能力。當 goroutine 完成對號誌等待時,該變數 -1,當 goroutine 完成對號誌的釋放時,該變數 +1。

如果一個新的 goroutine 發現號志不大於 0,說明資源暫時沒有,就得阻塞等待。直到號誌 > 0,此時的語意是有新的資源,該goroutine就會結束等待,完成對號誌的 -1 並返回。注意我們上面有提到,runtime 支援的兩個方法都是原子性的,不用擔心兩個同時在等待的 goroutine 同時搶佔同一份資源。

典型的號誌場景是【圖書館借書】。假設學校圖書館某熱門書籍現在只有 100 本存貨,但是上萬學生都想借閱,怎麼辦?

直接買一萬本書是非常簡單粗暴的解法,但資源有限,這不是長久之計。

常見的解決方案很簡單:學生們先登記,一個一個來。我們先給 100 個同學發出,剩下的你們繼續等,等到什麼時候借書的同學看完了,把書還回來了,就給排隊等待的同學們發放。同時,為了避免超發,每發一個,都需要在維護的記錄裡將【餘量】減去 1,每還回來一個,就把【餘量】加上 1。

runtime_Semacquire 就是排隊等待借書,runtime_Semrelease 就是看完了把書歸還給圖書館。

另外需要注意,雖然我們上面舉例的增加/減小的粒度都是 1,但這本質上只是一種場景,事實上就算是圖書館借書,也完全有可能出現一個人同時借了兩本一模一樣的書。所以,號誌的設計需要支援 N 個資源的獲取和釋放。

所以,我們對於 acquire 和 release 兩種操作的語意如下:

  • release: 將號誌增加 n【保證原子性】;
  • acquire: 若號誌 < n,阻塞等待,直到號誌 >= n,此時將號誌的值減去 n【保證原子性】。

semaphore 擴充套件庫實現

這裡我們結合golang.org/x/sync/semaphore 原始碼來看看怎樣設計出來我們上面提到的號誌結構。

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
	size    int64       // 最大資源數
	cur     int64       // 當前已被使用的資源
	mu      sync.Mutex
	waiters list.List   // 等待佇列
}

有意思的是,雖然包名是 semaphore,但是擴充套件庫裡真正給【號誌結構體】定義的名稱是 Weighted。從上面的定義我們可以看到,傳入初始資源個數 n(對應 size),就可以生成一個 Weighted 號誌結構。

Weighted 提供了三個方法來實現對號誌機制的支援:

  • Acquire

對應上面我們提到的 acquire 語意,注意我們提到過,抽象的來講,acquire 成功與否其實不太看返回值,而是隻要獲取不了就一直阻塞,如果返回了,就意味著獲取到了。

但在 Golang 實現當中,我們肯定不希望,如果發生了異常 case,導致一直阻塞在這裡。所以你可以看到 Acquire 的入參裡有個 context.Context,借用 context 的上下文控制能力,你可以對此進行 cancel, 可以設定 timeout 等待超時,就能對 acquire 行為進行更多約束。

所以,acquire 之後我們仍然需要檢查返回值 error,如果為 nil,代表正常獲取了資源。否則可能是 context 已經不合法了。

  • Release

跟上面提到的 release 語意完全一致,傳入你要釋放的資源數 n,保證原子性地增加號誌。

  • TryAcquire

這裡其實跟 sync 包中的各類 TryXXX 函數定位很像。並行的機制中大都包含 fast path 和 slow path,比如首個 goroutine 先來 acquire,那麼一定是能拿到的,後續再來請求的 goroutine 由於慢了一步,只能走 slow path 進行等待,自旋等操作。sync 包中絕大部分精華,都在於 slow path 的處理。fast path 大多是一個基於 atomic 包的原子操作,比如 CAS 就可以解決。

TryAcquire 跟 Acquire 的區別在於,雖然也是要資源,但是不等待。有了我就獲取,就減號誌,返回 trye。但是如果目前還沒有,我不會阻塞在這裡,而是直接返回 false。

下面我們逐個方法看看,Weighted 是怎樣實現的。

Acquire

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	if n > s.size {
		// Don't make other Acquire calls block on one that's doomed to fail.
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}
	ready := make(chan struct{})
	w := waiter{n: n, ready: ready}
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// Acquired the semaphore after we were canceled.  Rather than trying to
			// fix up the queue, just pretend we didn't notice the cancelation.
			err = nil
		default:
			isFront := s.waiters.Front() == elem
			s.waiters.Remove(elem)
			// If we're at the front and there're extra tokens left, notify other waiters.
			if isFront && s.size > s.cur {
				s.notifyWaiters()
			}
		}
		s.mu.Unlock()
		return err
	case <-ready:
		return nil
	}
}

在閱讀之前回憶一下上面 Weighted 結構的定義,注意 Weighted 並沒有維護一個變數用來表示【當前剩餘的資源】,這一點是通過 size(初始化的時候設定,表示總資源數)減去 cur(當前已被使用的資源),二者作差得到的。

我們來拆解一下上面這段程式碼:

第一步:這是常規意義上的 fast path

s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
}
  • 先上鎖,保證並行安全;
  • 校驗如果 size - cur >= n,代表剩餘的資源是足夠,同時 waiters 這個等待佇列為空,代表沒有別的協程在等待;
  • 此時就沒什麼多想的,直接 cur 加上 n 即可,代表又消耗了 n 個資源,然後解鎖返回,很直接。

第二步:針對特定場景做提前剪枝

if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
}

如果請求的資源數量,甚至都大於資源總數量了,說明這個協程心裡沒數。。。。就算我現在把所有初始化的資源都拿回來,也喂不飽你呀!!!那能怎麼辦,我就不煩勞後面流程處理了,直接等你的 context 什麼時候 Done,給你返回 context 的錯誤就行了,同時先解個鎖,別耽誤別的 goroutine 拿資源。

第三步:資源是夠的,只是現在沒有,那就把當前goroutine加到排隊的隊伍裡

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

這裡 ready 結構是個空結構體的 channel,僅僅是為了實現協程間通訊,通知什麼時候資源 ready,建立一個屬於這個 goroutine 的 waiter,然後塞到 Weighted 結構的等待佇列 waiters 裡。

搞定了以後直接解鎖,因為你已經來排隊了,手續處理完成,以後的路有別的通知機制保證,就沒必要在這兒拿著鎖阻塞新來的 goroutine 了,人家也得排隊。

第四步:排隊等待

select {
    case <-ctx.Done():
            err := ctx.Err()
            s.mu.Lock()
            select {
            case <-ready:
                    // Acquired the semaphore after we were canceled.  Rather than trying to
                    // fix up the queue, just pretend we didn't notice the cancelation.
                    err = nil
            default:
                    isFront := s.waiters.Front() == elem
                    s.waiters.Remove(elem)
                    // If we're at the front and there're extra tokens left, notify other waiters.
                    if isFront && s.size > s.cur {
                            s.notifyWaiters()
                    }
            }
            s.mu.Unlock()
            return err
    case <-ready:
            return nil
    }

一個 select 語句,只看兩種情況:1. 這個 goroutine 的 context 超時了;2. 拿到了資源,皆大歡喜。

重點在於 ctx.Done 分支裡 default 的處理。我們可以看到,如果是超時了,此時還沒拿到資源,首先會把當前 goroutine 從 waiters 等待佇列裡移除(合情合理,你既然因為自己的原因做不了主,沒法繼續等待了,就別耽誤別人事了)。

然後接著判斷,若這個 goroutine 同時也是排在最前的 goroutine,而且恰好現在有資源了,就趕緊通知隊裡的 goroutine 們,夥計們,現在有資源了,趕緊來拿。我們來看看這個 notifyWaiters 幹了什麼:

func (s *Weighted) notifyWaiters() {
	for {
		next := s.waiters.Front()
		if next == nil {
			break // No more waiters blocked.
		}
		w := next.Value.(waiter)
		if s.size-s.cur < w.n {
			// Not enough tokens for the next waiter.  We could keep going (to try to
			// find a waiter with a smaller request), but under load that could cause
			// starvation for large requests; instead, we leave all remaining waiters
			// blocked.
			//
			// Consider a semaphore used as a read-write lock, with N tokens, N
			// readers, and one writer.  Each reader can Acquire(1) to obtain a read
			// lock.  The writer can Acquire(N) to obtain a write lock, excluding all
			// of the readers.  If we allow the readers to jump ahead in the queue,
			// the writer will starve — there is always one token available for every
			// reader.
			break
		}
		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

其實很簡單,遍歷 waiters 這個等待佇列,拿到排隊最前的 waiter,判斷資源夠不夠,如果夠了,增加 cur 變數,資源給你,然後把你從等待佇列裡移出去,再 close ready 那個goroutine 就行,算是通知一下。

重點部分在於,如果資源不夠怎麼辦?

想象一下現在的處境,Weighted 這個 semaphore 的確有資源,而目前要處理的這個 goroutine 的的確確就是排隊最靠前的,而且人家也沒獅子大開口,要比你總 size 還大的資源。但是,但是,好巧不巧,現在你要的數量,比我手上有的少。。。。

很無奈,那怎麼辦呢?

無非兩種解法:

  • 我先不管你,反正你要的不夠,我先看看你後面那個 goroutine 人家夠不夠,雖然你現在是排位第一個,但是也得繼續等著;
  • 沒辦法,你排第一,需求我就得滿足,所以我們都繼續等,等啥時候資源夠了就給你。

擴充套件庫實際選用的是第 2 種策略,即一定要滿足排在最前面的 goroutine,這裡的考慮在註釋裡有提到,如果直接繼續看後面的 goroutine 夠不夠,優先滿足後面的,在一些情況下會餓死有大資源要求的 goroutine,設計上不希望這樣的情況發生。

簡單說:要的多不是錯,既然你排第一,目前貨不多,那就大家一起阻塞等待,保障你的權利。

Release

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	if s.cur &lt; 0 {
		s.mu.Unlock()
		panic("semaphore: released more than held")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

Release 這裡的實現非常簡單,一把鎖保障不出現並行,然後將 cur 減去 n 即可,說明此時又有 n 個資源回到了貨倉。然後和上面 Acquire 一樣,呼叫 notifyWaiters,叫排隊第一的哥們(哦不,是 goroutine)來領東西了。

TryAcquire

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

其實就是 Acquire 方法的 fast path,只是返回了個 bool,標識是否獲取成功。

總結

今天我們瞭解了擴充套件庫 semaphore 對於號誌的封裝實現,整體程式碼加上註釋也才 100 多行,是非常好的學習材料,建議大家有空了對著原始碼再過一遍。Acquire 和 Release 的實現都很符合直覺。

其實,我們使用 buffered channel 其實也可以模擬出來 n 個號誌的效果,但就不具備 semaphore Weighted 這套實現裡面,一次獲取多個資源的能力了。

以上就是Golang號誌設計實現範例詳解的詳細內容,更多關於Go號誌設計的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com