系味 发表于 2025-10-6 16:28:36

开源 vGPU 方案 HAMi 原理分析 Part1:hami-device-plugin-nvidia 实现


本文为开源的 vGPU 方案 HAMi 实现原理分析第一篇,主要分析 hami-device-plugin-nvidia 实现原理。
之前在 开源 vGPU 方案:HAMi,实现细粒度 GPU 切分 介绍了 HAMi 是什么,然后在开源 vGPU 方案 HAMi: core&memory 隔离测试 中对 HAMi 提供的 vGPU 方案进行了测试。
接下来则是逐步分析 HAMi 中的 vGPU 实现原理,涉及到的东西比较多,暂定分为几部分:

[*]1)hami-device-plugin-nvidia:HAMi 版本的 device plugin 中 GPU 感知以及分配逻辑是怎么实现的,和 NVIDIA 原生的 device-plugin 有和不同。
[*]2)HAMI-Scheduler:HAMi 是如何做调度的,binpack/spread 高级调度策略是怎么实现的
[*]3)HAMi-Core:这也是 vCUDA 方案最核心的部分,HAMi 是如何通过拦截 CUDA API 实现 Core&Memory 隔离限制的
本文为第一篇,分析 hami-device-plugin-nvidia 实现原理。
1. 概述

NVIDIA 是有自己实现 device plugin 的,那么问题来了:HAMi 为什么还要自己实现一个 device plugin 呢?
是 hami-device-plugin-nvidia 是有哪些 NVIDIA 原生device plugin 没有的功能吗?带着疑问,我们开始查看 hami-device-plugin-nvidia 源码。
这部分需要大家对 GPU Operator、k8s device plugin 等比较熟悉阅读起来才比较丝滑。
推荐阅读:

[*]GPU 环境搭建指南:如何在裸机、Docker、K8s 等环境中使用 GPU
[*]GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建
[*]Kubernetes教程(二一)—自定义资源支持:K8s Device Plugin 从原理到实现
[*]Kubernetes教程(二二)—在 K8S 中创建 Pod 是如何使用到 GPU 的:device plugin&nvidia-container-toolkit 源码分析
后续都默认大家都对这块比较熟悉了,特别是后两篇
2. 程序入口

HAMi 首先支持的是 NVIDIA GPU,单独实现了一个 device plugin nvidia。

[*]启动文件在 cmd/device-plugin/nvidia
[*]核心实现在 pkg/device-plugin/nvidiadevice
默认大家都对 k8s 的 device plugin 机制比较熟悉了,因此这里只分析核心代码逻辑,不然篇幅就太长了。
对于一个 device plugin 我们一般关注以下 3 个地方:

[*]Register:将插件注册到 Kubelet 的,参数 ResourceName 比较重要
[*]ListAndWatch:device plugin 是怎么感知 GPU 并上报的
[*]Allocate:device plugin 是如何将 GPU 分配给 Pod 的
启动命令在 /cmd/device-plugin/nvidia,用的是 github.com/urfave/cli/v2 构建的一个命令行工具。
func main() {
    var configFile string

    c := cli.NewApp()
    c.Name = "NVIDIA Device Plugin"
    c.Usage = "NVIDIA device plugin for Kubernetes"
    c.Version = info.GetVersionString()
    c.Action = func(ctx *cli.Context) error {
       return start(ctx, c.Flags)
    }

    c.Flags = []cli.Flag{
       &cli.StringFlag{
          Name:    "mig-strategy",
          Value:   spec.MigStrategyNone,
          Usage:   "the desired strategy for exposing MIG devices on GPUs that support it:\n\t\t",
          EnvVars: []string{"MIG_STRATEGY"},
       },
       &cli.BoolFlag{
          Name:    "fail-on-init-error",
          Value:   true,
          Usage:   "fail the plugin if an error is encountered during initialization, otherwise block indefinitely",
          EnvVars: []string{"FAIL_ON_INIT_ERROR"},
       },
       &cli.StringFlag{
          Name:    "nvidia-driver-root",
          Value:   "/",
          Usage:   "the root path for the NVIDIA driver installation (typical values are '/' or '/run/nvidia/driver')",
          EnvVars: []string{"NVIDIA_DRIVER_ROOT"},
       },
       &cli.BoolFlag{
          Name:    "pass-device-specs",
          Value:   false,
          Usage:   "pass the list of DeviceSpecs to the kubelet on Allocate()",
          EnvVars: []string{"PASS_DEVICE_SPECS"},
       },
       &cli.StringSliceFlag{
          Name:    "device-list-strategy",
          Value:   cli.NewStringSlice(string(spec.DeviceListStrategyEnvvar)),
          Usage:   "the desired strategy for passing the device list to the underlying runtime:\n\t\t",
          EnvVars: []string{"DEVICE_LIST_STRATEGY"},
       },
       &cli.StringFlag{
          Name:    "device-id-strategy",
          Value:   spec.DeviceIDStrategyUUID,
          Usage:   "the desired strategy for passing device IDs to the underlying runtime:\n\t\t",
          EnvVars: []string{"DEVICE_ID_STRATEGY"},
       },
       &cli.BoolFlag{
          Name:    "gds-enabled",
          Usage:   "ensure that containers are started with NVIDIA_GDS=enabled",
          EnvVars: []string{"GDS_ENABLED"},
       },
       &cli.BoolFlag{
          Name:    "mofed-enabled",
          Usage:   "ensure that containers are started with NVIDIA_MOFED=enabled",
          EnvVars: []string{"MOFED_ENABLED"},
       },
       &cli.StringFlag{
          Name:      "config-file",
          Usage:       "the path to a config file as an alternative to command line options or environment variables",
          Destination: &configFile,
          EnvVars:   []string{"CONFIG_FILE"},
       },
       &cli.StringFlag{
          Name:    "cdi-annotation-prefix",
          Value:   spec.DefaultCDIAnnotationPrefix,
          Usage:   "the prefix to use for CDI container annotation keys",
          EnvVars: []string{"CDI_ANNOTATION_PREFIX"},
       },
       &cli.StringFlag{
          Name:    "nvidia-ctk-path",
          Value:   spec.DefaultNvidiaCTKPath,
          Usage:   "the path to use for the nvidia-ctk in the generated CDI specification",
          EnvVars: []string{"NVIDIA_CTK_PATH"},
       },
       &cli.StringFlag{
          Name:    "container-driver-root",
          Value:   spec.DefaultContainerDriverRoot,
          Usage:   "the path where the NVIDIA driver root is mounted in the container; used for generating CDI specifications",
          EnvVars: []string{"CONTAINER_DRIVER_ROOT"},
       },
    }
    c.Flags = append(c.Flags, addFlags()...)
    err := c.Run(os.Args)
    if err != nil {
       klog.Error(err)
       os.Exit(1)
    }
}

func addFlags() []cli.Flag {
    addition := []cli.Flag{
       &cli.StringFlag{
          Name:    "node-name",
          Value:   os.Getenv(util.NodeNameEnvName),
          Usage:   "node name",
          EnvVars: []string{"NodeName"},
       },
       &cli.UintFlag{
          Name:    "device-split-count",
          Value:   2,
          Usage:   "the number for NVIDIA device split",
          EnvVars: []string{"DEVICE_SPLIT_COUNT"},
       },
       &cli.Float64Flag{
          Name:    "device-memory-scaling",
          Value:   1.0,
          Usage:   "the ratio for NVIDIA device memory scaling",
          EnvVars: []string{"DEVICE_MEMORY_SCALING"},
       },
       &cli.Float64Flag{
          Name:    "device-cores-scaling",
          Value:   1.0,
          Usage:   "the ratio for NVIDIA device cores scaling",
          EnvVars: []string{"DEVICE_CORES_SCALING"},
       },
       &cli.BoolFlag{
          Name:    "disable-core-limit",
          Value:   false,
          Usage:   "If set, the core utilization limit will be ignored",
          EnvVars: []string{"DISABLE_CORE_LIMIT"},
       },
       &cli.StringFlag{
          Name:"resource-name",
          Value: "nvidia.com/gpu",
          Usage: "the name of field for number GPU visible in container",
       },
    }
    return addition
}启动时做了两件事:

[*]将插件注册到 Kubelet
[*]启动一个 gRPC 服务
我们只需要关注一下接收的几个参数:
&cli.UintFlag{
    Name:    "device-split-count",
    Value:   2,
    Usage:   "the number for NVIDIA device split",
    EnvVars: []string{"DEVICE_SPLIT_COUNT"},
},
&cli.Float64Flag{
    Name:    "device-memory-scaling",
    Value:   1.0,
    Usage:   "the ratio for NVIDIA device memory scaling",
    EnvVars: []string{"DEVICE_MEMORY_SCALING"},
},
&cli.Float64Flag{
    Name:    "device-cores-scaling",
    Value:   1.0,
    Usage:   "the ratio for NVIDIA device cores scaling",
    EnvVars: []string{"DEVICE_CORES_SCALING"},
},
&cli.BoolFlag{
    Name:    "disable-core-limit",
    Value:   false,
    Usage:   "If set, the core utilization limit will be ignored",
    EnvVars: []string{"DISABLE_CORE_LIMIT"},
},
&cli.StringFlag{
    Name:"resource-name",
    Value: "nvidia.com/gpu",
    Usage: "the name of field for number GPU visible in container",
},

[*]device-split-count:表示 GPU 的分割数,每一张 GPU 都不能分配超过其配置数目的任务。若其配置为 N 的话,每个 GPU 上最多可以同时存在 N 个任务。

[*]建议根据 GPU 性能动态调整,一般建议大于 10。

[*]device-memory-scaling:表示 GPU memory 的 oversubscription(超额订阅)** **比例,默认 1.0,大于 1.0 则表示启用虚拟显存(实验功能),不建议修改。
[*]device-cores-scaling:表示 GPU core 的 oversubscription(超额订阅)比例,默认 1.0。
[*]disable-core-limit:是否关闭 GPU Core Limit,默认 false,不建议修改。
[*]resource-name:资源名称,建议改掉,不推荐使用默认的 nvidia.com/gpu 因为这个和 nvidia 原生的重复了。
3. Register

Register

Register 方法实现如下:
// pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L222

// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
    conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
    if err != nil {
       return err
    }
    defer conn.Close()

    client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
    reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
       Version:      kubeletdevicepluginv1beta1.Version,
       Endpoint:   path.Base(plugin.socket),
       ResourceName: string(plugin.rm.Resource()),
       Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
          GetPreferredAllocationAvailable: false,
       },
    }

    _, err = client.Register(context.Background(), reqt)
    if err != nil {
       return err
    }
    return nil
} device plugin 注册时的几个核心信息:

[*]ResourceName:资源名称,这个和创建 Pod 时申请 vGPU 的资源名匹配时就会使用到该 device plugin。

[*]也是可以在 deivce plugin 启动时配置的,一般叫做 --resource-name=nvidia.com/vgpu

[*]Version:device plugin 的版本,这里是 v1beta1
[*]Endpoint:device plugin 的访问地址,Kubelet 会通过这个 sock 和 device plugin 进行交互。

[*]hami 用的格式为:/var/lib/kubelet/device-plugins/nvidia-xxx.sock,其中 xxx 是 从 ResourceName 中解析出来的,比如 nvidia.com/vgpu 那么这里的 xx 就是后面的 vgpu。

假设我们都使用默认值,ResourceName 为 nvidia.com/vgpu,Endpoint 为 /var/lib/kubelet/device-plugins/nvidia-vgpu.sock。

[*]后续 Pod Resource 中申请使用 nvidia.com/vgpu 资源时,就会由该 device plugin 来处理,实现资源分配,Kubelet 则是通过 var/lib/kubelet/device-plugins/nvidia-vgpu.sock 这个 sock 文件调用 device plugin API。
[*]反之,我们在 Pod Resource 中申请使用 nvidia.com/gpu 时,这个 ResourceName 和 hami 插件不匹配,因此不由 hami device plugin nvidia 处理,而是由 nvidia 自己的device plugin 进行处理。
WatchAndRegister

