找回密码
 立即注册
首页 业界区 业界 商品中心—15.库存分桶扣减的技术文档

商品中心—15.库存分桶扣减的技术文档

厌外 2025-6-27 22:31:04
大纲
1.库存分桶扣减和扩容时涉及的数据表
2.下单时商品库存扣减
3.库存分桶扣减后异步更新DB
4.取消订单时回退商品库存
5.查询商品库存
6.库存扣减分桶轮询以及随机备用分桶
7.基于Tair中分桶数据实现库存扣减
8.分桶库存扣减完毕后扣减明细异步落库
9.分桶扣减完库存后异步触发回源扩容
10.库存分桶回源扩容的Double Check
11.库存分桶扩容量计算算法实现
12.库存分桶扩容完成以及分桶下线触发
13.库存分桶下线以及剩余存量归还中心桶
14.库存下线触发剩余库存总量预警机制
 
1.库存分桶扣减和扩容时涉及的数据表
(1)库存分桶配置表
(2)库存扣减明细表
 
(1)库存分桶配置表
  1. CREATE TABLE `inventory_bucket_config` (
  2.     `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.     `bucket_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶数量',
  4.     `max_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼤库存深度,即分桶的最大库存容量',
  5.     `min_depth_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '最⼩库存深度,即分桶的最小库存容量',
  6.     `threshold_value` int(10)  NOT NULL DEFAULT '0' COMMENT  '分桶下线阈值,当某个分桶的库存数小于阈值时就需要将该分桶下线了',
  7.     `back_source_proportion` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源⽐例,从1-100设定⽐例',
  8.     `back_source_step` int(10)  NOT NULL DEFAULT '0' COMMENT  '回源步⻓,桶扩容的时候默认每次分配的库存⼤⼩',
  9.     `template_name` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '模板名称',
  10.     `is_default` tinyint(1)  NOT NULL DEFAULT '0' COMMENT '是否默认模板,只允许⼀个,1为默认模板',
  11.     `version_id` int(10) NOT NULL DEFAULT '0' COMMENT '版本号',
  12.     `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  13.     `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
  14.     `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  15.     `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
  16.     `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  17.     PRIMARY KEY (`ID`)
  18. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存分桶配置模板表';
复制代码
(2)库存扣减明细表
  1. CREATE TABLE `inventory_deduction_detail` (
  2.     `id` int(10) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  3.     `order_id` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '订单id',
  4.     `refund_no` varchar(32) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '退款编号',
  5.     `inventory_num` int(10)  NOT NULL DEFAULT '0' COMMENT  '扣减库存数量',
  6.     `sku_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '商品skuId',
  7.     `seller_id` varchar(64) CHARACTER SET utf8mb4 DEFAULT NULL COMMENT '卖家ID',
  8.     `bucket_no` int(10)  NOT NULL COMMENT  '扣减分桶编号',
  9.     `deduction_type` int(2)  NOT NULL COMMENT  '库存操作类型(10库存扣减,20库存退货)',
  10.     `del_flag` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除标记(1-有效,0-删除)',
  11.     `create_user` int(10) NOT NULL DEFAULT '0' COMMENT '创建⼈',
  12.     `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
  13.     `update_user` int(10) NOT NULL DEFAULT '0' COMMENT '更新⼈',
  14.     `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  15.     PRIMARY KEY (`ID`)
  16. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='库存扣减明细表';
复制代码
(3)流程图
1.png
 
2.下单时商品库存扣减
(1)使用入口
(2)场景一之扣减库存
(3)场景二之扩容库存
(4)场景三之分桶下线
 
(1)使用入口
⽤户在前端下单时,会对商品库存发起扣减请求。
  1. @RestController
  2. @RequestMapping("/product/inventory")
  3. public class InventoryController {
  4.     @Autowired
  5.     private InventoryService inventoryService;
  6.     ...
  7.     //扣减库存
  8.     @RequestMapping("/deduct")
  9.     public JsonResult deductProductStock(@RequestBody InventoryRequest request) {
  10.         //这里模拟指定本次的库存业务单号,实际接口需要外部传入
  11.         request.setOrderId(SnowflakeIdWorker.getCode());
  12.         JsonResult result = inventoryService.deductProductStock(request);
  13.         return result;
  14.     }
  15.     ...
  16. }
  17. @Service
  18. public class InventoryServiceImpl implements InventoryService {
  19.     ...
  20.     //扣减商品库存
  21.     @Override
  22.     public JsonResult deductProductStock(InventoryRequest request) {
  23.         //1.验证入参是否合法
  24.         checkDeductProductStock(request);
  25.         //2.构建扣减库存的上下文对象
  26.         BucketContext bucketContext = buildDeductProductStock(request);
  27.         try {
  28.             //3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在
  29.             String repeatDeductInfo = getRepeatDeductInfo(bucketContext);
  30.             if (!StringUtils.isEmpty(repeatDeductInfo)) {
  31.                 return JsonResult.buildSuccess();
  32.             }
  33.             //4.执行库存扣减
  34.             deductInventory(bucketContext);
  35.             //5.写入明细,如果已重复写入,则写入失败,并回退库存
  36.             writeInventoryDetail(bucketContext);
  37.         } catch (Exception e) {
  38.             e.printStackTrace();
  39.             return JsonResult.buildError(e.getMessage());
  40.         } finally {
  41.             //6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则发送通知进行异步扩容
  42.             checkInventoryBackSource(bucketContext);
  43.         }
  44.         return JsonResult.buildSuccess();
  45.     }
  46.     ...
  47. }
复制代码
(2)场景一之扣减库存
  1. 步骤一:库存扣减请求的⼊参校验
  2. 步骤二:构建扣减库存的上下文对象
  3. 步骤三:检查该笔订单号是否已经在缓存中存在
  4. 步骤四:执⾏库存扣减时会⽤到备⽤的分桶进行尝试
  5. 步骤五:扣减成功后写入⼀条执⾏明细到缓存
  6. 步骤六:⽆论整个扣减流程是否成功,都要执⾏⼀次是否扩容的校验判断
复制代码
步骤一:库存扣减请求的⼊参校验
 
步骤二:构建扣减库存的上下文对象。接下来进⾏校验和扣减都会以这个上下文对象的信息为准。构建该上下文对象时,首先会根据⼀个⾃增的访问次数key来定位本次扣减应该路由到哪个分桶。也就是使用Round Robin轮询算法,根据扣减次数定位具体要扣减哪个分桶。为避免扣减失败,会同时随机⽣成2个备⽤分桶,用于扣减失败时的重试。如果其他分桶都作为备用分桶,那么就是库存合并扣减的功能了。
 
⼀般出现连续多个分桶库存都不⾜且分桶还未被进行下线处理的概率较少,此种场景的分桶应该⼤部分都已下线,且只保留了唯⼀⼀个可⽤分桶。获取分桶元数据时,会先从本地缓存获取,然后再从Tair缓存中获取。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     ...
  4.     //构建接下来用于处理具体扣减库存时所需要的上下文对象
  5.     private BucketContext buildDeductProductStock(InventoryRequest request) {
  6.         //1.填充扣减库存相关请求信息的明细
  7.         InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);
  8.         //2.填充扣减库存的分桶相关信息
  9.         BucketContext bucketContext = buildDeductBucketList(request);
  10.         bucketContext.setInventoryDetail(inventoryDetail);
  11.         return bucketContext;
  12.     }
  13.     //填充扣减库存的分桶相关信息
  14.     private BucketContext buildDeductBucketList(InventoryRequest request) {
  15.         BucketContext context = new BucketContext();
  16.         //获取本地缓存的分桶元数据
  17.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  18.         //获取本地缓存的分桶列表
  19.         List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  20.         //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存
  21.         Integer incrementCount = getIncrementCount(request);
  22.         //通过取模运算得到本次访问所需要定位的分桶,使用Round Robin轮询算法
  23.         int index = incrementCount % availableList.size();
  24.         //获取本次准备处理的分桶信息
  25.         BucketCacheBO bucketCacheBO = availableList.get(index);
  26.         context.getAvailableList().add(bucketCacheBO);
  27.         context.getBucketNoList().add(bucketCacheBO.getBucketNo());
  28.         //获取分桶配置信息
  29.         context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
  30.         //为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶
  31.         //如果其他分桶都作为备用分桶,那么就可以实现库存合并扣减的功能了
  32.         for (int i = 0; i < 2; i++) {
  33.             //任意填充2个作为备份
  34.             Random random = new Random();
  35.             int num = random.nextInt(availableList.size());
  36.             BucketCacheBO bucketCache = availableList.get(num);
  37.             //避免拿到重复的分桶,这里处理一下
  38.             if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {
  39.                 i--;
  40.                 continue;
  41.             }
  42.             context.getAvailableList().add(bucketCache);
  43.             context.getBucketNoList().add(bucketCache.getBucketNo());
  44.         }
  45.         return context;
  46.     }
  47.     //获取对应售卖商品的扣减访问次数
  48.     private Integer getIncrementCount(InventoryRequest request) {
  49.         String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();
  50.         Integer incrementCount = tairCache.incr(incrementKey);
  51.         return incrementCount;
  52.     }
  53.     ...
  54. }
  55. @Component
  56. @Data
  57. public class InventoryBucketCache {
  58.     @Autowired
  59.     private Cache cache;
  60.     @Autowired
  61.     private TairCache tairCache;
  62.     ...
  63.     //获取本地缓存的分桶元数据信息
  64.     public BucketLocalCache getBucketLocalCache(String bucketKey) {
  65.         BucketLocalCache bucketLocalCache = (BucketLocalCache) cache.getIfPresent(bucketKey);
  66.         log.info("local cache get key:{}, value:{}", bucketKey, JSON.toJSONString(bucketLocalCache));
  67.         if (Objects.isNull(bucketLocalCache)) {
  68.             synchronized (bucketKey.intern()) {
  69.                 String bucketCache = tairCache.get(TairInventoryConstant.SELLER_BUCKET_PREFIX + bucketKey);
  70.                 if (!StringUtils.isEmpty(bucketCache)) {
  71.                     bucketLocalCache = JsonUtil.json2Object(bucketCache, BucketLocalCache.class);
  72.                     cache.put(bucketKey, bucketLocalCache);
  73.                 }
  74.             }
  75.         }
  76.         return bucketLocalCache;
  77.     }
  78.     ...
  79. }
  80. //分桶扣减库存上下文对象
  81. @Data
  82. public class BucketContext {
  83.     //存储可用的分桶编号
  84.     private List<String> bucketNoList = new ArrayList<>();
  85.     //存储可用的分桶的具体信息
  86.     private List<BucketCacheBO> availableList  = new ArrayList<>();
  87.     //扣减明细信息
  88.     private InventoryDetail inventoryDetail;
  89.     //用来处理扩容所用
  90.     private Map<String, Integer> capacityMap = new HashMap<>();
  91.     //当前分桶的配置信息
  92.     private InventoryBucketConfigDO inventoryBucketConfig;
  93. }
  94. //库存扣减的明细对象
  95. @Data
  96. public class InventoryDetail {
  97.     //商品SKU
  98.     private Long skuId;
  99.     //扣减库存数量
  100.     private Integer inventoryNum;
  101.     //卖家ID
  102.     private String sellerId;
  103.     //订单ID
  104.     private String orderId;
  105.     //扣减使用的分桶
  106.     private String bucketNo;
  107.     //退款编号
  108.     private String refundNo;
  109. }
