Kubernetes DeltaFIFO数据结构及Reflector同步机制源码深入剖析-Kubernetes商业环境实战

1,132 阅读8分钟

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。

1 DeltaFIFO数据结构(仅追加变化数据)

  • Delta其实就是kubernetes系统中对象的变化(增、删、改、同步),FIFO比较好理解,是一个先入先出的队列,那么DeltaFIFO就是一个按序的(先入先出)kubernetes对象变化的队列。
  • DeltaFIFO内部一直追加变化数据,例如:(DeltaFIFO.Replace)同步一次,就向DeltaFIFO 是全量插入sync变量Delta。
  • 删除合并操作发生在最近两次更新的操作中,仅取其中之一。dedupDeltas()就是这种货色。
  • queueActionLocked就是DeltaFIFO的灵魂,任何的风吹草动(如:增、删、改、同步),都会放进items map[string]Deltas中。
items解释:
- items map[string] Deltas
- type Deltas []Delta  // Delta数组
- f.items表示Map集合
- f.items[id] 表示某一个key(资源对象)对应的数组集合,即:资源操作变化数组

// 代码源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
    lock sync.RWMutex             // 读写锁,因为涉及到同时读写,读写锁性能要高
    cond sync.Cond                // 给Pop()接口使用,在没有对象的时候可以阻塞,内部锁复用读写锁
    items map[string]Deltas       // 这个应该是Store的本质了,按照kv的方式存储对象,但是存储的是对象的Deltas数组
    queue []string                // 这个是为先入先出实现的,存储的就是对象的键
    populated bool                // 通过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
    initialPopulationCount int    // 通过Replace()接口将第一批对象放入队列的对象数量
    keyFunc KeyFunc               // 对象键计算函数,在Indexer那篇文章介绍过
    knownObjects KeyListerGetter  // 前面介绍就是为了这是用,该对象指向的就是Indexer,
    closed     bool               // 是否已经关闭的标记
    closedLock sync.Mutex         // 专为关闭设计的所,为什么不复用读写锁?
}
  • 一般是先追加,生成新变化数组,然后更新DeltaFIFO.items集合,类似:id:[add:obj1,update:obj2,delete:obj3]
    id, err := f.KeyOf(obj)  //得到obj对应的Map的key
    newDeltas := append(f.items[id], Delta{actionType, obj})  //追加,生成新数组
    f.items[id] = newDeltas   //更新DeltaFIFO.items集合
  • queueActionLocked最终会解决资源变化追加的问题,代码如下:
// 代码源自client-go/tools/cache/delta_fifo.go
// 从函数名称来看把“动作”放入队列中,这个动作就是DeltaType,而且已经加锁了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
    // 前面提到的计算对象键的函数
    id, err := f.KeyOf(obj)
    if err != nil {
        return KeyError{obj, err}
    }
    // 如果是同步,并且对象未来会被删除,那么就直接返回,没必要记录这个动作了
    // 肯定有人会问为什么Add/Delete/Update这些动作可以,因为同步对于已经删除的对象是没有意义的
    // 已经删除的对象后续跟添加、更新有可能,因为同名的对象又被添加了,删除也是有可能
    // 删除有些复杂,后面会有说明
    if actionType == Sync && f.willObjectBeDeletedLocked(id) {
        return nil
    }
    // 同一个对象的多次操作,所以要追加到Deltas数组中
    newDeltas := append(f.items[id], Delta{actionType, obj})
    // 合并操作,去掉冗余的delta
    newDeltas = dedupDeltas(newDeltas)
    // 判断对象是否已经存在
    _, exists := f.items[id]
    // 合并后操作有可能变成没有Delta么?后面的代码分析来看应该不会,所以暂时不知道这个判断目的
    if len(newDeltas) > 0 {
        // 如果对象没有存在过,那就放入队列中,如果存在说明已经在queue中了,也就没必要再添加了
        if !exists {
            f.queue = append(f.queue, id)
        }
        // 更新Deltas数组,通知所有调用Pop()的人
        f.items[id] = newDeltas
        f.cond.Broadcast()
    } else if exists {
        // 直接把对象删除,这段代码我不知道什么条件会进来,因为dedupDeltas()肯定有返回结果的
        // 后面会有dedupDeltas()详细说明
        delete(f.items, id)
    }
    return nil
}
  • dedupDeltas 主要解决连续两个删除操作的变化的合并,id:[add:obj1,update:obj2,delete:obj3,delete:obj3],该函数会去掉最后一个,返回一个完整的f.items[id]: id:[add:obj1,update:obj2,delete:obj3]。最终对应的DeltaFIFO.items就是:Map( DeltaFIFO。key->id:[add:obj1,update:obj2,delete:obj3])
