阅读 61

Kubernetes kube-proxy iptables 模式深入剖析(二)-Kubernetes商业环境实战

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号,或请转发邮件至1120746959@qq.com。

1 kube-proxy技术原理

1.1 Iptables 代理模式

  • kube-proxy 会监视 Kubernetes master 对 Service 对象和 Endpoints 对象的添加和移除。 对每个 Service,它会配置 iptables 规则,从而捕获到达该 Service 的 clusterIP(虚拟 IP)和端口的 请求,进而将请求重定向到 Service 的一组 backend 中的某个上面。
  • 默认的策略是,随机选择一个 backend。 实现基于客户端 IP 的会话亲和性,可以将 service.spec.sessionAffinity 的值设置为 "ClientIP" (默认值为 "None")。
iptables -t nat -nvL KUBE-SERVICES
iptables –t nat –nvL KUBE-SVC-YREYKMMDZGMSMDZU
iptables –t nat –nvL KUBE-SEP-*
复制代码

1.2 IPVS 代理模式

  • 与iptables类似,ipvs基于netfilter 的 hook 功能,但使用哈希表作为底层数据结构并在内核空间中工作。这意味着ipvs可以更快地重定向流量,并且在同步代理规则时具有更好的性能。
  • ipvs为负载均衡算法提供了更多选项,例如: rr:轮询调度 lc:最小连接数 dh:目标哈希 sh:源哈希 sed:最短期望延迟 nq: 不排队调度

2 kube-proxy源码分析

2.1 主要框架

  • kube-proxy是kubernetes中用于实现service与pod之间流量转发的组件。当我们向一个service发送数据包时,实际的接收者是service代理的后端pod,这一功能就是由kube-proxy实现的。
  • 与其它组件一样,kube-proxy的入口函数位于cmd中,具体位置在cmd/kube-proxy/proxy.go,一样是采用了cobra方法:
