網站首頁 編程語言 正文
確立目標
- 理解啟動kube-apiserver時與權限相關的三個核心概念:
Authentication
/Authorization
/Admission
分別是認證、授權、準入 - 理解kube-apiserver中管理核心資源的
KubeAPIServer
是怎么啟動的 - 理解Pod發送到
kube-apiserver
后是怎么保存的
Run
kube-apiserver的啟動 代碼在cmd/kube-apiserver
// Like the kubectl source, the kube-apiserver command-line tool is built on
// cobra, so the startup entry point is easy to find.
RunE: func(cmd *cobra.Command, args []string) error {
	// Run takes two arguments: the completedOptions and a stop channel
	// (stopCh <-chan struct{}).
	return Run(completedOptions, genericapiserver.SetupSignalHandler())
}

/*
	It helps to compare this with kubectl:
	kubectl is a command-line tool that exits as soon as the command has run;
	kube-apiserver is a long-running server process that listens on a port.
	A stopCh <-chan struct{} is introduced here so that, after startup, the
	program can block on <-stopCh and not exit.
	Blocking on a channel to keep the process alive is a more elegant approach
	than the traditional never-ending for loop.
*/

// Run creates the server chain from completeOptions, prepares it, and then
// runs the prepared server with stopCh. Any error from creating or preparing
// the chain is returned as-is.
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
	// Build the server chain.
	server, err := CreateServerChain(completeOptions)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}
Three Servers
// CreateServerChain creates three servers (abridged pseudo-code).
func CreateServerChain(){
	// API extensions server, mainly for CRDs.
	createAPIExtensionsServer(){}
	// Core API server, covering the common Pod/Deployment/Service resources;
	// this is the focus of this walkthrough.
	// Many non-core configuration parameters are skipped: studying the details
	// right from the start hurts overall reading efficiency.
	CreateKubeAPIServer(){}
	// API aggregation server, mainly for metrics.
	createAggregatorServer(){}
	// Detail: the second API server needs the first server's configuration,
	// the third needs the second's, and the aggregator server is what is
	// finally returned.
	// Each server's config consists of a GenericConfig and an ExtraConfig,
	// i.e. the part shared along the chain plus its own specifics.
	return aggregatorServer, nil
}
KubeAPIServer
// CreateKubeAPIServerConfig shows the configuration-creation flow (abridged).
func CreateKubeAPIServerConfig(){
	// Build the generic configuration, genericConfig.
	genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
}
GenericConfig
// buildGenericConfig creates the generic configuration (abridged excerpt;
// the trailing "..." marks statements elided by the article).
func buildGenericConfig(s *options.ServerRunOptions,proxyTransport *http.Transport){
	// Insecure corresponds to non-secure communication, i.e. HTTP.
	if lastErr = s.InsecureServing...
	// Secure corresponds to HTTPS.
	if lastErr = s.SecureServing...
	// OpenAPIConfig is the API documentation exposed to the outside.
	genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig()
	// storageFactory is instantiated here; note that it is completed from the
	// etcd options (s.Etcd), i.e. etcd is the storage backend.
	storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
	storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
	completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
	storageFactory, lastErr = completedStorageFactoryConfig.New()
	// Authentication.
	if lastErr = s.Authentication.ApplyTo()...
	// Authorization.
	genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer()
	// Admission control.
	err = s.Admission.ApplyTo()
}
Authentication
func (o *BuiltInAuthenticationOptions) ApplyTo(){ // 前面都是對認證config進行參數設置,這里才是真正的實例化 authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New() } // New這塊的代碼,我們要抓住核心變量authenticators和tokenAuthenticators,也就是各種認證方法 func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) { // 核心變量authenticators和tokenAuthenticators var authenticators []authenticator.Request var tokenAuthenticators []authenticator.Token if config.RequestHeaderConfig != nil { // 1. 添加requestHeader authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator)) } if config.ClientCAContentProvider != nil { // 2. 添加ClientCA authenticators = append(authenticators, certAuth) } if len(config.TokenAuthFile) > 0 { // 3. token 添加tokenfile tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth)) } // 4. token 添加 service account,分兩種來源 if len(config.ServiceAccountKeyFiles) > 0 { tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth) } if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" { tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth) } if config.BootstrapToken { if config.BootstrapTokenAuthenticator != nil { // 5. token 添加 bootstrap tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator)) } } if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 { // 6. token 添加 oidc Authenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth)) } if len(config.WebhookTokenAuthnConfigFile) > 0 { // 7. token 添加 webhook tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth) } // 8. 
組合tokenAuthenticators到tokenAuthenticators中 if len(tokenAuthenticators) > 0 { tokenAuth := tokenunion.New(tokenAuthenticators...) if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 { tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL) } authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth)) } // 9. 沒有任何認證方式且啟用了Anonymous if len(authenticators) == 0 { if config.Anonymous { return anonymous.NewAuthenticator(), &securityDefinitions, nil } return nil, &securityDefinitions, nil } // 10. 組合authenticators authenticator := union.New(authenticators...) return authenticator, &securityDefinitions, nil }
復雜的Authentication模塊的初始化順序我們看完了,有初步的了解即可,沒必要去強制記憶其中的加載順序。
Authorization
func BuildAuthorizer(){
	// As with authentication above, the instantiation happens inside New.
	return authorizationConfig.New()
}

