首頁 > 軟體

Golang 編寫Tcp伺服器的解決方案

2022-10-26 14:01:53

Golang 開發 Tcp 伺服器及拆包粘包、優雅關閉的解決方案

Golang 作為廣泛用於伺服器端和雲端計算領域的程式語言,tcp socket 是其中至關重要的功能。您可以在 github.com/hdt3213/godis/tcp 中看到本文所述 TCP 伺服器的完整程式碼及其應用。

早期的 Tomcat/Apache 伺服器使用的是阻塞 IO 模型。它使用一個執行緒處理一個連線,在沒有收到新資料時監聽執行緒處於阻塞狀態,直到資料就緒後執行緒被喚醒。因為阻塞 IO 模型需要開啟大量執行緒並且頻繁地進行上下文切換,所以效率很差。

IO 多路複用技術為了解決上述問題採用了一個執行緒監聽多路連線的方案。一個執行緒持有多個連線並阻塞等待,當其中某個連線可讀寫時執行緒被喚醒進行處理。因為多個連線複用了一個執行緒所以 IO 多路複用需要的執行緒數少很多。

主流作業系統都提供了IO多路複用技術的實現,比如 Linux上的 epoll,freeBSD 上的 kqueue 以及 Windows 平臺上的 iocp。有得必有失,因為 epoll 等技術提供的介面面向 IO 事件而非面向連線,所以需要編寫複雜的非同步程式碼,開發難度很大。

Golang 的 netpoller 基於IO多路複用和 goroutine scheduler 構建了一個簡潔高效能的網路模型,並給開發者提供了 goroutine-per-connection 風格的極簡介面。

更多關於 netpoller 的剖析可以參考Golang實現四種負載均衡的演演算法(隨機,輪詢等), 接下來我們嘗試用 netpoller 編寫我們的伺服器。

Echo 伺服器

作為開始我們來實現一個簡單的 Echo 伺服器。它會接受使用者端連線並將使用者端傳送的內容原樣傳回使用者端。

package main

import (
    "fmt"
    "net"
    "io"
    "log"
    "bufio"
)

func ListenAndServe(address string) {
    // 繫結監聽地址
    listener, err := net.Listen("tcp", address)
    if err != nil {
        log.Fatal(fmt.Sprintf("listen err: %v", err))
    }
    defer listener.Close()
    log.Println(fmt.Sprintf("bind: %s, start listening...", address))

    for {
        // Accept 會一直阻塞直到有新的連線建立或者listen中斷才會返回
        conn, err := listener.Accept()
        if err != nil {
            // 通常是由於listener被關閉無法繼續監聽導致的錯誤
            log.Fatal(fmt.Sprintf("accept err: %v", err))
        }
        // 開啟新的 goroutine 處理該連線
        go Handle(conn)
    }
}

func Handle(conn net.Conn) {
    // 使用 bufio 標準庫提供的緩衝區功能
    reader := bufio.NewReader(conn)
    for {
        // ReadString 會一直阻塞直到遇到分隔符 'n'
        // 遇到分隔符後會返回上次遇到分隔符或連線建立後收到的所有資料, 包括分隔符本身
        // 若在遇到分隔符之前遇到異常, ReadString 會返回已收到的資料和錯誤資訊
        msg, err := reader.ReadString('n')
        if err != nil {
            // 通常遇到的錯誤是連線中斷或被關閉,用io.EOF表示
            if err == io.EOF {
                log.Println("connection close")
            } else {
                log.Println(err)
            }
            return
        }
        b := []byte(msg)
        // 將收到的資訊傳送給使用者端
        conn.Write(b)
    }
}

func main() {
    ListenAndServe(":8000")
}

使用 telnet 工具測試我們編寫的 Echo 伺服器:

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
> a
a
> b
b
Connection closed by foreign host.

拆包與粘包問題

某些朋友可能看到"拆包與粘包"後表示極度震驚,並再三強調: TCP是個位元組流協定,不存在粘包問題。

我們常說的 TCP 伺服器並非「實現 TCP 協定的伺服器」而是「基於TCP協定的應用層伺服器」。TCP 是面向位元組流的協定,而應用層協定大多是訊息導向的,比如 HTTP 協定的請求/響應,Redis 協定的指令/回覆都是以訊息為單位進行通訊的。

作為應用層伺服器我們有責任從 TCP 提供的位元組流中正確地解析出應用層訊息,在這一步驟中我們會遇到「拆包/粘包」問題。

socket 允許我們通過 read 函數讀取新收到的一段資料(當然這段資料並不對應一個 TCP 包)。在上文的 Echo 伺服器範例中我們用n表示訊息結束,從 read 函數讀取的資料可能存在下列幾種情況:

  • 收到兩段資料: "abc", "defn" 它們屬於一條訊息 "abcdefn" 這是拆包的情況
  • 收到一段資料: "abcndefn" 它們屬於兩條訊息 "abcn", "defn" 這是粘包的情況

