找回密码
 立即注册
首页 业界区 业界 开源 vGPU 方案 HAMi 原理分析 Part1:hami-device-plug ...

开源 vGPU 方案 HAMi 原理分析 Part1:hami-device-plugin-nvidia 实现

系味 4 天前
1.png

本文为开源的 vGPU 方案 HAMi 实现原理分析第一篇,主要分析 hami-device-plugin-nvidia 实现原理。
之前在 开源 vGPU 方案:HAMi,实现细粒度 GPU 切分 介绍了 HAMi 是什么,然后在开源 vGPU 方案 HAMi: core&memory 隔离测试 中对 HAMi 提供的 vGPU 方案进行了测试。
接下来则是逐步分析 HAMi 中的 vGPU 实现原理,涉及到的东西比较多,暂定分为几部分:

  • 1)hami-device-plugin-nvidia:HAMi 版本的 device plugin 中 GPU 感知以及分配逻辑是怎么实现的,和 NVIDIA 原生的 device-plugin 有和不同。
  • 2)HAMI-Scheduler:HAMi 是如何做调度的,binpack/spread 高级调度策略是怎么实现的
  • 3)HAMi-Core:这也是 vCUDA 方案最核心的部分,HAMi 是如何通过拦截 CUDA API 实现 Core&Memory 隔离限制的
本文为第一篇,分析 hami-device-plugin-nvidia 实现原理。
1. 概述

NVIDIA 是有自己实现 device plugin 的,那么问题来了:HAMi 为什么还要自己实现一个 device plugin 呢?
是 hami-device-plugin-nvidia 是有哪些 NVIDIA 原生  device plugin 没有的功能吗?带着疑问,我们开始查看 hami-device-plugin-nvidia 源码。
这部分需要大家对 GPU Operator、k8s device plugin 等比较熟悉阅读起来才比较丝滑。
推荐阅读

  • GPU 环境搭建指南:如何在裸机、Docker、K8s 等环境中使用 GPU
  • GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建
  • Kubernetes教程(二一)—自定义资源支持:K8s Device Plugin 从原理到实现
  • Kubernetes教程(二二)—在 K8S 中创建 Pod 是如何使用到 GPU 的:device plugin&nvidia-container-toolkit 源码分析
后续都默认大家都对这块比较熟悉了,特别是后两篇
2. 程序入口

HAMi 首先支持的是 NVIDIA GPU,单独实现了一个 device plugin nvidia。

  • 启动文件在 cmd/device-plugin/nvidia
  • 核心实现在 pkg/device-plugin/nvidiadevice
默认大家都对 k8s 的 device plugin 机制比较熟悉了,因此这里只分析核心代码逻辑,不然篇幅就太长了。
对于一个 device plugin 我们一般关注以下 3 个地方:

  • Register:将插件注册到 Kubelet 的,参数 ResourceName 比较重要
  • ListAndWatch:device plugin 是怎么感知 GPU 并上报的
  • Allocate:device plugin 是如何将 GPU 分配给 Pod 的
