隐藏的调度逻辑,ImageLocalityPriority 导致 Spark On Kubernetes 执行 Job Pod 调度不均匀 (包含源代码分析)

11,823 阅读9分钟

我是 LEE,老李,一个在 IT 行业摸爬滚打 17 年的技术老兵。

事件背景

昨天公司一套大数据 Spark Job 运行的 Kubernetes 集群突然出现大量 Job Pod 被 Pending,导致很多计算任务被卡住,然后大量超时报警发到了大数据业务部门。没一会就被小伙伴们叫到会议室准备一起解决问题,不到会议室不知道,那个热闹得跟赶集一样。我刚进门就被其他的小伙伴抓住,一起开始了确认大量 Job Pod 被 Pending 的问题,所有涉及上下游的小伙伴都在自查系统,我也投入到容器相关的系统的检查中。

通过一段时间的排查,发现 Pod 所运行的节点都很正常,而且整个 Kubernetes 所有节点资源都很充裕,没有理由 Pod 都集中运行到一个 Node 节点上。不知不觉就进入了茫茫的日志海洋的排查,直到最大的领导 GM 出现,任然没有找到出现这个问题位置。没有办法,只能求教我们 Kubernetes 源代码的大佬 - 吴老师,请他一起来“边看源代码,边解决问题”。这个时候 Spark Job 的任务还在不停的创建,集群上的 Job 任务还在疯狂的堆积,已经严重影响到真实的生产流,GM 脸色越来越严肃。

现象获取

我们做了很多假设,都逐一被否定。实在没有办法,既然大量 Job Pod 被 Pending,是因为被调度到一个固定的节点上导致的,大概率的是 Kubernetes 调度器的问题,我们把注意力集中到了 kube-scheduler 运行的 3 台服务器上。通过长时间的观测日志,总是发现相同的内容,所有的 Job Pod 都被调度到了一个 Node 节点上,不管怎么重启 Job 或者 kube-scheduler 结果都一样,日志如下图:

异常调度结果

在我们一筹莫展的时候,吴老师提议我们把 kube-scheduler 的日志用最详细的方式输出,再观察下日志。尤其要观察下 kube-scheduler 的 Filter 和 Scope 两个环节的数据变化。

一不做二不休,说干就干,直接让所有 kube-scheduler 的日志输出都在最高模式的下,我们继续观察日志,然后查看 kube-scheduler 中的 plugin 中数值的变化。果不其然,进过大概 10 分钟观察和数据统计,我们在结果中看到了一些内容,kube-scheduler 中的一个 plugin 让我们高度重视,而且对比了整个 kube-scheduler 调度结果,基本确认就是这个 plugin 导致的,它就是:ImageLocalityPriority。他影响了 kube-scheduler 调度结果。

具体 ImageLocalityPriority 产生的分数如下:

异常调度结果

从上面两张图对比就可以看出逻辑,10.10.33.57 获得 Scope 为 100,然后 Job Pod 都被调度到了 10.10.33.57 上。是不是感觉非常有意思呢?

如果你觉得非常有意思,那么你跟我一样,吴老师也是觉得非常有意思。 是不是很有想法跟我们一起往下看看具体原理呢?

原理分析

在定位到了问题以后,我们使用了一些“方法”解决了这个问题,让卡住的大量 Job Pod 从新在整个集群上快速执行起来。具体解决方案到下一部分我们再说,我们先看看是什么原因导致这个问题出现的,这样我们才能真正的理解解决方案中的内容。

(★)导致这次问题的真凶:ImageLocalityPriority

ImageLocalityPriority 前世今生

ImageLocalityPriority 插件的设计目的是通过优先将 Pod 分配到已经缓存了所需镜像的节点上来提高 Kubernetes 调度器的性能和效率。

在 Kubernetes 集群中,每个节点都需要下载所有需要运行的容器镜像。如果集群中的所有节点都没有所需镜像,则 Kubernetes 将会选择其中之一,并将镜像下载到该节点上。这可能会导致不必要的网络负载和较长的 Pod 启动时间。

