阅读 43

Kubernetes JobController与JobInformer关联设计源码深入剖析-Kubernetes商业环境实战

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。

1 ControllerManager作为集群的管理控制中心使命

1.1 CreateControllerContext 使命一

  • 拿到对kube-APIserver中资源的操作句柄,创建控制器上下文(包含informor factory)
//1:拿到对kube-APIserver中资源的操作句柄,创建控制器上下文(包含informor factory)
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
	klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
复制代码
  • CreateControllerContext的具体实现(包含informor factory)
// CreateControllerContext creates a context struct containing references to resources needed by the
// controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
// the shared-informers client and token controller.
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
	versionedClient := rootClientBuilder.ClientOrDie("shared-informers")

	sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())

	metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
	metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())

	// If apiserver is not running we should wait for some time and fail only then. This is particularly
	// important when we start apiserver and controller manager at the same time.
	if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
		return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
	}

	// Use a discovery client capable of being refreshed.
	discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
	cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
	restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
	go wait.Until(func() {
		restMapper.Reset()
	}, 30*time.Second, stop)

	availableResources, err := GetAvailableResources(rootClientBuilder)
	if err != nil {
		return ControllerContext{}, err
	}

	cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
		s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
	if err != nil {
		return ControllerContext{}, err
	}

	ctx := ControllerContext{
		ClientBuilder:                   clientBuilder,
		InformerFactory:                 sharedInformers,
		ObjectOrMetadataInformerFactory: controller.NewInformerFactory(sharedInformers, metadataInformers),
		ComponentConfig:                 s.ComponentConfig,
		RESTMapper:                      restMapper,
		AvailableResources:              availableResources,
		Cloud:                           cloud,
		LoopMode:                        loopMode,
		Stop:                            stop,
		InformersStarted:                make(chan struct{}),
		ResyncPeriod:                    ResyncPeriod(s),
	}
	return ctx, nil
}
复制代码

1.2 StartControllers使命二

  • 初始化的所有控制器(包括apiserver的客户端,informer的回调函数等等)
   if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
   	klog.Fatalf("error starting controllers: %v", err)
   }
复制代码
  • 初始化所有的controllers,并封装成InitFunc函数
  • 循环遍历启动所有的Controller,注意这里启动Controller后,Contoller属于空转状态,并没有事件来处理,需要Informer来不断传递事件。例如:jobController不断地从Queue中取出事件来处理,需要jobInformer随后启动,但是Informer需要借助Factory启动,最后通过jobController自定义的EventHadler函数(可将事件放进jobController定义的Queue中)初始化jobInformer.EventHadler,最后由jobInformer.EventHadler将事件传递到jobController中的queue中。
  • 3: jobInformer使用JobController提供的AddEventHandler回调,把变化存进JobController的queue中。
  • 4: JobController.run 执行死循环,完成处理队列中的job事件。

11.3 controllerContext.InformerFactory.Start

  • 启动所有的informer,来辅助具体资源对应的Controller(如:JobController),来进行事件处理。
  • informer实际上就是三级缓冲前哨。
    //3:启动Informer,并完成Controller最终的启动以及资源监听机制(使用JobController的队列)
    controllerContext.InformerFactory.Start(controllerContext.Stop)
    
    // Start initializes all requested informers.
    func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    	f.lock.Lock()
    	defer f.lock.Unlock()
    
    	for informerType, informer := range f.informers {
    		if !f.startedInformers[informerType] {
    			go informer.Run(stopCh)
    			f.startedInformers[informerType] = true
    		}
    	}
}
复制代码

2 Controller与Infomer关联设计

2.1 SharedIndexInformer与Controller都是抽象的通用架构

  • 为了减少apiserver频繁访问,而设计的缓存、索引、无界队列、共享informer等机制
  • 因为彼此共用informer,但是每个组件的处理逻辑可能各不相同,在informer中通过观察者模式,各个组件可以注册一个EventHandler来实现业务逻辑的注入