func main() {
	command := app.NewProxyCommand()
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
复制代码
  • 进入NewProxyCommand方法,其核心仍在于Run方法。进入Run方法,就是生成了一个ProxyServer对象,具体位置在cmd/kube-proxy/sever.go并运行。
// Run runs the specified ProxyServer.
func (o *Options) Run() error {
	defer close(o.errCh)
	if len(o.WriteConfigTo) > 0 {
		return o.writeConfigFile()
	}

	proxyServer, err := NewProxyServer(o)
	if err != nil {
		return err
	}

	if o.CleanupAndExit {
		return proxyServer.CleanupAndExit()
	}

	o.proxyServer = proxyServer
	return o.runLoop()
}
复制代码
  • NewProxyServer方法调用了私有的newProxyServer方法。方法位于app包中,对于不同的操作系统,会执行server_windows.go或者server_others.go中的同名方法。这里只分析linux下的:
  • 最重要的在于判断kube-proxy的运行模式,并进行相应处理,生成对应的ProxyServer结构体。目前,kubernetes广泛使用的是iptables模式,因此这里以iptables为例,另外两个模式略过。
  • 可以看到,在iptables模式下,会调用iptables包中的NewProxier方法,生成适用于iptables的proxier。此外,eventhandler也都配置成这个proxier。最后,将处理后的字段填入ProxyServer结构体中,并返回。
  • newProxier方法相对直观,就是生成一个proxier并返回。如注释中所说,proxier会及时维护iptables的状态,确保iptables数据始终处于最新:
const (
	ProxyModeUserspace   ProxyMode = "userspace"
	ProxyModeIPTables    ProxyMode = "iptables"
	ProxyModeIPVS        ProxyMode = "ipvs"
	ProxyModeKernelspace ProxyMode = "kernelspace"
)

func newProxyServer(
	config *proxyconfigapi.KubeProxyConfiguration,
	cleanupAndExit bool,
	master string) (*ProxyServer, error) {

	if config == nil {
		return nil, errors.New("config is required")
	}

	if c, err := configz.New(proxyconfigapi.GroupName); err == nil {
		c.Set(config)
	} else {
		return nil, fmt.Errorf("unable to register configz: %s", err)
	}

	protocol := utiliptables.ProtocolIpv4
	if net.ParseIP(config.BindAddress).To4() == nil {
		klog.V(0).Infof("IPv6 bind address (%s), assume IPv6 operation", config.BindAddress)
		protocol = utiliptables.ProtocolIpv6
	}

	var iptInterface utiliptables.Interface
	var ipvsInterface utilipvs.Interface
	var kernelHandler ipvs.KernelHandler
	var ipsetInterface utilipset.Interface
	var dbus utildbus.Interface

	// Create a iptables utils.
	execer := exec.New()

	dbus = utildbus.New()
	iptInterface = utiliptables.New(execer, dbus, protocol)
	kernelHandler = ipvs.NewLinuxKernelHandler()
	ipsetInterface = utilipset.New(execer)
	canUseIPVS, _ := ipvs.CanUseIPVSProxier(kernelHandler, ipsetInterface)
	if canUseIPVS {
		ipvsInterface = utilipvs.New(execer)
	}

	// We omit creation of pretty much everything if we run in cleanup mode
	if cleanupAndExit {
		return &ProxyServer{
			execer:         execer,
			IptInterface:   iptInterface,
			IpvsInterface:  ipvsInterface,
			IpsetInterface: ipsetInterface,
		}, nil
	}

	client, eventClient, err := createClients(config.ClientConnection, master)
	if err != nil {
		return nil, err
	}

	// Create event recorder
	hostname, err := utilnode.GetHostname(config.HostnameOverride)
	if err != nil {
		return nil, err
	}
	eventBroadcaster := record.NewBroadcaster()
	recorder := eventBroadcaster.NewRecorder(proxyconfigscheme.Scheme, v1.EventSource{Component: "kube-proxy", Host: hostname})

	nodeRef := &v1.ObjectReference{
		Kind:      "Node",
		Name:      hostname,
		UID:       types.UID(hostname),
		Namespace: "",
	}

	var healthzServer *healthcheck.HealthzServer
	var healthzUpdater healthcheck.HealthzUpdater
	if len(config.HealthzBindAddress) > 0 {
		healthzServer = healthcheck.NewDefaultHealthzServer(config.HealthzBindAddress, 2*config.IPTables.SyncPeriod.Duration, recorder, nodeRef)
		healthzUpdater = healthzServer
	}

	var proxier proxy.Provider

	proxyMode := getProxyMode(string(config.Mode), kernelHandler, ipsetInterface, iptables.LinuxKernelCompatTester{})
	nodeIP := net.ParseIP(config.BindAddress)
	if nodeIP.IsUnspecified() {
		nodeIP = utilnode.GetNodeIP(client, hostname)
		if nodeIP == nil {
			return nil, fmt.Errorf("unable to get node IP for hostname %s", hostname)
		}
	}
	if proxyMode == proxyModeIPTables {
		klog.V(0).Info("Using iptables Proxier.")
		if config.IPTables.MasqueradeBit == nil {
			// MasqueradeBit must be specified or defaulted.
			return nil, fmt.Errorf("unable to read IPTables MasqueradeBit from config")
		}

		// TODO this has side effects that should only happen when Run() is invoked.
		proxier, err = iptables.NewProxier(
			iptInterface,
			utilsysctl.New(),
			execer,
			config.IPTables.SyncPeriod.Duration,
			config.IPTables.MinSyncPeriod.Duration,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			nodeIP,
			recorder,
			healthzUpdater,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
		metrics.RegisterMetrics()
	} else if proxyMode == proxyModeIPVS {
		klog.V(0).Info("Using ipvs Proxier.")
		proxier, err = ipvs.NewProxier(
			iptInterface,
			ipvsInterface,
			ipsetInterface,
			utilsysctl.New(),
			execer,
			config.IPVS.SyncPeriod.Duration,
			config.IPVS.MinSyncPeriod.Duration,
			config.IPVS.ExcludeCIDRs,
			config.IPVS.StrictARP,
			config.IPTables.MasqueradeAll,
			int(*config.IPTables.MasqueradeBit),
			config.ClusterCIDR,
			hostname,
			nodeIP,
			recorder,
			healthzServer,
			config.IPVS.Scheduler,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
		metrics.RegisterMetrics()
	} else {
		klog.V(0).Info("Using userspace Proxier.")

		// TODO this has side effects that should only happen when Run() is invoked.
		proxier, err = userspace.NewProxier(
			userspace.NewLoadBalancerRR(),
			net.ParseIP(config.BindAddress),
			iptInterface,
			execer,
			*utilnet.ParsePortRangeOrDie(config.PortRange),
			config.IPTables.SyncPeriod.Duration,
			config.IPTables.MinSyncPeriod.Duration,
			config.UDPIdleTimeout.Duration,
			config.NodePortAddresses,
		)
		if err != nil {
			return nil, fmt.Errorf("unable to create proxier: %v", err)
		}
	}

	iptInterface.AddReloadFunc(proxier.Sync)

	return &ProxyServer{
		Client:                 client,
		EventClient:            eventClient,
		IptInterface:           iptInterface,
		IpvsInterface:          ipvsInterface,
		IpsetInterface:         ipsetInterface,
		execer:                 execer,
		Proxier:                proxier,     //  不同模式代理
		Broadcaster:            eventBroadcaster,
		Recorder:               recorder,
		ConntrackConfiguration: config.Conntrack,
		Conntracker:            &realConntracker{},
		ProxyMode:              proxyMode,
		NodeRef:                nodeRef,
		MetricsBindAddress:     config.MetricsBindAddress,
		EnableProfiling:        config.EnableProfiling,
		OOMScoreAdj:            config.OOMScoreAdj,
		ConfigSyncPeriod:       config.ConfigSyncPeriod.Duration,
		HealthzServer:          healthzServer,
	}, nil
}
复制代码
  • 定义proxier的syncRunner字段,即proxier的具体运行逻辑
// NewProxier returns a new Proxier given an iptables and ipvs Interface instance.
// Because of the iptables and ipvs logic, it is assumed that there is only a single Proxier active on a machine.
// An error will be returned if it fails to update or acquire the initial lock.
// Once a proxier is created, it will keep iptables and ipvs rules up to date in the background and
// will not terminate if a particular iptables or ipvs call fails.

func NewProxier(ipt utiliptables.Interface,
	ipvs utilipvs.Interface,
	ipset utilipset.Interface,
	sysctl utilsysctl.Interface,
	exec utilexec.Interface,
	syncPeriod time.Duration,
	minSyncPeriod time.Duration,
	excludeCIDRs []string,
	strictARP bool,
	masqueradeAll bool,
	masqueradeBit int,
	clusterCIDR string,
	hostname string,
	nodeIP net.IP,
	recorder record.EventRecorder,
	healthzServer healthcheck.HealthzUpdater,
	scheduler string,
	nodePortAddresses []string,
) (*Proxier, error) {
	// Set the route_localnet sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlRouteLocalnet); val != 1 {
		if err := sysctl.SetSysctl(sysctlRouteLocalnet, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlRouteLocalnet, err)
		}
	}

	// Proxy needs br_netfilter and bridge-nf-call-iptables=1 when containers
	// are connected to a Linux bridge (but not SDN bridges).  Until most
	// plugins handle this, log when config is missing
	if val, err := sysctl.GetSysctl(sysctlBridgeCallIPTables); err == nil && val != 1 {
		klog.Infof("missing br-netfilter module or unset sysctl br-nf-call-iptables; proxy may not work as intended")
	}

	// Set the conntrack sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlVSConnTrack); val != 1 {
		if err := sysctl.SetSysctl(sysctlVSConnTrack, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlVSConnTrack, err)
		}
	}

	// Set the connection reuse mode
	if val, _ := sysctl.GetSysctl(sysctlConnReuse); val != 0 {
		if err := sysctl.SetSysctl(sysctlConnReuse, 0); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlConnReuse, err)
		}
	}

	// Set the expire_nodest_conn sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlExpireNoDestConn); val != 1 {
		if err := sysctl.SetSysctl(sysctlExpireNoDestConn, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireNoDestConn, err)
		}
	}

	// Set the expire_quiescent_template sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlExpireQuiescentTemplate); val != 1 {
		if err := sysctl.SetSysctl(sysctlExpireQuiescentTemplate, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlExpireQuiescentTemplate, err)
		}
	}

	// Set the ip_forward sysctl we need for
	if val, _ := sysctl.GetSysctl(sysctlForward); val != 1 {
		if err := sysctl.SetSysctl(sysctlForward, 1); err != nil {
			return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlForward, err)
		}
	}

	if strictARP {
		// Set the arp_ignore sysctl we need for
		if val, _ := sysctl.GetSysctl(sysctlArpIgnore); val != 1 {
			if err := sysctl.SetSysctl(sysctlArpIgnore, 1); err != nil {
				return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpIgnore, err)
			}
		}

		// Set the arp_announce sysctl we need for
		if val, _ := sysctl.GetSysctl(sysctlArpAnnounce); val != 2 {
			if err := sysctl.SetSysctl(sysctlArpAnnounce, 2); err != nil {
				return nil, fmt.Errorf("can't set sysctl %s: %v", sysctlArpAnnounce, err)
			}
		}
	}

	// Generate the masquerade mark to use for SNAT rules.
	masqueradeValue := 1 << uint(masqueradeBit)
	masqueradeMark := fmt.Sprintf("%#08x/%#08x", masqueradeValue, masqueradeValue)

	if nodeIP == nil {
		klog.Warningf("invalid nodeIP, initializing kube-proxy with 127.0.0.1 as nodeIP")
		nodeIP = net.ParseIP("127.0.0.1")
	}

	isIPv6 := utilnet.IsIPv6(nodeIP)

	klog.V(2).Infof("nodeIP: %v, isIPv6: %v", nodeIP, isIPv6)

	if len(clusterCIDR) == 0 {
		klog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic")
	} else if utilnet.IsIPv6CIDRString(clusterCIDR) != isIPv6 {
		return nil, fmt.Errorf("clusterCIDR %s has incorrect IP version: expect isIPv6=%t", clusterCIDR, isIPv6)
	}

	if len(scheduler) == 0 {
		klog.Warningf("IPVS scheduler not specified, use %s by default", DefaultScheduler)
		scheduler = DefaultScheduler
	}

	healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps

	proxier := &Proxier{
		portsMap:              make(map[utilproxy.LocalPort]utilproxy.Closeable),
		serviceMap:            make(proxy.ServiceMap),
		serviceChanges:        proxy.NewServiceChangeTracker(newServiceInfo, &isIPv6, recorder),
		endpointsMap:          make(proxy.EndpointsMap),
		endpointsChanges:      proxy.NewEndpointChangeTracker(hostname, nil, &isIPv6, recorder),
		syncPeriod:            syncPeriod,
		minSyncPeriod:         minSyncPeriod,
		excludeCIDRs:          parseExcludedCIDRs(excludeCIDRs),
		iptables:              ipt,
		masqueradeAll:         masqueradeAll,
		masqueradeMark:        masqueradeMark,
		exec:                  exec,
		clusterCIDR:           clusterCIDR,
		hostname:              hostname,
		nodeIP:                nodeIP,
		portMapper:            &listenPortOpener{},
		recorder:              recorder,
		healthChecker:         healthChecker,
		healthzServer:         healthzServer,
		ipvs:                  ipvs,
		ipvsScheduler:         scheduler,
		ipGetter:              &realIPGetter{nl: NewNetLinkHandle(isIPv6)},
		iptablesData:          bytes.NewBuffer(nil),
		filterChainsData:      bytes.NewBuffer(nil),
		natChains:             bytes.NewBuffer(nil),
		natRules:              bytes.NewBuffer(nil),
		filterChains:          bytes.NewBuffer(nil),
		filterRules:           bytes.NewBuffer(nil),
		netlinkHandle:         NewNetLinkHandle(isIPv6),
		ipset:                 ipset,
		nodePortAddresses:     nodePortAddresses,
		networkInterfacer:     utilproxy.RealNetwork{},
		gracefuldeleteManager: NewGracefulTerminationManager(ipvs),
	}
	// initialize ipsetList with all sets we needed
	proxier.ipsetList = make(map[string]*IPSet)
	for _, is := range ipsetInfo {
		proxier.ipsetList[is.name] = NewIPSet(ipset, is.name, is.setType, isIPv6, is.comment)
	}
	burstSyncs := 2
	klog.V(3).Infof("minSyncPeriod: %v, syncPeriod: %v, burstSyncs: %d", minSyncPeriod, syncPeriod, burstSyncs)
	
	//这里定义了proxier的syncRunner字段,即proxier的具体运行逻辑
	proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
	proxier.gracefuldeleteManager.Run()
	return proxier, nil
}
复制代码

