首頁 > 軟體

GO語言協程互斥鎖Mutex和讀寫鎖RWMutex用法範例詳解

2022-04-18 13:01:14

sync.Mutex

Go中使用sync.Mutex型別實現mutex(排他鎖、互斥鎖)。在原始碼的sync/mutex.go檔案中,有如下定義:

// A Mutex is a mutual exclusion lock.
// The zero value for a Mutex is an unlocked mutex.
//
// A Mutex must not be copied after first use.
type Mutex struct {
	state int32
	sema uint32
}

這沒有任何非凡的地方。和mutex相關的所有事情都是通過sync.Mutex型別的兩個方法sync.Lock()和sync.Unlock()函數來完成的,前者用於獲取sync.Mutex鎖,後者用於釋放sync.Mutex鎖。sync.Mutex一旦被鎖住,其它的Lock()操作就無法再獲取它的鎖,只有通過Unlock()釋放鎖之後才能通過Lock()繼續獲取鎖。

也就是說,已有的鎖會導致其它申請Lock()操作的goroutine被阻塞,且只有在Unlock()的時候才會解除阻塞

另外需要注意,sync.Mutex不區分讀寫鎖,只有Lock()與Lock()之間才會導致阻塞的情況,如果在一個地方Lock(),在另一個地方不Lock()而是直接修改或存取共用資料,這對於sync.Mutex型別來說是允許的,因為mutex不會和goroutine進行關聯。如果想要區分讀、寫鎖,可以使用sync.RWMutex型別,見後文。

在Lock()和Unlock()之間的程式碼段稱為資源的臨界區(critical section),在這一區間內的程式碼是嚴格被Lock()保護的,是執行緒安全的,任何一個時間點都只能有一個goroutine執行這段區間的程式碼

以下是使用sync.Mutex的一個範例,稍後是非常詳細的分析過程。

package main

import (
	"fmt"
	"sync"
	"time"
)

// 共用變數
var (
	m  sync.Mutex
	v1 int
)

// 修改共用變數
// 在Lock()和Unlock()之間的程式碼部分是臨界區
func change(i int) {
	m.Lock()
	time.Sleep(time.Second)
	v1 = v1 + 1
	if v1%10 == 0 {
		v1 = v1 - 10*i
	}
	m.Unlock()
}

// 存取共用變數
// 在Lock()和Unlock()之間的程式碼部分是是臨界區
func read() int {
	m.Lock()
	a := v1
	m.Unlock()
	return a
}

func main() {
	var numGR = 21
	var wg sync.WaitGroup

	fmt.Printf("%d", read())

	// 迴圈建立numGR個goroutine
	// 每個goroutine都執行change()、read()
	// 每個change()和read()都會持有鎖
	for i := 0; i < numGR; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			change(i)
			fmt.Printf(" -> %d", read())
		}(i)
	}

	wg.Wait()
}

第一次執行結果:

0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -100 -> -99
-> -98 -> -97 -> -96 -> -95 -> -94 -> -93 -> -92 -> -91 -> -260 -> -259

第二次執行結果:注意其中的-74和-72之間跨了一個數

0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6 -> 7 -> 8 -> 9 -> -80 -> -79 
-> -78 -> -77 -> -76 -> -75 -> -74 -> -72 -> -71 -> -230 -> -229 -> -229

上面的範例中,change()、read()都會申請鎖,並在準備執行完函數時釋放鎖,它們如何修改資料、存取資料本文不多做解釋。需要詳細解釋的是main()中的for迴圈部分。

在for迴圈中,會不斷啟用新的goroutine(共21個)執行匿名函數,在每個匿名函數中都會執行change()和read(),意味著每個goroutine都會申請兩次鎖、釋放兩次鎖,且for迴圈中沒有任何Sleep延遲,這21個goroutine幾乎是一瞬間同時啟用的。

但由於change()和read()中都申請鎖,對於這21個goroutine將要分別執行的42個critical section,Lock()保證了在某一時間點只有其中一個goroutine能存取其中一個critical section。當釋放了一個critical section,其它的Lock()將爭奪互斥鎖,也就是所謂的競爭現象(race condition)。因為競爭的存在,這42個critical section被存取的順序是隨機的,完全無法保證哪個critical section先被存取。

對於前9個被排程到的goroutine,無論是哪個goroutine取得這9個change(i)中的critical section,都只是對共用變數v1做加1運算,但當第10個goroutine被排程時,由於v1加1之後得到10,它滿足if條件,會執行v1 = v1 - i*10,但這個i可能是任意0到numGR之間的值(因為無法保證並行的goroutine的排程順序),這使得v1的值從第10個goroutine開始出現隨機性。但從第10到第19個goroutine被排程的過程中,也只是對共用變數v1做加1運算,這些值是可以根據第10個數推斷出來的,到第20個goroutine,又再次隨機。依此類推。

