找回密码
 立即注册
首页 业界区 业界 HAMi vGPU 原理分析 Part4:Spread&Binpack 高级调度策 ...

HAMi vGPU 原理分析 Part4:Spread&Binpack 高级调度策略实现

巩芷琪 2025-10-1 13:16:09
1.png

上篇我们分析了 hami-scheduler 工作流程,知道了 hami-webhook、hami-scheduler 是怎么配合工作的。
本文为 HAMi 原理分析的第四篇,分析 hami-scheduler 在调度时是如何选择节点的,即:Spread、Binpack 等高级调度策略是怎么实现的。
这篇文章我们解决最后一个问题:Spread、Binpack 等高级调度策略是怎么实现的
以下分析基于 HAMi v2.4.0
这里在贴一下上一篇总结的 HAMi Webhook 、Scheduler 工作流程:
2.png


  • 1)用户创建 Pod 并在 Pod 中申请了 vGPU 资源
  • 2)kube-apiserver 根据 MutatingWebhookConfiguration 配置请求 HAMi-Webhook
  • 3)HAMi-Webhook 检测 Pod 中的 Resource,发现是申请的由 HAMi 管理的 vGPU 资源,因此把 Pod 中的 SchedulerName 改成了 hami-scheduler,这样这个 Pod 就会由 hami-scheduler 进行调度了。

    • 对于特权模式的 Pod,Webhook 会直接跳过不处理
    • 对于使用 vGPU 资源但指定了 nodeName 的 Pod,Webhook 会直接拒绝

  • 4)hami-scheduler 进行 Pod 调度,不过就是用的 k8s 的默认 kube-scheduler 镜像,因此调度逻辑和默认的 default-scheduler 是一样的,kube-scheduler 根据 KubeSchedulerConfiguration 配置,调用 Extender Scheduler 插件

    • 这个 Extender Scheduler 就是 hami-scheduler Pod 中的另一个 Container,该 Container 同时提供了 Webhook 和 Scheduler 相关 API。
    • 当 Pod 申请了 vGPU 资源时,kube-scheduler 就会根据配置以 HTTP 形式调用 Extender Scheduler 插件,这样就实现了自定义调度逻辑

  • 5)Extender Scheduler 插件包含了真正的 hami 调度逻辑, 调度时根据节点剩余资源量进行打分选择节点
  • 6)异步任务,包括 GPU 感知逻辑

    • devicePlugin 中的后台 Goroutine 定时上报 Node 上的 GPU 资源并写入到 Node 的 Annoations
    • Extender Scheduler 插件根据 Node Annoations 解析出 GPU 资源总量、从 Node 上已经运行的 Pod 的 Annoations 中解析出 GPU 使用量,计算出每个 Node 剩余的可用资源保存到内存供调度时使用

1. 配置调度策略

hami-scheduler 提供了两种不同级别的调度策略:

  • 节点调度策略:作用于调度过程中如何为 Pod 选择节点
  • GPU 调度策略:作用于选择节点后,节点存在多 GPU 时如何为 Pod 选择 GPU
根据部署文档,我们可以在部署时指定调度策略

  • scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy: 字符串类型,预设值为"binpack",表示 GPU 节点调度策略。

    • "binpack"表示尽量将任务分配到同一个 GPU 节点上
    • "spread"表示尽量将任务分配到不同 GPU 节点上。

  • scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy: 字符串类型,预设值为"spread", 表示 GPU 调度策略。

    • "binpack"表示尽量将任务分配到同一个 GPU 上
    • "spread"表示尽量将任务分配到不同 GPU 上。

就像这样:
  1. helm install vgpu vgpu-charts/vgpu --set scheduler.defaultSchedulerPolicy.nodeSchedulerPolicy=binpark --set scheduler.defaultSchedulerPolicy.gpuSchedulerPolicy=spread
复制代码
部署后,这两个配置作用域 hami-scheduler 上具体如下:
  1. kk get deploy vgpu-hami-scheduler -oyaml
  2. apiVersion: apps/v1
  3. kind: Deployment
  4. metadata:
  5.   name: vgpu-hami-scheduler
  6.   namespace: kube-system
  7. spec:
  8.   template:
  9.     spec:
  10.       containers:
  11.       - command:
  12.         - scheduler
  13.         - --resource-name=nvidia.com/gpu
  14.         - --resource-mem=nvidia.com/gpumem
  15.         - --resource-cores=nvidia.com/gpucores
  16.         - --resource-mem-percentage=nvidia.com/gpumem-percentage
  17.         - --resource-priority=nvidia.com/priority
  18.         - --http_bind=0.0.0.0:443
  19.         - --cert_file=/tls/tls.crt
  20.         - --key_file=/tls/tls.key
  21.         - --scheduler-name=hami-scheduler
  22.         - --metrics-bind-address=:9395
  23.         - --default-mem=0
  24.         - --default-gpu=1
  25.         - --default-cores=0
  26.         - --iluvatar-memory=iluvatar.ai/vcuda-memory
  27.         - --iluvatar-cores=iluvatar.ai/vcuda-core
  28.         - --cambricon-mlu-name=cambricon.com/vmlu
  29.         - --cambricon-mlu-memory=cambricon.com/mlu.smlu.vmemory
  30.         - --cambricon-mlu-cores=cambricon.com/mlu.smlu.vcore
  31.         - --ascend-name=huawei.com/Ascend910
  32.         - --ascend-memory=huawei.com/Ascend910-memory
  33.         - --ascend310p-name=huawei.com/Ascend310P
  34.         - --ascend310p-memory=huawei.com/Ascend310P-memory
  35.         - --overwrite-env=false
  36.         - --node-scheduler-policy=binpack
  37.         - --gpu-scheduler-policy=spread