2.2 ProxyServer运行

2.2.1 主要脉络

  • 具体位置在cmd/kube-proxy/sever.go
// runLoop will watch on the update change of the proxy server's configuration file.
// Return an error when updated
func (o *Options) runLoop() error {
	if o.watcher != nil {
		o.watcher.Run()
	}

	// run the proxy in goroutine
	go func() {
		err := o.proxyServer.Run()
		o.errCh <- err
	}()

	for {
		err := <-o.errCh
		if err != nil {
			return err
		}
	}
}
复制代码
  • 添加健康检查、监测等前置处理,略过。重点在于后半部分:
  • 创建service和endpoint的informer,并运行,即通过这两个informer来及时获取集群中service和endpoint资源的变化。
  • go serviceConfig.Run(wait.NeverStop)
  • go endpointsConfig.Run(wait.NeverStop)
  • 调用birthCry方法。这个方法没什么特别的,就是记录一个kube-proxy启动的事件。
  • 调用SyncLoop方法,持续运行Proxier。
  • s.Proxier.SyncLoop()
  • 具体位置在cmd/kube-proxy/sever.go
// Run runs the specified ProxyServer.  This should never exit (unless CleanupAndExit is set).
// TODO: At the moment, Run() cannot return a nil error, otherwise it's caller will never exit. Update callers of Run to handle nil errors.
func (s *ProxyServer) Run() error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	// TODO(vmarmol): Use container config for this.
	var oomAdjuster *oom.OOMAdjuster
	if s.OOMScoreAdj != nil {
		oomAdjuster = oom.NewOOMAdjuster()
		if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*s.OOMScoreAdj)); err != nil {
			klog.V(2).Info(err)
		}
	}

	if s.Broadcaster != nil && s.EventClient != nil {
		s.Broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: s.EventClient.Events("")})
	}

	// Start up a healthz server if requested
	if s.HealthzServer != nil {
		s.HealthzServer.Run()
	}

	// Start up a metrics server if requested
	if len(s.MetricsBindAddress) > 0 {
		proxyMux := mux.NewPathRecorderMux("kube-proxy")
		healthz.InstallHandler(proxyMux)
		proxyMux.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) {
			fmt.Fprintf(w, "%s", s.ProxyMode)
		})
		proxyMux.Handle("/metrics", prometheus.Handler())
		if s.EnableProfiling {
			routes.Profiling{}.Install(proxyMux)
		}
		configz.InstallHandler(proxyMux)
		go wait.Until(func() {
			err := http.ListenAndServe(s.MetricsBindAddress, proxyMux)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("starting metrics server failed: %v", err))
			}
		}, 5*time.Second, wait.NeverStop)
	}

	// Tune conntrack, if requested
	// Conntracker is always nil for windows
	if s.Conntracker != nil {
		max, err := getConntrackMax(s.ConntrackConfiguration)
		if err != nil {
			return err
		}
		if max > 0 {
			err := s.Conntracker.SetMax(max)
			if err != nil {
				if err != errReadOnlySysFS {
					return err
				}
				// errReadOnlySysFS is caused by a known docker issue (https://github.com/docker/docker/issues/24000),
				// the only remediation we know is to restart the docker daemon.
				// Here we'll send an node event with specific reason and message, the
				// administrator should decide whether and how to handle this issue,
				// whether to drain the node and restart docker.  Occurs in other container runtimes
				// as well.
				// TODO(random-liu): Remove this when the docker bug is fixed.
				const message = "CRI error: /sys is read-only: " +
					"cannot modify conntrack limits, problems may arise later (If running Docker, see docker issue #24000)"
				s.Recorder.Eventf(s.NodeRef, api.EventTypeWarning, err.Error(), message)
			}
		}

		if s.ConntrackConfiguration.TCPEstablishedTimeout != nil && s.ConntrackConfiguration.TCPEstablishedTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPEstablishedTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPEstablishedTimeout(timeout); err != nil {
				return err
			}
		}

		if s.ConntrackConfiguration.TCPCloseWaitTimeout != nil && s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration > 0 {
			timeout := int(s.ConntrackConfiguration.TCPCloseWaitTimeout.Duration / time.Second)
			if err := s.Conntracker.SetTCPCloseWaitTimeout(timeout); err != nil {
				return err
			}
		}
	}

	informerFactory := informers.NewSharedInformerFactoryWithOptions(s.Client, s.ConfigSyncPeriod,
		informers.WithTweakListOptions(func(options *v1meta.ListOptions) {
			options.LabelSelector = "!" + apis.LabelServiceProxyName
		}))

	// Create configs (i.e. Watches for Services and Endpoints)
	// Note: RegisterHandler() calls need to happen before creation of Sources because sources
	// only notify on changes, and the initial update (on process start) may be lost if no handlers
	// are registered yet.
	
	// ServiceConfig是kube-proxy中用于监听service变化的组件,其本质就是informer,进入NewServiceConfig方法可知。
	
	serviceConfig := config.NewServiceConfig(informerFactory.Core().V1().Services(), s.ConfigSyncPeriod)
	// EventHandlerd的处理逻辑是由Proxier来处理的
	serviceConfig.RegisterEventHandler(s.Proxier)
	go serviceConfig.Run(wait.NeverStop)


    // endpointsConfig是kube-proxy中用于监听endpoints变化的组件,其本质就是informer
	endpointsConfig := config.NewEndpointsConfig(informerFactory.Core().V1().Endpoints(), s.ConfigSyncPeriod)
    //EventHandler的处理逻辑是由Proxier来处理的
	endpointsConfig.RegisterEventHandler(s.Proxier)
	go endpointsConfig.Run(wait.NeverStop)

	// This has to start after the calls to NewServiceConfig and NewEndpointsConfig because those
	// functions must configure their shared informer event handlers first.
	informerFactory.Start(wait.NeverStop)

	// Birth Cry after the birth is successful
	s.birthCry()

	// Just loop forever for now...
	s.Proxier.SyncLoop()
	return nil
}
复制代码

