首頁 > 軟體

Golang Mutex互斥鎖原始碼分析

2022-10-10 14:00:17

前言

在上一篇文章中,我們一起學習瞭如何使用 Go 中的互斥鎖 Mutex,那麼本篇文章,我們就一起來探究下 Mutex 底層是如何實現的,知其然,更要知其所以然!

說明:本文中的範例,均是基於Go1.17 64位元機器

Mutex 特性

Mutex 就是一把互斥鎖,可以想象成一個令牌,有且只有這一個令牌,只有持有令牌的 goroutine 才能進入房間(臨界區),在房間內執行完任務後,走出房間並把令牌交出來,如果還有其餘的 goroutine 等著獲取這個令牌,讓他們再去搶這個令牌,搶到的重複上述過程,沒搶到的繼續等。

上述是從宏觀角度來看待互斥鎖的,但是在 Mutex 內部,有著非常複雜的搶鎖邏輯,Mutex 的發展也經歷了幾個版本,我們可以用拿令牌進餐廳吃飯來形象比喻下幾個主要版本的變化。

前提:餐廳一次只能進入一個人,餐廳有一個令牌,只有持有這個令牌的人才能進去;從餐廳出來後,需要把這個令牌歸還

版本一

餐廳在門外設定了一個隊伍,如果令牌空閒,拿著令牌去餐廳用餐;如果令牌不是空閒的,新來的人就要去隊伍後面排隊等待叫號。(不是空閒包含兩種情況:持有令牌的人在餐廳裡面,隊伍是空的;隊伍有人排隊。)

此版本的問題就是:只要令牌不是空閒的,新來的人必須直接去排隊,沒有商量的餘地。這樣看起來很公平,遵循先來後到的原則,但是對於餐廳來說,營業效率就會有所降低,即單位時間內接待顧客的數量(IO)會減少。為什麼這樣說呢,舉個例子,有個顧客從餐廳出來歸還令牌後,需要去等待佇列去叫號,被叫到號的這個人需要花費時間走到餐廳(獲取到CPU),這中間就浪費了不少時間。

版本二

為了提高營業效率,允許剛到門口的顧客和被叫到號的顧客一起去搶令牌,而不是直接去排隊,這樣就給了新人機會。舉個例子:當持有令牌的人從餐廳出來歸還令牌後,去等待佇列叫個號,如果此時有顧客剛到門口,被叫到號的和新到的顧客一起搶令牌,搶到的就可以直接進入餐廳,搶不到的接著去排隊,由於剛到的顧客離門口近(正在佔據CPU),被叫到號的顧客離得遠(需要等CPU),而且剛到的顧客可能不只一個,所以被叫到號的顧客很大概率搶不到令牌,可能還沒走到門口(還沒獲取到CPU)就被新來的顧客搶走了。不管怎麼樣,這樣提高了餐廳的效率,可以在單位時間內接待更多的客戶。

版本三

餐廳發現有些人用餐很快,如果讓搶不到令牌的先別直接去排隊,而是在門口轉悠會(當然不能一直轉悠,有條件限制,到了限制還是要去排隊),這種方式類似樂觀鎖,那麼有顧客從餐廳出來後,就不用去叫號了,直接讓門口的這些顧客繼續搶就行了,這樣就進一步提高了餐廳的執行效率,畢竟叫號真的太浪費時間了。

版本四

經過了多個版本的優化,餐廳的運營效率是越來越高了,但是有些人可要準備要罵娘了,這些人是誰呢,當然是已經在隊伍裡等待的那些人。由於給了新人機會,如果持續有新顧客來,那麼已經在隊伍裡的那些人永遠也拿不到令牌,可真的要餓死了。

Mutex 在這個版本只為三件事:公平、公平、還是tm的公平!堅持讓每一個人都不餓肚子的原則,餐廳搞出了一個新的模式:飢餓模式。如果有顧客等的時間超過了閾值(1ms),餐廳變為飢餓模式,在該模式下,所有新來的顧客直接去排隊,然後按照先來先到的順序,依次將令牌給等待佇列隊首的顧客。

那麼什麼時候由飢餓模式變為正常模式呢?當拿到令牌的顧客發現自己從等待到拿到令牌的時間小於閾值(1ms)了,或者等待隊伍沒人等了,此時餐廳就變為正常模式,畢竟上述兩個條件都說明當前餐廳競爭不是很激烈了。

