专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。
1 ControllerManager作为集群的管理控制中心使命
1.1 CreateControllerContext 使命一
- 拿到对kube-APIserver中资源的操作句柄,创建控制器上下文(包含informor factory)
//1:拿到对kube-APIserver中资源的操作句柄,创建控制器上下文(包含informor factory)
controllerContext, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, ctx.Done())
if err != nil {
klog.Fatalf("error building controller context: %v", err)
}
saTokenControllerInitFunc := serviceAccountTokenControllerStarter{rootClientBuilder: rootClientBuilder}.startServiceAccountTokenController
- CreateControllerContext的具体实现(包含informor factory)
// CreateControllerContext creates a context struct containing references to resources needed by the
// controllers such as the cloud provider and clientBuilder. rootClientBuilder is only used for
// the shared-informers client and token controller.
func CreateControllerContext(s *config.CompletedConfig, rootClientBuilder, clientBuilder controller.ControllerClientBuilder, stop <-chan struct{}) (ControllerContext, error) {
versionedClient := rootClientBuilder.ClientOrDie("shared-informers")
sharedInformers := informers.NewSharedInformerFactory(versionedClient, ResyncPeriod(s)())
metadataClient := metadata.NewForConfigOrDie(rootClientBuilder.ConfigOrDie("metadata-informers"))
metadataInformers := metadatainformer.NewSharedInformerFactory(metadataClient, ResyncPeriod(s)())
// If apiserver is not running we should wait for some time and fail only then. This is particularly
// important when we start apiserver and controller manager at the same time.
if err := genericcontrollermanager.WaitForAPIServer(versionedClient, 10*time.Second); err != nil {
return ControllerContext{}, fmt.Errorf("failed to wait for apiserver being healthy: %v", err)
}
// Use a discovery client capable of being refreshed.
discoveryClient := rootClientBuilder.ClientOrDie("controller-discovery")
cachedClient := cacheddiscovery.NewMemCacheClient(discoveryClient.Discovery())
restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedClient)
go wait.Until(func() {
restMapper.Reset()
}, 30*time.Second, stop)
availableResources, err := GetAvailableResources(rootClientBuilder)
if err != nil {
return ControllerContext{}, err
}
cloud, loopMode, err := createCloudProvider(s.ComponentConfig.KubeCloudShared.CloudProvider.Name, s.ComponentConfig.KubeCloudShared.ExternalCloudVolumePlugin,
s.ComponentConfig.KubeCloudShared.CloudProvider.CloudConfigFile, s.ComponentConfig.KubeCloudShared.AllowUntaggedCloud, sharedInformers)
if err != nil {
return ControllerContext{}, err
}
ctx := ControllerContext{
ClientBuilder: clientBuilder,
InformerFactory: sharedInformers,
ObjectOrMetadataInformerFactory: controller.NewInformerFactory(sharedInformers, metadataInformers),
ComponentConfig: s.ComponentConfig,
RESTMapper: restMapper,
AvailableResources: availableResources,
Cloud: cloud,
LoopMode: loopMode,
Stop: stop,
InformersStarted: make(chan struct{}),
ResyncPeriod: ResyncPeriod(s),
}
return ctx, nil
}
1.2 StartControllers使命二
- 初始化的所有控制器(包括apiserver的客户端,informer的回调函数等等)
if err := StartControllers(controllerContext, saTokenControllerInitFunc, NewControllerInitializers(controllerContext.LoopMode), unsecuredMux); err != nil {
klog.Fatalf("error starting controllers: %v", err)
}
- 初始化所有的controllers,并封装成InitFunc函数
- 循环遍历启动所有的Controller,注意这里启动Controller后,Contoller属于空转状态,并没有事件来处理,需要Informer来不断传递事件。例如:jobController不断地从Queue中取出事件来处理,需要jobInformer随后启动,但是Informer需要借助Factory启动,最后通过jobController自定义的EventHadler函数(可将事件放进jobController定义的Queue中)初始化jobInformer.EventHadler,最后由jobInformer.EventHadler将事件传递到jobController中的queue中。
- 3: jobInformer使用JobController提供的AddEventHandler回调,把变化存进JobController的queue中。
- 4: JobController.run 执行死循环,完成处理队列中的job事件。
11.3 controllerContext.InformerFactory.Start
- 启动所有的informer,来辅助具体资源对应的Controller(如:JobController),来进行事件处理。
- informer实际上就是三级缓冲前哨。
//3:启动Informer,并完成Controller最终的启动以及资源监听机制(使用JobController的队列)
controllerContext.InformerFactory.Start(controllerContext.Stop)
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
2 Controller与Infomer关联设计
2.1 SharedIndexInformer与Controller都是抽象的通用架构
- 为了减少apiserver频繁访问,而设计的缓存、索引、无界队列、共享informer等机制
- 因为彼此共用informer,但是每个组件的处理逻辑可能各不相同,在informer中通过观察者模式,各个组件可以注册一个EventHandler来实现业务逻辑的注入
2.1.1 抽象的通用架构源码逻辑
- SharedIndexInformer与Controller都是抽象的,需要依赖于具体的资源类型的控制器内部实例化实现,如:DeploymentController,JobController等
- SharedIndexInformer内部主要实现了通用的事件处理逻辑,但是缺乏具体的回调函数Eventhandler定义,需要JobController传入。如下图所示,Eventhandler是没有具体实现的。
- SharedIndexInformer整体流程,主要干了四件事情:
1:把DeltaFIFO给Controller,作为一级缓存
2:检测cache变动,这一功能一般不使用
3:事件回调函数
4:调用conroller的Run方法
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// DeltaFIFO
1:把DeltaFIFO给Controller,作为一级缓存
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 纽带 ListerWatcher/Process
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
2:检测cache变动,这一功能一般不使用
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
3:事件回调函数(根据自己定义的 EventHandler 来对这些 Event 做出处理)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
4:调用conroller的Run方法(ProcessLoop把DeltaFIFO的东西,
放到NewThreadSafeStore(indexer本地缓存中),由processorListener.run处理
// controller核心逻辑: obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
// HandleDeltas(
FIFO(c.config.Queue.Pop)
->HandleDeltas(Process)(indexer.add/sharedIndexInformer.processor.distribute)
->processorListener.add(obj)
->本地缓存(addCh chan interface{})
->pendingNotifications
->processorListener.run
->Eventhandler处理
)
s.controller.Run(stopCh)
}
2.1.2 JobInformer与SharedIndexInformer的亲密关系
- JobInformer实例化是通过cache.NewSharedIndexInformer实现的
- JobInformer通过Informer()进行注册到factory。
- 如下是job的具体定义,里面主要包含Informer和ListWatch的具体定义,也是在这里关联上SharedIndexInformer的
2.2 jobController与jobInformer关联设计
2.2.1 总体思路
-
jobController中的queue队列<打通了>SharedIndexInformer的事件监听处理,纽带是EventHandler
-
主要也是通过jobController自定义传入Eventhandler,初始化SharedIndexInformer的EventHandler,把事件传入到jobController中的queue队列中
-
最终的job事件的处理,主要是通过jobController.run实现的,
-
SharedInformer主要实现的功能是事件监听及三级缓冲,最终处理还是要交给具体的jobController多线程处理
2.2.1 jobController借助jobInformer实现的事件处理机制
- 1:SharedIndexInformer内部主要实现了通用的事件处理逻辑,但是缺乏具体的回调函数Eventhandler定义,需要JobController传入。如下图所示,Eventhandler是没有具体实现的。
- 2:JobController 提供了一个队列:queue ,用于给 jobInformer中的refector(List|Watch)监听到的Job资源(Add,Update,Delete)事件,通过Eventhandler处理函数,放进JobController中定义的queue中,方便JobController作缓冲循环处理。
- 2: jobInformer.Informer().AddEventHandler:自定义Eventhandler,初始化SharedIndexInformer的EventHandler,然后SharedIndexInformer把job的变化处理Event放进放进JobController中定义的queue中,最终JobController得到了Event变化队列
- 从JobController中定义的queue中取出事件,进行处理
3 总结
-
jobController中的queue队列<打通了>SharedIndexInformer的事件监听处理,纽带是EventHandler
-
主要也是通过jobController自定义传入Eventhandler,初始化SharedIndexInformer的EventHandler,把事件传入到jobController中的queue队列中
-
最终的job事件的处理,主要是通过jobController.run实现的,
专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。