首頁 > 軟體

golang基於errgroup實現並行呼叫的方法

2022-09-05 14:01:25

序列呼叫

在用go編寫web/rpc伺服器的時候,經常會出現需要對下游多 個/組 服務呼叫rpc(或者其他比較耗時的操作)的情況。
按照自然的寫法,比如對下游有ABC三個呼叫,序列順著寫,就總共要花費TimeA+TimeB+TimeC的時間:

func Handler(ctx context.Context) {
    var a, b, c respType
    a = A(ctx)
    b = B(ctx)
    c = C(ctx)
}

基於sync.WaitGroup實現簡單的並行呼叫

但經常地,幾個rpc相互之間沒有依賴關係的情況,這時,我們稍加思考就會想到使用並行的方式,同時發出請求,阻塞等到所有請求返回,這樣,總體耗時就變成了Max(TimeA, TimeB, TimeC),我們可以通過常用的sync.WaitGroup輕鬆實現這事:

func Handler(ctx context.Context) {
    var a, b, c respType
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c = C(ctx)
	}()
	wg.Wait()
}

但是現實事件是不完美的,尤其是在加入了網路這一因素後,我們經常會需要處理呼叫失敗的情況,很多情況下,並行的幾個操作只要任一失敗,整個處理就算失敗了,但是由於WaitGroup要等所有呼叫都done才能返回,因此呼叫時間是由耗時最長的那個(不一定是失敗的)決定的,如果不是失敗的那個,其實就產生了資源浪費,如下圖,B最先失敗了,此時邏輯上已經可以返回,但是實際卻等到了最長的呼叫-A返回了整個函數才返回:

func Handler(ctx context.Context) {
    var a, b, c respType
    var errA, errB, errC error
   	wg := sync.WaitGroup{}
	wg.Add(3)
	go func() {
		defer wg.Done()
		a, errA = A(ctx)
	}()
	go func() {
		defer wg.Done()
		b, errB = B(ctx)
	}()
	go func() {
		defer wg.Done()
		c, errC = C(ctx)
	}()
	wg.Wait()
	if errA != nil {
	// ...
	}
	if errB != nil {
	// ...
	}
	if errC != nil {
	// ...
	}
}

基於errgroup.Group實現並行呼叫

這對於追求極致的我們來說顯然是不能接受的,我們希望達到,如果有任意一個呼叫報錯,立刻讓所有呼叫返回的效果:

好在,我們有現成的工具可以用,通過引入"golang.org/x/sync/errgroup",可以輕鬆實現上面的目的。

為了使用errgroup,先使用WithContext方法建立一個Group

wg, groupCtx := errgroup.WithContext(ctx)

返回的第一個引數是*errgroup.Group,第二個則是在子呼叫中應該使用的context。

然後,使用Go方法呼叫所有的並行方法

	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})

最後, 使用Wait方法等待並行結束,返回值是所有子呼叫中第一個非nil的error,全成功的話就是nil。

if err := wg.Wait(); err != nil {
// ...
}

因此整體,我們的程式碼差不多就長這個樣子

func handler(ctx context.Context) {
    var a, b, c respType
    wg, groupCtx := errgroup.WithContext(ctx)
	wg.Go(func() error {
		var err error
		a, err = A(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		b, err = B(groupCtx)
		return err
	})
	wg.Go(func() error {
		var err error
		c, err = C(groupCtx)
		return err
	})
	if err := wg.Wait(); err != nil {
    // ... 錯誤處理
    }
    // 全部成功
}

errgroup內部通過封裝了waitGroup和sync.Once實現了這個語法糖。

使用時特別要注意的是,errgroup的提前取消呼叫rpc是通過cancel那個返回的context(即上面的groupCtx)實現的,因此在所有子呼叫中都要實現監聽groupCtx的Done事件。而在正常的rpc框架中都已經幫我們實現了這件事,因此我們只要保證傳進去的是groupCtx即可。

總結

errgroup幫我們封裝了並行呼叫下游時快速失敗的邏輯,我們能很方便地使用它進行業務程式碼的編寫。使用的關鍵是一定要記得在子呼叫中傳遞WithContext中返回的Context。

好用的工具千千萬,讓我們一個個來掌握!


IT145.com E-mail:sddin#qq.com