启动命令在 /cmd/device-plugin/nvidia,用的是 github.com/urfave/cli/v2 构建的一个命令行工具。
  1. func main() {
  2.     var configFile string
  3.     c := cli.NewApp()
  4.     c.Name = "NVIDIA Device Plugin"
  5.     c.Usage = "NVIDIA device plugin for Kubernetes"
  6.     c.Version = info.GetVersionString()
  7.     c.Action = func(ctx *cli.Context) error {
  8.        return start(ctx, c.Flags)
  9.     }
  10.     c.Flags = []cli.Flag{
  11.        &cli.StringFlag{
  12.           Name:    "mig-strategy",
  13.           Value:   spec.MigStrategyNone,
  14.           Usage:   "the desired strategy for exposing MIG devices on GPUs that support it:\n\t\t[none | single | mixed]",
  15.           EnvVars: []string{"MIG_STRATEGY"},
  16.        },
  17.        &cli.BoolFlag{
  18.           Name:    "fail-on-init-error",
  19.           Value:   true,
  20.           Usage:   "fail the plugin if an error is encountered during initialization, otherwise block indefinitely",
  21.           EnvVars: []string{"FAIL_ON_INIT_ERROR"},
  22.        },
  23.        &cli.StringFlag{
  24.           Name:    "nvidia-driver-root",
  25.           Value:   "/",
  26.           Usage:   "the root path for the NVIDIA driver installation (typical values are '/' or '/run/nvidia/driver')",
  27.           EnvVars: []string{"NVIDIA_DRIVER_ROOT"},
  28.        },
  29.        &cli.BoolFlag{
  30.           Name:    "pass-device-specs",
  31.           Value:   false,
  32.           Usage:   "pass the list of DeviceSpecs to the kubelet on Allocate()",
  33.           EnvVars: []string{"PASS_DEVICE_SPECS"},
  34.        },
  35.        &cli.StringSliceFlag{
  36.           Name:    "device-list-strategy",
  37.           Value:   cli.NewStringSlice(string(spec.DeviceListStrategyEnvvar)),
  38.           Usage:   "the desired strategy for passing the device list to the underlying runtime:\n\t\t[envvar | volume-mounts | cdi-annotations]",
  39.           EnvVars: []string{"DEVICE_LIST_STRATEGY"},
  40.        },
  41.        &cli.StringFlag{
  42.           Name:    "device-id-strategy",
  43.           Value:   spec.DeviceIDStrategyUUID,
  44.           Usage:   "the desired strategy for passing device IDs to the underlying runtime:\n\t\t[uuid | index]",
  45.           EnvVars: []string{"DEVICE_ID_STRATEGY"},
  46.        },
  47.        &cli.BoolFlag{
  48.           Name:    "gds-enabled",
  49.           Usage:   "ensure that containers are started with NVIDIA_GDS=enabled",
  50.           EnvVars: []string{"GDS_ENABLED"},
  51.        },
  52.        &cli.BoolFlag{
  53.           Name:    "mofed-enabled",
  54.           Usage:   "ensure that containers are started with NVIDIA_MOFED=enabled",
  55.           EnvVars: []string{"MOFED_ENABLED"},
  56.        },
  57.        &cli.StringFlag{
  58.           Name:        "config-file",
  59.           Usage:       "the path to a config file as an alternative to command line options or environment variables",
  60.           Destination: &configFile,
  61.           EnvVars:     []string{"CONFIG_FILE"},
  62.        },
  63.        &cli.StringFlag{
  64.           Name:    "cdi-annotation-prefix",
  65.           Value:   spec.DefaultCDIAnnotationPrefix,
  66.           Usage:   "the prefix to use for CDI container annotation keys",
  67.           EnvVars: []string{"CDI_ANNOTATION_PREFIX"},
  68.        },
  69.        &cli.StringFlag{
  70.           Name:    "nvidia-ctk-path",
  71.           Value:   spec.DefaultNvidiaCTKPath,
  72.           Usage:   "the path to use for the nvidia-ctk in the generated CDI specification",
  73.           EnvVars: []string{"NVIDIA_CTK_PATH"},
  74.        },
  75.        &cli.StringFlag{
  76.           Name:    "container-driver-root",
  77.           Value:   spec.DefaultContainerDriverRoot,
  78.           Usage:   "the path where the NVIDIA driver root is mounted in the container; used for generating CDI specifications",
  79.           EnvVars: []string{"CONTAINER_DRIVER_ROOT"},
  80.        },
  81.     }
  82.     c.Flags = append(c.Flags, addFlags()...)
  83.     err := c.Run(os.Args)
  84.     if err != nil {
  85.        klog.Error(err)
  86.        os.Exit(1)
  87.     }
  88. }
  89. func addFlags() []cli.Flag {
  90.     addition := []cli.Flag{
  91.        &cli.StringFlag{
  92.           Name:    "node-name",
  93.           Value:   os.Getenv(util.NodeNameEnvName),
  94.           Usage:   "node name",
  95.           EnvVars: []string{"NodeName"},
  96.        },
  97.        &cli.UintFlag{
  98.           Name:    "device-split-count",
  99.           Value:   2,
  100.           Usage:   "the number for NVIDIA device split",
  101.           EnvVars: []string{"DEVICE_SPLIT_COUNT"},
  102.        },
  103.        &cli.Float64Flag{
  104.           Name:    "device-memory-scaling",
  105.           Value:   1.0,
  106.           Usage:   "the ratio for NVIDIA device memory scaling",
  107.           EnvVars: []string{"DEVICE_MEMORY_SCALING"},
  108.        },
  109.        &cli.Float64Flag{
  110.           Name:    "device-cores-scaling",
  111.           Value:   1.0,
  112.           Usage:   "the ratio for NVIDIA device cores scaling",
  113.           EnvVars: []string{"DEVICE_CORES_SCALING"},
  114.        },
  115.        &cli.BoolFlag{
  116.           Name:    "disable-core-limit",
  117.           Value:   false,
  118.           Usage:   "If set, the core utilization limit will be ignored",
  119.           EnvVars: []string{"DISABLE_CORE_LIMIT"},
  120.        },
  121.        &cli.StringFlag{
  122.           Name:  "resource-name",
  123.           Value: "nvidia.com/gpu",
  124.           Usage: "the name of field for number GPU visible in container",
  125.        },
  126.     }
  127.     return addition
  128. }
复制代码
启动时做了两件事:

  • 将插件注册到 Kubelet
  • 启动一个 gRPC 服务
