<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本篇文章來分析一下 Go 語言 HTTP 標準庫是如何實現的。
本文使用的go的原始碼1.15.7
基於HTTP構建的服務標準模型包括兩個端,使用者端(Client
)和伺服器端(Server
)。HTTP 請求從使用者端發出,伺服器端接受到請求後進行處理然後將響應返回給使用者端。所以http伺服器的工作就在於如何接受來自使用者端的請求,並向用戶端返回響應。
一個典型的 HTTP 服務應該如圖所示:
在 Go 中可以直接通過 HTTP 包的 Get 方法來發起相關請求資料,一個簡單例子:
func main() { resp, err := http.Get("http://httpbin.org/get?name=luozhiyun&age=27") if err != nil { fmt.Println(err) return } defer resp.Body.Close() body, _ := ioutil.ReadAll(resp.Body) fmt.Println(string(body)) }
我們下面通過這個例子來進行分析。
HTTP 的 Get 方法會呼叫到 DefaultClient 的 Get 方法,DefaultClient 是 Client 的一個空範例,所以最後會呼叫到 Client 的 Get 方法:
type Client struct { Transport RoundTripper CheckRedirect func(req *Request, via []*Request) error Jar CookieJar Timeout time.Duration }
Client 結構體總共由四個欄位組成:
Transport:表示 HTTP 事務,用於處理使用者端的請求連線並等待伺服器端的響應;
CheckRedirect:用於指定處理重定向的策略;
Jar:用於管理和儲存請求中的 cookie;
Timeout:指定使用者端請求的最大超時時間,該超時時間包括連線、任何的重定向以及讀取相應的時間;
func (c *Client) Get(url string) (resp *Response, err error) { // 根據方法名、URL 和請求體構建請求 req, err := NewRequest("GET", url, nil) if err != nil { return nil, err } // 執行請求 return c.Do(req) }
我們要發起一個請求首先需要根據請求型別構建一個完整的請求頭、請求體、請求引數。然後才是根據請求的完整結構來執行請求。
NewRequest 會呼叫到 NewRequestWithContext 函數上。這個函數會根據請求返回一個 Request 結構體,它裡面包含了一個 HTTP 請求所有資訊。
Request 結構體有很多欄位,我這裡列舉幾個大家比較熟悉的欄位:
NewRequestWithContext
func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) { ... // parse url u, err := urlpkg.Parse(url) if err != nil { return nil, err } rc, ok := body.(io.ReadCloser) if !ok && body != nil { rc = ioutil.NopCloser(body) } u.Host = removeEmptyPort(u.Host) req := &Request{ ctx: ctx, Method: method, URL: u, Proto: "HTTP/1.1", ProtoMajor: 1, ProtoMinor: 1, Header: make(Header), Body: rc, Host: u.Host, } ... return req, nil }
NewRequestWithContext 函數會將請求封裝成一個 Request 結構體並返回。
如上圖所示,Client 呼叫 Do 方法處理傳送請求最後會呼叫到 send 函數中。
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) { resp, didTimeout, err = send(req, c.transport(), deadline) if err != nil { return nil, didTimeout, err } ... return resp, nil, nil }
Client 的 send 方法在呼叫 send 函數進行下一步的處理前會先呼叫 transport 方法獲取 DefaultTransport 範例,該範例如下:
var DefaultTransport RoundTripper = &Transport{ // 定義 HTTP 代理策略 Proxy: ProxyFromEnvironment, DialContext: (&net.Dialer{ Timeout: 30 * time.Second, KeepAlive: 30 * time.Second, DualStack: true, }).DialContext, ForceAttemptHTTP2: true, // 最大空閒連線數 MaxIdleConns: 100, // 空閒連線超時時間 IdleConnTimeout: 90 * time.Second, // TLS 握手超時時間 TLSHandshakeTimeout: 10 * time.Second, ExpectContinueTimeout: 1 * time.Second, }
Transport 實現 RoundTripper 介面,該結構體會傳送 http 請求並等待響應。
type RoundTripper interface { RoundTrip(*Request) (*Response, error) }
從 RoundTripper 介面我們也可以看出,該介面定義的 RoundTrip 方法會具體的處理請求,處理完畢之後會響應 Response。
回到我們上面的 Client 的 send 方法中,它會呼叫 send 函數,這個函數主要邏輯都交給 Transport 的 RoundTrip 方法來執行。
RoundTrip 會呼叫到 roundTrip 方法中:
func (t *Transport) roundTrip(req *Request) (*Response, error) { t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) ctx := req.Context() trace := httptrace.ContextClientTrace(ctx) ... for { select { case <-ctx.Done(): req.closeBody() return nil, ctx.Err() default: } // 封裝請求 treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey} cm, err := t.connectMethodForRequest(treq) if err != nil { req.closeBody() return nil, err } // 獲取連線 pconn, err := t.getConn(treq, cm) if err != nil { t.setReqCanceler(cancelKey, nil) req.closeBody() return nil, err } // 等待響應結果 var resp *Response if pconn.alt != nil { // HTTP/2 path. t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest resp, err = pconn.alt.RoundTrip(req) } else { resp, err = pconn.roundTrip(treq) } if err == nil { resp.Request = origReq return resp, nil } ... } }
roundTrip 方法會做兩件事情:
getConn 有兩個階段:
呼叫 queueForIdleConn 獲取空閒 connection;呼叫 queueForDial 等待建立新的 connection;
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) { req := treq.Request trace := treq.trace ctx := req.Context() if trace != nil && trace.GetConn != nil { trace.GetConn(cm.addr()) } // 將請求封裝成 wantConn 結構體 w := &wantConn{ cm: cm, key: cm.key(), ctx: ctx, ready: make(chan struct{}, 1), beforeDial: testHookPrePendingDial, afterDial: testHookPostPendingDial, } defer func() { if err != nil { w.cancel(t, err) } }() // 獲取空閒連線 if delivered := t.queueForIdleConn(w); delivered { pc := w.pc ... t.setReqCanceler(treq.cancelKey, func(error) {}) return pc, nil } // 建立連線 t.queueForDial(w) select { // 獲取到連線後進入該分支 case <-w.ready: ... return w.pc, w.err ... }
成功獲取到空閒 connection:
成功獲取 connection 分為如下幾步:
獲取不到空閒 connection:
當獲取不到空閒 connection 時:
從上面的圖解應該就很能看出這一步會怎麼操作了,這裡簡要的分析一下程式碼,讓大家更清楚裡面的邏輯:
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) { if t.DisableKeepAlives { return false } t.idleMu.Lock() defer t.idleMu.Unlock() t.closeIdle = false if w == nil { return false } // 計算空閒連線超時時間 var oldTime time.Time if t.IdleConnTimeout > 0 { oldTime = time.Now().Add(-t.IdleConnTimeout) } // Look for most recently-used idle connection. // 找到key相同的 connection 列表 if list, ok := t.idleConn[w.key]; ok { stop := false delivered := false for len(list) > 0 && !stop { // 找到connection列表最後一個 pconn := list[len(list)-1] // 檢查這個 connection 是不是等待太久了 tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime) if tooOld { go pconn.closeConnIfStillIdle() } // 該 connection 被標記為 broken 或 閒置太久 continue if pconn.isBroken() || tooOld { list = list[:len(list)-1] continue } // 嘗試將該 connection 寫入到 w 中 delivered = w.tryDeliver(pconn, nil) if delivered { // 操作成功,需要將 connection 從空閒列表中移除 if pconn.alt != nil { } else { t.idleLRU.remove(pconn) list = list[:len(list)-1] } } stop = true } if len(list) > 0 { t.idleConn[w.key] = list } else { // 如果該 key 對應的空閒列表不存在,那麼將該key從字典中移除 delete(t.idleConn, w.key) } if stop { return delivered } } // 如果找不到空閒的 connection if t.idleConnWait == nil { t.idleConnWait = make(map[connectMethodKey]wantConnQueue) } // 將該 wantConn 加入到 等待獲取空閒 connection 字典中 q := t.idleConnWait[w.key] q.cleanFront() q.pushBack(w) t.idleConnWait[w.key] = q return false }
上面的註釋已經很清楚了,我這裡就不再解釋了。
在獲取不到空閒連線之後,會嘗試去建立連線,從上面的圖大致可以看到,總共分為以下幾個步驟:
dialConnFor 方法首先會呼叫 dialConn 方法建立 TCP 連線,然後啟動兩個非同步執行緒來處理讀寫資料,然後呼叫 tryDeliver 將連線繫結到 wantConn 上面。
下面進行程式碼分析:
func (t *Transport) queueForDial(w *wantConn) { w.beforeDial() // 小於零說明無限制,非同步建立連線 if t.MaxConnsPerHost <= 0 { go t.dialConnFor(w) return } t.connsPerHostMu.Lock() defer t.connsPerHostMu.Unlock() // 每個 host 建立的連線數沒達到上限,非同步建立連線 if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { if t.connsPerHost == nil { t.connsPerHost = make(map[connectMethodKey]int) } t.connsPerHost[w.key] = n + 1 go t.dialConnFor(w) return } //每個 host 建立的連線數已達到上限,需要進入等待佇列 if t.connsPerHostWait == nil { t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) } q := t.connsPerHostWait[w.key] q.cleanFront() q.pushBack(w) t.connsPerHostWait[w.key] = q }
這裡主要進行引數校驗,如果最大連線數限制為零,亦或是每個 host 建立的連線數沒達到上限,那麼直接非同步建立連線。
dialConnFor
func (t *Transport) dialConnFor(w *wantConn) { defer w.afterDial() // 建立連線 pc, err := t.dialConn(w.ctx, w.cm) // 連線繫結 wantConn delivered := w.tryDeliver(pc, err) // 建立連線成功,但是繫結 wantConn 失敗 // 那麼將該連線放置到空閒連線字典或呼叫 等待獲取空閒 connection 字典 中的元素執行 if err == nil && (!delivered || pc.alt != nil) { t.putOrCloseIdleConn(pc) } if err != nil { t.decConnsPerHost(w.key) } }
dialConnFor 會呼叫 dialConn 進行 TCP 連線建立,建立完畢之後呼叫 tryDeliver 方法和 wantConn 進行繫結。
dialConn
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) { // 建立連線結構體 pconn = &persistConn{ t: t, cacheKey: cm.key(), reqch: make(chan requestAndChan, 1), writech: make(chan writeRequest, 1), closech: make(chan struct{}), writeErrCh: make(chan error, 1), writeLoopDone: make(chan struct{}), } ... if cm.scheme() == "https" && t.hasCustomTLSDialer() { ... } else { // 建立 tcp 連線 conn, err := t.dial(ctx, "tcp", cm.addr()) if err != nil { return nil, wrapErr(err) } pconn.conn = conn } ... if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" { if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok { alt := next(cm.targetAddr, pconn.conn.(*tls.Conn)) if e, ok := alt.(http2erringRoundTripper); ok { // pconn.conn was closed by next (http2configureTransport.upgradeFn). return nil, e.err } return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil } } pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize()) pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize()) //為每個連線非同步處理讀寫資料 go pconn.readLoop() go pconn.writeLoop() return pconn, nil }
這裡會根據 schema 的不同設定不同的連線設定,我上面顯示的是我們常用的 HTTP 連線的建立過程。對於 HTTP 來說會建立 tcp 連線,然後為連線非同步處理讀寫資料,最後將建立好的連線返回。
這一部分的內容會稍微複雜一些,但確實非常的有趣。
在建立連線的時候會初始化兩個 channel :writech 負責寫入請求資料,reqch負責讀取響應資料。我們在上面建立連線的時候,也提到了會為連線建立兩個非同步迴圈 readLoop 和 writeLoop 來負責處理讀寫資料。
在獲取到連線之後,會呼叫連線的 roundTrip 方法,它首先會將請求資料寫入到 writech 管道中,writeLoop 接收到資料之後就會處理請求。
然後 roundTrip 會將 requestAndChan 結構體寫入到 reqch 管道中,然後 roundTrip 會迴圈等待。readLoop 讀取到響應資料之後就會通過 requestAndChan 結構體中儲存的管道將資料封裝成 responseAndError 結構體回寫,這樣 roundTrip 就可以接受到響應資料結束迴圈等待並返回。
roundTrip
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) { ... writeErrCh := make(chan error, 1) // 將請求資料寫入到 writech 管道中 pc.writech <- writeRequest{req, writeErrCh, continueCh} // 用於接收響應的管道 resc := make(chan responseAndError) // 將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中 pc.reqch <- requestAndChan{ req: req.Request, cancelKey: req.cancelKey, ch: resc, ... } ... for { testHookWaitResLoop() select { // 接收到響應資料 case re := <-resc: if (re.res == nil) == (re.err == nil) { panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil)) } if debugRoundTrip { req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err) } if re.err != nil { return nil, pc.mapRoundTripError(req, startBytesWritten, re.err) } // 返回響應資料 return re.res, nil ... } }
這裡會封裝好 writeRequest 作為傳送請求的資料,並將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中,然後迴圈等待接受響應。
然後 writeLoop 會進行請求資料 writeRequest :
func (pc *persistConn) writeLoop() { defer close(pc.writeLoopDone) for { select { case wr := <-pc.writech: startBytesWritten := pc.nwrite // 向 TCP 連線中寫入資料,並行送至目標伺服器 err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) ... case <-pc.closech: return } } }
這裡會將從 writech 管道中獲取到的資料寫入到 TCP 連線中,並行送至目標伺服器。
readLoop
func (pc *persistConn) readLoop() { closeErr := errReadLoopExiting // default value, if not changed below defer func() { pc.close(closeErr) pc.t.removeIdleConn(pc) }() ... alive := true for alive { pc.readLimit = pc.maxHeaderResponseSize() // 獲取 roundTrip 傳送的結構體 rc := <-pc.reqch trace := httptrace.ContextClientTrace(rc.req.Context()) var resp *Response if err == nil { // 讀取資料 resp, err = pc.readResponse(rc, trace) } else { err = transportReadFromServerError{err} closeErr = err } ... // 將響應資料寫回到管道中 select { case rc.ch <- responseAndError{res: resp}: case <-rc.callerGone: return } ... } }
這裡是從 TCP 連線中讀取到對應的請求響應資料,通過 roundTrip 傳入的管道再回寫,然後 roundTrip 就會接受到資料並獲取的響應資料返回。
我這裡繼續以一個簡單的例子作為開頭:
func HelloHandler(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello World") } func main () { http.HandleFunc("/", HelloHandler) http.ListenAndServe(":8000", nil) }
在實現上面我先用一張圖進行簡要的介紹一下:
其實我們從上面例子的方法名就可以知道一些大致的步驟:
處理器的註冊如上面的例子所示,是通過呼叫 HandleFunc 函數來實現的。
HandleFunc 函數會一直呼叫到 ServeMux 的 Handle 方法中。
func (mux *ServeMux) Handle(pattern string, handler Handler) { mux.mu.Lock() defer mux.mu.Unlock() ... e := muxEntry{h: handler, pattern: pattern} mux.m[pattern] = e if pattern[len(pattern)-1] == '/' { mux.es = appendSorted(mux.es, e) } if pattern[0] != '/' { mux.hosts = true } }
Handle 會根據路由作為 hash 表的鍵來儲存 muxEntry
物件,muxEntry
封裝了 pattern 和 handler。如果路由表示式以'/'
結尾,則將對應的muxEntry
物件加入到[]muxEntry
中。
hash 表是用於路由精確匹配,[]muxEntry
用於部分匹配。
監聽是通過呼叫 ListenAndServe 函數,裡面會呼叫 server 的 ListenAndServe 方法:
func (srv *Server) ListenAndServe() error { if srv.shuttingDown() { return ErrServerClosed } addr := srv.Addr if addr == "" { addr = ":http" } // 監聽埠 ln, err := net.Listen("tcp", addr) if err != nil { return err } // 迴圈接收監聽到的網路請求 return srv.Serve(ln) }
Serve
func (srv *Server) Serve(l net.Listener) error { ... baseCtx := context.Background() ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { // 接收 listener 過來的網路連線 rw, err := l.Accept() ... tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // 建立協程處理連線 go c.serve(connCtx) } }
Serve 這個方法裡面會用一個迴圈去接收監聽到的網路連線,然後建立協程處理連線。所以難免就會有一個問題,如果並行很高的話,可能會一次性建立太多協程,導致處理不過來的情況。
處理請求是通過為每個連線建立 goroutine 來處理對應的請求:
func (c *conn) serve(ctx context.Context) { c.remoteAddr = c.rwc.RemoteAddr().String() ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) ... ctx, cancelCtx := context.WithCancel(ctx) c.cancelCtx = cancelCtx defer cancelCtx() c.r = &connReader{conn: c} c.bufr = newBufioReader(c.r) c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10) for { // 讀取請求 w, err := c.readRequest(ctx) ... // 根據請求路由呼叫處理器處理請求 serverHandler{c.server}.ServeHTTP(w, w.req) w.cancelCtx() if c.hijacked() { return } w.finishRequest() ... } }
當一個連線建立之後,該連線中所有的請求都將在這個協程中進行處理,直到連線被關閉。在 for 迴圈裡面會迴圈呼叫 readRequest 讀取請求進行處理。
請求處理是通過呼叫 ServeHTTP 進行的:
type serverHandler struct { srv *Server } func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) { handler := sh.srv.Handler if handler == nil { handler = DefaultServeMux } if req.RequestURI == "*" && req.Method == "OPTIONS" { handler = globalOptionsHandler{} } handler.ServeHTTP(rw, req) }
serverHandler 其實就是 Server 包裝了一層。這裡的 sh.srv.Handler
引數實際上是傳入的 ServeMux 範例,所以這裡最後會呼叫到 ServeMux 的 ServeHTTP 方法。
最終會通過 handler 呼叫到 match 方法進行路由匹配:
func (mux *ServeMux) match(path string) (h Handler, pattern string) { v, ok := mux.m[path] if ok { return v.h, v.pattern } for _, e := range mux.es { if strings.HasPrefix(path, e.pattern) { return e.h, e.pattern } } return nil, "" }
這個方法裡首先會利用進行精確匹配,如果匹配成功那麼直接返回;匹配不成功,那麼會根據 []muxEntry
中儲存的和當前路由最接近的已註冊的父節點路由進行匹配,否則繼續匹配下一個父節點路由,直到根路由/
。最後會呼叫對應的處理器進行處理。
https://cloud.tencent.com/developer/article/1515297
https://duyanghao.github.io/http-transport
https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-net-http
https://laravelacademy.org/post/21003
https://segmentfault.com/a/1190000021653550
到此這篇關於快速掌握Go 語言 HTTP 標準庫的實現方法的文章就介紹到這了,更多相關Go 語言 HTTP 標準庫內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45