Source Code Analysis of kube-scheduler Disk Scheduling
Author | leadersnowy
Source | CSDN blog
Introduction to kube-scheduler
First, we know that kube-scheduler's fundamental job is to place each Pod on the most suitable worker node according to various scheduling algorithms.
The overall scheduling flow consists of two phases:
1. Predicates:
The input is all nodes; the output is the nodes that satisfy the predicate conditions. kube-scheduler filters out the nodes that fail the predicate policies. For example, a node cannot pass the predicates if its resources are insufficient, or if it fails a condition such as "the Node's labels must match the Pod's selector".
2. Priorities:
The input is the nodes that survived the predicate phase. The priority phase scores and ranks those nodes according to the priority policies and picks the node with the highest score. For example, a node with more spare resources and a lower load will generally rank higher.
Many factors influence the overall scheduling process, including node state (CPU, memory, disk), node/pod affinity, and node labels; this article analyzes only the disk-related source code.
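The two-phase flow described above can be sketched in miniature Go. Everything here (the node struct, its fields, and the CPU-only filter and score rules) is invented for illustration; the real scheduler works through plugin interfaces, not these types:

```go
package main

import "fmt"

// node is a toy stand-in for a cluster node; only the fields this sketch
// needs are modeled.
type node struct {
	name    string
	freeCPU int
}

// filterNodes is the "predicate" step: keep only nodes that satisfy the
// condition (here: enough free CPU for the pod).
func filterNodes(nodes []node, podCPU int) []node {
	var feasible []node
	for _, n := range nodes {
		if n.freeCPU >= podCPU {
			feasible = append(feasible, n)
		}
	}
	return feasible
}

// pickBest is the "priority" step: score the feasible nodes (here: simply
// by free CPU) and return the highest-scoring one.
func pickBest(nodes []node) string {
	best, bestScore := "", -1
	for _, n := range nodes {
		if n.freeCPU > bestScore {
			best, bestScore = n.name, n.freeCPU
		}
	}
	return best
}

func main() {
	nodes := []node{{"node-a", 2}, {"node-b", 8}, {"node-c", 4}}
	feasible := filterNodes(nodes, 3) // node-a is filtered out
	fmt.Println(pickBest(feasible))   // node-b wins on spare CPU
}
```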
Source code analysis
2.1 Disk predicates (Predicates)
The core code for this part lives under pkg/scheduler/framework/plugins/nodevolumelimits/, in two files: csi.go and non_csi.go. Their core idea is the same: use Filter to cap the number of volumes on a single node, with the cap configured by default in the k8s source. The CSI variant simply limits how many CSI volumes can be attached to a node. The non-CSI variant is more involved, because it has to handle both the in-tree plugins and the FlexVolume plugin, and it configures a separate attachable-volume limit for each in-tree plugin. Below we analyze the non-CSI source in detail.
Without further ado, the code:
2.1.1 nonCSILimits
```go
type nonCSILimits struct {
	name           string
	filter         VolumeFilter
	volumeLimitKey v1.ResourceName
	maxVolumeFunc  func(node *v1.Node) int
	csiNodeLister  storagelisters.CSINodeLister
	pvLister       corelisters.PersistentVolumeLister
	pvcLister      corelisters.PersistentVolumeClaimLister
	scLister       storagelisters.StorageClassLister

	// The string below is generated randomly during the struct's initialization.
	// It is used to prefix volumeID generated inside the predicate() method to
	// avoid conflicts with any real volume.
	randomVolumeIDPrefix string
}
```

The central structure is nonCSILimits, with the following members:
name: as the name suggests, the name of this nonCSILimits instance
filter: a VolumeFilter value with four methods: FilterVolume(), FilterPersistentVolume(), MatchProvisioner(), and IsMigrated(); how they are called is covered later
volumeLimitKey: essentially a string, one of several predefined keys
maxVolumeFunc(): a function that returns the maximum volume count for this limit
csiNodeLister: lister for CSINode objects
pvLister: lister for PV objects
pvcLister: lister for PVC objects
scLister: lister for StorageClass objects
randomVolumeIDPrefix: a string used to generate unique volume IDs
2.1.2 Initializing nonCSILimits

```go
func newNonCSILimits(
	filterName string,
	csiNodeLister storagelisters.CSINodeLister,
	scLister storagelisters.StorageClassLister,
	pvLister corelisters.PersistentVolumeLister,
	pvcLister corelisters.PersistentVolumeClaimLister,
) framework.Plugin {
	var filter VolumeFilter
	var volumeLimitKey v1.ResourceName
	var name string

	switch filterName {
	case ebsVolumeFilterType:
		name = EBSName
		filter = ebsVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.EBSVolumeLimitKey)
	case gcePDVolumeFilterType:
		name = GCEPDName
		filter = gcePDVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.GCEVolumeLimitKey)
	case azureDiskVolumeFilterType:
		name = AzureDiskName
		filter = azureDiskVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.AzureVolumeLimitKey)
	case indiskVolumeFilterType:
		name = IndiskName
		filter = indiskVolumeFilter
		volumeLimitKey = v1.ResourceName(volumeutil.IndiskVolumeLimitKey)
	default:
		klog.Fatalf("Wrong filterName, Only Support %v %v %v %v %v", ebsVolumeFilterType,
			gcePDVolumeFilterType, azureDiskVolumeFilterType, cinderVolumeFilterType, indiskVolumeFilterType)
		return nil
	}

	pl := &nonCSILimits{
		name:                 name,
		filter:               filter,
		volumeLimitKey:       volumeLimitKey,
		maxVolumeFunc:        getMaxVolumeFunc(filterName),
		csiNodeLister:        csiNodeLister,
		pvLister:             pvLister,
		pvcLister:            pvcLister,
		scLister:             scLister,
		randomVolumeIDPrefix: rand.String(32),
	}
	return pl
}
```

The initialization references a number of constants:

```go
const (
	// defaultMaxGCEPDVolumes defines the maximum number of PD Volumes for GCE.
	// GCE instances can have up to 16 PD volumes attached.
	defaultMaxGCEPDVolumes = 16
	// defaultMaxAzureDiskVolumes defines the maximum number of PD Volumes for Azure.
	// Larger Azure VMs can actually have much more disks attached.
	// TODO We should determine the max based on VM size
	defaultMaxAzureDiskVolumes = 16
	// ebsVolumeFilterType defines the filter name for ebsVolumeFilter.
	ebsVolumeFilterType = "EBS"
	// gcePDVolumeFilterType defines the filter name for gcePDVolumeFilter.
	gcePDVolumeFilterType = "GCE"
	// azureDiskVolumeFilterType defines the filter name for azureDiskVolumeFilter.
	azureDiskVolumeFilterType = "AzureDisk"
	// indiskVolumeFilterType defines the filter name for indiskVolumeFilter.
	indiskVolumeFilterType = "Indisk"
	// ErrReasonMaxVolumeCountExceeded is used for MaxVolumeCount predicate error.
	ErrReasonMaxVolumeCountExceeded = "node(s) exceed max volume count"
	// KubeMaxPDVols defines the maximum number of PD Volumes per kubelet.
	KubeMaxPDVols = "KUBE_MAX_PD_VOLS"
	// MaxIndiskVolumes defines the maximum number of Indisk Volumes per node.
	MaxIndiskVolumes = "MAX_INDISK_VOLUMES"
	// DefaultIndiskVolumes defines the default number of Indisk Volumes per node.
	DefaultIndiskVolumes = "DEFAULT_INDISK_VOLUMES"
)
```

These constants include the names of the in-tree storage types and the default maximum volume counts, among other details; they are assigned when nonCSILimits is initialized.
Beyond these constants, the key piece is the initialization of the filter. Take the Cinder example to see how a volume is recognized as a Cinder volume:

```go
var cinderVolumeFilter = VolumeFilter{
	FilterVolume: func(vol *v1.Volume) (string, bool) {
		if vol.Cinder != nil {
			return vol.Cinder.VolumeID, true
		}
		return "", false
	},
	FilterPersistentVolume: func(pv *v1.PersistentVolume) (string, bool) {
		if pv.Spec.Cinder != nil {
			return pv.Spec.Cinder.VolumeID, true
		}
		return "", false
	},
	MatchProvisioner: func(sc *storage.StorageClass) (relevant bool) {
		if sc.Provisioner == csilibplugins.CinderInTreePluginName {
			return true
		}
		return false
	},
	IsMigrated: func(csiNode *storage.CSINode) bool {
		return isCSIMigrationOn(csiNode, csilibplugins.CinderInTreePluginName)
	},
}
```

As you can see, the filter decides whether a volume belongs to this type by inspecting metadata fields (vol.Cinder, pv.Spec.Cinder, sc.Provisioner, and so on).
For volumes that are not in-tree, other fields can be used for filtering.
For FlexVolume we can use: vol.FlexVolume.Driver, pv.Spec.FlexVolume.Driver
For CSI we can use: vol.CSI.Driver, pv.Spec.CSI.Driver
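As a sketch of how such a driver-name-based filter works, here is a standalone version that keys off the CSI driver field. The types are minimal stand-ins for the k8s volume structs, and "indisk.example.com" is a hypothetical driver name invented for the example:

```go
package main

import "fmt"

// csiSource and volume are minimal stand-ins for v1.CSIVolumeSource and
// v1.Volume; only the fields the filter inspects are modeled.
type csiSource struct {
	Driver       string
	VolumeHandle string
}

type volume struct {
	CSI *csiSource
}

// filterCSIVolume mirrors the FilterVolume shape shown above: return the
// volume's ID and true only when it belongs to the driver we are counting.
func filterCSIVolume(vol volume, driver string) (string, bool) {
	if vol.CSI != nil && vol.CSI.Driver == driver {
		return vol.CSI.VolumeHandle, true
	}
	return "", false
}

func main() {
	vols := []volume{
		{CSI: &csiSource{Driver: "indisk.example.com", VolumeHandle: "vol-1"}},
		{CSI: &csiSource{Driver: "other.example.com", VolumeHandle: "vol-2"}},
		{}, // not a CSI volume at all
	}
	matched := 0
	for _, v := range vols {
		if id, ok := filterCSIVolume(v, "indisk.example.com"); ok {
			fmt.Println(id) // only vol-1 is counted
			matched++
		}
	}
	fmt.Println(matched)
}
```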
2.1.3 The core method: Filter
```go
// Filter invoked at the filter extension point.
func (pl *nonCSILimits) Filter(ctx context.Context, _ *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	// If a pod doesn't have any volume attached to it, the predicate will always be true.
	// Thus we make a fast path for it, to avoid unnecessary computations in this case.
	if len(pod.Spec.Volumes) == 0 {
		return nil
	}

	newVolumes := make(map[string]bool)
	if err := pl.filterVolumes(pod.Spec.Volumes, pod.Namespace, newVolumes); err != nil {
		return framework.NewStatus(framework.Error, err.Error())
	}

	// quick return
	if len(newVolumes) == 0 {
		return nil
	}

	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, fmt.Sprintf("node not found"))
	}

	var csiNode *storage.CSINode
	var err error
	if pl.csiNodeLister != nil {
		csiNode, err = pl.csiNodeLister.Get(node.Name)
		if err != nil {
			// we don't fail here because the CSINode object is only necessary
			// for determining whether the migration is enabled or not
			klog.V(5).Infof("Could not get a CSINode object for the node: %v", err)
		}
	}

	// if a plugin has been migrated to a CSI driver, defer to the CSI predicate
	if pl.filter.IsMigrated(csiNode) {
		return nil
	}

	// count unique volumes
	existingVolumes := make(map[string]bool)
	for _, existingPod := range nodeInfo.Pods {
		if err := pl.filterVolumes(existingPod.Pod.Spec.Volumes, existingPod.Pod.Namespace, existingVolumes); err != nil {
			return framework.NewStatus(framework.Error, err.Error())
		}
	}
	numExistingVolumes := len(existingVolumes)

	// filter out already-mounted volumes
	for k := range existingVolumes {
		delete(newVolumes, k)
	}
	numNewVolumes := len(newVolumes)

	maxAttachLimit := pl.maxVolumeFunc(node)
	volumeLimits := volumeLimits(nodeInfo)
	if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
		maxAttachLimit = int(maxAttachLimitFromAllocatable)
	}

	if numExistingVolumes+numNewVolumes > maxAttachLimit {
		return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
	}

	if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
		nodeInfo.TransientInfo.TransientLock.Lock()
		defer nodeInfo.TransientInfo.TransientLock.Unlock()
		nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
		nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
	}
	return nil
}
```

Now let's walk through the logic of this core code.
It starts with a few checks: first whether the pod uses any volumes at all, then what type of volume the pod uses, after which the filter for that volume type takes over the processing.
Now look at the core processing flow:

```go
numExistingVolumes := len(existingVolumes)

// filter out already-mounted volumes
for k := range existingVolumes {
	delete(newVolumes, k)
}
numNewVolumes := len(newVolumes)

maxAttachLimit := pl.maxVolumeFunc(node)
volumeLimits := volumeLimits(nodeInfo)
if maxAttachLimitFromAllocatable, ok := volumeLimits[pl.volumeLimitKey]; ok {
	maxAttachLimit = int(maxAttachLimitFromAllocatable)
}

if numExistingVolumes+numNewVolumes > maxAttachLimit {
	return framework.NewStatus(framework.Unschedulable, ErrReasonMaxVolumeCountExceeded)
}

if nodeInfo != nil && nodeInfo.TransientInfo != nil && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) {
	nodeInfo.TransientInfo.TransientLock.Lock()
	defer nodeInfo.TransientInfo.TransientLock.Unlock()
	nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
	nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
}
return nil
```

The logic is very clear:
numExistingVolumes is the number of volumes of this type already on the node.
numNewVolumes is the number of volumes this pod still needs to attach.
maxAttachLimit is the maximum number of volumes of this type the node can hold.
If numExistingVolumes+numNewVolumes > maxAttachLimit, the node is unschedulable.
If the node is schedulable, the method returns nil, and the deferred call releases the lock.
With that, the predicate phase is done.
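The counting logic above can be reproduced in isolation. This sketch uses plain string volume IDs instead of parsed k8s objects, but follows the same steps: deduplicate against what is already on the node, then compare the combined count to the limit:

```go
package main

import "fmt"

// fitsVolumeLimit reruns the predicate's counting step on bare volume IDs:
// volumes the node already has are removed from the pod's requested set,
// and the node fits only if existing + genuinely-new stays within the limit.
func fitsVolumeLimit(existing, requested []string, maxAttachLimit int) bool {
	existingSet := make(map[string]bool)
	for _, id := range existing {
		existingSet[id] = true
	}
	newSet := make(map[string]bool)
	for _, id := range requested {
		if !existingSet[id] { // filter out already-mounted volumes
			newSet[id] = true
		}
	}
	return len(existingSet)+len(newSet) <= maxAttachLimit
}

func main() {
	existing := []string{"vol-1", "vol-2"}
	requested := []string{"vol-2", "vol-3"} // vol-2 is already mounted, so only vol-3 is new
	fmt.Println(fitsVolumeLimit(existing, requested, 3)) // 2 existing + 1 new = 3, fits
	fmt.Println(fitsVolumeLimit(existing, requested, 2)) // 3 > 2, node would be unschedulable
}
```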
One more spot worth noting:

```go
nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount = maxAttachLimit - numExistingVolumes
nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes = numNewVolumes
```

The two fields AllocatableVolumesCount and RequestedVolumes are assigned here for use later, during priority scoring.
2.2 Disk priorities (Priorities)
There are many scoring criteria as well; here we only look at the storage-related ones. The source is in:
pkg/scheduler/framework/plugins/noderesources/resource_allocation.go
pkg/scheduler/framework/plugins/noderesources/least_allocated.go
2.2.1 resource_allocation
resource_allocation means exactly what it says: resource allocation. There are several allocation strategies, such as least_allocated, most_allocated, and requested_to_capacity_ratio; resource_allocation itself only provides the score method. The code:

```go
func (r *resourceAllocationScorer) score(
	pod *v1.Pod,
	nodeInfo *framework.NodeInfo) (int64, *framework.Status) {
	node := nodeInfo.Node()
	if node == nil {
		return 0, framework.NewStatus(framework.Error, "node not found")
	}
	if r.resourceToWeightMap == nil {
		return 0, framework.NewStatus(framework.Error, "resources not found")
	}
	requested := make(resourceToValueMap, len(r.resourceToWeightMap))
	allocatable := make(resourceToValueMap, len(r.resourceToWeightMap))
	for resource := range r.resourceToWeightMap {
		allocatable[resource], requested[resource] = calculateResourceAllocatableRequest(nodeInfo, pod, resource)
	}
	var score int64

	// Check if the pod has volumes and this could be added to scorer function for balanced resource allocation.
	if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
		score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
	} else {
		score = r.scorer(requested, allocatable, false, 0, 0)
	}
	if klog.V(10).Enabled() {
		if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
			klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v , allocatable volumes %d, requested volumes %d, score %d",
				pod.Name, node.Name, r.Name,
				allocatable, requested, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount,
				nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes,
				score,
			)
		} else {
			klog.Infof("%v -> %v: %v, map of allocatable resources %v, map of requested resources %v ,score %d,",
				pod.Name, node.Name, r.Name,
				allocatable, requested, score,
			)
		}
	}
	return score, nil
}
```

The core is this one statement:

```go
if len(pod.Spec.Volumes) > 0 && utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) && nodeInfo.TransientInfo != nil {
	score = r.scorer(requested, allocatable, true, nodeInfo.TransientInfo.TransNodeInfo.RequestedVolumes, nodeInfo.TransientInfo.TransNodeInfo.AllocatableVolumesCount)
} else {
	score = r.scorer(requested, allocatable, false, 0, 0)
}
```

If the pod has volumes, the BalanceAttachedNodeVolumes feature gate is enabled, and the node has TransientInfo, the storage-related scoring path is taken; otherwise it is skipped.
Look at the last two arguments to r.scorer: they are exactly the two fields we assigned at the end of the predicate phase.
AllocatableVolumesCount is the number of volumes that can still be created on the node.
RequestedVolumes is the number of volumes this pod requests.
2.2.2 least_allocated
least_allocated means least-allocated-first: the more unused resources a node has, the higher it scores.
Straight to the source:

```go
type resourceAllocationScorer struct {
	Name                string
	scorer              func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64
	resourceToWeightMap resourceToWeightMap
}

func leastResourceScorer(resToWeightMap resourceToWeightMap) func(resourceToValueMap, resourceToValueMap, bool, int, int) int64 {
	return func(requested, allocable resourceToValueMap, includeVolumes bool, requestedVolumes int, allocatableVolumes int) int64 {
		var nodeScore, weightSum int64
		for resource, weight := range resToWeightMap {
			resourceScore := leastRequestedScore(requested[resource], allocable[resource])
			nodeScore += resourceScore * weight
			weightSum += weight
		}
		return nodeScore / weightSum
	}
}

// The unused capacity is calculated on a scale of 0-MaxNodeScore
// 0 being the lowest priority and `MaxNodeScore` being the highest.
// The more unused resources the higher the score is.
func leastRequestedScore(requested, capacity int64) int64 {
	if capacity == 0 {
		return 0
	}
	if requested > capacity {
		return 0
	}
	return ((capacity - requested) * int64(framework.MaxNodeScore)) / capacity
}
```

Now the core algorithm.
capacity is the node's remaining resource amount.
requested is the amount of resources the pod asks for.
For example: with capacity=10, requested=3, and framework.MaxNodeScore defaulting to 100,
the score is (10-3)*100/10 = 70.
But notice that leastResourceScorer does not reference the storage arguments at all, so we can add that part manually:

```go
if includeVolumes && allocatableVolumes-requestedVolumes > 0 && allocatableVolumes > 0 {
	nodeScore += int64(((allocatableVolumes - requestedVolumes) * int(framework.MaxNodeScore)) / allocatableVolumes)
	weightSum += 1
}
```

This concludes our walkthrough of the storage-related parts of the kube-scheduler code.
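To see what the added volume term does to a node's score, here is a standalone re-implementation of leastResourceScorer with that patch applied. The plain map signature is a simplification of the real resourceToValueMap/resourceToWeightMap types:

```go
package main

import "fmt"

const maxNodeScore = 100 // stands in for framework.MaxNodeScore

// leastScoreWithVolumes mirrors the modified leastResourceScorer: each
// resource's least-requested score is weight-averaged, and the volume term
// added above contributes one extra weight-1 component.
func leastScoreWithVolumes(requested, allocable, weights map[string]int64, includeVolumes bool, requestedVolumes, allocatableVolumes int) int64 {
	var nodeScore, weightSum int64
	for resource, weight := range weights {
		capacity := allocable[resource]
		var resourceScore int64
		if capacity > 0 && requested[resource] <= capacity {
			resourceScore = (capacity - requested[resource]) * maxNodeScore / capacity
		}
		nodeScore += resourceScore * weight
		weightSum += weight
	}
	// the manual addition from the article: score remaining volume slots too
	if includeVolumes && allocatableVolumes-requestedVolumes > 0 && allocatableVolumes > 0 {
		nodeScore += int64((allocatableVolumes - requestedVolumes) * maxNodeScore / allocatableVolumes)
		weightSum += 1
	}
	return nodeScore / weightSum
}

func main() {
	requested := map[string]int64{"cpu": 3}
	allocable := map[string]int64{"cpu": 10}
	weights := map[string]int64{"cpu": 1}
	// CPU alone: 70. With 1 of 5 volume slots requested: (70 + 80) / 2 = 75.
	fmt.Println(leastScoreWithVolumes(requested, allocable, weights, false, 0, 0))
	fmt.Println(leastScoreWithVolumes(requested, allocable, weights, true, 1, 5))
}
```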