我们只需要关注一下接收的几个参数:
  1. &cli.UintFlag{
  2.     Name:    "device-split-count",
  3.     Value:   2,
  4.     Usage:   "the number for NVIDIA device split",
  5.     EnvVars: []string{"DEVICE_SPLIT_COUNT"},
  6. },
  7. &cli.Float64Flag{
  8.     Name:    "device-memory-scaling",
  9.     Value:   1.0,
  10.     Usage:   "the ratio for NVIDIA device memory scaling",
  11.     EnvVars: []string{"DEVICE_MEMORY_SCALING"},
  12. },
  13. &cli.Float64Flag{
  14.     Name:    "device-cores-scaling",
  15.     Value:   1.0,
  16.     Usage:   "the ratio for NVIDIA device cores scaling",
  17.     EnvVars: []string{"DEVICE_CORES_SCALING"},
  18. },
  19. &cli.BoolFlag{
  20.     Name:    "disable-core-limit",
  21.     Value:   false,
  22.     Usage:   "If set, the core utilization limit will be ignored",
  23.     EnvVars: []string{"DISABLE_CORE_LIMIT"},
  24. },
  25. &cli.StringFlag{
  26.     Name:  "resource-name",
  27.     Value: "nvidia.com/gpu",
  28.     Usage: "the name of field for number GPU visible in container",
  29. },
复制代码

  • device-split-count:表示 GPU 的分割数,每一张 GPU 都不能分配超过其配置数目的任务。若其配置为 N 的话,每个 GPU 上最多可以同时存在 N 个任务。

    • 建议根据 GPU 性能动态调整,一般建议大于 10。

  • device-memory-scaling:表示 GPU memory 的 oversubscription(超额订阅)** **比例,默认 1.0,大于 1.0 则表示启用虚拟显存(实验功能),不建议修改。
  • device-cores-scaling:表示 GPU core 的 oversubscription(超额订阅)比例,默认 1.0。
  • disable-core-limit:是否关闭 GPU Core Limit,默认 false,不建议修改。
  • resource-name:资源名称,建议改掉,不推荐使用默认的 nvidia.com/gpu 因为这个和 nvidia 原生的重复了。
3. Register

Register

Register 方法实现如下:
  1. // pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L222
  2. // Register registers the device plugin for the given resourceName with Kubelet.
  3. func (plugin *NvidiaDevicePlugin) Register() error {
  4.     conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
  5.     if err != nil {
  6.        return err
  7.     }
  8.     defer conn.Close()
  9.     client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
  10.     reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
  11.        Version:      kubeletdevicepluginv1beta1.Version,
  12.        Endpoint:     path.Base(plugin.socket),
  13.        ResourceName: string(plugin.rm.Resource()),
  14.        Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
  15.           GetPreferredAllocationAvailable: false,
  16.        },
  17.     }
  18.     _, err = client.Register(context.Background(), reqt)
  19.     if err != nil {
  20.        return err
  21.     }
  22.     return nil
  23. }
复制代码
device plugin 注册时的几个核心信息:

  • ResourceName:资源名称,这个和创建 Pod 时申请 vGPU 的资源名匹配时就会使用到该 device plugin。

    • 也是可以在 deivce plugin 启动时配置的,一般叫做 --resource-name=nvidia.com/vgpu

  • Version:device plugin 的版本,这里是 v1beta1
  • Endpoint:device plugin 的访问地址,Kubelet 会通过这个 sock 和 device plugin 进行交互。

    • hami 用的格式为:/var/lib/kubelet/device-plugins/nvidia-xxx.sock,其中 xxx 是 从 ResourceName 中解析出来的,比如 nvidia.com/vgpu 那么这里的 xx 就是后面的 vgpu。

假设我们都使用默认值,ResourceName 为 nvidia.com/vgpu,Endpoint 为 /var/lib/kubelet/device-plugins/nvidia-vgpu.sock。

  • 后续 Pod Resource 中申请使用 nvidia.com/vgpu 资源时,就会由该 device plugin 来处理,实现资源分配,Kubelet 则是通过 var/lib/kubelet/device-plugins/nvidia-vgpu.sock 这个 sock 文件调用 device plugin API。
  • 反之,我们在 Pod Resource 中申请使用 nvidia.com/gpu 时,这个 ResourceName 和 hami 插件不匹配,因此不由 hami device plugin nvidia 处理,而是由 nvidia 自己的  device plugin 进行处理。
WatchAndRegister

这个是 HAMi device plugin 中的一个特殊逻辑,将 node 上的 GPU 信息以 annotations 的形式添加到 Node 对象上。
这里是直接和 kube-apiserver 进行通信,而不是使用的传统的 device plugin 上报流程。
后续 HAMi-Scheduler 在进行调度时就会用到这边上报的 annotations 作为调度依据的一部分,分析 HAMi-Scheduler 时在仔细分析。
  1. func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
  2.     klog.Info("Starting WatchAndRegister")
  3.     errorSleepInterval := time.Second * 5
  4.     successSleepInterval := time.Second * 30
  5.     for {
  6.        err := plugin.RegistrInAnnotation()
  7.        if err != nil {
  8.           klog.Errorf("Failed to register annotation: %v", err)
  9.           klog.Infof("Retrying in %v seconds...", errorSleepInterval)
  10.           time.Sleep(errorSleepInterval)
  11.        } else {
  12.           klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
  13.           time.Sleep(successSleepInterval)
  14.        }
  15.     }
  16. }
