首頁 > 軟體

Go singleflight使用以及原理

2023-01-04 14:00:04

這個東西很重要,可以經常用在專案當中,所以我們單獨拿出來進行講解。

在使用它之前我們需要導包:

 go get golang.org/x/sync/singleflight

golang/sync/singleflight.Group 是 Go 語言擴充套件包中提供了另一種同步原語,它能夠在一個服務中抑制對下游的多次重複請求。一個比較常見的使用場景是:我們在使用 Redis 對資料庫中的資料進行快取,發生快取擊穿時,大量的流量都會打到資料庫上進而影響服務的尾延時。

但是 golang/sync/singleflight.Group 能有效地解決這個問題,它能夠限制對同一個鍵值對的多次重複請求,減少對下游的瞬時流量。

使用方法

singleflight類的使用方法就新建一個singleflight.Group,使用其方法Do或者DoChan來包裝方法,被包裝的方法在對於同一個key,只會有一個協程執行,其他協程等待那個協程執行結束後,拿到同樣的結果。

Group結構體

代表一類工作,同一個group中,同樣的key同時只能被執行一次

Do方法

func (g *Group) Do(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) (v interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, err error, shared bool)

key:同一個key,同時只有一個協程執行

fn:被包裝的函數

v:返回值,即執行結果。其他等待的協程都會拿到

shared:表示是否由其他協程得到了這個結果v

DoChan方法

func (g *Group) DoChan(key string, fn func() (interface{<!--{cke_protected}{C}%3C!%2D%2D%20%2D%2D%3E-->}, error)) <-chan Result

Do差不多其實,因此我們就只講解Do的實際應用場景了。

具體應用場景

var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
	log.Printf("request %s start to get and set cache...", r.URL)
	value, err, _ := singleSetCache.Do(cacheKey, func() (interface{}, error) {
		log.Printf("request %s is getting cache...", r.URL)
		time.Sleep(3 * time.Second)
		log.Printf("request %s get cache success!", r.URL)
		return cacheKey, nil
	})
	return value.(string), err
}
func main() {
	r := gin.Default()
	r.GET("/sekill/:id", func(context *gin.Context) {
		ID := context.Param("id")
		cache, err := GetAndSetCache(context.Request, ID)
		if err != nil {
			log.Println(err)
		}
		log.Printf("request %s get value: %v", context.Request.URL, cache)
	})
	r.Run()
}

來看一下執行結果:

2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 is getting cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/9 start to get and set cache...
2022/12/29 16:21:18 request /sekill/5 start to get and set cache...
2022/12/29 16:21:19 request /sekill/9 start to get and set cache...
2022/12/29 16:21:19 request /sekill/5 start to get and set cache...
2022/12/29 16:21:21 request /sekill/9 get cache success!
2022/12/29 16:21:21 request /sekill/5 get cache success!
2022/12/29 16:21:21 request /sekill/5 get value: 5
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    3.0106529s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.8090881s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.2166003s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.6064069s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.4178652s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/9 get value: 9
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.8101267s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    3.0116892s |       127.0.0.1 | GET      "/sekill/9"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.6074537s |       127.0.0.1 | GET      "/sekill/5"
2022/12/29 16:21:21 request /sekill/5 get value: 5
[GIN] 2022/12/29 - 16:21:21 | 200 |    2.4076473s |       127.0.0.1 | GET      "/sekill/5"
[GIN] 2022/12/29 - 16:21:21 | 200 |     2.218686s |       127.0.0.1 | GET      "/sekill/5"

可以看到確實只有一個協程執行了被包裝的函數,並且其他協程都拿到了結果。

接下來我們來看一下它的原理吧!

原理

首先來看一下Group結構體:

type Group struct {
   mu sync.Mutex  // 鎖保證並行安全   
   m  map[string]*call //儲存key對應的函數執行過程和結果的變數。
}

然後我們來看一下call結構體:

