找回密码
 立即注册
首页 业界区 业界 商品中心—14.库存分桶初始化的技术文档 ...

商品中心—14.库存分桶初始化的技术文档

嫁吱裨 2025-6-26 18:31:59
大纲
1.库存分桶缓存初始化时涉及的数据表
2.库存分桶架构的初始化 + 扣减 + 上下线 + 扩容 + 下线 + 预警补货流程
3.商品库存⼊桶流程概览
4.商品库存分桶缓存初始化请求处理
5.商品库存分桶缓存初始化的加分布式锁处理 + 插入库存变更记录
6.商品库存分桶元数据本地 + 远程缓存查询
7.商品库存动态分桶算法实现
8.基于分桶算法结果构建库存分桶元数据
9.剩余库存写入中心桶缓存 + 分桶库存写入分桶缓存 + 分桶元数据写入本地缓存
 
1.库存分桶缓存初始化时涉及的数据表
(1)库存分桶配置表
(2)库存分配记录表
 
(1)库存分桶配置表
  1. CREATE TABLE `inventory_bucket_config` (
  2.     `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.     `bucket_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶数量',
  4.     `max_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼤库存深度,即分桶的最大库存容量',
  5.     `min_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼩库存深度,即分桶的最小库存容量',
  6.     `threshold_value` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',
  7.     `back_source_proportion` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源⽐例,从1-100设定⽐例',
  8.     `back_source_step` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',
  9.     `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',
  10.     `is_default` tinyint(1)  NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',
  11.     `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
  12.     `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  13.     `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
  14.     `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  15.     `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
  16.     `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  17.     PRIMARY KEY (`ID`)
  18. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
复制代码
(2)库存分配记录表
  1. CREATE TABLE `inventory_allot_detail` (
  2.     `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.     `sku_id` varchar(40) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT 'skuId',
  4.     `inventor_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '库存申请业务编号',
  5.     `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',
  6.     `inventor_num` int(10) NOT NULL DEFAULT '0' COMMENT '库存变更数量',
  7.     `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
  8.     `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  9.     `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
  10.     `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  11.     `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
  12.     `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  13.     PRIMARY KEY (`id`),
  14.     UNIQUE KEY `inde_unique_inventor_no` (`inventor_no`) USING BTREE
  15. ) ENGINE=InnoDB AUTO_INCREMENT=265 DEFAULT CHARSET=utf8 COMMENT='库存分配记录表';
复制代码
 
2.库存分桶架构的初始化 + 扣减 + 上下线 + 扩容 + 下线 + 预警补货流程
分桶上线:指的是初始化时设置了可用的分桶或者增加库存时将下线的分桶设置可用。
 
分桶下线:指的是可用的分桶在扣减库存后触发了下线阈值,导致该分桶需要下线。当某个可用的分桶拥有的库存数较少时,如果将该分桶下线,那么可以提高库存扣减的效率。
 
分桶扩容:指的是当可用的分桶剩余库存少于回源比例的库存时,而且中心桶的剩余库存大于0时,就会触发当前分桶的异步扩容(即加库存)。
 
中心桶和分桶:每个商品库存都会对应一个中心桶缓存 + 多个分桶缓存。例如对10000库存分配到8个分桶中,每个分桶最多1000个库存,此时就有2000个库存剩下需要放入中心桶缓存中。中心桶的出现,是由于退款返还库存时可用于临时存放返还的库存。
 
Tair缓存:商品SKU库存的各个分桶会对应不同的分桶编号,这些分桶编号会对应于Tair缓存的各个key,从而可以应对高并发请求。这样就可以利用Redis Cluster的特性,通过把key路由到不同机器来处理。
 
一.分配库存的流程
1.png
二.扣减库存 + 分桶扩容 + 分桶下线 + 库存预警的流程
2.png
 
3.商品库存⼊桶流程概览
(1)使用入口
(2)场景一初始化分桶库存
(3)场景二增加库存
 
