雌鲳签 发表于 2025-9-29 12:50:14

再不学就晚了!RDT × LeRobot与RDKS100部署详解

作者:SkyXZ
CSDN:SkyXZ~-CSDN博客
博客园:SkyXZ - 博客园
机械臂:LeRobot-SO101         数采机:MacBook-Pro Python3.10
开发机:Ubuntu 22.04, Cuda12.4,8 × NVIDIA A100-SXM4-40GB
开发板:RDK OS 4.0.2 Based on Ubuntu 22.04, Python 3.10.12, OpenExplore 3.2.0
相关资料:

[*]LeRobot Doc:https://huggingface.co/docs/lerobot/main/en/index
[*]RDT 170M&1B:https://github.com/thu-ml/RoboticsDiffusionTransformer
[*]RDT on RDKS100:RDT on Double RDK S100P 全流程文档
一、环境安装&机械臂配置


[*]所有代码已上传至GitHub:GitHub - xiongqi123123/LeRobot-VLA: Classic VLA for LeRobot
环境安装

       我们首先完成LeRobot环境的安装,我们默认使用conda作为环境管理,先运行以下命令创建一个Python3.10的虚拟环境
conda create -y -n lerobot python=3.10        接着便可以在环境中运行以下命令来配置lerobot所需要的依赖(使用的lerobot源码为我修改之后的,仅添加了本地serverclient,其他部分与官方源码一致)
# step:0 安装编译依赖
sudo apt-get install cmake build-essential python-dev pkg-config libavformat-dev libavcodec-dev libavdevice-dev libavutil-dev libswscale-dev libswresample-dev libavfilter-dev pkg-config
# step:1 激活环境
conda activate lerobot
# step:2 安装ffmpeg
conda install ffmpeg -c conda-forge
# 以下两种方式任选其一:
# step:3 从源码安装lerobot
git clone https://github.com/xiongqi123123/LeRobot-VLA.git
cd LeRobot-VLA/lerobot
pip install -e .
# step:3 从PyPI 安装
pip install lerobot
# 要安装附加功能,请使用以下之一
pip install 'lerobot'          # All available features
pip install 'lerobot'# Specific features (Aloha & Pusht)
pip install 'lerobot'      # Feetech motor support机械臂安装


[*]官方安装教程:SO-101 - Hugging Face 机器学习平台
        由于SO-ARM101机械臂默认不提供机械臂上的相机安装位置,因此我们在原始的机械臂夹爪部分自行设计添加了一个相机固定的位置(安装孔位与夹爪是对齐的),我们使用的相机是亚博智能的1080P高清免驱摄像头,打印文件已保存进仓库中:

        拿到刚拆封的机械臂配件我们首先将Follower和Leader臂的物料进行区分,要注意的是Follower机械臂使用的是12个ST-3215-C001(7.4V)1:345齿轮比的电机,而Leader臂不同关机使用的电机型号有所不同,不同关节的电机型号区分如下图及上表:

        接下来我们便可以按照官方提供的如下3D演示动画安装Follower和Leader了,下面是关节一到关节五以及夹爪的安装实例,除了夹爪之外,前面五个关节的安装方法均一致,仅需注意Leader臂的电机型号:
机械臂电机配置

        在完成机械臂的安装后我们便可以开始对机械臂两个臂的电机进行配置设置其对应的ID了,新版的LeRobot提供了CLI命令可以直接运行对应的任务,我们首先将两个机械臂的串口钱全部接上电脑并运行如下命令,接着按照提示拔出其中一个串口线按下回车即可知道拔出的串口号是多少(实际就是记录插上的所有串口,然后再和拔出后的进行对比就可以知道哪个串口少了...),示例输出如下图:
lerobot-find-port
        由于LeRobot使用的是总线舵机,每个电机都通过总线上的唯一 ID 进行识别,而全新电机通常带有一个默认 ID 为 1,所以为了让电机和控制器之间正常通信,我们需要为每个电机设置一个唯一的、不同的 ID,在确定了自己的两个机械臂分别对应的串口后我们便可以一个臂一个臂一个电机一个电机的进行配置啦,在新版的LeRobot库不需要重复多次运行电机配置命令,我们只需要运行以下命令并依次将不同的舵机线插到控制板按回车即可,具体可以参考以下视频:
lerobot-setup-motors \
    --robot.type=so101_follower \
    --robot.port=/dev/tty.usbmodem585A0076841# <- paste here the port found at previous step        接下来仅需运行这个脚本或者是如下命令即可完成数据的转换:
lerobot-calibrate \
    --robot.type=so101_follower \
    --robot.port=/dev/tty.usbmodem58760431551 \ # <- The port of your robot
    --robot.id=my_awesome_follower_arm # <- Give the robot a unique name        我们使用RoboTwin修改后的RDT版本,这个版本使用比较简单快速(更多关于RDT的信息请见:RDT on Double RDK S100P 全流程文档),由于RDT默认是双臂任务,而我们采集的LeRobot是单臂数据,且我们只采集了两个摄像头的画面跟RDT默认的三个摄像头不匹配,因此如果直接训练的话肯定会报索引不匹配的错误,因此我们还需要对数据集加载的部分进行修改,首先是修改action的归一化部分,我们直接在加载的时候对数据除以[],然后便是将LeRobot的单臂映射到RDT的右臂部分的动作维度并将左臂整个给剔除同时把RDT默认加载的右臂图像用Ground图像进行替代,请使用以下完成了修改的代码替换原本的代码中的RDT/data/hdf5_vla_dataset.py
lerobot-find-cameras opencv开始训练

训练环境配置

        我们首先来安装RDT训练所需的环境,此我们进入RDT目录下依次安装如下包即可:
lerobot-teleoperate \
    --robot.type=so101_follower \
    --robot.port=/dev/tty.usbmodem5AB90671801 \
    --robot.id=my_awesome_follower_arm \
    --teleop.type=so101_leader \
    --teleop.port=/dev/tty.usbmodem5AB90671501 \
    --teleop.id=my_awesome_leader_arm \        除了上述依赖之外我们还需要安装flash_attn用来加速,为了避免网络连接问题我们手动下载预编译的wheel 文件,下载连接为:https://github.com/Dao-AILab/flash-attention/releases,我们需要根据我们实际安装的torch及cuda版本来选择对应的版本,然后我们还需要根据我们下载的PyTorch是如何编译的来选择对应的cxx11abi 是 TRUE 还是 FALSE。
lerobot-teleoperate \
    --robot.type=so101_follower \
    --robot.port=/dev/tty.usbmodem5AB90671801 \
    --robot.id=my_awesome_follower_arm \
    --robot.cameras="{ arm: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}, front: {type: opencv, index_or_path: 2, width: 1920, height: 1080, fps: 30}}" \
    --teleop.type=so101_leader \
    --teleop.port=/dev/tty.usbmodem5AB90671501 \
    --teleop.id=my_awesome_leader_arm \
    --display_data=true        接着我们便可以根据输出判断我们要下载的是哪个版本啦,如下图所示我们当前的PyTorch是CXX11_ABI = 0,因此我们要下载的是cxx11abiFALSE 的 .whl 文件
lerobot-record \
    --robot.type=so101_follower \
    --robot.port=/dev/tty.usbmodem5AB90671801 \
    --robot.id=my_awesome_follower_arm \
    --robot.cameras="{ arm: {type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}, front: {type: opencv, index_or_path: 2, width: 1920, height: 1080, fps: 30}}" \
    --teleop.type=so101_leader \
    --teleop.port=/dev/tty.usbmodem5AB90671501 \
    --teleop.id=my_awesome_leader_arm \
    --display_data=true \
    --dataset.repo_id=skyxz/blackmarker_scence1 \
    --dataset.num_episodes=5 \
    --dataset.single_task="Grab the black marker and put it in the bin" \
    --dataset.push_to_hub=False \
    --dataset.episode_time_s=15 \
    --dataset.reset_time_s=5         如果上述步骤跟我的一样的话,那么大家需要下载并安装的应该是如下的版本,下载后安装即可:
(RoboTwin) qi.xiong@A100-Test:~/Data_Qi/LeRobot/skyxz/blackmarker_scence1$ python3 -c "import h5py; f=h5py.File('/home/qi.xiong/Data_Qi/RDT/processed_data/place_dual_shoes-demo_clean-300/episode_3/episode_3.hdf5','r'); f.visit(print); f.close()"
action
observations
observations/images
observations/images/cam_high
observations/images/cam_left_wrist
observations/images/cam_right_wrist
observations/left_arm_dim
observations/qpos
observations/right_arm_dim预训练模型下载

        RDT分为了1B版本(SigLip+DIT+Adaptor)以及单独的170M(DiT)两个版本,其中的区别仅在于最后的DiT的hidden_size和depth维度区别,170M相比于1B的版本直接减半了,如果需要在RDKS100上部署的话请参考接下来的RDT170M模型版本以保证可行的性能,在训练之前还需按照如下的步骤下载预训练模型及完成训练环境的安装;
#!/usr/bin/env python3
"""
LeRobot到RDT数据转换脚本

LeRobot机器人结构:
- 5个关节 (shoulder_pan, shoulder_lift, elbow_flex, wrist_flex, wrist_roll)
- 1个夹爪 (gripper)
- 总计:6个自由度 (6DOF)

维度映射(匹配RDT训练代码):
- left_arm_dim = 0 (单臂机器人,左臂不存在)
- right_arm_dim = 6 (5关节 + 1夹爪,映射到RDT的right_arm部分)
- 状态向量:6维
- RDT索引映射:right_arm_joint_0_pos到right_arm_joint_5_pos (索引0-5)
"""