type call struct {
    wg sync.WaitGroup //用WaitGroup實現只有一個協程執行函數
    val interface{} //函數執行結果
    err error
    forgotten bool
    dups  int  //含義是duplications,即同時執行同一個key的協程數量
    chans []chan<- Result
}

然後我們來看一下Do方法:

func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    // 寫Group的m欄位時,加鎖保證寫安全
	g.mu.Lock()
	if g.m == nil {
		g.m = make(map[string]*call)
	}
	if c, ok := g.m[key]; ok {
        // 如果key已經存在,說明已經由協程在執行,則dups++並等待其執行結果,執行結果儲存在對應的call的val欄位裡
		c.dups++
		g.mu.Unlock()
		c.wg.Wait()
		if e, ok := c.err.(*panicError); ok {
			panic(e)
		} else if c.err == errGoexit {
			runtime.Goexit()
		}
		return c.val, c.err, true
	}
    // 如果key不存在,則新建一個call,並使用WaitGroup來阻塞其他協程,同時在m欄位裡寫入key和對應的call
	c := new(call)
	c.wg.Add(1)
	g.m[key] = c
	g.mu.Unlock()
	g.doCall(c, key, fn) // 進來的第一個協程就來執行這個函數
	return c.val, c.err, c.dups > 0
}

然後我們來分析一下doCall函數:

func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
	c.val, c.err = fn()
	c.wg.Done()
	g.mu.Lock()
	delete(g.m, key)
	for _, ch := range c.chans {
		ch <- Result{c.val, c.err, c.dups > 0}
	}
	g.mu.Unlock()
}
  • 執行傳入的函數 fn,該函數的返回值會賦值給 c.valc.err
  • 呼叫 sync.WaitGroup.Done 方法通知所有等待結果的 Goroutine — 當前函數已經執行完成,可以從 call 結構體中取出返回值並返回了;
  • 獲取持有的互斥鎖並通過管道將資訊同步給使用 golang/sync/singleflight.Group.DoChan 方法的 Goroutine

問題分析

分析了原始碼之後,我們得出了一個結論,這個東西是用阻塞來實現的,這就引發了一個問題:如果我們處理的那個請求剛好遇到問題了,那麼後面的所有請求都會被阻塞,也就是,我們應該加上適合的超時控制,如果在一定時間內,沒有獲得結果,那麼就當作超時處理。

於是這個適合我們應該使用DoChan()。兩者實現上完全一樣,不同的是, DoChan() 通過 channel 返回結果。因此可以使用 select 語句實現超時控制。

var singleSetCache singleflight.Group
func GetAndSetCache(r *http.Request, cacheKey string) (string, error) {
   log.Printf("request %s start to get and set cache...", r.URL)
   retChan := singleSetCache.DoChan(cacheKey, func() (interface{}, error) {
      log.Printf("request %s is getting cache...", r.URL)
      time.Sleep(3 * time.Second)
      log.Printf("request %s get cache success!", r.URL)
      return cacheKey, nil
   })
   var ret singleflight.Result
   timeout := time.After(2 * time.Second)
   select {
   case <-timeout:
      log.Println("time out!")
      return "", errors.New("time out")
   case ret = <-retChan: // 從chan中獲取結果
      return ret.Val.(string), ret.Err
   }
}
func main() {
   r := gin.Default()
   r.GET("/sekill/:id", func(context *gin.Context) {
      ID := context.Param("id")
      cache, err := GetAndSetCache(context.Request, ID)
      if err != nil {
         log.Println(err)
      }
      log.Printf("request %s get value: %v", context.Request.URL, cache)
   })
   r.Run()
}

補充

這裡其實還有一個Forget方法,它可以在對映表中刪除某個鍵,接下來對鍵的呼叫就不會等待前面的函數返回了。

總結

當然,如果單次的失敗無法容忍,在高並行的場景下更好的處理方案是:

放棄使用同步請求,犧牲資料更新的實時性

“快取” 儲存準實時的資料 + “非同步更新” 資料到快取

到此這篇關於Go singleflight使用以及原理的文章就介紹到這了,更多相關Go singleflight內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com