(1)使用入口
库存后台对商品初始化库存或增加库存时:
  1. @RestController
  2. @RequestMapping("/product/inventory")
  3. public class InventoryController {
  4.     @Autowired
  5.     private InventoryBucketService inventoryBucketService;
  6.     @Autowired
  7.     private InventoryBucketCache cache;
  8.     @Resource
  9.     private TairCache tairCache;
  10.     ...
  11.     //初始化库存
  12.     @RequestMapping("/init")
  13.     public void inventoryInit(@RequestBody InventorBucketRequest request) {
  14.         //清除本地缓存数据
  15.         cache.getCache().invalidateAll();
  16.         //清除tair中的数据,扫描卖家ID+SKU的ID的key会比较耗时
  17.         Set<String> keys = tairCache.getJedis().keys("*" + request.getSellerId() + request.getSkuId() + "*");
  18.         if (!CollectionUtils.isEmpty(keys)) {
  19.             tairCache.mdelete(Lists.newArrayList(keys));
  20.         }
  21.         //这里模拟指定本次的库存业务单号,实际接口需要外部传入
  22.         request.setInventorCode(SnowflakeIdWorker.getCode());
  23.         //初始化库存信息
  24.         inventoryBucketService.inventorBucket(request);
  25.     }
  26.     //增加库存
  27.     @RequestMapping("/inventorBucket")
  28.     public void inventorBucket(@RequestBody InventorBucketRequest request) {
  29.         //这里模拟指定本次的库存业务单号,实际接口需要外部传入
  30.         request.setInventorCode(SnowflakeIdWorker.getCode());
  31.         //增加库存
  32.         inventoryBucketService.inventorBucket(request);
  33.     }
  34.     ...
  35. }
  36. @Service
  37. public class InventoryBucketServiceImpl implements InventoryBucketService {
  38.     @Resource
  39.     private TairCache tairCache;
  40.     @Resource
  41.     private TairLock tairLock;
  42.     @Resource
  43.     private InventoryBucketCache inventoryBucketCache;
  44.     ...
  45.     //商品库存入桶分配
  46.     @Override
  47.     @Transactional(rollbackFor = Exception.class)
  48.     public void inventorBucket(InventorBucketRequest request) {
  49.         //1.验证入参必填项
  50.         checkInventorParams(request);
  51.         //锁key = 卖家ID + SKU的ID
  52.         String key = buildBucketLockKey(request.getSellerId(), request.getSkuId());
  53.         String value = SnowflakeIdWorker.getCode();
  54.         //注意这里需要锁定中心桶库存
  55.         boolean lock = tairLock.tryLock(key, value);
  56.         //分配库存时,这个卖家的sku是不允许其他相关操作的
  57.         if (lock) {
  58.             try {
  59.                 //2.插入库存入库的记录信息
  60.                 //由于申请的库存业务编号是一个唯一key,所以可以避免重复请求
  61.                 //也就是会校验库存单号是否已经存在了,保证⼀次库存变更⾏为只能执⾏⼀次
  62.                 inventoryRepository.saveInventoryAllotDetail(request);
  63.                 //3.将库存数据写入缓存
  64.                 inventoryBucketCache(request);
  65.             } catch (Exception e) {
  66.                 e.printStackTrace();
  67.             } finally {
  68.                 tairLock.unlock(key, value);
  69.             }
  70.         } else {
  71.             throw new BaseBizException("请求繁忙,稍后重试!");
  72.         }
  73.     }
  74.     //将库存数据写入缓存
  75.     private void inventoryBucketCache(InventorBucketRequest request) {
  76.         //获取中心桶库存的key
  77.         String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
  78.         //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
  79.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  80.         //缓存不存在,则进行初始化
  81.         if (Objects.isNull(bucketLocalCache)) {
  82.             //2.获取库存分桶的配置模板
  83.             InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
  84.             //初始化分桶库存
  85.             initInventoryBucket(request, inventoryBucketConfig);
  86.         } else {
  87.             //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
  88.             Integer residueNum = tairCache.incr(key, request.getInventoryNum());
  89.             //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
  90.             InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
  91.             //5.构建新的分桶元数据信息并写入
  92.             //分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息
  93.             writeBucketCache(onlineRequest, residueNum);
  94.         }
  95.     }
  96.     //获取锁的key
  97.     private String buildBucketLockKey(String sellerId, String skuId) {
  98.         return TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + sellerId + skuId;
  99.     }
  100.     //获取中心桶库存的key
  101.     private String buildSellerInventoryKey(String sellerId, String skuId) {
  102.         return TairInventoryConstant.SELLER_INVENTORY_PREFIX + sellerId + skuId;
  103.     }
  104.     ...
  105. }
复制代码
(2)场景一初始化分桶库存
  1. 步骤一:检验入参必填项
  2. 步骤二:插入库存入库的记录信息
  3. 步骤三:获取库存分桶对应的配置模板,开始初始化分桶库存
  4. 步骤四:初始化分桶库存时首先计算出本次库存入库的具体分桶信息
  5. 步骤五:根据计算好的分桶数 + 每个分桶分配的库存数构建缓存数据模型
  6. 步骤六:计算完分桶的数据信息后则将这些信息写入远程缓存和本地缓存
复制代码
步骤一:检验入参必填项
 