复制代码
getAPIDevices

获取 Node 上的 GPU 信息,并组装成 api.DeviceInfo 对象。
  1. func (plugin *NvidiaDevicePlugin) getAPIDevices() *[]*api.DeviceInfo {
  2.     devs := plugin.Devices()
  3.     nvml.Init()
  4.     res := make([]*api.DeviceInfo, 0, len(devs))
  5.     idx := 0
  6.     for idx < len(devs) {
  7.        ndev, ret := nvml.DeviceGetHandleByIndex(idx)
  8.        //ndev, err := nvml.NewDevice(uint(idx))
  9.        //klog.V(3).Infoln("ndev type=", ndev.Model)
  10.        if ret != nvml.SUCCESS {
  11.           klog.Errorln("nvml new device by index error idx=", idx, "err=", ret)
  12.           panic(0)
  13.        }
  14.        memoryTotal := 0
  15.        memory, ret := ndev.GetMemoryInfo()
  16.        if ret == nvml.SUCCESS {
  17.           memoryTotal = int(memory.Total)
  18.        } else {
  19.           klog.Error("nvml get memory error ret=", ret)
  20.           panic(0)
  21.        }
  22.        UUID, ret := ndev.GetUUID()
  23.        if ret != nvml.SUCCESS {
  24.           klog.Error("nvml get uuid error ret=", ret)
  25.           panic(0)
  26.        }
  27.        Model, ret := ndev.GetName()
  28.        if ret != nvml.SUCCESS {
  29.           klog.Error("nvml get name error ret=", ret)
  30.           panic(0)
  31.        }
  32.        registeredmem := int32(memoryTotal / 1024 / 1024)
  33.        if *util.DeviceMemoryScaling != 1 {
  34.           registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
  35.        }
  36.        klog.Infoln("MemoryScaling=", *util.DeviceMemoryScaling, "registeredmem=", registeredmem)
  37.        health := true
  38.        for _, val := range devs {
  39.           if strings.Compare(val.ID, UUID) == 0 {
  40.              // when NVIDIA-Tesla P4, the device info is : ID:GPU-e290caca-2f0c-9582-acab-67a142b61ffa,Health:Healthy,Topology:nil,
  41.              // it is more reasonable to think of healthy as case-insensitive
  42.              if strings.EqualFold(val.Health, "healthy") {
  43.                 health = true
  44.              } else {
  45.                 health = false
  46.              }
  47.              break
  48.           }
  49.        }
  50.        numa, err := plugin.getNumaInformation(idx)
  51.        if err != nil {
  52.           klog.ErrorS(err, "failed to get numa information", "idx", idx)
  53.        }
  54.        res = append(res, &api.DeviceInfo{
  55.           ID:      UUID,
  56.           Count:   int32(*util.DeviceSplitCount),
  57.           Devmem:  registeredmem,
  58.           Devcore: int32(*util.DeviceCoresScaling * 100),
  59.           Type:    fmt.Sprintf("%v-%v", "NVIDIA", Model),
  60.           Numa:    numa,
  61.           Health:  health,
  62.        })
  63.        idx++
  64.        klog.Infof("nvml registered device id=%v, memory=%v, type=%v, numa=%v", idx, registeredmem, Model, numa)
  65.     }
  66.     return &res
  67. }
复制代码
核心部分
  1. // 通过 nvml 库获取 GPU 信息
  2. ndev, ret := nvml.DeviceGetHandleByIndex(idx)
  3. memoryTotal := 0
  4. memory, ret := ndev.GetMemoryInfo()
  5. if ret == nvml.SUCCESS {
  6.     memoryTotal = int(memory.Total)
  7. }
  8. UUID, ret := ndev.GetUUID()
  9. Model, ret := ndev.GetName()
  10. // 处理 Scaling
  11. registeredmem := int32(memoryTotal / 1024 / 1024)
  12. if *util.DeviceMemoryScaling != 1 {
  13.     registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
  14. }
  15. // 组装结果返回
  16. res = append(res, &api.DeviceInfo{
  17.     ID:      UUID,
  18.     Count:   int32(*util.DeviceSplitCount),
  19.     Devmem:  registeredmem,
  20.     Devcore: int32(*util.DeviceCoresScaling * 100),
  21.     Type:    fmt.Sprintf("%v-%v", "NVIDIA", Model),
  22.     Numa:    numa,
  23.     Health:  health,
  24. })
