K8S Oprator

guquanheng / 2024-03-14 / 原文

K8S Oprator

概念

1、K8s-API

image-20231114091548539

k8s中使用restapi方式来进行资源描述
# GVK
group version kind
# GVR
group version resource
Group:资源组
	有组名:比如/apis/batch/v1/jobs,一般自定义资源
	无组名: 也叫核心资源组比如/api/v1/pods,一般是k8s核心资源
Version:版本,一般分为测试(alpha),社区版(beta),稳定版
Kind: 类型,可以理解为面向对象中的类
resource: 资源,可以理解为面向对象里的实例

1.1 GVK

(base) gu@python:/proc$ kubectl get crd tasks.tekton.dev -oyaml|head -2
apiVersion: apiextensions.k8s.io/v1 # group/version
kind: CustomResourceDefinition # kind

1.2 GVR

image-20231114092341957

(base) gu@python:/proc$ kubectl api-resources|head -1|awk '{print $1,$2,$3,$4,$5}';kubectl api-resources |grep job|awk '{print $1,$2,$3,$4,$5}'
NAME SHORTNAMES APIVERSION NAMESPACED KIND
jobs            batch/v1   true       Job
jobs是job类型的实例

2、scheme

Scheme 存储了 GVK 和 Go type 的映射关系

kubebuilder

架构

image-20231117121141220

1、组件安装

# kustomize
wget -c https://github.com/kubernetes-sigs/kustomize/archive/refs/heads/master.zip
# tools
wget -c https://github.com/kubernetes-sigs/controller-tools/archive/refs/heads/master.zip
# kubebuilder
curl -L -o kubebuilder "https://go.kubebuilder.io/dl/latest/$(go env GOOS)/$(go env GOARCH)"
chmod +x kubebuilder && mv kubebuilder /usr/local/bin/

2、使用

2.1 使用前置条件

需要设置gopath开启gomod111,做好代理

2.2 模版项目

2.2.1 初始化一个api模版

# 创建项目目录
mkdir $path/project
cd  $path/project # 需要为空目录
# 初始化一个mod
go mod init [$name]
# 将go worker更改添加路径到文件
# 初始化domain
kubebuilder init --domain quanheng.dev
# 目录下文件解释
(base) gu@python:~/k8s/oprator/project/cron$ tree ./
./
├── cmd
│   └── main.go
├── config # 启动配置 默认生成的时kustomize 所需要的配置,还可以根据控制器来编写rbac,crd,webhook
│   ├── default # 为启动控制器生成一个kustomiz的基础配置
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   └── manager_config_patch.yaml
│   ├── manager # 将控制器作为pod来启动
│   │   ├── kustomization.yaml
│   │   └── manager.yaml
│   ├── prometheus # 
│   │   ├── kustomization.yaml
│   │   └── monitor.yaml
│   └── rbac # 控制器所需要的权限
│       ├── auth_proxy_client_clusterrole.yaml
│       ├── auth_proxy_role_binding.yaml
│       ├── auth_proxy_role.yaml
│       ├── auth_proxy_service.yaml
│       ├── kustomization.yaml
│       ├── leader_election_role_binding.yaml
│       ├── leader_election_role.yaml
│       ├── role_binding.yaml
│       └── service_account.yaml
├── Dockerfile
├── go.mod # 一个与项目匹配,包含最基本依赖关系的 go module 文件
├── go.sum
├── hack
│   └── boilerplate.go.txt
├── Makefile # 为构建和部署控制器设定目标
├── PROJECT # Kubebuilder 元数据
└── README.md
# 创建一个api
# 将生成一个api/v1(group/version)目录,里面包含了我们指定的kind的代码模版
kubebuilder create api --group batch --version v1 --kind CronJob
(base) gu@python:~/桌面/git/go/src/oprator/cron$ tree api/
api/
└── v1
    ├── cronjob_types.go
    ├── groupversion_info.go
    └── zz_generated.deepcopy.go

2.2.2 设计一个api

api规范
序列化的字段必须是 驼峰式 : guQuanHeng // 首字母小写新单词首字母大写 
omitempty 标签:标记一个字段在空的时候应该在序列化的时候省略
数字只允许 int32 int64 和resource.Quantity
/* 针对cronjob的需求 spec
一个时间表( CronJob 中的 cron )
要运行的 Job 模板( CronJob 中的 Job )
一个已经启动的 Job 的超时时间(如果该 Job 执行超时,那么我们会将在下次调度的时候重新执行该 Job)。
如果多个 Job 同时运行,该怎么办(我们要等待吗?还是停止旧的 Job ?)
暂停 CronJob 运行的方法,以防出现问题。
对旧 Job 历史的限制

Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1

import (
	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// CronJobSpec defines the desired state of CronJob
// 定义CronJob的期望状态
type CronJobSpec struct {
	// 插入额外的 SPEC 字段 - 集群期待的状态
	// 重要:修改了这个文件之后运行"make"去重新生成代码

	// 例子
	Foo string `json:"foo,omitempty"`
	// 一个cron表达式
	Schedule string `json:"schedule"`
	// 任务开始的超时时间
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty"`
	// 是否允许并发执行
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
	// 挂起后续的执行
	Suspend *bool `json:"suspend,omitempty"`
	// 指定Job的模板
	JobTemplate batchv1.JobTemplateSpec `json:"jobTemplate"`
	// 保留的成功完成的Job的数量
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty"`
	// 保留的失败完成的Job的数量
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty"`
}

// 定义并发策略
type ConcurrencyPolicy string

const (
	// 总是允许并发执行
	AllowConcurrent ConcurrencyPolicy = "Allow"

	// 禁止并发执行,如果有并发执行,则跳过
	ForbidConcurrent ConcurrencyPolicy = "Forbid"

	// 如果有并发执行,则替换掉之前的执行
	ReplaceConcurrent ConcurrencyPolicy = "Replace"
)

// 定义CronJob的观察状态
type CronJobStatus struct {
	// 插入额外的 STATUS 字段 - 定义集群观察的状态
	// 重要:修改了这个文件之后运行"make"去重新生成代码
	// 一个指向当前活跃的Job的引用
	Active []corev1.ObjectReference `json:"active,omitempty"`

	// 最后一次成功完成的Job的时间
	// 可选
	LastScheduleTime *metav1.Time `json:"lastScheduleTime,omitempty"`
}

// 定义CronJob的Schema,用于定义CronJob的API
type CronJob struct {
	// Root Object Definitions
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   CronJobSpec   `json:"spec,omitempty"`
	Status CronJobStatus `json:"status,omitempty"`
}


// 一个cron对象列表
type CronJobList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []CronJob `json:"items"`
}

func init() {
	SchemeBuilder.Register(&CronJob{}, &CronJobList{})
}

2.2.3 控制器

控制器的工作是确保对于任何给定的对象,实际状态与期望状态进行匹配也就是让status与spec相匹配
其过程称之为reconciling
在 controller-runtime 中,为特定种类实现 reconciling 的逻辑被称为 Reconciler,Reconciler 接受一个对象的名称,并返回我们是否需要再次尝试(比如hpa)
// 控制器实现
/*
控制器逻辑
根据名称加载定时任务

列出所有有效的 job,更新其状态

根据保留的历史版本数清理版本过旧的 job

检查当前 CronJob 是否被挂起(如果被挂起,则不执行任何操作)

计算 job 下一个定时执行时间

如果 job 符合执行时机,没有超出截止时间,且不被并发策略阻塞,执行该 job

当任务进入运行状态或到了下一次执行时间, job 重新排队
*/
根据名称加载定时任务

列出所有有效的 job,更新其状态

根据保留的历史版本数清理版本过旧的 job

检查当前 CronJob 是否被挂起(如果被挂起,则不执行任何操作)

