首頁 > 軟體

Go語言的互斥鎖的詳細使用

2022-06-29 14:03:01

前言

當提到並行程式設計、多執行緒程式設計時,都會在第一時間想到鎖,鎖是並行程式設計中的同步原語,他可以保證多執行緒在存取同一片記憶體時不會出現競爭來保證並行安全;在Go語言中更推崇由channel通過通訊的方式實現共用記憶體,這個設計點與許多主流程式語言不一致,但是Go語言也在sync包中提供了互斥鎖、讀寫鎖,畢竟channel也不能滿足所有場景,互斥鎖、讀寫鎖的使用與我們是分不開的,所以接下來我會分兩篇來分享互斥鎖、讀寫鎖是怎麼實現的,本文我們先來看看互斥鎖的實現。

本文基於Golang版本:1.18

Go語言互斥鎖設計實現

mutex介紹

sync 包下的mutex就是互斥鎖,其提供了三個公開方法:呼叫Lock()獲得鎖,呼叫Unlock()釋放鎖,在Go1.18新提供了TryLock()方法可以非阻塞式的取鎖操作:

  • Lock():呼叫Lock方法進行加鎖操作,使用時應注意在同一個goroutine中必須在鎖釋放時才能再次上鎖,否則會導致程式panic
  • Unlock():呼叫UnLock方法進行解鎖操作,使用時應注意未加鎖的時候釋放鎖會引起程式panic,已經鎖定的 Mutex 並不與特定的 goroutine 相關聯,這樣可以利用一個 goroutine 對其加鎖,再利用其他 goroutine 對其解鎖。
  • tryLock():呼叫TryLock方法嘗試獲取鎖,當鎖被其他 goroutine 佔有,或者當前鎖正處於飢餓模式,它將立即返回 false,當鎖可用時嘗試獲取鎖,獲取失敗不會自旋/阻塞,也會立即返回false;

mutex的結構比較簡單隻有兩個欄位:

type Mutex struct {
	state int32
	sema  uint32
}
  • state:表示當前互斥鎖的狀態,複合型欄位;
  • sema:號誌變數,用來控制等待goroutine的阻塞休眠和喚醒

初看結構你可能有點懵逼,互斥鎖應該是一個複雜東西,怎麼就兩個欄位就可以實現?那是因為設計使用了位的方式來做標誌,state的不同位分別表示了不同的狀態,使用最小的記憶體來表示更多的意義,其中低三位由低到高分別表示mutexedmutexWokenmutexStarving,剩下的位則用來表示當前共有多少個goroutine在等待鎖:

const (
   mutexLocked = 1 << iota // 表示互斥鎖的鎖定狀態
   mutexWoken // 表示從正常模式被從喚醒
   mutexStarving // 當前的互斥鎖進入飢餓狀態
   mutexWaiterShift = iota // 當前互斥鎖上等待者的數量
)

mutex最開始的實現只有正常模式,在正常模式下等待的執行緒按照先進先出的方式獲取鎖,但是新建立的gouroutine會與剛被喚起的 goroutine競爭,會導致剛被喚起的 goroutine獲取不到鎖,這種情況的出現會導致執行緒長時間被阻塞下去,所以Go語言在1.9中進行了優化,引入了飢餓模式,當goroutine超過1ms沒有獲取到鎖,就會將當前互斥鎖切換到飢餓模式,在飢餓模式中,互斥鎖會直接交給等待佇列最前面的goroutine,新的 goroutine 在該狀態下不能獲取鎖、也不會進入自旋狀態,它們只會在佇列的末尾等待。如果一個 goroutine 獲得了互斥鎖並且它在佇列的末尾或者它等待的時間少於 1ms,那麼當前的互斥鎖就會切換回正常模式。

mutex的基本情況大家都已經掌握了,接下來我們從加鎖到解鎖來分析mutex是如何實現的;

Lock加鎖

Lock方法入手:

func (m *Mutex) Lock() {
	// 判斷當前鎖的狀態,如果鎖是完全空閒的,即m.state為0,則對其加鎖,將m.state的值賦為1
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return
	}
	// Slow path (outlined so that the fast path can be inlined)
	m.lockSlow()
}

上面的程式碼主要兩部分邏輯:

  • 通過CAS判斷當前鎖的狀態,也就是state欄位的低1位,如果鎖是完全空閒的,即m.state為0,則對其加鎖,將m.state的值賦為1
  • 若當前鎖已經被其他goroutine加鎖,則進行lockSlow方法嘗試通過自旋或飢餓狀態下飢餓goroutine競爭方式等待鎖的釋放,我們在下面介紹lockSlow方法;