步骤二:插入库存入库的记录信息。由于申请的库存业务编号是一个唯一key,所以可以避免重复请求。即会校验库存单号是否已经存在,保证⼀次库存变更⾏为只执⾏⼀次。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //验证入参必填字段
  5.     private void checkInventorParams(InventorBucketRequest request) {
  6.         if (Objects.isNull(request)) {
  7.             throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
  8.         }
  9.         if (Objects.isNull(request.getInventorCode()) || Objects.isNull(request.getInventoryNum())
  10.                 || Objects.isNull(request.getSellerId()) || Objects.isNull(request.getSkuId())) {
  11.             throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
  12.         }
  13.         if (request.getInventoryNum() <= 0) {
  14.             throw new BaseBizException(InventoryExceptionCode.PARAM_CHECK_ERROR, InventoryExceptionCode.PARAM_CHECK_ERROR.getErrorCode());
  15.         }
  16.     }
  17.     ...
  18. }
  19. @Repository
  20. public class InventoryRepository {
  21.     ...
  22.     //存储每次库存入库的申请记录
  23.     //校验库存单号是否已经存在了(⼀次库存变更⾏为只能执⾏⼀次)
  24.     public void saveInventoryAllotDetail(InventorBucketRequest request) {
  25.         InventoryAllotDetailDO inventoryAllotDetailDO = inventoryConverter.converterDO(request);
  26.         inventoryAllotDetailDO.initCommon();
  27.         int count = inventoryAllotDetailMapper.insert(inventoryAllotDetailDO);
  28.         if (count <= 0) {
  29.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);
  30.         }
  31.     }
  32.     ...
  33. }
复制代码
步骤五:根据计算好的分桶数 + 每个分桶分配的库存数构建缓存数据模型
  1. 通过InventoryBucketCache的getBucketLocalCache()方法获取本地缓存
  2. 通过inventoryRepository的getInventoryBucketConfig()方法获取分桶配置模版
  3. 通过initInventoryBucket()方法执行初始化分桶库存
复制代码
步骤六:计算完分桶的数据信息后则将这些信息写入远程缓存和本地缓存
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     @Resource
  4.     private InventoryBucketCache inventoryBucketCache;
  5.     ...
  6.     //将库存数据写入缓存
  7.     private void inventoryBucketCache(InventorBucketRequest request) {
  8.         //获取中心桶库存的key
  9.         String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
  10.         //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
  11.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  12.         //缓存不存在,则进行初始化
  13.         if (Objects.isNull(bucketLocalCache)) {
  14.             //2.获取库存分桶的配置模板
  15.             InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
  16.             //初始化分桶库存
  17.             initInventoryBucket(request, inventoryBucketConfig);
  18.         } else {
  19.             //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
  20.             Integer residueNum = tairCache.incr(key, request.getInventoryNum());
  21.             //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
  22.             InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
  23.             //5.构建新的分桶信息数据信息并写入
  24.             writeBucketCache(onlineRequest, residueNum);
  25.         }
  26.     }
  27.     ...
  28. }
  29. //库存分桶配置
  30. @Data
  31. @TableName("inventory_bucket_config")
  32. public class InventoryBucketConfigDO extends BaseEntity {
  33.     //分桶数量
  34.     private Integer bucketNum;
  35.     //最大库存深度
  36.     private Integer maxDepthNum;
  37.     //最小库存深度
  38.     private Integer minDepthNum;
  39.     //分桶下线阈值
  40.     private Integer thresholdValue;
  41.     //分桶下线阈值动态比例
  42.     private Integer thresholdProportion;
  43.     //回源比例,从1-100设定比例
  44.     private Integer backSourceProportion;
  45.     //回源步长,桶扩容的时候默认每次分配的库存大小
  46.     private Integer backSourceStep;
  47.     //模板名称
  48.     private String templateName;
  49.     //是否默认模板,只允许一个,1为默认模板
  50.     private Integer isDefault;
  51. }
