首頁 > 軟體

Golang Mutex互斥鎖深入理解

2022-08-02 14:04:45

引言

Golang的並行程式設計令人著迷,使用輕量的協程、基於CSP的channel、簡單的go func()就可以開始並行程式設計,在並行程式設計中,往往離不開鎖的概念。

本文介紹了常用的同步原語 sync.Mutex,同時從原始碼剖析它的結構與實現原理,最後簡單介紹了mutex在日常使用中可能遇到的問題,希望大家讀有所獲。

Mutex結構

Mutex執行時資料結構位於sync/mutex.go

type Mutex struct {
   state int32
   sema  uint32
}

其中state表示當前互斥鎖的狀態,sema表示 控制鎖狀態的號誌.

互斥鎖的狀態定義在常數中:

const (
   mutexLocked = 1 << iota // 1 ,處於鎖定狀態; 2^0
   mutexWoken // 2 ;從正常模式被從喚醒;  2^1
   mutexStarving // 4 ;處於飢餓狀態;    2^2
   mutexWaiterShift = iota // 3 ;獲得互斥鎖上等待的Goroutine個數需要左移的位數: 1 << mutexWaiterShift
   starvationThresholdNs = 1e6 // 鎖進入飢餓狀態的等待時間
)

0即其他狀態。

sema是一個組合,低三位分別表示鎖的三種狀態,高29位表示正在等待互斥鎖釋放的gorountine個數,和Java表示執行緒池狀態那部分有點類似

一個mutex物件僅佔用8個位元組,讓人不禁感嘆其設計的巧妙

飢餓模式和正常模式

正常模式

在正常模式下,等待的協程會按照先進先出的順序得到鎖 在正常模式下,剛被喚醒的goroutine與新建立的goroutine競爭時,大概率無法獲得鎖。

飢餓模式

為了避免正常模式下,goroutine被“餓死”的情況,go在1.19版本引入了飢餓模式,保證了Mutex的公平性

在飢餓模式中,互斥鎖會直接交給等待佇列最前面的goroutine。新的goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在佇列的末尾等待。

狀態的切換

在正常模式下,一旦Goroutine超過1ms沒有獲取到鎖,它就會將當前互斥鎖切換飢餓模式

如果一個goroutine 獲得了互斥鎖並且它在佇列的末尾或者它等待的時間少於 1ms,那麼當前的互斥鎖就會切換回正常模式。

加鎖和解鎖

加鎖

func (m *Mutex) Lock() {
   // Fast path: grab unlocked mutex.
   if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
      return
   }
   // 原註釋: Slow path (outlined so that the fast path can be inlined)
   // 將
   m.lockSlow()
}

可以看到,當前互斥鎖的狀態為0時,嘗試將當前鎖狀態設定為更新鎖定狀態,且這些操作是原子的。

若當前狀態不為0,則進入lockSlow方法
先定義了幾個引數

var waitStartTime int64
starving := false // 
awoke := false
iter := 0
old := m.state

隨後進入一個很大的for迴圈,讓我們來逐步分析

自旋

for {
     // 1 && 2 
   if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      //  3. 
      if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
         atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
         awoke = true
      }
      runtime_doSpin()
      iter++
      old = m.state
      continue
   }

old&(mutexLocked|mutexStarving) == mutexLocked

當且僅當當前鎖狀態為mutexLocked時,表示式為true

runtime_canSpin(iter) 是否滿足自旋條件

  • 執行在擁有多個CPU的機器上;
  • 當前Goroutine為了獲取該鎖進入自旋的次數小於四次;
  • 當前機器上至少存在一個正在執行的處理器 P,並且處理的執行佇列為空;

如果當前狀態下自旋是合理的,將awoke置為true,同時設定鎖狀態為mutexWoken,進入自旋邏輯

runtime_doSpin()會執行30次PAUSE指令,並且僅佔用CPU資源 程式碼位於:runtimeasm_amd64.s +567

//go:linkname sync_runtime_doSpin sync.runtime_doSpin
//go:nosplit
func sync_runtime_doSpin() {
   procyield(active_spin_cnt)
}
TEXT runtime·procyield(SB),NOSPLIT,$0-0
    MOVL    cycles+0(FP), AX
again:
    PAUSE 
    SUBL    $1, AX
    JNZ again
    RET

計算鎖的新狀態

停止了自旋後,

new := old
// 1. 
if old&mutexStarving == 0 {
   new |= mutexLocked
}
// 2.
if old&(mutexLocked|mutexStarving) != 0 {
   new += 1 << mutexWaiterShift
}
// 3 && 4. 
if starving && old&mutexLocked != 0 {
   new |= mutexStarving
}
// 5. 
if awoke {
   if new&mutexWoken == 0 {
      throw("sync: inconsistent mutex state")
   }
   new &^= mutexWoken
}
  • old&mutexStarving == 0 表明原來不是飢餓模式。如果是飢餓模式的話,其他goroutine不會執行接下來的程式碼,直接進入等待佇列隊尾
  • 如果原來是 mutexLocked 或者 mutexStarving模式,waiterCounts數加一
  • 如果被標記為飢餓狀態,且鎖狀態為mutexLocked的話,設定鎖的新狀態為飢餓狀態。
  • 被標記為飢餓狀態的前提是 被喚醒過且搶鎖失敗
  • 計算新狀態