lockSlow程式碼段有點長,主體是一個for迴圈,其主要邏輯可以分為以下三部分:

  • 狀態初始化
  • 判斷是否符合自旋條件,符合條件進行自旋操作
  • 搶鎖準備期望狀態
  • 通過CAS操作更新期望狀態

初始化狀態

locakSlow方法內會先初始化5個欄位:

func (m *Mutex) lockSlow() {
	var waitStartTime int64 
	starving := false
	awoke := false
	iter := 0
	old := m.state
	........
}
  • waitStartTime用來計算waiter的等待時間
  • starving是飢餓模式標誌,如果等待時長超過1ms,starving置為true,後續操作會把Mutex也標記為飢餓狀態。
  • awoke表示協程是否喚醒,當goroutine在自旋時,相當於CPU上已經有在等鎖的協程。為避免Mutex解鎖時再喚醒其他協程,自旋時要嘗試把Mutex置為喚醒狀態,Mutex處於喚醒狀態後 要把本協程的 awoke 也置為true。
  • iter用於記錄協程的自旋次數,
  • old記錄當前鎖的狀態

自旋

自旋的判斷條件非常苛刻:

for {
    // 判斷是否允許進入自旋 兩個條件,條件1是當前鎖不能處於飢餓狀態
    // 條件2是在runtime_canSpin內實現,其邏輯是在多核CPU執行,自旋的次數小於4
		if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
      // !awoke 判斷當前goroutine不是在喚醒狀態
      // old&mutexWoken == 0 表示沒有其他正在喚醒的goroutine
      // old>>mutexWaiterShift != 0 表示等待佇列中有正在等待的goroutine
      // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) 嘗試將當前鎖的低2位的Woken狀態位設定為1,表示已被喚醒, 這是為了通知在解鎖Unlock()中不要再喚醒其他的waiter了
			if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
				atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
					// 設定當前goroutine喚醒成功
          awoke = true
			}
      // 進行自旋
			runtime_doSpin()
      // 自旋次數
			iter++
      // 記錄當前鎖的狀態
			old = m.state
			continue
		}
}

自旋這裡的條件還是很複雜的,我們想讓當前goroutine進入自旋轉的原因是我們樂觀的認為當前正在持有鎖的goroutine能在較短的時間內歸還鎖,所以我們需要一些條件來判斷,mutex的判斷條件我們在文字描述一下:

old&(mutexLocked|mutexStarving) == mutexLocked 用來判斷鎖是否處於正常模式且加鎖,為什麼要這麼判斷呢?

mutexLocked 二進位制表示為 0001

mutexStarving 二進位制表示為 0100

mutexLocked|mutexStarving 二進位制為 0101. 使用0101在當前狀態做 &操作,如果當前處於飢餓模式,低三位一定會是1,如果當前處於加鎖模式,低1位一定會是1,所以使用該方法就可以判斷出當前鎖是否處於正常模式且加鎖;

runtime_canSpin()方法用來判斷是否符合自旋條件:

// / go/go1.18/src/runtime/proc.go
const active_spin     = 4
func sync_runtime_canSpin(i int) bool {
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

自旋條件如下:

  • 自旋的次數要在4次以內
  • CPU必須為多核
  • GOMAXPROCS>1
  • 當前機器上至少存在一個正在執行的處理器 P 並且處理的執行佇列為空;

判斷當前goroutine可以進自旋後,呼叫runtime_doSpin方法進行自旋:

const active_spin_cnt = 30
func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
// asm_amd64.s
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

迴圈次數被設定為30次,自旋操作就是執行30次PAUSE指令,通過該指令佔用CPU並消費CPU時間,進行忙等待;

這就是整個自旋操作的邏輯,這個就是為了優化 等待阻塞->喚醒->參與搶佔鎖這個過程不高效,所以使用自旋進行優化,在期望在這個過程中鎖被釋放。

搶鎖準備期望狀態

自旋邏輯處理好後開始根據上下文計算當前互斥鎖最新的狀態,根據不同的條件來計算mutexLockedmutexStarvingmutexWokenmutexWaiterShift

首先計算mutexLocked的值:

    // 基於old狀態宣告到一個新狀態
		new := old
		// 新狀態處於非飢餓的條件下才可以加鎖
		if old&mutexStarving == 0 {
			new |= mutexLocked
		}

計算mutexWaiterShift的值:

//如果old已經處於加鎖或者飢餓狀態,則等待者按照FIFO的順序排隊
if old&(mutexLocked|mutexStarving) != 0 {
			new += 1 << mutexWaiterShift
		}

計算mutexStarving的值:

// 如果當前鎖處於飢餓模式,並且已被加鎖,則將低3位的Starving狀態位設定為1,表示飢餓
if starving && old&mutexLocked != 0 {
			new |= mutexStarving
		}

計算mutexWoken的值:

// 當前goroutine的waiter被喚醒,則重置flag
if awoke {
			// 喚醒狀態不一致,直接丟擲異常
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
     // 新狀態清除喚醒標記,因為後面的goroutine只會阻塞或者搶鎖成功
     // 如果是掛起狀態,那就需要等待其他釋放鎖的goroutine來喚醒。
     // 假如其他goroutine在unlock的時候發現Woken的位置不是0,則就不會去喚醒,那該goroutine就無法在被喚醒後加鎖
			new &^= mutexWoken
}

通過CAS操作更新期望狀態

上面我們已經得到了鎖的期望狀態,接下來通過CAS將鎖的狀態進行更新:

// 這裡嘗試將鎖的狀態更新為期望狀態
if atomic.CompareAndSwapInt32(&m.state, old, new) {
  // 如果原來鎖的狀態是沒有加鎖的並且不處於飢餓狀態,則表示當前goroutine已經獲取到鎖了,直接推出即可
			if old&(mutexLocked|mutexStarving) == 0 {
				break // locked the mutex with CAS
			}
			// 到這裡就表示goroutine還沒有獲取到鎖,waitStartTime是goroutine開始等待的時間,waitStartTime != 0就表示當前goroutine已經等待過了,則需要將其放置在等待佇列隊頭,否則就排到佇列隊尾
			queueLifo := waitStartTime != 0
			if waitStartTime == 0 {
				waitStartTime = runtime_nanotime()
			}
      // 阻塞等待
			runtime_SemacquireMutex(&m.sema, queueLifo, 1)
      // 被號誌喚醒後檢查當前goroutine是否應該表示為飢餓
     // 1. 當前goroutine已經飢餓
     // 2. goroutine已經等待了1ms以上
			starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
  // 再次獲取當前鎖的狀態
			old = m.state
   // 如果當前處於飢餓模式,
			if old&mutexStarving != 0 {
        // 如果當前鎖既不是被獲取也不是被喚醒狀態,或者等待佇列為空 這代表鎖狀態產生了不一致的問題
				if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
					throw("sync: inconsistent mutex state")
				}
        // 當前goroutine已經獲取了鎖,等待佇列-1
				delta := int32(mutexLocked - 1<<mutexWaiterShift
         // 當前goroutine非飢餓狀態 或者 等待佇列只剩下一個waiter,則退出飢餓模式(清除飢餓標識位)              
				if !starving || old>>mutexWaiterShift == 1 {
					delta -= mutexStarving
				}
        // 更新狀態值並中止for迴圈,拿到鎖退出
				atomic.AddInt32(&m.state, delta)
				break
			}
      // 設定當前goroutine為喚醒狀態,且重置自璇次數
			awoke = true
			iter = 0
		} else {
      // 鎖被其他goroutine佔用了,還原狀態繼續for迴圈
			old = m.state
		}

這塊的邏輯很複雜,通過CAS來判斷是否獲取到鎖,沒有通過 CAS 獲得鎖,會呼叫 runtime.sync_runtime_SemacquireMutex通過號誌保證資源不會被兩個 goroutine 獲取,runtime.sync_runtime_SemacquireMutex會在方法中不斷嘗試獲取鎖並陷入休眠等待號誌的釋放,一旦當前 goroutine 可以獲取號誌,它就會立刻返回,如果是新來的goroutine,就需要放在隊尾;如果是被喚醒的等待鎖的goroutine,就放在隊頭,整個過程還需要啃程式碼來加深理解。

解鎖

相對於加鎖操作,解鎖的邏輯就沒有那麼複雜了,接下來我們來看一看UnLock的邏輯:

func (m *Mutex) Unlock() {
	// Fast path: drop lock bit.
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		// Outlined slow path to allow inlining the fast path.
		// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
		m.unlockSlow(new)
	}
}

使用AddInt32方法快速進行解鎖,將m.state的低1位置為0,然後判斷新的m.state值,如果值為0,則代表當前鎖已經完全空閒了,結束解鎖,不等於0說明當前鎖沒有被佔用,會有等待的goroutine還未被喚醒,需要進行一系列喚醒操作,這部分邏輯就在unlockSlow方法內:

func (m *Mutex) unlockSlow(new int32) {
  // 這裡表示解鎖了一個沒有上鎖的鎖,則直接發生panic
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}
  // 正常模式的釋放鎖邏輯
	if new&mutexStarving == 0 {
		old := new
		for {
      // 如果沒有等待者則直接返回即可
      // 如果鎖處於加鎖的狀態,表示已經有goroutine獲取到了鎖,可以返回
      // 如果鎖處於喚醒狀態,這表明有等待的goroutine被喚醒了,不用嘗試獲取其他goroutine了
      // 如果鎖處於飢餓模式,鎖之後會直接給等待隊頭goroutine
			if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
				return
			}
			// 搶佔喚醒標誌位,這裡是想要把鎖的狀態設定為被喚醒,然後waiter佇列-1
			new = (old - 1<<mutexWaiterShift) | mutexWoken
			if atomic.CompareAndSwapInt32(&m.state, old, new) {
        // 搶佔成功喚醒一個goroutine
				runtime_Semrelease(&m.sema, false, 1)
				return
			}
      // 執行搶佔不成功時重新更新一下狀態資訊,下次for迴圈繼續處理
			old = m.state
		}
	} else {
    // 飢餓模式釋放鎖邏輯,直接喚醒等待佇列goroutine
		runtime_Semrelease(&m.sema, true, 1)
	}
}

我們在喚醒goroutine時正常模式/飢餓模式都呼叫func runtime_Semrelease(s *uint32, handoff bool, skipframes int),這兩種模式在第二個引數的傳參上不同,如果handoff is true, pass count directly to the first waiter.

非阻塞加鎖

Go語言在1.18版本中引入了非阻塞加鎖的方法TryLock(),其實現就很簡潔:

func (m *Mutex) TryLock() bool {
  // 記錄當前狀態
	old := m.state
  //  處於加鎖狀態/飢餓狀態直接獲取鎖失敗
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}
	// 嘗試獲取鎖,獲取失敗直接獲取失敗
	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}


	return true
}

TryLock的實現就比較簡單了,主要就是兩個判斷邏輯:

  • 判斷當前鎖的狀態,如果鎖處於加鎖狀態或飢餓狀態直接獲取鎖失敗
  • 嘗試獲取鎖,獲取失敗直接獲取鎖失敗

TryLock並不被鼓勵使用,至少我還沒想到有什麼場景可以使用到它。

總結

通讀原始碼後你會發現互斥鎖的邏輯真的十分複雜,程式碼量雖然不多,但是很難以理解,一些細節點還需要大家多看看幾遍才能理解其為什麼這樣做,文末我們再總結一下互斥鎖的知識點:

  • 互斥鎖有兩種模式:正常模式、飢餓模式,飢餓模式的出現是為了優化正常模式下剛被喚起的goroutine與新建立的goroutine競爭時長時間獲取不到鎖,在Go1.9時引入飢餓模式,如果一個goroutine獲取鎖失敗超過1ms,則會將Mutex切換為飢餓模式,如果一個goroutine獲得了鎖,並且他在等待佇列隊尾 或者 他等待小於1ms,則會將Mutex的模式切換回正常模式
  • 加鎖的過程:
    • 鎖處於完全空閒狀態,通過CAS直接加鎖
    • 當鎖處於正常模式、加鎖狀態下,並且符合自旋條件,則會嘗試最多4次的自旋
    • 若當前goroutine不滿足自旋條件時,計算當前goroutine的鎖期望狀態
    • 嘗試使用CAS更新鎖狀態,若更新鎖狀態成功判斷當前goroutine是否可以獲取到鎖,獲取到鎖直接退出即可,若獲取不到鎖則陷入睡眠,等待被喚醒
    • goroutine被喚醒後,如果鎖處於飢餓模式,則直接拿到鎖,否則重置自旋次數、標誌喚醒位,重新走for迴圈自旋、獲取鎖邏輯;
  • 解鎖的過程
    • 原子操作mutexLocked,如果鎖為完全空閒狀態,直接解鎖成功
    • 如果鎖不是完全空閒狀態,,那麼進入unlockedslow邏輯
    • 如果解鎖一個未上鎖的鎖直接panic,因為沒加鎖mutexLocked的值為0,解鎖時進行mutexLocked - 1操作,這個操作會讓整個互斥鎖混亂,所以需要有這個判斷
    • 如果鎖處於飢餓模式直接喚醒等待佇列隊頭的waiter
    • 如果鎖處於正常模式下,沒有等待的goroutine可以直接退出,如果鎖已經處於鎖定狀態、喚醒狀態、飢餓模式則可以直接退出,因為已經有被喚醒的 goroutine 獲得了鎖.
  • 使用互斥鎖時切記拷貝Mutex,因為拷貝Mutex時會連帶狀態一起拷貝,因為Lock時只有鎖在完全空閒時才會獲取鎖成功,拷貝時連帶狀態一起拷貝後,會造成死鎖
  • TryLock的實現邏輯很簡單,主要判斷當前鎖處於加鎖狀態、飢餓模式就會直接獲取鎖失敗,嘗試獲取鎖失敗直接返回;

到此這篇關於Go語言的互斥鎖的詳細使用的文章就介紹到這了,更多相關Go語言 互斥鎖內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com