这个是 HAMi device plugin 中的一个特殊逻辑,将 node 上的 GPU 信息以 annotations 的形式添加到 Node 对象上。
这里是直接和 kube-apiserver 进行通信,而不是使用的传统的 device plugin 上报流程。
后续 HAMi-Scheduler 在进行调度时就会用到这边上报的 annotations 作为调度依据的一部分,分析 HAMi-Scheduler 时在仔细分析。
func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
    klog.Info("Starting WatchAndRegister")
    errorSleepInterval := time.Second * 5
    successSleepInterval := time.Second * 30
    for {
       err := plugin.RegistrInAnnotation()
       if err != nil {
          klog.Errorf("Failed to register annotation: %v", err)
          klog.Infof("Retrying in %v seconds...", errorSleepInterval)
          time.Sleep(errorSleepInterval)
       } else {
          klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
          time.Sleep(successSleepInterval)
       }
    }
}getAPIDevices

获取 Node 上的 GPU 信息,并组装成 api.DeviceInfo 对象。
func (plugin *NvidiaDevicePlugin) getAPIDevices() *[]*api.DeviceInfo {
    devs := plugin.Devices()
    nvml.Init()
    res := make([]*api.DeviceInfo, 0, len(devs))
    idx := 0
    for idx < len(devs) {
       ndev, ret := nvml.DeviceGetHandleByIndex(idx)
       //ndev, err := nvml.NewDevice(uint(idx))
       //klog.V(3).Infoln("ndev type=", ndev.Model)
       if ret != nvml.SUCCESS {
          klog.Errorln("nvml new device by index error idx=", idx, "err=", ret)
          panic(0)
       }
       memoryTotal := 0
       memory, ret := ndev.GetMemoryInfo()
       if ret == nvml.SUCCESS {
          memoryTotal = int(memory.Total)
       } else {
          klog.Error("nvml get memory error ret=", ret)
          panic(0)
       }
       UUID, ret := ndev.GetUUID()
       if ret != nvml.SUCCESS {
          klog.Error("nvml get uuid error ret=", ret)
          panic(0)
       }
       Model, ret := ndev.GetName()
       if ret != nvml.SUCCESS {
          klog.Error("nvml get name error ret=", ret)
          panic(0)
       }

       registeredmem := int32(memoryTotal / 1024 / 1024)
       if *util.DeviceMemoryScaling != 1 {
          registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
       }
       klog.Infoln("MemoryScaling=", *util.DeviceMemoryScaling, "registeredmem=", registeredmem)
       health := true
       for _, val := range devs {
          if strings.Compare(val.ID, UUID) == 0 {
             // when NVIDIA-Tesla P4, the device info is : ID:GPU-e290caca-2f0c-9582-acab-67a142b61ffa,Health:Healthy,Topology:nil,
             // it is more reasonable to think of healthy as case-insensitive
             if strings.EqualFold(val.Health, "healthy") {
                health = true
             } else {
                health = false
             }
             break
          }
       }
       numa, err := plugin.getNumaInformation(idx)
       if err != nil {
          klog.ErrorS(err, "failed to get numa information", "idx", idx)
       }
       res = append(res, &api.DeviceInfo{
          ID:      UUID,
          Count:   int32(*util.DeviceSplitCount),
          Devmem:registeredmem,
          Devcore: int32(*util.DeviceCoresScaling * 100),
          Type:    fmt.Sprintf("%v-%v", "NVIDIA", Model),
          Numa:    numa,
          Health:health,
       })
       idx++
       klog.Infof("nvml registered device id=%v, memory=%v, type=%v, numa=%v", idx, registeredmem, Model, numa)
    }
    return &res
}核心部分
// 通过 nvml 库获取 GPU 信息
ndev, ret := nvml.DeviceGetHandleByIndex(idx)
memoryTotal := 0
memory, ret := ndev.GetMemoryInfo()
if ret == nvml.SUCCESS {
    memoryTotal = int(memory.Total)
}
UUID, ret := ndev.GetUUID()
Model, ret := ndev.GetName()

// 处理 Scaling
registeredmem := int32(memoryTotal / 1024 / 1024)
if *util.DeviceMemoryScaling != 1 {
    registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
}
// 组装结果返回
res = append(res, &api.DeviceInfo{
    ID:      UUID,
    Count:   int32(*util.DeviceSplitCount),
    Devmem:registeredmem,
    Devcore: int32(*util.DeviceCoresScaling * 100),
    Type:    fmt.Sprintf("%v-%v", "NVIDIA", Model),
    Numa:    numa,
    Health:health,
})更新到 Node Annoations

