首頁 > 軟體

从生成CRD到编写自定义控制器教学范例

2022-05-13 21:11:41

介绍

我们可以使用code-generator 以及controller-tools来进行程式码自动生成,通过程式码自动生成可以帮我们自动生成 CRD 资源物件,以及用户端存取的 ClientSet、Informer、Lister 等工具包,接下来我们就来了解下如何编写一个自定义的控制器。

CRD定义

首先初始化专案:

$ mkdir operator-crd && cd operator-crd
$ go mod init operator-crd
$ mkdir -p pkg/apis/example.com/v1

在该资料夹下新建doc.go档案,内容如下所示:

// +k8s:deepcopy-gen=package
// +groupName=example.com
package v1

根据 CRD 的规范定义,这里我们定义的 group 为example.com,版本为v1,在顶部新增了一个程式码自动生成的deepcopy-gen的 tag,为整个包中的型别生成深拷贝方法。

然后就是非常重要的资源物件的结构体定义,新建types.go档案,types.go内容可以使用type-scaffpld自动生成,具体档案内容如下:

package v1
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// BarSpec defines the desired state of Bar
type BarSpec struct {
    // INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
    DeploymentName string `json:"deploymentName"`
    Image          string `json:"image"`
    Replicas       *int32 `json:"replicas"`
}
// BarStatus defines the observed state of Bar.
// It should always be reconstructable from the state of the cluster and/or outside world.
type BarStatus struct {
    // INSERT ADDITIONAL STATUS FIELDS -- observed state of cluster
}
// 下面这个一定不能少,少了的话不能生成 lister 和 informer
// +genclient  
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// Bar is the Schema for the bars API
// +k8s:openapi-gen=true
type Bar struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec   BarSpec   `json:"spec,omitempty"`
    Status BarStatus `json:"status,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// BarList contains a list of Bar
type BarList struct {
    metav1.TypeMeta `json:",inline"`
    metav1.ListMeta `json:"metadata,omitempty"`
    Items           []Bar `json:"items"`
}

然后可以参考系统内建的资源物件,还需要提供 AddToScheme 与 Resource 两个变数供 client 注册,新建 register.go 档案,内容如下所示:

package v1
import (
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
)
// SchemeGroupVersion 注册自己的自定义资源
var SchemeGroupVersion = schema.GroupVersion{Group: "example.com", Version: "v1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
    return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
    return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
    // SchemeBuilder initializes a scheme builder
    SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
    // AddToScheme is a global function that registers this API group & version to a scheme
    AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
    // 新增 Bar 与 BarList这两个资源到 scheme
    scheme.AddKnownTypes(SchemeGroupVersion,
        &Bar{},
        &BarList{},
    )
    metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
    return nil
}

使用controller-gen生成crd:

$ controller-gen crd paths=./... output:crd:dir=crd

生成example.com_bars.yaml档案如下所示:

---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: (devel)
  creationTimestamp: null
  name: bars.example.com
spec:
  group: example.com
  names:
    kind: Bar
    listKind: BarList
    plural: bars
    singular: bar
  scope: Namespaced
  versions:
  - name: v1
    schema:
      openAPIV3Schema:
        description: Bar is the Schema for the bars API
        properties:
          apiVersion:
            description: 'APIVersion defines the versioned schema of this representation
              of an object. Servers should convert recognized schemas to the latest
              internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources'
            type: string
          kind:
            description: 'Kind is a string value representing the REST resource this
              object represents. Servers may infer this from the endpoint the client
              submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds'
            type: string
          metadata:
            type: object
          spec:
            description: BarSpec defines the desired state of Bar
            properties:
              deploymentName:
                description: INSERT ADDITIONAL SPEC FIELDS -- desired state of cluster
                type: string
              image:
                type: string
              replicas:
                format: int32
                type: integer
            required:
            - deploymentName
            - image
            - replicas
            type: object
          status:
            description: BarStatus defines the observed state of Bar. It should always
              be reconstructable from the state of the cluster and/or outside world.
            type: object
        type: object
    served: true
    storage: true

最终专案结构如下所示:

$ tree
.
├── crd
│   └── example.com_bars.yaml
├── go.mod
├── go.sum
└── pkg
    └── apis
        └── example.com
            └── v1
                ├── doc.go
                ├── register.go
                └── types.go
5 directories, 6 files

生成用户端相关程式码

上面我们准备好资源的 API 资源型别后,就可以使用开始生成 CRD 资源的用户端使用的相关程式码了。

首先建立生成程式码的指令码,下面这些指令码均来源于sample-controller提供的范例:

$ mkdir hack && cd hack

在该目录下面新建 tools.go 档案,新增 code-generator 依赖,因为在没有程式码使用 code-generator 时,go module 预设不会为我们依赖此包。档案内容如下所示:

// +build tools
// 建立 tools.go 来依赖 code-generator
// 因为在没有程式码使用 code-generator 时,go module 预设不会为我们依赖此包.
package tools
import _ "k8s.io/code-generator"

然后新建 update-codegen.sh 指令码,用来设定程式码生成的指令码:

#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
CODEGEN_PKG=${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}
bash "${CODEGEN_PKG}"/generate-groups.sh "deepcopy,client,informer,lister" 
  operator-crd/pkg/client operator-crd/pkg/apis example.com:v1 
  --output-base "${SCRIPT_ROOT}"/../ 
  --go-header-file "${SCRIPT_ROOT}"/hack/boilerplate.go.txt
# To use your own boilerplate text append:
#   --go-header-file "${SCRIPT_ROOT}"/hack/custom-boilerplate.go.txt

同样还有 verify-codegen.sh 指令码,用来校验生成的程式码是否是最新的:

#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
DIFFROOT="${SCRIPT_ROOT}/pkg"
TMP_DIFFROOT="${SCRIPT_ROOT}/_tmp/pkg"
_tmp="${SCRIPT_ROOT}/_tmp"
cleanup() {
  rm -rf "${_tmp}"
}
trap "cleanup" EXIT SIGINT
cleanup
mkdir -p "${TMP_DIFFROOT}"
cp -a "${DIFFROOT}"/* "${TMP_DIFFROOT}"
"${SCRIPT_ROOT}/hack/update-codegen.sh"
echo "diffing ${DIFFROOT} against freshly generated codegen"
ret=0
diff -Naupr "${DIFFROOT}" "${TMP_DIFFROOT}" || ret=$?
cp -a "${TMP_DIFFROOT}"/* "${DIFFROOT}"
if [[ $ret -eq 0 ]]
then
  echo "${DIFFROOT} up to date."
else
  echo "${DIFFROOT} is out of date. Please run hack/update-codegen.sh"
  exit 1
fi

还有一个为生成的程式码档案新增头部内容的 boilerplate.go.txt 档案,内容如下所示,其实就是为每个生成的程式码档案头部新增上下面的开源协定宣告:

/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

接下来我们就可以来执行程式码生成的指令码了,首先将依赖包放置到 vendor 目录中去:

$ go mod vendor

然后执行指令码生成程式码:

$ chmod +x ./hack/update-codegen.sh
$./hack/update-codegen.sh 
Generating deepcopy funcs
Generating clientset for example.com:v1 at operator-crd/pkg/client/clientset
Generating listers for example.com:v1 at operator-crd/pkg/client/listers
Generating informers for example.com:v1 at operator-crd/pkg/client/informers

程式码生成后,整个专案的 pkg 包变成了下面的样子:

$  tree pkg                
pkg
├── apis
│   └── example.com
│       └── v1
│           ├── doc.go
│           ├── register.go
│           ├── types.go
│           └── zz_generated.deepcopy.go
└── client
    ├── clientset
    │   └── versioned
    │       ├── clientset.go
    │       ├── doc.go
    │       ├── fake
    │       │   ├── clientset_generated.go
    │       │   ├── doc.go
    │       │   └── register.go
    │       ├── scheme
    │       │   ├── doc.go
    │       │   └── register.go
    │       └── typed
    │           └── example.com
    │               └── v1
    │                   ├── bar.go
    │                   ├── doc.go
    │                   ├── example.com_client.go
    │                   ├── fake
    │                   │   ├── doc.go
    │                   │   ├── fake_bar.go
    │                   │   └── fake_example.com_client.go
    │                   └── generated_expansion.go
    ├── informers
    │   └── externalversions
    │       ├── example.com
    │       │   ├── interface.go
    │       │   └── v1
    │       │       ├── bar.go
    │       │       └── interface.go
    │       ├── factory.go
    │       ├── generic.go
    │       └── internalinterfaces
    │           └── factory_interfaces.go
    └── listers
        └── example.com
            └── v1
                ├── bar.go
                └── expansion_generated.go
20 directories, 26 files

仔细观察可以发现pkg/apis/example.com/v1目录下面多了一个zz_generated.deepcopy.go档案,在pkg/client资料夹下生成了 clientset和 informers 和 listers 三个目录,有了这几个自动生成的用户端相关操作包,我们就可以去存取 CRD 资源了,可以和使用内建的资源物件一样去对 Bar 进行 List 和 Watch 操作了。

编写控制器

首先要先获取存取资源物件的 ClientSet,在专案根目录下面新建 main.go 档案。

package main
import (
    "k8s.io/client-go/tools/clientcmd"
   	"k8s.io/klog/v2"
    clientset "operator-crd/pkg/client/clientset/versioned"
    "operator-crd/pkg/client/informers/externalversions"
    "time"
    "os"
    "os/signal"
    "syscall"
)
var (
    onlyOneSignalHandler = make(chan struct{})
    shutdownSignals      = []os.Signal{os.Interrupt, syscall.SIGTERM}
)
// 注册 SIGTERM 和 SIGINT 信号
// 返回一个 stop channel, 该通道在捕获到第一个信号时被关闭
// 如果捕获到第二个信号,程式直接退出
func setupSignalHandler() (stopCh <-chan struct{}) {
    // 当呼叫两次的时候 panics
    close(onlyOneSignalHandler)
    stop := make(chan struct{})
    c := make(chan os.Signal, 2)
    // Notify 函数让 signal 包将输入信号转发到c
    // 如果没有列出要传递的信号,会将所有输入信号传递到 c; 否则只会传递列出的输入信号
    signal.Notify(c, shutdownSignals...)
    go func() {
        <-c
        close(stop)
        <-c
        os.Exit(1) // 第二个信号直接退出
    }()
    return stop
}
func main() {
    stopCh := setupSignalHandler()
    // 获取config
    config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        klog.Fatalln(err)
    }
    // 通过config构建clientSet
    // 这里的clientSet 是 Bar 的
    clientSet, err := clientset.NewForConfig(config)
    if err != nil {
        klog.Fatalln(err)
    }
    // informerFactory 工厂类, 这里注入我们通过程式码生成的 client
    // client 主要用于和 API Server 进行通讯,实现 ListAndWatch
    factory := externalversions.NewSharedInformerFactory(clientSet, time.Second*30)
    // 范例化自定义控制器
    controller := NewController(factory.Example().V1().Bars())
    // 启动 informer,开始list 和 watch
    go factory.Start(stopCh)
    // 启动控制器
    if err = controller.Run(2, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}

首先初始化一个用于存取 Bar 资源的 ClientSet 物件,然后同样新建一个 Bar 的 InformerFactory 范例,通过这个工厂范例可以去启动 Informer 开始对 Bar 的 List 和 Watch 操作,然后同样我们要自己去封装一个自定义的控制器,在这个控制器里面去实现一个控制回圈,不断对 Bar 的状态进行调谐。

在专案根目录下新建controller.go档案,内容如下所示:

package main
import (
    "fmt"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
    v1 "operator-crd/pkg/apis/example.com/v1"
    "time"
    informers "operator-crd/pkg/client/informers/externalversions/example.com/v1"
)
type Controller struct {
    informer  informers.BarInformer
    workqueue workqueue.RateLimitingInterface
}
func NewController(informer informers.BarInformer) *Controller {
    controller := &Controller{
        informer: informer,
        // WorkQueue 的实现,负责同步 Informer 和控制回圈之间的资料
        workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "bar"),
    }
    klog.Info("Setting up Bar event handlers")
    // informer 注册了三个 Handler(AddFunc、UpdateFunc 和 DeleteFunc)
    // 分别对应 API 物件的「新增」「更新」和「删除」事件。
    // 而具体的处理操作,都是将该事件对应的 API 物件加入到工作伫列中
    informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    controller.addBar,
        UpdateFunc: controller.updateBar,
        DeleteFunc: controller.deleteBar,
    })
    return controller
}
func (c *Controller) Run(thread int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShuttingDown()
    // 记录开始纪录档
    klog.Info("Starting Bar control loop")
    klog.Info("Waiting for informer caches to sync")
    // 等待快取同步资料
    if ok := cache.WaitForCacheSync(stopCh, c.informer.Informer().HasSynced); !ok {
        return fmt.Errorf("failed to wati for caches to sync")
    }
    klog.Info("Starting workers")
    for i := 0; i < thread; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }
    klog.Info("Started workers")
    <-stopCh
    klog.Info("Shutting down workers")
    return nil
}
// runWorker 是一个不断执行的方法,并且一直会呼叫 c.processNextWorkItem 从 workqueue读取讯息
func (c *Controller) runWorker() {
    for c.processNExtWorkItem() {
    }
}
// 从workqueue读取和读取讯息
func (c *Controller) processNExtWorkItem() bool {
    // 获取 item
    item, shutdown := c.workqueue.Get()
    if shutdown {
        return false
    }
    if err := func(item interface{}) error {
        // 标记以及处理
        defer c.workqueue.Done(item)
        var key string
        var ok bool
        if key, ok = item.(string); !ok {
            // 判读key的型别不是字串,则直接丢弃
            c.workqueue.Forget(item)
            runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", item))
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            return fmt.Errorf("error syncing '%s':%s", item, err.Error())
        }
        c.workqueue.Forget(item)
        return nil
    }(item); err != nil {
        runtime.HandleError(err)
        return false
    }
    return true
}
// 尝试从 Informer 维护的快取中拿到了它所对应的 Bar 物件
func (c *Controller) syncHandler(key string) error {
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        runtime.HandleError(fmt.Errorf("invalid respirce key:%s", key))
        return err
    }
    bar, err := c.informer.Lister().Bars(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            // 说明是在删除事件中新增进来的
            return nil
        }
        runtime.HandleError(fmt.Errorf("failed to get bar by: %s/%s", namespace, name))
        return err
    }
    fmt.Printf("[BarCRD] try to process bar:%#v ...", bar)
    // 可以根据bar来做其他的事。
    // todo
    return nil
}
func (c *Controller) addBar(item interface{}) {
    var key string
    var err error
    if key, err = cache.MetaNamespaceKeyFunc(item); err != nil {
        runtime.HandleError(err)
        return
    }
    c.workqueue.AddRateLimited(key)
}
func (c *Controller) deleteBar(item interface{}) {
    var key string
    var err error
    if key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(item); err != nil {
        runtime.HandleError(err)
        return
    }
    fmt.Println("delete crd")
    c.workqueue.AddRateLimited(key)
}
func (c *Controller) updateBar(old, new interface{}) {
    oldItem := old.(*v1.Bar)
    newItem := new.(*v1.Bar)
    // 比较两个资源版本,如果相同,则不处理
    if oldItem.ResourceVersion == newItem.ResourceVersion {
        return
    }
    c.workqueue.AddRateLimited(new)
}

我们这里自定义的控制器只封装了一个 Informer 和一个限速伫列,我们当然也可以在里面新增一个用于存取本地快取的 Indexer,但实际上 Informer 中已经包含了 Lister,对于 List 和 Get 操作都会去通过 Indexer 从本地快取中获取资料,所以只用一个 Informer 也是完全可行的。

同样在 Informer 中注册了3个事件处理器,将监听的事件获取到后送入 workqueue 伫列,然后通过控制器的控制回圈不断从伫列中消费资料,根据获取的 key 来获取资料判断物件是需要删除还是需要进行其他业务处理,这里我们同样也只是列印出了对应的操作纪录档,对于实际的专案则进行相应的业务逻辑处理即可。

到这里一个完整的自定义 API 物件和它所对应的自定义控制器就编写完毕了。

测试

接下来我们直接执行我们的main函数:

I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108   39032 controller.go:55] Starting workers
I0512 16:51:34.023153   39032 controller.go:60] Started workers

现在我们建立一个Bar资源物件:

# bar.yaml
apiVersion: example.com/v1
kind: Bar
metadata:
  name: bar-demo
  namespace: default
spec:
  image: "nginx:1.17.1"
  deploymentName: example-bar
  replicas: 2

直接建立上面的物件,注意观察控制器的纪录档:

I0512 16:51:33.922138   39032 controller.go:29] Setting up Bar event handlers
I0512 16:51:33.922255   39032 controller.go:47] Starting Bar control loop
I0512 16:51:33.922258   39032 controller.go:48] Waiting for informer caches to sync
I0512 16:51:34.023108   39032 controller.go:55] Starting workers
I0512 16:51:34.023153   39032 controller.go:60] Started workers
[BarCRD] try to process bar:"bar-demo" ...

可以看到,我们上面建立 bar.yaml 的操作,触发了 EventHandler 的新增事件,从而被放进了工作伫列。然后控制器的控制回圈从伫列里拿到这个物件,并且列印出了正在处理这个 bar 物件的纪录档资讯。

同样我们删除这个资源的时候,也会有对应的提示。

这就是开发自定义 CRD 控制器的基本流程,当然我们还可以在事件处理的业务逻辑中去记录一些 Events 资讯,这样我们就可以通过 Event 去了解我们资源的状态了,更多关于CRD生成自定义控制器的资料请关注it145.com其它相关文章!


IT145.com E-mail:sddin#qq.com