复制代码
更新到 Node Annoations

拿到 Device 信息之后,调用 kube-apiserver 更新 Node 对象的 Annoations 把 Device 信息存起来。
  1. encodeddevices := util.EncodeNodeDevices(*devices)
  2. annos[nvidia.HandshakeAnnos] = "Reported " + time.Now().String()
  3. annos[nvidia.RegisterAnnos] = encodeddevices
  4. klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
  5. err = util.PatchNodeAnnotations(node, annos)
复制代码
正常应该是走 k8s 的 device plugin 接口上报信息才对,这里是 HAMi 的特殊逻辑。
Demo

查看 Node 上的 Annoations,看看这边记录了些什么数据
  1. root@j99cloudvm:~# k get node j99cloudvm -oyaml
  2. apiVersion: v1
  3. kind: Node
  4. metadata:
  5.   annotations:
  6.     hami.io/node-handshake: Requesting_2024.09.25 07:48:26
  7.     hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
  8.       A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
  9.       A40,0,true:'
复制代码
hami.io/node-nvidia-register 就是 HAMi 的 device plugin 更新到 Node 上的 GPU 信息,格式化一下
  1. GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA A40,0,true:
  2. GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA A40,0,true:
复制代码
当前节点上是两张 A40 GPU,

  • GPU-03f69c50-207a-2038-9b45-23cac89cb67d:为 GPU 设备的 UUID
  • 10,46068,100: 切分为 10 份,每张卡 46068M 内存,core 为 100 个(说明没有配置 oversubscription)
  • NVIDIA-NVIDIA:GPU 类型
  • A40:GPU 型号
  • 0:表示 GPU 的 NUMA 结构
  • true:表示该 GPU 是健康的
  • : 最后的冒号是分隔符
ps:这部分信息后续 hami-scheduler 进行调度时会用到,这里暂时不管。
小结

Register 方法分为两部分:

  • Register:将 device plugin 注册到 kubelet
  • WatchAndRegister:感知 Node 上的 GPU 信息,并和 kube-apiserver 交互,将这部分信息以 annotations 的形式添加到 Node 对象上,以便后续 hami-scheduler 使用。
4. ListAndWatch

ListAndWatch 方法用于感知节点上的设备并上报给 Kubelet。
由于需要将同一个 GPU 切分给多个 Pod 使用,因此 HAMi 的 device plugin 也会有类似 TimeSlicing 中的 Device 复制操作。
具体实现如下:
  1. // ListAndWatch lists devices and update that list according to the health status
  2. func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
  3.     s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
  4.     for {
  5.        select {
  6.        case <-plugin.stop:
  7.           return nil
  8.        case d := <-plugin.health:
  9.           // FIXME: there is no way to recover from the Unhealthy state.
  10.           d.Health = kubeletdevicepluginv1beta1.Unhealthy
  11.           klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
  12.           s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
  13.        }
  14.     }
  15. }
复制代码
比较长,我们只需要关注核心部分,同时先忽略 MIG 相关的逻辑。
首先是添加一个 CUDA_DEVICE_MEMORY_LIMIT_$Index 的环境变量,用于 gpu memory 限制。
  1. // VisitDevices visits each top-level device and invokes a callback function for it
  2. func (d *devicelib) VisitDevices(visit func(int, Device) error) error {
  3.     count, ret := d.nvml.DeviceGetCount()
  4.     if ret != nvml.SUCCESS {
  5.        return fmt.Errorf("error getting device count: %v", ret)
  6.     }
  7.     for i := 0; i < count; i++ {
  8.        device, ret := d.nvml.DeviceGetHandleByIndex(i)
  9.        if ret != nvml.SUCCESS {
  10.           return fmt.Errorf("error getting device handle for index '%v': %v", i, ret)
  11.        }
  12.        dev, err := d.newDevice(device)
  13.        if err != nil {
  14.           return fmt.Errorf("error creating new device wrapper: %v", err)
  15.        }
  16.        isSkipped, err := dev.isSkipped()
  17.        if err != nil {
  18.           return fmt.Errorf("error checking whether device is skipped: %v", err)
  19.        }
  20.        if isSkipped {
  21.           continue
  22.        }
  23.        err = visit(i, dev)
  24.        if err != nil {
  25.           return fmt.Errorf("error visiting device: %v", err)
  26.        }
  27.     }
  28.     return nil
  29. }
  30. // buildGPUDeviceMap builds a map of resource names to GPU devices
  31. func (b *deviceMapBuilder) buildGPUDeviceMap() (DeviceMap, error) {
  32.     devices := make(DeviceMap)
  33.     b.VisitDevices(func(i int, gpu device.Device) error {
  34.        name, ret := gpu.GetName()
  35.        if ret != nvml.SUCCESS {
  36.           return fmt.Errorf("error getting product name for GPU: %v", ret)
  37.        }
  38.        migEnabled, err := gpu.IsMigEnabled()
  39.        if err != nil {
  40.           return fmt.Errorf("error checking if MIG is enabled on GPU: %v", err)
  41.        }
  42.        if migEnabled && *b.config.Flags.MigStrategy != spec.MigStrategyNone {
  43.           return nil
  44.        }
  45.        for _, resource := range b.config.Resources.GPUs {
  46.           if resource.Pattern.Matches(name) {
  47.              index, info := newGPUDevice(i, gpu)
  48.              return devices.setEntry(resource.Name, index, info)
  49.           }
  50.        }
  51.        return fmt.Errorf("GPU name '%v' does not match any resource patterns", name)
  52.     })
  53.     return devices, nil
  54. }