同時這個版本修復了以前的一個問題:之前從等待佇列喚醒的顧客如果沒有搶到令牌,再回到佇列後是插到隊尾,這樣對已經排到第一位的顧客太不友好了。在這個版本中修復了該問題,喚醒的顧客如果沒有搶到令牌,直接插入到隊首,下次叫號還是他。

特性總結

經過了多次迭代,目前的版本有了如下特性:

給新人機會:讓剛來的顧客和從佇列喚醒的顧客一起去搶令牌,喚醒也是按照先來先到的原則喚醒;

保持樂觀態度:沒搶到不是直接去排隊,而是可以在門口轉悠會,說不定裡面的人馬上就出來了;

正常模式和飢餓模式的切換:為了公平起見,正常模式下給了新人機會,一起去搶令牌;飢餓模式下照顧老人,所有人老老實實排隊,按照先來先到的順序拿令牌。整個餐廳既保持了公平,又提高了執行效率,一切井然有序起來了。

迴歸正題

讓我們從餐廳回到 Go 中來,Mutex 有兩種模式:正常模式和飢餓模式:

正常模式下,如果當前鎖正在被持有,搶不到鎖的就會進入一個先進先出的等待佇列。當持有鎖的 goroutine 釋放鎖之後,按照從前到後的順序喚醒等待佇列的第一個等待者,但是不會直接給被喚醒者鎖,還是需要他去搶,即在喚醒等待佇列等待者這個時間,同時也會有正在執行且還未進入等待佇列的 goroutine 正在搶鎖 (數量可能還很多),這些都會和剛喚醒的等待者一起去搶,剛喚醒的可能還沒有分到 CPU,而正在執行的正在佔據了CPU,所以正在執行的更有可能獲取到鎖,被喚醒的等待者可能搶鎖失敗。如果等待者搶鎖失敗,他會被放到等待佇列的隊首,如果超過 1ms 都沒搶到鎖,就會從 正常模式 切換到 飢餓模式。

飢餓模式下,要釋放鎖的 goroutine 會將鎖直接交給等待佇列的第一個等待者,不需要去搶了,而且新來的 goroutine 也不會嘗試去搶鎖,直接加入到等待佇列的尾部。那麼什麼時候會從飢餓模式切換到正常模式呢:

(1)如果當前被喚醒的等待者獲得到鎖後,發現自己是佇列中的最後一個,佇列中沒有其他等待者了,此時會切換到正常模式

(2)如果當前被喚醒的等待者獲得到鎖後,發現自己總共的等待時間不超過 1ms,就獲得到鎖了,此時也會切換到正常模式

正常模式會帶來更高的吞吐量:一個 goroutine 要釋放鎖,更大可能會被正在執行的 goroutine 搶到,這就避免了協程的上下文切換,執行更多的 goroutine,但是有可能造成一個問題,就是鎖始終被新來的 goroutine 搶走,在等待佇列中的等待者始終搶不到鎖,這就會導致飢餓問題。飢餓模式就是為了解決這個問題出現的,保證了每個 goroutine 都有執行的機會,防止等待時間過長。

資料結構

// 互斥鎖
type Mutex struct {
 state int32  // 狀態
 sema  uint32  // 號誌
}


const (
 mutexLocked = 1 << iota // 1
 mutexWoken // 2
 mutexStarving // 4
 mutexWaiterShift = iota // 3

 starvationThresholdNs = 1e6 // 判斷是否要進入飢餓狀態的閾值
)

號誌 sema 就相當於我們說的令牌,state 是 int32 型別,一共 32位元,通過每個位記錄了當前的狀態:

state欄位

mutexLocked:當前是否已經上鎖,state & mutexLocked = 1 表示已經上鎖;

mutexWoken:標記當前是否有喚醒的 goroutine,state & mutexWoken = 1 表示有喚醒的goroutine;

mutexStarving:當前是否為飢餓狀態,state & mutexWoken = 1 表示處於飢餓狀態;

mutexWaiterShift:29位,state >> mutexWaiterShift 得到等待者的數量;

Lock()

Lock()加鎖方法分為兩部分,第一部分是 fast path,可以理解為快捷通道,如果當前鎖沒被佔用,直接獲得鎖返回;否則需要進入 slow path,判斷各種條件去競爭鎖,主要邏輯都在此處。

瞭解過原子操作的同學,對 CompareAndSwap(CAS) 應該不陌生,CompareAndSwapInt32(addr *int32, old, new int32) 有三個引數,如果地址 addr 指向的值與 old 相等,則將 addr 的值改為 new,否則不變,也就是說在我們修改前,如果有人修改了 addr 指向的值,本次修改就會失敗。