计算 job 下一个定时执行时间

如果 job 符合执行时机,没有超出截止时间,且不被并发策略阻塞,执行该 job

当任务进入运行状态或到了下一次执行时间, job 重新排队
// 代码
/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"fmt"
	"sort"

	"github.com/go-logr/logr"
	"github.com/robfig/cron"
	kbatch "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	ref "k8s.io/client-go/tools/reference"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	batchv1 "cron/api/v1"
	"time"
)

// CronJobReconciler reconciles a CronJob object
// CronJob 调谐器对 CronJob 对象进行调谐
type CronJobReconciler struct {
	client.Client
	Log    logr.Logger
	Scheme *runtime.Scheme
	Clock
}

type realClock struct{}

func (_ realClock) Now() time.Time { return time.Now() }

// clock接口可以获取当前的时间
// 可以帮助我们在测试中模拟计时
type Clock interface {
	Now() time.Time
}

// 针对控制器的权限
// +kubebuilder:rbac:groups=batch.quanheng.dev,resources=cronjobs/finalizers,verbs=update
// +kubebuilder:rbac:groups=batch.quanheng.dev,resources=cronjobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch.quanheng.dev,resources=cronjobs/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=batch,resources=jobs,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=batch,resources=jobs/status,verbs=get

// Reconcile is part of the main kubernetes reconciliation loop which aims to
// move the current state of the cluster closer to the desired state.
// TODO(user): Modify the Reconcile function to compare the state specified by
// the CronJob object against the actual cluster state, and then
// perform operations to make the cluster state reflect the state specified by
// the user.
//
// For more details, check Reconcile and its Result here:
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.16.0/pkg/reconcile
// 实现了 reconcile.Reconciler 接口

var (
	scheduledTimeAnnotation = "batchv1.quanheng.dev/scheduled-at"
)