拿到 Device 信息之后,调用 kube-apiserver 更新 Node 对象的 Annoations 把 Device 信息存起来。
encodeddevices := util.EncodeNodeDevices(*devices)
annos = "Reported " + time.Now().String()
annos = encodeddevices
klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
err = util.PatchNodeAnnotations(node, annos)正常应该是走 k8s 的 device plugin 接口上报信息才对,这里是 HAMi 的特殊逻辑。
Demo

查看 Node 上的 Annoations,看看这边记录了些什么数据
root@j99cloudvm:~# k get node j99cloudvm -oyaml
apiVersion: v1
kind: Node
metadata:
annotations:
    hami.io/node-handshake: Requesting_2024.09.25 07:48:26
    hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
      A40,0,true:'hami.io/node-nvidia-register 就是 HAMi 的 device plugin 更新到 Node 上的 GPU 信息,格式化一下
GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA A40,0,true:
GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA A40,0,true:当前节点上是两张 A40 GPU,

[*]GPU-03f69c50-207a-2038-9b45-23cac89cb67d:为 GPU 设备的 UUID
[*]10,46068,100: 切分为 10 份,每张卡 46068M 内存,core 为 100 个(说明没有配置 oversubscription)
[*]NVIDIA-NVIDIA:GPU 类型
[*]A40:GPU 型号
[*]0:表示 GPU 的 NUMA 结构
[*]true:表示该 GPU 是健康的
[*]: 最后的冒号是分隔符
ps:这部分信息后续 hami-scheduler 进行调度时会用到,这里暂时不管。
小结

Register 方法分为两部分:

[*]Register:将 device plugin 注册到 kubelet
[*]WatchAndRegister:感知 Node 上的 GPU 信息,并和 kube-apiserver 交互,将这部分信息以 annotations 的形式添加到 Node 对象上,以便后续 hami-scheduler 使用。
4. ListAndWatch

ListAndWatch 方法用于感知节点上的设备并上报给 Kubelet。
由于需要将同一个 GPU 切分给多个 Pod 使用,因此 HAMi 的 device plugin 也会有类似 TimeSlicing 中的 Device 复制操作。
具体实现如下:
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
    s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})

    for {
       select {
       case <-plugin.stop:
          return nil
       case d := <-plugin.health:
          // FIXME: there is no way to recover from the Unhealthy state.
          d.Health = kubeletdevicepluginv1beta1.Unhealthy
          klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
          s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
       }
    }
}比较长,我们只需要关注核心部分,同时先忽略 MIG 相关的逻辑。
首先是添加一个 CUDA_DEVICE_MEMORY_LIMIT_$Index 的环境变量,用于 gpu memory 限制。
// VisitDevices visits each top-level device and invokes a callback function for it
func (d *devicelib) VisitDevices(visit func(int, Device) error) error {
    count, ret := d.nvml.DeviceGetCount()
    if ret != nvml.SUCCESS {
       return fmt.Errorf("error getting device count: %v", ret)
    }

    for i := 0; i < count; i++ {
       device, ret := d.nvml.DeviceGetHandleByIndex(i)
       if ret != nvml.SUCCESS {
          return fmt.Errorf("error getting device handle for index '%v': %v", i, ret)
       }
       dev, err := d.newDevice(device)
       if err != nil {
          return fmt.Errorf("error creating new device wrapper: %v", err)
       }

       isSkipped, err := dev.isSkipped()
       if err != nil {
          return fmt.Errorf("error checking whether device is skipped: %v", err)
       }
       if isSkipped {
          continue
       }

       err = visit(i, dev)
       if err != nil {
          return fmt.Errorf("error visiting device: %v", err)
       }
    }
    return nil
}