// New builds the authorizer and rule resolver for the configured
// authorization modes. (The article notes that this Authorizer code is more
// pleasant to read than the authentication code.)
//
// It returns an error when no mode is configured or when a mode is unknown;
// otherwise it returns the union of the per-mode authorizers and the union of
// the per-mode rule resolvers, in the order the modes were listed.
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
	// At least one authorization mode must be supplied.
	if len(config.AuthorizationModes) == 0 {
		return nil, nil, fmt.Errorf("at least one authorization mode must be passed")
	}

	var (
		authorizers   []authorizer.Authorizer
		ruleResolvers []authorizer.RuleResolver
	)

	for _, authorizationMode := range config.AuthorizationModes {
		// The individual modes are defined in the const block below; they are
		// not covered in detail here.
		switch authorizationMode {
		case modes.ModeNode:
			authorizers = append(authorizers, nodeAuthorizer)
			ruleResolvers = append(ruleResolvers, nodeAuthorizer)
		case modes.ModeAlwaysAllow:
			authorizers = append(authorizers, alwaysAllowAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
		case modes.ModeAlwaysDeny:
			authorizers = append(authorizers, alwaysDenyAuthorizer)
			ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
		case modes.ModeABAC:
			authorizers = append(authorizers, abacAuthorizer)
			ruleResolvers = append(ruleResolvers, abacAuthorizer)
		case modes.ModeWebhook:
			authorizers = append(authorizers, webhookAuthorizer)
			ruleResolvers = append(ruleResolvers, webhookAuthorizer)
		case modes.ModeRBAC:
			authorizers = append(authorizers, rbacAuthorizer)
			ruleResolvers = append(ruleResolvers, rbacAuthorizer)
		default:
			return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
		}
	}

	return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}

const (
	// ModeAlwaysAllow is the mode to set all requests as authorized
	ModeAlwaysAllow string = "AlwaysAllow"
	// ModeAlwaysDeny is the mode to set no requests as authorized
	ModeAlwaysDeny string = "AlwaysDeny"
	// ModeABAC is the mode to use Attribute Based Access Control to authorize
	ModeABAC string = "ABAC"
	// ModeWebhook is the mode to make an external webhook call to authorize
	ModeWebhook string = "Webhook"
	// ModeRBAC is the mode to use Role Based Access Control to authorize
	ModeRBAC string = "RBAC"
	// ModeNode is an authorization mode that authorizes API requests made by kubelets.
	ModeNode string = "Node"
)
Admission
// Look at the definition behind this call.
err = s.Admission.ApplyTo()

func (a *AdmissionOptions) ApplyTo(){
	return a.GenericAdmission.ApplyTo()
}

