首頁 > 軟體

Kubernetes scheduler啟動監控資源變化解析

2022-11-27 14:01:47

確立目標

  • 理解kube-scheduler啟動的流程
  • 瞭解Informer是如何從kube-apiserver監聽資源變化的情況

理解kube-scheduler啟動的流程 程式碼在cmd/kube-scheduler

run

// kube-scheduler 類似於kube-apiserver,是個常駐程序,檢視其對應的Run函數
func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	// 根據入參,返回設定cc與排程sched cc是completedConfig
   cc, sched, err := Setup(ctx, opts, registryOptions...)
	// 執行
   return Run(ctx, cc, sched)
}
// 執行排程策略
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	// 將設定註冊到configz中,會儲存在一個全域性map裡 叫configs
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}
	// 事件廣播管理器,涉及到k8s裡的一個核心資源 - Event事件,暫時不細講
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())
	// 健康監測的服務
	var checks []healthz.HealthChecker
	// 非同步各個Informer。Informer是kube-scheduler的一個重點
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())
	cc.InformerFactory.WaitForCacheSync(ctx.Done())
	// 選舉Leader的工作,因為Master節點可以存在多個,選舉一個作為Leader
	if cc.LeaderElection != nil {
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
      // 兩個勾點函數,開啟Leading時執行排程,結束時列印報錯
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}
    // 參與選舉的會持續通訊
		leaderElector.Run(ctx)
		return fmt.Errorf("lost lease")
	}
	// 不參與選舉的,也就是單節點的情況時,在這裡執行
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
/*
到這裡,我們已經接觸了kube-scheduler的2個核心概念:
1. scheduler:正如程式名kube-scheduler,這個程序的核心作用是進行排程,會涉及到多種排程策略
2. Informer:k8s中有各種型別的資源,包括自定義的。而Informer的實現就將排程和資源結合了起來
*/

Scheduler

// 在建立scheduler的函數 runcommand()
func Setup() {
	// 建立scheduler,包括多個選項
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	return &cc, sched, nil
}
// 我們再看一下New這個函數
func New() (*Scheduler, error) {
  // 先註冊了所有的演演算法,儲存到一個 map[string]PluginFactory 中
  registry := frameworkplugins.NewInTreeRegistry()
  //NewInTreeRegistry裡面的一些排程外掛
 /*   return runtime.Registry{
       selectorspread.Name:                  selectorspread.New,
       imagelocality.Name:                   imagelocality.New,
       tainttoleration.Name:                 tainttoleration.New,
       nodename.Name:                        nodename.New,
       nodeports.Name:                       nodeports.New,
       nodeaffinity.Name:                    nodeaffinity.New,
       podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
       ...
      */
  // 重點看一下Scheduler的建立過程
  var sched *Scheduler
	source := options.schedulerAlgorithmSource
	switch {
   // 根據Provider建立,重點看這裡
	case source.Provider != nil:
		sc, err := configurator.createFromProvider(*source.Provider)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler using provider %q: %v", *source.Provider, err)
		}
		sched = sc
  // 根據使用者設定建立,來自檔案或者ConfigMap
	case source.Policy != nil:
		policy := &schedulerapi.Policy{}
		switch {
		case source.Policy.File != nil:
			if err := initPolicyFromFile(source.Policy.File.Path, policy); err != nil {
				return nil, err
			}
		case source.Policy.ConfigMap != nil:
			if err := initPolicyFromConfigMap(client, source.Policy.ConfigMap, policy); err != nil {
				return nil, err
			}
		}
		configurator.extenders = policy.Extenders
		sc, err := configurator.createFromConfig(*policy)
		if err != nil {
			return nil, fmt.Errorf("couldn't create scheduler from policy: %v", err)
		}
		sched = sc
	default:
		return nil, fmt.Errorf("unsupported algorithm source: %v", source)
	}
}
// 建立
func (c *Configurator) createFromProvider(providerName string) (*Scheduler, error) {
	klog.V(2).Infof("Creating scheduler from algorithm provider '%v'", providerName)
  // 範例化演演算法的Registry
	r := algorithmprovider.NewRegistry()
	defaultPlugins, exist := r[providerName]
	if !exist {
		return nil, fmt.Errorf("algorithm provider %q is not registered", providerName)
	}
  // 將各種演演算法作為plugin進行設定
	for i := range c.profiles {
		prof := &c.profiles[i]
		plugins := &schedulerapi.Plugins{}
		plugins.Append(defaultPlugins)
		plugins.Apply(prof.Plugins)
		prof.Plugins = plugins
	}
	return c.create()
}
// 從這個初始化中可以看到,主要分為2類:預設與ClusterAutoscaler兩種演演算法
func NewRegistry() Registry {
  // 預設演演算法包括過濾、打分、繫結等,有興趣的去原始碼中逐個閱讀
	defaultConfig := getDefaultConfig()
	applyFeatureGates(defaultConfig)
	// ClusterAutoscaler 是叢集自動擴充套件的演演算法,被單獨拎出來
	caConfig := getClusterAutoscalerConfig()
	applyFeatureGates(caConfig)
	return Registry{
		schedulerapi.SchedulerDefaultProviderName: defaultConfig,
		ClusterAutoscalerProvider:                 caConfig,
	}
}
/*
在這裡,熟悉k8s的朋友會有個疑問:以前聽說kubernets的排程有個Predicate和Priority兩個演演算法,這裡怎麼沒有分類?
這個疑問,我們在後面具體場景時再進行分析。
在新的版本中,這部分程式碼邏輯是由拓展buildExtenders和nodelist,podQueue,維護了一個排程佇列,其餘都是與上面差別不大的
*/