为了避免这种情况,ImageLocalityPriority 插件引入了镜像本地性的概念,即首选在已经拥有所需镜像的节点上启动 Pod。这样可以减少镜像下载时间和网络负载,并且可以提高调度效率和性能。

需要注意的是,使用 ImageLocalityPriority 插件会使节点之间的镜像缓存不一致,因此需要根据实际情况进行权衡和调整。例如,在使用容器镜像仓库时,可以配置自己的镜像缓存策略来确保节点之间的镜像缓存一致性。

ImageLocalityPriority 存在的目的

Kubernetes 调度器中的 ImageLocalityPriority 插件是通过优先将 Pod 分配到已经缓存了所需镜像的节点上来提高调度性能的。

当需要将一个 Pod 分配给某个节点时,ImageLocalityPriority 插件会考虑该节点上是否已经缓存了该 Pod 所需的镜像。如果该节点已经拥有了所需镜像,则该节点的得分会更高;否则,该节点的得分会相应降低。

为了确定一个节点是否已经缓存了所需镜像,ImageLocalityPriority 插件会查找该节点上的 Docker 版本和镜像列表,并与 Pod 的镜像列表进行比较。如果发现镜像列表匹配,则该节点的得分会更高。

需要注意的是,ImageLocalityPriority 插件只考虑节点上已经缓存了的镜像,而不考虑镜像从其他节点下载的时间和网络负载等因素。因此,在使用 ImageLocalityPriority 插件时,需要根据实际情况进行权衡和调整,并确保集群中的所有节点都能够快速可靠地获取所需镜像

ImageLocalityPriority 算法解析

结合上面的提到的内容,ImageLocalityPriority 算法实现非常简单和粗暴。

总共非常了两部分:

  1. sumImageScores: 计算节点上应用 Pod 中所有 Container 的容量打分,并最后汇总这个分数。
  2. calculatePriority: 根据 sumImageScores 和 Container 的数量计算这个 Pod 的分数。

有上面的两部分计算的结果,最后通过 Scope 方法,将这个 plugin 计算的分数返回给 kube-scheduler。

sumImageScores

pkg/scheduler/framework/plugins/imagelocality/image_locality.go

// sumImageScores returns the sum of image scores of all the containers that are already on the node.
// Each image receives a raw score of its size, scaled by scaledImageScore. The raw scores are later used to calculate
// the final score. Note that the init containers are not considered for it's rare for users to deploy huge init containers.
func sumImageScores(nodeInfo *framework.NodeInfo, containers []v1.Container, totalNumNodes int) int64 {
	var sum int64
	for _, container := range containers {
		if state, ok := nodeInfo.ImageStates[normalizedImageName(container.Image)]; ok {
			sum += scaledImageScore(state, totalNumNodes)
		}
	}
	return sum
}

// scaledImageScore returns an adaptively scaled score for the given state of an image.
// The size of the image is used as the base score, scaled by a factor which considers how much nodes the image has "spread" to.
// This heuristic aims to mitigate the undesirable "node heating problem", i.e., pods get assigned to the same or
// a few nodes due to image locality.
func scaledImageScore(imageState *framework.ImageStateSummary, totalNumNodes int) int64 {
	spread := float64(imageState.NumNodes) / float64(totalNumNodes)
	return int64(float64(imageState.Size) * spread)
}

  • scaledImageScore 负责计算已经下载镜像(待调度 Job Pod 的镜像)节点数量占 Kubernetes 总节点数量的比重,比重值与指定的 Container 镜像大小值相乘,返回 int64 值。
  • sumImageScores 负责将所有的 Pod 中所有的 Container 执行 scaledImageScore 计算,将所有值进行求和,返回 int64 值。

算法公式:

ImageLocalityPriority-5.png

calculatePriority

pkg/scheduler/framework/plugins/imagelocality/image_locality.go

// The two thresholds are used as bounds for the image score range. They correspond to a reasonable size range for
// container images compressed and stored in registries; 90%ile of images on dockerhub drops into this range.
const (
	mb                    int64 = 1024 * 1024
	minThreshold          int64 = 23 * mb
	maxContainerThreshold int64 = 1000 * mb
)