// 上鎖
func (m *Mutex) Lock() {
 // fastpath:期望當前鎖沒有被佔用,可以快速獲取到鎖, CAS 修改 state 最後一位的值為1(標記鎖是否被佔用)
 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
  return
 }
 // Slow path : 單獨抽出來放到一個函數裡,方便 fast path 被內聯
 m.lockSlow()
}
func (m *Mutex) lockSlow() {
 var waitStartTime int64 // // 記錄等待時間
 starving := false // 當前的 goroutine 是否已經飢餓了(如果已經飢餓,就會將 state 的飢餓狀態置為 1)
 awoke := false  // 當前的 goroutine 是否被喚醒的
 iter := 0 // 自旋次數
 old := m.state // 儲存當前的 state 狀態
 for {
    
    /*
    自旋:如果滿足如下條件,就會進入 if 語句,然後 continue,不斷自旋:
    1. 鎖被佔用,且不處於飢餓模式(飢餓狀態直接去排隊,不允許嘗試獲取鎖)
    2. 基於當前自旋的次數,再次自旋有意義 runtime_canSpin(iter)
    
    那麼退出自旋的條件也就是:
    1. 鎖被釋放了,當前處於沒被佔用狀態(說明等到了,該goroutine就會立即去獲取鎖)
  2. mutex進入了飢餓模式,不自旋了,沒意義(飢餓狀態會直接把鎖交給等待佇列隊首的goroutine)
  3. 不符合自旋狀態(自旋次數太多了,自旋失去了意義)
    
    如下程式碼是位元運算:
    mutexLocked|mutexStarving = 00000...101
    mutexLocked = 00000...001
    如果要滿足 old & 00000...101 = 00000...001,需要 old = ...0*1,即狀態為:鎖被佔用,且不處於飢餓狀態 
    
    runtime_canSpin(iter) 會根據自旋次數,判斷是否可以繼續自旋
    */
  if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
   
      /*
      如果 
       1. 當前goroutine不是被喚醒的 (awoke=false) 
       2. 鎖狀態喚醒標誌位為0(old&mutexWoken == 0) 
       3. 等待者數量不為0 (old>>mutexWaiterShift != 0  右移三位得到的就是等待者數量)
      
       那麼利用CAS,將 state 的喚醒標記置為1,標記自己是被喚醒的 (將state的喚醒標記置為1,說明外面有喚醒著的goroutine,那麼在釋放鎖的時候,就不去等待佇列叫號了,畢竟已經有喚醒的了)
       
       如果有其他 goroutine 已經設定了 state 的喚醒標記位,那麼本次就會失敗
      */

   if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
    awoke = true
   }
   runtime_doSpin()
      
      // 迭代次數加一
   iter++
      
      // 獲取最新的狀態
   old = m.state
      
      // 想再次自旋,看看鎖釋放了沒
   continue
  }
    
    // 到這裡,說明退出了自旋,當前鎖沒被佔用 或者  系統處於飢餓模式 或者 自旋次數太多導致不符合自旋條件
  
    // new 代表當前goroutine 基於當前狀態要設定的新狀態
  new := old
    
    // 只要不是飢餓狀態,就需要獲取鎖(飢餓狀態直接去排隊,不能搶鎖)
  if old&mutexStarving == 0 {
   new |= mutexLocked
  }
    
    // 鎖被佔用 或者 處於飢餓模式下,新增一個等待者
  if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
  }
    
    // 當前 goroutine 已經進入飢餓了,且鎖還沒有釋放,需要把 Mutex 的狀態改為飢餓狀態
  if starving && old&mutexLocked != 0 {
   new |= mutexStarving
  }
    
    // 如果是被喚醒的,把喚醒標誌位置0,表示外面沒有被喚醒的goroutine了(搶到就獲得鎖、搶不到就睡眠,把喚醒標誌置0)
  if awoke {
   
      // 由於是被喚醒的,new 裡面的 喚醒標記位一定是 1
   if new&mutexWoken == 0 {
    throw("sync: inconsistent mutex state")
   }
      
      // a &^ b 的意思就是 清零a中,ab都為1的位,即清除喚醒標記
   new &^= mutexWoken
  }
    
    /*
      利用CAS,將狀態設定為新的
      1. 如果是飢餓狀態,只增加一個等待者數量
      2. 正常狀態,加鎖標記置為 1,如果鎖已被佔用增加一個等待者數量
      3. 如果當前 goroutine 已經飢餓了,將 飢餓標記 置為 1
      4. 如果是被喚醒的,清除喚醒標記
    */
    
  if atomic.CompareAndSwapInt32(&m.state, old, new) {
      
      // 如果改狀態之前,鎖未被佔用 且 處於正常模式,那麼就相當於獲取到鎖了
   if old&(mutexLocked|mutexStarving) == 0 {
    break 
   }
      
      // 到這裡說明:1. 之前鎖被佔用  或者 2.之前是處於飢餓狀態 
      
      // 判斷之前是否等待過(是否從佇列裡喚醒的),之前等待過,再次排隊放在隊首
   queueLifo := waitStartTime != 0
      
      // 如果之前沒等過(新來的),設定等待起始時間當前時間
   if waitStartTime == 0 {
    waitStartTime = runtime_nanotime()
   }
      
     // 之前排過隊的老人,放到等待佇列隊首;新人放到隊尾,然後等待獲取號誌
   runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      
      // 鎖被釋放,goroutine 被喚醒
      
      // 設定當前 goroutine 飢餓狀態,如果之前已經飢餓,或者距離等待開始時間超過了 1ms,也變飢餓
   starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      
      // 獲取最新的狀態
   old = m.state
      
      // 如果 state 飢餓標記為1,說明當前在飢餓模式,飢餓模式下被喚醒,已經獲取到鎖了;
      // 飢餓狀態下,釋放鎖沒有更新等待者數量和飢餓標記,需要獲得鎖的goroutine去更新狀態
   if old&mutexStarving != 0 {

        // 正確性校驗:
        // 1. 鎖還是鎖住的狀態(鎖已經釋放給當前goroutine了,不應該被鎖住)
    // 2. 或者有被喚醒的goroutine(飢餓模式下不應該有醒著的goroutine,都應該去乖乖等著)
    // 3. 或者當前goroutine 的等待者數量為0(當前goroutine就是等待者)
    // 這三種情況不應該出現,與預期狀態不符
        
    if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
     throw("sync: inconsistent mutex state")
    }
        
        // 加鎖,減去一個等待者
    delta := int32(mutexLocked - 1<<mutexWaiterShift)
        
        // 如果當前的 goroutine 非飢餓,或者等待者只有一個(也就是隻有當前goroutine,等待佇列空了),可以取消飢餓狀態,進入正常狀態
    if !starving || old>>mutexWaiterShift == 1 {
     delta -= mutexStarving
    }
        
        // 修改狀態:
        // 加鎖,減去一個等待者:m.state + mutexLocked - 1<<mutexWaiterShift : 
        // 滿足非飢餓條件,加鎖,減去一個等待者,取消飢餓狀態:
        // m.state + mutexLocked - 1<<mutexWaiterShift - mutexStarving: 
    atomic.AddInt32(&m.state, delta)
        
        // 飢餓模式下被喚醒,相當於獲得鎖了,可以結束
    break
   }
      
      // 之前是處於鎖被佔用且非飢餓狀態,被喚醒,去繼續搶鎖
   awoke = true
      
      // 新喚醒的,自旋數量置0
   iter = 0
  } else {
      
      // 修改新狀態失敗,狀態有更新,需要重試
   old = m.state
  }
 }
}