更新鎖狀態

// 1.
if atomic.CompareAndSwapInt32(&m.state, old, new) {
      if old&(mutexLocked|mutexStarving) == 0 {
         break // locked the mutex with CAS
      }
      // 2. 
      queueLifo := waitStartTime != 0
      if waitStartTime == 0 {
         waitStartTime = runtime_nanotime()
      }
      // 3.
      runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 4.
      starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
      old = m.state
      // 5.
      if old&mutexStarving != 0 {
         /
         if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
            throw("sync: inconsistent mutex state")
         }
         delta := int32(mutexLocked - 1<<mutexWaiterShift)
         if !starving || old>>mutexWaiterShift == 1 {
            delta -= mutexStarving
         }
         atomic.AddInt32(&m.state, delta)
         break
      }
      awoke = true
      iter = 0
   } else {
      old = m.state
   }
}
  • 嘗試將鎖狀態設定為new 。這裡設定成功不代表上鎖成功,有可能new不為mutexLocked 或者是waiterCount數量的改變
  • waitStartTime不為0 說明當前goroutine已經等待過了,將當前goroutine放到等待佇列的隊頭
  • 走到這裡,會呼叫runtime_SemacquireMutex 方法使當前協程阻塞,runtime_SemacquireMutex方法中會不斷嘗試獲得鎖,並會陷入休眠 等待號誌釋放。
  • 當前協程可以獲得號誌,從runtime_SemacquireMutex方法中返回。此時協程會去更新starving標誌位:如果當前starving標誌位為true或者等待時間超過starvationThresholdNs ,將starving置為true

之後會按照飢餓模式與正常模式,走不同的邏輯

  • - 在正常模式下,這段程式碼會設定喚醒和飢餓標記、重置迭代次數並重新執行獲取鎖的迴圈;  
  • - 在飢餓模式下,當前 Goroutine 會獲得互斥鎖,如果等待佇列中只存在當前 Goroutine,互斥鎖還會從飢餓模式中退出;

解鎖

func (m *Mutex) Unlock() {
   // 1.
   new := atomic.AddInt32(&m.state, -mutexLocked)
   if new != 0 {
      // 2. 
      m.unlockSlow(new)
   }
}
  • 將鎖狀態的值增加 -mutexLocked 。如果新狀態不等於0,進入unlockSlow方法
func (m *Mutex) unlockSlow(new int32) {
    // 1. 
   if (new+mutexLocked)&mutexLocked == 0 {
      throw("sync: unlock of unlocked mutex")
   }
   if new&mutexStarving == 0 {
      old := new
      for {
      // 2.
         if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
            return
         }
         // 2.1.
         new = (old - 1<<mutexWaiterShift) | mutexWoken
         if atomic.CompareAndSwapInt32(&m.state, old, new) {
         // 2.2.
            runtime_Semrelease(&m.sema, false, 1)
            return
         }
         old = m.state
      }
   } else {
   // 3.
      runtime_Semrelease(&m.sema, true, 1)
   }
}

1.new+mutexLocked代表將鎖置為1,如果兩個狀態& 不為0,則說明重複解鎖.如果重複解鎖則丟擲panic

2. 如果等待者數量等於0,或者鎖的狀態已經變為mutexWoken、mutexStarving、mutexStarving,則直接返回

  • 將waiterCount數量-1,嘗試選擇一個goroutine喚醒
  • 嘗試更新鎖狀態,如果更新鎖狀態成功,則喚醒隊尾的一個gorountine

3. 如果不滿足 2的判斷條件,則進入飢餓模式,同時交出鎖的使用權

可能遇到的問題

鎖拷貝

mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()

此時mu2能夠正常解鎖,那麼我們再試試解鎖mu1

mu1 := &sync.Mutex{}
mu1.Lock()
mu2 := mu1
mu2.Unlock()
mu1.Unlock()

可以看到發生了error

panic導致沒有unlock

當lock()之後,可能由於程式碼問題導致程式發生了panic,那麼mutex無法被及時unlock(),由於其他協程還在等待鎖,此時可能觸發死鎖

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 並行nums個協程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
               log.Println("recover")
            }
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
         safeSlice.lock.Unlock()
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}

修改:

func TestWithLock() {
   nums := 100
   wg := &sync.WaitGroup{}
   safeSlice := SafeSlice{
      s:    []int{},
      lock: new(sync.RWMutex),
   }
   i := 0
   for idx := 0; idx < nums; idx++ { // 並行nums個協程做append
      wg.Add(1)
      go func() {
         defer func() {
            if r := recover(); r != nil {
            }
            safeSlice.lock.Unlock()
            wg.Done()
         }()
         safeSlice.lock.Lock()
         safeSlice.s = append(safeSlice.s, i)
         if i == 98{
            panic("123")
         }
         i++
      }()
   }
   wg.Wait()
   log.Println(len(safeSlice.s))
}

以上就是Golang Mutex互斥鎖深入理解的詳細內容,更多關於Golang Mutex互斥鎖的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com