func (r *CronJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {

	_ = log.FromContext(ctx)
	log := r.Log.WithValues("cronjob", req.NamespacedName)
	// 1、根据名称加载定时任务
	var cronJob batchv1.CronJob

	if err := r.Get(ctx, req.NamespacedName, &cronJob); err != nil {
		log.Error(err, "无法获取 CronJob")
		//忽略掉 not-found 错误,它们不能通过重新排队修复(要等待新的通知)
		//在删除一个不存在的对象时,可能会报这个错误。
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// 2、列出所有有效 job,更新它们的状态
	/*
		调谐器会获取 cronjob 下的所有 job 以更新它们状态。随着 cronjob 数量的增加,遍历全部 conjob 查找会变的相当低效。为了提高查询效率,这些任务会根据控制器名称建立索引。缓存后的 job 对象会 被添加上一个 jobOwnerKey 字段。这个字段引用其归属控制器和函数作为索引。在下文中,我们将配置 manager 作为这个字段的索引。
	*/
	var childJobs kbatch.JobList
	if err := r.List(ctx, &childJobs, client.InNamespace(req.Namespace), client.MatchingFields{jobOwnerKey: req.Name}); err != nil {
		log.Info("无法列出子Job", "namespace", req.Namespace, jobOwnerKey)
		return ctrl.Result{}, err
	}

	// 找出所有有效的 job
	var activeJobs []*kbatch.Job
	var successfulJobs []*kbatch.Job
	var failedJobs []*kbatch.Job
	var mostRecentTime *time.Time // 记录其最近一次运行时间以便更新状态

	/*
		当一个 job 被标记为 “succeeded” 或 “failed” 时,我们认为这个任务处于 “finished” 状态。 Status conditions 允许我们给 job 对象添加额外的状态信息,开发人员或控制器可以通过 这些校验信息来检查 job 的完成或健康状态
	*/
	isJobFinished := func(job *kbatch.Job) (bool, kbatch.JobConditionType) {
		for _, c := range job.Status.Conditions {
			if (c.Type == kbatch.JobComplete || c.Type == kbatch.JobFailed) && c.Status == corev1.ConditionTrue {
				return true, c.Type
			}
		}

		return false, ""
	}

	/*
		获取创建 job 时注释中排定的执行时间
	*/
	getScheduledTimeForJob := func(job *kbatch.Job) (*time.Time, error) {
		timeRaw := job.Annotations[scheduledTimeAnnotation]
		if len(timeRaw) == 0 {
			return nil, nil
		}

		timeParsed, err := time.Parse(time.RFC3339, timeRaw)
		if err != nil {
			return nil, err
		}
		return &timeParsed, nil
	}
	// 遍历所有 job,更新状态
	for i, job := range childJobs.Items {
		_, finishedType := isJobFinished(&job)
		switch finishedType {
		case "": // ongoing
			activeJobs = append(activeJobs, &childJobs.Items[i])
		case kbatch.JobFailed:
			failedJobs = append(failedJobs, &childJobs.Items[i])
		case kbatch.JobComplete:
			successfulJobs = append(successfulJobs, &childJobs.Items[i])
		}

		//将启动时间存放在注释中,当job生效时可以从中读取
		scheduledTimeForJob, err := getScheduledTimeForJob(&job)
		if err != nil {
			log.Error(err, "无法解析调度时间", "job", &job)
			continue
		}
		if scheduledTimeForJob != nil {
			if mostRecentTime == nil {
				mostRecentTime = scheduledTimeForJob
			} else if mostRecentTime.Before(*scheduledTimeForJob) {
				mostRecentTime = scheduledTimeForJob
			}
		}
	}

	if mostRecentTime != nil {
		cronJob.Status.LastScheduleTime = &metav1.Time{Time: *mostRecentTime}
	} else {
		cronJob.Status.LastScheduleTime = nil
	}
	cronJob.Status.Active = nil
	for _, activeJob := range activeJobs {
		jobRef, err := ref.GetReference(r.Scheme, activeJob)
		if err != nil {
			log.Error(err, "unable to make reference to active job", "job", activeJob)
			continue
		}
		cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
	}
	// 记录job的状态
	log.Info("job count", "active jobs", len(activeJobs), "successful jobs", len(successfulJobs), "failed jobs", len(failedJobs))
	// 更新状态
	if err := r.Status().Update(ctx, &cronJob); err != nil {
		log.Error(err, "无法更新 CronJob 状态")
		return ctrl.Result{}, err
	}

	// 3、 根据保留的历史版本数清理过旧的 job
	/*
		注意: 删除操作采用的“尽力而为”策略,如果个别 job 删除失败了,不会将其重新排队,直接结束删除操作
	*/
	if cronJob.Spec.FailedJobsHistoryLimit != nil {
		sort.Slice(failedJobs, func(i, j int) bool {
			if failedJobs[i].Status.StartTime == nil {
				return failedJobs[j].Status.StartTime != nil
			}
			return failedJobs[i].Status.StartTime.Before(failedJobs[j].Status.StartTime)
		})
		for i, job := range failedJobs {
			if int32(i) >= int32(len(failedJobs))-*cronJob.Spec.FailedJobsHistoryLimit {
				break
			}
			if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
				log.Error(err, "unable to delete old failed job", "job", job)
			} else {
				log.V(0).Info("deleted old failed job", "job", job)
			}
		}
	}

	if cronJob.Spec.SuccessfulJobsHistoryLimit != nil {
		sort.Slice(successfulJobs, func(i, j int) bool {
			if successfulJobs[i].Status.StartTime == nil {
				return successfulJobs[j].Status.StartTime != nil
			}
			return successfulJobs[i].Status.StartTime.Before(successfulJobs[j].Status.StartTime)
		})
		for i, job := range successfulJobs {
			if int32(i) >= int32(len(successfulJobs))-*cronJob.Spec.SuccessfulJobsHistoryLimit {
				break
			}
			if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); (err) != nil {
				log.Error(err, "unable to delete old successful job", "job", job)
			} else {
				log.V(0).Info("deleted old successful job", "job", job)
			}
		}
	}

	// 4、检查是否被挂起
	if cronJob.Spec.Suspend != nil && *cronJob.Spec.Suspend {
		log.V(1).Info("cronjob suspended, skipping")
		return ctrl.Result{}, nil
	}

	// 5、计算下一次执行时间
	// 定义一个函数,用于计算下一次执行时间
	getNextSchedule := func(cronJob *batchv1.CronJob, now time.Time) (lastMissed time.Time, next time.Time, err error) {
		sched, err := cron.ParseStandard(cronJob.Spec.Schedule)
		if err != nil {
			return time.Time{}, time.Time{}, fmt.Errorf("Unparseable schedule %q: %v", cronJob.Spec.Schedule, err)
		}

		// 出于优化的目的,我们可以使用点技巧。从上一次观察到的执行时间开始执行,
		// 这个执行时间可以被在这里被读取。但是意义不大,因为我们刚更新了这个值。

		var earliestTime time.Time
		if cronJob.Status.LastScheduleTime != nil {
			earliestTime = cronJob.Status.LastScheduleTime.Time
		} else {
			earliestTime = cronJob.ObjectMeta.CreationTimestamp.Time
		}
		if cronJob.Spec.StartingDeadlineSeconds != nil {
			// 如果开始执行时间超过了截止时间,不再执行
			schedulingDeadline := now.Add(-time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds))

			if schedulingDeadline.After(earliestTime) {
				earliestTime = schedulingDeadline
			}
		}
		if earliestTime.After(now) {
			return time.Time{}, sched.Next(now), nil
		}

		starts := 0
		for t := sched.Next(earliestTime); !t.After(now); t = sched.Next(t) {
			lastMissed = t
			// 一个 CronJob 可能会遗漏多次执行。举个例子,周五 5:00pm 技术人员下班后,
			// 控制器在 5:01pm 发生了异常。然后直到周二早上才有技术人员发现问题并
			// 重启控制器。那么所有的以1小时为周期执行的定时任务,在没有技术人员
			// 进一步的干预下,都会有 80 多个 job 在恢复正常后一并启动(如果 job 允许
			// 多并发和延迟启动)

			// 如果 CronJob 的某些地方出现异常,控制器或 apiservers (用于设置任务创建时间)
			// 的时钟不正确, 那么就有可能出现错过很多次执行时间的情形(跨度可达数十年)
			// 这将会占满控制器的CPU和内存资源。这种情况下,我们不需要列出错过的全部
			// 执行时间。

			starts++
			if starts > 100 {
				// 获取不到最近一次执行时间,直接返回空切片
				return time.Time{}, time.Time{}, fmt.Errorf("Too many missed start times (> 100). Set or decrease .spec.startingDeadlineSeconds or check clock skew.")
			}
		}
		return lastMissed, sched.Next(now), nil
	}

	/*
		// 计算出定时任务下一次执行时间(或是遗漏的执行时间)
	*/
	missedRun, nextRun, err := getNextSchedule(&cronJob, r.Now())
	if err != nil {
		log.Error(err, "unable to figure out CronJob schedule")
		// 重新排队直到有更新修复这次定时任务调度,不必返回错误
		return ctrl.Result{}, nil
	}

	// 将准备好的请求加入队列直到下次执行, 然后确定这些 job 是否要实际执行
	scheduledResult := ctrl.Result{RequeueAfter: nextRun.Sub(r.Now())} // 保存以便别处复用
	log = log.WithValues("now", r.Now(), "next run", nextRun)

	// 6、如果 job 符合执行时机,并且没有超出截止时间,且不被并发策略阻塞,执行该 job
	if missedRun.IsZero() {
		log.V(1).Info("no upcoming scheduled times, sleeping until next")
		return scheduledResult, nil
	}

	// 确保错过的执行没有超过截止时间
	log = log.WithValues("current run", missedRun)
	tooLate := false
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		tooLate = missedRun.Add(time.Duration(*cronJob.Spec.StartingDeadlineSeconds) * time.Second).Before(r.Now())
	}
	if tooLate {
		log.V(1).Info("missed starting deadline for last run, sleeping till next")
		// TODO(directxman12): events
		return scheduledResult, nil
	}

	// 确定要 job 的执行策略 —— 并发策略可能禁止多个job同时运行
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(activeJobs) > 0 {
		log.V(1).Info("concurrency policy blocks concurrent runs, skipping", "num active", len(activeJobs))
		return scheduledResult, nil
	}

	// 直接覆盖现有 job
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		for _, activeJob := range activeJobs {
			// we don't care if the job was already deleted
			if err := r.Delete(ctx, activeJob, client.PropagationPolicy(metav1.DeletePropagationBackground)); client.IgnoreNotFound(err) != nil {
				log.Error(err, "unable to delete active job", "job", activeJob)
				return ctrl.Result{}, err
			}
		}
	}
	/*
		基于 CronJob 模版构建 job,从模板复制 spec 及对象的元信息。
		然后在注解中设置执行时间,这样我们可以在每次的调谐中获取起作为“上一次执行时间”
		最后,还需要设置 owner reference字段。当我们删除 CronJob 时,Kubernetes 垃圾收集 器会根据这个字段对应的 job。同时,当某个job状态发生变更(创建,删除,完成)时, controller-runtime 可以根据这个字段识别出要对那个 CronJob 进行调谐。
	*/
	constructJobForCronJob := func(cronJob *batchv1.CronJob, scheduledTime time.Time) (*kbatch.Job, error) {
		// job 名称带上执行时间以确保唯一性,避免排定执行时间的 job 创建两次
		name := fmt.Sprintf("%s-%d", cronJob.Name, scheduledTime.Unix())

		job := &kbatch.Job{
			ObjectMeta: metav1.ObjectMeta{
				Labels:      make(map[string]string),
				Annotations: make(map[string]string),
				Name:        name,
				Namespace:   cronJob.Namespace,
			},
			Spec: *cronJob.Spec.JobTemplate.Spec.DeepCopy(),
		}
		for k, v := range cronJob.Spec.JobTemplate.Annotations {
			job.Annotations[k] = v
		}
		job.Annotations[scheduledTimeAnnotation] = scheduledTime.Format(time.RFC3339)
		for k, v := range cronJob.Spec.JobTemplate.Labels {
			job.Labels[k] = v
		}
		if err := ctrl.SetControllerReference(cronJob, job, r.Scheme); err != nil {
			return nil, err
		}

		return job, nil
	}

	// 构建 job
	job, err := constructJobForCronJob(&cronJob, missedRun)
	if err != nil {
		log.Error(err, "unable to construct job from template")
		// job 的 spec 没有变更,无需重新排队
		return scheduledResult, nil
	}

	// ...在集群中创建 job
	if err := r.Create(ctx, job); err != nil {
		log.Error(err, "unable to create Job for CronJob", "job", job)
		return ctrl.Result{}, err
	}

	log.V(1).Info("created Job for CronJob run", "job", job)
	// 7、当 job 开始运行或到了 job 下一次的执行时间,重新排队

	return scheduledResult, nil
}

var (
	jobOwnerKey = ".metadata.controller"
	apiGVStr    = batchv1.GroupVersion.String()
)

func (r *CronJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// 此处不是测试,我们需要创建一个真实的时钟
	if r.Clock == nil {
		r.Clock = realClock{}
	}
	if err := mgr.GetFieldIndexer().IndexField(context.Background(), &kbatch.Job{}, jobOwnerKey, func(o client.Object) []string {
		job := o.(*kbatch.Job)
		owner := metav1.GetControllerOf(job)
		if owner == nil {
			return nil
		}
		if owner.APIVersion != apiGVStr || owner.Kind != "CronJob" {
			return nil
		}
		return []string{owner.Name}
	}); err != nil {
		return err
	}

	return ctrl.NewControllerManagedBy(mgr).
		For(&batchv1.CronJob{}).
		Owns(&kbatch.Job{}).
		Complete(r)
}