复制代码
(3)场景二增加库存
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //初始化分桶库存
  5.     private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
  6.         //计算出分桶的数据信息
  7.         BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
  8.         //写入远程缓存以及本地缓存
  9.         writeCache(bucketLocalCache);
  10.     }
  11.     //计算出当前库存入库的具体分桶信息
  12.     private BucketLocalCache calcInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
  13.         //构建出当前分桶的数据模型,库存数量大于最大容量时,通过最大容量来构建模型
  14.         return buildBucketInfo(request, inventoryBucketConfig);
  15.     }
  16.     //构建分桶的数据模型
  17.     //@param request               请求入参
  18.     //@param inventoryBucketConfig 分桶配置信息
  19.     private BucketLocalCache buildBucketInfo(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
  20.         //分桶配置模版中默认的分桶数量
  21.         Integer bucketNum = inventoryBucketConfig.getBucketNum();
  22.         //获取本次需要入库的库存数量
  23.         Integer inventorNum = request.getInventoryNum();
  24.         //配置模版中所有分桶的最大库存容量
  25.         Integer maxBucketNum = bucketNum * inventoryBucketConfig.getMaxDepthNum();
  26.         //配置模版中所有分桶的最小库存容量
  27.         //如果需要放入分桶的库存数量低于这个值,那么只会分配给部分分桶,此时就需要重新计算分桶
  28.         Integer minBucketNum = bucketNum * inventoryBucketConfig.getMinDepthNum();
  29.         //本次最多可以放入分桶的库存数量
  30.         int countBucketNum = Math.min(inventorNum, maxBucketNum);
  31.         //当库存数量小于最小分桶深度 * 分桶数量,就需要减少分配的分桶数
  32.         //此时要分配的分桶数量 bucketNum = 本次库存入库的数量 / 每个分桶的最小库存容量
  33.         if (minBucketNum > countBucketNum) {
  34.             bucketNum = countBucketNum / inventoryBucketConfig.getMinDepthNum();
  35.             //如果库存数量不足一个分桶的最小深度,但是大于0,则上线一个分桶
  36.             if (bucketNum == 0 && countBucketNum % inventoryBucketConfig.getMinDepthNum() > 0) {
  37.                 bucketNum++;
  38.             }
  39.         }
  40.         //获取每个分桶分配的库存数量
  41.         Integer bucketInventorNum = countBucketNum / bucketNum;
  42.         //剩余库存数量,可能为0或者大于0,补到最后一个分桶上
  43.         Integer residueNum = countBucketNum - bucketInventorNum * bucketNum;
  44.         //构建缓存数据模型时,以卖家ID + 商品skuId为唯一标识
  45.         String key = request.getSellerId() + request.getSkuId();
  46.         //构建缓存数据模型
  47.         BucketLocalCache bucketLocalCache = buildBucketCache(key, bucketNum, bucketInventorNum, residueNum, inventoryBucketConfig);
  48.         //标记到具体的数据上
  49.         bucketLocalCache.setSellerId(request.getSellerId());
  50.         bucketLocalCache.setSkuId(request.getSkuId());
  51.         bucketLocalCache.setInventoryNum(inventorNum);
  52.         //中心桶剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
  53.         bucketLocalCache.setResidueNum(inventorNum - countBucketNum);
  54.         bucketLocalCache.setInventoryBucketConfig(inventoryBucketConfig);
  55.         return bucketLocalCache;
  56.     }
  57.     ...
  58. }
复制代码
步骤一:检验入参必填项
 
步骤二:插入库存入库的记录信息。由于申请的库存业务编号是一个唯一key,所以可以避免重复请求。即会校验库存单号是否已经存在,保证⼀次库存变更⾏为只执⾏⼀次。
 
步骤三:缓存已存在,先把库存加到中心桶上
 
步骤四:构建新的分桶元数据信息并写入
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //构建缓存模型
  5.     //@param bucketNum             分桶数量
  6.     //@param inventorNum           分桶分配的库存数量
  7.     //@param residueNum            剩余的未分配均匀的库存
  8.     //@param inventoryBucketConfig 分桶配置信息
  9.     private BucketLocalCache buildBucketCache(String key, Integer bucketNum, Integer inventorNum, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
  10.         BucketLocalCache bucketLocalCache = new BucketLocalCache();
  11.         //先获取得到这个模板配置的对应可分槽位的均匀桶列表
  12.         List<String> bucketNoList = InventorBucketUtil.createBucketNoList(key, inventoryBucketConfig.getBucketNum());
  13.         List<BucketCacheBO> bucketCacheBOList = new ArrayList<>(bucketNum);
  14.         List<BucketCacheBO> undistributedList = new ArrayList<>(bucketNum);
  15.         //构建出多个分桶对象
  16.         for (int i = 0; i < bucketNum; i++) {
  17.             //生成对应的分桶编号,方便定义到具体的分桶上
  18.             BucketCacheBO bucketCache = new BucketCacheBO();
  19.             String bucketNo = bucketNoList.get(i);
  20.             bucketCache.setBucketNo(bucketNo);
  21.             //最后一个分桶,分配剩余未除尽的库存 + 平均库存
  22.             if (i == bucketNum - 1) {
  23.                 bucketCache.setBucketNum(inventorNum + residueNum);
  24.             } else {
  25.                 bucketCache.setBucketNum(inventorNum);
  26.             }
  27.             bucketCacheBOList.add(bucketCache);
  28.         }
  29.         //生成的分桶对象超过实际可分配的分桶对象,保留这批多余的分桶模型为不可用分桶,后续分桶上线可以选择使用
  30.         if (bucketNoList.size() > bucketNum) {
  31.             for (int i = bucketNum; i < bucketNoList.size(); i++) {
  32.                 BucketCacheBO bucketCache = new BucketCacheBO();
  33.                 bucketCache.setBucketNo(bucketNoList.get(i));
  34.                 undistributedList.add(bucketCache);
  35.             }
  36.         }
  37.         //设置可用的分桶缓存列表
  38.         bucketLocalCache.setAvailableList(bucketCacheBOList);
  39.         //设置不可用或者已下线的分桶缓存列表
  40.         bucketLocalCache.setUndistributedList(undistributedList);
  41.         return bucketLocalCache;
  42.     }
  43.     ...
  44. }
  45. //库存工具方法
  46. public class InventorBucketUtil {
  47.     private static final int MAX_SIZE = 100000;
  48.     //生成对应的槽位key
  49.     //@param key       卖家Id + 商品skuId
  50.     //@param bucketNum 分桶配置数量
  51.     //@return 预先保留的槽位集合
  52.     public static List<String> createBucketNoList(String key, Integer bucketNum) {
  53.         Map<Long, String> cacheKey = new HashMap<>(bucketNum);
  54.         //bucketNoList用来存放每个桶对应的hashKey
  55.         List<String> bucketNoList = new ArrayList<>(bucketNum);
  56.         for (int i = 1; i <= MAX_SIZE; i++) {
  57.             String serialNum = String.format("%06d", i);
  58.             //卖家ID + 商品SKU ID + 序号
  59.             String hashKey = key + serialNum;
  60.             //一致性哈希算法murmur
  61.             long hash = HashUtil.murMurHash(hashKey.getBytes());
  62.             //对分桶数量进行取模运算
  63.             long c = (hash %= bucketNum);
  64.             //确保被选中的hashKey都能哈希到不同的分桶
  65.             if (cacheKey.containsKey(c)) {
  66.                 continue;
  67.             }
  68.             cacheKey.put(c, hashKey);
  69.             bucketNoList.add(hashKey);
  70.             if (cacheKey.size() >= bucketNum) {
  71.                 break;
  72.             }
  73.         }
  74.         return bucketNoList;
  75.     }
  76. }