import sys
import os
import h5py
import numpy as np
import cv2
import argparse
import yaml
import json
import subprocess
from pathlib import Path
import pandas as pd
import torch

current_dir = os.path.dirname(__file__)
sys.path.append(os.path.join(current_dir, ".."))
from models.multimodal_encoder.t5_encoder import T5Embedder

def extract_frames_from_video(video_path, output_dir, episode_idx):
    if not os.path.exists(video_path):
      print(f"No video file: {video_path}")
      return []
   
    temp_dir = os.path.join(output_dir, f"temp_frames_{episode_idx}")
    if not os.path.exists(temp_dir):
      os.makedirs(temp_dir)
   
    output_pattern = os.path.join(temp_dir, "frame_%04d.jpg")
   
    try:
      cmd = [
            'ffmpeg', '-i', video_path,
            '-vf', 'fps=30',
            '-q:v', '2',
            output_pattern,
            '-y'
      ]
      
      result = subprocess.run(cmd, capture_output=True, text=True)
      
      if result.returncode != 0:
            print(f"Failed to extract frames with ffmpeg: {result.stderr}")
            return []
      
      frames = []
      frame_files = sorted()
      
      for frame_file in frame_files:
            frame_path = os.path.join(temp_dir, frame_file)
            frame = cv2.imread(frame_path)
            if frame is not None:
                frame_resized = cv2.resize(frame, (640, 480))
                frames.append(frame_resized)
      
      print(f"Successfully extracted {len(frames)} frames")
      
      for frame_file in frame_files:
            os.remove(os.path.join(temp_dir, frame_file))
      os.rmdir(temp_dir)
      
      return frames
      
    except Exception as e:
      print(f"Error extracting frames: {e}")
      return []

def load_lerobot_episode(data_dir, episode_idx, output_dir):
    """加载LeRobot的单个episode数据
   
    LeRobot数据结构:
    - action: 6维
    - observation.state: 6维
    - 图像: 高位相机 + 手臂相机
    """
    parquet_path = os.path.join(data_dir, "data/chunk-000", f"episode_{episode_idx:06d}.parquet")
    if not os.path.exists(parquet_path):
      print(f"Episode {episode_idx} parquet file does not exist: {parquet_path}")
      return None
   
    df = pd.read_parquet(parquet_path)
   
    actions = []
    qpos = []
   
    for i in range(len(df)):
      action = df['action'].iloc
      state = df['observation.state'].iloc
      
      if isinstance(action, np.ndarray):
            actions.append(action.astype(np.float32))
      else:
            actions.append(np.array(action, dtype=np.float32))
            
      if isinstance(state, np.ndarray):
            qpos.append(state.astype(np.float32))
      else:
            qpos.append(np.array(state, dtype=np.float32))
   
    high_cam_path = os.path.join(data_dir, "videos/chunk-000/observation.images.high", f"episode_{episode_idx:06d}.mp4")
    arm_cam_path = os.path.join(data_dir, "videos/chunk-000/observation.images.arm", f"episode_{episode_idx:06d}.mp4")
   
    print(f"Extracting high camera frames...")
    high_images = extract_frames_from_video(high_cam_path, output_dir, episode_idx)
   
    print(f"Extracting arm camera frames...")
    arm_images = extract_frames_from_video(arm_cam_path, output_dir, episode_idx)
   
    target_frames = len(df)
    if len(high_images) > target_frames:
      high_images = high_images[:target_frames]
    if len(arm_images) > target_frames:
      arm_images = arm_images[:target_frames]
   
    while len(high_images) < target_frames and high_images:
      high_images.append(high_images[-1])
    while len(arm_images) < target_frames and arm_images:
      arm_images.append(arm_images[-1])
   
    return {
      'actions': np.array(actions),
      'qpos': np.array(qpos),
      'high_images': high_images,
      'arm_images': arm_images,
      'episode_length': len(df)
    }

def images_encoding(imgs):
    if not imgs:
      return [], 0
      
    encode_data = []
    padded_data = []
    max_len = 0
   
    for i in range(len(imgs)):
      success, encoded_image = cv2.imencode(".jpg", imgs)
      if success:
            jpeg_data = encoded_image.tobytes()
            encode_data.append(jpeg_data)
            max_len = max(max_len, len(jpeg_data))
      else:
            print(f"Image encoding failed: {i}")
            empty_data = b""
            encode_data.append(empty_data)
   
    for i in range(len(imgs)):
      padded_data.append(encode_data.ljust(max_len, b"\0"))
   
    return encode_data, max_len

def load_task_instructions(data_dir):
    tasks_file = os.path.join(data_dir, "meta/tasks.jsonl")
    if not os.path.exists(tasks_file):
      print(f"Warning: tasks file not found: {tasks_file}")
      return None
   
    instructions = []
    with open(tasks_file, 'r') as f:
      for line in f:
            if line.strip():
                task_data = json.loads(line.strip())
                instructions.append(task_data["task"])
   
    print(f"加载了 {len(instructions)} 个任务指令")
    return instructions

def encode_language_instruction(instruction_text, t5_embedder, device):
    try:
      text_embeds, attn_mask = t5_embedder.get_text_embeddings()
      
      valid_embeds = text_embeds].float()
      return valid_embeds.cpu().numpy()
      
    except Exception as e:
      print(f"Language encoding failed: {e}")
      return np.zeros((1, 4096))

def convert_lerobot_to_rdt(data_dir, output_dir, episode_num, gpu=0, no_language=False):
    if not os.path.exists(output_dir):
      os.makedirs(output_dir)
   
    print(f"Start converting LeRobot data to RDT format...")
    print(f"Data source: {data_dir}")
    print(f"Output directory: {output_dir}")
    print(f"Processing episode number: {episode_num}")
    print(f"GPU device: {gpu}")
   
    scene_name = os.path.basename(data_dir)
   
    instructions = None
    if not no_language:
      instructions = load_task_instructions(data_dir)
   
    t5_embedder = None
    if not no_language and instructions:
      try:
            print(f"Initializing T5 encoder...")
            t5_model_path = "/home/qi.xiong/Data_Qi/t5-v1_1-xxl"
            if not os.path.exists(t5_model_path):
                print(f"Warning: T5 model path does not exist: {t5_model_path}")
                print(f"Will skip language processing")
                no_language = True
            else:
                t5_embedder = T5Embedder(
                  from_pretrained=t5_model_path,
                  device=f"cuda:{gpu}" if torch.cuda.is_available() else "cpu",
                  model_max_length=120,
                  use_offload_folder=None,
                )
                print(f"T5 encoder initialized successfully")
      except Exception as e:
            print(f"T5 encoder initialization failed: {e}")
            print(f"Will skip language processing")
            no_language = True
   
    for i in range(episode_num):
      print(f"Processing episode {i}...")
      
      episode_data = load_lerobot_episode(data_dir, i, output_dir)
      if episode_data is None:
            print(f"Skipping episode {i}")
            continue
      
      episode_output_dir = os.path.join(output_dir, f"episode_{i}")
      if not os.path.exists(episode_output_dir):
            os.makedirs(episode_output_dir)
      
      hdf5_path = os.path.join(episode_output_dir, f"episode_{i}.hdf5")
      
      with h5py.File(hdf5_path, "w") as f:
            f.create_dataset("action", data=episode_data['actions'])
            
            obs = f.create_group("observations")
            obs.create_dataset("qpos", data=episode_data['qpos'])
            
            image = obs.create_group("images")
            
            if episode_data['high_images']:
                print(f"Encoding high camera images...")
                high_enc, len_high = images_encoding(episode_data['high_images'])
                if high_enc and len_high > 0:
                  image.create_dataset("cam_high", data=high_enc, dtype=f"S{len_high}")
                  print(f"Saved high camera images: {len(episode_data['high_images'])} frames")
                else:
                  print(f"Warning: High camera images encoding failed")
            
            if episode_data['arm_images']:
                print(f"Encoding arm camera images...")
                arm_enc, len_arm = images_encoding(episode_data['arm_images'])
                if arm_enc and len_arm > 0:
                  image.create_dataset("cam_right_wrist", data=arm_enc, dtype=f"S{len_arm}")
                  print(f"Saved arm camera images: {len(episode_data['arm_images'])} frames")
                else:
                  print(f"Warning: Arm camera images encoding failed")
            
            # 添加机器人维度信息(LeRobot: 5个关节 + 1个夹爪)
            # 根据process_data.py的逻辑,每个时间步都需要记录维度信息
            # LeRobot是单臂机器人,只有右臂:5个关节 + 1个夹爪 = 6维
            # 左臂:0维(单臂机器人)
            
            # 为每个时间步记录维度信息
            left_arm_dim = * len(episode_data['actions'])# 左臂0维(单臂机器人)
            right_arm_dim = * len(episode_data['actions'])# 右臂6维(5关节+1夹爪)
            
            obs.create_dataset("left_arm_dim", data=np.array(left_arm_dim))
            obs.create_dataset("right_arm_dim", data=np.array(right_arm_dim))
      
      print(f"Episode {i} converted successfully: {hdf5_path}")
      print(f"Data length: {episode_data['episode_length']}")
      print(f"Action shape: {episode_data['actions'].shape}")
      print(f"Qpos shape: {episode_data['qpos'].shape}")
      print(f"High camera frames: {len(episode_data['high_images'])}")
      print(f"Arm camera frames: {len(episode_data['arm_images'])}")
      
      if not no_language and t5_embedder and instructions:
            print(f"Processing language instructions...")
            try:
                instruction = instructions
               
                language_features = encode_language_instruction(instruction, t5_embedder, f"cuda:{gpu}")
               
                instructions_dir = os.path.join(episode_output_dir, "instructions")
                if not os.path.exists(instructions_dir):
                  os.makedirs(instructions_dir)
               
                lang_embed_path = os.path.join(instructions_dir, "lang_embed_0.pt")
                torch.save(torch.from_numpy(language_features), lang_embed_path)
               
                print(f"Language instruction encoded successfully: {instruction}")
                print(f"Language features saved to: {lang_embed_path}")
                print(f"Language features shape: {language_features.shape}, data type: {language_features.dtype}")
               
            except Exception as e:
                print(f"Language instruction processing failed: {e}")
   
    print(f"\nConversion completed! Processed {episode_num} episodes")
    print(f"Output directory: {output_dir}")

