首頁 > 軟體

Golang協程池gopool設計與實現

2022-04-15 19:01:07

Goroutine

Goroutine 是 Golang 提供的一種輕量級執行緒,我們通常稱之為「協程」,相比較執行緒,建立一個協程的成本是很低的。所以你會經常看到 Golang 開發的應用出現上千個協程並行的場景。

Goroutine 的優勢:

  • 與執行緒相比,Goroutines 成本很低。

它們的堆疊大小隻有幾 kb,堆疊可以根據應用程式的需要增長和縮小,context switch 也很快,而線上程的情況下,堆疊大小必須指定並固定。

  • Goroutine 被多路複用到更少數量的 OS 執行緒。

一個包含數千個 Goroutine 的程式中可能只有一個執行緒。如果該執行緒中的任何 Goroutine 阻塞等待使用者輸入,則建立另一個 OS 執行緒並將剩餘的 Goroutine 移動到新的 OS 執行緒。所有這些都由執行時處理,作為開發者無需耗費心力關心,這也使得我們有很乾淨的 API 來支援並行。

  • Goroutines 使用 channel 進行通訊。

channel 的設計有效防止了在使用 Goroutine 存取共用記憶體時發生競爭條件(race conditions) 。channel 可以被認為是 Goroutine 進行通訊的管道。

下文中我們會以「協程」來代指 Goroutine。

協程池

在高並行場景下,我們可能會啟動大量的協程來處理業務邏輯。協程池是一種利用池化技術,複用物件,減少記憶體分配的頻率以及協程建立開銷,從而提高協程執行效率的技術。

最近抽空了解了位元組官方開源的 gopkg 庫提供的 gopool 協程池實現,感覺還是很高質量的,程式碼也非常簡潔清晰,而且 Kitex 底層也在使用 gopool 來管理協程,這裡我們梳理一下設計和實現。

gopool

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to the go keyword.

瞭解官方 README 就會發現gopool的用法其實非常簡單,將曾經我們經常使用的 go func(){...} 替換為 gopool.Go(func(){...}) 即可。

此時 gopool 將會使用預設的設定來管理你啟動的協程,你也可以選擇針對業務場景設定池子大小,以及擴容上限。

old:

go func() {
	// do your job
}()

new:

import (
    "github.com/bytedance/gopkg/util/gopool"
)

gopool.Go(func(){
	/// do your job
})

核心實現

下面我們來看看gopool是怎樣實現協程池管理的。

Pool

Pool 是一個定義了協程池能力的介面。

type Pool interface {
	// 池子的名稱
	Name() string
        
	// 設定池子內Goroutine的容量
	SetCap(cap int32)
        
	// 執行 f 函數
	Go(f func())
        
	// 帶 ctx,執行 f 函數
	CtxGo(ctx context.Context, f func())
        
	// 設定發生panic時呼叫的函數
	SetPanicHandler(f func(context.Context, interface{}))
}

gopool 提供了這個介面的預設實現(即下面即將介紹的pool),當我們直接呼叫 gopool.CtxGo 時依賴的就是這個。

這樣的設計模式在 Kitex 中也經常出現,所有的依賴均設計為介面,便於隨後擴充套件,底層提供一個預設的實現暴露出去,這樣對呼叫方也很友好。

type pool struct {
	// 池子名稱
	name string

	// 池子的容量, 即最大並行工作的 goroutine 的數量
	cap int32
        
	// 池子設定
	config *Config
        
	// task 連結串列
	taskHead  *task
	taskTail  *task
	taskLock  sync.Mutex
	taskCount int32

	// 記錄當前正在執行的 worker 的數量
	workerCount int32

	// 當 worker 出現panic時被呼叫
	panicHandler func(context.Context, interface{})
}

// NewPool 建立一個新的協程池,初始化名稱,容量,設定
func NewPool(name string, cap int32, config *Config) Pool {
	p := &pool{
		name:   name,
		cap:    cap,
		config: config,
	}
	return p
}

呼叫 NewPool 獲取了以 Pool 的形式返回的 pool 結構體。

Task

type task struct {
	ctx context.Context
	f   func()

	next *task
}

task 是一個連結串列結構,可以把它理解為一個待執行的任務,它包含了當前節點需要執行的函數f, 以及指向下一個task的指標。

綜合前一節 pool 的定義,我們可以看到,一個協程池 pool 對應了一組task

pool 維護了指向連結串列的頭尾的兩個指標:taskHeadtaskTail,以及連結串列的長度taskCount 和對應的鎖 taskLock

Worker

type worker struct {
	pool *pool
}

一個 worker 就是邏輯上的一個執行器,它唯一對應到一個協程池 pool。當一個worker被喚起,將會開啟一個goroutine ,不斷地從 pool 中的 task連結串列獲取任務並執行。

