首頁 > 軟體

uber go zap 紀錄檔框架支援非同步紀錄檔輸出

2023-01-31 06:02:11

事件背景

過年在家正好閒得沒有太多事情,想起年前一個研發專案負責人反饋的問題:“老李啊,我們組一直在使用你這邊的 gin 封裝的 webservice 框架開發,我們需要一套標準的非同步紀錄檔輸出模組。現在組內和其他使用 gin 的小夥伴實現的‘各有千秋’不統一,沒有一個組或者部門對這部分的程式碼負責和長期維護。你能不能想想辦法。”

這一看就是掉頭髮的事情,雖然 gin 封裝的 webservice 框架是我開發底層服務包,已經推廣到公司所有 golang 開發組使用,現在需要一個統一非同步紀錄檔輸出的模組是否真的有意義,要認真的考慮和研究下,畢竟有核心業務團隊有這樣的需求。

索性開啟了 uber-go/zap 紀錄檔框架的原始碼,看看到底是什麼原因推動大家都要手寫非同步紀錄檔模組。不看不知道,一看嚇一跳,專案中 issue#998 就有討論,我看了下 issue 留言,覺得大家的說法都挺正確,而專案作者一直無動無衷,而且堅信 bufio + 定時 flush 的方式 才是正道,怪不得大家都要自己手寫一個非同步紀錄檔輸出模組。

心智負擔

在要寫 uber-go/zap 非同步紀錄檔模組之前,首先要明白非同步紀錄檔模組的優點、缺點以及適用的場景,這樣程式碼才寫的有意義,是真正的解決問題和能幫助到小夥伴的。

關於同步和非同步模型的差異,這邊就不展開了,估計再寫幾千字也不一定能說清楚,有需要深入瞭解的小夥伴,可以自行 baidu,那裡有很多相關的文章,而且講解得非常清晰。這裡我就不需要過多解析,而我需要講的是同步和非同步紀錄檔模組。

  • 同步紀錄檔:紀錄檔資訊投遞後,必須要等到紀錄檔資訊寫到對應的 io.Writer 中(os.Stdout, 檔案等等)並返回,這個呼叫過程結束。適合 Warning 級別以上紀錄檔輸出,強記錄或者落盤需求的紀錄檔資訊,不能丟失。
  • 非同步紀錄檔:紀錄檔資訊投遞後,呼叫過程結束。而紀錄檔資訊是否能夠正確寫到對應的 io.Writer 中(os.Stdout, 檔案等等)是由非同步紀錄檔模組保證,不等待呼叫過程。適合 Warning 級別以下紀錄檔輸出,儘量儲存紀錄檔,如果沒有儲存,丟失也沒有關係

那麼我就用一句話說明白這兩種紀錄檔模型的差別。

  • 同步紀錄檔:慢,安全,紀錄檔不丟
  • 非同步紀錄檔:快,不安全,紀錄檔盡力記錄

既然這裡說到是心智負擔,但是真正負擔在哪裡? 實際上面已經提到了心智負擔的核心內容:就是如何正確的選擇一個紀錄檔模型

而我們這邊需求是明確知道有部分紀錄檔可以丟失,追求介面響應速度,希望有統一的實現,有人維護程式碼和與整個 gin 封裝的 webservice 框架融合的品質。

前置知識

明確了開發的需求,開發的目標。確認了開發有意義,確實能解決問題。那麼:就是幹!!!

在動之前還是要準備些知識,還要做好結構設計,這樣才能解答:一套合理的非同步輸出模型應該是什麼樣的?

分享下我理解的一個非同步紀錄檔模型是什麼樣的(歡迎大家來“錘”,但是錘我的時候,麻煩輕點哈)

有的小夥伴看到這個圖覺得有點眼熟?Kafka?不對,不對,不對,還少了一個 Broker。因為這裡不需要對 Producer 實現一個獨立的緩衝器和分類器,那麼 Broker 這樣的角色就不存在了。

簡單的介紹下成員角色:

  • MessageProducer: 訊息和資料生成者
  • CriticalSurface: 並行臨介面,所有 MessageProducer 都到這邊競爭控制權,往 RingBuffer 中寫入資料
  • RingBuffer: 訊息和資料的緩衝(記得緩衝和快取區別,這邊用緩衝就是為了解決 Producer 和 Consumer 和速度差)
  • MessageConsumer: 訊息和資料消費者

