首頁 > 軟體

Kubernetes Informer資料儲存Index與Pod分配流程解析

2022-11-27 14:01:45

確立目標

  • 理解Informer的資料儲存方式
  • 大致理解Pod的分配流程

理解Informer的資料儲存方式 程式碼在k8s.io/client-go/tools/cache/controller

Process 檢視消費的過程

func (c *controller) processLoop() {
	for {
    // Pop出Object元素
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		if err != nil {
			if err == ErrFIFOClosed {
				return
			}
			if c.config.RetryOnError {
				// 重新進佇列
				c.config.Queue.AddIfNotPresent(obj)
			}
		}
	}
}
// 去檢視Pop的具體實現 點進Pop 找到fifo.go
func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for {
		// 呼叫process去處理item,然後返回
		item, ok := f.items[id]
		delete(f.items, id)
		err := process(item)
		return item, err
	}
}
// 然後去查一下 PopProcessFunc 的定義,在建立controller前 share_informer.go的Run()裡面
cfg := &Config{
		Process:           s.HandleDeltas,
	}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
	s.blockDeltas.Lock()
	defer s.blockDeltas.Unlock()
	for _, d := range obj.(Deltas) {
		switch d.Type {
    // 增、改、替換、同步
		case Sync, Replaced, Added, Updated:
			s.cacheMutationDetector.AddObject(d.Object)
      // 先去indexer查詢
			if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        // 如果資料已經存在,就執行Update邏輯
				if err := s.indexer.Update(d.Object); err != nil {
					return err
				}
				isSync := false
				switch {
				case d.Type == Sync:
					isSync = true
				case d.Type == Replaced:
					if accessor, err := meta.Accessor(d.Object); err == nil {
							isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
						}
					}
				}
      	// 分發Update事件
				s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
			} else {
      	// 沒查到資料,就執行Add操作
				if err := s.indexer.Add(d.Object); err != nil {
					return err
				}
      	// 分發 Add 事件
				s.processor.distribute(addNotification{newObj: d.Object}, false)
			}
   	// 刪除
		case Deleted:
    	// 去indexer刪除
			if err := s.indexer.Delete(d.Object); err != nil {
				return err
			}
    	// 分發 delete 事件
			s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
		}
	}
	return nil
}

Index 掌握Index資料結構

Index 的定義為資源的本地儲存,保持與etcd中的資源資訊一致。

// 我們去看看Index是怎麼建立的
func NewSharedIndexInformer(lw ListerWatcher, exampleObject runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
	realClock := &clock.RealClock{}
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
    // indexer 的初始化
		indexer:                         NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
		listerWatcher:                   lw,
		objectType:                      exampleObject,
		resyncCheckPeriod:               defaultEventHandlerResyncPeriod,
		defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
		cacheMutationDetector:           NewCacheMutationDetector(fmt.Sprintf("%T", exampleObject)),
		clock:                           realClock,
	}
	return sharedIndexInformer
}
// 生成一個map和func組合而成的Indexer
func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
	return &cache{
		cacheStorage: NewThreadSafeStore(indexers, Indices{}),
		keyFunc:      keyFunc,
}
// ThreadSafeStore的底層是一個並行安全的map,具體實現我們暫不考慮
func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items:    map[string]interface{}{},
		indexers: indexers,
		indices:  indices,
	}
}

distribute 資訊的分發distribute

