首頁 > 軟體

Golang 實現Redis 協定解析器的解決方案

2022-10-26 14:01:53

本文是 《用 Golang 實現一個 Redis》系列文章第二篇,本文將分別介紹Redis 通訊協定 以及 協定解析器 的實現,若您對協定有所瞭解可以直接閱讀協定解析器部分。

Redis 通訊協定

Redis 自 2.0 版本起使用了統一的協定 RESP (REdis Serialization Protocol),該協定易於實現,計算機可以高效的進行解析且易於被人類讀懂。

RESP 是一個二進位制安全的文字協定,工作於 TCP 協定上。RESP 以行作為單位,使用者端和伺服器傳送的命令或資料一律以 rn (CRLF)作為換行符。

二進位制安全是指允許協定中出現任意字元而不會導致故障。比如 C 語言的字串以 作為結尾不允許字串中間出現, 而 Go 語言的 string 則允許出現 ,我們說 Go 語言的 string 是二進位制安全的,而 C 語言字串不是二進位制安全的。

RESP 的二進位制安全性允許我們在 key 或者 value 中包含 r 或者 n 這樣的特殊字元。在使用 redis 儲存 protobuf、msgpack 等二進位制資料時,二進位制安全性尤為重要。

RESP 定義了5種格式:

  • 簡單字串(Simple String): 伺服器用來返回簡單的結果,比如"OK"。非二進位制安全,且不允許換行。
  • 錯誤資訊(Error): 伺服器用來返回簡單的錯誤資訊,比如"ERR Invalid Synatx"。非二進位制安全,且不允許換行。
  • 整數(Integer): llen、scard 等命令的返回值, 64位元有符號整數
  • 字串(Bulk String): 二進位制安全字串, 比如 get 等命令的返回值
  • 陣列(Array, 又稱 Multi Bulk Strings): Bulk String 陣列,使用者端傳送指令以及 lrange 等命令響應的格式

RESP 通過第一個字元來表示格式:

  • 簡單字串:以"+" 開始, 如:"+OKrn"
  • 錯誤:以"-" 開始,如:"-ERR Invalid Synatxrn"
  • 整數:以":"開始,如:":1rn"
  • 字串:以 $ 開始
  • 陣列:以 * 開始

Bulk String有兩行,第一行為 $+正文長度,第二行為實際內容。如:

$3rnSETrn

Bulk String 是二進位制安全的可以包含任意位元組,就是說可以在 Bulk String 內部包含 "rn" 字元(行尾的CRLF被隱藏):

$4arnb

$-1 表示 nil, 比如使用 get 命令查詢一個不存在的key時,響應即為$-1

Array 格式第一行為 "*"+陣列長度,其後是相應數量的 Bulk String。如, ["foo", "bar"]的報文:

*2
$3
foo
$3
bar

使用者端也使用 Array 格式向伺服器端傳送指令。命令本身將作為第一個引數,如 SET key value指令的RESP報文:

*3
$3
SET
$3
key
$5
value

將換行符列印出來:

*3rn$3rnSETrn$3rnkeyrn$5rnvaluern

協定解析器

我們在 實現TCP伺服器 一文中已經介紹過TCP伺服器的實現,協定解析器將實現其 Handler 介面充當應用層伺服器。

協定解析器將接收 Socket 傳來的資料,並將其資料還原為 [][]byte 格式,如 "*3rn$3rnSETrn$3rnkeyrn$5rvaluern" 將被還原為 ['SET', 'key', 'value']

本文完整程式碼: github.com/hdt3213/godis/redis/parser

來自使用者端的請求均為陣列格式,它在第一行中標記報文的總行數並使用CRLF作為分行符。

bufio 標準庫可以將從 reader 讀到的資料快取到 buffer 中,直至遇到分隔符或讀取完畢後返回,所以我們使用 reader.ReadBytes('n') 來保證每次讀取到完整的一行。

需要注意的是RESP是二進位制安全的協定,它允許在正文中使用CRLF字元。舉例來說 Redis 可以正確接收並執行SET "arnb" 1指令, 這條指令的正確報文是這樣的:

*3  
$3
SET
$4
arnb 
$7
myvalue

ReadBytes 讀取到第五行 "arnbrn"時會將其誤認為兩行:

*3  
$3
SET
$4
a  // 錯誤的分行
b // 錯誤的分行
$7
myvalue

因此當讀取到第四行$4後, 不應該繼續使用 ReadBytes('n') 讀取下一行, 應使用 io.ReadFull(reader, msg) 方法來讀取指定長度的內容。

msg = make([]byte, 4 + 2) // 正文長度4 + 換行符長度2
_, err = io.ReadFull(reader, msg)

首先我們來定義解析器的介面:

// Payload stores redis.Reply or error
type Payload struct {
	Data redis.Reply
	Err  error
}

// ParseStream 通過 io.Reader 讀取資料並將結果通過 channel 將結果返回給呼叫者
// 流式處理的介面適合供使用者端/伺服器端使用
func ParseStream(reader io.Reader) <-chan *Payload {
	ch := make(chan *Payload)
	go parse0(reader, ch)
	return ch
}

// ParseOne 解析 []byte 並返回 redis.Reply 
func ParseOne(data []byte) (redis.Reply, error) {
	ch := make(chan *Payload)
	reader := bytes.NewReader(data)
	go parse0(reader, ch)
	payload := <-ch // parse0 will close the channel
	if payload == nil {
		return nil, errors.New("no reply")
	}
	return payload.Data, payload.Err
}

接下來我們可以看一下解析器核心流程的虛擬碼,您可以在parser.go看到完整程式碼:

func parse0(reader io.Reader, ch chan<- *Payload) {
    // 初始化讀取狀態
    readingMultiLine := false
    expectedArgsCount := 0
    var args [][]byte
    var bulkLen int64
    for {
        // 上文中我們提到 RESP 是以行為單位的
        // 因為行分為簡單字串和二進位制安全的BulkString,我們需要封裝一個 readLine 函數來相容
        line, err = readLine(reader, bulkLen)
        if err != nil { 
            // 處理錯誤
            return
        }
        // 接下來我們對剛剛讀取的行進行解析
        // 我們簡單的將 Reply 分為兩類:
        // 單行: StatusReply, IntReply, ErrorReply
        // 多行: BulkReply, MultiBulkReply

        if !readingMultiLine {
            if isMulitBulkHeader(line) {
                // 我們收到了 MulitBulkReply 的第一行
                // 獲得 MulitBulkReply 中 BulkString 的個數
                expectedArgsCount = parseMulitBulkHeader(line)
                // 等待 MulitBulkReply 後續行
                readingMultiLine = true
            } else if isBulkHeader(line) {
                // 我們收到了 BulkReply 的第一行
                // 獲得 BulkReply 第二行的長度, 通過 bulkLen 告訴 readLine 函數下一行 BulkString 的長度
                bulkLen = parseBulkHeader()
                // 這個 Reply 中一共有 1 個 BulkString
                expectedArgsCount = 1 
                // 等待 BulkReply 後續行
                readingMultiLine = true
            } else {
                // 處理 StatusReply, IntReply, ErrorReply 等單行 Reply
                reply := parseSingleLineReply(line)
                // 通過 ch 返回結果
                emitReply(ch)
            }
        } else {
            // 進入此分支說明我們正在等待 MulitBulkReply 或 BulkReply 的後續行
            // MulitBulkReply 的後續行有兩種,BulkHeader 或者 BulkString
            if isBulkHeader(line) {
                bulkLen = parseBulkHeader()
            } else {
                // 我們正在讀取一個 BulkString, 它可能是 MulitBulkReply 或 BulkReply 
                args = append(args, line)
            }
            if len(args) == expectedArgsCount { // 我們已經讀取了所有後續行
                // 通過 ch 返回結果
                emitReply(ch)
                // 重置狀態, 準備解析下一條 Reply
                readingMultiLine = false
                expectedArgsCount = 0
                args = nil
                bulkLen = 0
            }
        }
    }
}

貼一下工具函數的實現:

func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {
	var msg []byte
	var err error
	if state.bulkLen == 0 { // read simple line
		msg, err = bufReader.ReadBytes('n')
		if err != nil {
			return nil, true, err
		}
		if len(msg) == 0 || msg[len(msg)-2] != 'r' {
			return nil, false, errors.New("protocol error: " + string(msg))
		}
	} else { // read bulk line (binary safe)
		msg = make([]byte, state.bulkLen+2)
		_, err = io.ReadFull(bufReader, msg)
		if err != nil {
			return nil, true, err
		}
		if len(msg) == 0 ||
			msg[len(msg)-2] != 'r' ||
			msg[len(msg)-1] != 'n' {
			return nil, false, errors.New("protocol error: " + string(msg))
		}
		state.bulkLen = 0
	}
	return msg, false, nil
}

func parseMultiBulkHeader(msg []byte, state *readState) error {
	var err error
	var expectedLine uint64
	expectedLine, err = strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32)
	if err != nil {
		return errors.New("protocol error: " + string(msg))
	}
	if expectedLine == 0 {
		state.expectedArgsCount = 0
		return nil
	} else if expectedLine > 0 {
		// first line of multi bulk reply
		state.msgType = msg[0]
		state.readingMultiLine = true
		state.expectedArgsCount = int(expectedLine)
		state.args = make([][]byte, 0, expectedLine)
		return nil
	} else {
		return errors.New("protocol error: " + string(msg))
	}
}

func parseBulkHeader(msg []byte, state *readState) error {
	var err error
	state.bulkLen, err = strconv.ParseInt(string(msg[1:len(msg)-2]), 10, 64)
	if err != nil {
		return errors.New("protocol error: " + string(msg))
	}
	if state.bulkLen == -1 { // null bulk
		return nil
	} else if state.bulkLen > 0 {
		state.msgType = msg[0]
		state.readingMultiLine = true
		state.expectedArgsCount = 1
		state.args = make([][]byte, 0, 1)
		return nil
	} else {
		return errors.New("protocol error: " + string(msg))
	}
}

func parseSingleLineReply(msg []byte) (redis.Reply, error) {
	str := strings.TrimSuffix(string(msg), "n")
	str = strings.TrimSuffix(str, "r")
	var result redis.Reply
	switch msg[0] {
	case '+': // status reply
		result = reply.MakeStatusReply(str[1:])
	case '-': // err reply
		result = reply.MakeErrReply(str[1:])
	case ':': // int reply
		val, err := strconv.ParseInt(str[1:], 10, 64)
		if err != nil {
			return nil, errors.New("protocol error: " + string(msg))
		}
		result = reply.MakeIntReply(val)
	default:
		// parse as text protocol
		strs := strings.Split(str, " ")
		args := make([][]byte, len(strs))
		for i, s := range strs {
			args[i] = []byte(s)
		}
		result = reply.MakeMultiBulkReply(args)
	}
	return result, nil
}

到此這篇關於Golang   實現 Redis 協定解析器的文章就介紹到這了,更多相關go redis 協定解析器內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com