复制代码
就是这两个参数
  1. - --node-scheduler-policy=binpack
  2. - --gpu-scheduler-policy=spread
复制代码
2. Node 调度策略原理

这部分比较简单,选择节点的逻辑就在 Filter 接口中。
  1. // pkg/scheduler/scheduler.go#L444
  2. func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
  3.         klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
  4.         nums := k8sutil.Resourcereqs(args.Pod)
  5.         total := 0
  6.         for _, n := range nums {
  7.                 for _, k := range n {
  8.                         total += int(k.Nums)
  9.                 }
  10.         }
  11.         if total == 0 {
  12.                 klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
  13.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
  14.                 return &extenderv1.ExtenderFilterResult{
  15.                         NodeNames:   args.NodeNames,
  16.                         FailedNodes: nil,
  17.                         Error:       "",
  18.                 }, nil
  19.         }
  20.         annos := args.Pod.Annotations
  21.         s.delPod(args.Pod)
  22.         nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
  23.         if err != nil {
  24.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  25.                 return nil, err
  26.         }
  27.         if len(failedNodes) != 0 {
  28.                 klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
  29.         }
  30.         nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  31.         if err != nil {
  32.                 err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
  33.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  34.                 return nil, err
  35.         }
  36.         if len((*nodeScores).NodeList) == 0 {
  37.                 klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
  38.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
  39.                 return &extenderv1.ExtenderFilterResult{
  40.                         FailedNodes: failedNodes,
  41.                 }, nil
  42.         }
  43.         klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
  44.         sort.Sort(nodeScores)
  45.         m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  46.         klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
  47.         annotations := make(map[string]string)
  48.         annotations[util.AssignedNodeAnnotations] = m.NodeID
  49.         annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
  50.         for _, val := range device.GetDevices() {
  51.                 val.PatchAnnotations(&annotations, m.Devices)
  52.         }
  53.         //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
  54.         //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
  55.         //maps.Copy(annotations, InRequestDevices)
  56.         //maps.Copy(annotations, supportDevices)
  57.         s.addPod(args.Pod, m.NodeID, m.Devices)
  58.         err = util.PatchPodAnnotations(args.Pod, annotations)
  59.         if err != nil {
  60.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  61.                 s.delPod(args.Pod)
  62.                 return nil, err
  63.         }
  64.         s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
  65.         res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  66.         return &res, nil
  67. }
复制代码
主要就是下面这几句:
  1. //计算得分,拿到所有满足条件的节点
  2. nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  3. // 排序
  4. sort.Sort(nodeScores)
  5. // 直接选择最后一个节点
  6. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  7. // 返回结果
  8. res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  9. return &res, nil
复制代码
可以分为两个部分:

  • 1)为所有节点计算得分
  • 2)根据调度策略选择最合适的节点
计算得分

得分计算逻辑在 calcScore 方法里:
  1. // pkg/scheduler/score.go#L185
  2. func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
  3.         userNodePolicy := config.NodeSchedulerPolicy
  4.         if annos != nil {
  5.                 if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
  6.                         userNodePolicy = value
  7.                 }
  8.         }
  9.         res := policy.NodeScoreList{
  10.                 Policy:   userNodePolicy,
  11.                 NodeList: make([]*policy.NodeScore, 0),
  12.         }
  13.         //func calcScore(nodes *map[string]*NodeUsage, errMap *map[string]string, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*NodeScoreList, error) {
  14.         //        res := make(NodeScoreList, 0, len(*nodes))
  15.         for nodeID, node := range *nodes {
  16.                 viewStatus(*node)
  17.                 score := policy.NodeScore{NodeID: nodeID, Devices: make(util.PodDevices), Score: 0}
  18.                 score.ComputeScore(node.Devices)
  19.                 //This loop is for different container request
  20.                 ctrfit := false
  21.                 for ctrid, n := range nums {
  22.                         sums := 0
  23.                         for _, k := range n {
  24.                                 sums += int(k.Nums)
  25.                         }
  26.                         if sums == 0 {
  27.                                 for idx := range score.Devices {
  28.                                         if len(score.Devices[idx]) <= ctrid {
  29.                                                 score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
  30.                                         }
  31.                                         score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
  32.                                         continue
  33.                                 }
  34.                         }
  35.                         klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
  36.                         fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
  37.                         ctrfit = fit
  38.                         if !fit {
  39.                                 klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
  40.                                 break
  41.                         }
  42.                 }
  43.                 if ctrfit {
  44.                         res.NodeList = append(res.NodeList, &score)
  45.                 }
  46.         }
  47.         return &res, nil
  48. }