// 在上面的Process程式碼中,我們看到了將資料儲存到Indexer後,呼叫了一個分發的函數
s.processor.distribute()
// 分發process的建立
func NewSharedIndexInformer() SharedIndexInformer {
	sharedIndexInformer := &sharedIndexInformer{
		processor:                       &sharedProcessor{clock: realClock},
	}
	return sharedIndexInformer
}
// sharedProcessor的結構
type sharedProcessor struct {
	listenersStarted bool
 	// 讀寫鎖
	listenersLock    sync.RWMutex
  // 普通監聽列表
	listeners        []*processorListener
  // 同步監聽列表
	syncingListeners []*processorListener
	clock            clock.Clock
	wg               wait.Group
}
// 檢視distribute函數
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
	p.listenersLock.RLock()
	defer p.listenersLock.RUnlock()
	// 將object分發到 同步監聽 或者 普通監聽 的列表
	if sync {
		for _, listener := range p.syncingListeners {
			listener.add(obj)
		}
	} else {
		for _, listener := range p.listeners {
			listener.add(obj)
		}
	}
}
// 這個add的操作是利用了channel
func (p *processorListener) add(notification interface{}) {
	p.addCh <- notification
}

理解一個pod的被排程的大致流程

Scheduler

在前面,我們瞭解了Pod排程演演算法的註冊和Informer機制來監聽kube-apiserver上的資源變化,這一次,我們就將兩者串聯起來,看看在kube-scheduler中,Informer監聽到資源變化後,如何用排程演演算法將pod進行排程。

// 在setup()中找到scheduler
// 在執行 kube-scheduler 的初期,我們建立了一個Scheduler的資料結構,回頭再看看有什麼和pod排程演演算法相關的
type Scheduler struct {
	SchedulerCache internalcache.Cache
	Algorithm core.ScheduleAlgorithm
	// 獲取下一個需要排程的Pod
	NextPod func() *framework.QueuedPodInfo
	Error func(*framework.QueuedPodInfo, error)
	StopEverything <-chan struct{}
	// 等待排程的Pod佇列,我們重點看看這個佇列是什麼
	SchedulingQueue internalqueue.SchedulingQueue
	Profiles profile.Map
	scheduledPodsHasSynced func() bool
	client clientset.Interface
}
// Scheduler的範例化函數 在最新的版本中少了create這一層 直接是進行裡面的邏輯
func New(){
  var sched *Scheduler
	switch {
  // 從 Provider 建立
	case source.Provider != nil:
		sc, err := configurator.createFromProvider(*source.Provider)
		sched = sc
  // 從檔案或者ConfigMap中建立
	case source.Policy != nil:
		sc, err := configurator.createFromConfig(*policy)
		sched = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
}
// 兩個建立方式,底層都是呼叫的 create 函數
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	return c.create()
}
func (c *Configurator) createFromConfig(policy schedulerapi.Policy) (*Scheduler, error){
	return c.create()
}
func (c *Configurator) create() (*Scheduler, error) {
	// 範例化 podQueue
	podQueue := internalqueue.NewSchedulingQueue(
		lessFn,
		internalqueue.WithPodInitialBackoffDuration(time.Duration(c.podInitialBackoffSeconds)*time.Second),
		internalqueue.WithPodMaxBackoffDuration(time.Duration(c.podMaxBackoffSeconds)*time.Second),
		internalqueue.WithPodNominator(nominator),
	)
	return &Scheduler{
		SchedulerCache:  c.schedulerCache,
		Algorithm:       algo,
		Profiles:        profiles,
    // NextPod 函數依賴於 podQueue
		NextPod:         internalqueue.MakeNextPodFunc(podQueue),
		Error:           MakeDefaultErrorFunc(c.client, c.informerFactory.Core().V1().Pods().Lister(), podQueue, c.schedulerCache),
		StopEverything:  c.StopEverything,
    // 排程佇列被賦值為podQueue
		SchedulingQueue: podQueue,
	}, nil
}
// 再看看這個排程佇列的初始化函數,點進去podQueue,從命名可以看到是一個優先佇列,它的實現細節暫不細看
// 結合實際情況思考下,pod會有重要程度的區分,所以排程的順序需要考慮優先順序的
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
	return NewPriorityQueue(lessFn, opts...)
}

SchedulingQueue