def main():
    parser = argparse.ArgumentParser(description="Convert LeRobot data to RDT format")
    parser.add_argument("--data_dir", type=str, required=True,
                     help="LeRobot data directory path")
    parser.add_argument("--output_dir", type=str, required=True,
                     help="Output directory path")
    parser.add_argument("--episode_num", type=int, default=10,
                     help="Number of episodes to process")
    parser.add_argument("--gpu", type=int, default=0,
                     help="GPU device ID")
    parser.add_argument("--no_language", action="store_true",
                     help="Skip language processing")
   
    args = parser.parse_args()
   
    if not os.path.exists(args.data_dir):
      print(f"Error: Data directory does not exist: {args.data_dir}")
      return
   
    meta_file = os.path.join(args.data_dir, "meta/info.json")
    if not os.path.exists(meta_file):
      print(f"Error: Meta information file not found: {meta_file}")
      return
   
    try:
      subprocess.run(['ffmpeg', '-version'], capture_output=True, check=True)
      print("ffmpeg is available, will use ffmpeg to extract video frames")
    except (subprocess.CalledProcessError, FileNotFoundError):
      print("Warning: ffmpeg is not available, image data may not be extracted correctly")
      print("Please install ffmpeg: conda install -c conda-forge ffmpeg=6.1")
      return
   
    with open(meta_file, 'r') as f:
      meta_info = yaml.safe_load(f)
   
    total_episodes = meta_info.get('total_episodes', 10)
    if args.episode_num > total_episodes:
      print(f"Warning: Requested episode number ({args.episode_num}) exceeds available number ({total_episodes})")
      args.episode_num = total_episodes
   
    convert_lerobot_to_rdt(
      args.data_dir,
      args.output_dir,
      args.episode_num,
      args.gpu,
      args.no_language
    )

if __name__ == "__main__":
    main()RDT1B微调

        要训练1B的版本我们需要修改RDT/configs/base.yaml文件中model类下的rdt参数
# 法一 bash process_data_rdt.sh data_dir=${1} output_dir=${2} episode_num=${3} gpu_id=${4}
bash process_data_rdt.sh /home/qi.xiong/Data_Qi/LeRobot/skyxz/redmarker_scence4 /home/qi.xiong/DualArm/RoboTwin/policy/RDT-LeRobot/processed_data/redmarker_scence4 5 0
# 法二 python scripts/process_data_lerobot.py --data_dir --output_dir --episode_num --gpu
python3 scripts/process_data_lerobot.py --data_dir /home/qi.xiong/Data_Qi/LeRobot/skyxz/redmarker_scence4 --output_dir /home/qi.xiong/DualArm/RoboTwin/policy/RDT-LeRobot/processed_data/redmarker_scence4        接着生成训练的参数yml文件,并将pretrained_model_name_or_path指向我们先前下载的1B模型
import os
import fnmatch
import json

import h5py
import yaml
import cv2
import numpy as np

from configs.state_vec import STATE_VEC_IDX_MAPPING