2.2.2 NewServiceConfig实现Add/Update/Delete,仅更新事件到Map

  • syncProxyRules执行时机,在service和endpoint的Config刚创建并初始化的时候,以及在service和endpoint发生变化的时候。
  • 参考2.2.1,EventHandlerd的处理逻辑是由Proxier来处理的,通过传参:serviceConfig.RegisterEventHandler(s.Proxier)
// NewServiceConfig creates a new ServiceConfig.
func NewServiceConfig(serviceInformer coreinformers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
	result := &ServiceConfig{
		listerSynced: serviceInformer.Informer().HasSynced,
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    result.handleAddService,
			UpdateFunc: result.handleUpdateService,
			DeleteFunc: result.handleDeleteService,
		},
		resyncPeriod,
	)

	return result
}

// result.handleUpdateService
func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
	oldService, ok := oldObj.(*v1.Service)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
		return
	}
	service, ok := newObj.(*v1.Service)
	if !ok {
		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
		return
	}
	for i := range c.eventHandlers {
		klog.V(4).Info("Calling handler.OnServiceUpdate")
		c.eventHandlers[i].OnServiceUpdate(oldService, service)
	}
}
复制代码
  • add和delete的回调函数本质上仍是执行了update的回调函数,特殊点在于前者为nil更新为目标service,后者为目标service更新为nil。因此我们以update为例。
  • 其回调函数handleUpdateService本质上是调用了Proxier.OnServiceUpdate方法:
  • 方法很短,首先调用了Update方法,成功后调用Run方法。