加鎖的這部分程式碼,新來的 goroutine 或者從佇列裡面喚醒的 goroutine 都會進入如下邏輯,相當於給新人機會

1.樂觀態度的自旋:判斷是否可以自旋,如果可以自旋,就自旋等待;如果有可能,把喚醒標記位置為1,標記外面有喚醒的 goroutine,釋放鎖的時候就不會去佇列裡面喚醒了,畢竟已經有人在等待了;

2.修改系統狀態:跳出自旋後,每個 goroutine 根據當前系統狀態修改系統狀態:

  • 非飢餓狀態,想要加鎖(如果本來就是加鎖狀態,將加鎖位 設定為 1 相當於不變)
  • 鎖被佔用 或者 處於飢餓模式下,新增一個等待者
  • 當前 goroutine 已經進入飢餓了,且鎖還沒有釋放,需要把 Mutex 的狀態改為飢餓狀態
  • 如果當前 goroutine 是被喚醒的,清除系統喚醒標記

3.利用 CAS 修改系統狀態,同一時刻只有一個 goroutine 能夠設定成功,但是設定成功並不代表獲取到鎖了:

  • 之前是非上鎖的正常狀態,設定成功說明本次搶鎖成功,可以返回去操作臨界區了;
  • 之前是上鎖狀態或者飢餓狀態,本次只是新增了一個等待者,然後根據是否是新來的,去佇列隊尾或者隊首排隊,等待叫號;

