首頁 > 軟體

Go並行4種方法簡明講解

2022-04-06 19:00:23

一、goroutine

1、協程(Coroutine)

Golang 在語言層面對並行程式設計進行了支援,使用了一種協程(goroutine)機制,

協程本質上是一種使用者態執行緒,不需要作業系統來進行搶佔式排程,但是又寄生於執行緒中,因此係統開銷極小,可以有效的提高執行緒的任務並行性,而避免多執行緒的缺點。但是協程需要語言上的支援,需要使用者自己實現排程器,因為在Go語言中,實現了排程器所以我們可以很方便的能過 go關鍵字來使用協程。

func main() {
	for i := 0; i <10; i++ {
		go func(i int) {
			for  {
				fmt.Printf("Hello goroutine %dn",i)
			}
		}(i)
	}
	time.Sleep(time.Millisecond)
}

最簡單的一個並行程式設計小例子,並行輸出一段話。

我們同時開了10個協程進行輸出,每次在fmt.printf時交出控制權(不一定每次都會交出控制權),回到排程器中,再由排程器分配。

2、goroutine 可能切換的點

  • I/O,Select
  • channel
  • 等待鎖
  • 函數呼叫
  • runtime.Gosched()

我們看一個小例子:

func main() {
	var a [10]int
	for i := 0; i <10; i++ {
		go func(i int) {
			for  {
				a[i]++
			}
		}(i)
	}
	time.Sleep(time.Millisecond)
	fmt.Println(a)
}

在這裡,程式碼直接鎖死,程式沒有退出,因為在執行函數中沒有協程的切換,因為 main函數也是一個協程。

如果想要程式退出,可以通過 runtime.Gosched()函數,在執行函數中新增一行。

for  {
  a[i]++
  runtime.Gosched()
}

加上這個函數之後,程式碼是可以正常執行了,但是真的是正常執行嗎?不一定,我們可以使用 -reac命令來看一下資料是否有衝突:

這說明資料還是有衝突的,陣列a中的元素一邊在做自增,一邊在輸出。解決這個問題,我們只能使用 channel 來解決。

二、Channel

Channel 中 Go語言在語言級別提供了對 goroutine 之間通訊的支援,我們可以使用 channel 在兩個或者多個goroutine之間進行資訊傳遞,能過 channel 傳遞對像的過程和呼叫函數時的引數傳遞行為一樣,可以傳遞普通引數和指標。

Channel 有兩種模式:

var ch1 = make(chan int)		// 無緩衝 channel,同步
var ch2 = make(chan int, 2)	// 有緩衝 channel, 非同步

無緩衝的方式,資料進入 channel 只要沒有被接收,就會處在阻塞狀態。

var ch1 = make(chan int)		// 無緩衝 channel,同步
ch1 <- 1
ch1 <- 2
//  error: all goroutines are asleep - deadlock!
fmt.Println(<-ch1)

如果想要執行,必須要再開一個協程不停的去請求資料:

var ch1 = make(chan int)		// 無緩衝 channel,同步
go func() {
  for  {
    n := <-ch1
    fmt.Println(n)
  }
}()
ch1 <- 1
ch1 <- 2

有緩衝的方式,只要緩衝區沒有滿就可以一直進資料,緩衝區在填滿之後沒有接收也會處理阻塞狀態。

func bufferChannel() {
	var ch2 = make(chan int,2)
	ch2<-1
	ch2<-2
	fmt.Println(ch2)
  // 不加這一行的話,是可以正常執行的
	ch2<-3			// error: all goroutines are asleep - deadlock!
}

1、chaanel 指定方向

比如我現在有一個函數建立一個 channel,並且不斷的需要消費channel中的資料:

func worker(ch chan int) {
	for {
		fmt.Printf("hello goroutine worker %dn", <-ch)
	}
}

func createWorker() chan int{
	ch := make(chan int)
	go worker(ch)
	return ch
}

func main() {
	ch := createWorker()
	ch<-1
	ch<-2
	ch<-3
	time.Sleep(time.Millisecond)
}

這個函數我是要給別人用的,但是我怎麼保證使用 createWorker 函數建立的 channel 都是往裡面傳入資料的呢?