此外,每個goroutine中的read()也都會參與鎖競爭,所以並不能保證每次change(i)之後會隨之執行到read(),可能goroutine 1的change()執行完後,會跳轉到goroutine 3的change()上,這樣一來,goroutine 1的read()就無法讀取到goroutine 1所修改的v1值,而是存取到其它goroutine中修改後的值。所以,前面的第二次執行結果中出現了一次資料跨越。只不過執行完change()後立即執行read()的機率比較大,所以多數時候輸出的資料都是連續的。

總而言之,Mutex保證了每個critical section安全,某一時間點只有一個goroutine存取到這部分,但也因此而出現了隨機性

如果Lock()後忘記了Unlock(),將會永久阻塞而出現死鎖。如果

適合sync.Mutex的資料型別

其實,對於內建型別的共用變數來說,使用sync.Mutex和Lock()、Unlock()來保護也是不合理的,因為它們自身不包含Mutex屬性。真正合理的共用變數是那些包含Mutex屬性的struct型別。例如:

type mytype struct {
	m   sync.Mutex
	var int
}

x := new(mytype)

這時只要想保護var變數,就先x.m.Lock(),操作完var後,再x.m.Unlock()。這樣就能保證x中的var欄位變數一定是被保護的。

sync.RWMutex

Go中使用sync.RWMutex型別實現讀寫互斥鎖rwmutex。在原始碼的sync/rwmutex.go檔案中,有如下定義:

// A RWMutex is a reader/writer mutual exclusion lock.
// The lock can be held by an arbitrary number of readers or a single writer.
// The zero value for a RWMutex is an unlocked mutex.
//
// A RWMutex must not be copied after first use.
//
// If a goroutine holds a RWMutex for reading and another goroutine might
// call Lock, no goroutine should expect to be able to acquire a read lock
// until the initial read lock is released. In particular, this prohibits
// recursive read locking. This is to ensure that the lock eventually becomes
// available; a blocked Lock call excludes new readers from acquiring the
// lock.
type RWMutex struct {
	w           Mutex  // held if there are pending writers
	writerSem   uint32 // 寫鎖需要等待讀鎖釋放的號誌
	readerSem   uint32 // 讀鎖需要等待寫鎖釋放的號誌
	readerCount int32  // 讀鎖後面掛起了多少個寫鎖申請
	readerWait  int32  // 已釋放了多少個讀鎖
}

上面的註釋和原始碼說明了幾點:

  • RWMutex是基於Mutex的,在Mutex的基礎之上增加了讀、寫的號誌,並使用了類似參照計數的讀鎖數量
  • 讀鎖與讀鎖相容,讀鎖與寫鎖互斥,寫鎖與寫鎖互斥,只有在鎖釋放後才可以繼續申請互斥的鎖
    • 可以同時申請多個讀鎖
    • 有讀鎖時申請寫鎖將阻塞,有寫鎖時申請讀鎖將阻塞
    • 只要有寫鎖,後續申請讀鎖和寫鎖都將阻塞

此型別有幾個鎖和解鎖的方法:

func (rw *RWMutex) Lock()
func (rw *RWMutex) RLock()
func (rw *RWMutex) RLocker() Locker
func (rw *RWMutex) RUnlock()
func (rw *RWMutex) Unlock()

其中:

  • Lock()和Unlock()用於申請和釋放寫鎖
  • RLock()和RUnlock()用於申請和釋放讀鎖
    • 一次RUnlock()操作只是對讀鎖數量減1,即減少一次讀鎖的參照計數
  • 如果不存在寫鎖,則Unlock()引發panic,如果不存在讀鎖,則RUnlock()引發panic
  • RLocker()用於返回一個實現了Lock()和Unlock()方法的Locker介面

此外,無論是Mutex還是RWMutex都不會和goroutine進行關聯,這意味著它們的鎖申請行為可以在一個goroutine中操作,釋放鎖行為可以在另一個goroutine中操作。

由於RLock()和Lock()都能保證資料不被其它goroutine修改,所以在RLock()與RUnlock()之間的,以及Lock()與Unlock()之間的程式碼區都是critical section。

以下是一個範例,此範例中同時使用了Mutex和RWMutex,RWMutex用於讀、寫,Mutex只用於讀。

package main

