<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
channel是golang中標誌性的概念之一,很好很強大!
channel(通道),顧名思義,是一種通道,一種用於並行環境中資料傳遞的通道。通常結合golang中另一重要概念goroutine(go協程)使用,使得在golang中的並行程式設計變得清晰簡潔同時又高效強大。
今天嘗試著讀讀golang對channel的實現原始碼,本文主要是自己個人對於Channel原始碼的學習筆記,需要的朋友可以參考以下內容,希望對大家有幫助。
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx: uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
hchan結構就是channel的底層資料結構,看原始碼定義,可以說是非常清晰了。
其中關於recvq和sendq的兩個列表所用的結構waitq簡單看下。
type waitq struct { first *sudog last *sudog } type sudog struct { g *g selectdone *uint32 // CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) ... c *hchan // channel }
可以看出waiq是一個雙向連結串列結構,鏈上的節點是sudog。從sudog的結構定義可以粗略看出,sudog是對g(即協程)的一個封裝。用於記錄一個等待在某個channel上的協程g、等待的元素elem等資訊。
func makechan(t *chantype, size int64) *hchan { elem := t.elem // compiler checks this but be safe. if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size) { panic(plainError("makechan: size out of range")) } var c *hchan if elem.kind&kindNoPointers != 0 || size == 0 { // Allocate memory in one call. // Hchan does not contain pointers interesting for GC in this case: // buf points into the same allocation, elemtype is persistent. // SudoG's are referenced from their owning thread so they can't be collected. // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) if size > 0 && elem.size != 0 { c.buf = add(unsafe.Pointer(c), hchanSize) } else { // race detector uses this location for synchronization // Also prevents us from pointing beyond the allocation (see issue 9401). c.buf = unsafe.Pointer(c) } } else { c = new(hchan) c.buf = newarray(elem, int(size)) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "n") } return c }
第一部分的3個if是對初始化引數的合法性檢查。
if elem.size >= 1<<16:
檢查channel元素大小,小於2位元組
if hchanSize%maxAlign != 0 || elem.align > maxAlign
沒看懂(對齊?)
if size < 0 || int64(uintptr(size)) != size || (elem.size > 0 && uintptr(size) > (_MaxMem-hchanSize)/elem.size)
第一個判斷快取大小需要大於等於0
int64(uintptr(size)) != size這一句實際是用於判斷size是否為負數。由於uintptr實際是一個無符號整形,負數經過轉換後會變成一個與原數完全不同的很大的正整數,而正數經過轉換後並沒有變化。
最後一句判斷channel的快取大小要小於heap中能分配的大小。_MaxMem是可分配的堆大小。
第二部分是具體的記憶體分配。
元素型別為kindNoPointers的時候,既非指標型別,則直接分配(hchanSize+uintptr(size)*elem.size)大小的連續空間。c.buf指向hchan後面的elem佇列首地址。
如果channel快取大小為0,則c.buf實際上是沒有給他分配空間的
如果型別為非kindNoPointers,則channel的空間和buf的空間是分別分配的。
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc(unsafe.Pointer(&c))) }
channel傳送,即協程向channel中傳送資料,與此操作對應的go程式碼如c <- x。
channel傳送的實現原始碼中,通過chansend1(),呼叫chansend(),其中block引數為true。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { if !block { return false } gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2) throw("unreachable") } ... }
chansend()首先對c進行判斷, if c == nil:即channel沒有被初始化,這個時候會直接呼叫gopark使得當前協程進入等待狀態。而且用於喚醒的引數unlockf傳的nil,即沒有人來喚醒它,這樣系統進入死鎖。所以channel必須被初始化之後才能使用,否則死鎖。
接下來是正式的傳送處理,且後續操作會加鎖。
lock(&c.lock)
close判斷
if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
如果channel已經是closed狀態,解鎖然後直接panic。也就是說我們不可以向已經關閉的通道內在傳送資料。
將資料發給接收協程
if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
嘗試從接收等待協程佇列中取出一個協程,如果有則直接資料發給它。也就是說傳送到channel的資料會優先檢查接收等待佇列,如果有協程等待取數,就直接給它。發完解鎖,操作完成。
這裡send()方法會將資料寫到從佇列裡取出來的sg中,通過goready()喚醒sg.g(即等待的協程),進行後續處理。
資料放到快取
if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
如果沒有接收協程在等待,則去檢查channel的快取佇列是否還有空位。如果有空位,則將資料放到快取佇列中。
通過c.sendx遊標找到佇列中的空餘位置,然後將資料存進去。移動遊標,更新資料,然後解鎖,操作完成。
if c.sendx == c.dataqsiz { c.sendx = 0 }
通過這一段遊標的處理可以看出,快取佇列是一個環形。
阻塞傳送協程
gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.selectdone = nil mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
如果快取也慢了,這時候就只能阻塞住傳送協程了, 等有合適的機會了,再將資料傳送出去。
getg()獲取當前協程物件g的指標,acquireSudog()生成一個sudog,然後將當前協程及相關資料封裝好連結到sendq列表中。然年通過goparkunlock()將其轉為等待狀態,並解鎖。操作完成。
// entry points for <- c from compiled code //go:nosplit func chanrecv1(c *hchan, elem unsafe.Pointer) { chanrecv(c, elem, true) }
channel接收,即協程從channel中接收資料,與此操作對應的go程式碼如<- c。
channel接收的實現原始碼中,通過chanrecv1(),呼叫chanrecv(),其中block引數為true。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { if !block { return } gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2) throw("unreachable") } ... }
同傳送一樣,接收也會首先檢查c是否為nil,如果為nil,會呼叫gopark()休眠當前協程,從而最終造成死鎖。
接收操作同樣先進行加鎖,然後開始正式操作。
close處理
if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(unsafe.Pointer(c)) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
接收和傳送略有不同,當channel關閉並且channel的快取佇列裡沒有資料了,那麼接收動作會直接結束,但不會報錯。
也就是說,允許從已關閉的channel中接收資料。
從傳送等待協程中接收
if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
嘗試從傳送等待協程列表中取出一個等待協程,如果存在,則呼叫recv()方法接收資料。
這裡的recv()方法比send()方法稍微複雜一點,我們簡單分析下。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if c.dataqsiz == 0 { ... if ep != nil { // copy data from sender recvDirect(c.elemtype, sg, ep) } } else { qp := chanbuf(c, c.recvx) ... // copy data from queue to receiver if ep != nil { typedmemmove(c.elemtype, ep, qp) } // copy data from sender to queue typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
recv()的接收動作分為兩種情況:
channel必須初始化後才能使用;
channel關閉後,不允許在傳送資料,但是還可以繼續從中接收未處理完的資料。所以儘量從傳送端關閉channel;
無快取的channel需要注意在一個協程中的操作不會造成死鎖;
到此這篇關於Golang通道channel的原始碼分析的文章就介紹到這了,更多相關Golang通道channel內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45