2.2.4 webhook

2.2.4.1 种类

# admission webhook
	更改准入webhook:当接收到请求时对资源进行添加字段等信息
	验证准入webhook:当接收到请求时对资源进行验证,不符合条件的请求将被拒绝
# authorization webhook
	权限认证
# CRD conversion webhook。
	将资源版本进行转化

2.2.4.2 实现

# 创建一个框架
kubebuilder create webhook --group batch --version v1 --kind CronJob --defaulting --programmatic-validation
# 安装cert-manager
# kind安装(可选)
go install sigs.k8s.io/kind@v0.20.0 && sudo ln -sv $(go env GOPATH)/bin/kind /usr/bin/

2.2.4.3 配置webhook

config/default/kustomization.yaml
resources:
# 打开webhook和certmanager
... 
# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- ../webhook
# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'. 'WEBHOOK' components are required.
- ../certmanager
# 将打开webhook账号认证,和ca自动注入
patches:
# Protect the /metrics endpoint by putting it behind auth.
# If you want your controller-manager to expose the /metrics
# endpoint w/o any authn/z, please comment the following line.
- path: manager_auth_proxy_patch.yaml

# [WEBHOOK] To enable webhook, uncomment all the sections with [WEBHOOK] prefix including the one in
# crd/kustomization.yaml
- path: manager_webhook_patch.yaml

# [CERTMANAGER] To enable cert-manager, uncomment all sections with 'CERTMANAGER'.
# Uncomment 'CERTMANAGER' sections in crd/kustomization.yaml to enable the CA injection in the admission webhooks.
# 'CERTMANAGER' needs to be enabled to use ca injection
- path: webhookcainjection_patch.yaml
# 打开下面的配置进行替换
replacements:
  - source:
      kind: Certificate
      ......

2.2.5 运行控制器

debug:
export ENABLE_WEBHOOKS=false && make run

2.3 工具

2.3.1 controller-gen

生成工具代码和 Kubernetes 的 YAML 对象
用一种特殊的 “标记注释”(以 // + 开头)来表示这里要插入字段,类型和包相关的信息
通常是从以 _types.go 结尾的文件中产生的
此字段一般在要定义的规则对象之前,以 '// +kubebuilder:validation:' 开头
// 查看帮助信息
controller-gen -hhh

2.3.1.1 验证

type ToySpec struct {
    // 定义一个字符串属性,最短为1,最长为15
    // +kubebuilder:validation:MaxLength=15
    // +kubebuilder:validation:MinLength=1
    Name string `json:"name,omitempty"`

2.3.1.2 打印信息

// 2、打印其他信息
/*
	kubectl get 可以展示的信息,下面的例子定义了要打印4列信息
*/
// +kubebuilder:printcolumn:name="Alias",type=string,JSONPath=`.spec.alias`
// +kubebuilder:printcolumn:name="Rank",type=integer,JSONPath=`.spec.rank`
// +kubebuilder:printcolumn:name="Bravely Run Away",type=boolean,JSONPath=`.spec.knights[?(@ == "Sir Robin")]`,description="when danger rears its ugly head, he bravely turned his tail and fled",priority=10
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
type Toy struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ToySpec   `json:"spec,omitempty"`
    Status ToyStatus `json:"status,omitempty"`
}

2.3.1.3 子资源

/*
	设置子资源的状态。当时启用状态时,更新主资源不会修改它的状态。类似的,更新子资源状态也只是修改了状态字段。
*/
// +kubebuilder:subresource:status
type Toy struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   ToySpec   `json:"spec,omitempty"`
    Status ToyStatus `json:"status,omitempty"`
}

2.3.1.4 扩缩容

/*
	+kubebuilder:subresource:scale 启用扩缩容	
	启用后,用户可以使用 kubectl scale 来对你的资源进行扩容或者缩容
	selectorpath 参数被指定为字符串形式的标签选择器,HPA 将可以自动扩容你的资源
*/
type CustomSetSpec struct {
    Replicas *int32 `json:"replicas"`
}

type CustomSetStatus struct {
    Replicas int32 `json:"replicas"`
    Selector string `json:"selector"` // this must be the string form of the selector
}