為什麼選擇上面的模型:

  • 希望在現有的 uber-go/zap 的結構上擴充套件,實現一部分能力,滿足功能擴充套件。
  • 不希望重複做輪子,因為輪子做出來,需要有嚴格的程式碼測試和壓力測試,才能交付生產系統。
  • 模型簡單,好理解,也好實現。
  • 效能比較高,而且架構整體比較合理。

為了實現這個模型,還需要思考如下幾個問題:

  • CriticalSurface 如何實現?因為要滿足多個 MessageBroker 並行使用,那麼這個臨介面就必須要做,要不然就出現爭搶資源失控的情況。
  • 為什麼要選擇 RingBuffer?RingBuffer 是目前速度和效率最好的一種緩衝模型,Linux/Unix 系統中廣泛使用。
  • 選擇 RingBuffer 需要注意些什麼?RingBuffer 有快慢指標的問題,如果控制不好,快指標就回覆寫慢指標的資料,地址資料丟失的情況。
  • MessageConsumer 數量如何限制?如何平衡資訊的建立與消費之間的速度差異。
  • 如何支援多種紀錄檔方式輸出型別。(golang 多種 io.Writer 模型)

如果看到這裡,估計已經勸退了很多的小夥伴,我想這就是為什麼那個研發專案負責人帶著團隊問題來找我,希望能夠得到解決的原因吧。確實不容易。

解決思路

uber-go/zap 程式碼分析

在認真看看完了 uber-go/zap 的程式碼以後,發現 uber 就是 uber,程式碼質量還是非常不錯的,很多模組抽象的非常不錯。通過一段時間的思考後,確認我們要實現一個獨立的 WriteSyncer, 跟 uber-go/zap 中的 BufferedWriteSyncer 扮演相同的角色。

既然要實現,我們先看看 uber-go/zap 中的原始碼怎麼定義 WriteSyncer 的。

go.uber.org/zap@v1.24.0/zapcore/write_syncer.go

// A WriteSyncer is an io.Writer that can also flush any buffered data. Note
// that *os.File (and thus, os.Stderr and os.Stdout) implement WriteSyncer.
type WriteSyncer interface {
	io.Writer
	Sync() error
}

WriteSyncer 是一個 interface,也就是我們只要參照 io.Writer 和實現 Sync() error 這樣的一個方法就可以對接 uber-go/zap 系統中。那麼 Sync() 這個函數到底是幹嘛的? 顧名思義就是讓 zap 觸發資料同步動作時需要執行的一個方法。但是我們是非同步紀錄檔,明顯 uber-go/zap 處理完紀錄檔相關的資料,丟給我實現的 WriteSyncer 以後,就不應該在干預非同步紀錄檔模組的後期動作了,所以 Sync() 給他一個空殼函數就行了。

當然 uber-go/zap 早考慮到這樣的情況,就給一個非常棒的包裝函數 AddSync()

go.uber.org/zap@v1.24.0/zapcore/write_syncer.go

// AddSync converts an io.Writer to a WriteSyncer. It attempts to be
// intelligent: if the concrete type of the io.Writer implements WriteSyncer,
// we'll use the existing Sync method. If it doesn't, we'll add a no-op Sync.
func AddSync(w io.Writer) WriteSyncer {
	switch w := w.(type) {
	case WriteSyncer:
		return w
	default:
		return writerWrapper{w}
	}
}
type writerWrapper struct {
	io.Writer
}
func (w writerWrapper) Sync() error {
	return nil
}

uber-go/zap 已經把我們希望要做的事情都給做好了,我們只要實現一個標準的 io.Writer 就行了,那繼續看 io.Writer 的定義方式。

go/src/io/io.go

// Writer is the interface that wraps the basic Write method.
//
// Write writes len(p) bytes from p to the underlying data stream.
// It returns the number of bytes written from p (0 <= n <= len(p))
// and any error encountered that caused the write to stop early.
// Write must return a non-nil error if it returns n < len(p).
// Write must not modify the slice data, even temporarily.
//
// Implementations must not retain p.
type Writer interface {
	Write(p []byte) (n int, err error)
}

