开源 vGPU 方案 HAMi 原理分析 Part1:hami-device-plugin-nvidia 实现
本文为开源的 vGPU 方案 HAMi 实现原理分析第一篇,主要分析 hami-device-plugin-nvidia 实现原理。
之前在 开源 vGPU 方案:HAMi,实现细粒度 GPU 切分 介绍了 HAMi 是什么,然后在开源 vGPU 方案 HAMi: core&memory 隔离测试 中对 HAMi 提供的 vGPU 方案进行了测试。
接下来则是逐步分析 HAMi 中的 vGPU 实现原理,涉及到的东西比较多,暂定分为几部分:
[*]1)hami-device-plugin-nvidia:HAMi 版本的 device plugin 中 GPU 感知以及分配逻辑是怎么实现的,和 NVIDIA 原生的 device-plugin 有和不同。
[*]2)HAMI-Scheduler:HAMi 是如何做调度的,binpack/spread 高级调度策略是怎么实现的
[*]3)HAMi-Core:这也是 vCUDA 方案最核心的部分,HAMi 是如何通过拦截 CUDA API 实现 Core&Memory 隔离限制的
本文为第一篇,分析 hami-device-plugin-nvidia 实现原理。
1. 概述
NVIDIA 是有自己实现 device plugin 的,那么问题来了:HAMi 为什么还要自己实现一个 device plugin 呢?
是 hami-device-plugin-nvidia 是有哪些 NVIDIA 原生device plugin 没有的功能吗?带着疑问,我们开始查看 hami-device-plugin-nvidia 源码。
这部分需要大家对 GPU Operator、k8s device plugin 等比较熟悉阅读起来才比较丝滑。
推荐阅读:
[*]GPU 环境搭建指南:如何在裸机、Docker、K8s 等环境中使用 GPU
[*]GPU 环境搭建指南:使用 GPU Operator 加速 Kubernetes GPU 环境搭建
[*]Kubernetes教程(二一)—自定义资源支持:K8s Device Plugin 从原理到实现
[*]Kubernetes教程(二二)—在 K8S 中创建 Pod 是如何使用到 GPU 的:device plugin&nvidia-container-toolkit 源码分析
后续都默认大家都对这块比较熟悉了,特别是后两篇
2. 程序入口
HAMi 首先支持的是 NVIDIA GPU,单独实现了一个 device plugin nvidia。
[*]启动文件在 cmd/device-plugin/nvidia
[*]核心实现在 pkg/device-plugin/nvidiadevice
默认大家都对 k8s 的 device plugin 机制比较熟悉了,因此这里只分析核心代码逻辑,不然篇幅就太长了。
对于一个 device plugin 我们一般关注以下 3 个地方:
[*]Register:将插件注册到 Kubelet 的,参数 ResourceName 比较重要
[*]ListAndWatch:device plugin 是怎么感知 GPU 并上报的
[*]Allocate:device plugin 是如何将 GPU 分配给 Pod 的
启动命令在 /cmd/device-plugin/nvidia,用的是 github.com/urfave/cli/v2 构建的一个命令行工具。
func main() {
var configFile string
c := cli.NewApp()
c.Name = "NVIDIA Device Plugin"
c.Usage = "NVIDIA device plugin for Kubernetes"
c.Version = info.GetVersionString()
c.Action = func(ctx *cli.Context) error {
return start(ctx, c.Flags)
}
c.Flags = []cli.Flag{
&cli.StringFlag{
Name: "mig-strategy",
Value: spec.MigStrategyNone,
Usage: "the desired strategy for exposing MIG devices on GPUs that support it:\n\t\t",
EnvVars: []string{"MIG_STRATEGY"},
},
&cli.BoolFlag{
Name: "fail-on-init-error",
Value: true,
Usage: "fail the plugin if an error is encountered during initialization, otherwise block indefinitely",
EnvVars: []string{"FAIL_ON_INIT_ERROR"},
},
&cli.StringFlag{
Name: "nvidia-driver-root",
Value: "/",
Usage: "the root path for the NVIDIA driver installation (typical values are '/' or '/run/nvidia/driver')",
EnvVars: []string{"NVIDIA_DRIVER_ROOT"},
},
&cli.BoolFlag{
Name: "pass-device-specs",
Value: false,
Usage: "pass the list of DeviceSpecs to the kubelet on Allocate()",
EnvVars: []string{"PASS_DEVICE_SPECS"},
},
&cli.StringSliceFlag{
Name: "device-list-strategy",
Value: cli.NewStringSlice(string(spec.DeviceListStrategyEnvvar)),
Usage: "the desired strategy for passing the device list to the underlying runtime:\n\t\t",
EnvVars: []string{"DEVICE_LIST_STRATEGY"},
},
&cli.StringFlag{
Name: "device-id-strategy",
Value: spec.DeviceIDStrategyUUID,
Usage: "the desired strategy for passing device IDs to the underlying runtime:\n\t\t",
EnvVars: []string{"DEVICE_ID_STRATEGY"},
},
&cli.BoolFlag{
Name: "gds-enabled",
Usage: "ensure that containers are started with NVIDIA_GDS=enabled",
EnvVars: []string{"GDS_ENABLED"},
},
&cli.BoolFlag{
Name: "mofed-enabled",
Usage: "ensure that containers are started with NVIDIA_MOFED=enabled",
EnvVars: []string{"MOFED_ENABLED"},
},
&cli.StringFlag{
Name: "config-file",
Usage: "the path to a config file as an alternative to command line options or environment variables",
Destination: &configFile,
EnvVars: []string{"CONFIG_FILE"},
},
&cli.StringFlag{
Name: "cdi-annotation-prefix",
Value: spec.DefaultCDIAnnotationPrefix,
Usage: "the prefix to use for CDI container annotation keys",
EnvVars: []string{"CDI_ANNOTATION_PREFIX"},
},
&cli.StringFlag{
Name: "nvidia-ctk-path",
Value: spec.DefaultNvidiaCTKPath,
Usage: "the path to use for the nvidia-ctk in the generated CDI specification",
EnvVars: []string{"NVIDIA_CTK_PATH"},
},
&cli.StringFlag{
Name: "container-driver-root",
Value: spec.DefaultContainerDriverRoot,
Usage: "the path where the NVIDIA driver root is mounted in the container; used for generating CDI specifications",
EnvVars: []string{"CONTAINER_DRIVER_ROOT"},
},
}
c.Flags = append(c.Flags, addFlags()...)
err := c.Run(os.Args)
if err != nil {
klog.Error(err)
os.Exit(1)
}
}
func addFlags() []cli.Flag {
addition := []cli.Flag{
&cli.StringFlag{
Name: "node-name",
Value: os.Getenv(util.NodeNameEnvName),
Usage: "node name",
EnvVars: []string{"NodeName"},
},
&cli.UintFlag{
Name: "device-split-count",
Value: 2,
Usage: "the number for NVIDIA device split",
EnvVars: []string{"DEVICE_SPLIT_COUNT"},
},
&cli.Float64Flag{
Name: "device-memory-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device memory scaling",
EnvVars: []string{"DEVICE_MEMORY_SCALING"},
},
&cli.Float64Flag{
Name: "device-cores-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device cores scaling",
EnvVars: []string{"DEVICE_CORES_SCALING"},
},
&cli.BoolFlag{
Name: "disable-core-limit",
Value: false,
Usage: "If set, the core utilization limit will be ignored",
EnvVars: []string{"DISABLE_CORE_LIMIT"},
},
&cli.StringFlag{
Name:"resource-name",
Value: "nvidia.com/gpu",
Usage: "the name of field for number GPU visible in container",
},
}
return addition
}启动时做了两件事:
[*]将插件注册到 Kubelet
[*]启动一个 gRPC 服务
我们只需要关注一下接收的几个参数:
&cli.UintFlag{
Name: "device-split-count",
Value: 2,
Usage: "the number for NVIDIA device split",
EnvVars: []string{"DEVICE_SPLIT_COUNT"},
},
&cli.Float64Flag{
Name: "device-memory-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device memory scaling",
EnvVars: []string{"DEVICE_MEMORY_SCALING"},
},
&cli.Float64Flag{
Name: "device-cores-scaling",
Value: 1.0,
Usage: "the ratio for NVIDIA device cores scaling",
EnvVars: []string{"DEVICE_CORES_SCALING"},
},
&cli.BoolFlag{
Name: "disable-core-limit",
Value: false,
Usage: "If set, the core utilization limit will be ignored",
EnvVars: []string{"DISABLE_CORE_LIMIT"},
},
&cli.StringFlag{
Name:"resource-name",
Value: "nvidia.com/gpu",
Usage: "the name of field for number GPU visible in container",
},
[*]device-split-count:表示 GPU 的分割数,每一张 GPU 都不能分配超过其配置数目的任务。若其配置为 N 的话,每个 GPU 上最多可以同时存在 N 个任务。
[*]建议根据 GPU 性能动态调整,一般建议大于 10。
[*]device-memory-scaling:表示 GPU memory 的 oversubscription(超额订阅)** **比例,默认 1.0,大于 1.0 则表示启用虚拟显存(实验功能),不建议修改。
[*]device-cores-scaling:表示 GPU core 的 oversubscription(超额订阅)比例,默认 1.0。
[*]disable-core-limit:是否关闭 GPU Core Limit,默认 false,不建议修改。
[*]resource-name:资源名称,建议改掉,不推荐使用默认的 nvidia.com/gpu 因为这个和 nvidia 原生的重复了。
3. Register
Register
Register 方法实现如下:
// pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L222
// Register registers the device plugin for the given resourceName with Kubelet.
func (plugin *NvidiaDevicePlugin) Register() error {
conn, err := plugin.dial(kubeletdevicepluginv1beta1.KubeletSocket, 5*time.Second)
if err != nil {
return err
}
defer conn.Close()
client := kubeletdevicepluginv1beta1.NewRegistrationClient(conn)
reqt := &kubeletdevicepluginv1beta1.RegisterRequest{
Version: kubeletdevicepluginv1beta1.Version,
Endpoint: path.Base(plugin.socket),
ResourceName: string(plugin.rm.Resource()),
Options: &kubeletdevicepluginv1beta1.DevicePluginOptions{
GetPreferredAllocationAvailable: false,
},
}
_, err = client.Register(context.Background(), reqt)
if err != nil {
return err
}
return nil
} device plugin 注册时的几个核心信息:
[*]ResourceName:资源名称,这个和创建 Pod 时申请 vGPU 的资源名匹配时就会使用到该 device plugin。
[*]也是可以在 deivce plugin 启动时配置的,一般叫做 --resource-name=nvidia.com/vgpu
[*]Version:device plugin 的版本,这里是 v1beta1
[*]Endpoint:device plugin 的访问地址,Kubelet 会通过这个 sock 和 device plugin 进行交互。
[*]hami 用的格式为:/var/lib/kubelet/device-plugins/nvidia-xxx.sock,其中 xxx 是 从 ResourceName 中解析出来的,比如 nvidia.com/vgpu 那么这里的 xx 就是后面的 vgpu。
假设我们都使用默认值,ResourceName 为 nvidia.com/vgpu,Endpoint 为 /var/lib/kubelet/device-plugins/nvidia-vgpu.sock。
[*]后续 Pod Resource 中申请使用 nvidia.com/vgpu 资源时,就会由该 device plugin 来处理,实现资源分配,Kubelet 则是通过 var/lib/kubelet/device-plugins/nvidia-vgpu.sock 这个 sock 文件调用 device plugin API。
[*]反之,我们在 Pod Resource 中申请使用 nvidia.com/gpu 时,这个 ResourceName 和 hami 插件不匹配,因此不由 hami device plugin nvidia 处理,而是由 nvidia 自己的device plugin 进行处理。
WatchAndRegister
这个是 HAMi device plugin 中的一个特殊逻辑,将 node 上的 GPU 信息以 annotations 的形式添加到 Node 对象上。
这里是直接和 kube-apiserver 进行通信,而不是使用的传统的 device plugin 上报流程。
后续 HAMi-Scheduler 在进行调度时就会用到这边上报的 annotations 作为调度依据的一部分,分析 HAMi-Scheduler 时在仔细分析。
func (plugin *NvidiaDevicePlugin) WatchAndRegister() {
klog.Info("Starting WatchAndRegister")
errorSleepInterval := time.Second * 5
successSleepInterval := time.Second * 30
for {
err := plugin.RegistrInAnnotation()
if err != nil {
klog.Errorf("Failed to register annotation: %v", err)
klog.Infof("Retrying in %v seconds...", errorSleepInterval)
time.Sleep(errorSleepInterval)
} else {
klog.Infof("Successfully registered annotation. Next check in %v seconds...", successSleepInterval)
time.Sleep(successSleepInterval)
}
}
}getAPIDevices
获取 Node 上的 GPU 信息,并组装成 api.DeviceInfo 对象。
func (plugin *NvidiaDevicePlugin) getAPIDevices() *[]*api.DeviceInfo {
devs := plugin.Devices()
nvml.Init()
res := make([]*api.DeviceInfo, 0, len(devs))
idx := 0
for idx < len(devs) {
ndev, ret := nvml.DeviceGetHandleByIndex(idx)
//ndev, err := nvml.NewDevice(uint(idx))
//klog.V(3).Infoln("ndev type=", ndev.Model)
if ret != nvml.SUCCESS {
klog.Errorln("nvml new device by index error idx=", idx, "err=", ret)
panic(0)
}
memoryTotal := 0
memory, ret := ndev.GetMemoryInfo()
if ret == nvml.SUCCESS {
memoryTotal = int(memory.Total)
} else {
klog.Error("nvml get memory error ret=", ret)
panic(0)
}
UUID, ret := ndev.GetUUID()
if ret != nvml.SUCCESS {
klog.Error("nvml get uuid error ret=", ret)
panic(0)
}
Model, ret := ndev.GetName()
if ret != nvml.SUCCESS {
klog.Error("nvml get name error ret=", ret)
panic(0)
}
registeredmem := int32(memoryTotal / 1024 / 1024)
if *util.DeviceMemoryScaling != 1 {
registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
}
klog.Infoln("MemoryScaling=", *util.DeviceMemoryScaling, "registeredmem=", registeredmem)
health := true
for _, val := range devs {
if strings.Compare(val.ID, UUID) == 0 {
// when NVIDIA-Tesla P4, the device info is : ID:GPU-e290caca-2f0c-9582-acab-67a142b61ffa,Health:Healthy,Topology:nil,
// it is more reasonable to think of healthy as case-insensitive
if strings.EqualFold(val.Health, "healthy") {
health = true
} else {
health = false
}
break
}
}
numa, err := plugin.getNumaInformation(idx)
if err != nil {
klog.ErrorS(err, "failed to get numa information", "idx", idx)
}
res = append(res, &api.DeviceInfo{
ID: UUID,
Count: int32(*util.DeviceSplitCount),
Devmem:registeredmem,
Devcore: int32(*util.DeviceCoresScaling * 100),
Type: fmt.Sprintf("%v-%v", "NVIDIA", Model),
Numa: numa,
Health:health,
})
idx++
klog.Infof("nvml registered device id=%v, memory=%v, type=%v, numa=%v", idx, registeredmem, Model, numa)
}
return &res
}核心部分
// 通过 nvml 库获取 GPU 信息
ndev, ret := nvml.DeviceGetHandleByIndex(idx)
memoryTotal := 0
memory, ret := ndev.GetMemoryInfo()
if ret == nvml.SUCCESS {
memoryTotal = int(memory.Total)
}
UUID, ret := ndev.GetUUID()
Model, ret := ndev.GetName()
// 处理 Scaling
registeredmem := int32(memoryTotal / 1024 / 1024)
if *util.DeviceMemoryScaling != 1 {
registeredmem = int32(float64(registeredmem) * *util.DeviceMemoryScaling)
}
// 组装结果返回
res = append(res, &api.DeviceInfo{
ID: UUID,
Count: int32(*util.DeviceSplitCount),
Devmem:registeredmem,
Devcore: int32(*util.DeviceCoresScaling * 100),
Type: fmt.Sprintf("%v-%v", "NVIDIA", Model),
Numa: numa,
Health:health,
})更新到 Node Annoations
拿到 Device 信息之后,调用 kube-apiserver 更新 Node 对象的 Annoations 把 Device 信息存起来。
encodeddevices := util.EncodeNodeDevices(*devices)
annos = "Reported " + time.Now().String()
annos = encodeddevices
klog.Infof("patch node with the following annos %v", fmt.Sprintf("%v", annos))
err = util.PatchNodeAnnotations(node, annos)正常应该是走 k8s 的 device plugin 接口上报信息才对,这里是 HAMi 的特殊逻辑。
Demo
查看 Node 上的 Annoations,看看这边记录了些什么数据
root@j99cloudvm:~# k get node j99cloudvm -oyaml
apiVersion: v1
kind: Node
metadata:
annotations:
hami.io/node-handshake: Requesting_2024.09.25 07:48:26
hami.io/node-nvidia-register: 'GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA
A40,0,true:GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA
A40,0,true:'hami.io/node-nvidia-register 就是 HAMi 的 device plugin 更新到 Node 上的 GPU 信息,格式化一下
GPU-03f69c50-207a-2038-9b45-23cac89cb67d,10,46068,100,NVIDIA-NVIDIA A40,0,true:
GPU-1afede84-4e70-2174-49af-f07ebb94d1ae,10,46068,100,NVIDIA-NVIDIA A40,0,true:当前节点上是两张 A40 GPU,
[*]GPU-03f69c50-207a-2038-9b45-23cac89cb67d:为 GPU 设备的 UUID
[*]10,46068,100: 切分为 10 份,每张卡 46068M 内存,core 为 100 个(说明没有配置 oversubscription)
[*]NVIDIA-NVIDIA:GPU 类型
[*]A40:GPU 型号
[*]0:表示 GPU 的 NUMA 结构
[*]true:表示该 GPU 是健康的
[*]: 最后的冒号是分隔符
ps:这部分信息后续 hami-scheduler 进行调度时会用到,这里暂时不管。
小结
Register 方法分为两部分:
[*]Register:将 device plugin 注册到 kubelet
[*]WatchAndRegister:感知 Node 上的 GPU 信息,并和 kube-apiserver 交互,将这部分信息以 annotations 的形式添加到 Node 对象上,以便后续 hami-scheduler 使用。
4. ListAndWatch
ListAndWatch 方法用于感知节点上的设备并上报给 Kubelet。
由于需要将同一个 GPU 切分给多个 Pod 使用,因此 HAMi 的 device plugin 也会有类似 TimeSlicing 中的 Device 复制操作。
具体实现如下:
// ListAndWatch lists devices and update that list according to the health status
func (plugin *NvidiaDevicePlugin) ListAndWatch(e *kubeletdevicepluginv1beta1.Empty, s kubeletdevicepluginv1beta1.DevicePlugin_ListAndWatchServer) error {
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
for {
select {
case <-plugin.stop:
return nil
case d := <-plugin.health:
// FIXME: there is no way to recover from the Unhealthy state.
d.Health = kubeletdevicepluginv1beta1.Unhealthy
klog.Infof("'%s' device marked unhealthy: %s", plugin.rm.Resource(), d.ID)
s.Send(&kubeletdevicepluginv1beta1.ListAndWatchResponse{Devices: plugin.apiDevices()})
}
}
}比较长,我们只需要关注核心部分,同时先忽略 MIG 相关的逻辑。
首先是添加一个 CUDA_DEVICE_MEMORY_LIMIT_$Index 的环境变量,用于 gpu memory 限制。
// VisitDevices visits each top-level device and invokes a callback function for it
func (d *devicelib) VisitDevices(visit func(int, Device) error) error {
count, ret := d.nvml.DeviceGetCount()
if ret != nvml.SUCCESS {
return fmt.Errorf("error getting device count: %v", ret)
}
for i := 0; i < count; i++ {
device, ret := d.nvml.DeviceGetHandleByIndex(i)
if ret != nvml.SUCCESS {
return fmt.Errorf("error getting device handle for index '%v': %v", i, ret)
}
dev, err := d.newDevice(device)
if err != nil {
return fmt.Errorf("error creating new device wrapper: %v", err)
}
isSkipped, err := dev.isSkipped()
if err != nil {
return fmt.Errorf("error checking whether device is skipped: %v", err)
}
if isSkipped {
continue
}
err = visit(i, dev)
if err != nil {
return fmt.Errorf("error visiting device: %v", err)
}
}
return nil
}
// buildGPUDeviceMap builds a map of resource names to GPU devices
func (b *deviceMapBuilder) buildGPUDeviceMap() (DeviceMap, error) {
devices := make(DeviceMap)
b.VisitDevices(func(i int, gpu device.Device) error {
name, ret := gpu.GetName()
if ret != nvml.SUCCESS {
return fmt.Errorf("error getting product name for GPU: %v", ret)
}
migEnabled, err := gpu.IsMigEnabled()
if err != nil {
return fmt.Errorf("error checking if MIG is enabled on GPU: %v", err)
}
if migEnabled && *b.config.Flags.MigStrategy != spec.MigStrategyNone {
return nil
}
for _, resource := range b.config.Resources.GPUs {
if resource.Pattern.Matches(name) {
index, info := newGPUDevice(i, gpu)
return devices.setEntry(resource.Name, index, info)
}
}
return fmt.Errorf("GPU name '%v' does not match any resource patterns", name)
})
return devices, nil
} 然后则是根据申请的 gpucores 配置 gpu core 限制的环境变量
// GetPluginDevices returns the plugin Devices from all devices in the Devices
func (ds Devices) GetPluginDevices() []*kubeletdevicepluginv1beta1.Device {
var res []*kubeletdevicepluginv1beta1.Device
if !strings.Contains(ds.GetIDs(), "MIG") {
for _, dev := range ds {
for i := uint(0); i < *util.DeviceSplitCount; i++ {
id := fmt.Sprintf("%v-%v", dev.ID, i)
res = append(res, &kubeletdevicepluginv1beta1.Device{
ID: id,
Health: dev.Health,
Topology: nil,
})
}
}
} else {
for _, d := range ds {
res = append(res, &d.Device)
}
}
return res
}这个用于设置 share_regionmmap 文件在容器中的位置
for _, dev := range ds {
for i := uint(0); i < *util.DeviceSplitCount; i++ {
id := fmt.Sprintf("%v-%v", dev.ID, i)
res = append(res, &kubeletdevicepluginv1beta1.Device{
ID: id,
Health: dev.Health,
Topology: nil,
})
}Gpu memory 超额订阅
// pkg/device-plugin/nvidiadevice/nvinternal/plugin/server.go#L290
func (plugin *NvidiaDevicePlugin) Allocate(ctx context.Context, reqs *kubeletdevicepluginv1beta1.AllocateRequest) (*kubeletdevicepluginv1beta1.AllocateResponse, error) {
klog.InfoS("Allocate", "request", reqs)
responses := kubeletdevicepluginv1beta1.AllocateResponse{}
nodename := os.Getenv(util.NodeNameEnvName)
current, err := util.GetPendingPod(ctx, nodename)
if err != nil {
nodelock.ReleaseNodeLock(nodename, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
klog.V(5).Infof("allocate pod name is %s/%s, annotation is %+v", current.Namespace, current.Name, current.Annotations)
for idx, req := range reqs.ContainerRequests {
// If the devices being allocated are replicas, then (conditionally)
// error out if more than one resource is being allocated.
if strings.Contains(req.DevicesIDs, "MIG") {
if plugin.config.Sharing.TimeSlicing.FailRequestsGreaterThanOne && rm.AnnotatedIDs(req.DevicesIDs).AnyHasAnnotations() {
if len(req.DevicesIDs) > 1 {
return nil, fmt.Errorf("request for '%v: %v' too large: maximum request size for shared resources is 1", plugin.rm.Resource(), len(req.DevicesIDs))
}
}
for _, id := range req.DevicesIDs {
if !plugin.rm.Devices().Contains(id) {
return nil, fmt.Errorf("invalid allocation request for '%s': unknown device: %s", plugin.rm.Resource(), id)
}
}
response, err := plugin.getAllocateResponse(req.DevicesIDs)
if err != nil {
return nil, fmt.Errorf("failed to get allocate response: %v", err)
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
} else {
currentCtr, devreq, err := util.GetNextDeviceRequest(nvidia.NvidiaGPUDevice, *current)
klog.Infoln("deviceAllocateFromAnnotation=", devreq)
if err != nil {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
if len(devreq) != len(reqs.ContainerRequests.DevicesIDs) {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, errors.New("device number not matched")
}
response, err := plugin.getAllocateResponse(util.GetContainerDeviceStrArray(devreq))
if err != nil {
return nil, fmt.Errorf("failed to get allocate response: %v", err)
}
err = util.EraseNextDeviceTypeFromAnnotation(nvidia.NvidiaGPUDevice, *current)
if err != nil {
device.PodAllocationFailed(nodename, current, NodeLockNvidia)
return &kubeletdevicepluginv1beta1.AllocateResponse{}, err
}
for i, dev := range devreq {
limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
response.Envs = fmt.Sprintf("%vm", dev.Usedmem)
/*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
if i > 0 {
response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
} else {
response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
}*/
}
response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq.Usedcores)
response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())
if *util.DeviceMemoryScaling > 1 {
response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
}
if *util.DisableCoreLimit {
response.Envs = "disable"
}
cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
os.RemoveAll(cacheFileHostDirectory)
os.MkdirAll(cacheFileHostDirectory, 0777)
os.Chmod(cacheFileHostDirectory, 0777)
os.MkdirAll("/tmp/vgpulock", 0777)
os.Chmod("/tmp/vgpulock", 0777)
response.Mounts = append(response.Mounts,
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
HostPath: hostHookPath + "/vgpu/libvgpu.so",
ReadOnly: true},
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
HostPath: cacheFileHostDirectory,
ReadOnly: false},
&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
HostPath: "/tmp/vgpulock",
ReadOnly: false},
)
found := false
for _, val := range currentCtr.Env {
if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
// if env existed but is set to false or can not be parsed, ignore
t, _ := strconv.ParseBool(val.Value)
if !t {
continue
}
// only env existed and set to true, we mark it "found"
found = true
break
}
}
if !found {
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
HostPath: hostHookPath + "/vgpu/ld.so.preload",
ReadOnly: true},
)
}
_, err = os.Stat(fmt.Sprintf("%s/vgpu/license", hostHookPath))
if err == nil {
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
ContainerPath: "/tmp/license",
HostPath: fmt.Sprintf("%s/vgpu/license", hostHookPath),
ReadOnly: true,
})
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{
ContainerPath: "/usr/bin/vgpuvalidator",
HostPath: fmt.Sprintf("%s/vgpu/vgpuvalidator", hostHookPath),
ReadOnly: true,
})
}
responses.ContainerResponses = append(responses.ContainerResponses, response)
}
}
klog.Infoln("Allocate Response", responses.ContainerResponses)
device.PodAllocationTrySuccess(nodename, nvidia.NvidiaGPUDevice, NodeLockNvidia, current)
return &responses, nil
}是否关闭算力限制
for i, dev := range devreq {
limitKey := fmt.Sprintf("CUDA_DEVICE_MEMORY_LIMIT_%v", i)
response.Envs = fmt.Sprintf("%vm", dev.Usedmem)
/*tmp := response.Envs["NVIDIA_VISIBLE_DEVICES"]
if i > 0 {
response.Envs["NVIDIA_VISIBLE_DEVICES"] = fmt.Sprintf("%v,%v", tmp, dev.UUID)
} else {
response.Envs["NVIDIA_VISIBLE_DEVICES"] = dev.UUID
}*/
}挂载 vgpu 相关文件
这里就实现了 libvgpu.so 库的替换。
response.Envs["CUDA_DEVICE_SM_LIMIT"] = fmt.Sprint(devreq.Usedcores)替换动态库,当没有指定CUDA_DISABLE_CONTROL=true 时就会做该处理
response.Envs["CUDA_DEVICE_MEMORY_SHARED_CACHE"] = fmt.Sprintf("%s/vgpu/%v.cache", hostHookPath, uuid.New().String())整个实现也算比较容易理解,就是给 Pod 增加了一系列环境变量,以及增加了替换 libvgpu.so 的 Mounts 配置,后续这个 libvgpu.so 就会根据这些环境变量做 Core&Memory 的限制。
NVIDIA 原生逻辑
if *util.DeviceMemoryScaling > 1 {
response.Envs["CUDA_OVERSUBSCRIBE"] = "true"
}核心部分就是这一句,添加了一个环境变量
if *util.DisableCoreLimit {
response.Envs = "disable"
}而 plugin.deviceListEnvvar 的值来自:
// 缓存文件存放位置 /usr/local/vgpu/containers/xxx/xxx
cacheFileHostDirectory := fmt.Sprintf("%s/vgpu/containers/%s_%s", hostHookPath, current.UID, currentCtr.Name)
os.RemoveAll(cacheFileHostDirectory)
os.MkdirAll(cacheFileHostDirectory, 0777)
os.Chmod(cacheFileHostDirectory, 0777)
os.MkdirAll("/tmp/vgpulock", 0777)
os.Chmod("/tmp/vgpulock", 0777)
response.Mounts = append(response.Mounts,
// 宿主机上的 libvgpu.so挂载到 pod 里替换 nvidia 默认的驱动
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu/libvgpu.so", hostHookPath),
HostPath: hostHookPath + "/vgpu/libvgpu.so",
ReadOnly: true},
// 随机的文件挂载进 pod 作为 vgpu 使用
&kubeletdevicepluginv1beta1.Mount{ContainerPath: fmt.Sprintf("%s/vgpu", hostHookPath),
HostPath: cacheFileHostDirectory,
ReadOnly: false},
// 一个 lock 文件
&kubeletdevicepluginv1beta1.Mount{ContainerPath: "/tmp/vgpulock",
HostPath: "/tmp/vgpulock",
ReadOnly: false},
)即:NVIDIA_VISIBLE_DEVICES
正好,这个 ENV 就是 NVIDIAdeviceplugin 中的实现,设置该环境变量之后 nvidia-container-toolkit 会为有这个环境变量的容器分配 GPU。
HAMi这里则是复用了nvidia-container-toolkit的能力将 GPU 分配给 Pod。
小结
Allocate 方法中核心部分包括三件事情:
[*]HAMi 自定义逻辑
[*]添加用于资源限制的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT
[*]挂载 libvgpu.so 到 Pod 中进行替换
[*]NVIDIA 原生逻辑
[*]添加用于分配 GPU 的 环境变量 NVIDIA_VISIBLE_DEVICES,借助 NVIDIA Container Toolkit 将 GPU 挂载到 Pod 里
6. 总结
至此,HAMi 的 NVIDIA device plugin 工作原理就很清晰了。
[*]首先是 Register 插件注册,可以配置使用和原生 nvidia device plugin 不同的 ResourceName 来进行区分。
[*]另外会额外启动一个后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。
[*]然后 ListAndWatch 感知设备时也根据配置对 Device 进行复制,让同一个设备能够分配给多个 Pod。
[*]这个和 TimeSlicing 方案一样
[*]最后 Allocate 方法中主要做三件事:
[*]1)为容器中增加NVIDIA_VISIBLE_DEVICES 环境变量,借助 NVIDIA Container Toolkit 实现为容器分配 GPU
[*]2)增加 Mounts 配置,挂载 libvgpu.so 到容器中实现对原始驱动的替换
[*]3)为容器中增加部分 HAMi 自定义的环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT ,配合 ibvgpu.so 实现 GPU core、memory 的限制
核心其实就是在 Allocate 方法中,给容器中添加CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT环境变量和挂载 libvgpu.so 到容器中实现对原始驱动的替换。
当容器启动后,CUDA API 请求先经过 libvgpu.so,然后 libvgpu.so 根据环境变量 CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT 实现 Core & Memory 限制。
最后回答开篇提出的问题:HAMi 为什么要自己实现一个 device plugin 呢?hami-device-plugin-nvidia 是有哪些 NVIDIA 原生device plugin 没有的功能吗?
在 hami device plugin 相比原生的 NVIDIA device plugin 做了几个修改:
[*]1)注册时额外启动后台 goroutine WatchAndRegister 定时将 GPU 信息更新到 Node 对象的 Annoations 上便于 Scheduler 时使用。
[*]2)ListAndWatch 感知设备时也根据配置对 Device 进行复制,便于将同一个物理 GPU 分配给多个 Pod
[*]这个其实原生的 NVIDIA device plugin 也有,就是 TimeSlicing 方案。
[*]3)Allocate 中增加了 HAMi 自定义逻辑:
[*]挂载 libvgpu.so 到容器中实现对原始驱动的替换
[*]指定部分 HAMi 自定义的环境变量CUDA_DEVICE_MEMORY_LIMIT_X 和 CUDA_DEVICE_SM_LIMIT,配合 ibvgpu.so 实现 GPU core、memory 的限制
7. FAQ
Node 上的 libvgpu.so 是怎么来的
Allocate 方法中要将 libvgpu.so 挂载到 Pod 里,这里用的是 HostPath 方式挂载,说明这个 libvgpu.so 是存在于宿主机上的。
那么问题来了,宿主机上的 libvgpu.so 是怎么来的?
这个实际上是打包在 HAMi 提供的 device-plugin 镜像里的,device-plugin 启动时将其从 Pod 里复制到宿主机上,相关 yaml 如下:
found := false
for _, val := range currentCtr.Env {
if strings.Compare(val.Name, "CUDA_DISABLE_CONTROL") == 0 {
// if env existed but is set to false or can not be parsed, ignore
t, _ := strconv.ParseBool(val.Value)
if !t {
continue
}
// only env existed and set to true, we mark it "found"
found = true
break
}
}
if !found {
response.Mounts = append(response.Mounts, &kubeletdevicepluginv1beta1.Mount{ContainerPath: "/etc/ld.so.preload",
HostPath: hostHookPath + "/vgpu/ld.so.preload",
ReadOnly: true},
)
}挂载了 hostPath 到容器里,然后容器里执行cp -f /k8s-vgpu/lib/nvidia/* /usr/local/vgpu/ 命令将其复制到宿主机。
【Kubernetes 系列】持续更新中,搜索公众号【探索云原生】订阅,阅读更多文章。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
页:
[1]