// 在上面範例化Scheduler後,有個註冊事件 Handler 的函數:addAllEventHandlers(sched, informerFactory, podInformer)  informer接到訊息之後觸發對應的Handler
func addAllEventHandlers(
	sched *Scheduler,
	informerFactory informers.SharedInformerFactory,
	podInformer coreinformers.PodInformer,
) {
	/*
	函數前後有很多註冊的Handler,但是和未排程pod新增到佇列相關的,只有這個
	*/
	podInformer.Informer().AddEventHandler(
		cache.FilteringResourceEventHandler{
      // 定義過濾函數:必須為未排程的pod
			FilterFunc: func(obj interface{}) bool {
				switch t := obj.(type) {
				case *v1.Pod:
					return !assignedPod(t) && responsibleForPod(t, sched.Profiles)
				case cache.DeletedFinalStateUnknown:
					if pod, ok := t.Obj.(*v1.Pod); ok {
						return !assignedPod(pod) && responsibleForPod(pod, sched.Profiles)
					}
					utilruntime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, sched))
					return false
				default:
					utilruntime.HandleError(fmt.Errorf("unable to handle object in %T: %T", sched, obj))
					return false
				}
			},
     	// 增改刪三個操作對應的Handler,操作到對應的Queue
			Handler: cache.ResourceEventHandlerFuncs{
				AddFunc:    sched.addPodToSchedulingQueue,
				UpdateFunc: sched.updatePodInSchedulingQueue,
				DeleteFunc: sched.deletePodFromSchedulingQueue,
			},
		},
	)
}
// 牢記我們第一階段要分析的物件:create nginx pod,所以進入這個add的操作,對應加入到佇列
func (sched *Scheduler) addPodToSchedulingQueue(obj interface{}) {
	pod := obj.(*v1.Pod)
	klog.V(3).Infof("add event for unscheduled pod %s/%s", pod.Namespace, pod.Name)
  // 加入到佇列
	if err := sched.SchedulingQueue.Add(pod); err != nil {
		utilruntime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err))
	}
}
// 在範例化Scheduler的地方 
// 入隊操作我們清楚了,那出隊呢?我們回過頭去看看上面定義的NextPod的方法實現
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
	return func() *framework.QueuedPodInfo {
    // 從佇列中彈出
		podInfo, err := queue.Pop()
		if err == nil {
			klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name)
			return podInfo
		}
		klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err)
		return nil
	}
}

scheduleOne

