找回密码
 立即注册
首页 业界区 业界 Cython与CUDA之BatchGather

Cython与CUDA之BatchGather

稞冀 2025-6-4 22:24:52
技术背景

在前面一篇文章中,我们介绍过Cython+CUDA框架下实现一个简单的Gather算子的方法。这里演示Gather算子的升级版本实现——BatchGather算子。不过这里只是加了一个Batch维度,并没有添加其他的维度,例如Dimension维度,在这里暂不考虑。
CUDA头文件

这里我们保留了原本的Gather部分,只添加一个BatchGather的运算,以下为cuda_index.cuh的内容:
  1. #include <stdio.h>
  2. extern "C" float Gather(float *source, int *index, float *res, int N, int M);
  3. extern "C" float BatchGather(float *source, int *index, float *res, int N, int M, int B);
复制代码
BatchGather只是在Gather的基础上加了一个B的维度。除了CUDA算子本身的头文件之外,这里我们还使用到了异常捕获头文件error.cuh:
  1. #pragma once
  2. #include <stdio.h>
  3. #define CHECK(call) do{const cudaError_t error_code = call; if (error_code != cudaSuccess){printf("CUDA Error:\n"); printf("    File:   %s\n", __FILE__); printf("    Line:   %d\n", __LINE__); printf("    Error code: %d\n", error_code); printf("    Error text: %s\n", cudaGetErrorString(error_code)); exit(1);}} while (0)