// OnServiceUpdate is called whenever modification of an existing
// service object is observed.
func (proxier *Proxier) OnServiceUpdate(oldService, service *v1.Service) {
	if proxier.serviceChanges.Update(oldService, service) && proxier.isInitialized() {
		proxier.syncRunner.Run()
	}
}

// 仅更新事件到Map
// Run the function as soon as possible.  If this is called while Loop is not
// running, the call may be deferred indefinitely.
// If there is already a queued request to call the underlying function, it
// may be dropped - it is just guaranteed that we will try calling the
// underlying function as soon as possible starting from now.
func (bfr *BoundedFrequencyRunner) Run() {
	// If it takes a lot of time to run the underlying function, noone is really
	// processing elements from <run> channel. So to avoid blocking here on the
	// putting element to it, we simply skip it if there is already an element
	// in it.
	select {
	case bfr.run <- struct{}{}:
	default:
	}
}
复制代码

2.2.2 NewServiceConfig.run

  • syncProxyRules执行时机,在service和endpoint的Config刚创建并初始化的时候,以及在service和endpoint发生变化的时候。
  • 转到proxier.OnServiceSynced,首先调用setInitialized方法,将proxier初始化。只有经过初始化后,proxier才会开始调用回调函数。最后,就是执行一次syncProxyRules方法。
  • 总而言之,Run方法是将刚创建好的ServiceConfig初始化,并在初始化后先调用一次syncProxyRules方法。而这个方法,就是kube-proxy维护iptables的具体操作,我们后面再详细分析。

