網站首頁 編程語言 正文
Run
確立目標
理解 kube-controller-manager 的運行機制
從主函數找到run函數,代碼較長,這里精簡了一下
// Run is the entry point of kube-controller-manager (abridged excerpt).
//
// It registers the component configuration with configz and then starts the
// controllers, either directly or as the OnStartedLeading callback of a leader
// election. Both paths end in panic("unreachable"): the controller closure
// finishes with an empty select and therefore never returns, and the call to
// leaderelection.RunOrDie is likewise expected to block. The only error this
// excerpt ever returns is the one from os.Hostname.
//
// stopCh is not referenced anywhere in this excerpt.
func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error {
	// Register the component config with configz; a failure is only logged.
	if cfgz, err := configz.New(ConfigzName); err != nil {
		klog.Errorf("unable to register configz: %v", err)
	} else {
		cfgz.Set(c.ComponentConfig)
	}

	// Health checking and the HTTP server are skipped in this excerpt; only
	// the declarations remain. unsecuredMux is therefore nil when it is handed
	// to StartControllers below, and checks is not used again here.
	var (
		checks       []healthz.HealthChecker
		unsecuredMux *mux.PathRecorderMux
	)

	// startAll builds the controller context, starts every controller and then
	// the informers, and finally parks this goroutine forever.
	startAll := func(ctx context.Context) {
		rootBuilder := controller.SimpleControllerClientBuilder{
			ClientConfig: c.Kubeconfig,
		}

		// Client authentication setup is trimmed from this excerpt, so the
		// builder is passed on as its zero value.
		var clientBuilder controller.ControllerClientBuilder

		cctx, err := CreateControllerContext(c, rootBuilder, clientBuilder, ctx.Done())
		if err != nil {
			klog.Fatalf("error building controller context: %v", err)
		}

		saTokenStarter := serviceAccountTokenControllerStarter{rootClientBuilder: rootBuilder}
		initializers := NewControllerInitializers(cctx.LoopMode)
		if err := StartControllers(cctx, saTokenStarter.startServiceAccountTokenController, initializers, unsecuredMux); err != nil {
			klog.Fatalf("error starting controllers: %v", err)
		}

		// Informers are started only after the controllers above have been
		// started; InformersStarted is closed once both factories are running.
		cctx.InformerFactory.Start(cctx.Stop)
		cctx.ObjectOrMetadataInformerFactory.Start(cctx.Stop)
		close(cctx.InformersStarted)

		// Block forever.
		select {}
	}

	election := c.ComponentConfig.Generic.LeaderElection

	// Without leader election, run the controllers on this goroutine.
	if !election.LeaderElect {
		startAll(context.TODO())
		panic("unreachable")
	}

	// Build an identity from the hostname plus a fresh UUID so that it is
	// unique per process.
	hostname, err := os.Hostname()
	if err != nil {
		return err
	}
	identity := hostname + "_" + string(uuid.NewUUID())

	lock, err := resourcelock.New(
		election.ResourceLock,
		election.ResourceNamespace,
		election.ResourceName,
		c.LeaderElectionClient.CoreV1(),
		c.LeaderElectionClient.CoordinationV1(),
		resourcelock.ResourceLockConfig{
			Identity:      identity,
			EventRecorder: c.EventRecorder,
		},
	)
	if err != nil {
		klog.Fatalf("error creating lock: %v", err)
	}

	// NOTE(review): electionChecker is not declared in this excerpt —
	// presumably it belongs to the health-check setup that was trimmed; confirm.
	electionConfig := leaderelection.LeaderElectionConfig{
		Lock:          lock,
		LeaseDuration: election.LeaseDuration.Duration,
		RenewDeadline: election.RenewDeadline.Duration,
		RetryPeriod:   election.RetryPeriod.Duration,
		Callbacks: leaderelection.LeaderCallbacks{
			// Becoming the leader starts the controllers.
			OnStartedLeading: startAll,
			// Losing the lease terminates the process via klog.Fatalf.
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		},
		WatchDog: electionChecker,
		Name:     "kube-controller-manager",
	}

	// Under normal operation execution stays inside RunOrDie, which keeps
	// performing the election work.
	leaderelection.RunOrDie(context.TODO(), electionConfig)
	panic("unreachable")
}
StartControllers
func StartControllers(ctx ControllerContext, startSATokenController InitFunc, controllers map[string]InitFunc, unsecuredMux *mux.PathRecorderMux) error { // 關(guān)鍵性的循環(huán),啟動(dòng)每個(gè)controllers,key為控制器名字,value為初始化函數(shù) for controllerName, initFn := range controllers { // 是否允許啟動(dòng) if !ctx.IsControllerEnabled(controllerName) { klog.Warningf("%q is disabled", controllerName) continue } time.Sleep(wait.Jitter(ctx.ComponentConfig.Generic.ControllerStartInterval.Duration, ControllerStartJitter)) klog.V(1).Infof("Starting %q", controllerName) // 調(diào)用init函數(shù)進(jìn)行啟動(dòng) debugHandler, started, err := initFn(ctx) if err != nil { klog.Errorf("Error starting %q", controllerName) return err } if !started { klog.Warningf("Skipping %q", controllerName) continue } // 注冊(cè)對(duì)應(yīng)controller到debug的url中 if debugHandler != nil && unsecuredMux != nil { basePath := "/debug/controllers/" + controllerName unsecuredMux.UnlistedHandle(basePath, http.StripPrefix(basePath, debugHandler)) unsecuredMux.UnlistedHandlePrefix(basePath+"/", http.StripPrefix(basePath, debugHandler)) } klog.Infof("Started %q", controllerName) } return nil } // 我們?cè)偃魅隿ontroller的函數(shù)去看看,對(duì)應(yīng)的controller有哪些,這里有我們很多常見的概念,不一一細(xì)講 func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { controllers := map[string]InitFunc{} controllers["endpoint"] = startEndpointController controllers["endpointslice"] = startEndpointSliceController controllers["endpointslicemirroring"] = startEndpointSliceMirroringController controllers["replicationcontroller"] = startReplicationController controllers["podgc"] = startPodGCController controllers["resourcequota"] = startResourceQuotaController controllers["namespace"] = startNamespaceController controllers["serviceaccount"] = startServiceAccountController controllers["garbagecollector"] = startGarbageCollectorController controllers["daemonset"] = startDaemonSetController controllers["job"] = startJobController 
controllers["deployment"] = startDeploymentController controllers["replicaset"] = startReplicaSetController controllers["horizontalpodautoscaling"] = startHPAController controllers["disruption"] = startDisruptionController controllers["statefulset"] = startStatefulSetController controllers["cronjob"] = startCronJobController controllers["csrsigning"] = startCSRSigningController controllers["csrapproving"] = startCSRApprovingController controllers["csrcleaner"] = startCSRCleanerController controllers["ttl"] = startTTLController controllers["bootstrapsigner"] = startBootstrapSignerController controllers["tokencleaner"] = startTokenCleanerController controllers["nodeipam"] = startNodeIpamController controllers["nodelifecycle"] = startNodeLifecycleController if loopMode == IncludeCloudLoops { controllers["service"] = startServiceController controllers["route"] = startRouteController controllers["cloud-node-lifecycle"] = startCloudNodeLifecycleController } controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController controllers["persistentvolume-expander"] = startVolumeExpandController controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController controllers["pvc-protection"] = startPVCProtectionController controllers["pv-protection"] = startPVProtectionController controllers["ttl-after-finished"] = startTTLAfterFinishedController controllers["root-ca-cert-publisher"] = startRootCACertPublisher controllers["ephemeral-volume"] = startEphemeralVolumeController return controllers }
ReplicaSet
由于我們的示例是創建一個nginx的pod,涉及到kube-controller-manager的內容很少。
但是,為了加深大家對 kube-controller-manager 的認識,我們引入一個新的概念 - ReplicaSet,下面是官方說明:
A ReplicaSet’s purpose is to maintain a stable set of replica Pods running at any given time. As such, it is often used to guarantee the availability of a specified number of identical Pods.
ReplicaSet 的目的是維護一組在任何時候都處于運行狀態的 Pod 副本的穩定集合。 因此,它通常用來保證給定數量的、完全相同的 Pod 的可用性。
簡單來說,ReplicaSet 就是用來生成指定個數的Pod
代碼在pkg/controller/replicaset/replica_set.go
ReplicaSetController
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) { if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] { return nil, false, nil } // 用goroutine異步運(yùn)行,包含了 ReplicaSet和Pod 的兩個(gè)Informer // 這一點(diǎn)很好理解:我們是要控制ReplicaSet聲明的數(shù)量和運(yùn)行的Pod數(shù)量一致,需要同時(shí)觀察者兩種資源 go replicaset.NewReplicaSetController( ctx.InformerFactory.Apps().V1().ReplicaSets(), ctx.InformerFactory.Core().V1().Pods(), ctx.ClientBuilder.ClientOrDie("replicaset-controller"), replicaset.BurstReplicas, ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop) return nil, true, nil } // 運(yùn)行函數(shù) func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer rsc.queue.ShutDown() controllerName := strings.ToLower(rsc.Kind) klog.Infof("Starting %v controller", controllerName) defer klog.Infof("Shutting down %v controller", controllerName) if !cache.WaitForNamedCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) { return } for i := 0; i < workers; i++ { // 工作的函數(shù) go wait.Until(rsc.worker, time.Second, stopCh) } <-stopCh } func (rsc *ReplicaSetController) worker() { // 繼續(xù)查找實(shí)現(xiàn) for rsc.processNextWorkItem() { } } func (rsc *ReplicaSetController) processNextWorkItem() bool { // 這里也有個(gè)queue的概念,可以類比kube-scheduler中的實(shí)現(xiàn) // 不同的是,這里的queue是 workqueue.RateLimitingInterface ,也就是限制速率的,具體實(shí)現(xiàn)今天不細(xì)看 // 獲取元素 key, quit := rsc.queue.Get() if quit { return false } defer rsc.queue.Done(key) // 處理對(duì)應(yīng)的元素 err := rsc.syncHandler(key.(string)) if err == nil { rsc.queue.Forget(key) return true } utilruntime.HandleError(fmt.Errorf("sync %q failed with %v", key, err)) rsc.queue.AddRateLimited(key) return true } // 再回過頭,去查看syncHandler的具體實(shí)現(xiàn) func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, 
burstReplicas int, gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController { rsc.syncHandler = rsc.syncReplicaSet return rsc }
syncReplicaSet
func (rsc *ReplicaSetController) syncReplicaSet(key string) error { startTime := time.Now() defer func() { klog.V(4).Infof("Finished syncing %v %q (%v)", rsc.Kind, key, time.Since(startTime)) }() // 從key中拆分出 namespace 和 name namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } // 根據(jù)name,從 Lister 獲取對(duì)應(yīng)的 ReplicaSets 信息 rs, err := rsc.rsLister.ReplicaSets(namespace).Get(name) if errors.IsNotFound(err) { klog.V(4).Infof("%v %v has been deleted", rsc.Kind, key) rsc.expectations.DeleteExpectations(key) return nil } if err != nil { return err } rsNeedsSync := rsc.expectations.SatisfiedExpectations(key) // 獲取 selector (k8s 是根據(jù)selector中的label來匹配 ReplicaSets 和 Pod 的) selector, err := metav1.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { utilruntime.HandleError(fmt.Errorf("error converting pod selector to selector: %v", err)) return nil } // 根據(jù)namespace和labels獲取所有的pod allPods, err := rsc.podLister.Pods(rs.Namespace).List(labels.Everything()) if err != nil { return err } // 過濾無效的pod filteredPods := controller.FilterActivePods(allPods) // 根據(jù)selector再過濾pod filteredPods, err = rsc.claimPods(rs, selector, filteredPods) if err != nil { return err } var manageReplicasErr error if rsNeedsSync && rs.DeletionTimestamp == nil { // 管理 ReplicaSet,下面詳細(xì)分析 manageReplicasErr = rsc.manageReplicas(filteredPods, rs) } rs = rs.DeepCopy() newStatus := calculateStatus(rs, filteredPods, manageReplicasErr) // 更新狀態(tài) updatedRS, err := updateReplicaSetStatus(rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace), rs, newStatus) if err != nil { return err } if manageReplicasErr == nil && updatedRS.Spec.MinReadySeconds > 0 && updatedRS.Status.ReadyReplicas == *(updatedRS.Spec.Replicas) && updatedRS.Status.AvailableReplicas != *(updatedRS.Spec.Replicas) { rsc.queue.AddAfter(key, time.Duration(updatedRS.Spec.MinReadySeconds)*time.Second) } return manageReplicasErr } // 我們?cè)僖黄鹂纯矗?dāng)Pod數(shù)量和ReplicaSet中聲明的不同時(shí),是怎么工作的 func (rsc 
*ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { // diff = 當(dāng)前pod數(shù) - 期望pod數(shù) diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get key for %v %#v: %v", rsc.Kind, rs, err)) return nil } // diff小于0,表示需要擴(kuò)容,即新增Pod if diff < 0 { // 具體的實(shí)現(xiàn)暫時(shí)不細(xì)看 // diff 大于0,即需要縮容 } else if diff > 0 { } return nil }
站在前人的肩膀上,向前輩致敬,Respect!
Summary
- kube-controller-manager 的核心思想是: 根據期望狀態和當前狀態,管理Kubernetes中的資源。 以ReplicaSet為例,它對比了定義聲明的Pod數和當前集群中滿足條件的Pod數,進行相對應的擴縮容。
原文鏈接:https://juejin.cn/post/7169189388701433864
相關(guān)推薦
- 2022-07-17 gethostbyaddr在Python3中引發(fā)UnicodeDecodeError_python
- 2023-02-04 golang?Gorm框架講解_Golang
- 2022-09-09 C#流程控制詳解_C#教程
- 2024-03-24 IDEA配置熱啟動(dòng)
- 2022-03-22 C++using聲明和using編譯指令_C 語言
- 2022-09-29 React實(shí)現(xiàn)下拉框的key,value的值同時(shí)傳送_React
- 2022-03-31 python常用內(nèi)置模塊你了解嗎_python
- 2022-07-01 Nginx的gzip指令使用小結(jié)_nginx
- 最近更新
-
- window11 系統(tǒng)安裝 yarn
- 超詳細(xì)win安裝深度學(xué)習(xí)環(huán)境2025年最新版(
- Linux 中運(yùn)行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲(chǔ)小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎(chǔ)操作-- 運(yùn)算符,流程控制 Flo
- 1. Int 和Integer 的區(qū)別,Jav
- spring @retryable不生效的一種
- Spring Security之認(rèn)證信息的處理
- Spring Security之認(rèn)證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權(quán)
- redisson分布式鎖中waittime的設(shè)
- maven:解決release錯(cuò)誤:Artif
- restTemplate使用總結(jié)
- Spring Security之安全異常處理
- MybatisPlus優(yōu)雅實(shí)現(xiàn)加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務(wù)發(fā)現(xiàn)-Nac
- Spring Security之基于HttpR
- Redis 底層數(shù)據(jù)結(jié)構(gòu)-簡單動(dòng)態(tài)字符串(SD
- arthas操作spring被代理目標(biāo)對(duì)象命令
- Spring中的單例模式應(yīng)用詳解
- 聊聊消息隊(duì)列,發(fā)送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠(yuǎn)程分支