复制代码
然后则是根据申请的 gpucores 配置 gpu core 限制的环境变量
  1. // GetPluginDevices returns the plugin Devices from all devices in the Devices
  2. func (ds Devices) GetPluginDevices() []*kubeletdevicepluginv1beta1.Device {
  3.     var res []*kubeletdevicepluginv1beta1.Device
  4.     if !strings.Contains(ds.GetIDs()[0], "MIG") {
  5.        for _, dev := range ds {
  6.           for i := uint(0); i < *util.DeviceSplitCount; i++ {
  7.              id := fmt.Sprintf("%v-%v", dev.ID, i)
  8.              res = append(res, &kubeletdevicepluginv1beta1.Device{
  9.                 ID:       id,
  10.                 Health:   dev.Health,
  11.                 Topology: nil,
  12.              })
  13.           }
  14.        }
  15.     } else {
  16.        for _, d := range ds {
  17.           res = append(res, &d.Device)
  18.        }
  19.     }
  20.     return res
  21. }
复制代码
这个用于设置 share_region  mmap 文件在容器中的位置
  1. for _, dev := range ds {
  2.   for i := uint(0); i < *util.DeviceSplitCount; i++ {
  3.      id := fmt.Sprintf("%v-%v", dev.ID, i)
  4.      res = append(res, &kubeletdevicepluginv1beta1.Device{
  5.         ID:       id,
  6.         Health:   dev.Health,
  7.         Topology: nil,
  8.      })
  9.   }
复制代码
Gpu memory 超额订阅
  1. // pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L290
  2. func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
  3.     klog.InfoS("Allocate", "request", reqs)
  4.     responses := kubeletdevicepluginv1beta1.AllocateResponse{}
  5.     nodename := os.Getenv(util.NodeNameEnvName)
  6.     current, err := util.GetPendingPod(ctx, nodename)
  7.     if err != nil {
  8.        nodelock.ReleaseNodeLock(nodename, NodeLockNvidia)
  9.        return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
  10.     }
  11.     klog.V(5).Infof("allocate pod name is %s/%s, annotation is %+v", current.Namespace, current.Name, current.Annotations)
  12.     for idx, req := range reqs.ContainerRequests {
  13.        // If the devices being allocated are replicas, then (conditionally)
  14.        // error out if more than one resource is being allocated.
  15.        if strings.Contains(req.DevicesIDs[0], "MIG") {
  16.           if plugin.config.Sharing.TimeSlicing.FailRequestsGreaterThanOne && rm.AnnotatedIDs(req.DevicesIDs).AnyHasAnnotations() {
  17.              if len(req.DevicesIDs) > 1 {
  18.                 return nil, fmt.Errorf("request for '%v: %v' too large: maximum request size for shared resources is 1", plugin.rm.Resource(), len(req.DevicesIDs))
  19.              }
  20.           }
  21.           for _, id := range req.DevicesIDs {
  22.              if !plugin.rm.Devices().Contains(id) {
  23.                 return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", plugin.rm.Resource(), id)
  24.              }
  25.           }
  26.           response, err := plugin.getAllocateResponse(req.DevicesIDs)
  27.           if err != nil {
  28.              return nil, fmt.Errorf("failed to get allocate response: %v", err)
  29.           }
  30.           responses.ContainerResponses = append(responses.ContainerResponses, response)
  31.        } else {
  32.           currentCtr, devreq, err := util.GetNextDeviceRequest(nvidia.NvidiaGPUDevice, *current)
  33.           klog.Infoln("deviceAllocateFromAnnotation=", devreq)
  34.           if err != nil {
  35.              device.PodAllocationFailed(nodename, current, NodeLockNvidia)
  36.              return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
  37.           }
  38.           if len(devreq) != len(reqs.ContainerRequests[idx].DevicesIDs) {
  39.              device.PodAllocationFailed(nodename, current, NodeLockNvidia)
  40.              return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
  41.           }
  42.           response, err := plugin.getAllocateResponse(util.GetContainerDeviceStrArray(devreq))
  43.           if err != nil {
  44.              return nil, fmt.Errorf("failed to get allocate response: %v", err)
  45.           }
  46.           err = util.EraseNextDeviceTypeFromAnnotation(nvidia.NvidiaGPUDevice, *current)
  47.           if err != nil {
  48.              device.PodAllocationFailed(nodename, current, NodeLockNvidia)
  49.              return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
  50.           }
  51.           for i, dev := range devreq {
  52.              limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
  53.              response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
  54.              /*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
  55.              if i > 0 {
  56.                 response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
  57.              } else {
  58.                 response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
  59.              }*/
  60.           }
  61.           response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
  62.           response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
  63.           if *util.DeviceMemoryScaling > 1 {
  64.              response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
  65.           }
  66.           if *util.DisableCoreLimit {
  67.              response.Envs[api.CoreLimitSwitch] = "disable"
  68.           }
  69.           cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
  70.           os.RemoveAll(cacheFileHostDirectory)
  71.           os.MkdirAll(cacheFileHostDirectory, 0777)
  72.           os.Chmod(cacheFileHostDirectory, 0777)
  73.           os.MkdirAll("/tmp/vgpulock", 0777)
  74.           os.Chmod("/tmp/vgpulock", 0777)
  75.           response.Mounts = append(response.Mounts,
  76.              &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
  77.                 HostPath: hostHookPath + "/vgpu/libvgpu.so",
  78.                 ReadOnly: true},
  79.              &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
  80.                 HostPath: cacheFileHostDirectory,
  81.                 ReadOnly: false},
  82.              &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
  83.                 HostPath: "/tmp/vgpulock",
  84.                 ReadOnly: false},
  85.           )
  86.           found := false
  87.           for _, val := range currentCtr.Env {
  88.              if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
  89.                 // if env existed but is set to false or can not be parsed, ignore
  90.                 t, _ := strconv.ParseBool(val.Value)
  91.                 if !t {
  92.                    continue
  93.                 }
  94.                 // only env existed and set to true, we mark it "found"
  95.                 found = true
  96.                 break
  97.              }
  98.           }
  99.           if !found {
  100.              response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
  101.                 HostPath: hostHookPath + "/vgpu/ld.so.preload",
  102.                 ReadOnly: true},
  103.              )
  104.           }
  105.           _, err = os.Stat(fmt.Sprintf("%s/vgpu/license", hostHookPath))
  106.           if err == nil {
  107.              response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
  108.                 ContainerPath: "/tmp/license",
  109.                 HostPath:      fmt.Sprintf("%s/vgpu/license", hostHookPath),
  110.                 ReadOnly:      true,
  111.              })
  112.              response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
  113.                 ContainerPath: "/usr/bin/vgpuvalidator",
  114.                 HostPath:      fmt.Sprintf("%s/vgpu/vgpuvalidator", hostHookPath),
  115.                 ReadOnly:      true,
  116.              })
  117.           }
  118.           responses.ContainerResponses = append(responses.ContainerResponses, response)
  119.        }
  120.     }
  121.     klog.Infoln("Allocate Response", responses.ContainerResponses)
  122.     device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
  123.     return &responses, nil
  124. }