func (ps *Plugins) NewFromPlugins(){
	for _, pluginName := range pluginNames {
		// InitPlugin does the initialization work.
		plugin, err := ps.InitPlugin(pluginName, pluginConfig, pluginInitializer)
		if err != nil {
			return nil, err
		}
	}
}

func (ps *Plugins) InitPlugin(name string, config io.Reader, pluginInitializer PluginInitializer) (Interface, error){
	// Fetch the plugin.
	plugin, found, err := ps.getPlugin(name, config)
}

// Look at the definition of Interface: it is the abstract, pluggable
// interface that governs admission and serves Admission Control.
// Interface is an abstract, pluggable interface for Admission Control decisions.
type Interface interface {
	Handles(operation Operation) bool
}

// Next, look at where the plugin is fetched.
func (ps *Plugins) getPlugin(name string, config io.Reader) (Interface, bool, error) {
	ps.lock.Lock()
	defer ps.lock.Unlock()
	// Next question: where is ps.registry initialized?
	f, found := ps.registry[name]
}

// Now follow the kube-apiserver startup path step by step to find where
// admission is initialized.
// Startup command.
command := app.NewAPIServerCommand()
// Server configuration.
s := options.NewServerRunOptions()
// Admission options.
Admission: kubeoptions.NewAdmissionOptions()
// Register the admission plugins.
RegisterAllAdmissionPlugins(options.Plugins)

// RegisterAllAdmissionPlugins holds all of the admission plugin registrations.
func RegisterAllAdmissionPlugins(plugins *admission.Plugins){
	// Many plugins are registered here.
}

// Scrolling up, we find every plugin, i.e. the admission plugin definitions:
// thirty-odd of them, already in order.
var AllOrderedPlugins = []string{
	admit.PluginName,                        // AlwaysAdmit
	autoprovision.PluginName,                // NamespaceAutoProvision
	lifecycle.PluginName,                    // NamespaceLifecycle
	exists.PluginName,                       // NamespaceExists
	scdeny.PluginName,                       // SecurityContextDeny
	antiaffinity.PluginName,                 // LimitPodHardAntiAffinityTopology
	limitranger.PluginName,                  // LimitRanger
	serviceaccount.PluginName,               // ServiceAccount
	noderestriction.PluginName,              // NodeRestriction
	nodetaint.PluginName,                    // TaintNodesByCondition
	alwayspullimages.PluginName,             // AlwaysPullImages
	imagepolicy.PluginName,                  // ImagePolicyWebhook
	podsecurity.PluginName,                  // PodSecurity
	podnodeselector.PluginName,              // PodNodeSelector
	podpriority.PluginName,                  // Priority
	defaulttolerationseconds.PluginName,     // DefaultTolerationSeconds
	podtolerationrestriction.PluginName,     // PodTolerationRestriction
	eventratelimit.PluginName,               // EventRateLimit
	extendedresourcetoleration.PluginName,   // ExtendedResourceToleration
	label.PluginName,                        // PersistentVolumeLabel
	setdefault.PluginName,                   // DefaultStorageClass
	storageobjectinuseprotection.PluginName, // StorageObjectInUseProtection
	gc.PluginName,                           // OwnerReferencesPermissionEnforcement
	resize.PluginName,                       // PersistentVolumeClaimResize
	runtimeclass.PluginName,                 // RuntimeClass
	certapproval.PluginName,                 // CertificateApproval
	certsigning.PluginName,                  // CertificateSigning
	certsubjectrestriction.PluginName,       // CertificateSubjectRestriction
	defaultingressclass.PluginName,          // DefaultIngressClass
	denyserviceexternalips.PluginName,       // DenyServiceExternalIPs

	// new admission plugins should generally be inserted above here
	// webhook, resourcequota, and deny plugins must go at the end

	mutatingwebhook.PluginName,   // MutatingAdmissionWebhook
	validatingwebhook.PluginName, // ValidatingAdmissionWebhook
	resourcequota.PluginName,     // ResourceQuota
	deny.PluginName,              // AlwaysDeny
}
GenericAPIServer的初始化
理解kube-apiserver中管理核心資源的KubeAPIServer
是怎么啟動的
New
// CreateKubeAPIServer first completes the configuration (Complete) and then
// calls New on the completed config, passing delegateAPIServer through.
// It returns the new instance, or nil and the error if New fails.
func CreateKubeAPIServer(kubeAPIServerConfig *controlplane.Config, delegateAPIServer genericapiserver.DelegationTarget) (*controlplane.Instance, error) {
	kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
	if err != nil {
		return nil, err
	}

	return kubeAPIServer, nil
}
GenericServer
// When each of the three servers (APIExtensionsServer, KubeAPIServer and
// AggregatorServer) starts, the same call shows up.
// APIExtensionsServer
genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)
// KubeAPIServer
s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
// AggregatorServer
genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)