// calculatePriority returns the priority of a node. Given the sumScores of requested images on the node, the node's
// priority is obtained by scaling the maximum priority value with a ratio proportional to the sumScores.
func calculatePriority(sumScores int64, numContainers int) int64 {
	// 1G 容量 * Pod 中 Container 的数量,获得最大的上限
	maxThreshold := maxContainerThreshold * int64(numContainers)
	if sumScores < minThreshold {
		sumScores = minThreshold
	} else if sumScores > maxThreshold {
		sumScores = maxThreshold
	}

	// 返回值在 0 - 100 之间
	return int64(framework.MaxNodeScore) * (sumScores - minThreshold) / (maxThreshold - minThreshold)
}

calculatePrioritysumImageScores 计算的结果在函数内做对比:

  1. 如果 sumScores < 23 * 1024 * 1024,则 calculatePriority 返回 0
  2. 如果 sumScores >= numContainers * 1000 * 1024 * 1024, 则 calculatePriority 返回 100

不管你的 Pod 中间有多少的 Container,最后 Pod 计算结果只会落在 0 - 100 之间。

TIPS:我们这边出问题的应用是单 Container 的 Pod,这个 Container 的 Image 镜像已经超过了 1G,所以我们看到的打分是 100。

算法公式:

ImageLocalityPriority-6.png

Scope

pkg/scheduler/framework/plugins/imagelocality/image_locality.go

// Score invoked at the score extension point.
func (pl *ImageLocality) Score(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (int64, *framework.Status) {
    // 从 Snapshot 中获取 NodeInfo。
    nodeInfo, err := pl.handle.SnapshotSharedLister().NodeInfos().Get(nodeName)
    // 如果出现错误,则返回 0 和带有错误信息的 Status。
    if err != nil {
        return 0, framework.AsStatus(fmt.Errorf("getting node %q from Snapshot: %w", nodeName, err))
    }

    // 获取所有 NodeInfo 列表,并得到节点数。
    nodeInfos, err := pl.handle.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return 0, framework.AsStatus(err)
    }
    totalNumNodes := len(nodeInfos)

    // 计算 pod 的 priority,并返回 score。
    score := calculatePriority(sumImageScores(nodeInfo, pod.Spec.Containers, totalNumNodes), len(pod.Spec.Containers))
    return score, nil
}

Score 方法就是返回 calculatePriority 计算的结果给 kube-scheduler,然后 kube-scheduler 继续通过其他的 plugin 打分,最后返回所有 plugin 分数总和,决定 Pod 在哪个 Node 节点上运行。

处理方法

清楚了 ImageLocalityPriority 整体结构,以及相关代码以及算法实现,那么对应的解决方案也就有了。

确实如此,而且解决方案也非常的简单:

  1. 在 kube-scheduler 中关闭 ImageLocalityPriority 插件。
  2. 在 kube-scheduler 中降低 ImageLocalityPriority 的优先级。
  3. 全部节点上部署 Image 镜像同步器,让 ImageLocalityPriority 打分返回 100 的 Pod 使用镜像在 Kubernetes 集群中所有节点都被下载。

这里提供两种实操,第 3 种有很多实现方式,这个大家可以自行 baidu,然后选择复合自己的方案。

关闭 ImageLocalityPriority

关闭 ImageLocalityPriority

降低 ImageLocalityPriority 优先级

降低 ImageLocalityPriority 优先级

最终效果

最后我们这边为了紧急恢复 Spark Job 在 Kubernetes 上的运行,我们用了最简单的方案 1。当然我们这边还有更多的工作要做,才能真正让这个问题在我们这边测底消除。

同时也通过一个 test-job 来验证我们的解决方案的效果,与预期相符

恢复正常调度

多啰嗦一句:ImageLocalityPriority 导致 Spark Job 的 Pod 在 Kubernetes 调度不均的问题,确实没有想过是因为镜像的大小的导致的。我想这个问题也会导致很多小伙伴一头雾水,最后在获得吴老师同意后,决定把我这次碰到问题成文,提供给大家“避坑”。