kubeedge v1.2 新特性具体分析(附源码解析)

812 阅读4分钟

特性1:增加业务层消息发送的校验机制

云端发送状态同步消息到边缘时,边缘在接收并且持久化成功后,会回复状态同步成功的ACK消息给云端。如果云端未收到边缘状态同步成功的消息回复,则由业务层代码触发重传机制,重新进行状态同步。

  1. clouldHub -> handler.InitHandler(messageq) -> 启动HandleServer

  2. MessageAcs: sync.map

  3. clouldHub sendMesg 时会 mh.MessageAcks.Store(msg.GetID(), ackChan)

  4. 同时会监听这个ackChan,如果超时就重试

LOOP:
    for {
       select {
       case <-ackChan:
          mh.MessageAcks.Delete(msg.GetID())
          mh.saveSuccessPoint(copyMsg, info, nodeStore)
          break LOOP
       case <-ticker.C:
          if retry == 4 {
             break LOOP
          }
          mh.send(hi, info, msg)
          retry++
          ticker.Reset(time.Second * retryInterval)
       }
    }

5) cloudHub 收到返回消息后,会close掉ackChan,关闭重试。

if container.Message.Router.Operation == beehiveModel.ResponseOperation {
		if ackChan, ok := mh.MessageAcks.Load(container.Message.Header.ParentID); ok {
			close(ackChan.(chan struct{}))
		}
		return
	}
	

特性2:持久化云边协同消息状态

在云和边缘状态同步的过程中,云端会实时记录每个边缘节点同步成功的最新消息版本号(ResourceVersion),并以CRD的形式持久化保存到K8s中。该机制可以保证在边际场景下云端故障或者边缘离线重启后消息发送的顺序和连续性,避免重发旧消息引起云边状态不一致问题。

6) 当cloudHub close掉ackChan时,步骤5) 中 将会执行

mh.saveSuccessPoint(copyMsg, info, nodeStore)

  1. saveSuccessPoint中将会获取到最新版本号(ResourceVersion),并以CRD的形式持久化保存到K8s中
objectSyncStatus.Status.ObjectResourceVersion = msg.GetResourceVersion()
			mh.MessageQueue.ObjectSyncController.CrdClient.ReliablesyncsV1alpha1().ObjectSyncs(resourceNamespace).UpdateStatus(objectSyncStatus)

特性三:周期性检查同步云边数据,保持一致性。

在前两步的基础上,KubeEdge又在云端CloudCore中添加了新的模块SyncController,它主要负责周期性检查个边缘节点的同步状态,对比K8s中资源的信息,将不一致的状态同步到边缘,确保云边状态的最终一致性。

  1. synccontroller定时监听云上k8s的资源(调k8s.io/client-go),同步到边缘端。这里是只有云端对边缘端的同步。

其中定时执行sync 放在了一个不起眼的地方

go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())

  1. reconcile中 执行

sctl.manageObjectSync(allObjectSyncs)

  1. manageObjectSync 中根据资源类型执行相应的资源同步
for _, sync := range syncs {
		switch sync.Spec.ObjectKind {
		case model.ResourceTypePod:
			sctl.managePod(sync)
		case model.ResourceTypeConfigmap:
			sctl.manageConfigMap(sync)
		case model.ResourceTypeSecret:
			sctl.manageSecret(sync)
		case commonconst.ResourceTypeService:
			sctl.manageService(sync)
		case commonconst.ResourceTypeEndpoints:
			sctl.manageEndpoint(sync)
		// TODO: add device here
		default:
			klog.Errorf("Unsupported object kind: %v", sync.Spec.ObjectKind)
		}
	}
  1. 比如 managePod中,获取到pod信息后通过beehive下发到cloudHub后同步到边缘端
    
pod, err := sctl.podLister.Pods(sync.Namespace).Get(sync.Spec.ObjectName)

	nodeName := getNodeName(sync.Name)

	if err != nil && apierrors.IsNotFound(err) {
		sendEvents(err, nodeName, sync.Namespace, sync.Spec.ObjectName, model.ResourceTypePod,
			"", "", nil)
		return
	}
	sendEvents(err, nodeName, sync.Namespace, sync.Spec.ObjectName, model.ResourceTypePod,
		pod.ResourceVersion, sync.Status.ObjectResourceVersion, pod)

特性4:自动注册边缘节点到云端

在v1.2版本中,KubeEdge供了边缘节点自动注册到云端的功能,并且默认开启该特性,以减少用户在安装使用KubeEdge时的操作步骤。用户可以通过修改EdgeCore的“registerNode”配置项来关闭该特性(将其设置为“false”即可)

1) edged启动的时候会执行 go utilwait.Until(e.syncNodeStatus, e.nodeStatusUpdateFrequency, utilwait.NeverStop)

  1. 即执行syncNodestatus这个函数
