<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Goroutine 是 Golang 提供的一種輕量級執行緒,我們通常稱之為「協程」,相比較執行緒,建立一個協程的成本是很低的。所以你會經常看到 Golang 開發的應用出現上千個協程並行的場景。
Goroutine 的優勢:
它們的堆疊大小隻有幾 kb,堆疊可以根據應用程式的需要增長和縮小,context switch 也很快,而線上程的情況下,堆疊大小必須指定並固定。
一個包含數千個 Goroutine 的程式中可能只有一個執行緒。如果該執行緒中的任何 Goroutine 阻塞等待使用者輸入,則建立另一個 OS 執行緒並將剩餘的 Goroutine 移動到新的 OS 執行緒。所有這些都由執行時處理,作為開發者無需耗費心力關心,這也使得我們有很乾淨的 API 來支援並行。
channel 的設計有效防止了在使用 Goroutine 存取共用記憶體時發生競爭條件(race conditions) 。channel 可以被認為是 Goroutine 進行通訊的管道。
下文中我們會以「協程」來代指 Goroutine。
在高並行場景下,我們可能會啟動大量的協程來處理業務邏輯。協程池是一種利用池化技術,複用物件,減少記憶體分配的頻率以及協程建立開銷,從而提高協程執行效率的技術。
最近抽空了解了位元組官方開源的 gopkg 庫提供的 gopool
協程池實現,感覺還是很高質量的,程式碼也非常簡潔清晰,而且 Kitex
底層也在使用 gopool
來管理協程,這裡我們梳理一下設計和實現。
Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool
gopool
is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines. It is an alternative to thego
keyword.
瞭解官方 README 就會發現gopool
的用法其實非常簡單,將曾經我們經常使用的 go func(){...}
替換為 gopool.Go(func(){...})
即可。
此時 gopool
將會使用預設的設定來管理你啟動的協程,你也可以選擇針對業務場景設定池子大小,以及擴容上限。
old:
go func() { // do your job }()
new:
import ( "github.com/bytedance/gopkg/util/gopool" ) gopool.Go(func(){ /// do your job })
下面我們來看看gopool
是怎樣實現協程池管理的。
Pool
是一個定義了協程池能力的介面。
type Pool interface { // 池子的名稱 Name() string // 設定池子內Goroutine的容量 SetCap(cap int32) // 執行 f 函數 Go(f func()) // 帶 ctx,執行 f 函數 CtxGo(ctx context.Context, f func()) // 設定發生panic時呼叫的函數 SetPanicHandler(f func(context.Context, interface{})) }
gopool
提供了這個介面的預設實現(即下面即將介紹的pool
),當我們直接呼叫 gopool.CtxGo 時依賴的就是這個。
這樣的設計模式在 Kitex
中也經常出現,所有的依賴均設計為介面,便於隨後擴充套件,底層提供一個預設的實現暴露出去,這樣對呼叫方也很友好。
type pool struct { // 池子名稱 name string // 池子的容量, 即最大並行工作的 goroutine 的數量 cap int32 // 池子設定 config *Config // task 連結串列 taskHead *task taskTail *task taskLock sync.Mutex taskCount int32 // 記錄當前正在執行的 worker 的數量 workerCount int32 // 當 worker 出現panic時被呼叫 panicHandler func(context.Context, interface{}) } // NewPool 建立一個新的協程池,初始化名稱,容量,設定 func NewPool(name string, cap int32, config *Config) Pool { p := &pool{ name: name, cap: cap, config: config, } return p }
呼叫 NewPool
獲取了以 Pool
的形式返回的 pool
結構體。
type task struct { ctx context.Context f func() next *task }
task
是一個連結串列結構,可以把它理解為一個待執行的任務,它包含了當前節點需要執行的函數f
, 以及指向下一個task
的指標。
綜合前一節 pool
的定義,我們可以看到,一個協程池 pool
對應了一組task
。
pool
維護了指向連結串列的頭尾的兩個指標:taskHead
和 taskTail
,以及連結串列的長度taskCount
和對應的鎖 taskLock
。
type worker struct { pool *pool }
一個 worker
就是邏輯上的一個執行器,它唯一對應到一個協程池 pool
。當一個worker
被喚起,將會開啟一個goroutine
,不斷地從 pool
中的 task
連結串列獲取任務並執行。
func (w *worker) run() { go func() { for { // 宣告即將執行的 task var t *task // 操作 pool 中的 task 連結串列,加鎖 w.pool.taskLock.Lock() if w.pool.taskHead != nil { // 拿到 taskHead 準備執行 t = w.pool.taskHead // 更新連結串列的 head 以及數量 w.pool.taskHead = w.pool.taskHead.next atomic.AddInt32(&w.pool.taskCount, -1) } // 如果前一步拿到的 taskHead 為空,說明無任務需要執行,清理後返回 if t == nil { w.close() w.pool.taskLock.Unlock() w.Recycle() return } w.pool.taskLock.Unlock() // 執行任務,針對 panic 會recover,並呼叫設定的 handler func() { defer func() { if r := recover(); r != nil { msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack()) logger.CtxErrorf(t.ctx, msg) if w.pool.panicHandler != nil { w.pool.panicHandler(t.ctx, r) } } }() t.f() }() t.Recycle() } }() }
看到這裡,其實就能把整個流程串起來了。我們來看看對外的介面 CtxGo(context.Context, f func())
到底做了什麼?
func Go(f func()) { CtxGo(context.Background(), f) } func CtxGo(ctx context.Context, f func()) { defaultPool.CtxGo(ctx, f) } func (p *pool) CtxGo(ctx context.Context, f func()) { // 建立一個 task 物件,將 ctx 和待執行的函數賦值 t := taskPool.Get().(*task) t.ctx = ctx t.f = f // 將 task 插入 pool 的連結串列的尾部,更新連結串列數量 p.taskLock.Lock() if p.taskHead == nil { p.taskHead = t p.taskTail = t } else { p.taskTail.next = t p.taskTail = t } p.taskLock.Unlock() atomic.AddInt32(&p.taskCount, 1) // 以下兩個條件滿足時,建立新的 worker 並喚起執行: // 1. task的數量超過了設定的限制 // 2. 當前執行的worker數量小於上限(或無worker執行) if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 { // worker數量+1 p.incWorkerCount() // 建立一個新的worker,並把當前 pool 賦值 w := workerPool.Get().(*worker) w.pool = p // 喚起worker執行 w.run() } }
相信看了程式碼註釋,大家就能理解發生了什麼。
gopool
會自行維護一個 defaultPool
,這是一個預設的 pool
結構體,在引入包的時候就進行初始化。當我們直接呼叫 gopool.CtxGo()
時,本質上是呼叫了 defaultPool
的同名方法
func init() { defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig()) } const ( defaultScalaThreshold = 1 ) // Config is used to config pool. type Config struct { // 控制擴容的門檻,一旦待執行的 task 超過此值,且 worker 數量未達到上限,就開始啟動新的 worker ScaleThreshold int32 } // NewConfig creates a default Config. func NewConfig() *Config { c := &Config{ ScaleThreshold: defaultScalaThreshold, } return c }
defaultPool
的名稱為 gopool.DefaultPool
,池子容量一萬,擴容下限為 1。
當我們呼叫 CtxGo
時,gopool
就會更新維護的任務連結串列,並且判斷是否需要擴容 worker
:
worker
啟動(底層一個 worker
對應一個 goroutine
),不需要擴容,就直接返回。worker
,並呼叫 worker.run()
方法啟動,各個worker
會非同步地檢查 pool
裡面的任務連結串列是否還有待執行的任務,如果有就執行。task
是一個待執行的任務節點,同時還包含了指向下一個任務的指標,連結串列結構;worker
是一個實際執行任務的執行器,它會非同步啟動一個 goroutine
執行協程池裡面未執行的task
;pool
是一個邏輯上的協程池,對應了一個task
連結串列,同時負責維護task
狀態的更新,以及在需要的時候建立新的 worker
。其實到這個地方,gopool
已經是一個程式碼簡潔清晰的協程池庫了,但是效能上顯然有改進空間,所以gopool
的作者應用了多次 sync.Pool
來池化物件的建立,複用woker和task物件。
這裡建議大家直接看原始碼,其實在上面的程式碼中已經有所涉及。
var taskPool sync.Pool func init() { taskPool.New = newTask } func newTask() interface{} { return &task{} } func (t *task) Recycle() { t.zero() taskPool.Put(t) }
var workerPool sync.Pool func init() { workerPool.New = newWorker } func newWorker() interface{} { return &worker{} } func (w *worker) Recycle() { w.zero() workerPool.Put(w) }
到此這篇關於Golang協程池gopool設計與實現的文章就介紹到這了,更多相關Golang協程池gopool內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45