复制代码
GenerateResourceRequests 是 Interface,以 NVIDIA 实现为例
  1. // pkg/scheduler/policy/node_policy.go#L53
  2. func (ns *NodeScore) ComputeScore(devices DeviceUsageList) {
  3.         // current user having request resource
  4.         used, usedCore, usedMem := int32(0), int32(0), int32(0)
  5.         for _, device := range devices.DeviceLists {
  6.                 used += device.Device.Used
  7.                 usedCore += device.Device.Usedcores
  8.                 usedMem += device.Device.Usedmem
  9.         }
  10.         klog.V(2).Infof("node %s used %d, usedCore %d, usedMem %d,", ns.NodeID, used, usedCore, usedMem)
  11.         total, totalCore, totalMem := int32(0), int32(0), int32(0)
  12.         for _, deviceLists := range devices.DeviceLists {
  13.                 total += deviceLists.Device.Count
  14.                 totalCore += deviceLists.Device.Totalcore
  15.                 totalMem += deviceLists.Device.Totalmem
  16.         }
  17.         useScore := float32(used) / float32(total)
  18.         coreScore := float32(usedCore) / float32(totalCore)
  19.         memScore := float32(usedMem) / float32(totalMem)
  20.         ns.Score = float32(Weight) * (useScore + coreScore + memScore)
  21.         klog.V(2).Infof("node %s computer score is %f", ns.NodeID, ns.Score)
  22. }
复制代码
逻辑也比较简单,就是从 Container 的 Resources 中根据名称解析拿到申请的 gpu、gpucore、gpumem 等信息。
过滤节点

逻辑同样在 calcScore 方法中,具体如下:
  1. // pkg/scheduler/scheduler.go#L444
  2. func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
  3.         nums := k8sutil.Resourcereqs(args.Pod)
  4. }
复制代码
这样,我们就把不满足条件的节点给过滤掉了,剩下的节点都是可以正常调度 Pod 的,不过具体选择哪个节点还需要依赖于配置的调度策略。
根据策略选择节点

上一步计算出了每个节点的得分之后,就可以根据策略进行选择了。
  1. //计算得分,拿到所有满足条件的节点
  2. nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  3. // 排序
  4. sort.Sort(nodeScores)
  5. // 直接选择最后一个节点
  6. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  7. // 返回结果
  8. res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  9. return &res, nil
复制代码
具体的选择逻辑在这里:
  1. // pkg/device/nvidia/device.go#L264
  2. func (dev *NvidiaGPUDevices) GenerateResourceRequests(ctr *corev1.Container) util.ContainerDeviceRequest {
  3.         resourceName := corev1.ResourceName(ResourceName)
  4.         resourceMem := corev1.ResourceName(ResourceMem)
  5.         resourceMemPercentage := corev1.ResourceName(ResourceMemPercentage)
  6.         resourceCores := corev1.ResourceName(ResourceCores)
  7.         v, ok := ctr.Resources.Limits[resourceName]
  8.         if !ok {
  9.                 v, ok = ctr.Resources.Requests[resourceName]
  10.         }
  11.         if ok {
  12.                 if n, ok := v.AsInt64(); ok {
  13.                         memnum := 0
  14.                         mem, ok := ctr.Resources.Limits[resourceMem]
  15.                         if !ok {
  16.                                 mem, ok = ctr.Resources.Requests[resourceMem]
  17.                         }
  18.                         if ok {
  19.                                 memnums, ok := mem.AsInt64()
  20.                                 if ok {
  21.                                         memnum = int(memnums)
  22.                                 }
  23.                         }
  24.                         mempnum := int32(101)
  25.                         mem, ok = ctr.Resources.Limits[resourceMemPercentage]
  26.                         if !ok {
  27.                                 mem, ok = ctr.Resources.Requests[resourceMemPercentage]
  28.                         }
  29.                         if ok {
  30.                                 mempnums, ok := mem.AsInt64()
  31.                                 if ok {
  32.                                         mempnum = int32(mempnums)
  33.                                 }
  34.                         }
  35.                         if mempnum == 101 && memnum == 0 {
  36.                                 if config.DefaultMem != 0 {
  37.                                         memnum = int(config.DefaultMem)
  38.                                 } else {
  39.                                         mempnum = 100
  40.                                 }
  41.                         }
  42.                         corenum := config.DefaultCores
  43.                         core, ok := ctr.Resources.Limits[resourceCores]
  44.                         if !ok {
  45.                                 core, ok = ctr.Resources.Requests[resourceCores]
  46.                         }
  47.                         if ok {
  48.                                 corenums, ok := core.AsInt64()
  49.                                 if ok {
  50.                                         corenum = int32(corenums)
  51.                                 }
  52.                         }
  53.                         return util.ContainerDeviceRequest{
  54.                                 Nums:             int32(n),
  55.                                 Type:             NvidiaGPUDevice,
  56.                                 Memreq:           int32(memnum),
  57.                                 MemPercentagereq: int32(mempnum),
  58.                                 Coresreq:         int32(corenum),
  59.                         }
  60.                 }
  61.         }
  62.         return util.ContainerDeviceRequest{}
  63. }