// All three create a genericServer through GenericConfig; here is a rough
// overview of that constructor (abridged).
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
	// Create the handler.
	apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

	// Instantiate the server.
	s := &GenericAPIServer{ ... }

	// Carry over the delegation target's post-start and pre-shutdown hooks.
	for k, v := range delegationTarget.PostStartHooks() {
		s.postStartHooks[k] = v
	}

	for k, v := range delegationTarget.PreShutdownHooks() {
		s.preShutdownHooks[k] = v
	}

	// Health checks: add each of the delegate's checks unless a check with
	// the same name already exists in c.HealthzChecks.
	for _, delegateCheck := range delegationTarget.HealthzChecks() {
		skip := false
		for _, existingCheck := range c.HealthzChecks {
			if existingCheck.Name() == delegateCheck.Name() {
				skip = true
				break
			}
		}
		if skip {
			continue
		}
		s.AddHealthChecks(delegateCheck)
	}

	// Install the API routes; this is the important part.
	installAPI(s, c.Config)

	return s, nil
}
NewAPIServerHandler
// NewAPIServerHandler (abridged) creates the handler used by the API server.
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
	// The github.com/emicklei/go-restful library is used to build the RESTful
	// interface; being aware of it is enough for now.
	gorestfulContainer := restful.NewContainer()
}
installAPI
一些通用的
// installAPI installs the general-purpose routes on the server's handler,
// each one guarded by the corresponding switch in c.
func installAPI(s *GenericAPIServer, c *Config) {
	// Add the /index.html route.
	if c.EnableIndex {
		routes.Index{}.Install(s.listedPathProvider, s.Handler.NonGoRestfulMux)
	}
	// Add Go's /pprof routes, commonly used for performance analysis.
	if c.EnableProfiling {
		routes.Profiling{}.Install(s.Handler.NonGoRestfulMux)
		if c.EnableContentionProfiling {
			goruntime.SetBlockProfileRate(1)
		}
		routes.DebugFlags{}.Install(s.Handler.NonGoRestfulMux, "v", routes.StringFlagPutHandler(logs.GlogSetter))
	}
	// Add the monitoring-related /metrics route; the variant with reset
	// support is installed only when profiling is also enabled.
	if c.EnableMetrics {
		if c.EnableProfiling {
			routes.MetricsWithReset{}.Install(s.Handler.NonGoRestfulMux)
		} else {
			routes.DefaultMetrics{}.Install(s.Handler.NonGoRestfulMux)
		}
	}
	// Add the /version route.
	routes.Version{Version: c.Version}.Install(s.Handler.GoRestfulContainer)
	// Enable service discovery.
	if c.EnableDiscovery {
		s.Handler.GoRestfulContainer.Add(s.DiscoveryGroupManager.WebService())
	}
	// Install the flow-control routes when the APIPriorityAndFairness feature
	// gate is enabled.
	if feature.DefaultFeatureGate.Enabled(features.APIPriorityAndFairness) {
		c.FlowControl.Install(s.Handler.NonGoRestfulMux)
	}
}
Apiserver
// New creates the core KubeAPIServer (Master) on top of a generic server
// (abridged): it installs the legacy API when the core v1 version is enabled,
// installs the remaining API groups, and registers a post-start hook.
func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
	// Initialize the generic server.
	s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
	// Instantiate the core KubeAPIServer.
	m := &Master{
		GenericAPIServer:          s,
		ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
	}

	// Register the legacy API.
	if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
		legacyRESTStorageProvider := corerest.LegacyRESTStorageProvider{}
		if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
			return nil, err
		}
	}
	// Storage definitions behind the REST interfaces; many familiar
	// Kubernetes concepts appear here, such as node, storage and events.
	restStorageProviders := []RESTStorageProvider{
		authenticationrest.RESTStorageProvider{Authenticator: c.GenericConfig.Authentication.Authenticator, APIAudiences: c.GenericConfig.Authentication.APIAudiences},
		authorizationrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer, RuleResolver: c.GenericConfig.RuleResolver},
		autoscalingrest.RESTStorageProvider{},
		batchrest.RESTStorageProvider{},
		certificatesrest.RESTStorageProvider{},
		coordinationrest.RESTStorageProvider{},
		discoveryrest.StorageProvider{},
		extensionsrest.RESTStorageProvider{},
		networkingrest.RESTStorageProvider{},
		noderest.RESTStorageProvider{},
		policyrest.RESTStorageProvider{},
		rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
		schedulingrest.RESTStorageProvider{},
		settingsrest.RESTStorageProvider{},
		storagerest.RESTStorageProvider{},
		flowcontrolrest.RESTStorageProvider{},
		// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
		// See https://github.com/kubernetes/kubernetes/issues/42392
		appsrest.StorageProvider{},
		admissionregistrationrest.RESTStorageProvider{},
		eventsrest.RESTStorageProvider{TTL: c.ExtraConfig.EventTTL},
	}
	// Register the APIs.
	if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
		return nil, err
	}
	// Add the post-start hook (hook body elided in this excerpt).
	m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
	})
	return m, nil
}
注冊API的關鍵在InstallLegacyAPI
和InstallAPIs
,如果你對kubernetes的資源有一定的了解,會知道pod等核心資源都放在Legacy中(如果不了解的話,點擊函數看一下,就能有所了解)
InstallLegacyAPI
// Route prefixes for legacy and non-legacy resources.
const (
	// DefaultLegacyAPIPrefix is where the legacy APIs will be located.
	DefaultLegacyAPIPrefix="/api"
	// APIGroupPrefix is where non-legacy API group will be located.
	APIGroupPrefix ="/apis"
)