复制代码
是否关闭算力限制
  1. for i, dev := range devreq {
  2.     limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
  3.     response.Envs[limitKey] = fmt.Sprintf("%vm", dev.Usedmem)
  4.     /*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
  5.     if i > 0 {
  6.        response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
  7.     } else {
  8.        response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
  9.     }*/
  10. }
复制代码
挂载 vgpu 相关文件
这里就实现了 libvgpu.so 库的替换。
  1. response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq[0].Usedcores)
复制代码
替换动态库,当没有指定  CUDA_DISABLE_CONTROL=true 时就会做该处理
  1. response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
复制代码
整个实现也算比较容易理解,就是给 Pod 增加了一系列环境变量,以及增加了替换 libvgpu.so 的 Mounts 配置,后续这个 libvgpu.so 就会根据这些环境变量做 Core&Memory 的限制。
NVIDIA 原生逻辑
  1. if *util.DeviceMemoryScaling > 1 {
  2.     response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
  3. }
复制代码
核心部分就是这一句,添加了一个环境变量
  1. if *util.DisableCoreLimit {
  2.     response.Envs[api.CoreLimitSwitch] = "disable"
  3. }
复制代码
而 plugin.deviceListEnvvar 的值来自:
  1. // 缓存文件存放位置 /usr/local/vgpu/containers/xxx/xxx
  2. cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
  3. os.RemoveAll(cacheFileHostDirectory)
  4. os.MkdirAll(cacheFileHostDirectory, 0777)
  5. os.Chmod(cacheFileHostDirectory, 0777)
  6. os.MkdirAll("/tmp/vgpulock", 0777)
  7. os.Chmod("/tmp/vgpulock", 0777)
  8. response.Mounts = append(response.Mounts,
  9.     // 宿主机上的 libvgpu.so挂载到 pod 里替换 nvidia 默认的驱动
  10.     &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
  11.        HostPath: hostHookPath + "/vgpu/libvgpu.so",
  12.        ReadOnly: true},
  13.     // 随机的文件挂载进 pod 作为 vgpu 使用
  14.     &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
  15.        HostPath: cacheFileHostDirectory,
  16.        ReadOnly: false},
  17.     // 一个 lock 文件
  18.     &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
  19.        HostPath: "/tmp/vgpulock",
  20.        ReadOnly: false},
  21. )