复制代码
对得分数据排序后,直接就选择了最后一个节点。
初次看到这里时也有点懵,想不明白这怎么和调度策略牵扯到一起的。
实际上具体逻辑就在 sort 这里,NodeScoreList 要实现 sort 接口才能进行排序,因此看下是怎么实现的:
  1.                 ctrfit := false
  2.                 for ctrid, n := range nums {
  3.                         sums := 0
  4.                         for _, k := range n {
  5.                                 sums += int(k.Nums)
  6.                         }
  7.                         if sums == 0 {
  8.                                 for idx := range score.Devices {
  9.                                         if len(score.Devices[idx]) <= ctrid {
  10.                                                 score.Devices[idx] = append(score.Devices[idx], util.ContainerDevices{})
  11.                                         }
  12.                                         score.Devices[idx][ctrid] = append(score.Devices[idx][ctrid], util.ContainerDevice{})
  13.                                         continue
  14.                                 }
  15.                         }
  16.                         klog.V(5).InfoS("fitInDevices", "pod", klog.KObj(task), "node", nodeID)
  17.                         fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
  18.                         ctrfit = fit
  19.                         if !fit {
  20.                                 klog.InfoS("calcScore:node not fit pod", "pod", klog.KObj(task), "node", nodeID)
  21.                                 break
  22.                         }
  23.                 }
  24.                 if ctrfit {
  25.                         res.NodeList = append(res.NodeList, &score)
  26.                 }
复制代码
核心部分:
  1. fit, _ := fitInDevices(node, n, annos, task, &score.Devices)
复制代码
根据我们的 Policy 不同,有两种排序方式,而且排序正好相反。
  1. func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) {
  2.                 // ....
  3.          for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
  4.                   if node.Devices.DeviceLists[i].Device.Totalmem-node.Devices.DeviceLists[i].Device.Usedmem < memreq {
  5.                         continue
  6.                 }
  7.                 if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
  8.                         continue
  9.                 }
  10.                 // Coresreq=100 indicates it want this card exclusively
  11.                 if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
  12.                         continue
  13.                 }
  14.                 // You can't allocate core=0 job to an already full GPU
  15.                 if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
  16.                         continue
  17.                 }
  18.                 if k.Nums > 0 {
  19.                         klog.InfoS("first fitted", "pod", klog.KObj(pod), "device", node.Devices.DeviceLists[i].Device.ID)
  20.                         k.Nums--
  21.                         tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
  22.                                 Idx:       int(node.Devices.DeviceLists[i].Device.Index),
  23.                                 UUID:      node.Devices.DeviceLists[i].Device.ID,
  24.                                 Type:      k.Type,
  25.                                 Usedmem:   memreq,
  26.                                 Usedcores: k.Coresreq,
  27.                         })
  28.                 }
  29.                 if k.Nums == 0 {
  30.                         klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
  31.                         return true, tmpDevs
  32.                 }
  33.         }
  34.         return false, tmpDevs
  35. }
复制代码
这里涉及到 sort.Sort() 的实现,简单来说:
<ul>如果Less()方法中使用大于(>)比较,最终排序结果将是降序。
如果Less()方法中使用小于( len(node.Devices.DeviceLists) {                        klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))                        return false, 0                }                func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
        //devmap := make(map[string]util.ContainerDevices)
        devs := util.ContainerDevices{}
        total, totalCore, totalMem := int32(0), int32(0), int32(0)
        free, freeCore, freeMem := int32(0), int32(0), int32(0)
        sums := 0
        // computer all device score for one node
        for index := range node.Devices.DeviceLists {
                node.Devices.DeviceLists[index].ComputeScore(requests)
        }
        //This loop is for requests for different devices
        for _, k := range requests {
                sums += int(k.Nums)
                if int(k.Nums) > len(node.Devices.DeviceLists) {
                        klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
                        return false, 0
                }
                sort.Sort(node.Devices)
                fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
                if fit {
                        for _, val := range tmpDevs[k.Type] {
                                total += node.Devices.DeviceLists[val.Idx].Device.Count
                                totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore
                                totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem
                                free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used
                                freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores
                                freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem

                                node.Devices.DeviceLists[val.Idx].Device.Used++
                                node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores
                                node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem
                        }
                        devs = append(devs, tmpDevs[k.Type]...)
                } else {
                        return false, 0
                }
                (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
        }
        return true, 0
}                // 排序
sort.Sort(nodeScores)
// 直接选择最后一个节点
m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]                if fit {                        for _, val := range tmpDevs[k.Type] {                                total += node.Devices.DeviceLists[val.Idx].Device.Count                                totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore                                totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem                                free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used                                freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores                                freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem                                node.Devices.DeviceLists[val.Idx].Device.Used++                                node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores                                node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem                        }                        devs = append(devs, tmpDevs[k.Type]...)                } else {                        return false, 0                }                (*devinput)[k.Type] = append((*devinput)[k.Type], devs)        }        return true, 0}[/code]核心部分:
  1. //计算得分,拿到所有满足条件的节点
  2. nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  3. // 排序
  4. sort.Sort(nodeScores)
  5. // 直接选择最后一个节点
  6. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  7. // 返回结果
  8. res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  9. return &res, nil
