阅读 26

Kubernetes 控制器核心SharedInformer源码深入剖析(二)-Kubernetes商业环境实战

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。

1. SharedInformer核心功能详解

  • List-Watch 的代码实现主要在:k8s.io/client-go/tools/cache 中。
  • SharedInformer 是万恶之源,先后构建了NewDeltaFIFO,Controller,HandleDeltas,sharedProcessor->processorListener处理器,并最后驱动Controller.run。它会创建一个 DeltaFIFO ,然后初始化一个 Controller ,指明它使用的 store 是刚才创建的 DeltaFIFO,它使用的 Process 是 sharedInformer 的 HandleDeltas 。它主要干了四件事情:
  • 1:把DeltaFIFO给Controller,作为一级缓存
  • 2:检测cache变动,这一功能一般不使用
  • 3:事件回调函数
  • 4:调用conroller的Run方法
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	// DeltaFIFO
	1:把DeltaFIFO给Controller,作为一级缓存
	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	// 纽带 ListerWatcher/Process
	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop

	2:检测cache变动,这一功能一般不使用
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)

	3:事件回调函数(根据自己定义的 EventHandler 来对这些 Event 做出处理)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()

	4:调用conroller的Run方法(ProcessLoop把DeltaFIFO的东西,
	  放到NewThreadSafeStore(indexer本地缓存中),由processorListener.run处理
	  
	//  controller核心逻辑: obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
	//  HandleDeltas(
        	FIFO(c.config.Queue.Pop)
        	->HandleDeltas(Process)(indexer.add/sharedIndexInformer.processor.distribute)
        	->processorListener.add(obj)
        	->本地缓存(addCh  chan interface{})
        	->pendingNotifications
        	->processorListener.run
        	->Eventhandler处理
              )
	s.controller.Run(stopCh)
}
复制代码

2. SharedInformer处理逻辑核心架构图

这个图更加清晰得展现了,我们一直在说的三个缓存的位置和作用:

    1. DeltaFIFO:ListWatch 事件最开始存储的地方。
    1. ThreadSafeStore(indexer):一个线程安全缓存,存储了数据以供其他 controller的Lister 方法调用。
    1. pendingNotifications(RingGrowing):事件在调用前的最后缓存。

2. 总结

  • SharedIndexInformer与Controller都是抽象的,需要依赖于具体的资源类型的控制器内部实例化实现,如:DeploymentController,JobController等

  • SharedIndexInformer内部主要实现了通用的事件处理逻辑,但是缺乏具体的回调函数Eventhandler定义,需要JobController传入,Eventhandler是没有具体实现的。

  • SharedInformer主要实现的功能是事件监听及三级缓冲,最终处理还是要交给具体的jobController多线程处理

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。