// buildGPUDeviceMap builds a map of resource names to GPU devices
func (b *deviceMapBuilder) buildGPUDeviceMap() (DeviceMap, error) {
    devices := make(DeviceMap)

    b.VisitDevices(func(i int, gpu device.Device) error {
       name, ret := gpu.GetName()
       if ret != nvml.SUCCESS {
          return fmt.Errorf("error getting product name for GPU: %v", ret)
       }
       migEnabled, err := gpu.IsMigEnabled()
       if err != nil {
          return fmt.Errorf("error checking if MIG is enabled on GPU: %v", err)
       }
       if migEnabled && *b.config.Flags.MigStrategy != spec.MigStrategyNone {
          return nil
       }
       for _, resource := range b.config.Resources.GPUs {
          if resource.Pattern.Matches(name) {
             index, info := newGPUDevice(i, gpu)
             return devices.setEntry(resource.Name, index, info)
          }
       }
       return fmt.Errorf("GPU name '%v' does not match any resource patterns", name)
    })
    return devices, nil
} 然后则是根据申请的 gpucores 配置 gpu core 限制的环境变量
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices() []*kubeletdevicepluginv1beta1.Device {
    var res []*kubeletdevicepluginv1beta1.Device

    if !strings.Contains(ds.GetIDs(), "MIG") {
       for _, dev := range ds {
          for i := uint(0); i < *util.DeviceSplitCount; i++ {
             id := fmt.Sprintf("%v-%v", dev.ID, i)
             res = append(res, &kubeletdevicepluginv1beta1.Device{
                ID:       id,
                Health:   dev.Health,
                Topology: nil,
             })
          }
       }
    } else {
       for _, d := range ds {
          res = append(res, &d.Device)
       }

    }

    return res
}这个用于设置 share_regionmmap 文件在容器中的位置
for _, dev := range ds {
for i := uint(0); i < *util.DeviceSplitCount; i++ {
   id := fmt.Sprintf("%v-%v", dev.ID, i)
   res = append(res, &kubeletdevicepluginv1beta1.Device{
      ID:       id,
      Health:   dev.Health,
      Topology: nil,
   })
}Gpu memory 超额订阅
// pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L290
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
    klog.InfoS("Allocate", "request", reqs)
    responses := kubeletdevicepluginv1beta1.AllocateResponse{}
    nodename := os.Getenv(util.NodeNameEnvName)
    current, err := util.GetPendingPod(ctx, nodename)
    if err != nil {
       nodelock.ReleaseNodeLock(nodename, NodeLockNvidia)
       return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
    }
    klog.V(5).Infof("allocate pod name is %s/%s, annotation is %+v", current.Namespace, current.Name, current.Annotations)

    for idx, req := range reqs.ContainerRequests {
       // If the devices being allocated are replicas, then (conditionally)
       // error out if more than one resource is being allocated.

       if strings.Contains(req.DevicesIDs, "MIG") {

          if plugin.config.Sharing.TimeSlicing.FailRequestsGreaterThanOne && rm.AnnotatedIDs(req.DevicesIDs).AnyHasAnnotations() {
             if len(req.DevicesIDs) > 1 {
                return nil, fmt.Errorf("request for '%v: %v' too large: maximum request size for shared resources is 1", plugin.rm.Resource(), len(req.DevicesIDs))
             }
          }

          for _, id := range req.DevicesIDs {
             if !plugin.rm.Devices().Contains(id) {
                return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", plugin.rm.Resource(), id)
             }
          }

          response, err := plugin.getAllocateResponse(req.DevicesIDs)
          if err != nil {
             return nil, fmt.Errorf("failed to get allocate response: %v", err)
          }
          responses.ContainerResponses = append(responses.ContainerResponses, response)
       } else {
          currentCtr, devreq, err := util.GetNextDeviceRequest(nvidia.NvidiaGPUDevice, *current)
          klog.Infoln("deviceAllocateFromAnnotation=", devreq)
          if err != nil {
             device.PodAllocationFailed(nodename, current, NodeLockNvidia)
             return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
          }
          if len(devreq) != len(reqs.ContainerRequests.DevicesIDs) {
             device.PodAllocationFailed(nodename, current, NodeLockNvidia)
             return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
          }
          response, err := plugin.getAllocateResponse(util.GetContainerDeviceStrArray(devreq))
          if err != nil {
             return nil, fmt.Errorf("failed to get allocate response: %v", err)
          }

          err = util.EraseNextDeviceTypeFromAnnotation(nvidia.NvidiaGPUDevice, *current)
          if err != nil {
             device.PodAllocationFailed(nodename, current, NodeLockNvidia)
             return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
          }

          for i, dev := range devreq {
             limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
             response.Envs = fmt.Sprintf("%vm", dev.Usedmem)

             /*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
             if i > 0 {
                response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
             } else {
                response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
             }*/
          }
          response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq.Usedcores)
          response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
          if *util.DeviceMemoryScaling > 1 {
             response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
          }
          if *util.DisableCoreLimit {
             response.Envs = "disable"
          }
          cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
          os.RemoveAll(cacheFileHostDirectory)

          os.MkdirAll(cacheFileHostDirectory, 0777)
          os.Chmod(cacheFileHostDirectory, 0777)
          os.MkdirAll("/tmp/vgpulock", 0777)
          os.Chmod("/tmp/vgpulock", 0777)
          response.Mounts = append(response.Mounts,
             &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
                HostPath: hostHookPath + "/vgpu/libvgpu.so",
                ReadOnly: true},
             &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
                HostPath: cacheFileHostDirectory,
                ReadOnly: false},
             &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
                HostPath: "/tmp/vgpulock",
                ReadOnly: false},
          )
          found := false
          for _, val := range currentCtr.Env {
             if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
                // if env existed but is set to false or can not be parsed, ignore
                t, _ := strconv.ParseBool(val.Value)
                if !t {
                   continue
                }
                // only env existed and set to true, we mark it "found"
                found = true
                break
             }
          }
          if !found {
             response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
                HostPath: hostHookPath + "/vgpu/ld.so.preload",
                ReadOnly: true},
             )
          }
          _, err = os.Stat(fmt.Sprintf("%s/vgpu/license", hostHookPath))
          if err == nil {
             response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
                ContainerPath: "/tmp/license",
                HostPath:      fmt.Sprintf("%s/vgpu/license", hostHookPath),
                ReadOnly:      true,
             })
             response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
                ContainerPath: "/usr/bin/vgpuvalidator",
                HostPath:      fmt.Sprintf("%s/vgpu/vgpuvalidator", hostHookPath),
                ReadOnly:      true,
             })
          }
          responses.ContainerResponses = append(responses.ContainerResponses, response)
       }
    }
    klog.Infoln("Allocate Response", responses.ContainerResponses)
    device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
    return &responses, nil
}是否关闭算力限制
for i, dev := range devreq {
    limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
    response.Envs = fmt.Sprintf("%vm", dev.Usedmem)

    /*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
    if i > 0 {
       response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
    } else {
       response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
    }*/
}挂载 vgpu 相关文件
这里就实现了 libvgpu.so 库的替换。
response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq.Usedcores)替换动态库,当没有指定CUDA_DISABLE_CONTROL=true 时就会做该处理
response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())整个实现也算比较容易理解,就是给 Pod 增加了一系列环境变量,以及增加了替换 libvgpu.so 的 Mounts 配置,后续这个 libvgpu.so 就会根据这些环境变量做 Core&Memory 的限制。
NVIDIA 原生逻辑