NodeName

// 為了加深大家對Plugin的印象,我選擇一個最簡單的範例:根據Pod的spec欄位中的NodeName,分配到指定名稱的節點
package nodename
import (
	"context"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)
type NodeName struct{}
var _ framework.FilterPlugin = &NodeName{}
// 這個排程演演算法的名稱和錯誤資訊
const (
	Name = "NodeName"
	ErrReason = "node(s) didn't match the requested hostname"
)
// 排程演演算法的明明
func (pl *NodeName) Name() string {
	return Name
}
// 過濾功能,這個就是NodeName演演算法的實現
func (pl *NodeName) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
  // 找不到Node
	if nodeInfo.Node() == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
  // 匹配不到,返回錯誤
	if !Fits(pod, nodeInfo) {
		return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReason)
	}
	return nil
}
/*
  匹配的演演算法,兩種條件滿足一個就認為成功
  1. spec沒有填NodeName 
  2.spec的NodeName和節點匹配
*/
func Fits(pod *v1.Pod, nodeInfo *framework.NodeInfo) bool {
	return len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == nodeInfo.Node().Name
}
// 初始化
func New(_ runtime.Object, _ framework.FrameworkHandle) (framework.Plugin, error) {
	return &NodeName{}, nil
}

瞭解Informer是如何從kube-apiserver監聽資源變化的情況

Informer

什麼是Informer?先重點講一下這個Informer,因為它是理解k8s執行機制的核心概念。

簡單概況下,Informer的核心功能是 獲取並監聽(ListAndWatch)對應資源的增刪改,觸發相應的事件操作(ResourceEventHandler)

在Setup()中有個Config,裡面有個scheduler.NewInformerFactory()在這裡進入,程式碼在k8s.io/client-go/informers/factory.go中

Shared Informer

