最近在倒腾“AI大模型基础设施”, 宏观目标是做一个基于云原生的AI算力平台,目前因公司隐私暂不能公开宏观背景和技术方案, 姑且记录实践中遇到的一些技能点。
前文已经记录了第1步: 使用arena 提交训练任务的实践。
今天我们记录聊一聊平台侧另一个核心能力:
动态纳管云原生k8s集群,并监听AI/ML训练任务的状态变化。也就是上图的第4步。
作为面向算法开发者的云原生saas平台,平台在界面上提供了纳管集群的交互入口,平台启动后会去监听pytorch,mpi训练任务的状态变更,并回显到界面(并给开发者发送飞书变更通知)。
本文核心关注:
- 如何动态纳管k8s集群
- 如何重建k8s informer监听
这里我提供我的实践, 请看下图:
- 程序启动,加载初始k8s集群,informer监听训练任务状态,并绑定informer停止信号(stopCh)、重建信号(rebuildCh)
- package main
- import "k8s.io/client-go/rest"
- type StartInformerFunc func(clusterId string, restConf *rest.Config) (stopCh chan struct{}, err error)
- type InformerManager struct {
- clusterConfigs map[string]string
- gvr map[string]StartInformerFunc
- stopCh chan struct{} // informer 需要用到
- rebuildCh chan struct{}
- }
- var InformerManagerInstance *InformerManager
- func NewInFormerManager() *InformerManager {
- InformerManagerInstance = &InformerManager{
- clusterConfigs: map[string]string{
- "id1": "kubeconfig1",
- "id2": "kubeconfig2",
- },
- gvr: map[string]StartInformerFunc{
- "pytorchjob": startPytorchjobInformer,
- "mpijob": startMpijobInformer,
- "job": startRawjobInformer,
- },
- stopCh: make(chan struct{}),
- rebuildCh: make(chan struct{}, 1),
- }
- return InformerManagerInstance
- }
复制代码
- 开协程定时任务去轮循落盘的待纳管k8s集群记录
- 考虑纳管的k8s集群数据可控,变更时机可控,采用md5校验的方式判断是否发生集群变更
下面的k8s.CheckClusterChanged(mgr.clusterConfigs) 是利用对kube-configs做md5, 前后对比判断集群是否发生变更。
[code]func (mgr *InformerManager) monitorcLusterChanged() bool { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case |