复制代码
步骤三:检查该笔订单号是否已经在缓存中存在。如果已经存在则认为这笔订单已被扣减过库存了,可以直接返回。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     ...
  6.     //验证当前得请求扣减是否已经存在了
  7.     private String getRepeatDeductInfo(BucketContext bucketContext) {
  8.         //获取当前扣减库存对应的订单明细缓存
  9.         String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
  10.         return tairCache.exhget(key, String.valueOf(bucketContext.getInventoryDetail().getOrderId()));
  11.     }
  12.     ...
  13. }
  14. @Component
  15. public class TairCache {
  16.     private JedisPool jedisPool;
  17.     public TairCache(JedisPool jedisPool) {
  18.         this.jedisPool = jedisPool;
  19.     }
  20.     public Jedis getJedis() {
  21.         return jedisPool.getResource();
  22.     }
  23.     private TairHash createTairHash(Jedis jedis) {
  24.         return new TairHash(jedis);
  25.     }
  26.     //获取hash对象
  27.     public String exhget(String key, String field) {
  28.         try (Jedis jedis = getJedis()) {
  29.             String exhget = createTairHash(jedis).exhget(key, field);
  30.             log.info("exhget key:{}, field:{}, value:{}", key, field, exhget);
  31.             return exhget;
  32.         }
  33.     }
  34. }
复制代码
步骤四:执⾏库存扣减时会⽤到备⽤的分桶进行尝试。当第⼀次分桶扣减库存失败,默认会重试其它⼏个分桶。如果都失败则直接提示库存不⾜抛出异常。如果备用的分桶全部是可用的分桶,那么第一个分桶不够扣,就会继续扣第二个分桶,从而实现库存合并扣减。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     ...
  6.     //扣减库存
  7.     //@param bucketContext 库存扣减上下文对象
  8.     private void deductInventory(BucketContext bucketContext) {
  9.         //获取可以使用的分桶编号,即对应缓存中的key
  10.         List<String> bucketNoList = bucketContext.getBucketNoList();
  11.         //获取扣减明细信息
  12.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  13.         //获取扣减库存数量
  14.         Integer inventoryNum = inventoryDetail.getInventoryNum();
  15.         //获取用于处理扩容的Map
  16.         Map<String, Integer> capacityMap = bucketContext.getCapacityMap();
  17.         Boolean isDeduct = false;
  18.         //对分桶进行库存扣减(每次)
  19.         //如果bucketNoList是全部可用分桶,那么第一个分桶不够扣减,就会继续扣减第二个分桶,从而实现库存合并扣减
  20.         for (String bucketNo : bucketNoList) {
  21.             //自减,默认扣减后不能小于0,否则返回-1
  22.             Integer residueNum = tairCache.decr(bucketNo, inventoryNum);
  23.             //capacityMap可用于判断分桶是否是扩容的分桶,以及标记当前分桶剩余的库存数
  24.             capacityMap.put(bucketNo, residueNum);
  25.             //库存扣减成功
  26.             if (residueNum >= 0) {
  27.                 //标记一下具体扣减的分桶属于哪个
  28.                 inventoryDetail.setBucketNo(bucketNo);
  29.                 isDeduct = true;
  30.                 break;
  31.             }
  32.         }
  33.         //分桶扣减都没有成功,此时抛出异常提示库存不足
  34.         if (!isDeduct) {
  35.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_INSUFFICIENT_ERROR);
  36.         }
  37.     }
  38.     ...
  39. }