// 瞭解入隊和出隊操作後,我們看一下Scheduler執行的過程
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
  // 排程一個pod物件
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
// 接下來scheduleOne方法程式碼很長,我們一步一步來看
func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // podInfo 就是從佇列中獲取到的pod物件
	podInfo := sched.NextPod()
	// 檢查pod的有效性
	if podInfo == nil || podInfo.Pod == nil {
		return
	}
	pod := podInfo.Pod
  // 根據定義的 pod.Spec.SchedulerName 查到對應的profile
	prof, err := sched.profileForPod(pod)
	if err != nil {
		klog.Error(err)
		return
	}
  // 可以跳過排程的情況,一般pod進不來
	if sched.skipPodSchedule(prof, pod) {
		return
	}
  // 呼叫排程演演算法,獲取結果
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	if err != nil {
		/*
		出現排程失敗的情況:
		這個時候可能會觸發搶佔preempt,搶佔是一套複雜的邏輯,後面我們專門會講
		目前假設各類資源充足,能正常排程
		*/
	}
	metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
  // assumePod 是假設這個Pod按照前面的排程演演算法分配後,進行驗證
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// SuggestedHost 為建議的分配的Host
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	if err != nil {
		// 失敗就重新分配,不考慮這種情況 
	}
	// 執行相關外掛的程式碼先跳過 比如一些搶佔外掛
	// 非同步繫結pod
	go func() {
		// 有一系列的檢查工作
                // 真正做繫結的動作
		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			// 錯誤處理,清除狀態並重試
		} else {
			// 列印結果,偵錯時將log level調整到2以上
			if klog.V(2).Enabled() {
				klog.InfoS("Successfully bound pod to node", "pod", klog.KObj(pod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
			}
      // metrics中記錄相關的監控指標
			metrics.PodScheduled(prof.Name, metrics.SinceInSeconds(start))
			metrics.PodSchedulingAttempts.Observe(float64(podInfo.Attempts))
      metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(podInfo)).Observe(metrics.SinceInSeconds(podInfo.InitialAttemptTimestamp))
			// 執行繫結後的外掛
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}

ScheduleResult 排程計算結果

// 呼叫演演算法下的Schedule
func New(){
  scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
}
func (c *Configurator) create() (*Scheduler, error) {
  algo := core.NewGenericScheduler(
		c.schedulerCache,
		c.nodeInfoSnapshot,
		extenders,
		c.informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
		c.disablePreemption,
		c.percentageOfNodesToScore,
	)
  return &Scheduler{
		Algorithm:       algo,
	}, nil
}
// genericScheduler 的 Schedule 的實現
func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
	// 對 pod 進行 pvc 的資訊檢查
	if err := podPassesBasicChecks(pod, g.pvcLister); err != nil {
		return result, err
	}
	// 對當前的資訊做一個快照
	if err := g.snapshot(); err != nil {
		return result, err
	}
	// Node 節點數量為0,表示無可用節點
	if g.nodeInfoSnapshot.NumNodes() == 0 {
		return result, ErrNoNodesAvailable
	}
  // Predict階段:找到所有滿足排程條件的節點feasibleNodes,不滿足的就直接過濾
	feasibleNodes, filteredNodesStatuses, err := g.findNodesThatFitPod(ctx, prof, state, pod)
	// 沒有可用節點直接報錯
	if len(feasibleNodes) == 0 {
		return result, &FitError{
			Pod:                   pod,
			NumAllNodes:           g.nodeInfoSnapshot.NumNodes(),
			FilteredNodesStatuses: filteredNodesStatuses,
		}
	}
	// 只有一個節點就直接選用
	if len(feasibleNodes) == 1 {
		return ScheduleResult{
			SuggestedHost:  feasibleNodes[0].Name,
			EvaluatedNodes: 1 + len(filteredNodesStatuses),
			FeasibleNodes:  1,
		}, nil
	}
	// Priority階段:通過打分,找到一個分數最高、也就是最優的節點
	priorityList, err := g.prioritizeNodes(ctx, prof, state, pod, feasibleNodes)
	host, err := g.selectHost(priorityList)
	return ScheduleResult{
		SuggestedHost:  host,
		EvaluatedNodes: len(feasibleNodes) + len(filteredNodesStatuses),
		FeasibleNodes:  len(feasibleNodes),
	}, err
}
/*
Predict 和 Priority 是選擇排程節點的兩個關鍵性步驟, 它的底層呼叫了各種algorithm演演算法。我們暫時不細看。
以我們前面講到過的 NodeName 演演算法為例,節點必須與 NodeName 匹配,它是屬於Predict階段的。
在新版本中 這部分演演算法的實現放到了extenders,邏輯是一樣的
*/

Assume 初步推算

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
  // 將 host 填入到 pod spec欄位的nodename,假定分配到對應的節點上
	assumed.Spec.NodeName = host
  // 呼叫 SchedulerCache 下的 AssumePod
	if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
		klog.Errorf("scheduler cache AssumePod failed: %v", err)
		return err
	}
	if sched.SchedulingQueue != nil {
		sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
	}
	return nil
}
// 回頭去找 SchedulerCache 初始化的地方
func (c *Configurator) create() (*Scheduler, error) {
	return &Scheduler{
		SchedulerCache:  c.schedulerCache,
	}, nil
}
func New() (*Scheduler, error) {
  // 這裡就是初始化的範例 schedulerCache
	schedulerCache := internalcache.New(30*time.Second, stopEverything)
	configurator := &Configurator{
		schedulerCache:           schedulerCache,
	}
}
// 看看AssumePod做了什麼
func (cache *schedulerCache) AssumePod(pod *v1.Pod) error {
  // 獲取 pod 的 uid
	key, err := framework.GetPodKey(pod)
	if err != nil {
		return err
	}
	// 加鎖操作,保證並行情況下的一致性
	cache.mu.Lock()
	defer cache.mu.Unlock()
      // 根據 uid 找不到 pod 當前的狀態  看看被排程了沒有 
	if _, ok := cache.podStates[key]; ok {
		return fmt.Errorf("pod %v is in the cache, so can't be assumed", key)
	}
  // 把 Assume Pod 的資訊放到對應 Node 節點中
	cache.addPod(pod)
  // 把 pod 狀態設定為 Assume 成功
	ps := &podState{
		pod: pod,
	}
	cache.podStates[key] = ps
	cache.assumedPods[key] = true
	return nil
}