复制代码
这里又出现了 sort.Sort 是不是有点熟悉,不过暂时先不管,还是先分析怎么过滤 GPU 的。
核心部分在fitInCertainDevice 中,根据 Pod 申请的 GPU 信息找出当前节点上所有满足条件的 GPU
  1. // 排序
  2. sort.Sort(nodeScores)
  3. // 直接选择最后一个节点
  4. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
复制代码
fitInCertainDevice 在前面过滤 Node 时也分析过,这里就简单看下
  1. // pkg/scheduler/policy/node_policy.go#L32
  2. type NodeScoreList struct {
  3.     NodeList []*NodeScore
  4.     Policy   string
  5. }
  6. func (l NodeScoreList) Len() int {
  7.     return len(l.NodeList)
  8. }
  9. func (l NodeScoreList) Swap(i, j int) {
  10.     l.NodeList[i], l.NodeList[j] = l.NodeList[j], l.NodeList[i]
  11. }
  12. func (l NodeScoreList) Less(i, j int) bool {
  13.     if l.Policy == NodeSchedulerPolicySpread.String() {
  14.        return l.NodeList[i].Score > l.NodeList[j].Score
  15.     }
  16.     // default policy is Binpack
  17.     return l.NodeList[i].Score < l.NodeList[j].Score
  18. }
复制代码
如果某个 GPU 能满足这些条件就认为该 GPU 可以分配给对应 Container。
又回到前面的核心逻辑,对于满足条件的 GPU,这里使用了 devinput 对象进行记录。
  1. //计算得分,拿到所有满足条件的节点
  2. nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  3. // 排序
  4. sort.Sort(nodeScores)
  5. // 直接选择最后一个节点
  6. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  7. // 返回结果
  8. res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  9. return &res, nil
复制代码
这里的 devinput 实际上就是前面传进来的 Score 对象。
  1. // NodeSchedulerPolicyBinpack is node use binpack scheduler policy.
  2. NodeSchedulerPolicyBinpack SchedulerPolicyName = "binpack"
  3. // NodeSchedulerPolicySpread is node use spread scheduler policy.
  4. NodeSchedulerPolicySpread SchedulerPolicyName = "spread"
复制代码
标记到 Pod 上

hami 为了让后续的 DevicePlugin 能够知道要把哪些 GPU 分配给该 Pod,是直接将其记录到 Pod 的 Annoations 上的。
  1. func (s *Scheduler) calcScore(nodes *map[string]*NodeUsage, nums util.PodDeviceRequests, annos map[string]string, task *corev1.Pod) (*policy.NodeScoreList, error) {
  2.         userNodePolicy := config.NodeSchedulerPolicy
  3.         if annos != nil {
  4.                 if value, ok := annos[policy.NodeSchedulerPolicyAnnotationKey]; ok {
  5.                         userNodePolicy = value
  6.                 }
  7.         }
  8.         res := policy.NodeScoreList{
  9.                 Policy:   userNodePolicy,
  10.                 NodeList: make([]*policy.NodeScore, 0),
  11.         }
  12. }
复制代码
选择到节点之后,还把 m.Devices 信息记录到了 Pod 的 Annoations 上。
  1.         rootCmd.Flags().StringVar(&config.NodeSchedulerPolicy, "node-scheduler-policy", policy.NodeSchedulerPolicyBinpack.String(), "node scheduler policy")
  2.         rootCmd.Flags().StringVar(&config.GPUSchedulerPolicy, "gpu-scheduler-policy", policy.GPUSchedulerPolicySpread.String(), "GPU scheduler policy")