// +kubebuilder:subresource:status
// +kubebuilder:subresource:scale:specpath=.spec.replicas,statuspath=.status.replicas,selectorpath=.status.selector
type CustomSet struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`

    Spec   CustomSetSpec   `json:"spec,omitempty"`
    Status CustomSetStatus `json:"status,omitempty"`
}

2.3.2 Finalizers

一种垃圾回收机制,使用此机制可以实现级联删除,比如deployment和pod直接就存在此机制
package controllers

import (
    "context"

    "github.com/go-logr/logr"
    ctrl "sigs.k8s.io/controller-runtime"
    "sigs.k8s.io/controller-runtime/pkg/client"

    batchv1 "tutorial.kubebuilder.io/project/api/v1"
)
func (r *CronJobReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
    ctx := context.Background()
    log := r.Log.WithValues("cronjob", req.NamespacedName)

    var cronJob *batchv1.CronJob
    if err := r.Get(ctx, req.NamespacedName, cronJob); err != nil {
        log.Error(err, "unable to fetch CronJob")
        // 在我们删除一个不存在的对象的时,我们会遇到not-found errors这样的报错
            // 我们将暂时忽略,因为不能通过重新加入队列的方式来修复这些错误
            //(我们需要等待新的通知),而且我们可以根据删除的请求来获取它们
        return ctrl.Result{}, client.IgnoreNotFound(err)
    }

    // 自定义 finalizer 的名字
    myFinalizerName := "storage.finalizers.tutorial.kubebuilder.io"

    // 检查 DeletionTimestamp 以确定对象是否在删除中
    if cronJob.ObjectMeta.DeletionTimestamp.IsZero() {
        // 如果当前对象没有 finalizer, 说明其没有处于正被删除的状态。
        // 接着让我们添加 finalizer 并更新对象,相当于注册我们的 finalizer。
        if !containsString(cronJob.ObjectMeta.Finalizers, myFinalizerName) {
            cronJob.ObjectMeta.Finalizers = append(cronJob.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(context.Background(), cronJob); err != nil {
                return ctrl.Result{}, err
            }
        }
    } else {
        // 这个对象将要被删除
        if containsString(cronJob.ObjectMeta.Finalizers, myFinalizerName) {
            // 我们的 finalizer 就在这, 接下来就是处理外部依赖
            if err := r.deleteExternalResources(cronJob); err != nil {
                // 如果无法在此处删除外部依赖项,则返回错误
                // 以便可以重试
                return ctrl.Result{}, err
            }
            // 从列表中删除我们的 finalizer 并进行更新。
            cronJob.ObjectMeta.Finalizers = removeString(cronJob.ObjectMeta.Finalizers, myFinalizerName)
            if err := r.Update(context.Background(), cronJob); err != nil {
                return ctrl.Result{}, err
            }
        }

        // 当它们被删除的时候停止 reconciliation
        return ctrl.Result{}, nil
    }

    // Your reconcile logic

    return ctrl.Result{}, nil
}

func (r *Reconciler) deleteExternalResources(cronJob *batch.CronJob) error {

    // 删除与 cronJob 相关的任何外部资源
    // 确保删除是幂等性操作且可以安全调用同一对象多次。
}

// 辅助函数用于检查并从字符串切片中删除字符串。
func containsString(slice []string, s string) bool {
    for _, item := range slice {
        if item == s {
            return true
        }
    }
    return false
}

func removeString(slice []string, s string) (result []string) {
    for _, item := range slice {
        if item == s {
            continue
        }
        result = append(result, item)
    }
    return
}

2.3.3 kind

2.3.3.1 安装

go install sigs.k8s.io/kind@v0.20.0 && sudo ln -sv $(go env GOPATH)/bin/kind /usr/bin/

2.3.3.2 镜像

https://hub.docker.com/r/kindest/node/tags?page=1

2.3.3.3 使用

配置
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
- role: worker
- role: worker
创建
kind create cluster --name k8s-1.14 --config kind-config.yaml --image=quanheng.com/k8s/

2.3.4 标记语法(生成资源)

2.3.4.1 kubebuilder:printcolumn

用于生成 kubectl get 输出列的标记
    name: 列的显示名称。
    type: 列的数据类型,通常是字符串("string")、整数("integer")、浮点数("number")等。
    JSONPath: 用于从资源对象中提取数据的 JSONPath 表达式
    description: 列的描述。
    priority: 列的显示优先级。
		当值大于0那么要加-owide才可以显示,默认为0,可以直接显示
    format: 列的格式,参考https://github.com/OAI/OpenAPI-Specification/blob/main/versions/2.0.md#data-types。
// +kubebuilder:printcolumn:name="Image",type="string",JSONPath=".spec.jobTemplate.spec.template.spec.containers[0].image",description="The Docker Image of MyAPP"
// +kubebuilder:printcolumn:name="Cron",type="string",JSONPath=".spec.schedule",description="test"
// +kubebuilder:subresource:status
type CronJob struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   CronJobSpec   `json:"spec,omitempty"`
	Status CronJobStatus `json:"status,omitempty"`
}

2.3.4.2 kubebuilder:resource

kubebuilder:resource 是用于控制 Kubernetes 资源的行为和属性的标记。在使用 kubebuilder 构建 Kubernetes 控制器时,这个标记通常用于指定自定义资源的行为和属性,比如kubectl api-resources中显示的信息
    path: 指定 Kubernetes 资源的 API 路径。这个属性通常用于自定义资源定义文件中,用于指定资源的 API 路径。
    scope: 指定资源的命名空间作用域。可以是 "Cluster"(跨命名空间)或 "Namespaced"(单一命名空间)。
    name: 指定资源的名称。这通常用于定义资源的名称,以便在 kubectl 中使用。
    kind: 指定资源的 Kubernetes 类型(Kind)。
    shortName: 指定资源的短名称,用于简化资源的引用。
    singular: 指定资源的单数形式。
    plural: 指定资源的复数形式。
    categories: 指定资源所属的类别。
    storageVersion: 指定资源的存储版本。
// +kubebuilder:resource:path=myresources,scope=Namespaced,shortName=mr,singular=myresource,plural=myresources,categories={mygroup}
type MyCustomResource struct {
  // ...
}

2.3.4.3 kubebuilder:skipversion

生成代码时跳过某个 API 版本,一般用于api的变更或者实验性的api

2.3.4.4 kubebuilder:storageversion

用于指定 Custom Resource 的存储版本(storage version)
存储版本是一个 API 资源的版本,该版本的对象将被持久化到 etcd 存储中。这对于确定对象如何存储在 etcd 中以及如何进行版本升级非常重要。

2.3.4.5 kubebuilder:subresource:scale

针对可以被scale的资源进行标记使其可以进行scale
见2.3.1.4

2.3.4.6 kubebuilder:subresource:status

开启status子资源

2.3.4.7 kubebuilder:unservedversion

和skip类似2.3.4.3

2.3.4.8 +groupName

指定组名和版本名称
// +groupName=mygroup.example.com
// +versionName=v1alpha1
type MyCustomResource struct {
  // ...
}

2.3.4.9 kubebuilder:skip

跳过某些元素
可以是包,结构体,字段

2.3.4.10 kubebuilder:object:root

定义自定义资源的根部结构

2.3.4.11 // +kubebuilder:scaffold:imports

控制代码生成时导入路径的标签。该标签指定了生成的代码文件中需要导入的外部包的路径

2.3.4.12 +kubebuilder:scaffold:scheme

生成的代码文件中需要包含用于注册自定义资源和控制器的 Scheme 代码

2.3.5 标记语法(验证)

2.3.5.1 kubebuilder:default

为字段设置默认值
将a字段的默认值设置为a1,若不指定此值默认为a1
// +kubebuilder:default=a1
type MyCustomResource struct {
  // ...
  a string `json:"myField,omitempty" +kubebuilder:default=myDefaultValue`
}

2.3.5.2 kubebuilder:validation:EmbeddedResource

在 Kubernetes 控制器生成的 DeepCopy 方法中,将嵌套资源(Embedded Resource)的字段进行深度拷贝
在生成的 DeepCopy 方法中自动处理 EmbeddedResource 的深度拷贝
// +kubebuilder:validation:EmbeddedResource
type MyCustomResource struct {
  // ...
  EmbeddedResource MyEmbeddedResource `json:"embeddedResource,omitempty"`
}

2.3.5.3 kubebuilder:validation:Enum

将字段的值进行枚举,仅允许在此选择
// +kubebuilder:validation:Enum=Value1;Value2;Value3
type MyCustomResource struct {
  // ...
  MyField string `json:"myField,omitempty" +kubebuilder:validation:Enum=Value1;Value2;Value3`
}

2.3.5.4 kubebuilder:validation:ExclusiveMaximum

用来确保字段的值小于指定的最大值,并且不等于该最大值
// +kubebuilder:validation:ExclusiveMaximum=100
type MyCustomResource struct {
    // 其他字段...
    Value int `json:"value"`
}

2.3.5.5 kubebuilder:validation:ExclusiveMinimum

同上面,指定字段的最小值,且不等于最小值

2.3.5.6 kubebuilder:validation:Format

指定字段的格式,比如指定格式为email,string,date-time
// +kubebuilder:validation:Format=email
type MyCustomResource struct {
    // 其他字段...
    EmailAddress string `json
}

2.3.5.7 kubebuilder:validation:{Max|Min}Items

指定列表中最多或最少允许多少元素
// +kubebuilder:validation:MaxItems=5
// +kubebuilder:validation:MinItems=1
type MyCustomResource struct {
    // 其他字段...
    Items []string `json:"items"`
}

2.3.5.8 kubebuilder:validation:MaxLength

指定字符串的长度
// +kubebuilder:validation:MaxLength=50
type MyCustomResource struct {
    // 其他字段...
    Description string `json:"description"`
}

2.3.5.9 kubebuilder:validation:Maximum

kubebuilder:validation:Maximum
指定int的最大值

2.3.5.10 kubebuilder:validation:MultipleOf

指定此字段数值必须为给定条件的倍数

2.3.5.11 kubebuilder:validation:Pattern

指定字段必须满足给定的正则表达式
// +kubebuilder:validation:Pattern=^[A-Za-z0-9]+$
type MyCustomResource struct {
    // 其他字段...
    Name string `json:"name"`
}

2.3.5.12 kubebuilder:validation:Required

指定此字段必须有值
type MyCustomResource struct {
    // 其他字段...

    // Name 是一个必填字段
    // +kubebuilder:validation:Required
    Name string `json:"name"`
}

2.3.5.13 ubebuilder:validation:Type

字段的类型必须和指定的类型一致

2.3.5.14 kubebuilder:validation:UniqueItems

对于数组类型的元素必须唯一,不可重复
// +kubebuilder:validation:UniqueItems=true
type MyCustomResource struct {
    // 其他字段...
    Items []string `json:"items"`
}

2.3.5.15 nullable

允许字段使用null值

2.3.5.16 kubebuilder:validation:Enum

字段的类型为枚举中的一种

