<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
在上一篇文章中我實現了一個支援Debug、Info、Error等多個級別的紀錄檔庫,並將紀錄檔寫到了磁碟檔案中,程式碼比較簡單,適合練手。有興趣的可以通過這個連結前往:https://github.com/bosima/ylog/releases/tag/v1.0.1
工程實踐中,我們往往還需要對紀錄檔進行採集,將紀錄檔歸集到一起,然後用於各種處理分析,比如生產環境上的錯誤分析、異常告警等等。在紀錄檔訊息系統領域,Kafka久負盛名,這篇文章就以將紀錄檔傳送到Kafka來實現紀錄檔的採集;同時考慮到紀錄檔分析時對結構化資料的需求,這篇文章還會提供一種輸出Json格式紀錄檔的方法。
這個升級版的紀錄檔庫還要保持向前相容,即還能夠使用普通文字格式,以及寫紀錄檔到磁碟檔案,這兩個特性和要新增的兩個功能分別屬於同類處理,因此我這裡對它們進行抽象,形成兩個介面:格式化介面、寫紀錄檔介面。
所謂格式化,就是紀錄檔的格式處理。這個紀錄檔庫目前要支援兩種格式:普通文字和Json。
為了在不同格式之上提供一個統一的抽象,ylog中定義 logEntry 來代表一條紀錄檔:
type logEntry struct { Ts time.Time `json:"ts"` File string `json:"file"` Line int `json:"line"` Level LogLevel `json:"level"` Msg string `json:"msg"` }
格式化介面的能力就是將紀錄檔從logEntry格式轉化為其它某種資料格式。ylog中對它的定義是:
type LoggerFormatter interface { Format(*logEntry, *[]byte) error }
第1個引數是一個logEntry範例,也就是要被格式化的紀錄檔,第2個引數是紀錄檔格式化之後要寫入的容器。
其實現是這樣的:
type textFormatter struct { } func NewTextFormatter() *textFormatter { return &textFormatter{} } func (f *textFormatter) Format(entry *logEntry, buf *[]byte) error { formatTime(buf, entry.Ts) *buf = append(*buf, ' ') file := toShort(entry.File) *buf = append(*buf, file...) *buf = append(*buf, ':') itoa(buf, entry.Line, -1) *buf = append(*buf, ' ') *buf = append(*buf, levelNames[entry.Level]...) *buf = append(*buf, ' ') *buf = append(*buf, entry.Msg...) return nil }
可以看到它的主要功能就是將logEntry中的各個欄位按照某種順序平鋪開來,中間用空格分隔。
其中的很多資料處理方法參考了Golang標準紀錄檔庫中的資料格式化處理程式碼,有興趣的可以去Github中詳細檢視。
這裡對日期時間格式化為字串做了特別的優化,在標準紀錄檔庫中為了將年、月、日、時、分、秒、毫秒、微秒等格式化指定長度的字串,使用了一個函數:
func itoa(buf *[]byte, i int, wid int) { // Assemble decimal in reverse order. var b [20]byte bp := len(b) - 1 for i >= 10 || wid > 1 { wid-- q := i / 10 b[bp] = byte('0' + i - q*10) bp-- i = q } // i < 10 b[bp] = byte('0' + i) *buf = append(*buf, b[bp:]...) }
其邏輯大概就是將數位中的每一位轉換為字元並存入byte中,注意這裡初始化byte陣列的時候是20位,這是int64最大的數位位數。
其實時間字串中的每個部分位數都是固定的,比如年是4位元、月日時分秒都是2位,根本不需要20位,所以這個空間可以節省;還有這裡用了迴圈,這對於CPU的分支預測可能有那麼點影響,所以我這裡分別對不同位數寫了專門的格式化方法,以2位數為例:
func itoa2(buf *[]byte, i int) { q := i / 10 s := byte('0' + i - q*10) f := byte('0' + q) *buf = append(*buf, f, s) }
其實現是這樣的:
type jsonFormatter struct { } func NewJsonFormatter() *jsonFormatter { return &jsonFormatter{} } func (f *jsonFormatter) Format(entry *logEntry, buf *[]byte) (err error) { entry.File = toShortFile(entry.File) jsonBuf, err := json.Marshal(entry) *buf = append(*buf, jsonBuf...) return }
程式碼也很簡單,使用標準庫的json序列化方法將logEntry範例轉化為Json格式的資料。
對於Json格式,後續考慮支援使用者自定義Json欄位,這裡暫時先簡單處理。
寫紀錄檔就是將紀錄檔輸出到別的目標,比如ylog要支援的輸出到磁碟檔案、輸出到Kafka等。
前邊格式化介面將格式化後的資料封裝到了 []byte 中,寫紀錄檔介面就是將格式化處理的輸出 []byte 寫到某種輸出目標中。參考Golang中各種Writer的定義,ylog中對它的定義是:
type LoggerWriter interface { Ensure(*logEntry) error Write([]byte) error Sync() error Close() error }
這裡有4個方法:
這裡定義一個名為fileWriter的型別,它需要實現LoggerWriter的介面。
先看型別的定義:
type fileWriter struct { file *os.File lastHour int64 Path string }
包含四個欄位:
再看其實現的介面:
func (w *fileWriter) Ensure(entry *logEntry) (err error) { if w.file == nil { f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = w.getTimeHour(entry.Ts) w.file = f return nil } currentHour := w.getTimeHour(entry.Ts) if w.lastHour != currentHour { _ = w.file.Close() f, err := w.createFile(w.Path, entry.Ts) if err != nil { return err } w.lastHour = currentHour w.file = f } return } func (w *fileWriter) Write(buf []byte) (err error) { buf = append(buf, 'n') _, err = w.file.Write(buf) return } func (w *fileWriter) Sync() error { return w.file.Sync() } func (w *fileWriter) Close() error { return w.file.Close() }
Ensure 中的主要邏輯是建立當前要寫入的檔案物件,如果小時數變了,先把之前的關閉,再建立一個新的檔案。
Write 把資料寫入到檔案物件,這裡加了一個換行符,也就是說對於檔案紀錄檔,其每條紀錄檔最後都會有一個換行符,這樣比較方便閱讀。
Sync 呼叫檔案物件的Sync方法,將紀錄檔從作業系統快取刷到磁碟。
Close 關閉當前檔案物件。
這裡定義一個名為kafkaWriter的型別,它也需要實現LoggerWriter的介面。
先看其結構體定義:
type kafkaWriter struct { Topic string Address string writer *kafka.Writer batchSize int }
這裡包含四個欄位:
Topic 寫Kafka時需要一個主題,這裡預設當前Logger中所有紀錄檔使用同一個主題。
Address Kafka的存取地址。
writer 向Kafka寫資料時使用的Writer,這裡整合的是:github.com/segmentio/kafka-go,支援自動重試和重連。
batchSize Kafka寫紀錄檔的批次大小,批次寫可以提高紀錄檔的寫效率。
再看其實現的介面:
func (w *kafkaWriter) Ensure(curTime time.Time) (err error) { if w.writer == nil { w.writer = &kafka.Writer{ Addr: kafka.TCP(w.Address), Topic: w.Topic, BatchSize: w.batchSize, Async: true, } } return } func (w *kafkaWriter) Write(buf []byte) (err error) { // buf will be reused by ylog when this method return, // with aysnc write, we need copy data to a new slice kbuf := append([]byte(nil), buf...) err = w.writer.WriteMessages(context.Background(), kafka.Message{Value: kbuf}, ) return } func (w *kafkaWriter) Sync() error { return nil } func (w *kafkaWriter) Close() error { return w.writer.Close() }
這裡採用的是非同步傳送到Kafka的方式,WriteMessages方法不會阻塞,因為傳入的buf要被ylog重用,所以這裡copy了一下。非同步還會存在的一個問題就是不會返回錯誤,可能丟失資料,不過對於紀錄檔這種資料,沒有那麼嚴格的要求,也可以接受。
如果採用同步傳送,因為批次傳送比較有效率,這裡可以攢幾條再發,但紀錄檔比較稀疏時,可能短時間很難攢夠,就會出現長時間等不到紀錄檔的情況,所以還要有個超時機制,這有點麻煩,不過我也寫了一個版本,有興趣的可以去看看:https://github.com/bosima/ylog/blob/main/examples/kafka-writer.go
有了格式化介面和寫紀錄檔介面,下一步就是將它們組裝起來,以實現相應的處理能力。
首先是建立它們,因為我這裡也沒有動態設定的需求,所以就放到建立Logger範例的時候了,這樣比較簡單。
func NewYesLogger(opts ...Option) (logger *YesLogger) { logger = &YesLogger{} ... logger.writer = NewFileWriter("logs") logger.formatter = NewTextFormatter() for _, opt := range opts { opt(logger) } ... return }
可以看到預設的formatter是textFormatter,預設的writer是fileWriter。這個函數傳入的Option其實是個函數,在下邊的opt(logger)中會執行它們,所以使用其它的Formatter或者Writer可以這樣做:
logger := ylog.NewYesLogger( ... ylog.Writer(ylog.NewKafkaWriter(address, topic, writeBatchSize)), ylog.Formatter(ylog.NewJsonFormatter()), )
這裡 ylog.Writer 和 ylog.Formatter 就是符合Option型別的函數,呼叫它們可以設定不同的Formatter和Writer。
然後怎麼使用它們呢?
... l.formatter.Format(entry, &buf) l.writer.Ensure(entry) err := l.writer.Write(buf) ...
當 logEntry 進入訊息處理環節後,首先呼叫formatter的Format方法格式化logEntry;然後呼叫了writer的Ensure方法確保writer已經準備好,最後呼叫writer的Write方法將格式化之後的資料輸出到對應的目標。
為什麼不將Ensure方法放到Write中呢?這是因為目前寫文字紀錄檔的時候需要根據logEntry中的紀錄檔時間建立紀錄檔檔案,這樣就需要給Writer傳遞兩個引數,有點彆扭,所以這裡將它們分開了。
Kafka的吞吐量是很高的,那麼如果放到ylog自身來說,如何提高它的吞吐量呢?
首先想到的就是Channel,可以使用有緩衝的Channel模擬一個佇列,生產者不停的向Channel傳送資料,如果Writer可以一直在緩衝被填滿之前將資料取走,那麼理論上說生產者就是非阻塞的,相比同步輸出到某個Writer,沒有直接磁碟IO、網路IO,紀錄檔處理的吞吐量必將大幅提升。
定義一個Channel,其容量預設為當前機器邏輯處理器的數量:
logger.pipe = make(chan *logEntry, runtime.NumCPU())
傳送資料的程式碼:
entry := &logEntry{ Level: level, Msg: s, File: file, Line: line, Ts: now, } l.pipe <- entry
接收資料的程式碼:
for { select { case entry := <-l.pipe: // reuse the slice memory buf = buf[:0] l.formatter.Format(entry, &buf) l.writer.Ensure(entry.Ts) err := l.writer.Write(buf) ... } }
實際效果怎麼樣呢?看下Benchmark:
goos: darwin goarch: amd64 pkg: github.com/bosima/ylog cpu: Intel(R) Core(TM) i5-8259U CPU @ 2.30GHz BenchmarkInfo-8 1332333 871.6 ns/op 328 B/op 4 allocs/op
這個結果可以和zerolog、zap等高效能紀錄檔庫一較高下了,當然目前可以做的事情要比它們簡單很多。
如果對Java有所瞭解的同學應該聽說過log4j,在log4j2中引入了一個名為Disruptor的元件,它讓紀錄檔處理飛快了起來,受到很多Java開發者的追捧。Disruptor之所以這麼厲害,是因為它使用了無鎖並行、環形佇列、快取行填充等多種高階技術。
相比之下,Golang的Channel雖然也使用了環形緩衝,但是還是使用了鎖,作為佇列來說效能並不是最優的。
Golang中有沒有類似的東西呢?最近出來的ZenQ可能是一個不錯的選擇,不過看似還不太穩定,過段時間再嘗試下。有興趣的可以去看看:https://github.com/alphadose/ZenQ 。
好了,以上就是本文的主要內容。關於ylog的介紹也告一段落了,後續會在Github上持續更新,增加更多有用的功能,並不斷優化處理效能,歡迎關注:https://github.com/bosima/ylog 。
到此這篇關於Golang:將紀錄檔以Json格式輸出到Kafka的文章就介紹到這了,更多相關Golang紀錄檔輸出到Kafka內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45