哇,好簡單。要實現 io.Writer 僅僅只要實現一個 Write(p []byte) (n int, err error) 方法就行了,So Easy !!!!

上手開發

還是回到上一章中的 5 個核心問題,我想到這裡應該有答案了:

  • MessageProducer:用一個函數實現,實際上就是 Write(p []byte),接收 uber-go/zap 投遞來的訊息內容。
  • CriticalSurface 和 RingBuffer: 是最核心的部件,既然要考慮到效能、安全、相容各種資料型別,同時要有一個 Locker 保證臨介面,也要滿足 FIFO 模型。思來想去,當然自己也實現了幾版,最後還是用 golang 自身的 channel 來完成。
  • MessageConsumer:用一個 go 協程來執行從 RingBuffer 迴圈讀取,然後往真正的 os.Stdout/os.StdErr/os.File 中輸出。(為什麼是一個而不是多個?一個速度就足夠快了,同時系統底層 io.Writer 自身也帶鎖,所以一個能減少鎖衝撞。)

TIPS: 這裡說說為什麼我要選擇 golang 自身的 channel 作為 CriticalSurface 和 RingBuffer 的實現體:

  • channel 是 golang 官方的程式碼包,有專門的團隊對這個程式碼質量負責。channel 很早就出來了,Bugs 修復的差不多了,非常的穩定可靠。(也有自己懶了,不想自己寫 RingBuffer,然後要考慮各種場景的程式碼測試。)
  • channel 的 “<-” 動作天生就有一個 Locker,有非常好的臨介面控制。
  • channel 底層是就是一個 RingBuffer 的實現,效率非常不錯,而且如果 channel 滿了,資料投遞動作就會卡住,如果 channel 空了,資料提取動作也會被卡住,這個機制非常棒。
  • channel 天生就是一個 FIFO 的模型,非常合適做資料緩衝,解決 Producer 和 Consumer 和速度差這樣問題。

有了上面的思路,我的程式碼架構也基本出來了,結構圖如下:

這裡我貼出一個實現程式碼(DEMO 測試用,生產要謹慎重新實現):

const defaultQueueCap = math.MaxUint16 * 8
var QueueIsFullError = errors.New("queue is full")
var DropWriteMessageError = errors.New("message writing failure and drop it")
type Writer struct {
	name        string
	bufferPool  *extraBufferPool
	writer      io.Writer
	wg          sync.WaitGroup
	lock        sync.RWMutex
	channel     chan *extraBuffer
}
func NewBufferWriter(name string, w io.Writer, queueCap uint32) *Writer {
	if len(name) <= 0 {
		name = "bw_" + utils.GetRandIdString()
	}
	if queueCap <= 0 {
		queueCap = defaultQueueCap
	}
	if w == nil {
		return nil
	}
	wr := Writer{
		name:          name,
		bufferPool:    newExtraBufferPool(defaultBufferSize),
		writer:        w,
		channel:       make(chan *extraBuffer, queueCap),
	}
	wr.wg.Add(1)
	go wr.poller(utils.GetRandIdString())
	return &wr
}
func (w *Writer) Write(p []byte) (int, error) {
	if w.lock.TryRLock() {
		defer w.lock.RUnlock()
		b := w.bufferPool.Get()
		count, err := b.buff.Write(p)
		if err != nil {
			w.bufferPool.Put(b)
			return count, err
		}
		select {
		case w.channel <- b: // channel 內部傳遞的是 buffer 的指標,速度比傳遞物件快。
			break
		default:
			w.bufferPool.Put(b)
			return count, QueueIsFullError
		}
		return len(p), nil
	} else {
		return -1, DropWriteMessageError
	}
}
func (w *Writer) Close() {
	w.lock.Lock()
	close(w.channel)
	w.wg.Wait()
	w.lock.Unlock()
}
func (w *Writer) poller(id string) {
	var (
		eb  *extraBuffer
		err error
	)
	defer w.wg.Done()
	for eb = range w.channel {
		_, err = w.writer.Write(eb.buff.Bytes())
		if err != nil {
			log.Printf("writer: %s, id: %s, error: %s, message: %s", w.name, id,
				err.Error(), utils.BytesToString(eb.buff.Bytes()))
		}
		w.bufferPool.Put(eb)
	}
}