4.從佇列中被叫號喚醒,不一定是獲取到鎖了:

  • 當前是飢餓狀態,那麼一定是獲取到鎖了,因為飢餓狀態只把鎖給佇列的第一個 goroutine
  • 非飢餓狀態,將自己狀態置為喚醒,再去搶鎖,重複上述過程

問:系統會不會同時存在 喚醒標誌和飢餓標誌都為1 的情況呢?

答:不會。只有等待時間大於 1ms 的才會去設定飢餓標記,也就是隻有從佇列喚醒的才會去設定,那麼從佇列中喚醒的 goroutine ,自身的 awoke=true,每當去設定飢餓標記的時候會把喚醒標記清除。

Unlock()

Unlock()解鎖方法也分為兩部分,第一部分是 fast path,可以理解為快捷通道,直接把鎖狀態位清除,如果此時系統狀態恢復到初始狀態,說明沒有 goroutine 在搶鎖等鎖,直接返回,否則進入 slow path

slow path 會根據是否為飢餓狀態,做出不一樣的反應:

正常狀態:喚醒一個 goroutine 去搶鎖,等待者數量減一,並將喚醒狀態置為 1

飢餓狀態:直接喚醒等待佇列隊首的 goroutine,鎖的所有權直接移交(修改等待者數量、是否取消飢餓標記,由喚醒的 goroutine 去處理)。

func (m *Mutex) Unlock() {

 // Fast path: 把鎖標記清除
 new := atomic.AddInt32(&m.state, -mutexLocked)
 if new != 0 {
  // 清除完鎖標記,發現還有其他狀態,比如等待佇列不為空,需要喚醒其他 goroutine
  m.unlockSlow(new)
 }
}
func (m *Mutex) unlockSlow(new int32) {
  
  /* 狀態正確性校驗:
    1. 如果解鎖一個上鎖狀態的鎖,最後一位則為1,fast path 中 new 已經減去了1, 此時 new 最後一位應當為0
   2. 如果解鎖一個未上鎖狀態的鎖,最後一位則為0,fast path 中 new 已經減去了1, 此時 new 最後一位應當為1
    如果 (new+mutexLocked)&mutexLocked == 0,說明 new 當前最後一位是1,那麼就是解鎖了一個沒有上鎖的鎖,狀態有誤
 */
 if (new+mutexLocked)&mutexLocked == 0 {
  throw("sync: unlock of unlocked mutex")
 }
  
  // 正常模式,非飢餓,可能需要喚醒佇列中的 goroutine,飢餓狀態直接移交鎖
 if new&mutexStarving == 0 {
  old := new
  for {

      /* 系統運轉正常,鎖可以正確交接,可以直接返回了:
        1. 沒有等待者了 (沒有等鎖的了,去喚醒誰?)
        2. 有喚醒狀態的 goroutine  (自旋狀態的 goroutine,將喚醒狀態置為1)
       3. 有 goroutine 已經獲取了鎖 (Unlock方法已經將鎖標記置為了0,可能自旋的此時已經搶到了鎖)
      */
   if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
    return
   }
      
      // 沒有喚醒狀態的 goroutine,喚醒一個去搶鎖
   // 減去一個等待者,並且將 喚醒標記 置為 1
   new = (old - 1<<mutexWaiterShift) | mutexWoken
   if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 第二個引數為false, 喚醒隊首的 goroutine 去搶鎖(不一定能搶到)
    runtime_Semrelease(&m.sema, false, 1)
    return
   }
      
      // 上面 CAS 失敗,可能由於新增了一個等待者,for 迴圈重試
   old = m.state
  }
 } else {
  
    /*
     1. 第二個引數為 true,直接將鎖的所有權,交給等待佇列的第一個等待者
   2. 注意,此時沒有設定 mutexLocked =1 ,被喚醒的 goroutine 會設定
   3. 雖然沒有設定 mutexLocked ,但是飢餓模式下, Mutex 始終被認為是鎖住的,都會直接排隊等待移交鎖
    */
  runtime_Semrelease(&m.sema, true, 1)
 }
}

到此這篇關於Golang Mutex互斥鎖原始碼分析的文章就介紹到這了,更多相關Golang Mutex互斥鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com