import (
	"fmt"
	"os"
	"sync"
	"time"
)

var Password = secret{password: "myPassword"}

type secret struct {
	RWM      sync.RWMutex
	M        sync.Mutex
	password string
}

// 通過rwmutex寫
func Change(c *secret, pass string) {
	c.RWM.Lock()
	fmt.Println("Change with rwmutex lock")
	time.Sleep(3 * time.Second)
	c.password = pass
	c.RWM.Unlock()
}

// 通過rwmutex讀
func rwMutexShow(c *secret) string {
	c.RWM.RLock()
	fmt.Println("show with rwmutex",time.Now().Second())
	time.Sleep(1 * time.Second)
	defer c.RWM.RUnlock()
	return c.password
}

// 通過mutex讀,和rwMutexShow的唯一區別在於鎖的方式不同
func mutexShow(c *secret) string {
	c.M.Lock()
	fmt.Println("show with mutex:",time.Now().Second())
	time.Sleep(1 * time.Second)
	defer c.M.Unlock()
	return c.password
}

func main() {
	// 定義一個稍後用於覆蓋(重寫)的函數
	var show = func(c *secret) string { return "" }

	// 通過變數賦值的方式,選擇並重寫showFunc函數
	if len(os.Args) != 2 {
		fmt.Println("Using sync.RWMutex!",time.Now().Second())
		show = rwMutexShow
	} else {
		fmt.Println("Using sync.Mutex!",time.Now().Second())
		show = mutexShow
	}
	
	var wg sync.WaitGroup

	// 啟用5個goroutine,每個goroutine都檢視
	// 根據選擇的函數不同,showFunc()加鎖的方式不同
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("Go Pass:", show(&Password),time.Now().Second())
		}()
	}
	
	// 啟用一個申請寫鎖的goroutine
	go func() {
		wg.Add(1)
		defer wg.Done()
		Change(&Password, "123456")
	}()
	// 阻塞,直到所有wg.Done
	wg.Wait()
}

Change()函數申請寫鎖,並睡眠3秒後修改資料,然後釋放寫鎖。

rwMutexShow()函數申請讀鎖,並睡眠一秒後取得資料,並釋放讀鎖。注意,rwMutexShow()中的print和return是相隔一秒鐘的。

mutexShow()函數申請Mutex鎖,和RWMutex互不相干。和rwMutexShow()唯一不同之處在於申請的鎖不同。

main()中,先根據命令列引數數量決定執行哪一個show()。之所以能根據函數變數來賦值,是因為先定義了一個show()函數,它的函數簽名和rwMutexShow()、mutexShow()的簽名相同,所以可以相互賦值。

for迴圈中啟用了5個goroutine並行執行,for瞬間啟用5個goroutine後,繼續執行main()程式碼會啟用另一個用於申請寫鎖的goroutine。這6個goroutine的執行順序是隨機的。

如果show選中的函數是rwMutexShow(),則5個goroutine要申請的RLock()鎖和寫鎖是衝突的,但5個RLock()是相容的。所以,只要某個時間點排程到了寫鎖的goroutine,剩下的讀鎖goroutine都會從那時開始阻塞3秒。

除此之外,還有一個不嚴格準確,但在時間持續長短的理論上來說能保證的一個規律:當修改資料結束後,各個剩下的goroutine都申請讀鎖,因為申請後立即print輸出,然後睡眠1秒,但1秒時間足夠所有剩下的goroutine申請完讀鎖,使得show with rwmutex輸出是連在一起,輸出的Go Pass: 123456又是連在一起的。

某次結果如下:

Using sync.RWMutex! 58
show with rwmutex 58
Change with rwmutex lock
Go Pass: myPassword 59
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
show with rwmutex 2
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3
Go Pass: 123456 3

如果show選中的函數是mutexShow(),則讀資料和寫資料互不衝突,但讀和讀是衝突的(因為Mutex的Lock()是互斥的)。

某次結果如下:

Using sync.Mutex! 30
Change with rwmutex lock
show with mutex: 30
Go Pass: myPassword 31
show with mutex: 31
Go Pass: myPassword 32
show with mutex: 32
Go Pass: 123456 33
show with mutex: 33
show with mutex: 34
Go Pass: 123456 34
Go Pass: 123456 35

用Mutex還是用RWMutex

Mutex和RWMutex都不關聯goroutine,但RWMutex顯然更適用於讀多寫少的場景。僅針對讀的效能來說,RWMutex要高於Mutex,因為rwmutex的多個讀可以並存。

 


IT145.com E-mail:sddin#qq.com