复制代码
即:NVIDIA_VISIBLE_DEVICES
正好,这个 ENV 就是 NVIDIA  deviceplugin 中的实现,设置该环境变量之后 nvidia-container-toolkit 会为有这个环境变量的容器分配 GPU。
HAMi  这里则是复用了  nvidia-container-toolkit  的能力将 GPU 分配给 Pod。
小结

Allocate 方法中核心部分包括三件事情:

  • HAMi 自定义逻辑

    • 添加用于资源限制的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT
    • 挂载 libvgpu.so 到 Pod 中进行替换

  • NVIDIA 原生逻辑

    • 添加用于分配 GPU 的 环境变量 NVIDIA_VISIBLE_DEVICES,借助 NVIDIA Container Toolkit 将 GPU 挂载到 Pod 里

6. 总结

至此,HAMi 的 NVIDIA device plugin 工作原理就很清晰了。

  • 首先是 Register 插件注册,可以配置使用和原生 nvidia device plugin 不同的 ResourceName 来进行区分。

    • 另外会额外启动一个后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。

  • 然后 ListAndWatch 感知设备时也根据配置对 Device 进行复制,让同一个设备能够分配给多个 Pod。

    • 这个和 TimeSlicing 方案一样

  • 最后 Allocate 方法中主要做三件事:

    • 1)为容器中增加NVIDIA_VISIBLE_DEVICES 环境变量,借助 NVIDIA Container Toolkit 实现为容器分配 GPU
    • 2)增加 Mounts 配置,挂载 libvgpu.so 到容器中实现对原始驱动的替换
    • 3)为容器中增加部分 HAMi 自定义的环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT ,配合 ibvgpu.so 实现 GPU core、memory 的限制

核心其实就是在 Allocate 方法中,给容器中添加CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT环境变量和挂载 libvgpu.so 到容器中实现对原始驱动的替换。
当容器启动后,CUDA API 请求先经过 libvgpu.so,然后 libvgpu.so 根据环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT 实现 Core & Memory 限制。
最后回答开篇提出的问题:HAMi 为什么要自己实现一个 device plugin 呢?hami-device-plugin-nvidia 是有哪些 NVIDIA 原生  device plugin 没有的功能吗?
在 hami device plugin 相比原生的 NVIDIA device plugin 做了几个修改:

  • 1)注册时额外启动后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。
  • 2)ListAndWatch 感知设备时也根据配置对 Device 进行复制,便于将同一个物理 GPU 分配给多个 Pod

    • 这个其实原生的 NVIDIA device plugin 也有,就是 TimeSlicing 方案。

  • 3)Allocate 中增加了 HAMi 自定义逻辑:

    • 挂载 libvgpu.so 到容器中实现对原始驱动的替换
    • 指定部分 HAMi 自定义的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT,配合 ibvgpu.so 实现 GPU core、memory 的限制

7. FAQ

Node 上的 libvgpu.so 是怎么来的

Allocate 方法中要将 libvgpu.so 挂载到 Pod 里,这里用的是 HostPath 方式挂载,说明这个 libvgpu.so 是存在于宿主机上的。
那么问题来了,宿主机上的 libvgpu.so 是怎么来的?
这个实际上是打包在 HAMi 提供的 device-plugin 镜像里的,device-plugin 启动时将其从 Pod 里复制到宿主机上,相关 yaml 如下:
  1. found := false
  2. for _, val := range currentCtr.Env {
  3.     if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
  4.        // if env existed but is set to false or can not be parsed, ignore
  5.        t, _ := strconv.ParseBool(val.Value)
  6.        if !t {
  7.           continue
  8.        }
  9.        // only env existed and set to true, we mark it "found"
  10.        found = true
  11.        break
  12.     }
  13. }
  14. if !found {
  15.     response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
  16.        HostPath: hostHookPath + "/vgpu/ld.so.preload",
  17.        ReadOnly: true},
  18.     )
  19. }
复制代码
挂载了 hostPath 到容器里,然后容器里执行cp -f /k8s-vgpu/lib/nvidia/* /usr/local/vgpu/ 命令将其复制到宿主机。
【Kubernetes 系列】持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章。
2.png


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册