CRD简介
K8S中一切都是resource,比如Deployment,Service等等。
我们可以基于CRD(CustomResourceDefinitions)功能新增resource,比如我想自定义一种Deployment资源,提供不同的部署策略。
k8s中resource可以通过RESTFUL API进行CURD操作,对于CRD创建的resource也是一样的。
CRD仅仅是定义一种resource,我们还需要实现controller,类似于deployment controller等等,监听对应资源的CURD事件,做出对应的处理,比如部署POD。
CRD官方文档
编写自定义CRD
编写自定义CRD包括两部分, 一个是定义crd, 一个是编写controller
####自定义CRD
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: name: crontabs.stable.example.com spec: group: stable.example.com version: v1 scope: Namespaced names: plural: crontabs singular: crontab kind: CronTab shortNames: - ct additionalPrinterColumns: - name: Spec type: string description: The cron spec defining the interval a CronJob is run JSONPath: .spec.cronSpec - name: Replicas type: integer description: The number of jobs launched by the CronJob JSONPath: .spec.replicas - name: Age type: date JSONPath: .metadata.creationTimestamp validation: openAPIV3Schema: properties: spec: properties: cronSpec: type: string pattern: '^(\d+|\*)(/\d+)?(\s+(\d+|\*)(/\d+)?){4}$' replicas: type: integer minimum: 1 maximum: 10
|
自定义crd还可以包含自定义验证规则,额外打印信息等
编写完自定义crd, 只需kubectl apply -f xxx.yaml
就行
定义完crd就可以定义cr
定义CR
1 2 3 4 5 6 7
| apiVersion: "stable.example.com/v1" kind: CronTab metadata: name: my-new-cron-object spec: cronSpec: "* * * * */5" image: my-awesome-cron-image
|
通过kubectl create -f crd.yaml
可以创建一个CRD
通过kubectl get crd
可以获取创建的所有CRD。
定义controller
定义完crd部分, 就需要实现controller部分
可以使用client-go来作为Kubernetes的客户端, 常用的controller生成框架包括Kubebuilder
官方的自定义controller例子sample-controller
client-go原理
client-go的原理图如下:
具体流程就是list-watch机制, api-server会发送指定事件, controller通过watch事件, 处理事件。
各名词解析:
Reflector: 定义在 cache 包的 Reflector 类中,它监听特定资源类型(Kind)的 Kubernetes API,在ListAndWatch方法中执行。监听的对象可以是 Kubernetes 的内置资源类型或者是自定义资源类型。当 reflector 通过 watch API 发现新的资源实例被创建,它将通过对应的 list API 获取到新创建的对象并在watchHandler方法中将其加入到Delta Fifo队列中。
Informer: 定义在 cache 包的 base controller 中,它从Delta Fifo队列中 pop 出对象,在processLoop方法中执行。base controller 的工作是将对象保存一遍后续获取,并调用 controller 将对象传给 controller。
Indexer: 提供对象的 indexing 方法,定义在 cache 包的 Indexer中。一个典型的 indexing 的应用场景是基于对象的 label 创建索引。Indexer 基于几个 indexing 方法维护索引,它使用线程安全的 data store 来存储对象和他们的key。在 cache 包的 Store 类中定义了一个名为MetaNamespaceKeyFunc的默认方法,可以为对象生成一个/形式的key。
编写controller代码
这部分代码来源于maoqide
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150
|
kubeClient, err := kubernetes.NewForConfig(cfg) exampleClient, err := clientset.NewForConfig(cfg)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30) exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(kubeClient, exampleClient, kubeInformerFactory.Apps().V1().Deployments(), exampleInformerFactory.Samplecontroller().V1alpha1().Foos())
kubeInformerFactory.Start(stopCh) exampleInformerFactory.Start(stopCh)
controller.Run(2, stopCh)
NewController() *Controller { utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
eventBroadcaster := record.NewBroadcaster()
fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.enqueueFoo, UpdateFunc: func(old, new interface{}) { controller.enqueueFoo(new) }, })
deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.handleObject, UpdateFunc: func(old, new interface{}) { newDepl := new.(*appsv1.Deployment) oldDepl := old.(*appsv1.Deployment) if newDepl.ResourceVersion == oldDepl.ResourceVersion { return } controller.handleObject(new) }, DeleteFunc: controller.handleObject, }) }
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") }
for i := 0; i < threadiness; i++ { go wait.Until(c.runWorker, time.Second, stopCh) } }
func (c *Controller) runWorker() { for c.processNextWorkItem() { } }
func (c *Controller) processNextWorkItem() bool { obj, shutdown := c.workqueue.Get() if shutdown { return false }
err := func(obj interface{}) error { defer c.workqueue.Done(obj) var key string var ok bool
if key, ok = obj.(string); !ok { c.workqueue.Forget(obj) utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) return nil }
if err := c.syncHandler(key); err != nil { c.workqueue.AddRateLimited(key) return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) }
c.workqueue.Forget(obj) klog.Infof("Successfully synced '%s'", key) return nil }(obj)
if err != nil { utilruntime.HandleError(err) return true }
return true }
func (c *Controller) syncHandler(key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) foo, err := c.foosLister.Foos(namespace).Get(name) deploymentName := foo.Spec.DeploymentName deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
if errors.IsNotFound(err) { deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo)) }
if !metav1.IsControlledBy(deployment, foo) { msg := fmt.Sprintf(MessageResourceExists, deployment.Name) c.recorder.Event(foo, corev1.EventTypeWarning, ErrResourceExists, msg) return fmt.Errorf(msg) }
if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas { deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo)) }
err = c.updateFooStatus(foo, deployment) c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced) }
|
参考文档