复制代码
步骤五:获取本地存储的分桶元数据信息
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     @Resource
  4.     private InventoryBucketCache inventoryBucketCache;
  5.     @Resource
  6.     private TairCache tairCache;
  7.     ...
  8.     //初始化分桶库存
  9.     private void initInventoryBucket(InventorBucketRequest request, InventoryBucketConfigDO inventoryBucketConfig) {
  10.         //计算出分桶的数据信息
  11.         BucketLocalCache bucketLocalCache = calcInventoryBucket(request, inventoryBucketConfig);
  12.         //写入远程缓存以及本地缓存
  13.         writeCache(bucketLocalCache);
  14.     }
  15.     //将数据写入远程缓存以及本地缓存
  16.     private void writeCache(BucketLocalCache bucketLocalCache) {
  17.         //卖家ID + 商品skuId标识
  18.         String key = bucketLocalCache.getSellerId() + bucketLocalCache.getSkuId();
  19.         //1.写入中心桶的库存信息:中心桶存放的是剩余库存 = 本次需要入库的库存数量 - 本次最多可以放入分桶的库存数量
  20.         log.info("中心桶中存放的是剩余库存:{}", bucketLocalCache.getResidueNum());
  21.         tairCache.set(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, bucketLocalCache.getResidueNum().toString(), 0);
  22.         //2.写入数据到对应的缓存上,计算出的分桶库存写入Tair上的分桶缓存
  23.         List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  24.         for (BucketCacheBO bucketCacheBO : availableList) {
  25.             log.info("bucketNo: {}, inventoryNum: {}", bucketCacheBO.getBucketNo(), bucketCacheBO.getBucketNum());
  26.             tairCache.set(bucketCacheBO.getBucketNo(), JSONObject.toJSONString(bucketCacheBO.getBucketNum()), 0);
  27.         }
  28.         //3.将数据存储到本地缓存列表
  29.         inventoryBucketCache.setBucketLocalCache(key, bucketLocalCache);
  30.         //4.维护分桶的元数据信息到缓存上
  31.         tairCache.set(TairInventoryConstant.SELLER_BUCKET_PREFIX + key, JSONObject.toJSONString(bucketLocalCache), 0);
  32.         log.info("元数据信息: {}", JSONObject.toJSONString(bucketLocalCache));
  33.     }
  34.     ...
  35. }
  36. @Component
  37. @Data
  38. public class InventoryBucketCache {
  39.     //本地缓存
  40.     @Autowired
  41.     private Cache cache;
  42.     @Resource
  43.     private TairCache tairCache;
  44.     //本地存储分桶元数据信息
  45.     public void setBucketLocalCache(String bucketKey, BucketLocalCache bucketLocalCache) {
  46.         log.info("local cache set key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
  47.         cache.put(bucketKey, bucketLocalCache);
  48.     }
  49.     ...
  50. }
  51. @Component
  52. public class TairCache {
  53.     private JedisPool jedisPool;
  54.     public TairCache(JedisPool jedisPool) {
  55.         this.jedisPool = jedisPool;
  56.     }
  57.     public Jedis getJedis() {
  58.         return jedisPool.getResource();
  59.     }
  60.     public TairString createTairString(Jedis jedis) {
  61.         return new TairString(jedis);
  62.     }
  63.     ...
  64.     //缓存存储
  65.     public Boolean set(String key, String value, int seconds) {
  66.         log.info("tairString set key:{}, value:{}, seconds:{}", key, value, seconds);
  67.         try (Jedis jedis = getJedis()) {
  68.             TairString tairString = createTairString(jedis);
  69.             String result;
  70.             if (seconds > 0) {
  71.                 result = tairString.exset(key, value, new ExsetParams().ex(seconds));
  72.             } else {
  73.                 result = tairString.exset(key, value);
  74.             }
  75.             return "OK".equals(result);
  76.         }
  77.     }
  78.     ...
  79. }
复制代码
步骤六:获取当前可上线的分桶列表信息以及具体上线库存
  1. 步骤一:检验入参必填项
  2. 步骤二:插入库存入库的记录信息
  3. 步骤三:缓存已存在,先把库存加到中心桶上
  4. 步骤四:构建新的分桶元数据信息并写入
  5. 步骤五:获取本地存储的分桶元数据信息
  6. 步骤六:获取当前可上线的分桶列表信息以及具体上线库存
  7. 步骤七:如果当前可上线分桶列表信息不空则构建新的分桶元数据模型
  8. 步骤八:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
复制代码
步骤七:如果当前可上线分桶列表信息不空则构建新的分桶元数据模型
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     @Resource
  4.     private InventoryBucketCache inventoryBucketCache;
  5.     ...
  6.     //将库存数据写入缓存
  7.     private void inventoryBucketCache(InventorBucketRequest request) {
  8.         //获取中心桶库存的key
  9.         String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
  10.         //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
  11.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  12.         //缓存不存在,则进行初始化
  13.         if (Objects.isNull(bucketLocalCache)) {
  14.             //2.获取库存分桶的配置模板
  15.             InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
  16.             //初始化分桶库存
  17.             initInventoryBucket(request, inventoryBucketConfig);
  18.         } else {
  19.             //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
  20.             Integer residueNum = tairCache.incr(key, request.getInventoryNum());
  21.             InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
  22.             //4.构建新的分桶元数据信息并写入
  23.             writeBucketCache(onlineRequest, residueNum);
  24.         }
  25.     }
  26.     //构建新的分桶元数据信息
  27.     //@param request    分桶上线对象
  28.     //@param residueNum 中心桶剩余库存
  29.     private void writeBucketCache(InventorOnlineRequest request, Integer residueNum) {
  30.         String key = request.getSellerId() + request.getSkuId();
  31.         //5.获取本地存储的分桶元数据信息
  32.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
  33.         if (!Objects.isNull(bucketLocalCache)) {
  34.             //6.获取当前可上线的分桶列表信息以及具体上线库存
  35.             List<BucketCacheBO> bucketCacheBOList = buildBucketList(
  36.                 request.getBucketNoList(),
  37.                 bucketLocalCache.getAvailableList(),
  38.                 bucketLocalCache.getUndistributedList(),
  39.                 bucketLocalCache.getInventoryBucketConfig(),
  40.                 residueNum
  41.             );
  42.             //当前可上线的分桶为空,直接返回
  43.             if (CollectionUtils.isEmpty(bucketCacheBOList)) {
  44.                 return;
  45.             }
  46.             //7.构建返回新的分桶元数据模型返回
  47.             buildBucketLocalCache(bucketLocalCache, bucketCacheBOList, residueNum);
  48.             //8.写入数据到远程缓存中并更新本地缓存的分桶元数据信息
  49.             writeBucketLocalCache(bucketLocalCache, bucketCacheBOList);
  50.         }
  51.     }
  52.     ...
  53. }
