k8s source code analysis (2): kube-apiserver


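Preface

I recently joined the k8s source code study group organized by the cloud native community, started studying the low-level k8s source code, and am writing up notes as I go. Anyone interested is welcome to join and learn together. The group and the community have plenty of experts who can answer questions at any time. github.com/cloudnative…

The previous post covered the Informer mechanism of the client-go framework (see the informer source code analysis). The api-server also uses the go-restful web framework; for its principles and source code, see the go-restful source code analysis.

First, a call graph of the kube-apiserver code.

The source code path analyzed below is long and easy to get lost in, so check this diagram regularly to see which step the analysis has reached.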


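api-server

Overview

kube-apiserver is the most central k8s component and the bridge between all the other components: they never talk to each other directly, and everything is relayed through the api-server. See my earlier blog post for details; this post analyzes the api-server from the source code's point of view.

Main responsibilities of kube-apiserver

  • Manages the API of the whole cluster, providing API registration and discovery --- implemented with the go-restful framework
  • The only entry point for resource operations --- operates on the resources in etcd
  • The hub for the components inside the cluster
  • Provides security controls such as request authentication, authorization, and admission control

The three services

kube-apiserver provides three HTTP server services, which decouple the functionality of the very large kube-apiserver component. The three HTTP servers are: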

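Service | Overview | Managing object | Resource registry
- | - | - | -
KubeAPIServer | Core service; provides the built-in core k8s resources such as Pod and Service, which developers may not modify arbitrarily | Master | legacyscheme.Scheme
APIExtensionsServer | API extension service; provides CRD custom resources | CustomResourceDefinitions | extensionsapiserver.Scheme
AggregatorServer | API aggregation service; provides the aggregation layer | APIAggregator | aggregatorscheme.Scheme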

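All three services are built on GenericAPIServer, which maps k8s resources to REST APIs.

Overview of the kube-apiserver startup flow

kube-apiserver is the entry point for controlling all resources, and its startup flow is fairly involved. The startup code can be divided into 9 steps:

  • Resource registration
  • Cobra command-line argument parsing
  • Create the apiserver generic configuration
  • Create APIExtensionsServer
  • Create KubeAPIServer
  • Create AggregatorServer
  • Create GenericAPIServer
  • Start the HTTP service
  • Start the HTTPS service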

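The three servers are linked together through the delegation pattern, and they are initialized in similar ways:

  • First, create the config for each server
  • Then initialize the HTTP server, which involves:
    • Initializing the GoRestfulContainer
    • Installing the APIs the server contains, in detail:
      • Create the backing RESTStorage for each api-resource
      • Add a handler for each verb the api-resource supports
      • Register the handlers with the router
      • Register the router with the webservice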
0. Entry function

The kube-apiserver component is a standalone process. Its entry function is shown below:

Source location: cmd/kube-apiserver/apiserver.go

import (
  ...
  // Import legacyscheme; its package initialization sets up the resource registry
  "k8s.io/kubernetes/pkg/api/legacyscheme"
  // Import master; its init functions register all k8s resources
  "k8s.io/kubernetes/pkg/master"
  ...
)
func main() {
  rand.Seed(time.Now().UnixNano())

  // Create a Cobra Command object with default parameters
  command := app.NewAPIServerCommand()

  logs.InitLogs()
  defer logs.FlushLogs()

  if err := command.Execute(); err != nil {
    os.Exit(1)
  }
}

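1. Resource registration

The first thing kube-apiserver does after starting is register the resources k8s supports into the Scheme resource registry, so that the later startup logic can look up resource information and start and run the three services described above.

Registration is not triggered by an explicit function call but by Go's import and init mechanism: the import "k8s.io/kubernetes/pkg/api/legacyscheme" shown in step 0.

Resource registration has two steps:

  • Initialize the Scheme resource registry
  • Register the resources k8s supports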
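To make the init-driven registration concrete, here is a single-file sketch. It is an assumption-laden toy: the registry map, the Pod and Service types, and registeredKinds are hypothetical and only mimic the idea of a Scheme. In the real layout each init function lives in its own install package and runs because of a blank import; here two init functions in one file play that role.

```go
package main

import (
	"fmt"
	"sort"
)

// registry is a toy stand-in for a Scheme: kind name -> constructor.
// Package-level variables are initialized before any init function runs.
var registry = map[string]func() interface{}{}

// Pod and Service are hypothetical resource types.
type Pod struct{ Name string }
type Service struct{ Name string }

// Each init registers one kind. Nothing calls these explicitly:
// the Go runtime runs them before main.
func init() { registry["Pod"] = func() interface{} { return &Pod{} } }
func init() { registry["Service"] = func() interface{} { return &Service{} } }

// registeredKinds returns the registered kind names in sorted order.
func registeredKinds() []string {
	kinds := make([]string, 0, len(registry))
	for k := range registry {
		kinds = append(kinds, k)
	}
	sort.Strings(kinds)
	return kinds
}

func main() {
	// By the time main runs, both kinds are already registered.
	fmt.Println(registeredKinds())
}
```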

Initializing the Scheme resource registry

Code path: pkg/api/legacyscheme/scheme.go

It defines three global variables that serve kube-apiserver and can be used anywhere in the component.

package legacyscheme

import (
  "k8s.io/apimachinery/pkg/runtime"
  "k8s.io/apimachinery/pkg/runtime/serializer"
)

// Scheme is the resource registry
var Scheme = runtime.NewScheme()
// Codecs is the codec factory
var Codecs = serializer.NewCodecFactory(Scheme)
// ParameterCodec is the parameter codec
var ParameterCodec = runtime.NewParameterCodec(Scheme)

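Registering the resources k8s supports

At startup the apiserver imports the master package: the import "k8s.io/kubernetes/pkg/master" shown in step 0.

import_known_versions.go in the master package imports the install package of each k8s resource group, and those imports trigger the init functions. Every resource group defines an install package whose init function performs the registration when the package is imported.

Source location: pkg/master/import_known_versions.go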

import (
  // These imports are the API groups the API server will support.
  _ "k8s.io/kubernetes/pkg/apis/admission/install"
  _ "k8s.io/kubernetes/pkg/apis/admissionregistration/install"
  _ "k8s.io/kubernetes/pkg/apis/apps/install"
  _ "k8s.io/kubernetes/pkg/apis/authentication/install"
  _ "k8s.io/kubernetes/pkg/apis/authorization/install"
  _ "k8s.io/kubernetes/pkg/apis/autoscaling/install"
  _ "k8s.io/kubernetes/pkg/apis/batch/install"
  _ "k8s.io/kubernetes/pkg/apis/certificates/install"
  _ "k8s.io/kubernetes/pkg/apis/coordination/install"
  _ "k8s.io/kubernetes/pkg/apis/core/install"
  _ "k8s.io/kubernetes/pkg/apis/discovery/install"
  _ "k8s.io/kubernetes/pkg/apis/events/install"
  _ "k8s.io/kubernetes/pkg/apis/extensions/install"
  _ "k8s.io/kubernetes/pkg/apis/flowcontrol/install"
  _ "k8s.io/kubernetes/pkg/apis/imagepolicy/install"
  _ "k8s.io/kubernetes/pkg/apis/networking/install"
  _ "k8s.io/kubernetes/pkg/apis/node/install"
  _ "k8s.io/kubernetes/pkg/apis/policy/install"
  _ "k8s.io/kubernetes/pkg/apis/rbac/install"
  _ "k8s.io/kubernetes/pkg/apis/scheduling/install"
  _ "k8s.io/kubernetes/pkg/apis/settings/install"
  _ "k8s.io/kubernetes/pkg/apis/storage/install"
)

Taking the core resource group as an example, look at its install package. Source location: pkg/apis/core/install/install.go

func init() {
  // legacyscheme.Scheme is the global resource registry introduced above
  Install(legacyscheme.Scheme)
}

// Install registers the API group and adds types to a scheme
func Install(scheme *runtime.Scheme) {
  // Register the resource group
  utilruntime.Must(core.AddToScheme(scheme))
  utilruntime.Must(v1.AddToScheme(scheme))
  // Register the version priority order
  utilruntime.Must(scheme.SetVersionPriority(v1.SchemeGroupVersion))
}

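2. Cobra command-line argument parsing

All k8s components use cobra to parse command-line arguments. kube-apiserver uses Cobra to fill in default configuration values and validate the arguments, through the app.NewAPIServerCommand() call shown in step 0.

Source location: cmd/kube-apiserver/app/server.go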

func NewAPIServerCommand() *cobra.Command {
  // Initialize the default configuration; internally this calls each module's own defaults
  s := options.NewServerRunOptions()
  cmd := &cobra.Command{
    ...
    RunE: func(cmd *cobra.Command, args []string) error {
      ...
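      // Fill in default parameter values
      completedOptions, err := Complete(s)
      if err != nil {
        return err
      }
      // Validate the parameters
      if errs := completedOptions.Validate(); len(errs) != 0 {
        return utilerrors.NewAggregate(errs)
      }
      // Run the server; this blocks for the lifetime of the process
      // The Run function is covered separately below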
      return Run(completedOptions, genericapiserver.SetupSignalHandler())
    },
    ...
  }
  ...
}

// genericapiserver.SetupSignalHandler()
func SetupSignalHandler() <-chan struct{} {
  return SetupSignalContext().Done()
}

func SetupSignalContext() context.Context {
  ...
  // Listen for the OS signals os.Interrupt and syscall.SIGTERM
  // and tie them to the stop channel so that goroutines exit gracefully when the process terminates
  signal.Notify(shutdownHandler, shutdownSignals...)
  ...
}
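The "fill defaults, validate, then run" flow above can be sketched without cobra. The example below swaps in the standard library flag package so that it stays self-contained; the serverOptions struct, its two flags, and the default etcd address are hypothetical and are not the real kube-apiserver options.

```go
package main

import (
	"flag"
	"fmt"
)

// serverOptions is a hypothetical, much smaller cousin of ServerRunOptions.
type serverOptions struct {
	SecurePort  int
	EtcdServers string
}

// newServerOptions returns options pre-filled with defaults.
func newServerOptions() *serverOptions { return &serverOptions{SecurePort: 6443} }

// addFlags binds the option fields to command-line flags.
func (o *serverOptions) addFlags(fs *flag.FlagSet) {
	fs.IntVar(&o.SecurePort, "secure-port", o.SecurePort, "HTTPS port")
	fs.StringVar(&o.EtcdServers, "etcd-servers", o.EtcdServers, "etcd endpoints")
}

// complete fills in values that depend on what the user did not set.
func (o *serverOptions) complete() {
	if o.EtcdServers == "" {
		o.EtcdServers = "http://127.0.0.1:2379" // assumed default for the sketch
	}
}

// validate collects every problem instead of stopping at the first one.
func (o *serverOptions) validate() []error {
	var errs []error
	if o.SecurePort < 1 || o.SecurePort > 65535 {
		errs = append(errs, fmt.Errorf("secure-port %d is out of range", o.SecurePort))
	}
	return errs
}

// parseOptions runs the parse -> complete -> validate pipeline.
func parseOptions(args []string) (*serverOptions, error) {
	o := newServerOptions()
	fs := flag.NewFlagSet("toy-apiserver", flag.ContinueOnError)
	o.addFlags(fs)
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	o.complete()
	if errs := o.validate(); len(errs) != 0 {
		return nil, fmt.Errorf("invalid options: %v", errs)
	}
	return o, nil
}

func main() {
	o, err := parseOptions([]string{"--secure-port=8443"})
	fmt.Println(o, err)
}
```

Only options that survive validation would be handed to the long-running Run step.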

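3. Creating the server chain

The core of this step is configuring and creating the three kube-apiserver services described above:

  • Create the kubeapi-server generic configuration
  • Create the kubeapi-extension-server configuration
  • Create the kubeapi-extension server
  • Create the kubeapi-server server
  • Create the aggregator-server configuration
  • Create the aggregator-server server
  • Start the services

The apiserver generic configuration is what the different kube-apiserver modules need in order to be instantiated. It includes:

  • genericConfig instantiation
  • OpenAPI and Swagger configuration
  • StorageFactory (etcd) configuration
  • Authentication configuration
  • Authorization configuration
  • Admission controller configuration
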
func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) error {
  // Create the server chain
  server, err := CreateServerChain(completeOptions, stopCh)
  if err != nil {
    return err
  }
  // Prepare to run
  prepared, err := server.PrepareRun()
  if err != nil {
    return err
  }
  // Run the server
  return prepared.Run(stopCh)
}

// Create the server chain
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
  ...
  // Create the kubeapi-server configuration
  kubeAPIServerConfig, insecureServingInfo, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, nodeTunneler, proxyTransport)
  if err != nil {
    return nil, err
  }

  // Create the kubeapi-extension-server configuration
  apiExtensionsConfig, err := createAPIExtensionsConfig(*kubeAPIServerConfig.GenericConfig, kubeAPIServerConfig.ExtraConfig.VersionedInformers, pluginInitializer, completedOptions.ServerRunOptions, completedOptions.MasterCount,
    serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(proxyTransport, kubeAPIServerConfig.GenericConfig.EgressSelector, kubeAPIServerConfig.GenericConfig.LoopbackClientConfig))

  // Create the kubeapi-extension-server server (end of the chain, so it gets an empty delegate)
  apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegate())

  // Create the kubeapi-server server, delegating to the extension server
  kubeAPIServer, err := CreateKubeAPIServer(kubeAPIServerConfig, apiExtensionsServer.GenericAPIServer)

  // Create the aggregator-server configuration
  aggregatorConfig, err := createAggregatorConfig(*kubeAPIServerConfig.GenericConfig, completedOptions.ServerRunOptions, kubeAPIServerConfig.ExtraConfig.VersionedInformers, serviceResolver, proxyTransport, pluginInitializer)

  // Create the aggregator-server server, delegating to the kubeapi-server
  aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)

  if insecureServingInfo != nil {
    insecureHandlerChain := kubeserver.BuildInsecureHandlerChain(aggregatorServer.GenericAPIServer.UnprotectedHandler(), kubeAPIServerConfig.GenericConfig)
    // Start the insecure (HTTP) service
    if err := insecureServingInfo.Serve(insecureHandlerChain, kubeAPIServerConfig.GenericConfig.RequestTimeout, stopCh); err != nil {
      return nil, err
    }
  }
  return aggregatorServer, nil
}

3.1 Creating the kubeapi-server generic configuration

func CreateKubeAPIServerConfig(...) (...) {
  // Build the generic configuration
  genericConfig, versionedInformers, insecureServingInfo, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
  ...
  // Compute the Service IP range and the apiserver's own Service IP
  serviceIPRange, apiServerServiceIP, err := master.ServiceIPRange(s.PrimaryServiceClusterIPRange)
  ...
  // Build master.Config
  config := &master.Config{
    GenericConfig: genericConfig,
    ExtraConfig: master.ExtraConfig{
      APIResourceConfigSource: storageFactory.APIResourceConfigSource,
      ...
    },
  }
  ...
}

3.1.1 buildGenericConfig

func buildGenericConfig(...) (...) {
  genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
  // Configure which group-versions (GV) are enabled or disabled
  genericConfig.MergedResourceConfig = master.DefaultAPIResourceConfigSource()
  ...
  // openapi/swagger configuration
  // OpenAPIConfig is used to generate the OpenAPI specification
  genericConfig.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(legacyscheme.Scheme, extensionsapiserver.Scheme, aggregatorscheme.Scheme))
  genericConfig.OpenAPIConfig.Info.Title = "Kubernetes"
  genericConfig.LongRunningFunc = filters.BasicLongRunningRequestCheck(
    sets.NewString("watch", "proxy"),
    sets.NewString("attach", "exec", "proxy", "log", "portforward"),
  )

  kubeVersion := version.Get()
  genericConfig.Version = &kubeVersion
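  // etcd configuration
  // storageFactoryConfig defines how kube-apiserver interacts with etcd, e.g. etcd authentication, addresses, and storage prefix
  // It also defines how resources are stored, e.g. resource information, resource encoding, and resource status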
  storageFactoryConfig := kubeapiserver.NewStorageFactoryConfig()
  storageFactoryConfig.APIResourceConfig = genericConfig.MergedResourceConfig
  completedStorageFactoryConfig, err := storageFactoryConfig.Complete(s.Etcd)
  if err != nil {
    lastErr = err
    return
  }
  storageFactory, lastErr = completedStorageFactoryConfig.New()
  if lastErr != nil {
    return
  }
  if genericConfig.EgressSelector != nil {
    storageFactory.StorageConfig.Transport.EgressLookup = genericConfig.EgressSelector.Lookup
  }
  if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
    return
  }

  // Initialize the SharedInformerFactory
  versionedInformers = clientgoinformers.NewSharedInformerFactory(clientgoExternalClient, 10*time.Minute)

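  // Authentication configuration
  // Internally calls authenticatorConfig.New()
  // k8s provides 9 authentication mechanisms; each one becomes an authenticator once instantiated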
  if lastErr = s.Authentication.ApplyTo(&genericConfig.Authentication, genericConfig.SecureServing, genericConfig.EgressSelector, genericConfig.OpenAPIConfig, clientgoExternalClient, versionedInformers); lastErr != nil {
    return
  }

  // Authorization configuration
  // k8s provides 6 authorization mechanisms; each one becomes an authorizer once instantiated
  genericConfig.Authorization.Authorizer, genericConfig.RuleResolver, err = BuildAuthorizer(s, genericConfig.EgressSelector, versionedInformers)

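  // Admission configuration
  // After authentication and authorization succeed, and before the resource is persisted to etcd, the request enters admission control
  // Admission control can apply custom operations to the requested resource (validate, mutate, reject)
  // k8s supports 31 admission controllers
  // Admission controllers are registered, stored, and managed through the Plugins data structure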
  admissionConfig := &kubeapiserveradmission.Config{
    ExternalInformers:    versionedInformers,
    LoopbackClientConfig: genericConfig.LoopbackClientConfig,
    CloudConfigFile:      s.CloudProvider.CloudConfigFile,
  }
  serviceResolver = buildServiceResolver(s.EnableAggregatorRouting, genericConfig.LoopbackClientConfig.Host, versionedInformers)
  pluginInitializers, admissionPostStartHook, err = admissionConfig.New(proxyTransport, genericConfig.EgressSelector, serviceResolver)
  if err != nil {
    lastErr = fmt.Errorf("failed to create admission plugin initializer: %v", err)
    return
  }

  err = s.Admission.ApplyTo(
    genericConfig,
    versionedInformers,
    kubeClientConfig,
    feature.DefaultFeatureGate,
    pluginInitializers...)
  if err != nil {
    lastErr = fmt.Errorf("failed to initialize admission: %v", err)
  }

  if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIPriorityAndFairness) && s.GenericServerRunOptions.EnablePriorityAndFairness {
    genericConfig.FlowControl = BuildPriorityAndFairness(s, clientgoExternalClient, versionedInformers)
  }

  return
}

// Authentication initialization
func (o *BuiltInAuthenticationOptions) ApplyTo(...) error {
  ...
  authInfo.Authenticator, openAPIConfig.SecurityDefinitions, err = authenticatorConfig.New()
  ...
}

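// Decide, based on the configuration, which of the 9 authenticators to initialize
func (config Config) New() (authenticator.Request, *spec.SecurityDefinitions, error) {
  // Lists of request authenticators and token authenticators
  var authenticators []authenticator.Request
  var tokenAuthenticators []authenticator.Token
  securityDefinitions := spec.SecurityDefinitions{}

  // Each authenticator below is configured only if its switch is set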

  // RequestHeader authenticator
  if config.RequestHeaderConfig != nil {
    requestHeaderAuthenticator := headerrequest.NewDynamicVerifyOptionsSecure(
      config.RequestHeaderConfig.CAContentProvider.VerifyOptions,
      config.RequestHeaderConfig.AllowedClientNames,
      config.RequestHeaderConfig.UsernameHeaders,
      config.RequestHeaderConfig.GroupHeaders,
      config.RequestHeaderConfig.ExtraHeaderPrefixes,
    )
    authenticators = append(authenticators, authenticator.WrapAudienceAgnosticRequest(config.APIAudiences, requestHeaderAuthenticator))
  }
  // X509 methods
  if config.ClientCAContentProvider != nil {
    certAuth := x509.NewDynamic(config.ClientCAContentProvider.VerifyOptions, x509.CommonNameUserConversion)
    authenticators = append(authenticators, certAuth)
  }

  // TokenAuth authenticator
  if len(config.TokenAuthFile) > 0 {
    tokenAuth, err := newAuthenticatorFromTokenFile(config.TokenAuthFile)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, tokenAuth))
  }
  // ServiceAccountAuth authenticator
  if len(config.ServiceAccountKeyFiles) > 0 {
    serviceAccountAuth, err := newLegacyServiceAccountAuthenticator(config.ServiceAccountKeyFiles, config.ServiceAccountLookup, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  if utilfeature.DefaultFeatureGate.Enabled(features.TokenRequest) && config.ServiceAccountIssuer != "" {
    serviceAccountAuth, err := newServiceAccountAuthenticator(config.ServiceAccountIssuer, config.ServiceAccountKeyFiles, config.APIAudiences, config.ServiceAccountTokenGetter)
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, serviceAccountAuth)
  }
  // BootstrapToken authenticator
  if config.BootstrapToken {
    if config.BootstrapTokenAuthenticator != nil {
      // TODO: This can sometimes be nil because of
      tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, config.BootstrapTokenAuthenticator))
    }
  }
  // NOTE(ericchiang): Keep the OpenID Connect after Service Accounts.
  //
  // Because both plugins verify JWTs whichever comes first in the union experiences
  // cache misses for all requests using the other. While the service account plugin
  // simply returns an error, the OpenID Connect plugin may query the provider to
  // update the keys, causing performance hits.
  if len(config.OIDCIssuerURL) > 0 && len(config.OIDCClientID) > 0 {
    oidcAuth, err := newAuthenticatorFromOIDCIssuerURL(oidc.Options{
      IssuerURL:            config.OIDCIssuerURL,
      ClientID:             config.OIDCClientID,
      CAFile:               config.OIDCCAFile,
      UsernameClaim:        config.OIDCUsernameClaim,
      UsernamePrefix:       config.OIDCUsernamePrefix,
      GroupsClaim:          config.OIDCGroupsClaim,
      GroupsPrefix:         config.OIDCGroupsPrefix,
      SupportedSigningAlgs: config.OIDCSigningAlgs,
      RequiredClaims:       config.OIDCRequiredClaims,
    })
    if err != nil {
      return nil, nil, err
    }
    tokenAuthenticators = append(tokenAuthenticators, authenticator.WrapAudienceAgnosticToken(config.APIAudiences, oidcAuth))
  }
  // WebhookTokenAuth authenticator
  if len(config.WebhookTokenAuthnConfigFile) > 0 {
    webhookTokenAuth, err := newWebhookTokenAuthenticator(config)
    if err != nil {
      return nil, nil, err
    }

    tokenAuthenticators = append(tokenAuthenticators, webhookTokenAuth)
  }

  if len(tokenAuthenticators) > 0 {
    // Union the token authenticators
    tokenAuth := tokenunion.New(tokenAuthenticators...)
    // Optionally cache authentication results
    if config.TokenSuccessCacheTTL > 0 || config.TokenFailureCacheTTL > 0 {
      tokenAuth = tokencache.New(tokenAuth, true, config.TokenSuccessCacheTTL, config.TokenFailureCacheTTL)
    }
    authenticators = append(authenticators, bearertoken.New(tokenAuth), websocket.NewProtocolAuthenticator(tokenAuth))
    securityDefinitions["BearerToken"] = &spec.SecurityScheme{
      SecuritySchemeProps: spec.SecuritySchemeProps{
        Type:        "apiKey",
        Name:        "authorization",
        In:          "header",
        Description: "Bearer Token authentication",
      },
    }
  }
  // Anonymous authenticator
  if len(authenticators) == 0 {
    if config.Anonymous {
      return anonymous.NewAuthenticator(), &securityDefinitions, nil
    }
    return nil, &securityDefinitions, nil
  }
  ...
  // Merge the authenticators into one
  authenticator := union.New(authenticators...)
  ...
}
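The idea of merging several authenticators into one can be sketched as follows. This is a simplified illustration, not the code behind union.New: the requestAuthenticator interface, the X-Remote-User header check, and the static token table are assumptions made for the example. The union tries each authenticator in order and stops at the first one that recognises the request.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
	"strings"
)

// requestAuthenticator returns the user, whether the request was recognised, and an error.
type requestAuthenticator interface {
	AuthenticateRequest(req *http.Request) (user string, ok bool, err error)
}

// authFunc adapts a plain function to the interface.
type authFunc func(req *http.Request) (string, bool, error)

func (f authFunc) AuthenticateRequest(req *http.Request) (string, bool, error) { return f(req) }

// headerAuth trusts a (hypothetical) X-Remote-User header.
var headerAuth = authFunc(func(req *http.Request) (string, bool, error) {
	if u := req.Header.Get("X-Remote-User"); u != "" {
		return u, true, nil
	}
	return "", false, nil
})

// tokenAuth looks the bearer token up in a static table.
func tokenAuth(tokens map[string]string) requestAuthenticator {
	return authFunc(func(req *http.Request) (string, bool, error) {
		h := req.Header.Get("Authorization")
		if !strings.HasPrefix(h, "Bearer ") {
			return "", false, nil // not a bearer request: no opinion
		}
		user, found := tokens[strings.TrimPrefix(h, "Bearer ")]
		if !found {
			return "", false, errors.New("invalid bearer token")
		}
		return user, true, nil
	})
}

// unionAuthenticator tries each authenticator in order; the first success wins.
type unionAuthenticator []requestAuthenticator

func (u unionAuthenticator) AuthenticateRequest(req *http.Request) (string, bool, error) {
	var errs []string
	for _, a := range u {
		user, ok, err := a.AuthenticateRequest(req)
		if err != nil {
			errs = append(errs, err.Error()) // remember the error, keep trying
			continue
		}
		if ok {
			return user, true, nil
		}
	}
	if len(errs) > 0 {
		return "", false, errors.New(strings.Join(errs, "; "))
	}
	return "", false, nil
}

// authenticate builds a request with the given headers and runs the union.
func authenticate(headers map[string]string) (string, bool, error) {
	req, err := http.NewRequest(http.MethodGet, "/api/v1/pods", nil)
	if err != nil {
		return "", false, err
	}
	for k, v := range headers {
		req.Header.Set(k, v)
	}
	chain := unionAuthenticator{headerAuth, tokenAuth(map[string]string{"s3cr3t": "alice"})}
	return chain.AuthenticateRequest(req)
}

func main() {
	fmt.Println(authenticate(map[string]string{"Authorization": "Bearer s3cr3t"}))
	fmt.Println(authenticate(map[string]string{"X-Remote-User": "bob"}))
}
```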

// Authorization initialization
func BuildAuthorizer(s *options.ServerRunOptions, EgressSelector *egressselector.EgressSelector, versionedInformers clientgoinformers.SharedInformerFactory) (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  return authorizationConfig.New()
}

// Configuration of the 6 authorizers
func (config Config) New() (authorizer.Authorizer, authorizer.RuleResolver, error) {
  ...
  // Declare the lists of authorizers and rule resolvers
  var (
    authorizers   []authorizer.Authorizer
    ruleResolvers []authorizer.RuleResolver
  )

  for _, authorizationMode := range config.AuthorizationModes {
    switch authorizationMode {
    // Node authorizer
    case modes.ModeNode:
      graph := node.NewGraph()
      node.AddGraphEventHandlers(
        graph,
        config.VersionedInformerFactory.Core().V1().Nodes(),
        config.VersionedInformerFactory.Core().V1().Pods(),
        config.VersionedInformerFactory.Core().V1().PersistentVolumes(),
        config.VersionedInformerFactory.Storage().V1().VolumeAttachments(),
      )
      nodeAuthorizer := node.NewAuthorizer(graph, nodeidentifier.NewDefaultNodeIdentifier(), bootstrappolicy.NodeRules())
      authorizers = append(authorizers, nodeAuthorizer)
      ruleResolvers = append(ruleResolvers, nodeAuthorizer)
    // AlwaysAllow authorizer
    case modes.ModeAlwaysAllow:
      alwaysAllowAuthorizer := authorizerfactory.NewAlwaysAllowAuthorizer()
      authorizers = append(authorizers, alwaysAllowAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysAllowAuthorizer)
    // AlwaysDeny authorizer
    case modes.ModeAlwaysDeny:
      alwaysDenyAuthorizer := authorizerfactory.NewAlwaysDenyAuthorizer()
      authorizers = append(authorizers, alwaysDenyAuthorizer)
      ruleResolvers = append(ruleResolvers, alwaysDenyAuthorizer)
    // ABAC authorizer
    case modes.ModeABAC:
      abacAuthorizer, err := abac.NewFromFile(config.PolicyFile)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, abacAuthorizer)
      ruleResolvers = append(ruleResolvers, abacAuthorizer)
    // Webhook authorizer
    case modes.ModeWebhook:
      webhookAuthorizer, err := webhook.New(config.WebhookConfigFile,
        config.WebhookVersion,
        config.WebhookCacheAuthorizedTTL,
        config.WebhookCacheUnauthorizedTTL,
        config.CustomDial)
      if err != nil {
        return nil, nil, err
      }
      authorizers = append(authorizers, webhookAuthorizer)
      ruleResolvers = append(ruleResolvers, webhookAuthorizer)
    // RBAC authorizer
    case modes.ModeRBAC:
      rbacAuthorizer := rbac.New(
        &rbac.RoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().Roles().Lister()},
        &rbac.RoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().RoleBindings().Lister()},
        &rbac.ClusterRoleGetter{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoles().Lister()},
        &rbac.ClusterRoleBindingLister{Lister: config.VersionedInformerFactory.Rbac().V1().ClusterRoleBindings().Lister()},
      )
      authorizers = append(authorizers, rbacAuthorizer)
      ruleResolvers = append(ruleResolvers, rbacAuthorizer)
    default:
      return nil, nil, fmt.Errorf("unknown authorization mode %s specified", authorizationMode)
    }
  }
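  // Merge the enabled authorizers into a single list
  // When a request arrives, kube-apiserver walks the authorizer list; as soon as one of them allows the request, authorization succeeds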
  return union.New(authorizers...), union.NewRuleResolvers(ruleResolvers...), nil
}
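A union of authorizers can be sketched in the same spirit. The decision values, the attributes struct, and the two toy authorizers below are assumptions for illustration and do not reproduce the real Node or RBAC logic. The sketch assumes the union returns the first decision that is an explicit allow or deny and otherwise moves on to the next authorizer.

```go
package main

import (
	"fmt"
	"strings"
)

// decision is the outcome of a single authorizer.
type decision int

const (
	decisionDeny decision = iota
	decisionAllow
	decisionNoOpinion
)

// attributes is a hypothetical, trimmed-down request description.
type attributes struct{ User, Verb, Resource string }

type authorizer interface{ Authorize(a attributes) decision }

// authorizerFunc adapts a plain function to the interface.
type authorizerFunc func(a attributes) decision

func (f authorizerFunc) Authorize(a attributes) decision { return f(a) }

// nodeLike only has an opinion about node users touching nodes.
var nodeLike = authorizerFunc(func(a attributes) decision {
	if strings.HasPrefix(a.User, "system:node:") && a.Resource == "nodes" {
		return decisionAllow
	}
	return decisionNoOpinion
})

// ruleBased allows exactly the (user, verb, resource) triples in its table.
func ruleBased(rules map[attributes]bool) authorizer {
	return authorizerFunc(func(a attributes) decision {
		if rules[a] {
			return decisionAllow
		}
		return decisionNoOpinion
	})
}

// unionAuthorizer returns the first explicit allow or deny it encounters.
type unionAuthorizer []authorizer

func (u unionAuthorizer) Authorize(a attributes) decision {
	for _, az := range u {
		if d := az.Authorize(a); d == decisionAllow || d == decisionDeny {
			return d
		}
	}
	return decisionNoOpinion
}

// allowed treats anything other than an explicit allow as a rejection.
func allowed(a attributes) bool {
	chain := unionAuthorizer{
		nodeLike,
		ruleBased(map[attributes]bool{{User: "alice", Verb: "get", Resource: "pods"}: true}),
	}
	return chain.Authorize(a) == decisionAllow
}

func main() {
	fmt.Println(allowed(attributes{User: "alice", Verb: "get", Resource: "pods"}))
	fmt.Println(allowed(attributes{User: "alice", Verb: "delete", Resource: "pods"}))
}
```

The order of the list matters: it follows the order in which the authorization modes are configured.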

3.2 Creating the kubeapi-extension-server configuration

func createAPIExtensionsConfig(...)(...) {
  ...
  apiextensionsConfig := &apiextensionsapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: apiextensionsapiserver.ExtraConfig{
      CRDRESTOptionsGetter: apiextensionsoptions.NewCRDRESTOptionsGetter(etcdOptions),
      MasterCount:          masterCount,
      AuthResolverWrapper:  authResolverWrapper,
      ServiceResolver:      serviceResolver,
    },
  }
  ...
}

3.3 Creating the kubeapi-extension server

func createAPIExtensionsServer(...) (*apiextensionsapiserver.CustomResourceDefinitions, error) {
  return apiextensionsConfig.Complete().New(delegateAPIServer)
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*CustomResourceDefinitions, error) {
  // APIExtensionsServer depends on GenericAPIServer
  // Create a server named apiextensions-apiserver from GenericConfig
  genericServer, err := c.GenericConfig.New("apiextensions-apiserver", delegationTarget)

  // APIExtensionsServer is managed through the CustomResourceDefinitions object
  // Resources under APIExtensionsServer can only be registered after it is instantiated
  s := &CustomResourceDefinitions{
    GenericAPIServer: genericServer,
  }

  apiResourceConfig := c.GenericConfig.MergedResourceConfig

  // Instantiate APIGroupInfo, which describes the resource group
  apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiextensions.GroupName, Scheme, metav1.ParameterCodec, Codecs)

  // Map resources to their storage objects
  // If the v1beta1 version is enabled, store version, resource, and storage object in the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1beta1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // Create the resource storage object with NewREST
    customResourceDefinitionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefinitionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefinitionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1beta1.SchemeGroupVersion.Version] = storage
  }
  // If the v1 version is enabled, store version, resource, and storage object in the APIGroupInfo map
  if apiResourceConfig.VersionEnabled(v1.SchemeGroupVersion) {
    storage := map[string]rest.Storage{}
    // customresourcedefinitions
    customResourceDefintionStorage, err := customresourcedefinition.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
    if err != nil {
      return nil, err
    }
    storage["customresourcedefinitions"] = customResourceDefintionStorage
    storage["customresourcedefinitions/status"] = customresourcedefinition.NewStatusREST(Scheme, customResourceDefintionStorage)

    apiGroupInfo.VersionedResourcesStorageMap[v1.SchemeGroupVersion.Version] = storage
  }
  // Register the API; this function is covered separately below
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }

  crdClient, err := internalclientset.NewForConfig(s.GenericAPIServer.LoopbackClientConfig)

  s.Informers = internalinformers.NewSharedInformerFactory(crdClient, 5*time.Minute)
  // Initialize the main (establishing) controller
  establishingController := establish.NewEstablishingController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Declare the handler
  crdHandler, err := NewCustomResourceDefinitionHandler(
    versionDiscoveryHandler,
    groupDiscoveryHandler,
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    delegateHandler,
    c.ExtraConfig.CRDRESTOptionsGetter,
    c.GenericConfig.AdmissionControl,
    establishingController,
    c.ExtraConfig.ServiceResolver,
    c.ExtraConfig.AuthResolverWrapper,
    c.ExtraConfig.MasterCount,
    s.GenericAPIServer.Authorizer,
    c.GenericConfig.MaxRequestBodyBytes,
  )
  if err != nil {
    return nil, err
  }
  // Add the handler
  s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
  s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)

  // Initialize crdController
  crdController := NewDiscoveryController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
  // Initialize namingController
  namingController := status.NewNamingConditionController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(), crdClient.Apiextensions())
  // Initialize finalizingController
  finalizingController := finalizer.NewCRDFinalizer(
    s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions(),
    crdClient.Apiextensions(),
    crdHandler,
  )
  // Initialize openapiController
  var openapiController *openapicontroller.Controller
  if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
    openapiController = openapicontroller.NewController(s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions())
  }

  // Register a hook: start the informers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-informers", func(context genericapiserver.PostStartHookContext) error {
    s.Informers.Start(context.StopCh)
    return nil
  })
  // Register a hook: start the controllers
  s.GenericAPIServer.AddPostStartHookOrDie("start-apiextensions-controllers", func(context genericapiserver.PostStartHookContext) error {
    if utilfeature.DefaultFeatureGate.Enabled(apiextensionsfeatures.CustomResourcePublishOpenAPI) {
      go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
    }
    // Start the controllers initialized above
    go crdController.Run(context.StopCh)
    go namingController.Run(context.StopCh)
    go establishingController.Run(context.StopCh)
    go finalizingController.Run(5, context.StopCh)
    return nil
  })
  // Register a hook: wait until the CRD informer has synced
  s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
    return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
      return s.Informers.Apiextensions().InternalVersion().CustomResourceDefinitions().Informer().HasSynced(), nil
    }, context.StopCh)
  })

  return s, nil
}
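The post-start hooks registered above follow a simple "register by name now, run after startup" pattern. The sketch below is a deliberately simplified assumption: hookServer, addPostStartHook, and runPostStartHooks are hypothetical names, and the hooks run one after another here only to keep the example deterministic.

```go
package main

import "fmt"

// postStartHook is called once the server is up; stopCh closes on shutdown.
type postStartHook func(stopCh <-chan struct{}) error

// hookServer is a hypothetical holder of named hooks.
type hookServer struct {
	names []string // registration order
	hooks map[string]postStartHook
}

func newHookServer() *hookServer {
	return &hookServer{hooks: map[string]postStartHook{}}
}

// addPostStartHook registers a hook and rejects duplicate names.
func (s *hookServer) addPostStartHook(name string, h postStartHook) error {
	if _, dup := s.hooks[name]; dup {
		return fmt.Errorf("post-start hook %q already registered", name)
	}
	s.hooks[name] = h
	s.names = append(s.names, name)
	return nil
}

// runPostStartHooks runs the hooks in registration order and reports which
// ones completed; it stops at the first failure.
func (s *hookServer) runPostStartHooks(stopCh <-chan struct{}) ([]string, error) {
	var ran []string
	for _, name := range s.names {
		if err := s.hooks[name](stopCh); err != nil {
			return ran, fmt.Errorf("hook %s failed: %w", name, err)
		}
		ran = append(ran, name)
	}
	return ran, nil
}

// demoHooks registers two hooks and runs them.
func demoHooks() ([]string, error) {
	s := newHookServer()
	stopCh := make(chan struct{})
	defer close(stopCh)
	for _, name := range []string{"start-informers", "start-controllers"} {
		if err := s.addPostStartHook(name, func(<-chan struct{}) error { return nil }); err != nil {
			return nil, err
		}
	}
	return s.runPostStartHooks(stopCh)
}

func main() {
	fmt.Println(demoHooks())
}
```

Registering by name keeps the startup code declarative: the constructor only describes what must happen after the server is listening.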
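
3.3.1 Instantiating APIGroupInfo

APIGroupInfo describes a resource group: one resource group corresponds to one APIGroupInfo object, and each resource in it corresponds to one resource storage object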

func NewDefaultAPIGroupInfo(group string, scheme *runtime.Scheme, parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory) APIGroupInfo {
  return APIGroupInfo{
    PrioritizedVersions:          scheme.PrioritizedVersionsForGroup(group),
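    // This map stores the mapping between resources and their storage objects
    // Layout: version -> resource -> storage object
    // The storage object (RESTStorage) handles create, read, update, and delete for the resource
    // Later the RESTStorage is converted into an HTTP handler function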
    VersionedResourcesStorageMap: map[string]map[string]rest.Storage{},
    // TODO unhardcode this.  It was hardcoded before, but we need to re-evaluate
    OptionsExternalVersion: &schema.GroupVersion{Version: "v1"},
    Scheme:                 scheme,
    ParameterCodec:         parameterCodec,
    NegotiatedSerializer:   codecs,
  }
}
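The nested version -> resource -> storage layout can be illustrated with a small sketch. The groupInfo and restStorage types and the requestPaths helper are hypothetical; only the map shape and the path.Join-based path layout mirror what the surrounding text describes.

```go
package main

import (
	"fmt"
	"path"
	"sort"
)

// restStorage is a hypothetical stand-in for rest.Storage.
type restStorage interface{ Kind() string }

type crdStorage struct{}

func (crdStorage) Kind() string { return "CustomResourceDefinition" }

// groupInfo keeps only the fields needed for this illustration.
type groupInfo struct {
	Group string
	// version -> resource -> storage object
	VersionedResourcesStorageMap map[string]map[string]restStorage
}

// requestPaths derives <root>/<group>/<version>/<resource> for every entry.
func (g groupInfo) requestPaths(root string) []string {
	var out []string
	for version, resources := range g.VersionedResourcesStorageMap {
		for resource := range resources {
			out = append(out, path.Join(root, g.Group, version, resource))
		}
	}
	sort.Strings(out) // map iteration order is random, so sort for stable output
	return out
}

// exampleGroup fills the map the way the v1beta1 and v1 branches do.
func exampleGroup() groupInfo {
	g := groupInfo{
		Group:                        "apiextensions.k8s.io",
		VersionedResourcesStorageMap: map[string]map[string]restStorage{},
	}
	for _, version := range []string{"v1beta1", "v1"} {
		g.VersionedResourcesStorageMap[version] = map[string]restStorage{
			"customresourcedefinitions":        crdStorage{},
			"customresourcedefinitions/status": crdStorage{},
		}
	}
	return g
}

func main() {
	for _, p := range exampleGroup().requestPaths("/apis") {
		fmt.Println(p)
	}
}
```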
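
3.3.2 The API registration function: InstallAPIGroup

The function that registers APIGroupInfo is very important: it registers the resource objects in APIGroupInfo with the APIExtensionsServer handler. The process is:

  • Iterate over the APIGroupInfo
  • Map resource group, resource version, and resource name to an HTTP request path
  • Use the InstallREST function to turn the resource storage objects into the resource's handlers
  • Finally, use go-restful's ws.Route to add the request paths and handlers as routes in go-restful
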
func (s *GenericAPIServer) InstallAPIGroup(apiGroupInfo *APIGroupInfo) error {
  return s.InstallAPIGroups(apiGroupInfo)
}

// InstallAPIGroups
func (s *GenericAPIServer) InstallAPIGroups(apiGroupInfos ...*APIGroupInfo) error {
  ...
  // Iterate over all resource group infos and install the version handlers one by one
  for _, apiGroupInfo := range apiGroupInfos {
    if err := s.installAPIResources(APIGroupPrefix, apiGroupInfo, openAPIModels); err != nil {
      return fmt.Errorf("unable to install api resources: %v", err)
    }
    ...
    apiGroup := metav1.APIGroup{
      Name:             apiGroupInfo.PrioritizedVersions[0].Group,
      Versions:         apiVersionsForDiscovery,
      PreferredVersion: preferredVersionForDiscovery,
    }

    s.DiscoveryGroupManager.AddGroup(apiGroup)
    s.Handler.GoRestfulContainer.Add(discovery.NewAPIGroupHandler(s.Serializer, apiGroup).WebService())
  }
  return nil
}

// installAPIResources
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
  for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
    ...
    // Call InstallREST
    // The argument is the go-restful container object
    if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
      return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
    }
  }

  return nil
}

// InstallREST
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
  // Define the HTTP request path prefix
  // Format: <apiPrefix>/<group>/<version>
  // apiPrefix is api or apis
  prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
  // Instantiate the APIInstaller
  installer := &APIInstaller{
    group:             g,
    prefix:            prefix,
    minRequestTimeout: g.MinRequestTimeout,
  }
  // Register the APIs; returns a go-restful WebService object
  apiResources, ws, registrationErrors := installer.Install()
  versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
  versionDiscoveryHandler.AddToWebService(ws)
  // go-restful usage: add the WebService to the Container
  container.Add(ws)
  return utilerrors.NewAggregate(registrationErrors)
}

// installer.Install
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
  // Build the WebService object
  ws := a.newWebService()
  ...
  // Iterate over all paths
  for _, path := range paths {
    // Convert Storage to routes and register them with the webservice
    apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
    ...
    if apiResource != nil {
      // Append to the list
      apiResources = append(apiResources, *apiResource)
    }
  }
  return apiResources, ws, errors
}

// This method is long. Its core job is to build a handler from the storage, combine the handler and the path into a go-restful Route, and add the Route to the webservice
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (...) {
  ...
  // Determine which REST interfaces the storage implements
  creater, isCreater := storage.(rest.Creater)
  namedCreater, isNamedCreater := storage.(rest.NamedCreater)
  ...
  // Build the list of actions
  actions = appendIf(actions, action{"LIST", resourcePath, resourceParams, namer, false}, isLister)
  ...
  for _, action := range actions {
    ...
    // Build the list of go-restful RouteBuilder objects
    routes := []*restful.RouteBuilder{}
    // Register a different handler depending on the action's Verb
    switch action.Verb {
    case "GET":
      ...
      // Initialize the handler
      handler = restfulGetResource(getter, exporter, reqScope)
      ...
      // Build the route
      route := ws.GET(action.Path).To(handler).xxx
      ...
      // Append the route to routes
      routes = append(routes, route)
    ...
    case "POST":
      ...
      // Initialize the handler; covered separately below
      handler = restfulCreateResource(creater, reqScope, admit)
      route := ws.POST(action.Path).To(handler).xxx
      routes = append(routes, route)
    ...

    // Iterate over all routes
    for _, route := range routes {
        route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
          Group:   reqScope.Kind.Group,
          Version: reqScope.Kind.Version,
          Kind:    reqScope.Kind.Kind,
        })
        // Add custom extension properties (all Kubernetes extension properties start with x-)
        route.Metadata(ROUTE_META_ACTION, strings.ToLower(action.Verb))
        // Add the route to the WebService
        ws.Route(route)
      }
  }
}

// Handler initialization: wrap the standard http handler as a go-restful RouteFunction
func restfulCreateResource(r rest.Creater, scope handlers.RequestScope, admit admission.Interface) restful.RouteFunction {
  return func(req *restful.Request, res *restful.Response) {
    handlers.CreateResource(r, &scope, admit)(res.ResponseWriter, req.Request)
  }
}

// Returns a handler that processes resource creation
func CreateResource(r rest.Creater, scope *RequestScope, admission admission.Interface) http.HandlerFunc {
  return createHandler(&namedCreaterAdapter{r}, scope, admission, false)
}

// Returns a standard-library http handler function that serves the corresponding route
func createHandler(r rest.NamedCreater, scope *RequestScope, admit admission.Interface, includeName bool) http.HandlerFunc {
  // The standard handler form from net/http
  return func(w http.ResponseWriter, req *http.Request) {
    ...
    // Pick a suitable SerializerInfo for the request
    s, err := negotiation.NegotiateInputSerializer(req, false, scope.Serializer)
    ...
    // Pick the matching decoder
    decoder := scope.Serializer.DecoderToVersion(s.Serializer, scope.HubGroupVersion)

    // Decode the request body
    obj, gvk, err := decoder.Decode(body, &defaultGVK, original)
    // Handle the request
    result, err := finishRequest(timeout, func() (runtime.Object, error) {
      ...
    })
    ...
  }
}
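
The type-assertion step in registerResourceHandlers can be hard to picture from the elided listing. Below is a minimal sketch of the same idea using only the standard library. The `Creater` and `Getter` interfaces, the two storage types, and `verbsFor` are hypothetical stand-ins invented for illustration, not the real `rest` package types.

```go
package main

import "fmt"

// Creater and Getter are hypothetical stand-ins for rest.Creater and rest.Getter.
type Creater interface{ Create(name string) string }
type Getter interface{ Get(name string) string }

// podStorage supports both operations.
type podStorage struct{}

func (podStorage) Create(name string) string { return "created " + name }
func (podStorage) Get(name string) string    { return "got " + name }

// readOnlyStorage only supports reads.
type readOnlyStorage struct{}

func (readOnlyStorage) Get(name string) string { return "got " + name }

// verbsFor mirrors the type-assertion step: every interface the storage
// implements contributes one verb, and only those verbs get a route.
func verbsFor(storage interface{}) []string {
	var verbs []string
	if _, ok := storage.(Getter); ok {
		verbs = append(verbs, "GET")
	}
	if _, ok := storage.(Creater); ok {
		verbs = append(verbs, "POST")
	}
	return verbs
}

func main() {
	fmt.Println(verbsFor(podStorage{}))
	fmt.Println(verbsFor(readOnlyStorage{}))
}
```

A storage that does not implement an interface never gets the corresponding route, which is why a read-only resource exposes no POST endpoint.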

3.4 Creating the kube-apiserver service

Creating the KubeAPIServer follows the same flow and principle as creating the APIExtensionsServer. The steps are:

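  • Map each <group>/<version>/<resource> to its resource storage object and store the mapping in the APIGroupInfo map
  • Use the installer.Install installer to register the handler methods for each resource (that is, the methods of the resource storage object, ResourceStorage)
  • Bind the resources to their handler methods, build the Routes, and add them to the WebService
  • Finally, add the WebService to the Container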
func CreateKubeAPIServer(kubeAPIServerConfig *master.Config, delegateAPIServer genericapiserver.DelegationTarget) (*master.Master, error) {
  kubeAPIServer, err := kubeAPIServerConfig.Complete().New(delegateAPIServer)
  ...
}

func (c *Config) Complete() CompletedConfig {
  ...
  // If no reconciler is configured, create one via createEndpointReconciler
  if cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler == nil {
    cfg.ExtraConfig.EndpointReconcilerConfig.Reconciler = c.createEndpointReconciler()
  }

  return CompletedConfig{&cfg}
}

// createEndpointReconciler
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
  switch c.ExtraConfig.EndpointReconcilerType {
  // there are numerous test dependencies that depend on a default controller
  case "", reconcilers.MasterCountReconcilerType:
    return c.createMasterCountReconciler()
  case reconcilers.LeaseEndpointReconcilerType:
    return c.createLeaseReconciler()
  case reconcilers.NoneEndpointReconcilerType:
    return c.createNoneReconciler()
  default:
    klog.Fatalf("Reconciler not implemented: %v", c.ExtraConfig.EndpointReconcilerType)
  }
  return nil
}

func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
  ...
  // Initialize the Storage
  leaseStorage, _, err := storagefactory.Create(*config)
  ...
  return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
}

func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
  switch c.Type {
  ...
  case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
    // Initialize the etcd3-backed Storage
    return newETCD3Storage(c)
  ...
  }
}

func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) (*Master, error) {
  // Create the GenericAPIServer named kube-apiserver
  s, err := c.GenericConfig.New("kube-apiserver", delegationTarget)
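  // Initialize the Master; the Kubernetes core services are managed through the Master object
  // Resources under KubeAPIServer can only be registered once this object has been instantiated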
  m := &Master{
    GenericAPIServer:          s,
    ClusterAuthenticationInfo: c.ExtraConfig.ClusterAuthenticationInfo,
  }

  if c.ExtraConfig.APIResourceConfigSource.VersionEnabled(apiv1.SchemeGroupVersion) {
    // Register the legacy resource group, which has no group name; its path prefix is "/api"
    if err := m.InstallLegacyAPI(&c, c.GenericConfig.RESTOptionsGetter, legacyRESTStorageProvider); err != nil {
      return nil, err
    }
  }

  // Register the named resource groups; their path prefix is "/apis"
  if err := m.InstallAPIs(c.ExtraConfig.APIResourceConfigSource, c.GenericConfig.RESTOptionsGetter, restStorageProviders...); err != nil {
    return nil, err
  }

  m.GenericAPIServer.AddPostStartHookOrDie("start-cluster-authentication-info-controller", func(hookContext genericapiserver.PostStartHookContext) error {
    kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig)
    // Create the cluster authentication trust controller
    controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
    ...
    // Start the controller
    go controller.Run(1, hookContext.StopCh)
    return nil
  })

  return m, nil
}
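
To make the two path prefixes concrete, here is a minimal sketch of how a group and version map to a URL prefix. `groupVersionPrefix` is a hypothetical helper written for this note, not a function from the Kubernetes source tree.

```go
package main

import "fmt"

// groupVersionPrefix is a hypothetical helper: the legacy core group has an
// empty group name and lives under /api, every named group lives under /apis.
func groupVersionPrefix(group, version string) string {
	if group == "" {
		return "/api/" + version
	}
	return "/apis/" + group + "/" + version
}

func main() {
	fmt.Println(groupVersionPrefix("", "v1"))     // /api/v1
	fmt.Println(groupVersionPrefix("apps", "v1")) // /apis/apps/v1
}
```

This is why Pods are served under /api/v1 while Deployments are served under /apis/apps/v1.
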
3.4.1 InstallLegacyAPI
func (m *Master) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter, legacyRESTStorageProvider corerest.LegacyRESTStorageProvider) error {
  // Instantiate the APIGroupInfo
  // and build the storage for each resource
  legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(restOptionsGetter)

  ...
  // Create the bootstrapController
  bootstrapController := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
  // Register the API group
  if err := m.GenericAPIServer.InstallLegacyAPIGroup(genericapiserver.DefaultLegacyAPIPrefix, &apiGroupInfo); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

// Create the storage for each resource via NewStorage, NewREST, and so on, and put them into a map
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
  ...
  restStorage := LegacyRESTStorage{}
  podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)
  podStorage, err := podstore.NewStorage(...)
  ...
  // Internally, NewStorage operates on etcd through the etcd client
  controllerStorage, err := controllerstore.NewStorage(restOptionsGetter)
  ...
  restStorageMap := map[string]rest.Storage{
    "pods":             podStorage.Pod,
    ...
    "replicationControllers":        controllerStorage.Controller,
    ...
  }
  ...
  return restStorage, apiGroupInfo, nil
}
3.4.2 InstallLegacyAPIGroup
func (s *GenericAPIServer) InstallLegacyAPIGroup(apiPrefix string, apiGroupInfo *APIGroupInfo) error {
  ...
  // This also calls installAPIResources internally, which was covered earlier
  if err := s.installAPIResources(apiPrefix, apiGroupInfo, openAPIModels); err != nil {
    return err
  }

  s.Handler.GoRestfulContainer.Add(discovery.NewLegacyRootAPIHandler(s.discoveryAddresses, s.Serializer, apiPrefix).WebService())
  return nil
}
3.4.3 InstallAPIs
func (m *Master) InstallAPIs(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter, restStorageProviders ...RESTStorageProvider) error {
  apiGroupsInfo := []*genericapiserver.APIGroupInfo{}

  // Iterate over all providers
  for _, restStorageBuilder := range restStorageProviders {
    ...
    // Get the Storage for the resources in this group
    apiGroupInfo, enabled, err := restStorageBuilder.NewRESTStorage(apiResourceConfigSource, restOptionsGetter)
    ...
  }

  // InstallAPIGroups was analyzed earlier
  if err := m.GenericAPIServer.InstallAPIGroups(apiGroupsInfo...); err != nil {
    return fmt.Errorf("error in registering group versions: %v", err)
  }
  return nil
}

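// RESTStorageProvider interface: every resource group implements it with its own logic
// The implementations are all similar to what was described above: they call NewStorage or NewREST to work with etcd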
type RESTStorageProvider interface {
  GroupName() string
  NewRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (genericapiserver.APIGroupInfo, bool, error)
}

3.5 Creating the aggregator-server config

func createAggregatorConfig(...) (*aggregatorapiserver.Config, error) {
  ...
  aggregatorConfig := &aggregatorapiserver.Config{
    GenericConfig: &genericapiserver.RecommendedConfig{
      Config:                genericConfig,
      SharedInformerFactory: externalInformers,
    },
    ExtraConfig: aggregatorapiserver.ExtraConfig{
      ProxyClientCertFile: commandOptions.ProxyClientCertFile,
      ProxyClientKeyFile:  commandOptions.ProxyClientKeyFile,
      ServiceResolver:     serviceResolver,
      ProxyTransport:      proxyTransport,
    },
  }

  return aggregatorConfig, nil
}

3.6 Creating the aggregator-server service

Creating the AggregatorServer follows a flow similar to creating the APIExtensionsServer.

func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
  // Create the aggregator server with its delegate
  aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
  ...
  // Create the autoRegistrationController
  autoRegistrationController := autoregister.NewAutoRegisterController(aggregatorServer.APIRegistrationInformers.Apiregistration().V1().APIServices(), apiRegistrationClient)
  apiServices := apiServicesToRegister(delegateAPIServer, autoRegistrationController)
  // Create the crdRegistrationController
  crdRegistrationController := crdregistration.NewCRDRegistrationController(
    apiExtensionInformers.Apiextensions().V1().CustomResourceDefinitions(),
    autoRegistrationController)

  err = aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error {
    // Start the crdRegistrationController
    go crdRegistrationController.Run(5, context.StopCh)
    go func() {
      if aggregatorConfig.GenericConfig.MergedResourceConfig.AnyVersionForGroupEnabled("apiextensions.k8s.io") {
        crdRegistrationController.WaitForInitialSync()
      }
      // Start the autoRegistrationController
      autoRegistrationController.Run(5, context.StopCh)
    }()
    return nil
  })
  ...
}

func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.DelegationTarget) (*APIAggregator, error) {
  // Create the GenericAPIServer named kube-aggregator
  genericServer, err := c.GenericConfig.New("kube-aggregator", delegationTarget)
  // Initialize the APIAggregator
  s := &APIAggregator{
    GenericAPIServer:           genericServer,
    delegateHandler:            delegationTarget.UnprotectedHandler(),
    proxyTransport:             c.ExtraConfig.ProxyTransport,
    proxyHandlers:              map[string]*proxyHandler{},
    handledGroups:              sets.String{},
    lister:                     informerFactory.Apiregistration().V1().APIServices().Lister(),
    APIRegistrationInformers:   informerFactory,
    serviceResolver:            c.ExtraConfig.ServiceResolver,
    openAPIConfig:              openAPIConfig,
    egressSelector:             c.GenericConfig.EgressSelector,
    proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
  }
  // Initialize the Storage; the logic is the same as before
  apiGroupInfo := apiservicerest.NewRESTStorage(c.GenericConfig.MergedResourceConfig, c.GenericConfig.RESTOptionsGetter)
  // Install the API group, again the same as before
  if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
    return nil, err
  }
  ...
}
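
Both the kube-apiserver and the aggregator start their controllers from post-start hooks. The sketch below shows the general shape of such a hook registry using only the standard library. `hookRegistry`, `postStartHook`, `add`, and `runAll` are hypothetical names, and unlike the real server this sketch waits for the hooks to return so the outcome can be inspected.

```go
package main

import (
	"fmt"
	"sync"
)

// postStartHook is a hypothetical, simplified form of a post-start hook.
type postStartHook func(stopCh <-chan struct{}) error

// hookRegistry stores named hooks until the server has started.
type hookRegistry struct {
	mu    sync.Mutex
	hooks map[string]postStartHook
}

func newHookRegistry() *hookRegistry {
	return &hookRegistry{hooks: map[string]postStartHook{}}
}

// add registers a hook and rejects duplicate names.
func (r *hookRegistry) add(name string, h postStartHook) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.hooks[name]; exists {
		return fmt.Errorf("post-start hook %q is already registered", name)
	}
	r.hooks[name] = h
	return nil
}

// runAll runs every hook in its own goroutine and collects the results.
func (r *hookRegistry) runAll(stopCh <-chan struct{}) map[string]error {
	r.mu.Lock()
	pending := make(map[string]postStartHook, len(r.hooks))
	for name, h := range r.hooks {
		pending[name] = h
	}
	r.mu.Unlock()

	results := make(map[string]error, len(pending))
	var resMu sync.Mutex
	var wg sync.WaitGroup
	for name, h := range pending {
		wg.Add(1)
		go func(name string, h postStartHook) {
			defer wg.Done()
			err := h(stopCh)
			resMu.Lock()
			results[name] = err
			resMu.Unlock()
		}(name, h)
	}
	wg.Wait()
	return results
}

func main() {
	reg := newHookRegistry()
	noop := func(stopCh <-chan struct{}) error { return nil }
	fmt.Println(reg.add("autoregistration", noop))
	fmt.Println(reg.add("autoregistration", noop))
	stopCh := make(chan struct{})
	defer close(stopCh)
	fmt.Println(len(reg.runAll(stopCh)))
}
```

Rejecting duplicate names matches the intent of the OrDie variant seen earlier, where a failed registration is treated as a fatal programming error.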

3.7 Creating the GenericAPIServer

All three servers above are built on GenericAPIServer, which maps Kubernetes resources to REST APIs.

3.7.1 Instantiating genericConfig
func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*GenericAPIServer, error) {
  ...
  // Build the handler chain
  handlerChainBuilder := func(handler http.Handler) http.Handler {
    return c.BuildHandlerChainFunc(handler, c.Config)
  }
  apiServerHandler := NewAPIServerHandler(name, c.Serializer, handlerChainBuilder, delegationTarget.UnprotectedHandler())

  s := &GenericAPIServer{
    ...
  }
  ...
  installAPI(s, c.Config)
  ...
}

// NewAPIServerHandler
func NewAPIServerHandler(name string, s runtime.NegotiatedSerializer, handlerChainBuilder HandlerChainBuilderFn, notFoundHandler http.Handler) *APIServerHandler {
  ...
  // Create the go-restful Container object
  gorestfulContainer := restful.NewContainer()
  gorestfulContainer.ServeMux = http.NewServeMux()
  gorestfulContainer.Router(restful.CurlyRouter{}) // e.g. for proxy/{kind}/{name}/{*}
  gorestfulContainer.RecoverHandler(func(panicReason interface{}, httpWriter http.ResponseWriter) {
    logStackOnRecover(s, panicReason, httpWriter)
  })
  gorestfulContainer.ServiceErrorHandler(func(serviceErr restful.ServiceError, request *restful.Request, response *restful.Response) {
    serviceErrorHandler(s, serviceErr, request, response)
  })

  director := director{
    name:               name,
    goRestfulContainer: gorestfulContainer,
    nonGoRestfulMux:    nonGoRestfulMux,
  }
  // Create the handler
  return &APIServerHandler{
    FullHandlerChain:   handlerChainBuilder(director),
    GoRestfulContainer: gorestfulContainer,
    NonGoRestfulMux:    nonGoRestfulMux,
    Director:           director,
  }
}
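
NewAPIServerHandler receives the delegate's UnprotectedHandler as its notFoundHandler, which is how the three servers form a delegation chain. The following is a minimal sketch of that fall-through behaviour using only net/http. The `server` type, the path prefixes, and `buildChain` are hypothetical simplifications; the real director consults the go-restful container and a path mux rather than a prefix list.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// server is a hypothetical, much simplified stand-in for one API server in the chain.
type server struct {
	name     string
	prefixes []string     // path prefixes this server owns (illustrative only)
	delegate http.Handler // plays the role of notFoundHandler
}

// ServeHTTP handles paths the server owns and passes everything else to the delegate.
func (s *server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	for _, p := range s.prefixes {
		if strings.HasPrefix(r.URL.Path, p) {
			fmt.Fprintf(w, "%s handled %s", s.name, r.URL.Path)
			return
		}
	}
	s.delegate.ServeHTTP(w, r)
}

// buildChain wires aggregator -> kube-apiserver -> apiextensions -> 404.
func buildChain() http.Handler {
	extensions := &server{name: "apiextensions", prefixes: []string{"/apis/example.com/"}, delegate: http.NotFoundHandler()}
	kube := &server{name: "kube-apiserver", prefixes: []string{"/api/", "/apis/apps/"}, delegate: extensions}
	return &server{name: "aggregator", prefixes: []string{"/apis/metrics.k8s.io/"}, delegate: kube}
}

// whoHandles sends a GET through the chain and returns the response body.
func whoHandles(chain http.Handler, path string) string {
	rec := httptest.NewRecorder()
	chain.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, path, nil))
	return rec.Body.String()
}

func main() {
	chain := buildChain()
	for _, p := range []string{"/api/v1/pods", "/apis/example.com/v1/foos", "/unknown"} {
		fmt.Printf("%q\n", whoHandles(chain, p))
	}
}
```

Each server only needs to know its immediate delegate, so a request that no server claims falls through to the final not-found handler.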

3.8 Starting the service

The last step of CreateServerChain is to start the service through the insecureServingInfo.Serve function.

func (s *DeprecatedInsecureServingInfo) Serve(handler http.Handler, shutdownTimeout time.Duration, stopCh <-chan struct{}) error {
  // Initialize the HTTP server
  insecureServer := &http.Server{
    Addr:           s.Listener.Addr().String(),
    Handler:        handler,
    MaxHeaderBytes: 1 << 20,
  }

  ...
  // Start the server; internally this calls server.Serve(listener)
  _, err := RunServer(insecureServer, s.Listener, shutdownTimeout, stopCh)
  return err
}

func RunServer(
  server *http.Server,
  ln net.Listener,
  shutDownTimeout time.Duration,
  stopCh <-chan struct{},
) (<-chan struct{}, error) {
  ...
  go func() {
    ...
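    // Serve on the listener through the Go standard library's server.Serve
    // While running, it starts a goroutine per connection; each goroutine reads requests and calls the handler to process and answer them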
    err := server.Serve(listener)
    ...
  }()

  return stoppedCh, nil
}
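
The elided parts of RunServer deal with serving in the background and shutting down when stopCh closes. Here is a minimal, self-contained sketch of that pattern with the standard library. `runServer` and `demo` are hypothetical names, and the shutdown logic is an assumption about the general technique rather than a copy of the Kubernetes implementation.

```go
package main

import (
	"context"
	"fmt"
	"io"
	"net"
	"net/http"
	"time"
)

// runServer serves on ln in a goroutine and shuts the server down gracefully
// once stopCh is closed. The returned channel is closed after shutdown.
func runServer(server *http.Server, ln net.Listener, shutdownTimeout time.Duration, stopCh <-chan struct{}) <-chan struct{} {
	stoppedCh := make(chan struct{})

	go func() {
		defer close(stoppedCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			fmt.Println("shutdown error:", err)
		}
	}()

	go func() {
		// Serve blocks until the server is shut down or fails.
		if err := server.Serve(ln); err != nil && err != http.ErrServerClosed {
			fmt.Println("serve error:", err)
		}
	}()

	return stoppedCh
}

// demo starts a server on a random loopback port, sends one request, and stops it.
func demo() (string, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	srv := &http.Server{
		Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			io.WriteString(w, "ok")
		}),
		MaxHeaderBytes: 1 << 20,
	}
	stopCh := make(chan struct{})
	stoppedCh := runServer(srv, ln, 5*time.Second, stopCh)
	defer func() {
		close(stopCh)
		<-stoppedCh
	}()

	resp, err := http.Get("http://" + ln.Addr().String() + "/healthz")
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	body, err := demo()
	fmt.Println(body, err)
}
```

Passing an already bound listener, as Serve does above, lets the caller pick the address and detect bind errors before any goroutine is started.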

References

  • 《kubernetes源码剖析》