复制代码
步骤五:扣减成功后写入⼀条执⾏明细到缓存。注意并发场景下可能获取时没有明细,但写⼊时⼜有明细了。⼀旦发⽣这种场景,说明并发了,需要进⾏库存的回退。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     @Resource
  6.     private InventoryDetailProducer inventoryDetailProducer;
  7.     ...
  8.     //将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)
  9.     private void writeInventoryDetail(BucketContext bucketContext) {
  10.         //获取扣减明细信息
  11.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  12.         String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
  13.         //尝试写入明细记录,如果没有写入成功则说明库存需要回退
  14.         Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
  15.         if (count < 0) {
  16.             //说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上
  17.             tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
  18.         } else {
  19.             //发送消息,异步写入库存扣减的明细到DB
  20.             inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
  21.         }
  22.     }
  23.     ...
  24. }
  25. @Component
  26. public class TairCache {
  27.     private JedisPool jedisPool;
  28.     public TairCache(JedisPool jedisPool) {
  29.         this.jedisPool = jedisPool;
  30.     }
  31.     public Jedis getJedis() {
  32.         return jedisPool.getResource();
  33.     }
  34.     private TairHash createTairHash(Jedis jedis) {
  35.         return new TairHash(jedis);
  36.     }
  37.     //存储hash对象
  38.     public Integer exhset(String key,String field, String value){
  39.         try (Jedis jedis = getJedis()) {
  40.             return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();
  41.         }
  42.     }
  43. }
复制代码
步骤六:⽆论整个扣减流程是否成功,都要执⾏⼀次是否扩容的校验判断。执⾏依据是分桶扣减完库存后返回的扣减后剩余库存值以及对应的分桶。例如第⼀个分桶失败,第⼆个分桶成功,必然会触发第⼀个分桶的扩容。然后校验第⼆个分桶的库存返回值是否触发回源⽐例后决定是否触发扩容。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     ...
  4.     //扣减商品库存
  5.     @Override
  6.     public JsonResult deductProductStock(InventoryRequest request) {
  7.         //1.验证入参是否合法
  8.         checkDeductProductStock(request);
  9.         //2.构建扣减库存的上下文对象
  10.         BucketContext bucketContext = buildDeductProductStock(request);
  11.         try {
  12.             //3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在
  13.             String repeatDeductInfo = getRepeatDeductInfo(bucketContext);
  14.             if (!StringUtils.isEmpty(repeatDeductInfo)) {
  15.                 return JsonResult.buildSuccess();
  16.             }
  17.             //4.执行库存扣减
  18.             deductInventory(bucketContext);
  19.             //5.写入明细,如果已重复写入,则写入失败,并回退库存
  20.             writeInventoryDetail(bucketContext);
  21.         } catch (Exception e) {
  22.             e.printStackTrace();
  23.             return JsonResult.buildError(e.getMessage());
  24.         } finally {
  25.             //6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则异步通知扩容
  26.             checkInventoryBackSource(bucketContext);
  27.         }
  28.         return JsonResult.buildSuccess();
  29.     }
  30.     ...
  31.     //检测扣减成功后的库存是否触发回源
  32.     //例如商品分桶库存1000,回源比例40%,那么实际剩余库存小于400就会触发回源库存的操作
  33.     private void checkInventoryBackSource(BucketContext bucketContext) {
  34.         //获取扣减明细信息
  35.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  36.         //存储对应的需要校验扩容的分桶
  37.         Map<String, Integer> bucketMap = bucketContext.getCapacityMap();
  38.         //获取库存分桶配置
  39.         InventoryBucketConfigDO inventoryBucketConfig = bucketContext.getInventoryBucketConfig();
  40.         //判断分桶当初分配的最大库存容量,计算是否触发回源比例
  41.         List<BucketCacheBO> availableList = bucketContext.getAvailableList();
  42.         for (BucketCacheBO bucketCacheBO : availableList) {
  43.             //具体使用的是哪个分桶扣减库存
  44.             if (bucketMap.containsKey(bucketCacheBO.getBucketNo())) {
  45.                 Integer residueNum = bucketMap.get(bucketCacheBO.getBucketNo());
  46.                 //当前分桶的分配总库存
  47.                 Integer bucketNum = bucketCacheBO.getBucketNum();
  48.                 //触发回源比例的百分比
  49.                 Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();
  50.                 //这里如果要更准确,需要用小数得到回源数,剩余数量小于回源数,那么就要回源
  51.                 //这里省略了小数,所以可能会有一个数的误差,影响不大
  52.                 int backSourceNum = bucketNum * backSourceProportion / 100;
  53.                 //回源比例的库存 大于剩余的库存,触发异步扩容,或者没有返回剩余库存也说明扣减失败
  54.                 if (backSourceNum > residueNum) {
  55.                     //标记出回源的具体分桶
  56.                     inventoryDetail.setBucketNo(bucketCacheBO.getBucketNo());
  57.                     //发送通知到消息队列进行异步库存扩容
  58.                     sendAsynchronous(inventoryDetail);
  59.                 }
  60.             }
  61.         }
  62.     }
  63.     ...
  64. }
复制代码
(3)场景二之扩容库存
  1. 步骤一:库存分桶缓存的扣减过程会触发库存扩容
  2. 步骤二:消费者消费分桶库存扩容的消息
  3. 步骤三:检测是否需要扩容,如果⽆需扩容则直接结束
  4. 步骤四:获取分布式锁来进行扩容处理或分桶下线处理
  5. 步骤五:进行具体的库存分桶缓存的扩容处理
复制代码
步骤一:库存分桶缓存的扣减过程会触发库存扩容。执⾏库存分桶缓存的扣减时,如果发现分桶的库存剩余值小于回源配置数,此时就需要发送⼀个异步消息,通知该库存分桶进⾏扩容。
 