// InstallLegacyAPI (abridged) builds the legacy REST storage and installs the
// resulting API group under the legacy prefix.
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
	// Initialize the REST storage.
	legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

	// With the /api prefix, register the corresponding Version and Resource.
	// Pod is a core resource and has no notion of a Group.
	if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
		return fmt.Errorf("error in registering group versions: %v", err)
	}
	return nil
}

// A closer look at how this REST storage is initialized (abridged; error
// handling between the calls is elided).
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
	// Pod templates.
	podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
	// Events.
	eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))
	// LimitRange (resource limits).
	limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)
	// ResourceQuota (resource quotas).
	resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)
	// Secrets.
	secretStorage, err := secretstore.NewREST(restOptionsGetter)
	// PersistentVolume storage.
	persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)
	// PersistentVolumeClaim storage.
	persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)
	// ConfigMap configuration.
	configMapStorage, err := configmapstore.NewREST(restOptionsGetter)
	// ... and other core resources, not listed one by one here.

	// Pods; the article's example nginx-pod is a resource of this type.
	podStorage, err := podstore.NewStorage()

	// Record the mapping from resource path to its storage.
	restStorageMap := map[string]rest.Storage{
		"pods":             podStorage.Pod,
		"pods/attach":      podStorage.Attach,
		"pods/status":      podStorage.Status,
		"pods/log":         podStorage.Log,
		"pods/exec":        podStorage.Exec,
		"pods/portforward": podStorage.PortForward,
		"pods/proxy":       podStorage.Proxy,
		"pods/binding":     podStorage.Binding,
		"bindings":         podStorage.LegacyBinding,
		...
	}
}
Create Pod
// Look at how the pod storage (the podStorage from the previous step) is
// initialized (abridged).
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {
	store := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &api.Pod{} },
		NewListFunc:              func() runtime.Object { return &api.PodList{} },
		PredicateFunc:            registrypod.MatchPod,
		DefaultQualifiedResource: api.Resource("pods"),
		// Create, update and delete all use the same pod strategy.
		CreateStrategy:      registrypod.Strategy,
		UpdateStrategy:      registrypod.Strategy,
		DeleteStrategy:      registrypod.Strategy,
		ReturnDeletedObject: true,

		TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
	}
}