2.3.5.17 kubebuilder:validation:ExclusiveMaximum

int类型的数值的最大值

2.3.5.18 kubebuilder:validation:Format

字段值需要符合指定的格式
ipv4,email,hostname,uri

2.3.5.19 kubebuilder:validation:MaxItems

列表中最多允许多少元素
// +kubebuilder:validation:MaxItems=5
type MyCustomResource struct {
    // 其他字段...
    Items []string `json:"items"`
}

2.3.5.20 kubebuilder:validation:MaxLength

字段的值长度最大值 string

2.3.5.21 kubebuilder:validation:Maximum

字段中值的最大值 int

2.3.5.22 kubebuilder:validation:MultipleOf

字段值必须为指定值得倍数

2.3.6 标记语法(处理)

2.3.6.1 kubebuilder:pruning:PreserveUnknownFields

更新字段时,如果有未知字段不将其删除而进行保留
// +kubebuilder:pruning:PreserveUnknownFields
type MyCustomResource struct {
    // 其他字段...
    Foo string `json:"foo"`
}

2.3.7 webhook

kubebuilder:webhook
// 此例子表示在进行创建和更新操作时,进行变更型webhook操作使用名称为mymutatingwebhook.example.com的webhook
// +kubebuilder:webhook:path=/mutate-mygroup-myresource,mutating=true,failurePolicy=fail,groups=mygroup,resources=myresources,verbs=create;update,versions=v1,name=mymutatingwebhook.example.com
type MyCustomResource struct {
    // 其他字段...
    Spec MyCustomResourceSpec `json:"spec"`
}

type MyCustomResourceSpec struct {
    // 其他字段...
}
path: Webhook 服务的路径。
mutating: 指定 Webhook 是变更型(mutating)还是验证型(validating)。
failurePolicy: Webhook 失败的策略,可以是 fail、ignore 或者 dryRun。
groups: Webhook 关联的 API 组。
resources: Webhook 关联的 API 资源。
verbs: Webhook 支持的动词,如 create、update、delete 等。
versions: Webhook 关联的 API 版本。
name: Webhook 的名称。

2.3.8 RBAC

// 定义与 Kubernetes RBAC(Role-Based Access Control)相关的规则。这个标签通常与 Controller 或 Webhook 相关的代码一起用
// +kubebuilder:rbac:groups=mygroup,resources=myresources,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=mygroup,resources=myresources/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=mygroup,resources=myresources/finalizers,verbs=update
// +kubebuilder:rbac:groups=mygroup,resources=myresources/finalizers,verbs=get
// +kubebuilder:rbac:groups=mygroup,resources=myresources/finalizers,verbs=create
type MyCustomResourceReconciler struct {
    // 其他字段...
}
第一行规定了 mygroup 组下的 myresources 资源的基本操作权限:get、list、watch、create、update、patch 和 delete。
第二行规定了对 myresources 资源的 status 子资源的 get、update、patch 操作权限。
第三至五行规定了对 myresources 资源的 finalizers 子资源的 update、get、create 操作权限

2.3.9 cert-manage

image-20231121120625252

# cert-manage安装
helm repo add jetstack https://charts.jetstack.io
helm repo update
helm pull  jetstack/cert-manager
helm install cert-manager ./ --namespace cert-manager --create-namespace
# cert-manager可以实现自动颁发证书我们配置好kusimiz就行

client-go

1、客户端

1.1 RESTClient

最基础的客户端,仅对 HTTP Request 进行了封装。实现位置在 rest 目录

1.2 ClientSet

基于 RESTClient 实现,封装了 Kubernetes 内置资源(Resource)和版本(Version)的方法。实现位置在 kubernetes 目录

1.3 DynamicClient

动态客户端,基于 RESTClient 实现,封装了 Kubernetes 任意资源(包括 CRD 自定义资源)和版本的方法。实现位置在 dynamic 目录

1.4 DiscoveryClient

发现客户端,基于 RESTClient 实现,用于发现 kube-apiserver 所支持的资源组(Group)、资源版本(Versions)和资源信息(Resources)。实现位置在 discovery 目录

2、工具

2.1 clientcmd

2.1.1 利用kubeconfig生成rest.config对象

package main

import (
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// 使用client工具生成kubeconfig配置
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
}

2.2 生成pod使用的config文件

serviceAccountPath := os.Getenv("SERVICE_ACCOUNT_PATH")
	if serviceAccountPath == "" {
		serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
	}

3、用法

3.1 resetclient

RESTclient是一种通过http方法与k8s集群进行交互的客户端,也是clent-go所有客户端的基础,所有客户端均在此基础上进行封装

3.1.1 生成客户端

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/kubernetes/scheme"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	// 指定GV
	config.APIPath = "/api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	
}

3.1.2 使用客户端与集群交互

3.1.2.1 get

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	config.APIPath = "/api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err.Error())
	}

	result := restClient.Get().
		Namespace("default").
		Resource("pods").
		Name("nginx-app-0").
		Do(context.TODO())

	if result.Error() != nil {
		panic(result.Error())
	}

	pod := &corev1.Pod{}
	if err := result.Into(pod); err != nil {
		panic(err)
	}

	fmt.Printf("Pod Name: %s\n", pod.Name)

}

3.1.2.2 post

/*
	新增资源
*/
package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	config.APIPath = "/api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err.Error())
	}

	// post pod
	// 初始化一个 Pod 对象
	newPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "new-pod",
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "nginx",
					Image: "nginx:latest",
				},
			},
		},
	}
	// 调用post方法,创建Pod
	result := restClient.Post().
		Namespace("default").
		Resource("pods").
		Body(newPod).
		Do(context.TODO())

	if result.Error() != nil {
		panic(result.Error())
	}
	
	createdPod := &corev1.Pod{}
	if err := result.Into(createdPod); err != nil {
		panic(err)
	}

	fmt.Printf("Pod created: %s\n", createdPod.Name)
}

3.1.2.3 执行http请求

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	config.APIPath = "/api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err.Error())
	}

    // 使用Verb方法指定操作类型(DELETE,PUT,PATCH)
	result := restClient.Verb("DELETE").
		Namespace("default").
		Resource("pods").
		Name("nginx-app-0").
		Do(context.TODO())

	if result.Error() != nil {
		panic(result.Error())
	}

	fmt.Println("Pod deleted")
}

3.1.2.4 watch

package main

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	config.APIPath = "/api"
	config.GroupVersion = &corev1.SchemeGroupVersion
	config.NegotiatedSerializer = scheme.Codecs
	restClient, err := rest.RESTClientFor(config)
	if err != nil {
		panic(err.Error())
	}

	watcher, err := restClient.Get().
		Namespace("default").
		Resource("pods").
		Name("nginx-app-0").
		Watch(context.TODO())
	if err != nil {
		panic(err.Error())

	}
	// 处理 Watch 事件
	go func() {
		for event := range watcher.ResultChan() {
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				fmt.Println("Unexpected object type")
				continue
			}
			fmt.Printf("Pod %s event: %s\n", event.Type, pod.Name)
		}
	}()
}

3.2 clientset

针对k8s集群核心api进行操作的客户端

3.2.1 创建客户端

clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}

3.2.2 与集群镜像交互

3.2.2.1 获取api对象

package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// 使用clientset.version.kind.list方法来进行获取
	// 获取Pods
	pods, err := clientset.CoreV1().Pods("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error getting Pods: %v\n", err)
		os.Exit(1)
	}

	for _, pod := range pods.Items {
		fmt.Printf("Pod Name: %s\n", pod.Name)
	}
}

3.2.2.2 创建对象

package main

