首頁 > 軟體

go語言K8S 的 informer機制淺析

2022-10-30 14:00:46

正文

Kubernetes的控制器模式是其非常重要的一個設計模式,整個Kubernetes定義的資源物件以及其狀態都儲存在etcd資料庫中,通過apiserver對其進行增刪查改,而各種各樣的控制器需要從apiserver及時獲取這些物件,然後將其應用到實際中,即將這些物件的實際狀態調整為期望狀態,讓他們保持匹配。

不過這因為這樣,各種控制器需要和apiserver進行頻繁互動,需要能夠及時獲取物件狀態的變化,而如果簡單的通過暴力輪詢的話,會給apiserver造成很大的壓力,且效率很低,因此,Kubernetes設計了Informer這個機制,用來作為控制器跟apiserver互動的橋樑,它主要有兩方面的作用:

依賴Etcd的List&Watch機制,在本地維護了一份目標物件的快取。

Etcd的Watch機制能夠使使用者端及時獲知這些物件的狀態變化,然後通過List機制,更新本地快取,這樣就在使用者端為這些API物件維護了一份和Etcd資料庫中幾乎一致的資料,然後控制器等使用者端就可以直接存取快取獲取物件的資訊,而不用去直接存取apiserver,這一方面顯著提高了效能,另一方面則大大降低了對apiserver的存取壓力;

依賴Etcd的Watch機制,觸發控制器等使用者端註冊到Informer中的事件方法。

使用者端可能會對某些物件的某些事件感興趣,當這些事件發生時,希望能夠執行某些操作,比如通過apiserver新建了一個pod,那麼kube-scheduler中的控制器收到了這個事件,然後將這個pod加入到其佇列中,等待進行排程。

Kubernetes的各個元件本身就內建了非常多的控制器,而自定義的控制器也需要通過Informer跟apiserver進行互動,因此,Informer在Kubernetes中應用非常廣泛,本篇文章就重點分析下Informer的機制原理,以加深對其的理解。

使用方法

先來看看Informer是怎麼用的,以Endpoint為例,來看下其使用Informer的相關程式碼:

建立Informer工廠

# client-go/informers/factory.go
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

首先建立了一個SharedInformerFactory,這個結構主要有兩個作用:

  • 一個是用來作為建立Informer的工廠,典型的工廠模式,在Kubernetes中這種設計模式也很常用;
  • 一個是共用Informer,所謂共用,就是多個Controller可以共用同一個Informer,因為不同的Controller可能對同一種API物件感興趣,這樣相同的API物件,快取就只有一份,通知機制也只有一套,大大提高了效率,減少了資源浪費。

建立物件Informer結構體

# client-go/informers/core/v1/endpoints.go
type EndpointsInformer interface {
  Informer() cache.SharedIndexInformer
  Lister() v1.EndpointsLister
}
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()

使用InformerFactory建立出對應版本的物件的Informer結構體,如Endpoints物件對應的就是EndpointsInformer結構體,該結構體實現了兩個方法:Informer()和Lister()

  • 前者用來構建出最終的Informer,即我們本篇文章的重點:SharedIndexInformer,
  • 後者用來獲取建立出來的Informer的快取介面:Indexer,該介面可以用來查詢快取的資料,我準備下一篇文章單獨介紹其底層如何實現快取的。

註冊事件方法

# Client-go/tools/cache/shared_informer.go
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
    AddFunc:    onAdd,
    UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
    DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
  })
func onAdd(obj interface{}) {
  node := obj.(*corev1.Endpoint)
  fmt.Println("add a endpoint:", endpoint.Name)
}

這裡,首先呼叫Infomer()建立出來SharedIndexInformer,然後向其中註冊事件方法,這樣當有對應的事件發生時,就會觸發這裡註冊的方法去做相應的事情。其次呼叫Lister()獲取到快取介面,就可以通過它來查詢Informer中快取的資料了,而且Informer中快取的資料,是可以有索引的,這樣可以加快查詢的速度。

啟動Informer

# kubernetes/cmd/kube-controller-manager/app/controllermanager.go
controllerContext.InformerFactory.Start(controllerContext.Stop)

這裡InformerFactory的啟動,會遍歷Factory中建立的所有Informer,依次將其啟動。

機制解析

Informer的實現都是在client-go這個庫中,通過上述的工廠方法,其實最終建立出來的是一個叫做SharedIndexInformer的結構體:

# k8s.io/client-go/tools/cache/shared_informer.go
type sharedIndexInformer struct {
    indexer    Indexer
    controller Controller
    processor             *sharedProcessor
    cacheMutationDetector MutationDetector
    listerWatcher ListerWatcher
    ......
}
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
    realClock := &clock.RealClock{}
    sharedIndexInformer := &sharedIndexInformer{
        processor:                       &sharedProcessor{clock: realClock},
        indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
        listerWatcher:                   lw,
        objectType:                      exampleObject,
        resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
        defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
        cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
        clock:                           realClock,
    }
    return sharedIndexInformer
}

可以看到,在建立SharedIndexInformer時,就建立出了processor, indexer等結構,而在Informer啟動時,還建立出了controller, fifo queue, reflector等結構。

Reflector

Reflector的作用,就是通過List&Watch的方式,從apiserver獲取到感興趣的物件以及其狀態,然後將其放到一個稱為”Delta”的先進先出佇列中。

所謂的Delta FIFO Queue,就是佇列中的元素除了物件本身外,還有針對該物件的事件型別:

type Delta struct {
    Type   DeltaType
    Object interface{}
}

目前有5種Type: Added, Updated, Deleted, Replaced, Resync,所以,針對同一個物件,可能有多個Delta元素在佇列中,表示對該物件做了不同的操作,比如短時間內,多次對某一個物件進行了更新操作,那麼就會有多個Updated型別的Delta放入到佇列中。後續佇列的消費者,可以根據這些Delta的型別,來回撥註冊到Informer中的事件方法。

而所謂的List&Watch,就是

  • 先呼叫該API物件的List介面,獲取到物件列表,將它們新增到佇列中,Delta元素型別為Replaced,
  • 然後再呼叫Watch介面,持續監聽該API物件的狀態變化事件,將這些事件按照不同的事件型別,組成對應的Delta型別,新增到佇列中,Delta元素型別有Added, Updated, Deleted三種。

此外,Informer還會週期性的傳送Resync型別的Delta元素到佇列中,目的是為了週期性的觸發註冊到Informer中的事件方法UpdateFunc,保證物件的期望狀態和實際狀態一致,該週期是由一個叫做resyncPeriod的引數決定的,在向Informer中新增EventHandler時,可以指定該引數,若為0的話,則關閉該功能。需要注意的是,Resync型別的Delta元素中的物件,是通過Indexer從快取中獲取到的,而不是直接從apiserver中拿的,即這裡resync的,其實是”快取”的物件的期望狀態和實際狀態的一致性。

根據以上Reflector的機制,依賴Etcd的Watch機制,通過事件來獲知物件變化狀態,建立本地快取。即使在Informer中,也沒有周期性的呼叫物件的List介面,正常情況下,List&Watch只會執行一次,即先執行List把資料拉過來,放入佇列中,後續就進入Watch階段。

那什麼時候才會再執行List呢?其實就是異常的時候,在List或者Watch的過程中,如果有異常,比如apiserver重啟了,那麼Reflector就開始週期性的執行List&Watch,直到再次正常進入Watch階段。為了在異常時段,不給apiserver造成壓力,這個週期是一個稱為backoff的可變的時間間隔,預設是一個指數型的間隔,即越往後重試的間隔越長,到一定時間又會重置回一開始的頻率。而且,為了讓不同的apiserver能夠均勻負載這些Watch請求,使用者端會主動斷開跟apiserver的連線,這個超時時間為60秒,然後重新發起Watch請求。此外,在控制器重啟過程中,也會再次執行List,所以會觀察到之前已經建立好的API物件,又重新觸發了一遍AddFunc方法。

從以上這些點,可以看出來,Kubernetes在效能和穩定性的提升上,還是下了很多功夫的。

Controller

這裡Controller的作用是通過輪詢不斷從佇列中取出Delta元素,根據元素的型別,一方面通過Indexer更新原生的快取,一方面呼叫Processor來觸發註冊到Informer的事件方法:

# k8s.io/client-go/tools/cache/controller.go
func (c *controller) processLoop() {
    for {
        obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    }
}

這裡的c.config.Process是定義在shared_informer.go中的HandleDeltas()方法:

# k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
    s.blockDeltas.Lock()
    defer s.blockDeltas.Unlock()
    // from oldest to newest
    for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
            s.cacheMutationDetector.AddObject(d.Object)
            if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
                if err := s.indexer.Update(d.Object); err != nil {
                    return err
                }
                isSync := false
                switch {
                case d.Type == Sync:
                    // Sync events are only propagated to listeners that requested resync
                    isSync = true
                case d.Type == Replaced:
                    if accessor, err := meta.Accessor(d.Object); err == nil {
                        if oldAccessor, err := meta.Accessor(old); err == nil {
                            // Replaced events that didn't change resourceVersion are treated as resync events
                            // and only propagated to listeners that requested resync
                            isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
                        }
                    }
                }
                s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
            } else {
                if err := s.indexer.Add(d.Object); err != nil {
                    return err
                }
                s.processor.distribute(addNotification{newObj: d.Object}, false)
            }
        case Deleted:
            if err := s.indexer.Delete(d.Object); err != nil {
                return err
            }
            s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
        }
    }
    return nil
}

Processer & Listener

Processer和Listener則是觸發事件方法的機制,在建立Informer時,會建立一個Processer,而在向Informer中通過呼叫AddEventHandler()註冊事件方法時,會為每一個Handler生成一個Listener,然後將該Lisener中新增到Processer中,每一個Listener中有兩個channel:addCh和nextCh。Listener通過select監聽在這兩個channel上,當Controller從佇列中取出新的元素時,會呼叫processer來給它的listener傳送“通知”,這個“通知”就是向addCh中新增一個元素,即add(),然後一個goroutine就會將這個元素從addCh轉移到nextCh,即pop(),從而觸發另一個goroutine執行註冊的事件方法,即run()。

# k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    if sync {
        for _, listener := range p.syncingListeners {
            listener.add(obj)
        }
    } else {
        for _, listener := range p.listeners {
            listener.add(obj)
        }
    }
}
func (p *processorListener) add(notification interface{}) {
    p.addCh <- notification
}
func (p *processorListener) pop() {
    defer utilruntime.HandleCrash()
    defer close(p.nextCh) // Tell .run() to stop
    var nextCh chan<- interface{}
    var notification interface{}
    for {
        select {
        case nextCh <- notification:
            // Notification dispatched
            var ok bool
            notification, ok = p.pendingNotifications.ReadOne()
            if !ok { // Nothing to pop
                nextCh = nil // Disable this select case
            }
        case notificationToAdd, ok := <-p.addCh:
            if !ok {
                return
            }
            if notification == nil { // No notification to pop (and pendingNotifications is empty)
                // Optimize the case - skip adding to pendingNotifications
                notification = notificationToAdd
                nextCh = p.nextCh
            } else { // There is already a notification waiting to be dispatched
                p.pendingNotifications.WriteOne(notificationToAdd)
            }
        }
    }
}
func (p *processorListener) run() {
    // this call blocks until the channel is closed.  When a panic happens during the notification
    // we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
    // the next notification will be attempted.  This is usually better than the alternative of never
    // delivering again.
    stopCh := make(chan struct{})
    wait.Until(func() {
        for next := range p.nextCh {
            switch notification := next.(type) {
            case updateNotification:
                p.handler.OnUpdate(notification.oldObj, notification.newObj)
            case addNotification:
                p.handler.OnAdd(notification.newObj)
            case deleteNotification:
                p.handler.OnDelete(notification.oldObj)
            default:
                utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
            }
        }
        // the only way to get here is if the p.nextCh is empty and closed
        close(stopCh)
    }, 1*time.Second, stopCh)
}

Indexer

Indexer是對快取進行增刪查改的介面,快取本質上就是用map構建的key:value鍵值對,都存在items這個map中,key為/:

type threadSafeMap struct {
    lock  sync.RWMutex
    items map[string]interface{}
    // indexers maps a name to an IndexFunc
    indexers Indexers
    // indices maps a name to an Index
    indices Indices
}

而為了加速查詢,還可以選擇性的給這些快取新增索引,索引儲存在indecies中,所謂索引,就是在向快取中新增記錄時,就將其key新增到索引結構中,在查詢時,可以根據索引條件,快速查詢到指定的key記錄,比如預設有個索引是按照namespace進行索引,可以根據快速找出屬於某個namespace的某種物件,而不用去遍歷所有的快取。

Indexer對外提供了Replace(), Resync(), Add(), Update(), Delete(), List(), Get(), GetByKey(), ByIndex()等介面。

總結

本篇對Kubernetes Informer的使用方法和實現原理,進行了深入分析,整體上看,Informer的設計是相當不錯的,基於事件機制,一方面構建本地快取,一方面觸發事件方法,使得控制器能夠快速響應和快速獲取資料,此外,還有諸如共用Informer, resync, index, watch timeout等機制,使得Informer更加高效和穩定,有了Informer,控制器模式可以說是如虎添翼。

以上就是go語言K8S 的 informer機制淺析的詳細內容,更多關於go K8S informer機制淺析的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com