如果外面有人消費了這個 channel 中的資料,我們怎麼限制?

這個時候,我們就可以給返回的channel 加上方向,指明這個 channel 中能往裡傳入資料,不能從中取資料:

func worker(ch <-chan int) {
	for {
		fmt.Printf("hello goroutine worker %dn", <-ch)
	}
}

func createWorker() chan<- int{
	ch := make(chan int)
	go worker(ch)
	return ch
}

我們可以在返回 channel 的地方加上方向,指明返回的函數只能是一個往裡傳入資料,不能從中取資料。

並且我們還可以給專門消費的函數加上一個方向,指明這個函數只能出不能進。

2、channel 關閉

在使用 channel 的時候,隨說我們可以等待channel中的函數使用完之後自己結束,或者等待 main 函數結束時關閉所有的 goroutine 函數,但是這樣的方式顯示不夠優雅。

當一個資料我們明確知道他的結束時候,我們可以傳送一個關閉資訊給這個 channel ,當這個 channel 接收到這個訊號之後,自己關閉。

// 方法一
func worker(ch <-chan int) {
	for {
		if c ,ok := <- ch;ok{
			fmt.Printf("hello goroutine worker %dn", c)
		}else {
			break
		}
	}
}
// 方法二
func worker(ch <-chan int) {
	for c := range ch{
		fmt.Printf("hello goroutine worker %dn", c)
	}
}

func main() {
	ch := createWorker()
	ch<-1
	ch<-2
	ch<-3
	close(ch)
	time.Sleep(time.Millisecond)
}

通過 Closeb函數,我們可以能過 channel 已經關閉,並且我們還可以通過兩種方法判斷通道內是否還有值。

三、Select

當我們在實際開發中,我們一般同時處理兩個或者多個 channel 的資料,我們想要完成一個那個 channel 先來資料,我們先來處理個那 channel 怎麼辦呢?

此時,我們就可以使用 select 排程:

func genInt() chan int {
	ch := make(chan int)
	go func() {
		i := 0
		for {
			// 隨機兩秒以內生成一次資料
			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
			ch <- i
			i++
		}
	}()
	return ch
}

func main() {
	var c1 = genInt()
	var c2 = genInt()
	for {
		select {
		case n := <-c1:
			fmt.Printf("server 1 generator %dn", n)
		case n := <- c2:
			fmt.Printf("server 2 generator %dn", n)
		}
	}
}

1、定時器

	for {
		tick := time.Tick(time.Second)
		select {
		case n := <-c1:
			fmt.Printf("server 1 generator %dn", n)
		case n := <-c2:
			fmt.Printf("server 2 generator %dn", n)
		case <-tick:
			fmt.Println("定時每秒輸出一次!")
		}
	}

2、超時

	for {
		tick := time.Tick(time.Second)
		select {
		case n := <-c1:
			fmt.Printf("server 1 generator %dn", n)
		case n := <-c2:
			fmt.Printf("server 2 generator %dn", n)
		case <-tick:
			fmt.Println("定時每秒輸出一次!")
		case <-time.After(1300 * time.Millisecond): // 如果 1.3秒內沒有資料進來,那麼就輸出超時
			fmt.Println("timeout")
		}
	}

四、傳統的並行控制

1、sync.Mutex

type atomicInt struct {
	value int
	lock sync.Mutex
}

func (a *atomicInt) increment() {
	a.lock.Lock()
	defer a.lock.Unlock()		// 使用 defer 解鎖,以防忘記
	a.value++
}

func main() {
	var a atomicInt
	a.increment()
	go func() {
		a.increment()
	}()
	time.Sleep(time.Millisecond)
	fmt.Println(a.value)
}

2、sync.WaitGroup

type waitGrouInt struct {
	value int
	wg sync.WaitGroup
}

func (w *waitGrouInt) addInt() {
	w.wg.Add(1)
	w.value++
}

func main() {
	var w waitGrouInt
	for i := 0; i < 10; i++ {
		w.addInt()
		w.wg.Done()
	}
	w.wg.Wait()
	fmt.Println(w.value)
}

 更多關於Go並行簡明講解請檢視下面的相關連結


IT145.com E-mail:sddin#qq.com