import (
	"context"
	"fmt"
	"os"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// 新建一个pod对象 可以查看pod的结构针对性构建
	newPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name: "nginx-app",
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{
					Name:  "nginx",
					Image: "quanheng.com/k8s/nginx:1.20",
				},
			},
		},
	}
	//调用create方法创建pod
	createdPod, err := clientset.CoreV1().Pods("default").Create(context.TODO(), newPod, metav1.CreateOptions{})
	if err != nil {
		fmt.Printf("Error creating Pod: %v\n", err)
		os.Exit(1)
	}

	fmt.Printf("Pod created: %s\n", createdPod.Name)

}

3.2.2.3 删除对象

package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// 删除pod
	err = clientset.CoreV1().Pods("default").Delete(context.TODO(), "nginx-app", metav1.DeleteOptions{})
	if err != nil {
		fmt.Printf("Error deleting Pod: %v\n", err)
		os.Exit(1)
	}

	fmt.Println("Pod deleted")

}

3.2.2.4 更新资源

package main

import (
	"context"
	"fmt"
	"os"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// 更新nginx-app-0的lable
	pods, err := clientset.CoreV1().Pods("default").List(context.Background(), metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error listing pods: %v\n", err)
		os.Exit(1)
	}
	for _, pod := range pods.Items {
		fmt.Printf("Pod name %s\n", pod.Name)
		if pod.Name == "nginx-app-0" {
			// 直接调用label进行标签更换
			pod.Labels["app"] = "nginx-app-0"
			// 调用update方法更新此标签
			_, err := clientset.CoreV1().Pods("default").Update(context.Background(), &pod, metav1.UpdateOptions{})
			if err != nil {
				fmt.Printf("Error updating pod: %v\n", err)
				os.Exit(1)
			}
		}

	}
}

3.2.2.5 watch

package main

import (
	"context"
	"fmt"
	"os"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// watch Pod
	watcher, err := clientset.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{
        LabelSelector: "app=nginx", // 此处结构体可以使用过滤器进行过滤
    })
	if err != nil {
		fmt.Printf("Error setting up watch: %v\n", err)
		os.Exit(1)
	}

	// 处理 Watch 事件
	go func() {
		for event := range watcher.ResultChan() {
			pod, ok := event.Object.(*corev1.Pod)
			if !ok {
				fmt.Println("Unexpected object type")
				continue
			}
			fmt.Printf("Pod %s event: %s\n", event.Type, pod.Name)
		}
	}()
	time.Sleep(10 * time.Minute)
}

3.3 Dynamic Client

动态客户端,一般用来操作crd资源

3.3.1 创建客户端

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
}

3.3.2 与集群交互

3.3.2.1 获取资源信息

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// 获取资源
	// 指定资源和命名空间
	resource := "cronjobs"
	namespace := "default"
	// 调用动态客户端的Resource方法,指定GVR,返回一个资源的接口
	crResource := dynamicClient.Resource(schema.GroupVersionResource{Group: "batch.quanheng.dev", Version: "v1", Resource: resource})
	// 调用接口内的方法,获取资源
	crresource, err := crResource.Namespace(namespace).Get(context.TODO(), "cronjob-test", metav1.GetOptions{})
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("Pod Name: %s\n", crresource.GetName())
}

3.3.2.2 创建新资源

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// 新建crd
	// 定义crd
	resource := "cronjobs"
	namespace := "default"
	crdResource := dynamicClient.Resource(schema.GroupVersionResource{Group: "batch.quanheng.dev", Version: "v1", Resource: resource})
	newCrd := map[string]interface{}{
		"apiVersion": "batch.quanheng.dev/v1",
		"kind":       "CronJob",
		"metadata": map[string]interface{}{
			"name": "new-cronjob",
		},
		"spec": map[string]interface{}{
			"schedule": "*/1 * * * *",
			"jobTemplate": map[string]interface{}{
				"spec": map[string]interface{}{
					"template": map[string]interface{}{
						"spec": map[string]interface{}{
							"containers": []map[string]interface{}{
								{
									"name":  "cronjob-1",
									"image": "quanheng.com/k8s/busybox@sha256:acaddd9ed544f7baf3373064064a51250b14cfe3ec604d65765a53da5958e5f5",
								},
							},
						},
					},
				},
			},
		},
	}
    // 调用Create方法创建crd,传入crd对象(一个空接口),注意将crd资源转化为空接口(map类型)
	createdCrd, err := crdResource.Namespace(namespace).Create(context.TODO(), &unstructured.Unstructured{Object: newCrd}, metav1.CreateOptions{})
	if err != nil {
		panic(err.Error())
	}
	fmt.Printf("Pod created: %s\n", createdCrd.GetName())
}

3.3.3 列出指定ns下的指定资源

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// 列出所有的cronjobs
	resource := "cronjobs"
	namespace := "default"
	crdResource := dynamicClient.Resource(schema.GroupVersionResource{Group: "batch.quanheng.dev", Version: "v1", Resource: resource})
	// 调用list方法,返回所有的pod列表
	crdList, err := crdResource.Namespace(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}
	// 打印所有的pod名称
	for _, crd := range crdList.Items {
		fmt.Printf("Crd Name: %s\n", crd.GetName())
	}

}

3.3.4 更新资源

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// 更新crd信息
	resource := "cronjobs"
	namespace := "default"
	crdResource := dynamicClient.Resource(schema.GroupVersionResource{Group: "batch.quanheng.dev", Version: "v1", Resource: resource})

	crd, err := crdResource.Namespace(namespace).Get(context.TODO(), "new-cronjob", metav1.GetOptions{})
	if err != nil {
		panic(err.Error())
	}
	/*
		创建使用的crd对象
		newCrd := map[string]interface{}{
			"apiVersion": "batch.quanheng.dev/v1",
			"kind":       "CronJob",
			"metadata": map[string]interface{}{
				"name": "new-cronjob",
			},
			"spec": map[string]interface{}{
				"schedule": "/1 * * * *",
				"jobTemplate": map[string]interface{}{
					"spec": map[string]interface{}{
						"template": map[string]interface{}{
							"spec": map[string]interface{}{
								"containers": []map[string]interface{}{
									{
										"name":  "cronjob-1",
										"image": "quanheng.com/k8s/busybox@sha256:acaddd9ed544f7baf3373064064a51250b14cfe3ec604d65765a53da5958e5f5",
									},
								},
							},
						},
					},
				},
			},
		}
	*/
	// 修改 crd 对象
	crd.Object["spec"].(map[string]interface{})["schedule"] = "*/10 * * * *"
	// 调用Update方法更新
	updatedcrd, err := crdResource.Namespace(namespace).Update(context.TODO(), crd, metav1.UpdateOptions{})
	if err != nil {
		panic(err.Error())
	}

	fmt.Printf("crd updated: %s\n", updatedcrd.GetName())

}

3.3.5 删除crd

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	dynamicClient, err := dynamic.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	// 定义资源
	resource := "cronjobs"
	namespace := "default"
	podResource := dynamicClient.Resource(schema.GroupVersionResource{Group: "batch.quanheng.dev", Version: "v1", Resource: resource})

	err = podResource.Namespace(namespace).Delete(context.TODO(), "new-cronjob", metav1.DeleteOptions{})
	if err != nil {
		panic(err.Error())
	}

	fmt.Println("cronjob deleted")

}

3.4 Discovery Client

3.4.1 列出GV

package main

import (
	"fmt"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	preferredResources, err := discoveryClient.ServerPreferredResources()
	if err != nil {
		panic(err.Error())
	}

	for _, group := range preferredResources {
		if group.GroupVersion == "batch.quanheng.dev/v1" {
			fmt.Printf("API Group: %s\n", group.GroupVersion)
			for _, resource := range group.APIResources {
				fmt.Printf("- Resource: %s, Kind: %s\n", resource.Name, resource.Kind)
			}
		}
	}

}

