<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Kubernetes的控制器模式是其非常重要的一個設計模式,整個Kubernetes定義的資源物件以及其狀態都儲存在etcd資料庫中,通過apiserver對其進行增刪查改,而各種各樣的控制器需要從apiserver及時獲取這些物件,然後將其應用到實際中,即將這些物件的實際狀態調整為期望狀態,讓他們保持匹配。
不過這因為這樣,各種控制器需要和apiserver進行頻繁互動,需要能夠及時獲取物件狀態的變化,而如果簡單的通過暴力輪詢的話,會給apiserver造成很大的壓力,且效率很低,因此,Kubernetes設計了Informer這個機制,用來作為控制器跟apiserver互動的橋樑,它主要有兩方面的作用:
依賴Etcd的List&Watch機制,在本地維護了一份目標物件的快取。
Etcd的Watch機制能夠使使用者端及時獲知這些物件的狀態變化,然後通過List機制,更新本地快取,這樣就在使用者端為這些API物件維護了一份和Etcd資料庫中幾乎一致的資料,然後控制器等使用者端就可以直接存取快取獲取物件的資訊,而不用去直接存取apiserver,這一方面顯著提高了效能,另一方面則大大降低了對apiserver的存取壓力;
依賴Etcd的Watch機制,觸發控制器等使用者端註冊到Informer中的事件方法。
使用者端可能會對某些物件的某些事件感興趣,當這些事件發生時,希望能夠執行某些操作,比如通過apiserver新建了一個pod,那麼kube-scheduler中的控制器收到了這個事件,然後將這個pod加入到其佇列中,等待進行排程。
Kubernetes的各個元件本身就內建了非常多的控制器,而自定義的控制器也需要通過Informer跟apiserver進行互動,因此,Informer在Kubernetes中應用非常廣泛,本篇文章就重點分析下Informer的機制原理,以加深對其的理解。
先來看看Informer是怎麼用的,以Endpoint為例,來看下其使用Informer的相關程式碼:
# client-go/informers/factory.go sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
首先建立了一個SharedInformerFactory,這個結構主要有兩個作用:
# client-go/informers/core/v1/endpoints.go type EndpointsInformer interface { Informer() cache.SharedIndexInformer Lister() v1.EndpointsLister } endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
使用InformerFactory建立出對應版本的物件的Informer結構體,如Endpoints物件對應的就是EndpointsInformer結構體,該結構體實現了兩個方法:Informer()和Lister()
# Client-go/tools/cache/shared_informer.go informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: onAdd, UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用 DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") }, }) func onAdd(obj interface{}) { node := obj.(*corev1.Endpoint) fmt.Println("add a endpoint:", endpoint.Name) }
這裡,首先呼叫Infomer()建立出來SharedIndexInformer,然後向其中註冊事件方法,這樣當有對應的事件發生時,就會觸發這裡註冊的方法去做相應的事情。其次呼叫Lister()獲取到快取介面,就可以通過它來查詢Informer中快取的資料了,而且Informer中快取的資料,是可以有索引的,這樣可以加快查詢的速度。
# kubernetes/cmd/kube-controller-manager/app/controllermanager.go controllerContext.InformerFactory.Start(controllerContext.Stop)
這裡InformerFactory的啟動,會遍歷Factory中建立的所有Informer,依次將其啟動。
Informer的實現都是在client-go這個庫中,通過上述的工廠方法,其實最終建立出來的是一個叫做SharedIndexInformer的結構體:
# k8s.io/client-go/tools/cache/shared_informer.go type sharedIndexInformer struct { indexer Indexer controller Controller processor *sharedProcessor cacheMutationDetector MutationDetector listerWatcher ListerWatcher ...... } func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer }
可以看到,在建立SharedIndexInformer時,就建立出了processor, indexer等結構,而在Informer啟動時,還建立出了controller, fifo queue, reflector等結構。
Reflector的作用,就是通過List&Watch的方式,從apiserver獲取到感興趣的物件以及其狀態,然後將其放到一個稱為”Delta”的先進先出佇列中。
所謂的Delta FIFO Queue,就是佇列中的元素除了物件本身外,還有針對該物件的事件型別:
type Delta struct { Type DeltaType Object interface{} }
目前有5種Type: Added, Updated, Deleted, Replaced, Resync,所以,針對同一個物件,可能有多個Delta元素在佇列中,表示對該物件做了不同的操作,比如短時間內,多次對某一個物件進行了更新操作,那麼就會有多個Updated型別的Delta放入到佇列中。後續佇列的消費者,可以根據這些Delta的型別,來回撥註冊到Informer中的事件方法。
而所謂的List&Watch,就是
此外,Informer還會週期性的傳送Resync型別的Delta元素到佇列中,目的是為了週期性的觸發註冊到Informer中的事件方法UpdateFunc,保證物件的期望狀態和實際狀態一致,該週期是由一個叫做resyncPeriod的引數決定的,在向Informer中新增EventHandler時,可以指定該引數,若為0的話,則關閉該功能。需要注意的是,Resync型別的Delta元素中的物件,是通過Indexer從快取中獲取到的,而不是直接從apiserver中拿的,即這裡resync的,其實是”快取”的物件的期望狀態和實際狀態的一致性。
根據以上Reflector的機制,依賴Etcd的Watch機制,通過事件來獲知物件變化狀態,建立本地快取。即使在Informer中,也沒有周期性的呼叫物件的List介面,正常情況下,List&Watch只會執行一次,即先執行List把資料拉過來,放入佇列中,後續就進入Watch階段。
那什麼時候才會再執行List呢?其實就是異常的時候,在List或者Watch的過程中,如果有異常,比如apiserver重啟了,那麼Reflector就開始週期性的執行List&Watch,直到再次正常進入Watch階段。為了在異常時段,不給apiserver造成壓力,這個週期是一個稱為backoff的可變的時間間隔,預設是一個指數型的間隔,即越往後重試的間隔越長,到一定時間又會重置回一開始的頻率。而且,為了讓不同的apiserver能夠均勻負載這些Watch請求,使用者端會主動斷開跟apiserver的連線,這個超時時間為60秒,然後重新發起Watch請求。此外,在控制器重啟過程中,也會再次執行List,所以會觀察到之前已經建立好的API物件,又重新觸發了一遍AddFunc方法。
從以上這些點,可以看出來,Kubernetes在效能和穩定性的提升上,還是下了很多功夫的。
這裡Controller的作用是通過輪詢不斷從佇列中取出Delta元素,根據元素的型別,一方面通過Indexer更新原生的快取,一方面呼叫Processor來觸發註冊到Informer的事件方法:
# k8s.io/client-go/tools/cache/controller.go func (c *controller) processLoop() { for { obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) } }
這裡的c.config.Process是定義在shared_informer.go中的HandleDeltas()方法:
# k8s.io/client-go/tools/cache/shared_informer.go func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() // from oldest to newest for _, d := range obj.(Deltas) { switch d.Type { case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: // Sync events are only propagated to listeners that requested resync isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { if oldAccessor, err := meta.Accessor(old); err == nil { // Replaced events that didn't change resourceVersion are treated as resync events // and only propagated to listeners that requested resync isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { if err := s.indexer.Add(d.Object); err != nil { return err } s.processor.distribute(addNotification{newObj: d.Object}, false) } case Deleted: if err := s.indexer.Delete(d.Object); err != nil { return err } s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
Processer和Listener則是觸發事件方法的機制,在建立Informer時,會建立一個Processer,而在向Informer中通過呼叫AddEventHandler()註冊事件方法時,會為每一個Handler生成一個Listener,然後將該Lisener中新增到Processer中,每一個Listener中有兩個channel:addCh和nextCh。Listener通過select監聽在這兩個channel上,當Controller從佇列中取出新的元素時,會呼叫processer來給它的listener傳送“通知”,這個“通知”就是向addCh中新增一個元素,即add(),然後一個goroutine就會將這個元素從addCh轉移到nextCh,即pop(),從而觸發另一個goroutine執行註冊的事件方法,即run()。
# k8s.io/client-go/tools/cache/shared_informer.go func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } func (p *processorListener) add(notification interface{}) { p.addCh <- notification } func (p *processorListener) pop() { defer utilruntime.HandleCrash() defer close(p.nextCh) // Tell .run() to stop var nextCh chan<- interface{} var notification interface{} for { select { case nextCh <- notification: // Notification dispatched var ok bool notification, ok = p.pendingNotifications.ReadOne() if !ok { // Nothing to pop nextCh = nil // Disable this select case } case notificationToAdd, ok := <-p.addCh: if !ok { return } if notification == nil { // No notification to pop (and pendingNotifications is empty) // Optimize the case - skip adding to pendingNotifications notification = notificationToAdd nextCh = p.nextCh } else { // There is already a notification waiting to be dispatched p.pendingNotifications.WriteOne(notificationToAdd) } } } } func (p *processorListener) run() { // this call blocks until the channel is closed. When a panic happens during the notification // we will catch it, **the offending item will be skipped!**, and after a short delay (one second) // the next notification will be attempted. This is usually better than the alternative of never // delivering again. stopCh := make(chan struct{}) wait.Until(func() { for next := range p.nextCh { switch notification := next.(type) { case updateNotification: p.handler.OnUpdate(notification.oldObj, notification.newObj) case addNotification: p.handler.OnAdd(notification.newObj) case deleteNotification: p.handler.OnDelete(notification.oldObj) default: utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next)) } } // the only way to get here is if the p.nextCh is empty and closed close(stopCh) }, 1*time.Second, stopCh) }
Indexer是對快取進行增刪查改的介面,快取本質上就是用map構建的key:value鍵值對,都存在items這個map中,key為/:
type threadSafeMap struct { lock sync.RWMutex items map[string]interface{} // indexers maps a name to an IndexFunc indexers Indexers // indices maps a name to an Index indices Indices }
而為了加速查詢,還可以選擇性的給這些快取新增索引,索引儲存在indecies中,所謂索引,就是在向快取中新增記錄時,就將其key新增到索引結構中,在查詢時,可以根據索引條件,快速查詢到指定的key記錄,比如預設有個索引是按照namespace進行索引,可以根據快速找出屬於某個namespace的某種物件,而不用去遍歷所有的快取。
Indexer對外提供了Replace(), Resync(), Add(), Update(), Delete(), List(), Get(), GetByKey(), ByIndex()等介面。
本篇對Kubernetes Informer的使用方法和實現原理,進行了深入分析,整體上看,Informer的設計是相當不錯的,基於事件機制,一方面構建本地快取,一方面觸發事件方法,使得控制器能夠快速響應和快速獲取資料,此外,還有諸如共用Informer, resync, index, watch timeout等機制,使得Informer更加高效和穩定,有了Informer,控制器模式可以說是如虎添翼。
以上就是go語言K8S 的 informer機制淺析的詳細內容,更多關於go K8S informer機制淺析的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45