2.2.3 Proxier.SyncLoop

  • syncProxyRules执行时机,每隔一段时间会自动执行。
  • 三个处理线程,下面我们再来看SyncLoop, SyncLoop本质上调用了bounded_frequency_runner.go中的Loop方法:
  • 可以看到,此方法运行一个无限循环,并定时运行tryRun方法。此外,当bfr的run字段有消息传入时,也会执行一次tryRun。那么这个channel什么时候传入消息呢?答案就是上一篇文章中提到的,ServiceConfig的回调函数被调用的时候。所以说,每当service发生变化,回调函数被调用时,最终都会执行一次tryRun方法。
  • 所有其他的代码,都在为bfr.fn服务,此方法的核心,就是在合适的时机运行bfr.fn方法。而这一方法,是在创建proxier的时候注册进去的。回忆一下上一篇的内容,在NewProxier方法中有一行:
    pkg/proxy/iptables/proxier.go
    
    func NewProxier(...) (*Proxier, error) {
        
        ......
    
        proxier.syncRunner = async.NewBoundedFrequencyRunner("sync-runner", proxier.syncProxyRules, minSyncPeriod, syncPeriod, burstSyncs)
        return proxier, nil
}
复制代码

2.2.4 syncProxyRules三种执行时机

  • 可以看到,这里将前面提到的proxier.syncProxyRules方法注册为了bfr.fn。 所以综上可知,syncProxyRules方法有三种被执行的途径,即:
  • 在service和endpoint的Config刚创建并初始化的时候;
  • 在service和endpoint发生变化的时候;
  • 每隔一段时间会自动执行。

