1. Background
In the Kubernetes world, many components only need a single running instance, such as controller-manager or third-party controllers. For high availability, however, these components are deployed with multiple replicas and must fail over automatically. This is where leader election comes in: deploy multiple replicas, but run in single-active-instance mode. An application can use external middleware such as ZooKeeper or etcd for leader election. ZooKeeper's implementation is based on ephemeral nodes: an ephemeral node lives only as long as the session between the client and ZooKeeper, and is deleted immediately when the session ends. Once it is deleted, the other standby instances race to create the ephemeral node, and the client (instance) that succeeds becomes the leader. This guarantees a single active instance across the cluster and enables fast failover when a fault occurs. Etcd is a distributed key-value store that maintains replica state with the Raft protocol; its revision mechanism can be used to build distributed locks, and etcd's concurrency package builds leader election on top of that locking capability. (This article focuses on what Kubernetes itself offers, so etcd's concurrency mechanism is not covered in detail.)
Since Kubernetes uses etcd as its underlying storage, could we use the Kubernetes API itself to implement leader election? In fact, the Kubernetes SIGs already provide this capability: leader election backed by configmap, lease, or endpoint resources.
2. The Official Example
The Kubernetes project provides a usage example; the source is at github.com/kubernetes/…
During an election, each instance can be in one of these states:
- Elected successfully -> running the business code
- Waiting: another instance has become the leader. When the leader gives up the lock, an instance in this state may become the new leader
- Releasing the leader lock, with the running business code exiting
In a stable environment, once an instance becomes the leader it normally never releases the lock and simply keeps running. This benefits business stability and lets the controller react quickly to resource state changes. Only network instability or an accidental deletion of the instance triggers a new leader election.
The official election example, annotated:
```go
package main

import (
	"context"
	"flag"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/google/uuid"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/klog/v2"
)

func buildConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			return nil, err
		}
		return cfg, nil
	}

	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return cfg, nil
}

func main() {
	klog.InitFlags(nil)

	var kubeconfig string
	var leaseLockName string
	var leaseLockNamespace string
	var id string

	// kubeconfig: path to the Kubernetes cluster's config file
	flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	// ID of the lock holder; a random UUID is generated if none is passed in
	flag.StringVar(&id, "id", uuid.New().String(), "the holder identity name")
	// Lock ID, i.e. the name of the backing Kubernetes resource
	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
	// Namespace of the lock
	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
	// Parse the command-line flags
	flag.Parse()

	if leaseLockName == "" {
		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag).")
	}
	if leaseLockNamespace == "" {
		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag).")
	}

	// leader election uses the Kubernetes API by writing to a
	// lock object, which can be a LeaseLock object (preferred),
	// a ConfigMap, or an Endpoints (deprecated) object.
	// Conflicting writes are detected and each client handles those actions
	// independently.
	config, err := buildConfig(kubeconfig)
	if err != nil {
		klog.Fatal(err)
	}
	// Build the cluster client, dying on failure
	client := clientset.NewForConfigOrDie(config)

	// Stand-in for the controller's logic
	run := func(ctx context.Context) {
		// complete your controller loop here
		klog.Info("Controller loop...")

		// never exits
		select {}
	}

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// we use the Lease lock type since edits to Leases are less common
	// and fewer objects in the cluster watch "all Leases".
	// Build the lock from the flags; a Lease resource backs it
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: leaseLockNamespace,
		},
		// ties the lock to the Kubernetes cluster
		Client: client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}

	// start the leader election code loop
	// Note: the election observes ctx; calling its cancel function ends the election
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		// The lock the election is fought over
		Lock: lock,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		// Voluntarily give up leadership when ctx is cancelled
		ReleaseOnCancel: true,
		// Lease term: if the leader fails to renew within 60s, the lock
		// is released and a new election takes place
		LeaseDuration: 60 * time.Second,
		// Timeout for a renew request
		RenewDeadline: 15 * time.Second,
		// For the leader: the interval between leadership renewals.
		// For non-leaders: the interval between attempts to grab the lock
		// (with a 1.2 jitter factor; see the source for details)
		RetryPeriod: 5 * time.Second,
		// Callback registration
		Callbacks: leaderelection.LeaderCallbacks{
			// Called when this instance becomes leader
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				run(ctx)
			},
			// Called when this instance loses leadership
			OnStoppedLeading: func() {
				// we can do cleanup here
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			// Called when a new leader is elected
			OnNewLeader: func(identity string) {
				// we're notified when new leader elected
				if identity == id {
					// I just got the lock
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}
```
Start one instance and watch both the log output and the lease resource on the Kubernetes cluster. Start command:
```shell
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
```
The log output shows that the instance with id=1 acquired the lock:
```shell
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=1
I1023 17:00:21.670298   94227 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:00:21.784234   94227 leaderelection.go:258] successfully acquired lease default/example
I1023 17:00:21.784316   94227 main.go:78] Controller loop...
```
On the Kubernetes cluster, a Lease named example now exists in the default namespace, recording id=1 as the holder.
Next, start a second instance with id=2. Its log shows:
```shell
go run main.go --kubeconfig=/tmp/test-kubeconfig.yaml -logtostderr=true -lease-lock-name=example -lease-lock-namespace=default -id=2
I1023 17:05:00.555145   95658 leaderelection.go:248] attempting to acquire leader lease default/example...
I1023 17:05:00.658202   95658 main.go:151] new leader elected: 1
```
As you can see, the id=2 instance did not acquire the lock, and it observes that the instance with id=1 holds it. If we now stop the id=1 instance, we can watch whether id=2 becomes the new leader.
3. How the Lock Is Implemented
Every Kubernetes resource supports Get/Create/Update operations, so in principle any resource could serve as the backing store for a lock. Kubernetes provides Lease, ConfigMap, and Endpoints as lock backends.
The lock drives the state transitions described above (acquire, renew, release), and must implement the following interface:
```go
type Interface interface {
	// Get returns the LeaderElectionRecord
	Get(ctx context.Context) (*LeaderElectionRecord, []byte, error)

	// Create attempts to create a LeaderElectionRecord
	Create(ctx context.Context, ler LeaderElectionRecord) error

	// Update will update an existing LeaderElectionRecord
	Update(ctx context.Context, ler LeaderElectionRecord) error

	// RecordEvent is used to record events
	RecordEvent(string)

	// Identity will return the locks Identity
	Identity() string

	// Describe is used to convert details on current resource lock
	// into a string
	Describe() string
}
```
In principle, the Get/Create/Update methods are enough to build a lock, provided the create and update operations are atomic — and that guarantee comes from Kubernetes itself (the API server rejects conflicting writes). The leaderelection.RunOrDie call in the example above simply wraps the Run method, whose implementation is very short:
```go
func (le *LeaderElector) Run(ctx context.Context) {
	defer runtime.HandleCrash()
	defer func() {
		le.config.Callbacks.OnStoppedLeading()
	}()

	// Try to take the lock, waiting until it succeeds or ctx is done
	if !le.acquire(ctx) {
		return // ctx signalled done
	}
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// Once the lock is held, invoke OnStartedLeading to run the controller code
	go le.config.Callbacks.OnStartedLeading(ctx)
	// ...and keep renewing the lock for as long as we hold it
	le.renew(ctx)
}
```
The LeaderElector revolves around two operations, acquire and renew:
```go
func (le *LeaderElector) acquire(ctx context.Context) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	succeeded := false
	desc := le.config.Lock.Describe()
	klog.Infof("attempting to acquire leader lease %v...", desc)
	// Blocks, periodically trying to take the lock; keeps looping until
	// it succeeds, unless ctx is cancelled
	wait.JitterUntil(func() {
		// Try to take the lock
		succeeded = le.tryAcquireOrRenew(ctx)
		le.maybeReportTransition()
		if !succeeded {
			klog.V(4).Infof("failed to acquire lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("became leader")
		le.metrics.leaderOn(le.config.Name)
		klog.Infof("successfully acquired lease %v", desc)
		cancel()
	}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
	return succeeded
}

// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()
	// As long as renewing succeeds, wait.Until keeps looping
	wait.Until(func() {
		// RenewDeadline is enforced here: if a renew takes longer than
		// RenewDeadline, it fails and the leader steps down
		timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
		defer timeoutCancel()
		err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
			// Renew the lock
			return le.tryAcquireOrRenew(timeoutCtx), nil
		}, timeoutCtx.Done())
		le.maybeReportTransition()
		desc := le.config.Lock.Describe()
		if err == nil {
			// Renew succeeded
			klog.V(5).Infof("successfully renewed lease %v", desc)
			return
		}
		le.config.Lock.RecordEvent("stopped leading")
		le.metrics.leaderOff(le.config.Name)
		klog.Infof("failed to renew lease %v: %v", desc, err)
		cancel()
	}, le.config.RetryPeriod, ctx.Done())

	// if we hold the lease, give it up
	if le.config.ReleaseOnCancel {
		le.release()
	}
}
```
The key logic lives in tryAcquireOrRenew, which implements the core of the lock's state transitions:
```go
func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
	now := metav1.Now()
	leaderElectionRecord := rl.LeaderElectionRecord{
		HolderIdentity:       le.config.Lock.Identity(),
		LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
		RenewTime:            now,
		AcquireTime:          now,
	}

	// 1. obtain or create the ElectionRecord
	// Check whether the lock already exists
	oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
	if err != nil {
		if !errors.IsNotFound(err) {
			klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
			return false
		}
		// The lock resource does not exist yet, so create it
		if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
			klog.Errorf("error initially creating leader election record: %v", err)
			return false
		}
		// Announce ourselves as the leader
		le.setObservedRecord(&leaderElectionRecord)
		return true
	}

	// 2. Record obtained, check the Identity & Time
	if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
		// This check matters: as long as the leader keeps renewing the lock,
		// oldLeaderElectionRawRecord keeps changing, and every change
		// refreshes le.observedTime
		le.setObservedRecord(oldLeaderElectionRecord)
		le.observedRawRecord = oldLeaderElectionRawRecord
	}
	// If the lease has not expired and another instance is the leader,
	// give up immediately
	if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
		le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
		!le.IsLeader() {
		klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
		return false
	}

	// 3. We're going to try to update. The leaderElectionRecord is set to it's default
	// here. Let's correct it before updating.
	// If we are the leader, refresh RenewTime so the other (standby)
	// instances can observe that the leader is still alive
	if le.IsLeader() {
		leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
	} else {
		// We were not the leader, so leadership is changing hands
		leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
	}

	// update the lock itself
	if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
		klog.Errorf("Failed to update lock: %v", err)
		return false
	}

	le.setObservedRecord(&leaderElectionRecord)
	return true
}
```
Original article: https://juejin.cn/post/7157648925078323207