首頁 > 軟體

Go中groutine通訊與context控制範例詳解

2022-02-11 19:01:18

需求背景:

專案中需要定期執行任務A來做一些輔助的工作,A的執行需要在超時時間內完成,如果本次執行超時了,那就不對本次的執行結果進行處理(即放棄這次執行)。同時A又依賴B,C兩個子任務的執行結果。B, C之間相互獨立,可以並行的執行。但無論B,C哪一個執行失敗或超時都會導致本次任務執行失敗。

Groutine的並行控制:

go中對於groutine的並行控制有三種解決方案:

  • 通過channel控制。

    父groutine中宣告無buffer的chan切片,向要開啟的子groutine中傳入切片中的一個chan

    子groutine執行完成後向這個chan中寫入資料(可以是和父groutine通訊的也可以不是)

    父groutine遍歷所有chan並執行 <-chan 操作, 利用無buffer的channel只有讀寫同時準備好才能執行的特性進行控

  • WaitGroup控制。

    通過sync.Waitgroup, 每開啟一個子groutine就執行 wg.Add(1), 子groutine內部執行wg.Done(), 父groutine通過wg.Wait()等待所有子協程

  • Context控制。

    waitGroup和Context應該是Go中較為常用的兩種並行控制。相較而言,context對於派生groutine有更強大的控制力,可以控制多級樹狀分佈的groutine。

    當然waitGroup的子groutine也可以再開啟新的waitGroup並且等待多個孫groutine, 但是不如context的控制更加方便.

Context:

context包提供了四個方法建立不同型別的context

  • WitchCancel()
  • WithDeadline()
  • WithTimeout()
  • WithValue()

WithValue()主要用於通過context傳遞一些上下文訊息,不在本次討論中。WithTimeout和WithDeadLine幾乎是一致的。但無論哪種,控制groutine都需要使用ctx.Done()方法. Done() 方法返回一個 "唯讀"的chan <-chan struct{}, 需要編寫程式碼監聽這個chan,一旦收到它的訊息就說明這個context應當結束了,無論是到達了超時時間還是在某個地方主動cancel()了方法。

看看程式碼:

var ch1 chan int
var ch2 chan int
<br>// 任務A, 通過最外層的for來控制定期執行
func TestMe(t *testing.T) {
    ch1 = make(chan int, 0)
    ch2 = make(chan int, 0)
    count := 0
    for {
        count ++
        ctx, cancel := context.WithTimeout(context.Background(), time.Second * 2)<br>                // 任務A的邏輯部分,開啟子任務B, C。<br>                // B,C通過ch1,ch2和A通訊。<br>                // 同時監聽ctx.Done,如果超時了立即結束本次任務不繼續執行
        go func(ctx context.Context) {
            go g1(ctx, count)
            go g2(ctx, count)
            v1, v2 := -1, -1
            for v1 == -1 || v2 == -1 {
                select {
                case <- ctx.Done():
                    cancel()
                    fmt.Println("父級2超時退出,當前count值為", count, "當前時間:", time.Now())
                    return
                case v1 = <- ch1:
                case v2 = <- ch2:
                }
            }
            fmt.Println("正常執行完成退出, 開啟下次迴圈,當前count值為:", count, "當前 v1: ", v1, "當前 v2: ", v2)
        }(ctx)<br>                // 任務A監控ctx是否到達timeOUT,timeout就終止本次執行
        select {
        case <- ctx.Done():
            fmt.Println("父級1超時退出,當前count值為", count, "當前時間:", time.Now())
        }
        time.Sleep(time.Second * 3)
    }
}
<br>// 改進後的任務B,即使計算出了結果,也不會再向ch1寫資料了,不會造成髒資料
func g1 (ctx context.Context, num int) {
    fmt.Println("g1 num", num, "time", time.Now())
    select {
    case <-ctx.Done():
        fmt.Println("子級 g1關閉, 不向channel中寫資料")
        return
    default:
        ch1 <- num
    }
}
<br>// 改進前的任務C
func g2 (ctx context.Context, num int) {
    fmt.Println("g2 num", num, "time", time.Now())
    ch2 <- num

基於上述程式碼,子任務B, C的處理其實有一次較大的變動。一開始B,C都是類似於子任務C,即g2的這種寫法。

這種寫法在執行完成後就把自身的結果交給channel, 父groutine通過channel來讀取資料,正常情況下也能工作。但異常情況下,如子任務B執行完成,子任務C(即g2)因為網路通訊等原因執行了5s(超過context的最大時長), 就會出現比較嚴重的問題。到達超時時間後,A檢測到了超時就自動結束了本次任務,但g2還在執行過程中。g2執行完成後向ch2寫資料阻塞了(因為A已關閉,沒有讀取ch2的groutine)。下一個迴圈中A再次開啟讀取ch1與ch2, 實際上讀取ch1是當次的結果,ch2是上次任務中g2返回的結果,導致兩處依賴的資料來源不一致。

模擬上述情況,將g2做了一些改動如下:

// 在第3次任務重等待3s, 使得它超時<br>func g2 (ctx context.Context, num int) {
    if num == 3 {
        time.Sleep(time.Second * 3)
    }
    fmt.Println("g2 num", num, "time", time.Now())
    ch2 <- num
}

實際上,如果想要通過context控制groutine, 一定要監控Done()方法。如g1所示。相同情況下A超時退出,C仍在執行。C執行完成後先檢測Context是否已退出,如果已退出就不再向ch2中寫入本次的資料了。(拋磚引玉了,也可能有更好的寫法,希望大佬不吝賜教)

將g2改成和g1類似的寫法後測試結果如下:

func g2 (ctx context.Context, num int) {
    if num == 3 {
        time.Sleep(time.Second * 10)
        fmt.Println("這次g2 超時,應當g1, g2都不返回")
    }
    fmt.Println("g2 num", num, "time", time.Now())
    select {
    case <-ctx.Done():
        fmt.Println("子級 g2關閉, 不向channel中寫資料")
        return
    default:
        ch2 <- num
    }
}

總結

到此這篇關於Go中groutine通訊與context控制的文章就介紹到這了,更多相關Go groutine通訊與context控制內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com