2.1.1 抽象的通用架构源码逻辑

  • SharedIndexInformer与Controller都是抽象的,需要依赖于具体的资源类型的控制器内部实例化实现,如:DeploymentController,JobController等
  • SharedIndexInformer内部主要实现了通用的事件处理逻辑,但是缺乏具体的回调函数Eventhandler定义,需要JobController传入。如下图所示,Eventhandler是没有具体实现的。
  • SharedIndexInformer整体流程,主要干了四件事情:
    1:把DeltaFIFO给Controller,作为一级缓存
    2:检测cache变动,这一功能一般不使用
    3:事件回调函数
    4:调用conroller的Run方法
        
    func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
    	defer utilruntime.HandleCrash()
    
    	// DeltaFIFO
    	1:把DeltaFIFO给Controller,作为一级缓存
    	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
    
    	// 纽带 ListerWatcher/Process
    	cfg := &Config{
    		Queue:            fifo,
    		ListerWatcher:    s.listerWatcher,
    		ObjectType:       s.objectType,
    		FullResyncPeriod: s.resyncCheckPeriod,
    		RetryOnError:     false,
    		ShouldResync:     s.processor.shouldResync,
    
    		Process: s.HandleDeltas,
    	}
    
    	func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    
    		s.controller = New(cfg)
    		s.controller.(*controller).clock = s.clock
    		s.started = true
    	}()
    
    	// Separate stop channel because Processor should be stopped strictly after controller
    	processorStopCh := make(chan struct{})
    	var wg wait.Group
    	defer wg.Wait()              // Wait for Processor to stop
    	defer close(processorStopCh) // Tell Processor to stop
    
    	2:检测cache变动,这一功能一般不使用
    	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
    
    	3:事件回调函数(根据自己定义的 EventHandler 来对这些 Event 做出处理)
    	wg.StartWithChannel(processorStopCh, s.processor.run)
    
    	defer func() {
    		s.startedLock.Lock()
    		defer s.startedLock.Unlock()
    		s.stopped = true // Don't want any new listeners
    	}()
    
    	4:调用conroller的Run方法(ProcessLoop把DeltaFIFO的东西,
    	  放到NewThreadSafeStore(indexer本地缓存中),由processorListener.run处理
    	  
    	//  controller核心逻辑: obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    	//  HandleDeltas(
            	FIFO(c.config.Queue.Pop)
            	->HandleDeltas(Process)(indexer.add/sharedIndexInformer.processor.distribute)
            	->processorListener.add(obj)
            	->本地缓存(addCh  chan interface{})
            	->pendingNotifications
            	->processorListener.run
            	->Eventhandler处理
                  )
    	s.controller.Run(stopCh)
    }
复制代码

2.1.2 JobInformer与SharedIndexInformer的亲密关系

  • JobInformer实例化是通过cache.NewSharedIndexInformer实现的
  • JobInformer通过Informer()进行注册到factory。
  • 如下是job的具体定义,里面主要包含Informer和ListWatch的具体定义,也是在这里关联上SharedIndexInformer的

2.2 jobController与jobInformer关联设计

2.2.1 总体思路

  • jobController中的queue队列<打通了>SharedIndexInformer的事件监听处理,纽带是EventHandler

  • 主要也是通过jobController自定义传入Eventhandler,初始化SharedIndexInformer的EventHandler,把事件传入到jobController中的queue队列中

  • 最终的job事件的处理,主要是通过jobController.run实现的,

  • SharedInformer主要实现的功能是事件监听及三级缓冲,最终处理还是要交给具体的jobController多线程处理

2.2.1 jobController借助jobInformer实现的事件处理机制

  • 1:SharedIndexInformer内部主要实现了通用的事件处理逻辑,但是缺乏具体的回调函数Eventhandler定义,需要JobController传入。如下图所示,Eventhandler是没有具体实现的。
  • 2:JobController 提供了一个队列:queue ,用于给 jobInformer中的refector(List|Watch)监听到的Job资源(Add,Update,Delete)事件,通过Eventhandler处理函数,放进JobController中定义的queue中,方便JobController作缓冲循环处理。
  • 2: jobInformer.Informer().AddEventHandler:自定义Eventhandler,初始化SharedIndexInformer的EventHandler,然后SharedIndexInformer把job的变化处理Event放进放进JobController中定义的queue中,最终JobController得到了Event变化队列
  • 从JobController中定义的queue中取出事件,进行处理

3 总结

  • jobController中的queue队列<打通了>SharedIndexInformer的事件监听处理,纽带是EventHandler

  • 主要也是通过jobController自定义传入Eventhandler,初始化SharedIndexInformer的EventHandler,把事件传入到jobController中的queue队列中

  • 最终的job事件的处理,主要是通过jobController.run实现的,

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。