复制代码
步骤八:写入数据到远程缓存中并更新本地缓存的分桶元数据信息
  1. @Component
  2. @Data
  3. public class InventoryBucketCache {
  4.     //本地缓存
  5.     @Autowired
  6.     private Cache cache;
  7.     @Autowired
  8.     private TairCache tairCache;
  9.     //获取本地存储的分桶元数据信息
  10.     public BucketLocalCache getBucketLocalCache(String bucketKey) {
  11.         //先查本地缓存
  12.         BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
  13.         log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
  14.         if (Objects.isNull(bucketLocalCache)) {
  15.             //再查远程缓存
  16.             synchronized (bucketKey.intern()) {
  17.                 String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);
  18.                 if (!StringUtils.isEmpty(bucketCache)) {
  19.                     bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
  20.                     cache.put(bucketKey, bucketLocalCache);
  21.                 }
  22.             }
  23.         }
  24.         return bucketLocalCache;
  25.     }
  26. }
复制代码
 
4.商品库存分桶缓存初始化请求处理
首先通过Guava Cache清除本地的缓存数据,然后通过Tair Cache清除Tair的数据,最后传入请求参数模型到inventorBucket()方法来初始化库存。注意:请求参数模型中会带上本次的库存业务单号。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //获取当前可上线的分桶列表信息以及具体上线库存
  5.     //@param bucketNoList            上线分桶编号列表
  6.     //@param availableList           上线正在使用的分桶编号列表
  7.     //@param undistributedList       下线或者未使用的分桶编号列表
  8.     //@param inventoryBucketConfigDO 当前分桶的配置模板信息
  9.     //@param residueNum              中心桶的剩余可分配库存
  10.     //@return 当前可上线的分桶列表以及具体分桶库存
  11.     private List<BucketCacheBO> buildBucketList(List<String> bucketNoList, List<BucketCacheBO> availableList, List<BucketCacheBO> undistributedList, InventoryBucketConfigDO inventoryBucketConfigDO, Integer residueNum) {
  12.         //1.如果入参选择了上线的分桶编号列表,则从缓存中配置的未使用分桶列表进行比对处理
  13.         List<String> bucketCacheList = null;
  14.         if (!CollectionUtils.isEmpty(bucketNoList)) {
  15.             Map<String, BucketCacheBO> bucketCacheMap = undistributedList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
  16.             //过滤返回可用的分桶编号
  17.             bucketCacheList = bucketNoList.stream().filter(bucketNo -> bucketCacheMap.containsKey(bucketNo)).collect(Collectors.toList());
  18.         } else {
  19.             //直接返回:下线的不可用分桶列表
  20.             bucketCacheList = undistributedList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
  21.         }
  22.         //可上线的分桶列表为空
  23.         if (CollectionUtils.isEmpty(bucketCacheList)) {
  24.             return Lists.newArrayList();
  25.         }
  26.         //2.根据中心桶的可分配库存,处理返回具体上线的分桶配置信息
  27.         return calcOnlineBucket(availableList, bucketCacheList, residueNum, inventoryBucketConfigDO);
  28.     }
  29.     //构建上线的分桶库存模型
  30.     //@param availableList         上线正在使用的分桶编号列表
  31.     //@param bucketCacheList       预上线的分桶列表
  32.     //@param residueNum            中心桶剩余库存容量
  33.     //@param inventoryBucketConfig 当前分桶配置信息
  34.     private List<BucketCacheBO> calcOnlineBucket(List<BucketCacheBO> availableList, List<String> bucketCacheList, Integer residueNum, InventoryBucketConfigDO inventoryBucketConfig) {
  35.         List<BucketCacheBO> bucketCacheBOList = new ArrayList<>();
  36.         //获取已上线分桶 + 准备上线的分桶数量
  37.         Integer sumBucketSize = availableList.size() + bucketCacheList.size();
  38.         //获取得到已上线的分桶分配库存深度总和
  39.         int sumBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
  40.         //获取总的库存深度 + 中心桶的库存,得到平均的分桶实际可分配库存深度
  41.         Integer averageNum = (sumBucketNum + residueNum) / sumBucketSize;
  42.         //当前准备分桶上线的数量(一般都是未使用的下线分桶)
  43.         Integer bucketNum = bucketCacheList.size();
  44.         //计算一下平均分桶的库存 是否小于最小深度,如小于则以最小深度为准进行分桶
  45.         Integer minBucketNum = bucketNum * averageNum;
  46.         //当库存数量小于最小分桶深度*分桶数量,减少可分配的分桶数量, 最后一个分桶分配剩余的全部库存(避免少量的库存分桶直接触发阈值下线)
  47.         if (minBucketNum > residueNum) {
  48.             bucketNum = residueNum / inventoryBucketConfig.getMinDepthNum();
  49.             averageNum = inventoryBucketConfig.getMinDepthNum();
  50.         }
  51.         //如果库存深度超过最大库存深度,则只存放最大深度
  52.         if (averageNum > inventoryBucketConfig.getMaxDepthNum()) {
  53.             averageNum = inventoryBucketConfig.getMinDepthNum();
  54.         }
  55.         //当前没有准备分桶上线的数量
  56.         if (bucketNum == 0) {
  57.             return Lists.newArrayList();
  58.         }
  59.         //开始填充每个分桶的具体上线库存
  60.         for (int i = 0; i < bucketNum; i++) {
  61.             BucketCacheBO bucketCache = new BucketCacheBO();
  62.             //这里上线的分桶数量不会超过实际能够上线的分桶数量,所以bucketCacheList.get(i)不会数组越界
  63.             bucketCache.setBucketNo(bucketCacheList.get(i));
  64.             bucketCache.setBucketNum(averageNum);
  65.             bucketCache.setAllotNum(bucketCache.getBucketNum());
  66.             bucketCacheBOList.add(bucketCache);
  67.         }
  68.         return bucketCacheBOList;
  69.     }
  70.     ...
  71. }
