首頁 > 軟體

Golang監聽紀錄檔檔案並行送到kafka中

2022-04-14 19:01:20

前言

紀錄檔收集專案的準備中,本文主要講的是利用golang的tail庫,監聽紀錄檔檔案的變動,將紀錄檔資訊傳送到kafka中。

涉及的golang庫和視覺化工具:

go-ini,sarama,tail其中:

  • go-ini:用於讀取組態檔,統一管理設定項,有利於後其的維護
  • sarama:是一個go操作kafka的使用者端。目前我用於向kefka傳送訊息
  • tail:類似於linux的tail命令了,讀取檔案的後幾行。如果檔案有追加資料,會檢測到。就是通過它來監聽紀錄檔檔案

視覺化工具:

offsetexplorer:是kafka的視覺化工具,這裡用來檢視訊息是否投遞成功

工作的流程

  • 載入設定,初始化saramakafka
  • 起一個的協程,利用tail不斷去監聽紀錄檔檔案的變化。
  • 主協程中一直阻塞等待tail傳送訊息,兩者通過一個管道通訊。一旦主協程接收到新紀錄檔,組裝格式,然後傳送到kafka中

環境準備

環境的話,確保zookeeperkafka正常執行。因為還沒有使用sarama讀取資料,使用offsetexplorer來檢視任務是否真的投遞成功了。

程式碼分層

serve來存放寫tail服務類和sarama服務類,conf存放ini組態檔

main函數為程式入口

 

關鍵的程式碼

main.go

main函數做的有:構建設定結構體,對映組態檔。呼叫和初始化tail,srama服務。

package main

import (
	"fmt"
	"sarama/serve"

	"github.com/go-ini/ini"
)

type KafkaConfig struct {
	Address     string `ini:"address"`
	ChannelSize int    `ini:"chan_size"`
}
type TailConfig struct {
	Path     string `ini:"path"`
	Filename string `ini:"fileName"`
	// 如果是結構體,則指明分割區名
	Children `ini:"tailfile.children"`
}
type Config struct {
	KafkaConfig `ini:"kafka"`
	TailConfig  `ini:"tailfile"`
}
type Children struct {
	Name string `ini:"name"`
}

func main() {
	// 載入設定
	var cfg = new(Config)
	err := ini.MapTo(cfg, "./conf/go-conf.ini")
	if err != nil {
		fmt.Print(err)
	}
	// 初始化kafka
	ks := &serve.KafukaServe{}
	// 啟動kafka訊息監聽。非同步
	ks.InitKafka([]string{cfg.KafkaConfig.Address}, int64(cfg.KafkaConfig.ChannelSize))
	// 關閉主協程時,關閉channel
	defer ks.Destruct()

	// 初始化tail
	ts := &serve.TailServe{}
	ts.TailInit(cfg.TailConfig.Path + "/" + cfg.TailConfig.Filename)
	// 阻塞
	ts.Listener(ks.MsgChan)

}

kafka.go

有3個方法 :

  • InitKafka,組裝設定項以及初始化接收訊息的管道,
  • Listener,監聽管道訊息,收到訊息後,將訊息組裝,傳送到kafka
  • Destruct, 關閉管道
package serve

import (
	"fmt"

	"github.com/Shopify/sarama"
)

type KafukaServe struct {
	MsgChan chan string
	//err         error
}

func (ks *KafukaServe) InitKafka(addr []string, chanSize int64) {

	// 讀取設定
	config := sarama.NewConfig()
	// 1. 初始化生產者設定
	config.Producer.RequiredAcks = sarama.WaitForAll
	// 選擇分割區
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	// 成功交付的資訊
	config.Producer.Return.Successes = true

	ks.MsgChan = make(chan string, chanSize)

	go ks.Listener(addr, chanSize, config)

}

func (ks *KafukaServe) Listener(addr []string, chanSize int64, config *sarama.Config) {
	//  連線kafka
	var kafkaClient, _ = sarama.NewSyncProducer(addr, config)
	defer kafkaClient.Close()
	for {
		select {
		case content := <-ks.MsgChan:
			//
			msg := &sarama.ProducerMessage{
				Topic: "weblog",
				Value: sarama.StringEncoder(content),
			}
			partition, offset, err := kafkaClient.SendMessage(msg)
			if err != nil {
				fmt.Println(err)
			}
			fmt.Println("分割區,偏移量:")
			fmt.Println(partition, offset)
			fmt.Println("___")
		}

	}
}

func (ks *KafukaServe) Destruct() {
	close(ks.MsgChan)
}

tail.go

主要包括了兩個方法:

  • TailInit初始化,組裝tail設定。Listener
  • Listener,儲存kafka服務類初始化之後的管道。監聽紀錄檔檔案,如果有新紀錄檔,就往管道里傳送
package serve

import (
	"fmt"

	"github.com/hpcloud/tail"
)

type TailServe struct {
	tails *tail.Tail
}

func (ts *TailServe) TailInit(filenName string) {
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 開啟檔案開始讀取資料

	ts.tails, _ = tail.TailFile(filenName, config)

	// if err != nil {
	// 	fmt.Println("tails %s failed,err:%vn", filenName, err)
	// 	return nil, err
	// }
	fmt.Println("啟動," + filenName + "監聽")
}

func (ts *TailServe) Listener(MsgChan chan string) {
	for {
		msg, ok := <-ts.tails.Lines
		if !ok {
			// todo
			fmt.Println("資料接收失敗")
			return
		}
		fmt.Println(msg.Text)
		MsgChan <- msg.Text
	}
}

// 測試案例
func Demo() {
	filename := `E:xx.log`
	config := tail.Config{
		ReOpen:    true,
		Follow:    true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll:      true,
	}
	// 開啟檔案開始讀取資料
	tails, err := tail.TailFile(filename, config)
	if err != nil {
		fmt.Println("tails %s failed,err:%vn", filename, err)
		return
	}
	var (
		msg *tail.Line
		ok  bool
	)
	fmt.Println("啟動")
	for {
		msg, ok = <-tails.Lines
		if !ok {
			fmt.Println("tails file close reopen,filename:$sn", tails.Filename)
		}
		fmt.Println("msg:", msg.Text)
	}
}

到此這篇關於Golang監聽紀錄檔檔案並行送到kafka中的文章就介紹到這了,更多相關Golang 監聽紀錄檔檔案 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com