class HDF5VLADataset:
    """
    This class is used to sample episodes from the embododiment dataset
    stored in HDF5.
    """

    def __init__(self, model_config_path) -> None:
      # The path to the HDF5 dataset directory
      # Each HDF5 file contains one episode
      with open(model_config_path, "r") as f:
            model_config = yaml.safe_load(f)
      HDF5_DIR = model_config["data_path"]
      self.DATASET_NAME = "agilex"

      self.file_paths = []
      for root, _, files in os.walk(HDF5_DIR):
            for filename in fnmatch.filter(files, "*.hdf5"):
                file_path = os.path.join(root, filename)
                self.file_paths.append(file_path)

      # Load the config
      with open("configs/base.yaml", "r") as file:
            config = yaml.safe_load(file)
      self.CHUNK_SIZE = config["common"]["action_chunk_size"]
      self.IMG_HISORY_SIZE = config["common"]["img_history_size"]
      self.STATE_DIM = config["common"]["state_dim"]

      # Get each episode's len (use original length, not standardized length)
      episode_lens = []
      for file_path in self.file_paths:
            try:
                with h5py.File(file_path, "r") as f:
                  qpos = f["observations"]["qpos"][:]
                  num_steps = qpos.shape
                  episode_lens.append(num_steps)
            except Exception as e:
                print(f"Warning: Could not read {file_path}: {e}")
                episode_lens.append(0)
      self.episode_sample_weights = np.array(episode_lens) / np.sum(episode_lens)

    def __len__(self):
      return len(self.file_paths)

    def get_dataset_name(self):
      return self.DATASET_NAME

    def get_item(self, index: int = None, state_only=False):
      """Get a training sample at a random timestep.

      Args:
            index (int, optional): the index of the episode.
                If not provided, a random episode will be selected.
            state_only (bool, optional): Whether to return only the state.
                In this way, the sample will contain a complete trajectory rather
                than a single timestep. Defaults to False.

      Returns:
         sample (dict): a dictionary containing the training sample.
      """
      while True:
            if index is None:
                file_path = np.random.choice(self.file_paths, p=self.episode_sample_weights)
            else:
                file_path = self.file_paths
            valid, sample = (self.parse_hdf5_file(file_path)
                           if not state_only else self.parse_hdf5_file_state_only(file_path))
            if valid:
                return sample
            else:
                index = np.random.randint(0, len(self.file_paths))

    def parse_hdf5_file(self, file_path):
      """ Parse a hdf5 file to generate a training sample at
            a random timestep.

      Args:
            file_path (str): the path to the hdf5 file

      Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                  "meta": {
                        "dataset_name": str,    # the name of your dataset.
                        "#steps": int,          # the number of steps in the episode,
                                                # also the total timesteps.
                        "instruction": str      # the language instruction for this episode.
                  },
                  "step_id": int,             # the index of the sampled step,
                                                # also the timestep t.
                  "state": ndarray,         # state, (1, STATE_DIM).
                  "state_std": ndarray,       # std(state[:]), (STATE_DIM,).
                  "state_mean": ndarray,      # mean(state[:]), (STATE_DIM,).
                  "state_norm": ndarray,      # norm(state[:]), (STATE_DIM,).
                  "actions": ndarray,         # action, (CHUNK_SIZE, STATE_DIM).
                  "state_indicator", ndarray, # indicates the validness of each dim, (STATE_DIM,).
                  "cam_high": ndarray,      # external camera image, (IMG_HISORY_SIZE, H, W, 3)
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                  "cam_high_mask": ndarray,   # indicates the validness of each timestep, (IMG_HISORY_SIZE,) boolean array.
                                                # For the first IMAGE_HISTORY_SIZE-1 timesteps, the mask should be False.
                  "cam_left_wrist": ndarray,# left wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                  "cam_left_wrist_mask": ndarray,
                  "cam_right_wrist": ndarray, # right wrist camera image, (IMG_HISORY_SIZE, H, W, 3).
                                                # or (IMG_HISORY_SIZE, 0, 0, 0) if unavailable.
                                                # If only one wrist, make it right wrist, plz.
                  "cam_right_wrist_mask": ndarray
                } or None if the episode is invalid.
      """
      with h5py.File(file_path, "r") as f:
            qpos = f["observations"]["qpos"][:]
            left_arm_dim = f["observations"]["left_arm_dim"][:]
            right_arm_dim = f["observations"]["right_arm_dim"][:]
            num_steps = qpos.shape
            action_dim = qpos
            # We drop too-short episode
            # if num_steps < 128:
            #   return False, None

            # We skip the first few still steps
            EPS = 1e-2
            # Get the idx of the first qpos whose delta exceeds the threshold
            qpos_delta = np.abs(qpos - qpos)
            indices = np.where(np.any(qpos_delta > EPS, axis=1))
            if len(indices) > 0:
                first_idx = indices
            else:
                raise ValueError("Found no qpos that exceeds the threshold.")

            # We randomly sample a timestep
            step_id = np.random.randint(first_idx - 1, num_steps)

            # Load the instruction
            dir_path = os.path.dirname(file_path)

            # with open(os.path.join(dir_path, 'instruction.json'), 'r') as f_instr:
            #   instruction_dict = json.load(f_instr)
            # # We have 1/3 prob to use original instruction,
            # # 1/3 to use simplified instruction,
            # # and 1/3 to use expanded instruction.
            # instruction_type = np.random.choice([
            #   'instruction', 'expanded_instruction'])
            # instruction = instruction_dict
            # if isinstance(instruction, list):
            #    instruction = np.random.choice(instruction)

            # You can also use precomputed language embeddings (recommended)
            # instruction = "path/to/lang_embed.pt"
            instructions_path = os.path.join(dir_path, "instructions")
            instructions_names = []

            for filename in os.listdir(instructions_path):
                # 检查文件名是否以.pt结尾
                if filename.endswith(".pt"):
                  instructions_names.append(os.path.join(instructions_path, filename))
            instruction = np.random.choice(instructions_names)
            # print(f"choose {instruction} file as instruction.")
            # Assemble the meta
            meta = {
                "dataset_name": self.DATASET_NAME,
                "#steps": num_steps,
                "step_id": step_id,
                "instruction": instruction,
            }

            # Rescale gripper to
            # qpos = qpos / np.array([ + 1 + right_arm_dim + 1)]])
            # target_qpos = f["action"] / np.array(
            #   [ + 1 + right_arm_dim + 1)]])

            qpos = qpos / np.array(
            #    []
            []
            )
            target_qpos = f['action'] / np.array(
            #    []
            []
            )

            # Parse the state and action
            state = qpos
            state_std = np.std(qpos, axis=0)
            state_mean = np.mean(qpos, axis=0)
            state_norm = np.sqrt(np.mean(qpos**2, axis=0))
            actions = target_qpos
            if actions.shape < self.CHUNK_SIZE:
                # Pad the actions using the last action
                actions = np.concatenate(
                  [
                        actions,
                        np.tile(actions[-1:], (self.CHUNK_SIZE - actions.shape, 1)),
                  ],
                  axis=0,
                )

            # Fill the state/action into the unified vector

            def fill_in_state(values):
                # Target indices corresponding to your state space
                # In this example: 6 joints + 1 gripper for each arm
                UNI_STATE_INDICES =[
                  STATE_VEC_IDX_MAPPING for i in range(6)
                # ] + [
                  # STATE_VEC_IDX_MAPPING["right_gripper_open"]
                ]
                uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM, ))
                uni_vec[..., UNI_STATE_INDICES] = values
                return uni_vec

            state = fill_in_state(state)
            state_indicator = fill_in_state(np.ones_like(state_std))
            state_std = fill_in_state(state_std)
            state_mean = fill_in_state(state_mean)
            state_norm = fill_in_state(state_norm)
            # If action's format is different from state's,
            # you may implement fill_in_action()
            actions = fill_in_state(actions)

            # Parse the images
            def parse_img(key):
                imgs = []
                for i in range(max(step_id - self.IMG_HISORY_SIZE + 1, 0), step_id + 1):
                  img_bits = f["observations"]["images"]
                  img = cv2.imdecode(np.frombuffer(img_bits, np.uint8), cv2.IMREAD_COLOR)
                  imgs.append(img)
                imgs = np.stack(imgs)
                if imgs.shape < self.IMG_HISORY_SIZE:
                  # Pad the images using the first image
                  imgs = np.concatenate(
                        [
                            np.tile(
                              imgs[:1],
                              (self.IMG_HISORY_SIZE - imgs.shape, 1, 1, 1),
                            ),
                            imgs,
                        ],
                        axis=0,
                  )
                return imgs

            # `cam_high` is the external camera image
            cam_high = parse_img("cam_high")
            # For step_id = first_idx - 1, the valid_len should be one
            valid_len = min(step_id - (first_idx - 1) + 1, self.IMG_HISORY_SIZE)
            cam_high_mask = np.array( * (self.IMG_HISORY_SIZE - valid_len) + * valid_len)
            # cam_left_wrist = parse_img("cam_left_wrist")
            # cam_left_wrist_mask = cam_high_mask.copy()
            cam_left_wrist = np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0))#parse_img('cam_right_wrist')
            cam_left_wrist_mask = np.array( * self.IMG_HISORY_SIZE)#cam_high_mask.copy()
            cam_right_wrist = parse_img("cam_right_wrist")
            cam_right_wrist_mask = cam_high_mask.copy()# 使用相同的掩码逻辑

            # Return the resulting sample
            # For unavailable images, return zero-shape arrays, i.e., (IMG_HISORY_SIZE, 0, 0, 0)
            # E.g., return np.zeros((self.IMG_HISORY_SIZE, 0, 0, 0)) for the key "cam_left_wrist",
            # if the left-wrist camera is unavailable on your robot
            return True, {
                "meta": meta,
                "state": state,
                "state_std": state_std,
                "state_mean": state_mean,
                "state_norm": state_norm,
                "actions": actions,
                "state_indicator": state_indicator,
                "cam_high": cam_high,
                "cam_high_mask": cam_high_mask,
                "cam_left_wrist": cam_left_wrist,
                "cam_left_wrist_mask": cam_left_wrist_mask,
                "cam_right_wrist": cam_right_wrist,
                "cam_right_wrist_mask": cam_right_wrist_mask,
            }

    def parse_hdf5_file_state_only(self, file_path):
      """ Parse a hdf5 file to generate a state trajectory.

      Args:
            file_path (str): the path to the hdf5 file

      Returns:
            valid (bool): whether the episode is valid, which is useful for filtering.
                If False, this episode will be dropped.
            dict: a dictionary containing the training sample,
                {
                  "state": ndarray,         # state[:], (T, STATE_DIM).
                  "action": ndarray,          # action[:], (T, STATE_DIM).
                } or None if the episode is invalid.
      """
      with h5py.File(file_path, "r") as f:
            qpos = f["observations"]["qpos"][:]
            left_arm_dim = f["observations"]["left_arm_dim"][:]
            right_arm_dim = f["observations"]["right_arm_dim"][:]

            num_steps = qpos.shape
            # We drop too-short episode
            # if num_steps < 128:
            # return False, None

            # We skip the first few still steps
            EPS = 1e-2
            # Get the idx of the first qpos whose delta exceeds the threshold
            qpos_delta = np.abs(qpos - qpos)
            indices = np.where(np.any(qpos_delta > EPS, axis=1))
            if len(indices) > 0:
                first_idx = indices
            else:
                raise ValueError("Found no qpos that exceeds the threshold.")

            # Rescale gripper to
            # qpos = qpos / np.array([ + right_arm_dim + 2)]])
            # target_qpos = f["action"][:] / np.array([ + right_arm_dim + 2)]])
            
            qpos = qpos / np.array(
            #    []
            []
            )
            target_qpos = f['action'] / np.array(
            #    []
            []
            )
            # Parse the state and action
            state = qpos
            action = target_qpos
            
            # Standardize trajectory length to avoid batch size mismatch
            # Use a fixed length (e.g., 128) or pad/truncate to match
            target_length = 128# You can adjust this value
            if state.shape > target_length:
                # Truncate to target length
                state = state[:target_length]
                action = action[:target_length]
            elif state.shape < target_length:
                # Pad with the last state/action
                pad_length = target_length - state.shape
                state = np.concatenate(, (pad_length, 1))], axis=0)
                action = np.concatenate(, (pad_length, 1))], axis=0)

            # Fill the state/action into the unified vector
            def fill_in_state(values):
                # Target indices corresponding to your state space
                # In this example: 6 joints + 1 gripper for each arm
                UNI_STATE_INDICES =[
                  STATE_VEC_IDX_MAPPING for i in range(6)
                # ] + [
                  # STATE_VEC_IDX_MAPPING["right_gripper_open"]
                ]
                uni_vec = np.zeros(values.shape[:-1] + (self.STATE_DIM, ))
                uni_vec[..., UNI_STATE_INDICES] = values
                return uni_vec

            state = fill_in_state(state)
            action = fill_in_state(action)

            # Return the resulting sample
            return True, {"state": state, "action": action}

if __name__ == "__main__":
    ds = HDF5VLADataset()
    for i in range(len(ds)):
      print(f"Processing episode {i}/{len(ds)}...")
      ds.get_item(i)        接着直接开始训练:
# step1:安装torch、torchvision
pip install torch==2.1.0 torchvision==0.16.0--index-url https://download.pytorch.org/whl/cu121
# step2:安装packaging
pip install packaging==24.0
# step3:安装其他依赖
pip install -r requirements.txtRDT170M微调

        要训练170M的版本我们需要修改RDT/configs/base.yaml文件中model类下的rdt参数