2.2.5 syncProxyRules主角登场

  • (1)调用UpdateServiceMap和UpdateEndpointMap方法,执行Service和Endpoint的更新。
  • (2)添加数据链表。
  • (3)遍历所有的service,判断每个service的类型,并添加相应的规则。
  • (4)删除多余的链表,并对iptables进行重构。

  • Create and link the kube chains
    for _, chain := range iptablesJumpChains {
        if _, err := proxier.iptables.EnsureChain(chain.table, chain.chain); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s exists: %v", chain.table, kubeServicesChain, err)
            return
        }
        args := append(chain.extraArgs,
            "-m", "comment", "--comment", chain.comment,
            "-j", string(chain.chain),
        )
        if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, chain.table, chain.sourceChain, args...); err != nil {
            klog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", chain.table, chain.sourceChain, chain.chain, err)
            return
        }
    }
复制代码
  • Build rules for each service
        svcInfo, ok := svc.(*serviceInfo)
        if !ok {
            klog.Errorf("Failed to cast serviceInfo %q", svcName.String())
            continue
        }
        isIPv6 := utilnet.IsIPv6(svcInfo.ClusterIP)
        protocol := strings.ToLower(string(svcInfo.Protocol))
        svcNameString := svcInfo.serviceNameString
        hasEndpoints := len(proxier.endpointsMap[svcName]) > 0

        svcChain := svcInfo.servicePortChainName
        if hasEndpoints {
            // Create the per-service chain, retaining counters if possible.
            if chain, ok := existingNATChains[svcChain]; ok {
                writeBytesLine(proxier.natChains, chain)
            } else {
                writeLine(proxier.natChains, utiliptables.MakeChainLine(svcChain))
            }
            activeNATChains[svcChain] = true
        }

        ...// Capture the clusterIP.
        if hasEndpoints {
            args = append(args[:0],
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
            )
            ...
        } else {
            writeLine(proxier.filterRules,
                "-A", string(kubeServicesChain),
                "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
                "-m", protocol, "-p", protocol,
                "-d", utilproxy.ToCIDR(svcInfo.ClusterIP),
                "--dport", strconv.Itoa(svcInfo.Port),
                "-j", "REJECT",
            )
        }

        // Capture externalIPs.
        for _, externalIP := range svcInfo.ExternalIPs {

            ...

        }
复制代码
  • Sync rules
    proxier.iptablesData.Reset()
    proxier.iptablesData.Write(proxier.filterChains.Bytes())
    proxier.iptablesData.Write(proxier.filterRules.Bytes())
    proxier.iptablesData.Write(proxier.natChains.Bytes())
    proxier.iptablesData.Write(proxier.natRules.Bytes())

    klog.V(5).Infof("Restoring iptables rules: %s", proxier.iptablesData.Bytes())
    err = proxier.iptables.RestoreAll(proxier.iptablesData.Bytes(), utiliptables.NoFlushTables, utiliptables.RestoreCounters)
    if err != nil {
        klog.Errorf("Failed to execute iptables-restore: %v", err)
        // Revert new local ports.
        klog.V(2).Infof("Closing local ports after iptables-restore failure")
        utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
        return
    }

复制代码

3 总结

proxier和syncProxyRules才是笑到最后,掌握实权的,核心精华就在这里。

专注于大数据及容器云核心技术解密,可提供全栈的大数据+云原生平台咨询方案,请持续关注本套博客。如有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号,或请转发邮件至1120746959@qq.com