// 代码源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
    n := len(deltas)
    if n < 2 {
        return deltas
    }
    // 取出最后两个
    a := &deltas[n-1]
    b := &deltas[n-2]
    // 判断如果是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
    if out := isDup(a, b); out != nil {
        d := append(Deltas{}, deltas[:n-2]...)
        return append(d, *out)
    }
    return deltas
}
// 判断两个Delta是否是重复的
func isDup(a, b *Delta) *Delta {
    // 只有一个判断,只能判断是否为删除类操作,和我们上面的判断相同
    // 这个函数的本意应该还可以判断多种类型的重复,当前来看只能有删除这一种能够合并
    if out := isDeletionDup(a, b); out != nil {
        return out
    }
	
    return nil
}
// 判断是否为删除类的重复
func isDeletionDup(a, b *Delta) *Delta {
    // 二者都是删除那肯定有一个是重复的
    if b.Type != Deleted || a.Type != Deleted {
        return nil
    }
    // 理论上返回最后一个比较好,但是对象已经不再系统监控范围,前一个删除状态是好的
    if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
        return a
    }
    return b
}

2 DeltaFIFO.replace()(全量插入sync类型变量,并做Delete检测)

  • Reflector通过List(),拿到全量变化,然后调用Replace, 然后每一个item执行:if err := f.queueActionLocked(Sync, item)
  • replace操作还是追加变化量,类似于kafka的Append日志。
  • replace向DeltaFIFO 是全量插入sync变量,类似:append(f.items[id], Delta{Sync, obj}),并做Delete检测,以追加if err := f.queueActionLocked(Deleted, item)
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    keys := make(sets.String, len(list))
    // 遍历所有的输入目标
    for _, item := range list {
        // 计算目标键
        key, err := f.KeyOf(item)
        if err != nil {
            return KeyError{item, err}
        }
        // 记录处理过的目标键,采用set存储,是为了后续快速查找
        keys.Insert(key)
        // 因为输入是目标全量,所以每个目标相当于重新同步了一次
        if err := f.queueActionLocked(Sync, item); err != nil {
            return fmt.Errorf("couldn't enqueue object: %v", err)
        }
    }
    // 如果没有存储的话,自己存储的就是所有的老对象,目的要看看那些老对象不在全量集合中,那么就是删除的对象了
    if f.knownObjects == nil {
        // 遍历所有的元素
        for k, oldItem := range f.items {
            // 这个目标在输入的对象中存在就可以忽略
            if keys.Has(k) {
                continue
            }
            // 输入对象中没有,说明对象已经被删除了。
            var deletedObj interface{}
            if n := oldItem.Newest(); n != nil {
                deletedObj = n.Object
            }
            // 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
            // 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
            if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
                return err
            }
        }
        
        // 如果populated还没有设置,说明是第一次并且还没有任何修改操作执行过
        if !f.populated {
            f.populated = true
            f.initialPopulationCount = len(list)  // 记录第一次通过来的对象数量
        }
 
        return nil
    }
    // 下面处理的就是检测某些目标删除但是Delta没有在队列中
    // 从存储中获取所有对象键
    knownKeys := f.knownObjects.ListKeys()
    queuedDeletions := 0
    for _, k := range knownKeys {
        // 对象还存在那就忽略
        if keys.Has(k) {
            continue
        }
        // 获取对象
        deletedObj, exists, err := f.knownObjects.GetByKey(k)
        if err != nil {
            deletedObj = nil
            glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
        } else if !exists {
            deletedObj = nil
            glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
        }
        // 累积删除的对象数量
        queuedDeletions++
        // 把对象删除的Delta放入队列
        if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
            return err
        }    
    }
    // 和上面的代码差不多,只是计算initialPopulationCount值的时候增加了删除对象的数量
    if !f.populated {
        f.populated = true
        f.initialPopulationCount = len(list) + queuedDeletions
    }
 
    return nil
}