3.4.2 列出G

package main

import (
	"fmt"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	serverGroups, err := discoveryClient.ServerGroups()
	if err != nil {
		panic(err.Error())
	}

	for _, group := range serverGroups.Groups {
		fmt.Printf("API Group: %s\n", group.Name)
	}

}

3.4.3 列出指定的G下的R,K

package main

import (
	"fmt"

	"k8s.io/client-go/discovery"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {

	config, err := clientcmd.BuildConfigFromFlags("", "kubeconfig")
	if err != nil {
		panic(err.Error())
	}

	discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	groupVersion := "v1"
	serverResources, err := discoveryClient.ServerResourcesForGroupVersion(groupVersion)
	if err != nil {
		panic(err.Error())
	}

	for _, resource := range serverResources.APIResources {
		fmt.Printf("Resource: %s, Kind: %s\n", resource.Name, resource.Kind)
	}

}

4、informer机制

用于在客户端缓存中维护对 Kubernetes 资源对象的实时视图的一种机制。Informer 允许应用程序监视和响应 Kubernetes 资源对象的变化,而不必频繁地查询 Kubernetes API

image-20231113151756521

个人项目

1、subnet

目标: 使用kubectl 获取集群pod网段已经已使用量和剩余可以使用量

1.1 初始化

mkdir subnet
cd subnet && go mod init
go work use -r ./src/
kubebuilder init --domain quanheng.dev
kubebuilder create api --group batch --version v1alpha1 --kind Subnet

1.2 设计api

/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// 在每一个结构体或字段上面添加注释,这些注释会被转换成文档,可以通过kubectl explain来进行查看
// 期望状态的结构
type SubnetSpec struct {
	// 给定一个集群使用的CIDR列表
	CIDR []string `json:"cidr,omitempty"`
}

// 实际状态的结构
type SubnetStatus struct {
	// 目前使用的CIDR
	CIDR []string `json:"cidr,omitempty"`
	// 可用IP数量
	AvailableIPs int `json:"availableIPs,omitempty"`
	// 已使用IP数量
	UsedIPs int `json:"usedIPs,omitempty"`
	// 总IP数量
	Total int `json:"total,omitempty"`
	// 网关
	Gateway []string `json:"gateway,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=sub
// +kubebuilder:printcolumn:name="CIDR",type="string",JSONPath=".status.cidr"
// +kubebuilder:printcolumn:name="Total",type="string",JSONPath=".status.total"
// +kubebuilder:printcolumn:name="GATEWAY",type="string",JSONPath=".status.gateway"
// +kubebuilder:printcolumn:name="UsedIP",type="string",JSONPath=".status.usedIPs"
// +kubebuilder:printcolumn:name="AvailableIPs",type="string",JSONPath=".status.availableIPs"
type Subnet struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata,omitempty"`

	Spec   SubnetSpec   `json:"spec,omitempty"`
	Status SubnetStatus `json:"status,omitempty"`
}

// +kubebuilder:object:root=true

type SubnetList struct {
	metav1.TypeMeta `json:",inline"`
	metav1.ListMeta `json:"metadata,omitempty"`
	Items           []Subnet `json:"items"`
}

func init() {
	SchemeBuilder.Register(&Subnet{}, &SubnetList{})
}

1.3 实现控制器

/*
Copyright 2023.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"fmt"
	"net"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/log"

	batchv1alpha1 "oprator/subnet/api/v1alpha1"
)

// SubnetReconciler reconciles a Subnet object
type SubnetReconciler struct {
	client.Client
	Scheme *runtime.Scheme
}

// 注意如果此处导入其他资源,那么需要在RBAC中添加对应的权限
// crd操作权限
//+kubebuilder:rbac:groups=batch.quanheng.dev,resources=subnets,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=batch.quanheng.dev,resources=subnets/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=batch.quanheng.dev,resources=subnets/finalizers,verbs=update
// pod操作权限
//+kubebuilder:rbac:groups="",resources=pods,verbs=get;list;watch

func (r *SubnetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// 1、初始化一个日志记录器
	log := log.FromContext(ctx)
	// 2、更新子网状态
	var sublist batchv1alpha1.SubnetList
	var subnet batchv1alpha1.Subnet
	// 2.1 获取subnet资源
	if err := r.Get(ctx, req.NamespacedName, &subnet); err != nil {
		log.Error(err, "无法获取子网列表")
		return ctrl.Result{}, err
	}
	// 2.2 列出所有子网
	if err := r.List(ctx, &sublist); err != nil {
		log.Error(err, "无法列出使用的网段列表")
		return ctrl.Result{}, err
	}
	// 2.3 更新CIDR
	var cidrlist []string
	for {
		cidrlist = []string{}
		subnet.Status.CIDR = cidrlist
		log.Info("更新Subnet")
		for _, sub := range sublist.Items {
			cidrlist = append(cidrlist, sub.Spec.CIDR...)
			subnet.Status.CIDR = cidrlist
			// 2.4 更新总可用IP数量
			subnet.Status.Total, _ = getTotalip(cidrlist)
			// 2.5 更新Gateway
			gw, _ := getGw(cidrlist)
			subnet.Status.Gateway = gw
			// 2.6 更新已使用IP数量
			subnet.Status.UsedIPs, _ = getUsedip(cidrlist)
			// 2.7 更新可用IP数量
			subnet.Status.AvailableIPs = subnet.Status.Total - subnet.Status.UsedIPs
			// 更新subnet状态
			if err := r.Status().Update(ctx, &subnet); err != nil {
				log.Error(err, "无法更新subnet状态")
				return ctrl.Result{}, err

			}
		}
		log.Info("等待10秒")
		time.Sleep(10 * time.Second)
	}
}
func getTotalip(cidr []string) (int, error) {
	iptotal := 0
	for _, gw := range cidr {
		_, ipnet, err := net.ParseCIDR(gw)
		if err != nil {
			panic(err)
		}
		ones, bits := ipnet.Mask.Size()

		iptotal += 1<<uint(bits-ones) - 2

	}
	return iptotal, nil
}

func getGw(cidr []string) ([]string, error) {
	var gwlist []string
	for _, gw := range cidr {
		_, ipnet, err := net.ParseCIDR(gw)
		if err != nil {
			panic(err)
		}
		gwlist = append(gwlist, ipnet.IP.To4().String()[:len(ipnet.IP.String())-1]+"1")
	}
	return gwlist, nil
}

func getUsedip(cidr []string) (int, error) {
	ipused := 0
	serviceAccountPath := os.Getenv("SERVICE_ACCOUNT_PATH")
	if serviceAccountPath == "" {
		serviceAccountPath = "/var/run/secrets/kubernetes.io/serviceaccount"
	}

	// 使用 in-cluster 配置创建 Kubernetes 配置
	config, err := rest.InClusterConfig()
	if err != nil {
		fmt.Printf("Error creating in-cluster config: %v\n", err)
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		fmt.Printf("Error creating clientset: %v\n", err)
		os.Exit(1)
	}
	// 获取Pods
	pods, err := clientset.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		fmt.Printf("Error getting Pods: %v\n", err)
		os.Exit(1)
	}
	ipused = len(pods.Items)
	return ipused, nil
}

func (r *SubnetReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&batchv1alpha1.Subnet{}).
		Complete(r)
}

1.4 实现webhook

创建webhook
kubebuilder create webhook --group batch --version v1alpha1 --kind subnet --defaulting --programmatic-validation
	--defaulting 默认的webhook,一般用来实现对配置的默认信息注入,比如若不配置某字段,将其设置为默认值
	--programmatic-validation 开启验证,字段格式必须符合自定义规则
实现 defaulting 和 validation 接口
# 安装cert-manager 以实现webhooktls证书自动安装
见2.3.9