func (w *worker) run() {
	go func() {
		for {
                        // 宣告即將執行的 task
			var t *task
                        
                        // 操作 pool 中的 task 連結串列,加鎖
			w.pool.taskLock.Lock()
			if w.pool.taskHead != nil {
                                // 拿到 taskHead 準備執行
				t = w.pool.taskHead
                                
                                // 更新連結串列的 head 以及數量
				w.pool.taskHead = w.pool.taskHead.next
				atomic.AddInt32(&w.pool.taskCount, -1)
			}
                        // 如果前一步拿到的 taskHead 為空,說明無任務需要執行,清理後返回
			if t == nil {
				w.close()
				w.pool.taskLock.Unlock()
				w.Recycle()
				return
			}
			w.pool.taskLock.Unlock()
                        
                        // 執行任務,針對 panic 會recover,並呼叫設定的 handler
			func() {
				defer func() {
					if r := recover(); r != nil {
						msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())
						logger.CtxErrorf(t.ctx, msg)
						if w.pool.panicHandler != nil {
							w.pool.panicHandler(t.ctx, r)
						}
					}
				}()
				t.f()
			}()
			t.Recycle()
		}
	}()
}

整體來看

看到這裡,其實就能把整個流程串起來了。我們來看看對外的介面 CtxGo(context.Context, f func()) 到底做了什麼?

func Go(f func()) {
	CtxGo(context.Background(), f)
}

func CtxGo(ctx context.Context, f func()) {
	defaultPool.CtxGo(ctx, f)
}

func (p *pool) CtxGo(ctx context.Context, f func()) {

        // 建立一個 task 物件,將 ctx 和待執行的函數賦值
	t := taskPool.Get().(*task)
	t.ctx = ctx
	t.f = f
        
        // 將 task 插入 pool 的連結串列的尾部,更新連結串列數量
	p.taskLock.Lock()
	if p.taskHead == nil {
		p.taskHead = t
		p.taskTail = t
	} else {
		p.taskTail.next = t
		p.taskTail = t
	}
	p.taskLock.Unlock()
	atomic.AddInt32(&p.taskCount, 1)
        
        
	// 以下兩個條件滿足時,建立新的 worker 並喚起執行:
	// 1. task的數量超過了設定的限制 
	// 2. 當前執行的worker數量小於上限(或無worker執行)
	if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {
        
                // worker數量+1
		p.incWorkerCount()
                
                // 建立一個新的worker,並把當前 pool 賦值
		w := workerPool.Get().(*worker)
		w.pool = p
                
                // 喚起worker執行
		w.run()
	}
}

相信看了程式碼註釋,大家就能理解發生了什麼。

gopool 會自行維護一個 defaultPool,這是一個預設的 pool 結構體,在引入包的時候就進行初始化。當我們直接呼叫 gopool.CtxGo() 時,本質上是呼叫了 defaultPool 的同名方法

func init() {
	defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}

const (
	defaultScalaThreshold = 1
)

// Config is used to config pool.
type Config struct {
	// 控制擴容的門檻,一旦待執行的 task 超過此值,且 worker 數量未達到上限,就開始啟動新的 worker
	ScaleThreshold int32
}

// NewConfig creates a default Config.
func NewConfig() *Config {
	c := &Config{
		ScaleThreshold: defaultScalaThreshold,
	}
	return c
}

defaultPool 的名稱為 gopool.DefaultPool,池子容量一萬,擴容下限為 1。

當我們呼叫 CtxGo時,gopool 就會更新維護的任務連結串列,並且判斷是否需要擴容 worker

  • 若此時已經有很多 worker 啟動(底層一個 worker 對應一個 goroutine),不需要擴容,就直接返回。
  • 若判斷需要擴容,就建立一個新的worker,並呼叫 worker.run()方法啟動,各個worker會非同步地檢查 pool 裡面的任務連結串列是否還有待執行的任務,如果有就執行。

三個角色的定位

  • task 是一個待執行的任務節點,同時還包含了指向下一個任務的指標,連結串列結構;
  • worker 是一個實際執行任務的執行器,它會非同步啟動一個 goroutine 執行協程池裡面未執行的task
  • pool 是一個邏輯上的協程池,對應了一個task連結串列,同時負責維護task狀態的更新,以及在需要的時候建立新的 worker

使用 sync.Pool 進行效能優化

其實到這個地方,gopool已經是一個程式碼簡潔清晰的協程池庫了,但是效能上顯然有改進空間,所以gopool的作者應用了多次 sync.Pool 來池化物件的建立,複用woker和task物件。

這裡建議大家直接看原始碼,其實在上面的程式碼中已經有所涉及。

  • task 池化
var taskPool sync.Pool

func init() {
	taskPool.New = newTask
}

func newTask() interface{} {
	return &task{}
}

func (t *task) Recycle() {
	t.zero()
	taskPool.Put(t)
}
  • worker 池化
var workerPool sync.Pool

func init() {
	workerPool.New = newWorker
}

func newWorker() interface{} {
	return &worker{}
}

func (w *worker) Recycle() {
	w.zero()
	workerPool.Put(w)
}

到此這篇關於Golang協程池gopool設計與實現的文章就介紹到這了,更多相關Golang協程池gopool內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com