if *util.DeviceMemoryScaling > 1 {
    response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
}核心部分就是这一句,添加了一个环境变量
if *util.DisableCoreLimit {
    response.Envs = "disable"
}而 plugin.deviceListEnvvar 的值来自:
// 缓存文件存放位置 /usr/local/vgpu/containers/xxx/xxx
cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
os.RemoveAll(cacheFileHostDirectory)

os.MkdirAll(cacheFileHostDirectory, 0777)
os.Chmod(cacheFileHostDirectory, 0777)
os.MkdirAll("/tmp/vgpulock", 0777)
os.Chmod("/tmp/vgpulock", 0777)

response.Mounts = append(response.Mounts,
    // 宿主机上的 libvgpu.so挂载到 pod 里替换 nvidia 默认的驱动
    &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
       HostPath: hostHookPath + "/vgpu/libvgpu.so",
       ReadOnly: true},
    // 随机的文件挂载进 pod 作为 vgpu 使用
    &kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
       HostPath: cacheFileHostDirectory,
       ReadOnly: false},
    // 一个 lock 文件
    &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
       HostPath: "/tmp/vgpulock",
       ReadOnly: false},
)即:NVIDIA_VISIBLE_DEVICES
正好,这个 ENV 就是 NVIDIAdeviceplugin 中的实现,设置该环境变量之后 nvidia-container-toolkit 会为有这个环境变量的容器分配 GPU。
HAMi这里则是复用了nvidia-container-toolkit的能力将 GPU 分配给 Pod。
小结