/*
client 是連線到 kube-apiserver 的使用者端。
我們要理解k8s的設計:
1. etcd是核心的資料儲存,對資源的修改會進行持久化
2. 只有kube-apiserver可以存取etcd
所以,kube-scheduler要了解資源的變化情況,只能通過kube-apiserver
*/
// 定義了 Shared Informer,其中這個client是用來連線kube-apiserver的
c.InformerFactory = informers.NewSharedInformerFactory(client, 0)
// 這裡解答了為什麼叫shared:一個資源會對應多個Informer,會導致效率低下,所以讓一個資源對應一個sharedInformer,而一個sharedInformer內部自己維護多個Informer
type sharedInformerFactory struct {
	client           kubernetes.Interface
	namespace        string
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	lock             sync.Mutex
	defaultResync    time.Duration
	customResync     map[reflect.Type]time.Duration
  // 這個map就是維護多個Informer的關鍵實現
	informers map[reflect.Type]cache.SharedIndexInformer
	startedInformers map[reflect.Type]bool
}
// 執行函數
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
	f.lock.Lock()
	defer f.lock.Unlock()
	for informerType, informer := range f.informers {
		if !f.startedInformers[informerType] {
      // goroutine非同步處理
			go informer.Run(stopCh)
      // 標記為已經執行,這樣即使下次Start也不會重複執行
			f.startedInformers[informerType] = true
		}
	}
}
// 查詢對應的informer
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
	f.lock.Lock()
	defer f.lock.Unlock()
	// 找到就直接返回
	informerType := reflect.TypeOf(obj)
	informer, exists := f.informers[informerType]
	if exists {
		return informer
	}
	resyncPeriod, exists := f.customResync[informerType]
	if !exists {
		resyncPeriod = f.defaultResync
	}
	// 沒找到就會新建Informer
	informer = newFunc(f.client, resyncPeriod)
	f.informers[informerType] = informer
	return informer
}
// SharedInformerFactory 是 sharedInformerFactory 的介面定義,點進func NewSharedInformerFactoryWithOptions的返回值
type SharedInformerFactory interface {
  // 我們這一階段關注的Pod的Informer,屬於核心資源
	Core() core.Interface
}
// core.Interface的定義
type Interface interface {
	// V1 provides access to shared informers for resources in V1.
	V1() v1.Interface
}
// v1.Interface 的定義
type Interface interface {
  // Pod的定義 
        ...
	Pods() PodInformer
        ...
}
// PodInformer 是對應的介面
type PodInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.PodLister
}
// podInformer 是具體的實現
type podInformer struct {
	factory          internalinterfaces.SharedInformerFactory
	tweakListOptions internalinterfaces.TweakListOptionsFunc
	namespace        string
}
// 最後,我們可以看到podInformer呼叫了InformerFor函數進行了新增
func (f *podInformer) Informer() cache.SharedIndexInformer {
	return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

PodInformer

// 範例化PodInformer,把對應的List/Watch操作方法傳入到範例化函數,生成統一的SharedIndexInformer介面
func NewFilteredPodInformer() cache.SharedIndexInformer {
	return cache.NewSharedIndexInformer(
    // List和Watch實現從PodInterface裡面查詢
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).List(context.TODO(), options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				if tweakListOptions != nil {
					tweakListOptions(&options)
				}
				return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
			},
		},
		&corev1.Pod{},
		resyncPeriod,
		indexers,
	)
}
// 點進List,在這個檔案中
// 我們先看看Pod基本的List和Watch是怎麼定義的
// Pod基本的增刪改查等操作
type PodInterface interface {
	List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
	...
}
// pods 是PodInterface的實現
type pods struct {
	client rest.Interface
	ns     string
}
// List 和 Watch 是依賴使用者端,也就是從kube-apiserver中查詢的
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	err = c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Do(ctx).
		Into(result)
	return
}
func (c *pods) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
	return c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout).
		Watch(ctx)
}
// 在func NewPodInformer中找到他的返回值 點進去cache.SharedIndexInformer這是Informer的統一介面 在這個檔案的裡面找到下面的程式碼
// 在上面,我們看到了非同步執行Informer的程式碼 go informer.Run(stopCh),我們看看是怎麼run的
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  // 這裡有個 DeltaFIFO 的物件,
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          s.indexer,
		EmitDeltaTypeReplaced: true,
	})
	// 傳入這個fifo到cfg
	cfg := &Config{
		Queue:            fifo,
		...
	}
	// 新建controller
	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()
	// 執行controller
	s.controller.Run(stopCh)
}
// 點進New看Controller的執行
func (c *controller) Run(stopCh <-chan struct{}) {
	// 
	r := NewReflector(
		c.config.ListerWatcher,
		c.config.ObjectType,
		c.config.Queue,
		c.config.FullResyncPeriod,
	)
	r.ShouldResync = c.config.ShouldResync
	r.clock = c.clock
	if c.config.WatchErrorHandler != nil {
		r.watchErrorHandler = c.config.WatchErrorHandler
	}
	c.reflectorMutex.Lock()
	c.reflector = r
	c.reflectorMutex.Unlock()
	var wg wait.Group
  // 生產,往Queue裡放資料
	wg.StartWithChannel(stopCh, r.Run)
  // 消費,從Queue消費資料
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

Reflect

點進r.Run() Reflect監聽事件放到FIFO中然後處理迴圈 取出事件消費

// 我們再回頭看看這個Reflect結構
r := NewReflector(
  	// ListerWatcher 我們已經有了解,就是通過client監聽kube-apiserver暴露出來的Resource
		c.config.ListerWatcher,
		c.config.ObjectType,
  	// Queue 是我們前文看到的一個 DeltaFIFOQueue,認為這是一個先進先出的佇列
		c.config.Queue,
		c.config.FullResyncPeriod,
)
func (r *Reflector) Run(stopCh <-chan struct{}) {
	klog.V(2).Infof("Starting reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
	wait.BackoffUntil(func() {
    // 呼叫了ListAndWatch
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
	klog.V(2).Infof("Stopping reflector %s (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
}
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
		// watchHandler顧名思義,就是Watch到對應的事件,呼叫對應的Handler
		if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
			if err != errorStopRequested {
				switch {
				case isExpiredError(err):
					klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
				default:
					klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
				}
			}
			return nil
		}
	}
}
func (r *Reflector) watchHandler() error {
loop:
	for {
    // 一個經典的GO語言select監聽多channel的模式
		select {
    // 整體的step channel
		case <-stopCh:
			return errorStopRequested
    // 錯誤相關的error channel
		case err := <-errc:
			return err
    // 接收事件event的channel
		case event, ok := <-w.ResultChan():
      // channel被關閉,退出loop
			if !ok {
				break loop
			}
			// 一系列的資源驗證程式碼跳過
			switch event.Type {
      // 增刪改三種Event,分別對應到去store,即DeltaFIFO中,操作object
			case watch.Added:
				err := r.store.Add(event.Object)
			case watch.Modified:
				err := r.store.Update(event.Object)
			case watch.Deleted:
				err := r.store.Delete(event.Object)
			case watch.Bookmark:
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
		}
	}
	return nil
}

站在前人的肩膀上,向前輩致敬,Respect!

Summary

  • kube-scheduler也是外掛化的排程策略,通過設定在啟動的時候註冊上plugins,通過Informer來監聽資源的狀態和變化,進行排程
  • Informer 依賴於 Reflector 模組,它的元件為 xxxInformer,如 podInformer
  • 具體資源的 Informer 包含了一個連線到kube-apiserverclient,通過ListWatch介面查詢資源變更情況
  • 檢測到資源發生變化後,通過Controller 將資料放入佇列DeltaFIFOQueue裡,生產階段完成,交給對應的handler處理常式進行下一步的操作

以上就是Kubernetes scheduler啟動監控資源變化解析的詳細內容,更多關於Kubernetes scheduler啟動監控的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com