复制代码
这里的 m.Devices 实际上就是前面我们过滤出来的满足条件的 GPU。
Annoations 大概是这样的:
  1. // pkg/scheduler/scheduler.go#L444
  2. func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
  3.         klog.InfoS("begin schedule filter", "pod", args.Pod.Name, "uuid", args.Pod.UID, "namespaces", args.Pod.Namespace)
  4.         nums := k8sutil.Resourcereqs(args.Pod)
  5.         total := 0
  6.         for _, n := range nums {
  7.                 for _, k := range n {
  8.                         total += int(k.Nums)
  9.                 }
  10.         }
  11.         if total == 0 {
  12.                 klog.V(1).Infof("pod %v not find resource", args.Pod.Name)
  13.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("does not request any resource"))
  14.                 return &extenderv1.ExtenderFilterResult{
  15.                         NodeNames:   args.NodeNames,
  16.                         FailedNodes: nil,
  17.                         Error:       "",
  18.                 }, nil
  19.         }
  20.         annos := args.Pod.Annotations
  21.         s.delPod(args.Pod)
  22.         nodeUsage, failedNodes, err := s.getNodesUsage(args.NodeNames, args.Pod)
  23.         if err != nil {
  24.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  25.                 return nil, err
  26.         }
  27.         if len(failedNodes) != 0 {
  28.                 klog.V(5).InfoS("getNodesUsage failed nodes", "nodes", failedNodes)
  29.         }
  30.         nodeScores, err := s.calcScore(nodeUsage, nums, annos, args.Pod)
  31.         if err != nil {
  32.                 err := fmt.Errorf("calcScore failed %v for pod %v", err, args.Pod.Name)
  33.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  34.                 return nil, err
  35.         }
  36.         if len((*nodeScores).NodeList) == 0 {
  37.                 klog.V(4).Infof("All node scores do not meet for pod %v", args.Pod.Name)
  38.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, fmt.Errorf("no available node, all node scores do not meet"))
  39.                 return &extenderv1.ExtenderFilterResult{
  40.                         FailedNodes: failedNodes,
  41.                 }, nil
  42.         }
  43.         klog.V(4).Infoln("nodeScores_len=", len((*nodeScores).NodeList))
  44.         sort.Sort(nodeScores)
  45.         m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]
  46.         klog.Infof("schedule %v/%v to %v %v", args.Pod.Namespace, args.Pod.Name, m.NodeID, m.Devices)
  47.         annotations := make(map[string]string)
  48.         annotations[util.AssignedNodeAnnotations] = m.NodeID
  49.         annotations[util.AssignedTimeAnnotations] = strconv.FormatInt(time.Now().Unix(), 10)
  50.         for _, val := range device.GetDevices() {
  51.                 val.PatchAnnotations(&annotations, m.Devices)
  52.         }
  53.         //InRequestDevices := util.EncodePodDevices(util.InRequestDevices, m.devices)
  54.         //supportDevices := util.EncodePodDevices(util.SupportDevices, m.devices)
  55.         //maps.Copy(annotations, InRequestDevices)
  56.         //maps.Copy(annotations, supportDevices)
  57.         s.addPod(args.Pod, m.NodeID, m.Devices)
  58.         err = util.PatchPodAnnotations(args.Pod, annotations)
  59.         if err != nil {
  60.                 s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringFailed, []string{}, err)
  61.                 s.delPod(args.Pod)
  62.                 return nil, err
  63.         }
  64.         s.recordScheduleFilterResultEvent(args.Pod, EventReasonFilteringSucceed, []string{m.NodeID}, nil)
  65.         res := extenderv1.ExtenderFilterResult{NodeNames: &[]string{m.NodeID}}
  66.         return &res, nil
  67. }
复制代码

  • hami.io/vgpu-devices-to-allocate 是 Scheduler 为 Pod 选择的目标 GPU
  • hami.io/vgpu-devices-allocated 是当前已经分配的
ps:对于已经调度的 Pod hami.io/vgpu-devices-to-allocate 会被清空
调度完成后,DevicePlugin 直接读取 hami.io/vgpu-devices-to-allocate 就知道要为该 Pod 分配哪些 GPU 了。
根据策略选择 GPU

前面都已经选出了满足条件的 GPU 甚至都记录到了 Pod 的 Annoations 上了,那么 GPU 调度策略是什么时候生效的呢?
GPU 打分

逻辑和 Node 打分逻辑基本一致:都是剩余资源越多,得分越低
  1. // pkg/scheduler/scheduler.go#L444
  2. func (s *Scheduler) Filter(args extenderv1.ExtenderArgs) (*extenderv1.ExtenderFilterResult, error) {
  3.         nums := k8sutil.Resourcereqs(args.Pod)
  4. }
复制代码
排序

这部分也在 fitInDevices 方法中
  1. // pkg/scheduler/score.go#L144func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {  for _, k := range requests {      func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
  2.         //devmap := make(map[string]util.ContainerDevices)
  3.         devs := util.ContainerDevices{}
  4.         total, totalCore, totalMem := int32(0), int32(0), int32(0)
  5.         free, freeCore, freeMem := int32(0), int32(0), int32(0)
  6.         sums := 0
  7.         // computer all device score for one node
  8.         for index := range node.Devices.DeviceLists {
  9.                 node.Devices.DeviceLists[index].ComputeScore(requests)
  10.         }
  11.         //This loop is for requests for different devices
  12.         for _, k := range requests {
  13.                 sums += int(k.Nums)
  14.                 if int(k.Nums) > len(node.Devices.DeviceLists) {
  15.                         klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
  16.                         return false, 0
  17.                 }
  18.                 sort.Sort(node.Devices)
  19.                 fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
  20.                 if fit {
  21.                         for _, val := range tmpDevs[k.Type] {
  22.                                 total += node.Devices.DeviceLists[val.Idx].Device.Count
  23.                                 totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore
  24.                                 totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem
  25.                                 free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used
  26.                                 freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores
  27.                                 freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem
  28.                                 node.Devices.DeviceLists[val.Idx].Device.Used++
  29.                                 node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores
  30.                                 node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem
  31.                         }
  32.                         devs = append(devs, tmpDevs[k.Type]...)
  33.                 } else {
  34.                         return false, 0
  35.                 }
  36.                 (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
  37.         }
  38.         return true, 0
  39. }      // 排序
  40. sort.Sort(nodeScores)
  41. // 直接选择最后一个节点
  42. m := (*nodeScores).NodeList[len((*nodeScores).NodeList)-1]      if fit {        devs = append(devs, tmpDevs[k.Type]...)      } else {        return false, 0      }      (*devinput)[k.Type] = append((*devinput)[k.Type], devs)  }}