复制代码
 
5.商品库存分桶缓存初始化的加分布式锁处理 + 插入库存变更记录
执行InventoryBucketService的inventorBucket()方法来初始化分桶缓存。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //构建新的分桶元数据模型
  5.     //@param bucketLocalCache  本地分桶元数据信息
  6.     //@param bucketCacheBOList 上线的分桶数据列表
  7.     //@param residueNum        中心桶剩余库存
  8.     private void buildBucketLocalCache(BucketLocalCache bucketLocalCache, List<BucketCacheBO> bucketCacheBOList, Integer residueNum) {
  9.         //获取本次上线的库存信息
  10.         Integer inventoryNum = 0;
  11.         for (BucketCacheBO bucketCacheBO : bucketCacheBOList) {
  12.             inventoryNum = inventoryNum + bucketCacheBO.getBucketNum();
  13.         }
  14.         //填充中心桶剩余库存
  15.         residueNum = residueNum - inventoryNum;
  16.         bucketLocalCache.setResidueNum(residueNum);
  17.         //添加新上线的分桶列表
  18.         bucketLocalCache.getAvailableList().addAll(bucketCacheBOList);
  19.         Map<String, BucketCacheBO> bucketCacheMap = bucketCacheBOList.stream().collect(Collectors.toMap(BucketCacheBO::getBucketNo, Function.identity()));
  20.         List<BucketCacheBO> undistributedList = bucketLocalCache.getUndistributedList().stream().filter(bucketCacheBO ->
  21.             //在上线的分桶列表,需要移除掉
  22.             !bucketCacheMap.containsKey(bucketCacheBO.getBucketNo())).collect(Collectors.toList()
  23.         );
  24.         //从不可用的分桶列表重移除
  25.         bucketLocalCache.setUndistributedList(undistributedList);
  26.     }
  27.     ...
  28. }