$ python3 -c "import torch ; print(torch.__config__.show())"        接着生成训练的参数yml文件,并将pretrained_model_name_or_path指向我们先前下载的170M模型
PyTorch built with:
- GCC 9.3
- C++ Version: 201703
- Intel(R) oneAPI Math Kernel Library Version 2022.2-Product Build 20220804 for Intel(R) 64 architecture applications
- Intel(R) MKL-DNN v3.4.2 (Git Hash 1137e04ec0b5251ca2b4400a4fd3c667ce843d67)
- OpenMP 201511 (a.k.a. OpenMP 4.5)
- LAPACK is enabled (usually provided by MKL)
- NNPACK is enabled
- CPU capability usage: AVX2
- CUDA Runtime 12.1
- NVCC architecture flags: -gencode;arch=compute_50,code=sm_50;-gencode;arch=compute_60,code=sm_60;-gencode;arch=compute_70,code=sm_70;-gencode;arch=compute_75,code=sm_75;-gencode;arch=compute_80,code=sm_80;-gencode;arch=compute_86,code=sm_86;-gencode;arch=compute_90,code=sm_90
- CuDNN 90.1(built against CUDA 12.4)
- Magma 2.6.1
- Build settings: BLAS_INFO=mkl, BUILD_TYPE=Release, CUDA_VERSION=12.1, CUDNN_VERSION=9.1.0, CXX_COMPILER=/opt/rh/devtoolset-9/root/usr/bin/c++, CXX_FLAGS= -D_GLIBCXX_USE_CXX11_ABI=0 -fabi-version=11 -fvisibility-inlines-hidden -DUSE_PTHREADPOOL -DNDEBUG -DUSE_KINETO -DLIBKINETO_NOROCTRACER -DUSE_FBGEMM -DUSE_PYTORCH_QNNPACK -DUSE_XNNPACK -DSYMBOLICATE_MOBILE_DEBUG_HANDLE -O2 -fPIC -Wall -Wextra -Werror=return-type -Werror=non-virtual-dtor -Werror=bool-operation -Wnarrowing -Wno-missing-field-initializers -Wno-type-limits -Wno-array-bounds -Wno-unknown-pragmas -Wno-unused-parameter -Wno-unused-function -Wno-unused-result -Wno-strict-overflow -Wno-strict-aliasing -Wno-stringop-overflow -Wsuggest-override -Wno-psabi -Wno-error=pedantic -Wno-error=old-style-cast -Wno-missing-braces -fdiagnostics-color=always -faligned-new -Wno-unused-but-set-variable -Wno-maybe-uninitialized -fno-math-errno -fno-trapping-math -Werror=format -Wno-stringop-overflow, LAPACK_INFO=mkl, PERF_WITH_AVX=1, PERF_WITH_AVX2=1, PERF_WITH_AVX512=1, TORCH_VERSION=2.4.1, USE_CUDA=ON, USE_CUDNN=ON, USE_CUSPARSELT=1, USE_EXCEPTION_PTR=1, USE_GFLAGS=OFF, USE_GLOG=OFF, USE_GLOO=ON, USE_MKL=ON, USE_MKLDNN=ON, USE_MPI=OFF, USE_NCCL=1, USE_NNPACK=ON, USE_OPENMP=ON, USE_ROCM=OFF, USE_ROCM_KERNEL_ASSERT=OFF,         接着直接开始训练:
pip3 install flash_attn-2.7.2.post1+cu12torch2.1cxx11abiFALSE-cp310-cp310-linux_x86_64.whl        在双卡A100-40GB上BS16即单卡8BS的显存占用及训练速度参考如下,按照RDT论文中的说法关注overall_avg_sample_mse指标,RDT170M和1B的版本在数据量仅有100条的时候均能在7000步左右实现拟合指标下降到0.0001量级


四、实际评测

        在训练完成之后我们便可以开始实机评测啦,由于我们目前需要在Mac上连接LeRobot机械臂进行控制,因此我们在实际使用A100或者是RDKS100部署推理的时候还需要完成两端之间的通信代码,我们在这里就用最简单的Socket来实现ServerClient,还有其他更优的ZMQ等方式就不在这里呈现了,具体实现的代码如下,这份代码需要同时放到本地及推理服务器端进行调用
# 法一: 直接运行我仓库中写好的脚本
cd weights/RDT
bash _download.sh
# 法二: 手动下载
export HF_ENDPOINT=https://hf-mirror.com # 国内镜像,加速下载
huggingface-cli download google/t5-v1_1-xxl --local-dir t5-v1_1-xxl
huggingface-cli download google/siglip-so400m-patch14-384 --local-dir siglip-so400m-patch14-384
huggingface-cli download robotics-diffusion-transformer/rdt-1b --local-dir rdt-1b
huggingface-cli download robotics-diffusion-transformer/rdt-170m --local-dir rdt-170m服务器单卡部署

       接着我们便可以完成我们服务器上的推理代码了,我们参考RDT中的RDT/scripts/agilex_model.py来完成我们的lerobot_rdt_server,我们在这个代码中完成对RoboticDiffusionTransformerModel类的修改同时使用类中的step执行推理,并集成我们的ServerClient来接收本地电脑发来的机械臂observation数据,具体的代码实现如下:
rdt:
# 1B: num_head 32 hidden_size 2048 depth: 28
# 170M: num_head 32 hidden_size 1024 depth: 14
hidden_size: 2048
depth: 28
num_heads: 32
cond_pos_embed_type: multimodal         新版的LeRobot使用函数来替换了老的observation获取函数,同时其中的数据结构也进行了更改,因此我们在record的基础上直接复制一份使用最小的状态获取及机械臂控制实例完成本地的数据传输及通信控制代码,具体代码如下,文件位置为lerobot/src/lerobot/record_rdt.py
bash generate.sh RDT1B_LeRobot
# Generated on 2025-08-28 17:14:20
model: RDT1B_LeRobot
data_path: training_data/RDT1B_LeRobot
checkpoint_path: checkpoints/RDT1B_LeRobot
pretrained_model_name_or_path: ../weights/RDT/rdt-1b
cuda_visible_device: '0,1,2,3'
train_batch_size: 16
sample_batch_size: 32
max_train_steps: 10000
checkpointing_period: 2500
sample_period: 100
checkpoints_total_limit: 40
learning_rate: 0.0001
dataloader_num_workers: 8
state_noise_snr: 40
gradient_accumulation_steps: 1        接着按照你的实际摄像头串口配置修改main()中的对应配置即可,接着在依次运行服务端即本地客户端即可看到机械臂开始运动完成任务:
bash finetune.sh RDT1B_LeRobotRDKS100部署

        目前RDKS100上仅支持RDT170M的部署,接下来我们参考文档:RDT on Double RDK S100P 全流程文档,一步步的完成LeRobot的RDT的上板流程,我们首先使用仓库中RDT目录下的脚本RDT/export_all.py来完成RDT中所有ONNX模型的导出,这段脚本有如下的参数可以进行配置,大家仅需按照自己的实际情况配置即可
rdt:
# 1B: num_head 32 hidden_size 2048 depth: 28
# 170M: num_head 32 hidden_size 1024 depth: 14
hidden_size: 1024
depth: 14
num_heads: 32
cond_pos_embed_type: multimodal         接下来我们执行这段脚本之后在RDT的目录下会生成脚本导出的ONNX模型以及dump出的校准数据和文件
bash generate.sh RDT170M_LeRobot
# Generated on 2025-08-28 17:14:20
model: RDT170M_LeRobot
data_path: training_data/RDT170M_LeRobot
checkpoint_path: checkpoints/RDT170M_LeRobot
pretrained_model_name_or_path: ../weights/RDT/rdt-170m
cuda_visible_device: '0,1'
train_batch_size: 16
sample_batch_size: 32
max_train_steps: 10000
checkpointing_period: 2500
sample_period: 100
checkpoints_total_limit: 40
learning_rate: 0.0001
dataloader_num_workers: 8
state_noise_snr: 40
gradient_accumulation_steps: 1        接着我们使用RDKS100算法工具链标准交付的docker环境,来完成BPU模型的量化和编译,docker的安装挂载和使用命令参考以下:
bash finetune.sh RDT170M_LeRobot        在Docker内输入bash build_all.sh后便会自动开启编译并导出可以在RDKS100板端运行的HBM模型

        以下是DiT、img_adaptor、lang_adaptor以及state_adaptor模型量化过程中的部分指标参考:
import socket
import numpy as np
import zlib
import json
import base64
import time
from typing import Any
import torch

class NumpyEncoder(json.JSONEncoder):
    """Enhanced json encoder for numpy types and PyTorch tensors with array reconstruction info"""
    def default(self, obj):
      if isinstance(obj, np.ndarray):
            return {
                '__numpy_array__': True,
                'data': base64.b64encode(obj.tobytes()).decode('ascii'),
                'dtype': str(obj.dtype),
                'shape': obj.shape
            }
      elif torch is not None and isinstance(obj, torch.Tensor):
            # 将 PyTorch Tensor 转换为 numpy 数组
            numpy_array = obj.cpu().detach().numpy()
            return {
                '__numpy_array__': True,
                'data': base64.b64encode(numpy_array.tobytes()).decode('ascii'),
                'dtype': str(numpy_array.dtype),
                'shape': numpy_array.shape
            }
      elif isinstance(obj, (np.integer, np.floating, np.bool_)):
            return obj.item()
      return super().default(obj)

def numpy_to_json(data: Any) -> str:
    return json.dumps(data, cls=NumpyEncoder)

def json_to_numpy(json_str: str) -> Any:
    def hook(dct):
      if '__numpy_array__' in dct:
            data = base64.b64decode(dct['data'])
            return np.frombuffer(data, dtype=dct['dtype']).reshape(dct['shape'])
      return dct
    return json.loads(json_str, object_hook=hook)