复制代码
核心就是这个 sort 方法
  1. func fitInDevices(node *NodeUsage, requests util.ContainerDeviceRequests, annos map[string]string, pod *corev1.Pod, devinput *util.PodDevices) (bool, float32) {
  2.         //devmap := make(map[string]util.ContainerDevices)
  3.         devs := util.ContainerDevices{}
  4.         total, totalCore, totalMem := int32(0), int32(0), int32(0)
  5.         free, freeCore, freeMem := int32(0), int32(0), int32(0)
  6.         sums := 0
  7.         // computer all device score for one node
  8.         for index := range node.Devices.DeviceLists {
  9.                 node.Devices.DeviceLists[index].ComputeScore(requests)
  10.         }
  11.         //This loop is for requests for different devices
  12.         for _, k := range requests {
  13.                 sums += int(k.Nums)
  14.                 if int(k.Nums) > len(node.Devices.DeviceLists) {
  15.                         klog.InfoS("request devices nums cannot exceed the total number of devices on the node.", "pod", klog.KObj(pod), "request devices nums", k.Nums, "node device nums", len(node.Devices.DeviceLists))
  16.                         return false, 0
  17.                 }
  18.                 sort.Sort(node.Devices)
  19.                 fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
  20.                 if fit {
  21.                         for _, val := range tmpDevs[k.Type] {
  22.                                 total += node.Devices.DeviceLists[val.Idx].Device.Count
  23.                                 totalCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore
  24.                                 totalMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem
  25.                                 free += node.Devices.DeviceLists[val.Idx].Device.Count - node.Devices.DeviceLists[val.Idx].Device.Used
  26.                                 freeCore += node.Devices.DeviceLists[val.Idx].Device.Totalcore - node.Devices.DeviceLists[val.Idx].Device.Usedcores
  27.                                 freeMem += node.Devices.DeviceLists[val.Idx].Device.Totalmem - node.Devices.DeviceLists[val.Idx].Device.Usedmem
  28.                                 node.Devices.DeviceLists[val.Idx].Device.Used++
  29.                                 node.Devices.DeviceLists[val.Idx].Device.Usedcores += val.Usedcores
  30.                                 node.Devices.DeviceLists[val.Idx].Device.Usedmem += val.Usedmem
  31.                         }
  32.                         devs = append(devs, tmpDevs[k.Type]...)
  33.                 } else {
  34.                         return false, 0
  35.                 }
  36.                 (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
  37.         }
  38.         return true, 0
  39. }
复制代码
又出现了 sort.Sort 是不是想到了什么。
前面选择节点的时候也是这样实现的,把具体逻辑放在 sort 接口实现上。看看 GPU 的 Sort 接口怎么实现的:
  1. for _, k := range requests {
  2.           sort.Sort(node.Devices)
  3.                 fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
  4.           if fit {
  5.                         devs = append(devs, tmpDevs[k.Type]...)
  6.                 } else {
  7.                         return false, 0
  8.                 }
  9.                 (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
  10. }
复制代码
果然又是这样的,根据不同的 GPU 调度策略,Less 方法返回不同结果以控制排序结果是降序还是升序。
选择 GPU

然后后续再选择 GPU 的时候的代码如下:
  1. // pkg/scheduler/policy/node_policy.go#L32
  2. type NodeScoreList struct {
  3.     NodeList []*NodeScore
  4.     Policy   string
  5. }
  6. func (l NodeScoreList) Len() int {
  7.     return len(l.NodeList)
  8. }
  9. func (l NodeScoreList) Swap(i, j int) {
  10.     l.NodeList[i], l.NodeList[j] = l.NodeList[j], l.NodeList[i]
  11. }
  12. func (l NodeScoreList) Less(i, j int) bool {
  13.     if l.Policy == NodeSchedulerPolicySpread.String() {
  14.        return l.NodeList[i].Score > l.NodeList[j].Score
  15.     }
  16.     // default policy is Binpack
  17.     return l.NodeList[i].Score < l.NodeList[j].Score
  18. }
复制代码
核心是这个 for 循环
  1. func fitInCertainDevice(node *NodeUsage, request util.ContainerDeviceRequest, annos map[string]string, pod *corev1.Pod) (bool, map[string]util.ContainerDevices) {
  2.         for i := len(node.Devices.DeviceLists) - 1; i >= 0; i-- {
  3.                         continue
  4.                 }
  5.                 if node.Devices.DeviceLists[i].Device.Totalcore-node.Devices.DeviceLists[i].Device.Usedcores < k.Coresreq {
  6.                         continue
  7.                 }
  8.                 // Coresreq=100 indicates it want this card exclusively
  9.                 if node.Devices.DeviceLists[i].Device.Totalcore == 100 && k.Coresreq == 100 && node.Devices.DeviceLists[i].Device.Used > 0 {
  10.                         continue
  11.                 }
  12.                 // You can't allocate core=0 job to an already full GPU
  13.                 if node.Devices.DeviceLists[i].Device.Totalcore != 0 && node.Devices.DeviceLists[i].Device.Usedcores == node.Devices.DeviceLists[i].Device.Totalcore && k.Coresreq == 0 {
  14.                         continue
  15.                 }
  16.                 if k.Nums > 0 {
  17.                         k.Nums--
  18.                         tmpDevs[k.Type] = append(tmpDevs[k.Type], util.ContainerDevice{
  19.                                 Idx:       int(node.Devices.DeviceLists[i].Device.Index),
  20.                                 UUID:      node.Devices.DeviceLists[i].Device.ID,
  21.                                 Type:      k.Type,
  22.                                 Usedmem:   memreq,
  23.                                 Usedcores: k.Coresreq,
  24.                         })
  25.                 }
  26.                 if k.Nums == 0 {
  27.                         klog.InfoS("device allocate success", "pod", klog.KObj(pod), "allocate device", tmpDevs)
  28.                         return true, tmpDevs
  29.                 }
  30.         }
  31.         return false, tmpDevs
  32.   }
  33. }