3 何为同步?

period       time.Duration     // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
                                  这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
resyncPeriod time.Duration     // 重新同步的周期,很多人肯定认为这个同步周期指的是从apiserver的同步周期
                                  其实这里面同步指的是shared_informer使用者需要定期同步全量对象
  • 周期同步,采用period,从apiserver的进行同步
  • 使用者需要定期同步全量对象
  • 这里发现是从indexr中,来进行数据同步的,knownObjects

4 knownObjects是谁(index)

  • 发现Reflector就是一个通过NEW封装的Controller,完成资源变化监听,并放进DeltaFIFO和Index队列中。
  • knownObjects是谁也一目了然,就是最终完成检索

5 HandleDeltas打通任督二脉

  • sharedIndexInformer.run -> controler.run-> wait.Until(c.processLoop, time.Second, stopCh)->obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))->Process: s.HandleDeltas ->更新Index,并发送通知Controller 来回调处理
  • sharedIndexInformer是什么?就是包含了两个重要步骤:1:Reflector来实现变化更新到DeltaFIFO,注意Reflector本身初始化是在Controller内部,2:Controller来实现更新Index,并发送通知Controller来处理,这个Controller也是(s.controller = New(cfg))。
  • sharedIndexInformer是什么?其实就是包含上述两个步骤,而最终呈现的就是s.controller.Run(stopCh)。
  • sharedIndexInformer是什么?其实就是对controller的配置,初始化了Config。该Config不仅用来初始化Reflector,也用来初始化controller。
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)

	cfg := &Config{
		Queue:            fifo,
		ListerWatcher:    s.listerWatcher,
		ObjectType:       s.objectType,
		FullResyncPeriod: s.resyncCheckPeriod,
		RetryOnError:     false,
		ShouldResync:     s.processor.shouldResync,

		Process: s.HandleDeltas,
	}

	func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()

		s.controller = New(cfg)
		s.controller.(*controller).clock = s.clock
		s.started = true
	}()

	// Separate stop channel because Processor should be stopped strictly after controller
	processorStopCh := make(chan struct{})
	var wg wait.Group
	defer wg.Wait()              // Wait for Processor to stop
	defer close(processorStopCh) // Tell Processor to stop
	wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
	wg.StartWithChannel(processorStopCh, s.processor.run)

	defer func() {
		s.startedLock.Lock()
		defer s.startedLock.Unlock()
		s.stopped = true // Don't want any new listeners
	}()
	
	//初始化Reflector,更新Index, 并发送通知给Controller来回调处理
	s.controller.Run(stopCh)
}
  • 完成index的增删改操作,也即最终的变量同步。

6 sharedIndexInformer是什么?

  • sharedIndexInformer就是一个包装,初始化Reflector,也用来初始化controller。最终呈现给大家的就是controller.run。
  • sharedIndexInformer和controller是一一对应的。

7 总结

本文综合分析了Kubernetes 大量源码,试图从较高的视野来看问题,猛看表,一天时间就过去了。辛苦成文,各自珍惜,谢谢!

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。