首頁 > 軟體

Go prometheus metrics條目自動回收與清理方法

2022-11-09 14:01:20

事件背景

現網上執行著一個自己開發的 metrics exporter,它是專門來捕獲後端資源的執行狀態,並生成對應的 prometheus metrics 供監控報警系統使用。當然這個 exporter 只是負責遠端監控資源,並不能實際控制後端的資源,也不能實時動態獲得被監控的資源的變動事件。當我們的運維小夥伴手動錯誤刪除後端被監控的資源,導致業務流量異常。此時也沒有報警出來,而這個報警卻是依賴這個 metrics exporter 所採集的資料,導致了一次小型事件。因為這個事件,才有今天寫文章的動力,同時也分享下解決這個問題的方法。

現象獲取

架構圖

問題定位

通過跟小夥伴們一起復盤,以及追查可能出現問題的位置後,大家都覺得沒有任何問題。在運維刪除對應的監控資源後,同時沒有關閉報警規則的情況下,應該有大量的任何異常報警產生。但實際情況,沒有任何報警發出來。

當大家一籌莫展的時候,我突然說了一句,會不會是資料採集出現了問題?大家眼前一亮,趕緊拿出 metrics exporter 的程式碼檢查。通過反覆檢查,也沒有發現可疑的地方,於是大家又開始了思考。這時我開啟了 metrics exporter 偵錯模式,打上斷點,然後請運維小夥伴刪除一個測試資源,觀察監控資料的變化。果不其然,資源刪除了,對應監控的 metrics 條目的值沒有變化(也就是說,還是上次資源的狀態)

這下破案了,搞了半天是因為 metrics 條目內容沒有跟隨資源的刪除而被自動的刪除。導致了報警系統一直認為被刪除的資源還在執行,而且狀態正常。

原理分析

既然知道了原因,再回過頭看 metrics exporter 的程式碼,程式碼中有 prometheus.MustRegister、prometheus.Unregister 和相關的 MetricsVec 值變更的實現和呼叫。就是沒有判斷監控資源在下線或者刪除的情況下,如何刪除和清理建立出來的 MetricsVec。

在我的印象中 MetricsVec 會根據 labels 會自動建立相關的條目,從來沒有手動的新增和建立。根據這個邏輯我也認為,MetricsVec 中如果 labels 對應的值不更新或者處於不活躍的狀態,應該自動刪除才是。

最後還是把 golang 的 github.com/prometheus/client_golang 這個庫想太完美了。沒有花時間對 github.com/prometheus/client_golang 內部結構、原理、處理機制充分理解,才導致這個事件的發生。

github.com/prometheus/client_golang 中的 metrics 主要是 4 個種類,這個可以 baidu 上搜尋,很多介紹,我這裡不詳細展開。這些種類的 metrics 又可以分為:一次性使用和多次使用

  • 一次性使用:當請求到達了 http 伺服器,被 promhttp 中的 handler 處理後,返回資料給請求方。隨後 metrics 資料就失效了,不儲存。下次再有請求到 http 介面查詢 metrics,資料重新計算生成,返回給請求方。
  • 多次性使用:當請求到達了 http 伺服器,被 promhttp 中的 handler 處理後,返回資料給請求方。隨後 metrics 儲存,並不會刪除,需要手動清理和刪除。 下次再有請求到 http 介面查詢 metrics,直接返回之前儲存過的資料給請求方。

注意這兩者的區別,他們有不同的應用場景。

  • 一次性使用:一次請求一次新資料,資料與資料間隔時間由資料讀取者決定。 如果有多個資料讀取者,每一個讀取者讀取到的資料可能不會相同。每一個請求計算一次,如果採集請求量比較大,或者內部計算壓力比較大,都會導致負載壓力很高。 計算和輸出是同步邏輯。 例如:k8s 上的很多 exporter 是這樣的方式。
  • 多次性使用:每次請求都是從 map 中獲得,資料與資料間隔時間由資料寫入者決定。如果有多個資料讀取者,每一個讀取者採集的資料相同(讀取的過程中沒有更新資料寫入)。每一個請求獲得都是相同的計算結果,1 次計算多數讀取。計算和輸出是非同步邏輯。例如:http server 上 http 請求狀態統計,延遲統計,轉發位元組彙總,並行量等等。

這次專案中寫的 metrics exporter 本應該是採用 “一次性使用” 這樣的模型來開發,但是內部結構模型採用了 “多次性使用” 模型,因為指標資料寫入者和資料讀取者之間沒有必然聯絡,不屬於一個對談系統,所以之間是非同步結構。具體我們看下圖:

從圖中有 2 個身份說明下:

  • 資料讀取者:主要是 Prometheus 系統的採集器,根據設定的規則週期性的來 metrics 介面讀取資料。
  • 資料寫入者:開發的 scanner ,通過介面去讀遠端資源狀態資訊和相關資料,通過計算得到最後的結果,寫入指定的 metrics 條目內。