class CommonUtils:
    @staticmethod
    def serialize(data: Any) -> bytes:
      return zlib.compress(numpy_to_json(data).encode('utf-8'))

    @staticmethod
    def deserialize(data: bytes) -> Any:
      return json_to_numpy(zlib.decompress(data).decode('utf-8'))

def send_all(sock, payload):
    sock.sendall(len(payload).to_bytes(8, 'big') + payload)

def recv_all(sock) :
    length_bytes = sock.recv(8)
    if not length_bytes:
      return None
    length = int.from_bytes(length_bytes, 'big')
    buf = b''
    while len(buf) < length:
      chunk = sock.recv(length - len(buf))
      if not chunk:
            return None
      buf += chunk
    return buf

class ServerClient:
    def __init__(self, host='localhost', port=5000, is_server=True):
      self.host, self.port, self.is_server = host, port, is_server
      self.utils = CommonUtils()
      self._connect()

    def _connect(self):
      if self.is_server:
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.bind((self.host, self.port))
            self.sock.listen(1)
            print(f" Listening on {self.host}:{self.port}")
            self.conn, addr = self.sock.accept()
            print(f" Connected by {addr}")
      else:
            while True:
                try:
                  self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                  self.sock.connect((self.host, self.port))
                  self.conn = self.sock
                  print(f" Connected to {self.host}:{self.port}")
                  break
                except (ConnectionRefusedError, OSError):
                  print(" Waiting for server...")
                  time.sleep(2)

    def send(self, data):
      payload = self.utils.serialize(data)
      try:
            send_all(self.conn, payload)
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Connection lost. Reconnecting...")
            self._connect()
            send_all(self.conn, payload)

    def receive(self):
      try:
            buf = recv_all(self.conn)
            return self.utils.deserialize(buf) if buf else None
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Connection lost. Reconnecting...")
            self._connect()
            return None

    def close(self):
      self.conn.close()
      self.sock.close()

class Client:
    def __init__(self, host='127.0.0.1', port=5000):
      self.host, self.port = host, port
      self.utils = CommonUtils()
      self.connect()

    def connect(self):
      while True:
            try:
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, self.port))
                print(f" Connected to {self.host}:{self.port}")
                break
            except (ConnectionRefusedError, OSError):
                print(" Waiting for server...")
                time.sleep(2)

    def send(self, data):
      payload = self.utils.serialize(data)
      try:
            send_all(self.sock, payload)
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Connection lost. Reconnecting...")
            self.connect()
            send_all(self.sock, payload)

    def receive(self):
      try:
            buf = recv_all(self.sock)
            return self.utils.deserialize(buf) if buf else None
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Connection lost. Reconnecting...")
            self.connect()
            return None

    def close(self):
      self.sock.close()
      print(" Closed.")

class Server:
    def __init__(self, host='0.0.0.0', port=5000):
      self.host, self.port = host, port
      self.utils = CommonUtils()
      self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.sock.bind((self.host, self.port))
      self.sock.listen(1)
      print(f" Listening on {self.host}:{self.port}")
      self._wait_client()

    def _wait_client(self):
      print(" Waiting for client...")
      self.conn, addr = self.sock.accept()
      print(f" Connected by {addr}")

    def send(self, data: Any):
      payload = self.utils.serialize(data)
      try:
            send_all(self.conn, payload)
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Client disconnected. Waiting new client...")
            self._wait_client()
            send_all(self.conn, payload)

    def receive(self):
      try:
            buf = recv_all(self.conn)
            return self.utils.deserialize(buf) if buf else None
      except (BrokenPipeError, ConnectionResetError, OSError):
            print(" Client disconnected. Waiting new client...")
            self._wait_client()
            return None

    def close(self):
      self.conn.close()
      self.sock.close()
      print(" Closed.")        接着我们运行以下命令下载已经量化编译好的SigLip到编译产物结果文件夹BPU_RDT_Policy中并将这个文件夹和test_data文件夹以及LeRobot-VLA仓库中RDKS_ModelRun/RDT路径下的所有文件复制到我们的RDKS100板端如图所示:
import os, sys

import numpy as np
import torch
from PIL import Image
from torchvision import transforms
import yaml

from pathlib import Path

# get current workspace
current_file = Path(__file__)
sys.path.append(os.path.join(current_file.parent.parent, "models"))
sys.path.append(os.path.join(current_file.parent.parent, "models"))
sys.path.append(os.path.join(current_file.parent.parent))
from configs.state_vec import STATE_VEC_IDX_MAPPING
from multimodal_encoder.siglip_encoder import SiglipVisionTower
from multimodal_encoder.t5_encoder import T5Embedder
from rdt_runner import RDTRunner
from server_client import ServerClient

# The indices that the raw vector should be mapped to in the unified action vector
AGILEX_STATE_INDICES = [
    STATE_VEC_IDX_MAPPING for i in range(6)
]

# Create the RDT model
def create_model(args, **kwargs):
    model = RoboticDiffusionTransformerModel(args, **kwargs)
    pretrained = kwargs.get("pretrained", None)
    if pretrained is not None and os.path.isfile(pretrained):
      model.load_pretrained_weights(pretrained)

    return model