复制代码
 
8.基于分桶算法结果构建库存分桶元数据
库存分桶元数据BucketLocalCache中包含了商品库存应该如何构建分桶缓存的详细信息。
  1. 通过InventoryBucketCache的getBucketLocalCache()方法获取本地缓存
  2. 通过inventoryRepository的getInventoryBucketConfig()方法获取分桶配置模版
  3. 通过initInventoryBucket()方法执行初始化分桶库存//本地分桶缓存相关信息 = 分桶元数据//分桶元数据中包含了商品库存应该如何构建分桶缓存的详细信息@Datapublic class BucketLocalCache {    //商品skuID    private String skuId;    //卖家ID    private String sellerId;    //中心桶库存    private Integer inventoryNum;    //中心桶剩余库存    private Integer residueNum;    //本地元数据对应的版本号    private String version;    //操作类型,0上线,1下线    private Integer operationType;    //当前分桶的配置信息    private InventoryBucketConfigDO inventoryBucketConfig;    //分桶明细缓存key    private List bucketDetailKeyList;    //可用分桶缓存列表    private List availableList;    //未分配或者已下线的分桶缓存列表,不可用的分桶缓存列表    private List undistributedList;    //默认的缓存对象    public String getBucketLocalKey() {        return sellerId + skuId;    }}
复制代码
 
9.剩余库存写入中心桶缓存 + 分桶库存写入分桶缓存 + 分桶元数据写入本地缓存
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     @Resource
  4.     private InventoryBucketCache inventoryBucketCache;
  5.     ...
  6.     //将库存数据写入缓存
  7.     private void inventoryBucketCache(InventorBucketRequest request) {
  8.         //获取中心桶库存的key
  9.         String key = buildSellerInventoryKey(request.getSellerId(), request.getSkuId());
  10.         //1.先验证是否已缓存分桶元数据信息,先查本地缓存,再查远程缓存
  11.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  12.         //缓存不存在,则进行初始化
  13.         if (Objects.isNull(bucketLocalCache)) {
  14.             //2.获取库存分桶的配置模板
  15.             InventoryBucketConfigDO inventoryBucketConfig = inventoryRepository.getInventoryBucketConfig(request.getTemplateId());
  16.             //初始化分桶库存
  17.             initInventoryBucket(request, inventoryBucketConfig);
  18.         } else {
  19.             //3.缓存已存在,直接把库存加到中心桶里面,并返回中心桶库存
  20.             Integer residueNum = tairCache.incr(key, request.getInventoryNum());
  21.             //4.尝试将库存分配到新的分桶上(注意,先将中心桶的库存加上去)
  22.             InventorOnlineRequest onlineRequest = inventoryConverter.converterRequest(request);
  23.             //5.构建新的分桶信息数据信息并写入
  24.             writeBucketCache(onlineRequest, residueNum);
  25.         }
  26.     }
  27.     ...
  28. }
  29. //库存分桶配置
  30. @Data
  31. @TableName("inventory_bucket_config")
  32. public class InventoryBucketConfigDO extends BaseEntity {
  33.     //分桶数量
  34.     private Integer bucketNum;
  35.     //最大库存深度
  36.     private Integer maxDepthNum;
  37.     //最小库存深度
  38.     private Integer minDepthNum;
  39.     //分桶下线阈值
  40.     private Integer thresholdValue;
  41.     //分桶下线阈值动态比例
  42.     private Integer thresholdProportion;
  43.     //回源比例,从1-100设定比例
  44.     private Integer backSourceProportion;
  45.     //回源步长,桶扩容的时候默认每次分配的库存大小
  46.     private Integer backSourceStep;
  47.     //模板名称
  48.     private String templateName;
  49.     //是否默认模板,只允许一个,1为默认模板
  50.     private Integer isDefault;
  51. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册