步骤二:消费者消费分桶库存扩容的消息。此时会调用分桶扩容接口InventoryBucketService的bucketCapacity()来进行扩容。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     ...
  4.     //发送通知到消息队列进行异步库存扩容
  5.     //@param inventoryDetail 库存扣减明细对象
  6.     private void sendAsynchronous(InventoryDetail inventoryDetail) {
  7.         //1.构建发送的消息对象
  8.         BucketCapacity bucketCapacity = inventoryConverter.converter(inventoryDetail);
  9.         //2.发送消息,异步处理扩容
  10.         bucketCapacityProducer.sendBucketCapacity(bucketCapacity);
  11.     }
  12.     ...
  13. }
  14. //分桶扩容对象
  15. @Data
  16. public class BucketCapacity {
  17.     //分桶编号
  18.     private String bucketNo;
  19.     //商品skuID
  20.     private String skuId;
  21.     //卖家ID
  22.     private String sellerId;
  23. }
  24. //分桶扩容的消息队列
  25. @Component
  26. public class BucketCapacityProducer {
  27.     @Autowired
  28.     private DefaultProducer defaultProducer;
  29.     //分桶扩容的消息
  30.     public void sendBucketCapacity(BucketCapacity bucketCapacity) {
  31.         //发送分桶扩容消息
  32.         defaultProducer.sendMessage(RocketMqConstant.BUCKET_CAPACITY_TOPIC, JSONObject.toJSONString(bucketCapacity), "分桶扩容");
  33.     }
  34. }
  35. //消费分桶库存扩容的消息
  36. @Component
  37. public class BucketCapacityListener implements MessageListenerConcurrently {
  38.     @Autowired
  39.     private InventoryBucketService inventoryBucketService;
  40.     @Override
  41.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  42.         try {
  43.             for (MessageExt messageExt : list) {
  44.                 String msg = new String(messageExt.getBody());
  45.                 log.info("执行分桶库存扩容,消息内容:{}", msg);
  46.                 BucketCapacity bucketCapacity = JsonUtil.json2Object(msg, BucketCapacity.class);
  47.                 //调用分桶扩容接口
  48.                 inventoryBucketService.bucketCapacity(bucketCapacity);
  49.             }
  50.         } catch (Exception e) {
  51.             log.error("consume error, 分桶库存扩容失败", e);
  52.             //本次消费失败,下次重新消费
  53.             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  54.         }
  55.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  56.     }
  57. }
  58. @Service
  59. public class InventoryBucketServiceImpl implements InventoryBucketService {
  60.     ...
  61.     //分桶扩容接口
  62.     @Override
  63.     public void bucketCapacity(BucketCapacity bucketCapacity) {
  64.         //先锁住中心桶库存,避免此时库存发生变化
  65.         String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  66.         String value = SnowflakeIdWorker.getCode();
  67.         //1.校验是否已经无需扩容了,如果是则快速结束
  68.         BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  69.         if (!bucketCapacityContext.getIsCapacity()) {
  70.             return;
  71.         }
  72.         //获取分布式锁来进行扩容处理
  73.         boolean lock = tairLock.tryLock(key, value);
  74.         if (lock) {
  75.             try {
  76.                 //再次校验是否需要扩容,此处不允许并发
  77.                 bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  78.                 if (bucketCapacityContext.getIsCapacity()) {
  79.                     //2.获取中心桶缓存的剩余库存
  80.                     Integer residueNum = getCenterStock(bucketCapacity);
  81.                     //3.可以扩容,计算出可回源的库存进行处理
  82.                     if (residueNum > 0) {
  83.                         backSourceInventory(residueNum, bucketCapacityContext);
  84.                     } else {
  85.                         //4.中心桶无库存,检查是否触发下线
  86.                         checkBucketOffline(bucketCapacity);
  87.                     }
  88.                 }
  89.             } catch (Exception e) {
  90.                 e.printStackTrace();
  91.             } finally {
  92.                 tairLock.unlock(key, value);
  93.             }
  94.         } else {
  95.             throw new BaseBizException("请求繁忙,稍后重试!");
  96.         }
  97.     }
  98.     ...
  99. }
复制代码
步骤三:检测是否需要扩容,如果⽆需扩容则结束
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //校验本次请求是否还需要执行扩容处理
  5.     private BucketCapacityContext checkBucketCapacity(BucketCapacity bucketCapacity) {
  6.         String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  7.         //1.获取远程的分桶缓存信息
  8.         Integer residueNum = getBucketInventoryNum(bucketCapacity.getBucketNo());
  9.         //2.获取缓存元数据信息
  10.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
  11.         //3.校验是否还需要执行扩容
  12.         List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  13.         InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
  14.         for (BucketCacheBO bucketCacheBO:availableList) {
  15.             //具体使用的是哪个分桶进行扣减库存
  16.             if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
  17.                 //当前分桶的分配总库存
  18.                 Integer bucketNum = bucketCacheBO.getBucketNum();
  19.                 //触发回源比例的百分比
  20.                 Integer backSourceProportion = inventoryBucketConfig.getBackSourceProportion();
  21.                 int backSourceNum = bucketNum * backSourceProportion / 100;
  22.                 //回源比例的库存大于剩余的库存,触发异步扩容
  23.                 return new BucketCapacityContext(residueNum, backSourceNum > residueNum, bucketCapacity);
  24.             }
  25.         }
  26.         //如果不在可用列表里面,则意味已下线,快速结束掉
  27.         return new BucketCapacityContext(residueNum, false, bucketCapacity);
  28.     }
  29.     //获取得到当前分桶对应的实际剩余库存
  30.     private Integer getBucketInventoryNum( String bucketNo) {
  31.         String bucketNum = tairCache.get(bucketNo);
  32.         if (StringUtils.isEmpty(bucketNum)){
  33.             return 0;
  34.         }
  35.         return Integer.valueOf(bucketNum);
  36.     }
  37.     ...
  38. }
复制代码
步骤四:获取分布式锁来进行扩容处理或分桶下线处理。这⾥会查看中⼼桶缓存的剩余库存,判断是否还有剩余库存可以扩容。如果有则进⾏扩容,如果没有则判断分桶是否触发下线阈值。并且处理请求前,需要再次判断是否⽆需扩容。避免有竞争锁的请求跳过前面的校验进⼊锁,需要过滤掉这种⽆效请求。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //分桶扩容接口
  5.     @Override
  6.     public void bucketCapacity(BucketCapacity bucketCapacity) {
  7.         //先锁住中心桶库存,避免此时库存发生变化
  8.         String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  9.         String value = SnowflakeIdWorker.getCode();
  10.         //1.校验是否已经无需扩容了,如果是则快速结束
  11.         BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  12.         if (!bucketCapacityContext.getIsCapacity()) {
  13.             return;
  14.         }
  15.         //获取分布式锁来进行扩容处理
  16.         boolean lock = tairLock.tryLock(key, value);
  17.         if (lock) {
  18.             try {
  19.                 //再次校验是否需要扩容,此处不允许并发
  20.                 bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  21.                 if (bucketCapacityContext.getIsCapacity()) {
  22.                     //2.获取中心桶缓存的剩余库存
  23.                     Integer residueNum = getCenterStock(bucketCapacity);
  24.                     //3.可以扩容,计算出可回源的库存进行处理
  25.                     if (residueNum > 0) {
  26.                         backSourceInventory(residueNum, bucketCapacityContext);
  27.                     } else {
  28.                         //4.中心桶无库存,检查是否触发下线
  29.                         checkBucketOffline(bucketCapacity);
  30.                     }
  31.                 }
  32.             } catch (Exception e) {
  33.                 e.printStackTrace();
  34.             } finally {
  35.                 tairLock.unlock(key, value);
  36.             }
  37.         } else {
  38.             throw new BaseBizException("请求繁忙,稍后重试!");
  39.         }
  40.     }
  41.     ...
  42. }