// Strategy is initialized as a package-level variable. It is built from
// legacyscheme.Scheme (the scheme used for the core resources) and a simple
// name generator.
var Strategy = podStrategy{legacyscheme.Scheme, names.SimpleNameGenerator}

// This is where Scheme itself is initialized. A Scheme can be thought of as
// Kubernetes' registry: every resource type must be registered in the Scheme
// before it can be used.
// NOTE(review): the original note also says the registry carries the
// create/update/delete strategies; that is not shown in this excerpt.
var Scheme = runtime.NewScheme()
Pod數據的保存
理解Pod發送到kube-apiserver
后是怎么保存的
RESTCreateStrategy
// podStrategy wraps the various actions on a Pod; only the create operation
// is looked at here.
type podStrategy struct {
	runtime.ObjectTyper
	names.NameGenerator
}

// RESTCreateStrategy is the create-side interface for podStrategy.
type RESTCreateStrategy interface {
	runtime.ObjectTyper
	names.NameGenerator
	// NamespaceScoped reports whether the resource belongs to a namespace.
	NamespaceScoped() bool
	// PrepareForCreate performs the checks/preparation before creation.
	PrepareForCreate(ctx context.Context, obj runtime.Object)
	// Validate validates the resource object.
	Validate(ctx context.Context, obj runtime.Object) field.ErrorList
	// Canonicalize normalizes the object.
	Canonicalize(obj runtime.Object)
}

// Once the checks are done, the data needs to be saved.
Storage
// PodStorage is the storage implementation for Pods; it bundles several
// storage definitions.
type PodStorage struct {
	// REST implements a RESTStorage for pods
	Pod *REST
	// BindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
	Binding *BindingREST
	// LegacyBindingREST implements the REST endpoint for binding pods to nodes when etcd is in use.
	LegacyBinding *LegacyBindingREST
	Eviction      *EvictionREST
	// StatusREST implements the REST endpoint for changing the status of a pod.
	Status *StatusREST
	// EphemeralContainersREST implements the REST endpoint for adding EphemeralContainers
	EphemeralContainers *EphemeralContainersREST
	Log                 *podrest.LogREST
	Proxy               *podrest.ProxyREST
	Exec                *podrest.ExecREST
	Attach              *podrest.AttachREST
	PortForward         *podrest.PortForwardREST
}

/*
	From the map in the previous section, pods are stored in REST:
	restStorageMap := map[string]rest.Storage{
		"pods": podStorage.Pod,
	}
*/
type REST struct {
	*genericregistry.Store
	// Proxy transport layer; most likely network-related, skipped for now.
	proxyTransport http.RoundTripper
}

// Store is a general-purpose data structure.
type Store struct {
	// Storage definition (other fields elided).
	...
	Storage DryRunnableStorage
}

// The Storage field inside DryRunnableStorage is an interface.
type DryRunnableStorage struct {
	Storage storage.Interface
	// Codec handles encoding and decoding.
	Codec runtime.Codec
}

