首頁 > 軟體

go並行程式設計sync.Cond使用場景及實現原理

2022-08-31 14:02:42

使用場景

sync.Cond是go標準庫提供的一個條件變數,用於控制一組goroutine在滿足特定條件下被喚醒。

sync.Cond常用於一組goroutine等待,一個goroutine通知(事件發生)的場景。如果只有一個goroutine等待,一個goroutine通知(事件發生),使用Mutex或者Channel就可以實現。

可以用一個全域性變數標誌特定條件condition,每個sync.Cond都必須要關聯一個互斥鎖(Mutex或者RWMutex),當condition發生變更或者呼叫Wait時,都必須加鎖,保證多個goroutine安全地存取condition。

下面是go標準庫http中關於pipe的部分實現,我們可以看到,pipe使用sync.Cond來控制管道中位元組流的寫入和讀取,在pipe中資料可用並且位元組流複製到pipe的緩衝區之前,所有的需要讀取該管道資料的goroutine都必須等待,直到資料準備完成。

type pipe struct {
   mu       sync.Mutex
   c        sync.Cond     // c.L lazily initialized to &p.mu
   b        pipeBuffer    // nil when done reading
   ...
}
// Read waits until data is available and copies bytes
// from the buffer into p.
func (p *pipe) Read(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   for {
      ...
      if p.b != nil && p.b.Len() > 0 {
         return p.b.Read(d)
      }
      ...
      p.c.Wait() // write未完成前呼叫Wait進入等待
   }
}
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
func (p *pipe) Write(d []byte) (n int, err error) {
   p.mu.Lock()
   defer p.mu.Unlock()
   if p.c.L == nil {
      p.c.L = &p.mu
   }
   defer p.c.Signal() // 喚醒所有等待的goroutine
   if p.err != nil {
      return 0, errClosedPipeWrite
   }
   if p.breakErr != nil {
      p.unread += len(d)
      return len(d), nil // discard when there is no reader
   }
   return p.b.Write(d)
}

實現原理

type Cond struct {
   noCopy noCopy       // 用來保證結構體無法在編譯期間拷貝
   // L is held while observing or changing the condition
   L Locker             // 用來保證condition變更安全
   notify  notifyList   // 待通知的goutine列表
   checker copyChecker  // 用於禁止執行期間發生的拷貝
}
type notifyList struct {
   wait   uint32      // 正在等待的goroutine的ticket
   notify uint32      // 已經通知到的goroutine的ticket
   lock   uintptr // key field of the mutex
   head   unsafe.Pointer     // 連結串列頭部
   tail   unsafe.Pointer     // 連結串列尾部
}

copyChecker

copyChecker是一個指標型別,在建立時,它的值指向自身地址,用於檢測該物件是否發生了拷貝。如果發生了拷貝,則直接panic。

// copyChecker holds back pointer to itself to detect object copying.
type copyChecker uintptr
func (c *copyChecker) check() {
   if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
      !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
      uintptr(*c) != uintptr(unsafe.Pointer(c)) {
      panic("sync.Cond is copied")
   }
}

Wait

呼叫 Wait 會自動釋放鎖 c.L,並掛起呼叫者所在的 goroutine,因此當前協程會阻塞在 Wait 方法呼叫的地方。如果其他協程呼叫了 Signal 或 Broadcast 喚醒了該協程,那麼 Wait 方法在結束阻塞時,會重新給 c.L 加鎖,並且繼續執行 Wait 後面的程式碼。

對條件的檢查,使用了 for !condition() 而非 if,是因為當前協程被喚醒時,條件不一定符合要求,需要再次 Wait 等待下次被喚醒。為了保險起見,使用 for 能夠確保條件符合要求後,再執行後續的程式碼。

func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}
  • 檢查Cond是否被複制,如果被複制,直接panic;
  • 呼叫runtime_notifyListAdd呼叫者新增到通知列表並解鎖,以便可以接收到通知,然後將返回的ticket傳入到runtime_notifyListWait來等待通知。
  • 當前goroutine會阻塞在wait呼叫的地方,直到其他goroutine呼叫Signal或Broadcast喚醒該協程。
func notifyListAdd(l *notifyList) uint32 {
    return atomic.Xadd(&l.wait, 1) - 1
}

notifyListWait會將當前goroutine追加到連結串列的尾端,同時呼叫goparkunlock讓當前goroutine陷入休眠,該方法會直接讓出當前處理器的使用權並等待排程器的喚醒。

func notifyListWait(l *notifyList, t uint32) {
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    if l.tail == nil {
       l.head = s
    } else {
       l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    releaseSudog(s)
}

Signal

Signal會喚醒佇列最前面的Goroutine。

func (c *Cond) Signal() {
   c.checker.check()
   runtime_notifyListNotifyOne(&c.notify)
}
func notifyListNotifyOne(l *notifyList) {
   t := l.notify
   atomic.Store(&l.notify, t+1)
   for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
      if s.ticket == t {
         n := s.next
         if p != nil {
            p.next = n
         } else {
            l.head = n
         }
         if n == nil {
            l.tail = p
         }
         s.next = nil
         readyWithTime(s, 4)
         return
      }
   }
}

Broadcast

Broadcast會喚醒佇列中全部的goroutine。

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
func notifyListNotifyAll(l *notifyList) {
   s := l.head
   l.head = nil
   l.tail = nil
   atomic.Store(&l.notify, atomic.Load(&l.wait))
   for s != nil {
      next := s.next
      s.next = nil
      readyWithTime(s, 4)
      s = next
   }
}

以上就是go並行程式設計sync.Cond使用場景及實現原理的詳細內容,更多關於go並行程式設計sync.Cond的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com