复制代码
步骤五:进行具体的库存分桶缓存的扩容处理。分桶需要扩容多少库存,需要尽量保证每个分桶的库存尽可能均匀。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //回源库存到分桶上
  5.     //@param residueNum            中心桶库存
  6.     //@param bucketCapacityContext 扩容上下文对象
  7.     private void backSourceInventory(Integer residueNum, BucketCapacityContext bucketCapacityContext) {
  8.         //首先需要当前分桶的库存,其次还需要获取目前分桶的可发库存深度(第一次初始化的时候分配的库存)
  9.         //根据当初分配的库存深度以及最大库存深度以及中心桶库存,得出均匀到目前支持可用的分桶均匀分配库存大概数量
  10.         //同时根据本次同步的库存数量刷新分桶的实际库存深度
  11.         BucketCapacity bucketCapacity = bucketCapacityContext.getBucketCapacity();
  12.         //先获取本地的分桶元数据信息,获取当前分桶的总发放上限
  13.         String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  14.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
  15.         InventoryBucketConfigDO inventoryBucketConfig = bucketLocalCache.getInventoryBucketConfig();
  16.         List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  17.         Integer inventoryNum = 0;
  18.         //获取实际配置的最大可用库存深度
  19.         Integer maxBucketNum = availableList.stream().mapToInt(BucketCacheBO::getBucketNum).sum();
  20.         BucketCacheBO bucketCache = null;
  21.         for (BucketCacheBO bucketCacheBO : availableList) {
  22.             if (bucketCacheBO.getBucketNo().equals(bucketCapacity.getBucketNo())) {
  23.                 bucketCache = bucketCacheBO;
  24.                 break;
  25.             }
  26.         }
  27.         //这里没有匹配到分桶,则该分桶已被下线,不处理后续流程
  28.         if (Objects.isNull(bucketCache)) {
  29.             return;
  30.         }
  31.         //中心桶库存超过最大深度库存(全部分桶总计),直接以配置的回源步长增长库存
  32.         if (residueNum > maxBucketNum) {
  33.             inventoryNum = inventoryBucketConfig.getBackSourceStep();
  34.         } else {
  35.             inventoryNum = calcEvenInventoryNum(maxBucketNum, inventoryBucketConfig, residueNum, bucketCache);
  36.         }
  37.         //填充变更元数据关于库存的深度数据
  38.         Integer maxDepthNum = getMaxDepthNum(inventoryNum, inventoryBucketConfig, bucketCache, bucketCapacityContext);
  39.         //更新分桶元数据相关信息,注意需要判断当前分桶的库存深度是否真实发生变化,如无变化则不需要更新
  40.         refreshBucketCache(maxDepthNum, bucketLocalCache, bucketCapacity.getBucketNo(), inventoryNum);
  41.         log.info("本次分桶:{},回源库存:{}", bucketCapacity.getBucketNo(), inventoryNum);
  42.         //回源分桶的库存
  43.         Integer incr = tairCache.incr(bucketCapacity.getBucketNo(), inventoryNum);
  44.         //扣减中心桶库存
  45.         Integer decr = tairCache.decr(TairInventoryConstant.SELLER_INVENTORY_PREFIX + key, inventoryNum);
  46.         log.info("本次分桶:{},回源库存:{}, 回源后分桶库存:{}, 中心桶剩余库存:{}", bucketCapacity.getBucketNo(), inventoryNum, incr, decr);
  47.     }
  48.     ...
  49. }
复制代码
(4)场景三之分桶下线
说明一:当中⼼桶库存缓存⽆剩余库存,分桶库存也处于下线的阈值时。为了避免碎⽚化的问题出现,需要将⼀些⼩于阈值的库存进⾏分桶回收。分桶回收也就是,将库存回源到中⼼桶,提供给其它分桶扩容。如此反复,当库存越来越少,最终只留下⼀个分桶扣减库存。
 
说明二:分桶下线采取异步⽅式执⾏,因为分桶下线请求的路由需要时间,所以这⾥对于分桶下线需要先把下线分桶从可⽤列表移除。再通过⼀个延迟消息将对应分桶⾥的库存回源到中⼼桶,避免库存超发。
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //校验当前分桶是否触发下线的阈值
  5.     private void checkBucketOffline(BucketCapacity bucketCapacity) {
  6.         //1.获取当前分桶的配置信息
  7.         String key = bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  8.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(key);
  9.         //2.检测分桶的库存是否触发下线阈值,先获取当前分桶的具体库存以及下线配置阈值
  10.         Integer thresholdValue = bucketLocalCache.getInventoryBucketConfig().getThresholdValue();
  11.         Integer inventoryNum = getBucketInventoryNum(bucketCapacity.getBucketNo());
  12.         //3.如果触发下线,发送消息调用分桶下线
  13.         if (thresholdValue > inventoryNum) {
  14.             log.info("触发下线{},阈值{},当前库存值{}", thresholdValue > inventoryNum, thresholdValue, inventoryNum);
  15.             sendAsynchronous(bucketCapacity);
  16.         }
  17.     }
  18.     //对分桶进行异步下线
  19.     private void sendAsynchronous(BucketCapacity bucketCapacity) {
  20.         //1.构建分桶下线接口模型
  21.         InventorOfflineRequest offlineRequest = buildOfflineBucketInfo(bucketCapacity);
  22.         //2.发送消息,通知处理分桶下线
  23.         bucketOfflineProducer.sendBucketOffline(offlineRequest);
  24.     }
  25.     ...
  26. }
复制代码
 
3.库存分桶扣减后异步更新DB
(1)使用入口
(2)具体实现
 
(1)使用入口
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     //扣减商品库存
  6.     @Override
  7.     public JsonResult deductProductStock(InventoryRequest request) {
  8.         //1.验证入参是否合法
  9.         checkDeductProductStock(request);
  10.         //2.构建扣减库存的上下文对象
  11.         BucketContext bucketContext = buildDeductProductStock(request);
  12.         try {
  13.             //3.获取是否已经有一条扣减明细记录,检查该笔订单号是否已经在缓存中存在
  14.             String repeatDeductInfo = getRepeatDeductInfo(bucketContext);
  15.             if (!StringUtils.isEmpty(repeatDeductInfo)) {
  16.                 return JsonResult.buildSuccess();
  17.             }
  18.             //4.执行库存扣减
  19.             deductInventory(bucketContext);
  20.             //5.写入明细,如果已重复写入,则写入失败,并回退库存
  21.             writeInventoryDetail(bucketContext);
  22.         } catch (Exception e) {
  23.             e.printStackTrace();
  24.             return JsonResult.buildError(e.getMessage());
  25.         } finally {
  26.             //6.检测当前返回的库存数量是否触发扩容的阈值(回源比例),触发则异步通知扩容
  27.             checkInventoryBackSource(bucketContext);
  28.         }
  29.         return JsonResult.buildSuccess();
  30.     }
  31.     //将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)
  32.     private void writeInventoryDetail(BucketContext bucketContext) {
  33.         //获取扣减明细信息
  34.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  35.         String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
  36.         //尝试写入明细记录,如果没有写入成功则说明库存需要回退
  37.         Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
  38.         if (count < 0) {
  39.             //说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上
  40.             tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
  41.         } else {
  42.             //发送消息,异步写入库存扣减的明细到DB
  43.             inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
  44.         }
  45.     }
  46.     ...
  47. }
  48. //库存扣减明细
  49. @Component
  50. public class InventoryDetailProducer {
  51.     @Autowired
  52.     private DefaultProducer defaultProducer;
  53.     //库存扣减明细 MQ生产
  54.     public void sendInventoryDetail(InventoryDetail inventoryDetail) {
  55.         //发送库存扣减 明细保存消息
  56.         defaultProducer.sendAsyncMessage(RocketMqConstant.INVENTORY_DETAIL_TOPIC, JSONObject.toJSONString(inventoryDetail), "库存扣减明细");
  57.     }
  58. }
复制代码
(2)具体实现
说明一:当有任意⼀个库存发⽣扣减或者撤销时,会将此次发⽣变化的库存具体明细进⾏消息发送。
 