// Create stores obj under key. In dry-run mode nothing is written: it first
// tries to Get the key and returns a key-exists error if that succeeds,
// otherwise it copies obj into out. In normal mode it delegates to the
// underlying Storage.Create.
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
	if dryRun {
		// A successful Get means the key is already present.
		if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
			return storage.NewKeyExistsError(key, 0)
		}
		return s.copyInto(obj, out)
	}
	// This is where Create is really invoked.
	return s.Storage.Create(ctx, key, obj, out, ttl)
}
Storage Implement
// Interface is the storage interface: the basic create/delete/update/read
// operations plus more advanced ones such as watch.
type Interface interface {
	Versioner() Versioner
	Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error
	Delete(ctx context.Context, key string, out runtime.Object, preconditions *Preconditions, validateDeletion ValidateObjectFunc) error
	Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
	WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
	Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
	GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
	List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
	GuaranteedUpdate(
		ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
		precondtions *Preconditions, tryUpdate UpdateFunc, suggestion ...runtime.Object) error
	Count(key string) (int64, error)
}

// Now find where the storage is initialized: NewRawStorage hands the config
// to factory.Create.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc, error) {
	return factory.Create(*config)
}

// Create selects the storage backend by c.Type. It returns an error for
// "etcd2" and for unknown types, and creates etcd3 storage when the type is
// unset or etcd3.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	switch c.Type {
	// etcd2 is no longer supported.
	case "etcd2":
		return nil, nil, fmt.Errorf("%v is no longer a supported storage backend", c.Type)
	// etcd3 is the default.
	case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
		return newETCD3Storage(c)
	default:
		return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
	}
}
Summary
-
kube-apiserver
 包含三個apiserver:APIExtensionsServer
、KubeAPIServer
和AggregatorServer
三個APIServer底層均依賴通用的GenericServer
,使用go-restful
對外提供RESTful風格的API服務;三個server都有兩類配置:一類是專有的ExtraConfig,一類是通用的GenericConfig,通用的配置中有三種Authentication/Authorization/Admission
,控制權限的方式, -
kube-apiserver
 對請求進行 Authentication
、Authorization
和Admission
三層驗證,Admission
是插件化的,可以通過webhook
來拓展 - 完成驗證后,請求會根據路由規則,觸發到對應資源的handler,主要包括數據的
預處理
和保存
,pod的底層是podStorage
的對象,使用到注冊表Scheme
-
kube-apiserver
 的底層存儲為etcd v3,它被抽象為一種RESTStorage
,使網絡請求和底層存儲操作一一對應
原文鏈接:https://juejin.cn/post/7154591873066565668
相關推薦
- 2023-01-13 Qt中QList與QLinkedList類的常用方法總結_C 語言
- 2022-12-29 Python?Setuptools的?setup.py實例詳解_python
- 2022-11-23 Python數據清洗&預處理入門教程_python
- 2022-08-10 python中ThreadPoolExecutor線程池和ProcessPoolExecutor進程
- 2022-07-19 C語言實現KMP算法+優化
- 2022-07-17 Linux安裝Anaconda及簡單操作
- 2022-11-10 Android開發之ViewPager實現滑動切換頁面_Android
- 2023-04-08 React中useCallback?useMemo到底該怎么用_React
- 最近更新
-
- window11 系統安裝 yarn
- 超詳細win安裝深度學習環境2025年最新版(
- Linux 中運行的top命令 怎么退出?
- MySQL 中decimal 的用法? 存儲小
- get 、set 、toString 方法的使
- @Resource和 @Autowired注解
- Java基礎操作-- 運算符,流程控制 Flo
- 1. Int 和Integer 的區別,Jav
- spring @retryable不生效的一種
- Spring Security之認證信息的處理
- Spring Security之認證過濾器
- Spring Security概述快速入門
- Spring Security之配置體系
- 【SpringBoot】SpringCache
- Spring Security之基于方法配置權
- redisson分布式鎖中waittime的設
- maven:解決release錯誤:Artif
- restTemplate使用總結
- Spring Security之安全異常處理
- MybatisPlus優雅實現加密?
- Spring ioc容器與Bean的生命周期。
- 【探索SpringCloud】服務發現-Nac
- Spring Security之基于HttpR
- Redis 底層數據結構-簡單動態字符串(SD
- arthas操作spring被代理目標對象命令
- Spring中的單例模式應用詳解
- 聊聊消息隊列,發送消息的4種方式
- bootspring第三方資源配置管理
- GIT同步修改后的遠程分支