然後在 uber-go/zap 中如何使用呢?

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
	"os"
	"time"
)
func main() {
	wr := NewBufferWriter("lee", os.Stdout, 0)
	defer wr.Close()
	c := zapcore.NewCore(
		zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()),
		zapcore.AddSync(wr),
		zap.NewAtomicLevelAt(zap.DebugLevel),
	)
	log := zap.New(c)
	log.Info("demo log")
	time.Sleep(3 * time.Second) // 這裡要稍微等待下,因為是非同步的輸出,log.Info() 執行完畢,紀錄檔並沒有完全輸出到 console
}

Console 輸出:

$ go run asynclog.go
{"level":"info","ts":1674808100.0148869,"msg":"demo log"}

輸出結果符合逾期

測試程式碼

為了驗證架構和程式碼質量,這裡做了非同步輸出紀錄檔、同步輸出紀錄檔和不輸出紀錄檔 3 種情況下,對 gin 封裝的 webservice 框架吞吐力的影響。

#測試內容Requests/sec
1同步輸出紀錄檔20074.24
2非同步輸出紀錄檔64197.08
3不輸出紀錄檔65551.84

同步輸出紀錄檔

$ wrk -t 10 -c 1000 http://127.0.0.1:8080/xx/
Running 10s test @ http://127.0.0.1:8080/xx/
  10 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency    12.03ms   14.23ms 202.46ms   89.23%
    Req/Sec     2.03k     1.36k    9.49k    59.28%
  202813 requests in 10.10s, 100.58MB read
  Socket errors: connect 757, read 73, write 0, timeout 0
Requests/sec:  20074.24
Transfer/sec:      9.96MB

非同步輸出紀錄檔

$ wrk -t 10 -c 1000 http://127.0.0.1:8080/xx/
Running 10s test @ http://127.0.0.1:8080/xx/
  10 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.75ms    2.43ms  39.94ms   92.68%
    Req/Sec     6.48k     3.86k   14.78k    57.11%
  648554 requests in 10.10s, 321.62MB read
  Socket errors: connect 757, read 79, write 0, timeout 0
Requests/sec:  64197.08
Transfer/sec:     31.84MB

不輸出紀錄檔

$ wrk -t 10 -c 1000 http://127.0.0.1:8080/xx/
Running 10s test @ http://127.0.0.1:8080/xx/
  10 threads and 1000 connections
  Thread Stats   Avg      Stdev     Max   +/- Stdev
    Latency     3.69ms  505.13us   9.29ms   77.36%
    Req/Sec     6.60k     4.25k   15.31k    56.45%
  662381 requests in 10.10s, 328.48MB read
  Socket errors: connect 757, read 64, write 0, timeout 0
Requests/sec:  65551.84
Transfer/sec:     32.51MB

總結

通過對上面的工程程式碼測試,基本實現了 gin + zap 的非同步紀錄檔輸出功能的實現。當然上面的程式碼僅供小夥伴學習研究用,並不能作為生產程式碼使用。

從結果來看,golang 的 channel 整體效能還是非常不錯。基於 channel 實現的非同步紀錄檔輸出基本於不輸出紀錄檔的吞吐力和效能相當。

在實際工作中,我們能用 golang 原生庫的時候就儘量用,因為 golang 團隊在寫庫的時候,大多數的情況和場景都考慮過,所以沒有必自己做一個輪子。安全!安全!安全!

至於 uber-go/zap 團隊為什麼不願意實現這樣的非同步紀錄檔輸出模型,可能有他們的想法吧。但是我想,不論那種非同步紀錄檔模型,都存在著程式異常會丟紀錄檔的情況。這裡再次提醒小夥伴,要慎重選擇紀錄檔系統模型,切不可以一味追求速度而忽略紀錄檔,因為服務紀錄檔也是重要的業務資料。

以上就是uber go zap 紀錄檔框架支援非同步紀錄檔輸出的詳細內容,更多關於uber go zap紀錄檔非同步輸出的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com