说明二:扣减需要验证是否已有该订单号的库存明细记录,同样的记录只允许⽣成⼀次,重复则抛出唯⼀索引异常。捕获异常返回成功消息,不写⼊数据库。
 
说明三:写入数据库时,通过批量插入库存明细代替单条数据插入。
  1. //处理库存扣减明细的记录消息
  2. @Component
  3. public class InventoryDetailListener implements MessageListenerConcurrently {
  4.     @Resource
  5.     private InventoryRepository inventoryRepository;
  6.     @Override
  7.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  8.         try {
  9.             log.info("执行库存扣减明细保存,消息数量:{}", list.size());
  10.             List<InventoryDetail> inventoryDetailList = new ArrayList<>(list.size());
  11.             for (MessageExt messageExt : list) {
  12.                 String msg = new String(messageExt.getBody());
  13.                 InventoryDetail inventoryDetail = JsonUtil.json2Object(msg, InventoryDetail.class);
  14.                 inventoryDetailList.add(inventoryDetail);
  15.             }
  16.             inventoryRepository.saveInventoryDetailList(inventoryDetailList, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());
  17.         } catch (DuplicateKeyException ex) {
  18.             log.error("consume repeat, 库存扣减明细重复记录,不再重复执行", ex);
  19.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  20.         } catch (Exception e) {
  21.             log.error("consume error, 库存扣减明细保存失败", e);
  22.             //本次消费失败,下次重新消费
  23.             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  24.         }
  25.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  26.     }
  27. }
  28. @Repository
  29. public class InventoryRepository {
  30.     ...
  31.     //批量保存库存扣减的明细
  32.     public void saveInventoryDetailList(List<InventoryDetail> inventoryDetailList, Integer deductionType) throws DuplicateKeyException {
  33.         List<InventoryDeductionDetailDO> inventoryDeductionDetailDOList = inventoryConverter.converterDOList(inventoryDetailList);
  34.         //对象转换赋值
  35.         for (InventoryDeductionDetailDO inventoryDeductionDetailDO : inventoryDeductionDetailDOList) {
  36.             inventoryDeductionDetailDO.setDeductionType(deductionType);
  37.             inventoryDeductionDetailDO.initCommon();
  38.         }
  39.         int count = inventoryDetailMapper.insertBatch(inventoryDeductionDetailDOList);
  40.         if (count <= 0) {
  41.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);
  42.         }
  43.     }
  44.     ...
  45. }
复制代码
 
8.分桶库存扣减完毕后扣减明细异步落库
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     @Resource
  6.     private InventoryDetailProducer inventoryDetailProducer;
  7.     ...
  8.     //将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)
  9.     private void writeInventoryDetail(BucketContext bucketContext) {
  10.         //获取扣减明细信息
  11.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  12.         String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
  13.         //尝试写入明细记录,如果没有写入成功则说明库存需要回退
  14.         Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
  15.         if (count < 0) {
  16.             //说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上
  17.             tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
  18.         } else {
  19.             //发送消息,异步写入库存扣减的明细到DB
  20.             inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
  21.         }
  22.     }
  23.     ...
  24. }
  25. @Component
  26. public class TairCache {
  27.     private JedisPool jedisPool;
  28.     public TairCache(JedisPool jedisPool) {
  29.         this.jedisPool = jedisPool;
  30.     }
  31.     public Jedis getJedis() {
  32.         return jedisPool.getResource();
  33.     }
  34.     private TairHash createTairHash(Jedis jedis) {
  35.         return new TairHash(jedis);
  36.     }
  37.     //存储hash对象
  38.     public Integer exhset(String key,String field, String value){
  39.         try (Jedis jedis = getJedis()) {
  40.             return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();
  41.         }
  42.     }
  43. }//处理库存扣减明细的记录消息@Componentpublic class InventoryDetailListener implements MessageListenerConcurrently {    @Resource    private InventoryRepository inventoryRepository;    @Override    public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {        try {            log.info("执行库存扣减明细保存,消息数量:{}", list.size());            List inventoryDetailList = new ArrayList(list.size());            for (MessageExt messageExt : list) {                String msg = new String(messageExt.getBody());                InventoryDetail inventoryDetail = JsonUtil.json2Object(msg, InventoryDetail.class);                inventoryDetailList.add(inventoryDetail);            }            inventoryRepository.saveInventoryDetailList(inventoryDetailList, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());        } catch (DuplicateKeyException ex) {            log.error("consume repeat, 库存扣减明细重复记录,不再重复执行", ex);            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;        } catch (Exception e) {            log.error("consume error, 库存扣减明细保存失败", e);            //本次消费失败,下次重新消费            return ConsumeConcurrentlyStatus.RECONSUME_LATER;        }        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;    }}
复制代码
 
9.分桶扣减完库存后异步触发回源扩容
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private InventoryRepository inventoryRepository;
  5.     ...
  6.     //保存退还库存记录
  7.     private InventoryDetail saveRefundInventoryDetail(InventoryRequest request) {
  8.         InventoryDetail inventoryDetail = inventoryRepository.getInventoryDetail(request.getOrderId());
  9.         if (Objects.isNull(inventoryDetail)) {
  10.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_DETAIL_NULL_ERROR, InventoryExceptionCode.INVENTORY_DETAIL_NULL_ERROR.getErrorCode());
  11.         }
  12.         //校验对应订单的退款库存申请记录
  13.         List<InventoryDetail> inventoryDetails = inventoryRepository.queryRefundInventoryDetailList(request.getOrderId());
  14.         Integer refundNum = 0;
  15.         //校验已退库存 + 本次申请退货库存 是否超过下单库存
  16.         if (!CollectionUtils.isEmpty(inventoryDetails)) {
  17.             for (InventoryDetail inventoryDetail1 : inventoryDetails) {
  18.                 refundNum = refundNum + inventoryDetail1.getInventoryNum();
  19.             }
  20.         }
  21.         //如果扣减的库存大于已退库存和本次退的库存,则允许退货
  22.         if (inventoryDetail.getInventoryNum() > (refundNum + request.getInventoryNum())) {
  23.             InventoryDetail inventoryRefundDetail = inventoryConverter.converterRequest(request);
  24.             inventoryRepository.saveInventoryDetail(inventoryRefundDetail, InventoryDeductionTypeEnum.INVENTORY_DETAIL_REFUND_TYPE.getCode());
  25.             return inventoryRefundDetail;
  26.         }
  27.         return null;
  28.     }
  29.     ...
  30. }
  31. @Repository
  32. public class InventoryRepository {
  33.     ...
  34.     //根据订单号查询 库存扣减明细
  35.     public InventoryDetail getInventoryDetail(String orderId) {
  36.         LambdaQueryWrapper<InventoryDeductionDetailDO> queryWrapper = Wrappers.lambdaQuery();
  37.         queryWrapper.eq(InventoryDeductionDetailDO::getOrderId, orderId);
  38.         queryWrapper.eq(InventoryDeductionDetailDO::getDeductionType, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());
  39.         InventoryDeductionDetailDO inventoryDeductionDetailDO = inventoryDetailMapper.selectOne(queryWrapper);
  40.         return inventoryConverter.converter(inventoryDeductionDetailDO);
  41.     }
  42.     //获取扣减库存对应的已退明细
  43.     public List<InventoryDetail> queryRefundInventoryDetailList(String orderId) {
  44.         LambdaQueryWrapper<InventoryDeductionDetailDO> queryWrapper = Wrappers.lambdaQuery();
  45.         queryWrapper.eq(InventoryDeductionDetailDO::getOrderId, orderId);
  46.         queryWrapper.eq(InventoryDeductionDetailDO::getDeductionType, InventoryDeductionTypeEnum.INVENTORY_DETAIL_REFUND_TYPE.getCode());
  47.         List<InventoryDeductionDetailDO> inventoryDeductionDetailDOS = inventoryDetailMapper.selectList(queryWrapper);
  48.         return inventoryConverter.converterList(inventoryDeductionDetailDOS);
  49.     }
  50.     //保存库存扣减的明细
  51.     public void saveInventoryDetail(InventoryDetail inventoryDetail, Integer deductionType) throws DuplicateKeyException {
  52.         InventoryDeductionDetailDO inventoryDeductionDetailDO = inventoryConverter.converterDO(inventoryDetail);
  53.         inventoryDeductionDetailDO.setDeductionType(deductionType);
  54.         inventoryDeductionDetailDO.initCommon();
  55.         int count = inventoryDetailMapper.insert(inventoryDeductionDetailDO);
  56.         if (count <= 0) {
  57.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_SQL);
  58.         }
  59.     }
  60.     ...
  61. }