复制代码
其中的宏可用于检测CUDA函数所抛出的异常。另外还有一个用于统计CUDA函数运行时长的头文件:
  1. #pragma once
  2. #include <stdio.h>
  3. #include <cuda_runtime.h>
  4. // 宏定义,用于测量CUDA函数的执行时间
  5. #define TIME_CUDA_FUNCTION(func) \
  6.     do { \
  7.         cudaEvent_t start, stop; \
  8.         float elapsedTime; \
  9.         cudaEventCreate(&start); \
  10.         cudaEventCreate(&stop); \
  11.         cudaEventRecord(start, NULL); \
  12.         \
  13.         func; \
  14.         \
  15.         cudaEventRecord(stop, NULL); \
  16.         cudaEventSynchronize(stop); \
  17.         cudaEventElapsedTime(&elapsedTime, start, stop); \
  18.         printf("Time taken by function %s is: %f ms\n", #func, elapsedTime); \
  19.         \
  20.         cudaEventDestroy(start); \
  21.         cudaEventDestroy(stop); \
  22.     } while (0)
  23. // 宏定义,用于测量CUDA函数的执行时间并返回该时间
  24. #define GET_CUDA_TIME(func) \
  25.     ({ \
  26.         cudaEvent_t start, stop; \
  27.         float elapsedTime = 0.0f; \
  28.         cudaEventCreate(&start); \
  29.         cudaEventCreate(&stop); \
  30.         cudaEventRecord(start, NULL); \
  31.         \
  32.         func; \
  33.         \
  34.         cudaEventRecord(stop, NULL); \
  35.         cudaEventSynchronize(stop); \
  36.         cudaEventElapsedTime(&elapsedTime, start, stop); \
  37.         \
  38.         cudaEventDestroy(start); \
  39.         cudaEventDestroy(stop); \
  40.         \
  41.         elapsedTime; \
  42.     })
复制代码
可选择直接打印时长,也可以选择返回时长的float值。
CUDA文件

接下来就是正式的CUDA函数内容cuda_index.cu:
  1. // nvcc -shared ./cuda_index.cu -Xcompiler -fPIC -o ./libcuindex.so
  2. #include <stdio.h>
  3. #include "cuda_index.cuh"
  4. #include "error.cuh"
  5. #include "record.cuh"
  6. __global__ void GatherKernel(float *source, int *index, float *res, int N){
  7.     int idx = blockIdx.x * blockDim.x + threadIdx.x;
  8.     if (idx < N){
  9.         res[idx] = source[index[idx]];
  10.     }
  11. }
  12. extern "C" float Gather(float *source, int *index, float *res, int N, int M){
  13.     float *souce_device, *res_device;
  14.     int *index_device;
  15.     CHECK(cudaMalloc((void **)&souce_device, M * sizeof(float)));
  16.     CHECK(cudaMalloc((void **)&res_device, N * sizeof(float)));
  17.     CHECK(cudaMalloc((void **)&index_device, N * sizeof(int)));
  18.     CHECK(cudaMemcpy(souce_device, source, M * sizeof(float), cudaMemcpyHostToDevice));
  19.     CHECK(cudaMemcpy(res_device, res, N * sizeof(float), cudaMemcpyHostToDevice));
  20.     CHECK(cudaMemcpy(index_device, index, N * sizeof(int), cudaMemcpyHostToDevice));
  21.     int block_size = 1024;
  22.     int grid_size = (N + block_size - 1) / block_size;
  23.     float timeTaken = GET_CUDA_TIME((GatherKernel<<<grid_size, block_size>>>(souce_device, index_device, res_device, N)));
  24.     CHECK(cudaGetLastError());
  25.     CHECK(cudaDeviceSynchronize());
  26.     CHECK(cudaMemcpy(res, res_device, N * sizeof(float), cudaMemcpyDeviceToHost));
  27.     CHECK(cudaFree(souce_device));
  28.     CHECK(cudaFree(index_device));
  29.     CHECK(cudaDeviceSynchronize());
  30.     CHECK(cudaFree(res_device));
  31.     CHECK(cudaDeviceReset());
  32.     return timeTaken;
  33. }
  34. __global__ void BatchGatherKernel(float *source, int *index, float *res, int N, int M, int B){
  35.     int idx = blockIdx.x * blockDim.x + threadIdx.x;
  36.     if (idx < N*B){
  37.         int batch_idx = idx / N;
  38.         int source_idx = batch_idx * M + index[idx];
  39.         res[idx] = source[source_idx];
  40.     }
  41. }
  42. extern "C" float BatchGather(float *source, int *index, float *res, int N, int M, int B){
  43.     float *souce_device, *res_device;
  44.     int *index_device;
  45.     CHECK(cudaMalloc((void **)&souce_device, B * M * sizeof(float)));
  46.     CHECK(cudaMalloc((void **)&res_device, B * N * sizeof(float)));
  47.     CHECK(cudaMalloc((void **)&index_device, B * N * sizeof(int)));
  48.     CHECK(cudaMemcpy(souce_device, source, B * M * sizeof(float), cudaMemcpyHostToDevice));
  49.     CHECK(cudaMemcpy(res_device, res, B * N * sizeof(float), cudaMemcpyHostToDevice));
  50.     CHECK(cudaMemcpy(index_device, index, B * N * sizeof(int), cudaMemcpyHostToDevice));
  51.     int block_size = 1024;
  52.     int grid_size = (B * N + block_size - 1) / block_size;
  53.     float timeTaken = GET_CUDA_TIME((BatchGatherKernel<<<grid_size, block_size>>>(souce_device, index_device, res_device, N, M, B)));
  54.     CHECK(cudaGetLastError());
  55.     CHECK(cudaDeviceSynchronize());
  56.     CHECK(cudaMemcpy(res, res_device, B * N * sizeof(float), cudaMemcpyDeviceToHost));
  57.     CHECK(cudaFree(souce_device));
  58.     CHECK(cudaFree(index_device));
  59.     CHECK(cudaDeviceSynchronize());
  60.     CHECK(cudaFree(res_device));
  61.     CHECK(cudaDeviceReset());
  62.     return timeTaken;
  63. }
复制代码
这里传入到CUDA之前,我们需要在Cython或者Python中把相关的数据压缩为一维,所以传入CUDA函数的是一个一维的指针。相比于单一的Gather操作,BatchGather中的几个输入含义有所变化,例如N表示的是单Batch的Index长度,M表示的是单Batch的源数组长度。
Cython文件

对于一个新的Batch函数来说,我们需要构建一个新的Cython调用函数wrapper.pyx:
  1. # cythonize -i -f wrapper.pyx
  2. import numpy as np
  3. cimport numpy as np
  4. cimport cython
  5. cdef extern from "<dlfcn.h>" nogil:
  6.     void *dlopen(const char *, int)
  7.     char *dlerror()
  8.     void *dlsym(void *, const char *)
  9.     int dlclose(void *)
  10.     enum:
  11.         RTLD_LAZY
  12. ctypedef float (*GatherFunc)(float *source, int *index, float *res, int N, int M) noexcept nogil
  13. ctypedef float (*BatchGatherFunc)(float *source, int *index, float *res, int N, int M, int B) noexcept nogil
  14. cdef void* handle = dlopen('/path/to/libcuindex.so', RTLD_LAZY)
  15. @cython.boundscheck(False)
  16. @cython.wraparound(False)
  17. cpdef float[:] cuda_gather(float[:] x, int[:] idx):
  18.     cdef:
  19.         GatherFunc Gather
  20.         float timeTaken
  21.         int N = idx.shape[0]
  22.         int M = x.shape[0]
  23.         float[:] res = np.zeros((N, ), dtype=np.float32)
  24.     Gather = <GatherFunc>dlsym(handle, "Gather")
  25.     timeTaken = Gather(&x[0], &idx[0], &res[0], N, M)
  26.     print (timeTaken)
  27.     return res
  28. @cython.boundscheck(False)
  29. @cython.wraparound(False)
  30. cpdef float[:] batch_cuda_gather(float[:] x, int[:] idx, int B):
  31.     cdef:
  32.         BatchGatherFunc BatchGather
  33.         float timeTaken
  34.         int N = idx.shape[0] // B
  35.         int M = x.shape[0] // B
  36.         float[:] res = np.zeros((B*N, ), dtype=np.float32)
  37.     BatchGather = <BatchGatherFunc>dlsym(handle, "BatchGather")
  38.     timeTaken = BatchGather(&x[0], &idx[0], &res[0], N, M, B)
  39.     print (timeTaken)
  40.     return res
  41. while not True:
  42.     dlclose(handle)
复制代码
这里我们还是接受一维的数组,多引入一个Batch维度的参数B,其他的都是一样的。
Python调用文件

最后是用来调用的最上层Python端的代码test_gather.py:
  1. import numpy as np
  2. np.random.seed(0)
  3. from wrapper import batch_cuda_gather
  4. B = 2
  5. M = 1024 * 1024 * 128
  6. N = 1024 * 1024
  7. x = np.random.random((M*B,)).astype(np.float32)
  8. idx = np.random.randint(0, M, (N*B,)).astype(np.int32)
  9. np_res = np.zeros((B, N), dtype=np.float32)
  10. for i in range(B):
  11.     np_res[i] = x.reshape((B,-1))[i][idx.reshape((B, -1))[i]]
  12. np_res = np_res.reshape(-1)
  13. res = np.asarray(batch_cuda_gather(x, idx, B))
  14. print (res.shape)
  15. print ((res==np_res).sum())
复制代码
为了方便处理,在构建数据的时候,我们直接在生成数据阶段就生成一维的数据,然后直接调用Cython函数进行CUDA相关运算。
运行方法

首先将CUDA文件编译成动态链接库,使其可以在Cython中被调用。然后将Cython文件编译成动态链接库,使其可以在Python中被调用。最后运行Python代码即可:
  1. $ nvcc -shared ./cuda_index.cu -Xcompiler -fPIC -o ./libcuindex.so
  2. $ cythonize -i -f wrapper.pyx
  3. $ python3 test_gather.py
复制代码
运行结果如下:
  1. 0.9606080055236816
  2. (2097152,)
  3. 2097152
复制代码
这表示CUDA核函数部分的运行时长为0.96ms,输入的数组总长度为2097152,跟numpy版本的数组索引实现对比之后,得到2097152个相同的元素。也就是说,计算结果跟numpy的计算结果是一致的,以此来校验CUDA部分的运算结果。
总结概要

以学习CUDA为目的,接上一篇关于Cython与CUDA架构下的Gather算子实现,这里我们加一个Batch的维度,做一个BatchGather的简单实现。
版权声明

本文首发链接为:https://www.cnblogs.com/dechinphy/p/cython-cuda-batchgather.html
作者ID:DechinPhy
更多原著文章:https://www.cnblogs.com/dechinphy/
请博主喝咖啡:https://www.cnblogs.com/dechinphy/gallery/image/379634.html

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册