Bind 實際繫結

func (sched *Scheduler) bind(ctx context.Context, prof *profile.Profile, assumed *v1.Pod, targetNode string, state *framework.CycleState) (err error) {
	start := time.Now()
  // 把 assumed 的 pod 資訊儲存下來
	defer func() {
		sched.finishBinding(prof, assumed, targetNode, start, err)
	}()
	// 階段1: 執行擴充套件繫結進行驗證,如果已經繫結報錯
	bound, err := sched.extendersBinding(assumed, targetNode)
	if bound {
		return err
	}
  // 階段2:執行繫結外掛驗證狀態
	bindStatus := prof.RunBindPlugins(ctx, state, assumed, targetNode)
	if bindStatus.IsSuccess() {
		return nil
	}
	if bindStatus.Code() == framework.Error {
		return bindStatus.AsError()
	}
	return fmt.Errorf("bind status: %s, %v", bindStatus.Code().String(), bindStatus.Message())
}

Update To Etcd

// 這塊的程式碼我不做細緻的逐層分析了,大家根據興趣自行探索
func (b DefaultBinder) Bind(ctx context.Context, state *framework.CycleState, p *v1.Pod, nodeName string) *framework.Status {
	klog.V(3).Infof("Attempting to bind %v/%v to %v", p.Namespace, p.Name, nodeName)
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: p.Namespace, Name: p.Name, UID: p.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
  // ClientSet就是存取kube-apiserver的使用者端,將資料更新上去
	err := b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
	if err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}
	return nil
}

站在前人的肩膀上,向前輩致敬,Respect!

Summary

  • Informer 依賴於 Reflector 模組,它有個元件為 xxxInformer,如 podInformer
  • 具體資源的 Informer 包含了一個連線到kube-apiserverclient,通過ListWatch介面查詢資源變更情況

檢測到資源發生變化後,通過Controller 將資料放入佇列DeltaFIFOQueue裡,生產階段完成

DeltaFIFOQueue的另一端,有消費者在不停地處理資源變化的事件,處理邏輯主要分2步

  • 將資料儲存到本地儲存Indexer,它的底層實現是一個並行安全的threadSafeMap
  • 有些元件需要實時關注資源變化,會實時監聽listen,就將事件分發到對應註冊上來的listener上,自行處理

distribute將object分發到同步監聽或者普通監聽的列表,然後被對應的handler處理

  • Pod的排程是通過一個佇列SchedulingQueue非同步工作的
  • 監聽到對應pod事件後,放入佇列
  • 有個消費者從佇列中獲取pod,進行排程

單個pod的排程主要分為3個步驟:

  • 根據Predict和Priority兩個階段,呼叫各自的演演算法外掛,選擇最優的Node
  • Assume這個Pod被排程到對應的Node,儲存到cache,加鎖保證一致性。
  • 用extender和plugins進行驗證,如果通過則繫結Bind

繫結成功後,將資料通過client向kube-apiserver傳送,更新etcd

以上就是Kubernetes Informer資料儲存Index與Pod分配流程解析的詳細內容,更多關於Kubernetes Informer資料儲存的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com