复制代码
也是从最后一个 GPU 开始的,也就是如果排在后面的 GPU 满足条件就会直接被选中,不会再去选择前面的了。

  • Binpack 策略:结果为升序,越往后的 GPU 空闲资源越少
  • Spread 策略:结果为降序,越往后的 GPU 空闲资源越多
同样也是符合对应策略含义的。
至此,GPU 调度策略也分析完了。
DevicePlugin 解析 GPU 信息

在调度时,我们把最终选择的 GPU 记录到了 Pod 的 Annoations 上,DevicePlugin 这边就不需要选择 GPU 了,从 Annoations 上解析即可
  1. for _, k := range requests {
  2.           sort.Sort(node.Devices)
  3.                 fit, tmpDevs := fitInCertainDevice(node, k, annos, pod)
  4.           if fit {
  5.                         devs = append(devs, tmpDevs[k.Type]...)
  6.                 } else {
  7.                         return false, 0
  8.                 }
  9.                 (*devinput)[k.Type] = append((*devinput)[k.Type], devs)
  10. }
复制代码
具体的解析逻辑如下,就是按照预设规则,以冒号,逗号进行切分
  1. type NodeScore struct {
  2.         NodeID  string
  3.         Devices util.PodDevices
  4.         // Score recode every node all device user/allocate score
  5.         Score float32
  6. }
复制代码
至此,hami 提供的 Node、GPU 级别的 Spread、Binpack 高级调度策略就分析完成了。
【Kubernetes 系列】持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章。
3.png

4. 小结

调度策略配置
hami-scheduler 提供了两种不同级别的调度策略:

  • 节点调度策略:作用于调度过程中如何为 Pod 选择节点
  • GPU 调度策略:作用于选择节点后,节点存在多 GPU 时如何为 Pod 选择 GPU
二者都支持 Spread 和 Binpack 两种配置:

  • Spread 表示尽量将任务分配到不同 Node 或 GPU 上,让集群中的 Node 或 GPU 负载尽量保持相同水位线。
  • Binpack 表示尽量将任务分配到同一 Node 或者 GPU 上,尽量先占满一个 Node 或者 GPU 后再使用别的
具体 Node、GPU 调度策略实现都可以分为以下几步

  • 1)给 Node、GPU 打分
  • 2)过滤掉不满足条件的 Node、GPU
  • 3)根据调度策略选择出最优 Node、GPU

    • 具体逻辑都在 sort.Sort 接口的 Less 方法实现的
    • 对于 Spread 策略就选择剩余资源多的 Node、GPU,Binpack 策略就选择剩余资源少的 Node、GPU

  • 4)对结果进行记录

    • Node 则是通过 Bind 结果直接和 Pod 绑定
    • GPU 则是记录到 Pod 的 Annoations 上

所有逻辑都在 Filter 方法里,Node 的调度策略还算比较清晰,除了 sort.Sort 这个点需要多看会之外,其他都还好。
GPU 调度策略就复杂了一点,所有逻辑都混在一起的,不是很容易区分,需要慢慢分析。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册