<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
gRPC
四種基本使用
常見的gRPC
呼叫寫法
func main(){ //... some code // 連結grpc服務 conn , err := grpc.Dial(":8000",grpc.WithInsecure) if err != nil { //...log } defer conn.Close() //...some code
存在的問題:面臨高並行的情況,效能問題很容易就會出現,例如我們在做效能測試的時候,就會發現,打一會效能測試,使用者端請求伺服器端的時候就會報錯:
rpc error: code = Unavailable desc = all SubConns are in TransientFailure, latest connection error: connection error: desc = "transport: Error while dialing dial tcp xxx:xxx: connect: connection refused
實際去檢視問題的時候,很明顯,這是 gRPC 的連線數被打滿了,很多連線都還未完全釋放。[#本文來源:janrs.com#]
gRPC
的通訊本質上也是 TCP
的連線,那麼一次連線就需要三次握手,和四次揮手,每一次建立連線和釋放連線的時候,都需要走這麼一個過程,如果我們頻繁的建立和釋放連線,這對於資源和效能其實都是一個大大的浪費。
在伺服器端,gRPC
伺服器端的連結管理不用我們操心,但是 gRPC
使用者端的連結管理非常有必要關心,要實現複用使用者端的連線。
建立連結池需要考慮的問題:
type Pool interface { // 獲取一個新的連線 , 當關閉連線的時候,會將該連線放入到池子中 Get() (Conn, error) // 關閉連線池,自然連線池子中的連線也不再可用 Close() error //[#本文來源:janrs.com#] Status() string }
建立連結池程式碼
func New(address string, option Options) (Pool, error) { if address == "" { return nil, errors.New("invalid address settings") } if option.Dial == nil { return nil, errors.New("invalid dial settings") } if option.MaxIdle <= 0 || option.MaxActive <= 0 || option.MaxIdle > option.MaxActive { return nil, errors.New("invalid maximum settings") } if option.MaxConcurrentStreams <= 0 { return nil, errors.New("invalid maximun settings") } p := &pool{ index: 0, current: int32(option.MaxIdle), ref: 0, opt: option, conns: make([]*conn, option.MaxActive), address: address, closed: 0, } for i := 0; i < p.opt.MaxIdle; i++ { c, err := p.opt.Dial(address) if err != nil { p.Close() return nil, fmt.Errorf("dial is not able to fill the pool: %s", err) } p.conns[i] = p.wrapConn(c, false) } log.Printf("new pool success: %vn", p.Status()) return p, nil }
關於以上的程式碼,需要特別注意每一個連線的建立也是在 New
裡面完成的,[#本文來源:janrs.com#]只要有 1
個連線未建立成功,那麼咱們的連線池就算是建立失敗,咱們會呼叫 p.Close()
將之前建立好的連線全部釋放掉。
關閉連結池程式碼
// 關閉連線池 func (p *pool) Close() error { atomic.StoreInt32(&p.closed, 1) atomic.StoreUint32(&p.index, 0) atomic.StoreInt32(&p.current, 0) atomic.StoreInt32(&p.ref, 0) p.deleteFrom(0) log.Printf("[janrs.com]close pool success: %vn", p.Status()) return nil }
從具體位置刪除連結池程式碼
// 清除從 指定位置開始到 MaxActive 之間的連線 func (p *pool) deleteFrom(begin int) { for i := begin; i < p.opt.MaxActive; i++ { p.reset(i) } }
銷燬具體的連結程式碼
// 清除具體的連線 func (p *pool) reset(index int) { conn := p.conns[index] if conn == nil { return } conn.reset() p.conns[index] = nil }
程式碼
func (c *conn) reset() error { cc := c.cc c.cc = nil c.once = false // 本文部落格來源:janrs.com if cc != nil { return cc.Close() } return nil } func (c *conn) Close() error { c.pool.decrRef() if c.once { return c.reset() } return nil }
在使用連線池通過 pool.Get()
拿到具體的連線控制程式碼 conn
之後,會使用 conn.Close()
關閉連線,實際上也是會走到上述的 Close()
實現的位置,但是並未指定當然也沒有許可權顯示的指定將 once
置位為 false
,也就是對於呼叫者來說,是關閉了連線,對於連線池來說,實際上是將連線歸還到連線池中。
關鍵程式碼
func (p *pool) Get() (Conn, error) { // the first selected from the created connections nextRef := p.incrRef() p.RLock() current := atomic.LoadInt32(&p.current) p.RUnlock() if current == 0 { return nil, ErrClosed } if nextRef <= current*int32(p.opt.MaxConcurrentStreams) { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // 本文部落格來源:janrs.com // the number connection of pool is reach to max active if current == int32(p.opt.MaxActive) { // the second if reuse is true, select from pool's connections if p.opt.Reuse { next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil } // the third create one-time connection c, err := p.opt.Dial(p.address) return p.wrapConn(c, true), err } // the fourth create new connections given back to pool p.Lock() current = atomic.LoadInt32(&p.current) if current < int32(p.opt.MaxActive) && nextRef > current*int32(p.opt.MaxConcurrentStreams) { // 2 times the incremental or the remain incremental ##janrs.com increment := current if current+increment > int32(p.opt.MaxActive) { increment = int32(p.opt.MaxActive) - current } var i int32 var err error for i = 0; i < increment; i++ { c, er := p.opt.Dial(p.address) if er != nil { err = er break } p.reset(int(current + i)) p.conns[current+i] = p.wrapConn(c, false) } // 本文部落格來源:janrs.com current += i log.Printf("#janrs.com#grow pool: %d ---> %d, increment: %d, maxActive: %dn", p.current, current, increment, p.opt.MaxActive) atomic.StoreInt32(&p.current, current) if err != nil { p.Unlock() return nil, err } } p.Unlock() next := atomic.AddUint32(&p.index, 1) % uint32(current) return p.conns[next], nil }
Get
程式碼邏輯
current*int32(p.opt.MaxConcurrentStreams)
範圍內,那麼直接取連線進行使用即可。option
中的 reuse
引數是否是 true
,若是複用,則隨機取出連線池中的任意連線提供使用,如果不復用,則新建一個連線。2
倍或者 1
倍的數量對連線池進行擴容了。也可以在 Get
的實現上進行縮容,具體的縮容策略可以根據實際情況來定,例如當參照計數 nextRef
只有當前活躍連線數的 10%
的時候(這只是一個例子),就可以考慮縮容了。
有關連結池的建立以及效能測試
以上就是Go建立Grpc連結池實現過程詳解的詳細內容,更多關於Go建立Grpc連結池的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45