在此次專案中 metrics 條目是用 prometheus.GaugeVec 作為採集資料計算後結果的儲存型別。

說了這麼多,想要分析真正的原因,就必須深入 github.com/prometheus/client_golang 程式碼中 GaugeVec 這個具體程式碼實現。

// GaugeVec is a Collector that bundles a set of Gauges that all share the same
// Desc, but have different values for their variable labels. This is used if
// you want to count the same thing partitioned by various dimensions
// (e.g. number of operations queued, partitioned by user and operation
// type). Create instances with NewGaugeVec.
type GaugeVec struct {
	*MetricVec
}
type MetricVec struct {
	*metricMap
	curry []curriedLabelValue
	// hashAdd and hashAddByte can be replaced for testing collision handling.
	hashAdd     func(h uint64, s string) uint64
	hashAddByte func(h uint64, b byte) uint64
}
// metricMap is a helper for metricVec and shared between differently curried
// metricVecs.
type metricMap struct {
	mtx       sync.RWMutex // Protects metrics.
	metrics   map[uint64][]metricWithLabelValues  // 真正的資料儲存位置
	desc      *Desc
	newMetric func(labelValues ...string) Metric
}

通過上面的程式碼,一條 metric 條目是儲存在 metricMap.metrics 下。 我們繼續往下看:

讀取資料

// Collect implements Collector.
func (m *metricMap) Collect(ch chan<- Metric) {
	m.mtx.RLock()
	defer m.mtx.RUnlock()
	// 遍歷 map
	for _, metrics := range m.metrics {
		for _, metric := range metrics {
			ch <- metric.metric // 讀取資料到通道
		}
	}
}

寫入資料

// To create Gauge instances, use NewGauge.
type Gauge interface {
	Metric
	Collector
	// Set sets the Gauge to an arbitrary value.
	Set(float64)
	// Inc increments the Gauge by 1. Use Add to increment it by arbitrary
	// values.
	Inc()
	// Dec decrements the Gauge by 1. Use Sub to decrement it by arbitrary
	// values.
	Dec()
	// Add adds the given value to the Gauge. (The value can be negative,
	// resulting in a decrease of the Gauge.)
	Add(float64)
	// Sub subtracts the given value from the Gauge. (The value can be
	// negative, resulting in an increase of the Gauge.)
	Sub(float64)
	// SetToCurrentTime sets the Gauge to the current Unix time in seconds.
	SetToCurrentTime()
}
func NewGauge(opts GaugeOpts) Gauge {
	desc := NewDesc(
		BuildFQName(opts.Namespace, opts.Subsystem, opts.Name),
		opts.Help,
		nil,
		opts.ConstLabels,
	)
	result := &gauge{desc: desc, labelPairs: desc.constLabelPairs}
	result.init(result) // Init self-collection.
	return result
}
type gauge struct {
	// valBits contains the bits of the represented float64 value. It has
	// to go first in the struct to guarantee alignment for atomic
	// operations.  http://golang.org/pkg/sync/atomic/#pkg-note-BUG
	valBits uint64
	selfCollector
	desc       *Desc
	labelPairs []*dto.LabelPair
}
func (g *gauge) Set(val float64) {
	atomic.StoreUint64(&g.valBits, math.Float64bits(val))  // 寫入資料到變數
}

看到上面的程式碼,有的小夥伴就會說讀取和寫入的位置不一樣啊,沒有找到真正的位置。不要著急,後面還有。

// getOrCreateMetricWithLabelValues retrieves the metric by hash and label value
// or creates it and returns the new one.
//
// This function holds the mutex.
func (m *metricMap) getOrCreateMetricWithLabelValues(hash uint64, lvs []string, curry []curriedLabelValue,) Metric { // 返回了一個介面
	m.mtx.RLock()
	metric, ok := m.getMetricWithHashAndLabelValues(hash, lvs, curry)
	m.mtx.RUnlock()
	if ok {
		return metric
	}
	m.mtx.Lock()
	defer m.mtx.Unlock()
	metric, ok = m.getMetricWithHashAndLabelValues(hash, lvs, curry)
	if !ok {
		inlinedLVs := inlineLabelValues(lvs, curry)
		metric = m.newMetric(inlinedLVs...)
		m.metrics[hash] = append(m.metrics[hash], metricWithLabelValues{values: inlinedLVs, metric: metric})  // 這裡寫入 metricMap.metrics
	}
	return metric
}
// A Metric models a single sample value with its meta data being exported to
// Prometheus. Implementations of Metric in this package are Gauge, Counter,
// Histogram, Summary, and Untyped.
type Metric interface { // 哦哦哦哦,是介面啊。Gauge 實現這個介面
	// Desc returns the descriptor for the Metric. This method idempotently
	// returns the same descriptor throughout the lifetime of the
	// Metric. The returned descriptor is immutable by contract. A Metric
	// unable to describe itself must return an invalid descriptor (created
	// with NewInvalidDesc).
	Desc() *Desc
	// Write encodes the Metric into a "Metric" Protocol Buffer data
	// transmission object.
	//
	// Metric implementations must observe concurrency safety as reads of
	// this metric may occur at any time, and any blocking occurs at the
	// expense of total performance of rendering all registered
	// metrics. Ideally, Metric implementations should support concurrent
	// readers.
	//
	// While populating dto.Metric, it is the responsibility of the
	// implementation to ensure validity of the Metric protobuf (like valid
	// UTF-8 strings or syntactically valid metric and label names). It is
	// recommended to sort labels lexicographically. Callers of Write should
	// still make sure of sorting if they depend on it.
	Write(*dto.Metric) error
	// TODO(beorn7): The original rationale of passing in a pre-allocated
	// dto.Metric protobuf to save allocations has disappeared. The
	// signature of this method should be changed to "Write() (*dto.Metric,
	// error)".
}