复制代码
 
10.库存分桶回源扩容的Double Check
  1. @Service
  2. public class InventoryBucketServiceImpl implements InventoryBucketService {
  3.     ...
  4.     //分桶扩容接口
  5.     @Override
  6.     public void bucketCapacity(BucketCapacity bucketCapacity) {
  7.         //先锁住中心桶库存,避免此时库存发生变化
  8.         String key = TairInventoryConstant.SELLER_INVENTORY_LOCK_PREFIX + bucketCapacity.getSellerId() + bucketCapacity.getSkuId();
  9.         String value = SnowflakeIdWorker.getCode();
  10.         //1.校验是否已经无需扩容了,如果是则快速结束
  11.         BucketCapacityContext bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  12.         if (!bucketCapacityContext.getIsCapacity()) {
  13.             return;
  14.         }
  15.         //获取分布式锁来进行扩容处理
  16.         boolean lock = tairLock.tryLock(key, value);
  17.         if (lock) {
  18.             try {
  19.                 //再次校验是否需要扩容,此处不允许并发
  20.                 bucketCapacityContext = checkBucketCapacity(bucketCapacity);
  21.                 if (bucketCapacityContext.getIsCapacity()) {
  22.                     //2.获取中心桶缓存的剩余库存
  23.                     Integer residueNum = getCenterStock(bucketCapacity);
  24.                     //3.可以扩容,计算出可回源的库存进行处理
  25.                     if (residueNum > 0) {
  26.                         backSourceInventory(residueNum, bucketCapacityContext);
  27.                     } else {
  28.                         //4.中心桶无库存,检查是否触发下线
  29.                         checkBucketOffline(bucketCapacity);
  30.                     }
  31.                 }
  32.             } catch (Exception e) {
  33.                 e.printStackTrace();
  34.             } finally {
  35.                 tairLock.unlock(key, value);
  36.             }
  37.         } else {
  38.             throw new BaseBizException("请求繁忙,稍后重试!");
  39.         }
  40.     }
  41.     ...
  42. }
复制代码
 
11.库存分桶扩容量计算算法实现
分桶需要扩容多少库存,需要注意尽量保证每个分桶的库存尽可能均匀。
 
如果中心桶库存超过最大深度库存,则直接以配置的回源步长增长库存,否则汇总当前分桶的实际库存深度。也就是根据当前的可⽤分桶列表、中⼼桶库存、总的可⽤库存深度,计算出平均的⼀个可分配库存数量。从而避免每个分桶扩容的库存不均匀(最⼩值必须超过最⼩库存深度)。
 
如果扩容的库存深度超过当时分配的库存深度,且未超过最⼤库存深度,则以当前分配的实际库存更新当前分桶库存深度。
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private InventoryBucketCache inventoryBucketCache;
  5.     @Resource
  6.     private TairCache tairCache;
  7.     ...
  8.     //查询返回卖家的商品实际库存信息
  9.     private List<InventoryResponseDTO> queryInventoryCacheList(InventoryQueryRequest queryRequest) {
  10.         //1.获取组装查询的缓存key
  11.         List<String> skuIdList = queryRequest.getSkuIdList();
  12.         List<InventoryResponseDTO> inventoryResponseDTOList = new ArrayList<>();
  13.         for (String skuId : skuIdList) {
  14.             //商品库存的分桶元数据的缓存key
  15.             String cacheKey = queryRequest.getSellerId() + skuId;
  16.             //获取分桶元数据的缓存,先查本地缓存,再查远程缓存
  17.             BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(cacheKey);
  18.             if (Objects.isNull(bucketLocalCache)) {
  19.                 continue;
  20.             }
  21.             List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  22.             availableList.addAll(bucketLocalCache.getUndistributedList());
  23.             //获取存有库存值的缓存key
  24.             List<String> bucketList = availableList.stream().map(BucketCacheBO::getBucketNo).collect(Collectors.toList());
  25.             //将缓存中心桶剩余库存的缓存key添加到bucketList
  26.             String key = TairInventoryConstant.SELLER_INVENTORY_PREFIX + cacheKey;
  27.             bucketList.add(key);
  28.             //获取每个缓存key对应的库存值
  29.             List<String> bucketNumList = tairCache.mget(bucketList);
  30.             Integer inventoryNum = 0;
  31.             for (String bucketNum : bucketNumList) {
  32.                 if (!Objects.isNull(bucketNum)) {
  33.                     inventoryNum = inventoryNum + Integer.valueOf(bucketNum);
  34.                 }
  35.             }
  36.             //构建商品库存模型
  37.             InventoryResponseDTO inventoryResponseDTO = new InventoryResponseDTO();
  38.             inventoryResponseDTO.setSellerId(queryRequest.getSellerId());
  39.             inventoryResponseDTO.setSellerId(skuId);
  40.             inventoryResponseDTO.setInventoryNum(inventoryNum);
  41.             inventoryResponseDTOList.add(inventoryResponseDTO);
  42.         }
  43.         return inventoryResponseDTOList;
  44.     }
  45.     ...
  46. }
复制代码
 
