<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
理解Informer的資料儲存方式 程式碼在k8s.io/client-go/tools/cache/controller
func (c *controller) processLoop() { for { // Pop出Object元素 obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) if err != nil { if err == ErrFIFOClosed { return } if c.config.RetryOnError { // 重新進佇列 c.config.Queue.AddIfNotPresent(obj) } } } } // 去檢視Pop的具體實現 點進Pop 找到fifo.go func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { f.lock.Lock() defer f.lock.Unlock() for { // 呼叫process去處理item,然後返回 item, ok := f.items[id] delete(f.items, id) err := process(item) return item, err } } // 然後去查一下 PopProcessFunc 的定義,在建立controller前 share_informer.go的Run()裡面 cfg := &Config{ Process: s.HandleDeltas, } func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error { s.blockDeltas.Lock() defer s.blockDeltas.Unlock() for _, d := range obj.(Deltas) { switch d.Type { // 增、改、替換、同步 case Sync, Replaced, Added, Updated: s.cacheMutationDetector.AddObject(d.Object) // 先去indexer查詢 if old, exists, err := s.indexer.Get(d.Object); err == nil && exists { // 如果資料已經存在,就執行Update邏輯 if err := s.indexer.Update(d.Object); err != nil { return err } isSync := false switch { case d.Type == Sync: isSync = true case d.Type == Replaced: if accessor, err := meta.Accessor(d.Object); err == nil { isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion() } } } // 分發Update事件 s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync) } else { // 沒查到資料,就執行Add操作 if err := s.indexer.Add(d.Object); err != nil { return err } // 分發 Add 事件 s.processor.distribute(addNotification{newObj: d.Object}, false) } // 刪除 case Deleted: // 去indexer刪除 if err := s.indexer.Delete(d.Object); err != nil { return err } // 分發 delete 事件 s.processor.distribute(deleteNotification{oldObj: d.Object}, false) } } return nil }
Index
的定義為資源的本地儲存,保持與etcd中的資源資訊一致。
// 我們去看看Index是怎麼建立的 func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer { realClock := &clock.RealClock{} sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, // indexer 的初始化 indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers), listerWatcher: lw, objectType: exampleObject, resyncCheckPeriod: defaultEventHandlerResyncPeriod, defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod, cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)), clock: realClock, } return sharedIndexInformer } // 生成一個map和func組合而成的Indexer func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer { return &cache{ cacheStorage: NewThreadSafeStore(indexers, Indices{}), keyFunc: keyFunc, } // ThreadSafeStore的底層是一個並行安全的map,具體實現我們暫不考慮 func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore { return &threadSafeMap{ items: map[string]interface{}{}, indexers: indexers, indices: indices, } }
// 在上面的Process程式碼中,我們看到了將資料儲存到Indexer後,呼叫了一個分發的函數 s.processor.distribute() // 分發process的建立 func NewSharedIndexInformer() SharedIndexInformer { sharedIndexInformer := &sharedIndexInformer{ processor: &sharedProcessor{clock: realClock}, } return sharedIndexInformer } // sharedProcessor的結構 type sharedProcessor struct { listenersStarted bool // 讀寫鎖 listenersLock sync.RWMutex // 普通監聽列表 listeners []*processorListener // 同步監聽列表 syncingListeners []*processorListener clock clock.Clock wg wait.Group } // 檢視distribute函數 func (p *sharedProcessor) distribute(obj interface{}, sync bool) { p.listenersLock.RLock() defer p.listenersLock.RUnlock() // 將object分發到 同步監聽 或者 普通監聽 的列表 if sync { for _, listener := range p.syncingListeners { listener.add(obj) } } else { for _, listener := range p.listeners { listener.add(obj) } } } // 這個add的操作是利用了channel func (p *processorListener) add(notification interface{}) { p.addCh <- notification }
在前面,我們瞭解了Pod排程演演算法的註冊和Informer機制來監聽kube-apiserver上的資源變化,這一次,我們就將兩者串聯起來,看看在kube-scheduler中,Informer監聽到資源變化後,如何用排程演演算法將pod進行排程。
// 在setup()中找到scheduler // 在執行 kube-scheduler 的初期,我們建立了一個Scheduler的資料結構,回頭再看看有什麼和pod排程演演算法相關的 type Scheduler struct { SchedulerCache internalcache.Cache Algorithm core.ScheduleAlgorithm // 獲取下一個需要排程的Pod NextPod func() *framework.QueuedPodInfo Error func(*framework.QueuedPodInfo, error) StopEverything <-chan struct{} // 等待排程的Pod佇列,我們重點看看這個佇列是什麼 SchedulingQueue internalqueue.SchedulingQueue Profiles profile.Map scheduledPodsHasSynced func() bool client clientset.Interface } // Scheduler的範例化函數 在最新的版本中少了create這一層 直接是進行裡面的邏輯 func New(){ var sched *Scheduler switch { // 從 Provider 建立 case source.Provider != nil: sc, err := configurator.createFromProvider(*source.Provider) sched = sc // 從檔案或者ConfigMap中建立 case source.Policy != nil: sc, err := configurator.createFromConfig(*policy) sched = sc default: return nil, fmt.Errorf("unsupported algorithm source: %v", source) } } // 兩個建立方式,底層都是呼叫的 create 函數 func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) { return c.create() } func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error){ return c.create() } func (c *Configurator) create() (*Scheduler, error) { // 範例化 podQueue podQueue := internalqueue.NewSchedulingQueue( lessFn, internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second), internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second), internalqueue.WithPodNominator(nominator), ) return &Scheduler{ SchedulerCache: c.schedulerCache, Algorithm: algo, Profiles: profiles, // NextPod 函數依賴於 podQueue NextPod: internalqueue.MakeNextPodFunc(podQueue), Error: MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache), StopEverything: c.StopEverything, // 排程佇列被賦值為podQueue SchedulingQueue: podQueue, }, nil } // 再看看這個排程佇列的初始化函數,點進去podQueue,從命名可以看到是一個優先佇列,它的實現細節暫不細看 // 結合實際情況思考下,pod會有重要程度的區分,所以排程的順序需要考慮優先順序的 func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue { return NewPriorityQueue(lessFn, opts...) }
// 在上面範例化Scheduler後,有個註冊事件 Handler 的函數:addAllEventHandlers(sched, informerFactory, podInformer) informer接到訊息之後觸發對應的Handler func addAllEventHandlers( sched *Scheduler, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) { /* 函數前後有很多註冊的Handler,但是和未排程pod新增到佇列相關的,只有這個 */ podInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ // 定義過濾函數:必須為未排程的pod FilterFunc: func(obj interface{}) bool { switch t := obj.(type) { case *v1.Pod: return !assignedPod(t) && responsibleForPod(t, sched.Profiles) case cache.DeletedFinalStateUnknown: if pod, ok := t.Obj.(*v1.Pod); ok { return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles) } utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched)) return false default: utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj)) return false } }, // 增改刪三個操作對應的Handler,操作到對應的Queue Handler: cache.ResourceEventHandlerFuncs{ AddFunc: sched.addPodToSchedulingQueue, UpdateFunc: sched.updatePodInSchedulingQueue, DeleteFunc: sched.deletePodFromSchedulingQueue, }, }, ) } // 牢記我們第一階段要分析的物件:create nginx pod,所以進入這個add的操作,對應加入到佇列 func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) { pod := obj.(*v1.Pod) klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name) // 加入到佇列 if err := sched.SchedulingQueue.Add(pod); err != nil { utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) } } // 在範例化Scheduler的地方 // 入隊操作我們清楚了,那出隊呢?我們回過頭去看看上面定義的NextPod的方法實現 func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo { return func() *framework.QueuedPodInfo { // 從佇列中彈出 podInfo, err := queue.Pop() if err == nil { klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) return podInfo } klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) return nil } }
// 瞭解入隊和出隊操作後,我們看一下Scheduler執行的過程 func (sched *Scheduler) Run(ctx context.Context) { if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } sched.SchedulingQueue.Run() // 排程一個pod物件 wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() } // 接下來scheduleOne方法程式碼很長,我們一步一步來看 func (sched *Scheduler) scheduleOne(ctx context.Context) { // podInfo 就是從佇列中獲取到的pod物件 podInfo := sched.NextPod() // 檢查pod的有效性 if podInfo == nil || podInfo.Pod == nil { return } pod := podInfo.Pod // 根據定義的 pod.Spec.SchedulerName 查到對應的profile prof, err := sched.profileForPod(pod) if err != nil { klog.Error(err) return } // 可以跳過排程的情況,一般pod進不來 if sched.skipPodSchedule(prof, pod) { return } // 呼叫排程演演算法,獲取結果 scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) if err != nil { /* 出現排程失敗的情況: 這個時候可能會觸發搶佔preempt,搶佔是一套複雜的邏輯,後面我們專門會講 目前假設各類資源充足,能正常排程 */ } metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start)) // assumePod 是假設這個Pod按照前面的排程演演算法分配後,進行驗證 assumedPodInfo := podInfo.DeepCopy() assumedPod := assumedPodInfo.Pod // SuggestedHost 為建議的分配的Host err = sched.assume(assumedPod, scheduleResult.SuggestedHost) if err != nil { // 失敗就重新分配,不考慮這種情況 } // 執行相關外掛的程式碼先跳過 比如一些搶佔外掛 // 非同步繫結pod go func() { // 有一系列的檢查工作 // 真正做繫結的動作 err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state) if err != nil { // 錯誤處理,清除狀態並重試 } else { // 列印結果,偵錯時將log level調整到2以上 if klog.V(2).Enabled() { klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes) } // metrics中記錄相關的監控指標 metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start)) metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts)) metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp)) // 執行繫結後的外掛 prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost) } }() }
// 呼叫演演算法下的Schedule func New(){ scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod) } func (c *Configurator) create() (*Scheduler, error) { algo := core.NewGenericScheduler( c.schedulerCache, c.nodeInfoSnapshot, extenders, c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(), c.disablePreemption, c.percentageOfNodesToScore, ) return &Scheduler{ Algorithm: algo, }, nil } // genericScheduler 的 Schedule 的實現 func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) { // 對 pod 進行 pvc 的資訊檢查 if err := podPassesBasicChecks(pod, g.pvcLister); err != nil { return result, err } // 對當前的資訊做一個快照 if err := g.snapshot(); err != nil { return result, err } // Node 節點數量為0,表示無可用節點 if g.nodeInfoSnapshot.NumNodes() == 0 { return result, ErrNoNodesAvailable } // Predict階段:找到所有滿足排程條件的節點feasibleNodes,不滿足的就直接過濾 feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod) // 沒有可用節點直接報錯 if len(feasibleNodes) == 0 { return result, &FitError{ Pod: pod, NumAllNodes: g.nodeInfoSnapshot.NumNodes(), FilteredNodesStatuses: filteredNodesStatuses, } } // 只有一個節點就直接選用 if len(feasibleNodes) == 1 { return ScheduleResult{ SuggestedHost: feasibleNodes[0].Name, EvaluatedNodes: 1 + len(filteredNodesStatuses), FeasibleNodes: 1, }, nil } // Priority階段:通過打分,找到一個分數最高、也就是最優的節點 priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes) host, err := g.selectHost(priorityList) return ScheduleResult{ SuggestedHost: host, EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses), FeasibleNodes: len(feasibleNodes), }, err } /* Predict 和 Priority 是選擇排程節點的兩個關鍵性步驟, 它的底層呼叫了各種algorithm演演算法。我們暫時不細看。 以我們前面講到過的 NodeName 演演算法為例,節點必須與 NodeName 匹配,它是屬於Predict階段的。 在新版本中 這部分演演算法的實現放到了extenders,邏輯是一樣的 */
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { // 將 host 填入到 pod spec欄位的nodename,假定分配到對應的節點上 assumed.Spec.NodeName = host // 呼叫 SchedulerCache 下的 AssumePod if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) return err } if sched.SchedulingQueue != nil { sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil } // 回頭去找 SchedulerCache 初始化的地方 func (c *Configurator) create() (*Scheduler, error) { return &Scheduler{ SchedulerCache: c.schedulerCache, }, nil } func New() (*Scheduler, error) { // 這裡就是初始化的範例 schedulerCache schedulerCache := internalcache.New(30*time.Second, stopEverything) configurator := &Configurator{ schedulerCache: schedulerCache, } } // 看看AssumePod做了什麼 func (cache *schedulerCache) AssumePod(pod *v1.Pod) error { // 獲取 pod 的 uid key, err := framework.GetPodKey(pod) if err != nil { return err } // 加鎖操作,保證並行情況下的一致性 cache.mu.Lock() defer cache.mu.Unlock() // 根據 uid 找不到 pod 當前的狀態 看看被排程了沒有 if _, ok := cache.podStates[key]; ok { return fmt.Errorf("pod %v is in the cache, so can't be assumed", key) } // 把 Assume Pod 的資訊放到對應 Node 節點中 cache.addPod(pod) // 把 pod 狀態設定為 Assume 成功 ps := &podState{ pod: pod, } cache.podStates[key] = ps cache.assumedPods[key] = true return nil }
func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) { start := time.Now() // 把 assumed 的 pod 資訊儲存下來 defer func() { sched.finishBinding(prof, assumed, targetNode, start, err) }() // 階段1: 執行擴充套件繫結進行驗證,如果已經繫結報錯 bound, err := sched.extendersBinding(assumed, targetNode) if bound { return err } // 階段2:執行繫結外掛驗證狀態 bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode) if bindStatus.IsSuccess() { return nil } if bindStatus.Code() == framework.Error { return bindStatus.AsError() } return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message()) }
// 這塊的程式碼我不做細緻的逐層分析了,大家根據興趣自行探索 func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status { klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName) binding := &v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID}, Target: v1.ObjectReference{Kind: "Node", Name: nodeName}, } // ClientSet就是存取kube-apiserver的使用者端,將資料更新上去 err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) if err != nil { return framework.NewStatus(framework.Error, err.Error()) } return nil }
站在前人的肩膀上,向前輩致敬,Respect!
Informer
依賴於 Reflector
模組,它有個元件為 xxxInformer,如 podInformer
Informer
包含了一個連線到kube-apiserver
的client
,通過List
和Watch
介面查詢資源變更情況檢測到資源發生變化後,通過Controller
將資料放入佇列DeltaFIFOQueue
裡,生產階段完成
在DeltaFIFOQueue
的另一端,有消費者在不停地處理資源變化的事件,處理邏輯主要分2步
distribute
將object分發到同步監聽或者普通監聽的列表,然後被對應的handler處理
SchedulingQueue
非同步工作的單個pod的排程主要分為3個步驟:
Assume
這個Pod被排程到對應的Node,儲存到cache,加鎖保證一致性。Bind
繫結成功後,將資料通過client向kube-apiserver傳送,更新etcd
以上就是Kubernetes Informer資料儲存Index與Pod分配流程解析的詳細內容,更多關於Kubernetes Informer資料儲存的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45