class RoboticDiffusionTransformerModel(object):
    """A wrapper for the RDT model, which handles
    1. Model initialization
    2. Encodings of instructions
    3. Model inference
    """

    def __init__(
      self,
      args,
      device="cuda",
      dtype=torch.bfloat16,
      image_size=None,
      control_frequency=25,
      pretrained=None,
      pretrained_vision_encoder_name_or_path=None,
    ):
      self.args = args
      self.dtype = dtype
      self.image_size = image_size
      self.device = device
      self.control_frequency = control_frequency
      # We do not use the text encoder due to limited GPU memory
      # self.text_tokenizer, self.text_model = self.get_text_encoder(pretrained_text_encoder_name_or_path)
      self.image_processor, self.vision_model = self.get_vision_encoder(pretrained_vision_encoder_name_or_path)
      self.policy = self.get_policy(pretrained)

      self.reset()

    def get_policy(self, pretrained):
      """Initialize the model."""
      # Initialize model with arguments
      if pretrained is None or os.path.isfile(pretrained):
            img_cond_len = (self.args["common"]["img_history_size"] * self.args["common"]["num_cameras"] *
                            self.vision_model.num_patches)

            _model = RDTRunner(
                action_dim=self.args["common"]["state_dim"],
                pred_horizon=self.args["common"]["action_chunk_size"],
                config=self.args["model"],
                lang_token_dim=self.args["model"]["lang_token_dim"],
                img_token_dim=self.args["model"]["img_token_dim"],
                state_token_dim=self.args["model"]["state_token_dim"],
                max_lang_cond_len=self.args["dataset"]["tokenizer_max_length"],
                img_cond_len=img_cond_len,
                img_pos_embed_config=[
                  # No initial pos embed in the last grid size
                  # since we've already done in ViT
                  (
                        "image",
                        (
                            self.args["common"]["img_history_size"],
                            self.args["common"]["num_cameras"],
                            -self.vision_model.num_patches,
                        ),
                  ),
                ],
                lang_pos_embed_config=[
                  # Similarly, no initial pos embed for language
                  ("lang", -self.args["dataset"]["tokenizer_max_length"]),
                ],
                dtype=self.dtype,
            )
      else:
            _model = RDTRunner.from_pretrained(pretrained)

      return _model

    def get_text_encoder(self, pretrained_text_encoder_name_or_path):
      text_embedder = T5Embedder(
            from_pretrained=pretrained_text_encoder_name_or_path,
            model_max_length=self.args["dataset"]["tokenizer_max_length"],
            device=self.device,
      )
      tokenizer, text_encoder = text_embedder.tokenizer, text_embedder.model
      return tokenizer, text_encoder

    def get_vision_encoder(self, pretrained_vision_encoder_name_or_path):
      vision_encoder = SiglipVisionTower(vision_tower=pretrained_vision_encoder_name_or_path, args=None)
      image_processor = vision_encoder.image_processor
      return image_processor, vision_encoder

    def reset(self):
      """Set model to evaluation mode."""
      device = self.device
      weight_dtype = self.dtype
      self.policy.eval()
      # self.text_model.eval()
      self.vision_model.eval()

      self.policy = self.policy.to(device, dtype=weight_dtype)
      # self.text_model = self.text_model.to(device, dtype=weight_dtype)
      self.vision_model = self.vision_model.to(device, dtype=weight_dtype)

    def load_pretrained_weights(self, pretrained=None):
      if pretrained is None:
            return
      print(f"Loading weights from {pretrained}")
      filename = os.path.basename(pretrained)
      if filename.endswith(".pt"):
            checkpoint = torch.load(pretrained)
            self.policy.load_state_dict(checkpoint["module"])
      elif filename.endswith(".safetensors"):
            from safetensors.torch import load_model

            load_model(self.policy, pretrained)
      else:
            raise NotImplementedError(f"Unknown checkpoint format: {pretrained}")

    def encode_instruction(self, instruction, device="cuda"):
      """Encode string instruction to latent embeddings.

      Args:
            instruction: a string of instruction
            device: a string of device

      Returns:
            pred: a tensor of latent embeddings of shape (text_max_length, 512)
      """
      tokens = self.text_tokenizer(instruction, return_tensors="pt", padding="longest",
                                     truncation=True)["input_ids"].to(device)

      tokens = tokens.view(1, -1)
      with torch.no_grad():
            pred = self.text_model(tokens).last_hidden_state.detach()

      return pred

    def _format_joint_to_state(self, joints):
      """
      Format the joint proprioception into the unified action vector.

      Args:
            joints (torch.Tensor): The joint proprioception to be formatted.
                qpos ().

      Returns:
            state (torch.Tensor): The formatted vector for RDT ().
      """
      # Rescale the gripper to the range of
      joints = joints / torch.tensor(
            [[]],
            device=joints.device,
            dtype=joints.dtype,
      )

      B, N, _ = joints.shape
      state = torch.zeros(
            (B, N, self.args["model"]["state_token_dim"]),
            device=joints.device,
            dtype=joints.dtype,
      )
      # Fill into the unified state vector
      state[:, :, AGILEX_STATE_INDICES] = joints
      # Assemble the mask indicating each dimension's availability
      state_elem_mask = torch.zeros(
            (B, self.args["model"]["state_token_dim"]),
            device=joints.device,
            dtype=joints.dtype,
      )
      state_elem_mask[:, AGILEX_STATE_INDICES] = 1
      return state, state_elem_mask

    def _unformat_action_to_joint(self, action):
      """
      Unformat the unified action vector into the joint action to be executed.

      Args:
            action (torch.Tensor): The unified action vector to be unformatted.
                ()

      Returns:
            joints (torch.Tensor): The unformatted robot joint action.
                qpos ().
      """
      action_indices = AGILEX_STATE_INDICES
      joints = action[:, :, action_indices]

      # Rescale the gripper back to the action range
      # Note that the action range and proprioception range are different
      # for Mobile ALOHA robot
      joints = joints * torch.tensor(
            [[]],
            device=joints.device,
            dtype=joints.dtype,
      )

      return joints

    @torch.no_grad()
    def step(self, proprio, images, text_embeds):
      """
      Predict the next action chunk given the
      proprioceptive states, images, and instruction embeddings.

      Args:
            proprio: proprioceptive states
            images: RGB images, the order should be
                [ext_{t-1}, right_wrist_{t-1}, left_wrist_{t-1},
                ext_{t}, right_wrist_{t}, left_wrist_{t}]
            text_embeds: instruction embeddings

      Returns:
            action: predicted action
      """
      device = self.device
      dtype = self.dtype

      # The background image used for padding
      background_color = np.array(,
                                    dtype=np.uint8).reshape(1, 1, 3)
      background_image = (np.ones(
            (
                self.image_processor.size["height"],
                self.image_processor.size["width"],
                3,
            ),
            dtype=np.uint8,
      ) * background_color)

      # Preprocess the images by order and encode them
      image_tensor_list = []
      for image in images:
            if image is None:
                # Replace it with the background image
                image = Image.fromarray(background_image)
            else:
                # Convert numpy array to PIL Image if needed
                if isinstance(image, np.ndarray):
                  image = Image.fromarray(image)

            if self.image_size is not None:
                image = transforms.Resize(self.image_size)(image)

            if self.args["dataset"].get("auto_adjust_image_brightness", False):
                pixel_values = list(image.getdata())
                average_brightness = sum(sum(pixel) for pixel in pixel_values) / (len(pixel_values) * 255.0 * 3)
                if average_brightness <= 0.15:
                  image = transforms.ColorJitter(brightness=(1.75, 1.75))(image)

            if self.args["dataset"].get("image_aspect_ratio", "pad") == "pad":

                def expand2square(pil_img, background_color):
                  width, height = pil_img.size
                  if width == height:
                        return pil_img
                  elif width > height:
                        result = Image.new(pil_img.mode, (width, width), background_color)
                        result.paste(pil_img, (0, (width - height) // 2))
                        return result
                  else:
                        result = Image.new(pil_img.mode, (height, height), background_color)
                        result.paste(pil_img, ((height - width) // 2, 0))
                        return result

                image = expand2square(image, tuple(int(x * 255) for x in self.image_processor.image_mean))
            image = self.image_processor.preprocess(image, return_tensors="pt")["pixel_values"]
            image_tensor_list.append(image)

      image_tensor = torch.stack(image_tensor_list, dim=0).to(device, dtype=dtype)

      image_embeds = self.vision_model(image_tensor).detach()
      image_embeds = image_embeds.reshape(-1, self.vision_model.hidden_size).unsqueeze(0)

      # Prepare the proprioception states and the control frequency
      # Convert numpy array to tensor if needed
      if isinstance(proprio, np.ndarray):
            # Copy the array to make it writable
            proprio = torch.from_numpy(proprio.copy())
      
      joints = proprio.to(device).unsqueeze(0)# (1, 1, 14)
      states, state_elem_mask = self._format_joint_to_state(joints)# (1, 1, 128), (1, 128)
      states, state_elem_mask = states.to(device, dtype=dtype), state_elem_mask.to(device, dtype=dtype)
      states = states[:, -1:, :]# (1, 1, 128)
      ctrl_freqs = torch.tensor().to(device)

      text_embeds = text_embeds.to(device, dtype=dtype)

      # Predict the next action chunk given the inputs
      trajectory = self.policy.predict_action(
            lang_tokens=text_embeds,
            lang_attn_mask=torch.ones(text_embeds.shape[:2], dtype=torch.bool, device=text_embeds.device),
            img_tokens=image_embeds,
            state_tokens=states,
            action_mask=state_elem_mask.unsqueeze(1),
            ctrl_freqs=ctrl_freqs,
      )
      trajectory = self._unformat_action_to_joint(trajectory).to(torch.float32)

      return trajectory

class LERobotRDTServer:
    def __init__(self, pretrained_vision_encoder_name_or_path, pretrained, args, lang_model):
      self.policy = create_model(
            args=args,
            dtype=torch.bfloat16,
            pretrained=pretrained,
            pretrained_vision_encoder_name_or_path=pretrained_vision_encoder_name_or_path,
            control_frequency=30,
      )
      self.server = ServerClient(host="0.0.0.0", port=5002, is_server=True)
      
      # Load and debug language embeddings
      self.lang_embeddings = torch.load(lang_model)
      print(f"Loaded language embeddings shape: {self.lang_embeddings.shape}")
      print(f"Model expects tokenizer_max_length: {self.policy.args['dataset']['tokenizer_max_length']}")
      print(f"Model lang_token_dim: {self.policy.args['model']['lang_token_dim']}")
      
      # Check if dimensions match
      expected_seq_len = self.policy.args["dataset"]["tokenizer_max_length"]
      expected_hidden_dim = self.policy.args["model"]["lang_token_dim"]
      
      # Handle different embedding formats
      if len(self.lang_embeddings.shape) == 2:
            # Format:
            actual_seq_len, actual_hidden_dim = self.lang_embeddings.shape
            if actual_seq_len != expected_seq_len:
                print(f"WARNING: Sequence length mismatch! Expected {expected_seq_len}, got {actual_seq_len}")
            if actual_hidden_dim != expected_hidden_dim:
                print(f"WARNING: Hidden dimension mismatch! Expected {expected_hidden_dim}, got {actual_hidden_dim}")
      elif len(self.lang_embeddings.shape) == 3:
            # Format:
            actual_batch, actual_seq_len, actual_hidden_dim = self.lang_embeddings.shape
            if actual_seq_len != expected_seq_len:
                print(f"WARNING: Sequence length mismatch! Expected {expected_seq_len}, got {actual_seq_len}")
            if actual_hidden_dim != expected_hidden_dim:
                print(f"WARNING: Hidden dimension mismatch! Expected {expected_hidden_dim}, got {actual_hidden_dim}")
      else:
            print(f"WARNING: Unexpected embedding shape: {self.lang_embeddings.shape}")
      
    def run(self):
      print("LERobot RDT Server started, waiting for messages...")
      try:
            while True:
                print("Waiting for RDT data...")
                rdt_data = self.server.receive()
                print(f"Received RDT data, message_id: {rdt_data['message_id']}")
               
                # Perform inference
                # Ensure language embeddings have correct shape
                if len(self.lang_embeddings.shape) == 2:
                  # ->
                  text_embeds = self.lang_embeddings.unsqueeze(0)
                else:
                  # Already
                  text_embeds = self.lang_embeddings
               
                action = self.policy.step(
                  proprio=rdt_data["proprio"],
                  images=rdt_data["images"],
                  text_embeds=text_embeds,
                )
               
                # Prepare response - use 'actions' key to match client expectation
                message_id = rdt_data["message_id"]
                action_data = {
                  "message_id": message_id,
                  "actions": action,# Changed from 'action' to 'actions'
                }
               
                # Send response
                print(f"send action data, action_data: {action_data}")
                self.server.send(action_data)
                print(f"Sent action data for message_id: {message_id}")
               
      except KeyboardInterrupt:
            print("\nServer stopped by user")
            self.server.close()
      except Exception as e:
            print(f"Error in server loop: {e}")
            self.server.close()
            raise

if __name__ == "__main__":
    path_to_rdt_model_wights = "/home/qi.xiong/DualArm/RoboTwin/policy/RDT/checkpoints/RDT_LeRobot/checkpoint-7500/pytorch_model/mp_rank_00_model_states.pt"
    path_to_vision_encoder_model = "/home/qi.xiong/DualArm/RoboTwin/policy/weights/RDT/siglip-so400m-patch14-384"
    lang_model = "/home/qi.xiong/DualArm/RoboTwin/policy/RDT/scripts/lerobot_rdt_data/greenmarker_scene1/episode_4/instructions/lang_embed_0.pt"
    with open("/home/qi.xiong/DualArm/RoboTwin/policy/RDT/configs/base.yaml", "r") as fp:
      config = yaml.safe_load(fp)
    rdt_server = LERobotRDTServer(path_to_vision_encoder_model, path_to_rdt_model_wights, config, lang_model)
    rdt_server.run()
        接着我们直接运行以下命令即可首先使用校准测试数据验证模型推理是否成功,若无报错即证明模型可正常推理无异常,若运行过程中缺少某个依赖,直接安装即可:
import logging
import time
from dataclasses import asdict, dataclass
from pathlib import Path
from pprint import pformat
from collections import deque
import torch
from PIL import Image
import numpy as np
from lerobot.cameras import (# noqa: F401
    CameraConfig,# noqa: F401
)
from lerobot.cameras.opencv.configuration_opencv import OpenCVCameraConfig# noqa: F401
from lerobot.configs import parser
from lerobot.configs.policies import PreTrainedConfig
from lerobot.datasets.image_writer import safe_stop_image_writer
from lerobot.datasets.lerobot_dataset import LeRobotDataset
from lerobot.datasets.utils import build_dataset_frame, hw_to_dataset_features
from lerobot.datasets.video_utils import VideoEncodingManager
from lerobot.policies.factory import make_policy
from lerobot.policies.pretrained import PreTrainedPolicy
from lerobot.robots import (# noqa: F401
    Robot,
    RobotConfig,
    bi_so100_follower,
    hope_jr,
    koch_follower,
    make_robot_from_config,
    so100_follower,
    so101_follower,
)
from lerobot.robots.so101_follower.so101_follower import SO101Follower
from lerobot.robots.so101_follower.config_so101_follower import SO101FollowerConfig
from lerobot.teleoperators import (# noqa: F401
    Teleoperator,
    TeleoperatorConfig,
    bi_so100_leader,
    homunculus,
    koch_leader,
    make_teleoperator_from_config,
    so100_leader,
    so101_leader,
)
from lerobot.teleoperators.keyboard.teleop_keyboard import KeyboardTeleop
from lerobot.utils.control_utils import (
    init_keyboard_listener,
    is_headless,
    predict_action,
    sanity_check_dataset_name,
    sanity_check_dataset_robot_compatibility,
)
from lerobot.utils.robot_utils import busy_wait
from lerobot.utils.visualization_utils import _init_rerun, log_rerun_data
from server_client import *

debug_save_img = True

_action_queue = deque([], maxlen=64)
_message_id = 0
_last_cam_high = None
_last_cam_right_wrist = None

@safe_stop_image_writer
def record_loop(
    robot: Robot,
    client: Client,
):
    global _action_queue, _message_id, _last_cam_high, _last_cam_right_wrist

    observation = robot.get_observation()

    cam_high = observation['high']
    cam_right_wrist = observation['arm']
    image_arrs = [
      _last_cam_high,
      _last_cam_right_wrist,
      None,
      cam_high,
      cam_right_wrist,
      None
    ]

    images = [arr if arr is not None else None
            for arr in image_arrs]
    joint_positions = for key in observation.keys() if key.endswith('.pos')]
    proprio = torch.tensor(joint_positions, dtype=torch.float32).unsqueeze(0)

###################Debug图像########################
    if debug_save_img:
      imgs_to_show =
      if all(img is not None for img in imgs_to_show):
            pil_imgs = []
            for img in imgs_to_show:
                if img.dtype != np.uint8:
                  img = np.clip(img, 0, 1)
                  img = (img * 255).astype(np.uint8)
                if img.ndim == 2:
                  img = np.stack(*3, axis=-1)
                elif img.shape[-1] == 1:
                  img = np.repeat(img, 3, axis=-1)
                pil_imgs.append(Image.fromarray(img))
            w, h = pil_imgs.size
            for i in range(4):
                if pil_imgs.size != (w, h):
                  pil_imgs = pil_imgs.resize((w, h))
            new_img = Image.new('RGB', (w*2, h*2))
            new_img.paste(pil_imgs, (0, 0))       # 左上:新high
            new_img.paste(pil_imgs, (w, 0))       # 右上:新wrist
            new_img.paste(pil_imgs, (0, h))       # 左下:老high
            new_img.paste(pil_imgs, (w, h))       # 右下:老wrist

            debug_save_path = "debug_2x2.png"
            new_img.save(debug_save_path)
            print(f"Have been saved at: {debug_save_path}")
            # new_img.show()
###################Debug图像########################
    rdt_data = {
      'message_id': _message_id,
      'proprio': proprio,
      'images': images,
      'text_embeds': ""
    }
    client.send(rdt_data)
    _message_id += 1
    print(f"send new rdt data done, message_id: {_message_id-1}")
    action_data = client.receive()
    if action_data is None:
      print("ERROR: Server returned None. Is the RDT server running?")
      print("Please start the RDT server first!")
      raise ConnectionError("Failed to receive response from RDT server")
    actions = action_data['actions']
    action_message_id = action_data["message_id"]
    print(f"receive actions done, message_id: {action_message_id}")
    # print(f"receive actions contents: {actions}")
    actions_array = np.array(actions)
    if len(actions_array.shape) == 3:
      action_sequence = actions_array# 取第一个batch的所有时间步
    else:
      print(f"action shape should be 3 dim, but get {actions_array.shape} ")
   
    joint_names = ["shoulder_pan", "shoulder_lift", "elbow_flex", "wrist_flex", "wrist_roll", "gripper"]
    for step_idx in range(0, len(action_sequence), 4):# 64个动作隔4个执行一次动作
      action_values = action_sequence
      action_dict = {f"{joint}.pos": float(action_values) for i, joint in enumerate(joint_names)}
      sent_action = robot.send_action(action_dict)
      time.sleep(0.1)

    _last_cam_high = cam_high
    _last_cam_right_wrist = cam_right_wrist

def main():
    robot = SO101Follower(SO101FollowerConfig(
      port="/dev/tty.usbmodem5AB90671801",
      id="my_awesome_follower_arm",
      cameras={
            "arm": OpenCVCameraConfig(index_or_path=0, width=1920, height=1080, fps=30),
            "high": OpenCVCameraConfig(index_or_path=2, width=1920, height=1080, fps=30)
      }
    ))
   
    robot.connect()
    client = Client(host="localhost", port=5002)

    try:
      while True:
            record_loop(
                robot,
                client
            )
            time.sleep(0.1)
    except KeyboardInterrupt:
      pass

    robot.disconnect()

if __name__ == "__main__":
    main()        测试正常后我们便可以仿照上面服务器推理一样先启动板端推理代码,将其作为一个板端推理Server,接着运行我们本地的机械臂控制代码即可:
# 服务端
python3 RDT/scripts/lerobot_rdt_server.py
# 客户端
python3 lerobot/src/lerobot/record_rdt.py        RDKS100板端性能占用参考如下(多多支持我的Dtop:Jetson有Jtop,Linux有Htop,RDK也有Dtop! - SkyXZ - 博客园哈哈哈哈哈哈哈哈哈

        RDKS100可直接连接LeRobot控制机械臂,但由于写文档的时候手上没有多余的摄像头,头部的摄像头只能使用Apple的连续互通实现,若使用RDK直接连接LeRobot的话将无法当问头部摄像头,因此只能绕个弯用本地Mac来控制机械臂了...

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

煅汾付 发表于 2025-10-15 00:42:06

谢谢分享,辛苦了

忌才砟 发表于 2025-10-28 04:27:00

感谢分享

乳杂丫 发表于 2025-12-6 03:16:37

感谢分享,下载保存了,貌似很强大

纪睐讦 发表于 2025-12-16 04:06:49

热心回复!

各卧唯 发表于 2025-12-26 02:36:42

分享、互助 让互联网精神温暖你我

喳谍 发表于 2026-1-4 04:45:36

用心讨论,共获提升!

龙玮奇 发表于 2026-1-15 23:36:25

新版吗?好像是停更了吧。

湛恶 发表于 2026-1-19 02:49:45

用心讨论,共获提升!

诘琅 发表于 2026-1-23 08:04:30

鼓励转贴优秀软件安全工具和文档!

崔瑜然 发表于 2026-1-23 09:37:18

这个好,看起来很实用

酒跚骼 发表于 2026-1-25 12:27:25

热心回复!

岭猿 发表于 2026-1-26 06:06:29

新版吗?好像是停更了吧。

沃盼盼 发表于 2026-1-26 18:17:22

感谢分享,下载保存了,貌似很强大

左丘平莹 发表于 2026-1-30 03:54:51

不错,里面软件多更新就更好了

胁冉右 发表于 2026-2-7 09:28:49

收藏一下   不知道什么时候能用到

赖秀竹 发表于 2026-2-8 10:54:45

谢谢楼主提供!

丁若云 发表于 2026-2-8 12:51:02

谢谢分享,辛苦了

挽幽 发表于 2026-2-8 16:13:17

感谢分享

袁勤 发表于 2026-2-9 01:14:02

yyds。多谢分享
页: [1] 2
查看完整版本: 再不学就晚了!RDT × LeRobot与RDKS100部署详解