12.库存分桶扩容完成以及分桶下线触发
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     ...
  4.     //构建接下来用于处理具体扣减库存时所需要的上下文对象
  5.     private BucketContext buildDeductProductStock(InventoryRequest request) {
  6.         //1.填充扣减库存相关请求信息的明细
  7.         InventoryDetail inventoryDetail = inventoryConverter.converterRequest(request);
  8.         //2.填充扣减库存的分桶相关信息
  9.         BucketContext bucketContext = buildDeductBucketList(request);
  10.         bucketContext.setInventoryDetail(inventoryDetail);
  11.         return bucketContext;
  12.     }
  13.     //填充扣减库存的分桶相关信息
  14.     private BucketContext buildDeductBucketList(InventoryRequest request) {
  15.         BucketContext context = new BucketContext();
  16.         //获取本地缓存的分桶元数据
  17.         BucketLocalCache bucketLocalCache = inventoryBucketCache.getBucketLocalCache(request.getSellerId() + request.getSkuId());
  18.         //获取本地缓存的分桶列表
  19.         List<BucketCacheBO> availableList = bucketLocalCache.getAvailableList();
  20.         //获取本次扣减请求对应的扣减次数,用来定位具体使用的分桶库存
  21.         Integer incrementCount = getIncrementCount(request);
  22.         //通过运算得到本次访问所需要定位的分桶,使用Round Robin轮询算法
  23.         int index = incrementCount % availableList.size();
  24.         //获取本次准备处理的分桶信息
  25.         BucketCacheBO bucketCacheBO = availableList.get(index);
  26.         context.getAvailableList().add(bucketCacheBO);
  27.         context.getBucketNoList().add(bucketCacheBO.getBucketNo());
  28.         //获取分桶配置信息
  29.         context.setInventoryBucketConfig(bucketLocalCache.getInventoryBucketConfig());
  30.         //为了避免扣减失败(分桶已下线或者库存不足),可以多备份几个分桶
  31.         for (int i = 0; i < 2; i++) {
  32.             //任意填充2个作为备份
  33.             Random random = new Random();
  34.             int num = random.nextInt(availableList.size());
  35.             BucketCacheBO bucketCache = availableList.get(num);
  36.             //避免拿到重复的分桶,这里处理一下
  37.             if (context.getBucketNoList().contains(bucketCache.getBucketNo())) {
  38.                 i--;
  39.                 continue;
  40.             }
  41.             context.getAvailableList().add(bucketCache);
  42.             context.getBucketNoList().add(bucketCache.getBucketNo());
  43.         }
  44.         return context;
  45.     }
  46.     //获取对应售卖商品的扣减访问次数
  47.     private Integer getIncrementCount(InventoryRequest request) {
  48.         String incrementKey = TairInventoryConstant.SELLER_SKU_STOCK_COUNT_PREFIX + request.getSellerId() + request.getSkuId();
  49.         Integer incrementCount = tairCache.incr(incrementKey);
  50.         return incrementCount;
  51.     }
  52.     ...
  53. }
复制代码
 
13.库存分桶下线以及剩余存量归还中心桶
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     ...
  6.     //扣减库存
  7.     //@param bucketContext 库存扣减上下文对象
  8.     private void deductInventory(BucketContext bucketContext) {
  9.         //获取可以使用的分桶编号,即对应缓存中的key
  10.         List<String> bucketNoList = bucketContext.getBucketNoList();
  11.         //获取扣减明细信息
  12.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  13.         //获取扣减库存数量
  14.         Integer inventoryNum = inventoryDetail.getInventoryNum();
  15.         //获取用于处理扩容的Map
  16.         Map<String, Integer> capacityMap = bucketContext.getCapacityMap();
  17.         Boolean isDeduct = false;
  18.         //对分桶进行库存扣减(每次)
  19.         for (String bucketNo : bucketNoList) {
  20.             //自减,默认扣减后不能小于0,否则返回-1
  21.             Integer residueNum = tairCache.decr(bucketNo, inventoryNum);
  22.             //capacityMap可用于判断分桶是否是扩容的分桶,以及标记当前分桶剩余的库存数
  23.             capacityMap.put(bucketNo, residueNum);
  24.             //库存扣减成功
  25.             if (residueNum >= 0) {
  26.                 //标记一下具体扣减的分桶属于哪个
  27.                 inventoryDetail.setBucketNo(bucketNo);
  28.                 isDeduct = true;
  29.                 break;
  30.             }
  31.         }
  32.         //分桶扣减都没有成功,此时抛出异常提示库存不足
  33.         if (!isDeduct) {
  34.             throw new BaseBizException(InventoryExceptionCode.INVENTORY_INSUFFICIENT_ERROR);
  35.         }
  36.     }
  37.     ...
  38. }
复制代码
 
14.库存下线触发剩余库存总量预警机制
  1. @Service
  2. public class InventoryServiceImpl implements InventoryService {
  3.     @Resource
  4.     private TairCache tairCache;
  5.     @Resource
  6.     private InventoryDetailProducer inventoryDetailProducer;
  7.     ...
  8.     //将库存明细写入缓存(写入失败则代表已经被写入了,此时需要回退库存)
  9.     private void writeInventoryDetail(BucketContext bucketContext) {
  10.         //获取扣减明细信息
  11.         InventoryDetail inventoryDetail = bucketContext.getInventoryDetail();
  12.         String key = TairInventoryConstant.SELLER_SKU_TRADE_DETAIL_PREFIX + bucketContext.getInventoryDetail().getSellerId();
  13.         //尝试写入明细记录,如果没有写入成功则说明库存需要回退
  14.         Integer count = tairCache.exhset(key, String.valueOf(inventoryDetail.getOrderId()), JSONObject.toJSONString(inventoryDetail));
  15.         if (count < 0) {
  16.             //说明明细已经存在了,写入失败,此时需要将库存回退到对应的分桶上
  17.             tairCache.incr(inventoryDetail.getBucketNo(), inventoryDetail.getInventoryNum());
  18.         } else {
  19.             //发送消息,异步写入库存扣减的明细到DB
  20.             inventoryDetailProducer.sendInventoryDetail(bucketContext.getInventoryDetail());
  21.         }
  22.     }
  23.     ...
  24. }
  25. @Component
  26. public class TairCache {
  27.     private JedisPool jedisPool;
  28.     public TairCache(JedisPool jedisPool) {
  29.         this.jedisPool = jedisPool;
  30.     }
  31.     public Jedis getJedis() {
  32.         return jedisPool.getResource();
  33.     }
  34.     private TairHash createTairHash(Jedis jedis) {
  35.         return new TairHash(jedis);
  36.     }
  37.     //存储hash对象
  38.     public Integer exhset(String key,String field, String value){
  39.         try (Jedis jedis = getJedis()) {
  40.             return createTairHash(jedis).exhset(key, field, value, ExhsetParams.ExhsetParams().nx()).intValue();
  41.         }
  42.     }
  43. }
  44. //处理库存扣减明细的记录消息
  45. @Component
  46. public class InventoryDetailListener implements MessageListenerConcurrently {
  47.     @Resource
  48.     private InventoryRepository inventoryRepository;
  49.     @Override
  50.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  51.         try {
  52.             log.info("执行库存扣减明细保存,消息数量:{}", list.size());
  53.             List<InventoryDetail> inventoryDetailList = new ArrayList<>(list.size());
  54.             for (MessageExt messageExt : list) {
  55.                 String msg = new String(messageExt.getBody());
  56.                 InventoryDetail inventoryDetail = JsonUtil.json2Object(msg, InventoryDetail.class);
  57.                 inventoryDetailList.add(inventoryDetail);
  58.             }
  59.             inventoryRepository.saveInventoryDetailList(inventoryDetailList, InventoryDeductionTypeEnum.INVENTORY_DETAIL_DEDUCTIONS_TYPE.getCode());
  60.         } catch (DuplicateKeyException ex) {
  61.             log.error("consume repeat, 库存扣减明细重复记录,不再重复执行", ex);
  62.             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  63.         } catch (Exception e) {
  64.             log.error("consume error, 库存扣减明细保存失败", e);
  65.             //本次消费失败,下次重新消费
  66.             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  67.         }
  68.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  69.     }
  70. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册