應用層協定通常採用下列幾種思路之一來定義訊息,以保證完整地進行讀取:

  • 定長訊息
  • 在訊息尾部新增特殊分隔符,如範例中的Echo協定和FTP控制協定。bufio 標準庫會快取收到的資料直到遇到分隔符才會返回,它可以幫助我們正確地分割位元組流。
  • 將訊息分為 header 和 body, 並在 header 中提供 body 總長度,這種分包方式被稱為 LTV(length,type,value) 包。這是應用最廣泛的策略,如HTTP協定。當從 header 中獲得 body 長度後, io.ReadFull 函數會讀取指定長度位元組流,從而解析應用層訊息。

在沒有具體應用層協定的情況下,我們很難詳細地討論拆包與粘包問題。在本系列的第二篇文章: 實現 Redis 協定解析器 中我們可以看到 Redis 序列化協定(RESP)對分隔符和 LTV 包的結合應用,以及兩種分包方式的具體解析程式碼。

優雅關閉

在生產環境下需要保證TCP伺服器關閉前完成必要的清理工作,包括將完成正在進行的資料傳輸,關閉TCP連線等。這種關閉模式稱為優雅關閉,可以避免資源洩露以及使用者端未收到完整資料導致故障。

TCP 伺服器的優雅關閉模式通常為: 先關閉listener阻止新連線進入,然後遍歷所有連線逐個進行關閉。首先修改一下TCP伺服器:

// handler 是應用層伺服器的抽象
type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close()error
}

// 監聽並提供服務,並在收到 closeChan 發來的關閉通知後關閉
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
    // 監聽關閉通知
    go func() {
        <-closeChan
        logger.Info("shutting down...")
        // 停止監聽,listener.Accept()會立即返回 io.EOF
        _ = listener.Close()
        // 關閉應用層伺服器 
        _ = handler.Close()
    }()

    // 在異常退出後釋放資源
    defer func() {
        // close during unexpected error
        _ = listener.Close()
        _ = handler.Close()
    }()
    ctx := context.Background()
    var waitDone sync.WaitGroup
    for {
        // 監聽埠, 阻塞直到收到新連線或者出現錯誤
        conn, err := listener.Accept()
        if err != nil {
            break
        }
        // 開啟 goroutine 來處理新連線
        logger.Info("accept link")
        waitDone.Add(1)
        go func() {
            defer func() {
                waitDone.Done()
            }()
            handler.Handle(ctx, conn)
        }()
    }
    waitDone.Wait()
}

// ListenAndServeWithSignal 監聽中斷訊號並通過 closeChan 通知伺服器關閉
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    closeChan := make(chan struct{})
    sigCh := make(chan os.Signal)
    signal.Notify(sigCh, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        sig := <-sigCh
        switch sig {
        case syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
            closeChan <- struct{}{}
        }
    }()
    listener, err := net.Listen("tcp", cfg.Address)
    if err != nil {
        return err
    }
    logger.Info(fmt.Sprintf("bind: %s, start listening...", cfg.Address))
    ListenAndServe(listener, handler, closeChan)
    return nil
}

接下來修改應用層伺服器:

// 使用者端連線的抽象
type Client struct {
    // tcp 連線
    Conn net.Conn
    // 當伺服器端開始傳送資料時進入waiting, 阻止其它goroutine關閉連線
    // wait.Wait是作者編寫的帶有最大等待時間的封裝: 
    // https://github.com/HDT3213/godis/blob/master/src/lib/sync/wait/wait.go
    Waiting wait.Wait
}

type EchoHandler struct {
    
    // 儲存所有工作狀態client的集合(把map當set用)
    // 需使用並行安全的容器
    activeConn sync.Map 

    // 關閉狀態標識位
    closing atomic.AtomicBool
}

func MakeEchoHandler()(*EchoHandler) {
    return &EchoHandler{}
}

func (h *EchoHandler)Handle(ctx context.Context, conn net.Conn) {
    // 關閉中的 handler 不會處理新連線
    if h.closing.Get() {
        conn.Close()
        return 
    }

    client := &Client {
        Conn: conn,
    }
    h.activeConn.Store(client, struct{}{}) // 記住仍然存活的連線

    reader := bufio.NewReader(conn)
    for {
        msg, err := reader.ReadString('n')
        if err != nil {
            if err == io.EOF {
                logger.Info("connection close")
                h.activeConn.Delete(client)
            } else {
                logger.Warn(err)
            }
            return
        }
        // 傳送資料前先置為waiting狀態,阻止連線被關閉
        client.Waiting.Add(1)

        // 模擬關閉時未完成傳送的情況
        //logger.Info("sleeping")
        //time.Sleep(10 * time.Second)

        b := []byte(msg)
        conn.Write(b)
        // 傳送完畢, 結束waiting
        client.Waiting.Done()
    }
}

// 關閉使用者端連線
func (c *Client)Close()error {
    // 等待資料傳送完成或超時
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
}

// 關閉伺服器
func (h *EchoHandler)Close()error {
    logger.Info("handler shutting down...")
    h.closing.Set(true)
    // 逐個關閉連線
    h.activeConn.Range(func(key interface{}, val interface{})bool {
        client := key.(*Client)
        client.Close()
        return true
    })
    return nil
}

到此這篇關於Golang 編寫Tcp伺服器的解決方案的文章就介紹到這了,更多相關go tcp伺服器內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com