func (e *edged) syncNodeStatus() {
	if !e.registrationCompleted {
		if err := e.registerNode(); err != nil {
			klog.Errorf("Register node failed: %v", err)
		}
	} else {
		if err := e.updateNodeStatus(); err != nil {
			klog.Errorf("Unable to update node status: %v", err)
		}
	}
}
  1. 初始状态时 e.registrationCompleted == false ,需要执行registerNode()来注册节点,当registerNode == false 时,即关闭自动注册节点
if config.Get().RegisterNode == false {
		//when register-node set to false, do not auto register node
		klog.Infof("register-node is set to false")
		e.registrationCompleted = true
		return nil
	}

4) 自动注册Node即 发送节点信息至Edgehub

nodeInfoMsg := message.BuildMsg(modules.MetaGroup, "", modules.EdgedModuleName, resource, model.InsertOperation, node)
	res, err := beehiveContext.SendSync(edgehub.ModuleNameEdgeHub, *nodeInfoMsg, syncMsgRespTimeout)
	if err != nil || res.Content != "OK" {
		klog.Errorf("register node failed, error: %v", err)
		return err
	}

	klog.Infof("Successfully registered node %s", e.nodeName)
	e.registrationCompleted = true
  1. 由Edgehub组件发送注册信息至云端cloudHub来完成注册,当Edgehub连接cloudHub server时(比如websocket),如果是第一次连接,这时将会调用ConnNotify回调函数完成注册并将连接加入进连接池AddConnection。
func (srv *WSServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	if srv.exOpts.Filter != nil {
		if filtered := srv.exOpts.Filter(w, req); filtered {
			klog.Warning("failed to filter req")
			return
		}
	}

	wsConn := srv.upgrade(w, req)
	if wsConn == nil {
		return
	}

	conn := conn.NewConnection(&conn.ConnectionOptions{
		ConnType: api.ProtocolTypeWS,
		Base:     wsConn,
		ConnUse:  api.UseType(req.Header.Get("ConnectionUse")),
		Consumer: srv.options.Consumer,
		Handler:  srv.options.Handler,
		CtrlLane: lane.NewLane(api.ProtocolTypeWS, wsConn),
		State: &conn.ConnectionState{
			State:   api.StatConnected,
			Headers: utils.DeepCopyHeader(req.Header),
		},
		AutoRoute: srv.options.AutoRoute,
	})

	// connection callback
	if srv.options.ConnNotify != nil {
		srv.options.ConnNotify(conn)
	}

	// connection manager
	if srv.options.ConnMgr != nil {
		srv.options.ConnMgr.AddConnection(conn)
	}

	// serve connection
	go conn.ServeConn()
}

6) 这里ConnNotify函数为 handler.CloudhubHandler.OnRegister

func startWebsocketServer() {
	tlsConfig := createTLSConfig(hubconfig.Get().Ca, hubconfig.Get().Cert, hubconfig.Get().Key)
	svc := server.Server{
		Type:       api.ProtocolTypeWS,
		TLSConfig:  &tlsConfig,
		AutoRoute:  true,
		ConnNotify: handler.CloudhubHandler.OnRegister,
		Addr:       fmt.Sprintf("%s:%d", hubconfig.Get().WebSocket.Address, hubconfig.Get().WebSocket.Port),
		ExOpts:     api.WSServerOption{Path: "/"},
	}
	klog.Infof("Startting cloudhub %s server", api.ProtocolTypeWS)
	svc.ListenAndServeTLS("", "")
}
  1. OnRegister 里完成相关node节点注册
// OnRegister register node on first connection
func (mh *MessageHandle) OnRegister(connection conn.Connection) {
	nodeID := connection.ConnectionState().Headers.Get("node_id")
	projectID := connection.ConnectionState().Headers.Get("project_id")

	if _, ok := mh.KeepaliveChannel[nodeID]; !ok {
		mh.KeepaliveChannel[nodeID] = make(chan struct{}, 1)
	}

	io := &hubio.JSONIO{Connection: connection}
	go mh.ServeConn(io, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}
// ServeConn starts serving the incoming connection
func (mh *MessageHandle) ServeConn(hi hubio.CloudHubIO, info *model.HubInfo) {
	err := mh.RegisterNode(hi, info)
	if err != nil {
		klog.Errorf("fail to register node %s, reason %s", info.NodeID, err.Error())
		return
	}

	klog.Infof("edge node %s for project %s connected", info.NodeID, info.ProjectID)
	exitServe := make(chan ExitCode, 3)
	stopSendMsg := make(chan struct{})

	for _, handle := range mh.Handlers {
		go handle(hi, info, exitServe, stopSendMsg)
	}

	code := <-exitServe
	mh.UnregisterNode(hi, info, code)
}