Allocate 方法中核心部分包括三件事情:

[*]HAMi 自定义逻辑

[*]添加用于资源限制的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT
[*]挂载 libvgpu.so 到 Pod 中进行替换

[*]NVIDIA 原生逻辑

[*]添加用于分配 GPU 的 环境变量 NVIDIA_VISIBLE_DEVICES,借助 NVIDIA Container Toolkit 将 GPU 挂载到 Pod 里

6. 总结

至此,HAMi 的 NVIDIA device plugin 工作原理就很清晰了。

[*]首先是 Register 插件注册,可以配置使用和原生 nvidia device plugin 不同的 ResourceName 来进行区分。

[*]另外会额外启动一个后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。

[*]然后 ListAndWatch 感知设备时也根据配置对 Device 进行复制,让同一个设备能够分配给多个 Pod。

[*]这个和 TimeSlicing 方案一样

[*]最后 Allocate 方法中主要做三件事:

[*]1)为容器中增加NVIDIA_VISIBLE_DEVICES 环境变量,借助 NVIDIA Container Toolkit 实现为容器分配 GPU
[*]2)增加 Mounts 配置,挂载 libvgpu.so 到容器中实现对原始驱动的替换
[*]3)为容器中增加部分 HAMi 自定义的环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT ,配合 ibvgpu.so 实现 GPU core、memory 的限制

核心其实就是在 Allocate 方法中,给容器中添加CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT环境变量和挂载 libvgpu.so 到容器中实现对原始驱动的替换。
当容器启动后,CUDA API 请求先经过 libvgpu.so,然后 libvgpu.so 根据环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT 实现 Core & Memory 限制。
最后回答开篇提出的问题:HAMi 为什么要自己实现一个 device plugin 呢?hami-device-plugin-nvidia 是有哪些 NVIDIA 原生device plugin 没有的功能吗?
在 hami device plugin 相比原生的 NVIDIA device plugin 做了几个修改:

[*]1)注册时额外启动后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。
[*]2)ListAndWatch 感知设备时也根据配置对 Device 进行复制,便于将同一个物理 GPU 分配给多个 Pod

[*]这个其实原生的 NVIDIA device plugin 也有,就是 TimeSlicing 方案。

[*]3)Allocate 中增加了 HAMi 自定义逻辑:

[*]挂载 libvgpu.so 到容器中实现对原始驱动的替换
[*]指定部分 HAMi 自定义的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT,配合 ibvgpu.so 实现 GPU core、memory 的限制

7. FAQ

Node 上的 libvgpu.so 是怎么来的

Allocate 方法中要将 libvgpu.so 挂载到 Pod 里,这里用的是 HostPath 方式挂载,说明这个 libvgpu.so 是存在于宿主机上的。
那么问题来了,宿主机上的 libvgpu.so 是怎么来的?
这个实际上是打包在 HAMi 提供的 device-plugin 镜像里的,device-plugin 启动时将其从 Pod 里复制到宿主机上,相关 yaml 如下:
found := false
for _, val := range currentCtr.Env {
    if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
       // if env existed but is set to false or can not be parsed, ignore
       t, _ := strconv.ParseBool(val.Value)
       if !t {
          continue
       }
       // only env existed and set to true, we mark it "found"
       found = true
       break
    }
}
if !found {
    response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
       HostPath: hostHookPath + "/vgpu/ld.so.preload",
       ReadOnly: true},
    )
}挂载了 hostPath 到容器里,然后容器里执行cp -f /k8s-vgpu/lib/nvidia/* /usr/local/vgpu/ 命令将其复制到宿主机。
【Kubernetes 系列】持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章。


来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 开源 vGPU 方案 HAMi 原理分析 Part1:hami-device-plugin-nvidia 实现