看到這裡就知道了寫入、儲存、讀取已經連線到了一起。 同時如果沒有顯式的呼叫方法刪除 metricMap.metrics 的內容,那麼記錄的 metrics 條目的值就會一直存在,而原生程式碼中只是建立和變更內部值。正是因為這個邏輯才導致上面說的事情。

處理方法

既然找到原因,也找到對應的程式碼以及對應的內部邏輯,就清楚了 prometheus.GaugeVec 這個變數真正的使用方法。到此解決方案也就有了,找到合適的位置新增程式碼,顯式呼叫 DeleteLabelValues 這個方法來刪除無效 metrics 條目。

為了最後實現整體效果,我總結下有幾個關鍵詞:“非同步”、“多次性使用”、“自動回收”

最後的改造思路

  • 建立一個 scanner 掃描結果儲存的狀態機 (status)
  • 每次 scanner 掃描結果會向這個狀態機做更新動作,並記錄對應的更新時間
  • 啟動一個 goroutine (cleaner) 定期掃描狀態機,然後遍歷分析記錄資料的更新時間。如果遍歷到對應資料的更新時間跟現在的時間差值超過一個固定的閾值,就主動刪除狀態機中對應的資訊,同時刪除對應的 metrics 條目

通過這個動作就可以實現自動回收和清理無效的 metrics 條目,最後驗證下來確實有效。

最終效果

通過測試程式碼來驗證這個方案的效果,具體如下演示:

package main
import (
	"context"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"net/http"
	"strconv"
	"sync"
	"time"
)
type metricsMetaData struct {
	UpdatedAt int64
	Labels    []string
}
func main() {
	var wg sync.WaitGroup
	var status sync.Map
	vec := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "app",
			Name:      "running_status",
		}, []string{"id"},
	)
	prometheus.MustRegister(vec)
	defer prometheus.Unregister(vec)
	// 寫入資料
	for i := 0; i < 10; i++ {
		labels := strconv.Itoa(i)
		vec.WithLabelValues(labels).Set(1)                                                            // 寫入 metric 條目
		status.Store(labels, metricsMetaData{UpdatedAt: time.Now().Unix(), Labels: []string{labels}}) // 寫入狀態
	}
	// 建立退出 ctx
	stopCtx, stopCancel := context.WithCancel(context.Background())
	// 啟動清理器
	go func(ctx *context.Context, g *sync.WaitGroup) {
		defer g.Done()
		ticker := time.NewTicker(time.Second * 2)
		for {
			select {
			case <-ticker.C:
				now := time.Now().Unix()
				status.Range(func(key, value interface{}) bool {
					if now-value.(metricsMetaData).UpdatedAt > 5 {
						vec.DeleteLabelValues(value.(metricsMetaData).Labels...) // 刪除 metrics 條目
						status.Delete(key)                                       // 刪除 map 中的記錄
					}
					return true
				})
				break
			case <-(*ctx).Done():
				return
			}
		}
	}(&stopCtx, &wg)
	wg.Add(1)
	// 建立 http
	http.Handle("/metrics", promhttp.Handler())
	srv := http.Server{Addr: "0.0.0.0:8080"}
	// 啟動 http server
	go func(srv *http.Server, g *sync.WaitGroup) {
		defer g.Done()
		_ = srv.ListenAndServe()
	}(&srv, &wg)
	wg.Add(1)
	// 退出
	time.Sleep(time.Second * 10)
	stopCancel()
	_ = srv.Shutdown(context.Background())
	wg.Wait()
}

結果動畫:

以上就是Go prometheus metrics條目自動回收與清理方法的詳細內容,更多關於Go prometheus metrics回收清理的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com