找回密码
 立即注册
首页 业界区 业界 商品中心—10.商品B端搜索系统的说明文档 ...

商品中心—10.商品B端搜索系统的说明文档

欤夤 2025-6-22 22:19:06
大纲
1.商品B端搜索系统的运行流程 + 缓存和索引设计
2.商品B端搜索系统监听数据变更与写入ES索引
3.商品B端搜索系统的历史搜索词的实现
4.商品B端搜索系统的搜索词补全的实现
5.商品B端搜索系统的搜索接口实现
6.索引重建
 
1.商品B端搜索系统的运行流程 + 缓存和索引设计
(1)运行流程
(2)Redis缓存设计
(3)索引设计
 
(1)运行流程
1.webp
(2)Redis缓存设计
使用Redis缓存用户的搜索词记录,比如保存最近的10个搜索词记录,使⽤的数据结构:list。
  1. key的格式:history_search_words:{userId}
  2. value的格式:["⽜奶", "鸡蛋", "⻁⽪凤⽖", "正⼤蒸饺"]
复制代码
(3)索引设计
一.商品索引
二.索引字段说明
三.数据示例
四.搜索补全索引
 
一.商品索引
  1. PUT /sku_info_index
  2. {
  3.     "settings": {
  4.         "number_of_shards": 3,
  5.         "number_of_replicas": 1
  6.     },
  7.     "mappings": {
  8.         "properties": {
  9.             "skuName": {
  10.                 "type": "text",
  11.                 "analyzer": "ik_max_word",
  12.                 "search_analyzer": "ik_smart"
  13.             },
  14.             "basePrice": {
  15.                 "type": "integer"
  16.             },
  17.             "vipPrice": {
  18.                 "type": "integer"
  19.             },
  20.             "brandId": {
  21.                 "type": "keyword"
  22.             },
  23.             "brandName": {
  24.                 "type": "keyword"
  25.             },
  26.             "saleCount": {
  27.                 "type": "integer"
  28.             },
  29.             "createTime": {
  30.                 "type": "date",
  31.                 "format": "yyyy-MM-dd HH:mm:ss"
  32.             },
  33.             "updateTime": {
  34.                 "type": "date",
  35.                 "format": "yyyy-MM-dd HH:mm:ss"
  36.             }
  37.         }
  38.     }
  39. }
复制代码
二.索引字段说明
2.webp
三.数据示例
  1. {
  2.     "_index": "sku_info_index",
  3.     "_type": "_doc",
  4.     "_id": "8000177337",
  5.     "_score": 1.0,
  6.     "_source": {
  7.         "skuName": "Apple iPhone 13 Pro Max 256GB 苍岭绿⾊ ⽀持移动联通电信5G 双卡双待⼿机",
  8.         "brandName": "苹果",
  9.         "createTime": "2022-03-12 08:24:57",
  10.         "brandId": 4,
  11.         "vipPrice": 9799,
  12.         "updateTime": "2022-03-12 08:24:57",
  13.         "basePrice": 9999
  14.     }
  15. }
复制代码
四.搜索补全索引
  1. put /completion_word_index
  2. {
  3.     "settings": {
  4.         "number_of_shards": 3,
  5.         "number_of_replicas": 1,
  6.         "analysis": {
  7.             "analyzer": {
  8.                 "ik_and_pinyin_analyzer": {
  9.                     "type": "custom",
  10.                     "tokenizer": "ik_smart",
  11.                     "filter": "my_pinyin"
  12.                 }
  13.             },
  14.             "filter": {
  15.                 "my_pinyin": {
  16.                     "type": "pinyin",
  17.                     "keep_first_letter": true,
  18.                     "keep_full_pinyin": true,
  19.                     "keep_original": true,
  20.                     "remove_duplicated_term": true
  21.                 }
  22.             }
  23.         }
  24.     },
  25.     "mappings": {
  26.         "properties": {
  27.             "completion_word": {
  28.                 "type": "completion",
  29.                 "analyzer": "ik_and_pinyin_analyzer"
  30.             }
  31.         }
  32.     }
  33. }
复制代码
 
2.商品B端搜索系统监听数据变更与写入ES索引
(1)消息处理系统添加数据监听配置
(2)商品B端搜索系统下的数据变更消息消费者
(3)sku表变更消息处理器
(4)item表变更消息处理器
 
(1)消息处理系统添加数据监听配置
一.data_change_listen_config表
  1. INSERT INTO data_change_listen_config (id, table_name, key_column, filter_flag, del_flag, create_user, create_time, update_user, update_time)
  2. VALUES (1, 'sku_info', 'sku_id', 1, 1, 0, '2022-02-25 13:42:28', 0, '2022-02-25 13:42:28');
  3. INSERT INTO data_change_listen_config (id, table_name, key_column, filter_flag, del_flag, create_user, create_time, update_user, update_time)
  4. VALUES (2, 'item_info', 'item_id', 1, 1, 0, '2022-02-25 13:42:28', 0, '2022-02-25 13:42:28');
复制代码
二.data_change_column_config表
  1. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  2. VALUES (1, 1, 'sku_name', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
  3. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  4. VALUES (2, 1, 'channel', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
  5. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  6. VALUES (3, 1, 'features', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
  7. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  8. VALUES (4, 1, 'vip_price', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
  9. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  10. VALUES (5, 1, 'base_price', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
  11. INSERT INTO data_change_column_config (id, listen_id, listen_column, del_flag, create_user, create_time, update_user, update_time)
  12. VALUES (6, 2, 'brand_id', 1, 0, '2022-02-25 13:43:28', 0, '2022-02-25 13:43:28');
复制代码
三.data_change_message_config表
  1. INSERT INTO data_change_message_config (id, listen_id, notify_column, message_topic, delay_level, message_type, del_flag, create_user, create_time, update_user, update_time)
  2. VALUES (1, 1, 'id,sku_id', 'product_update_topic', 3, 1, 1, 0, '2022-02-25 13:45:24', 0, '2022-02-25 13:45:24');
  3. INSERT INTO data_change_message_config (id, listen_id, notify_column, message_topic, delay_level, message_type, del_flag, create_user, create_time, update_user, update_time)
  4. VALUES (3, 2, 'id,item_id', 'product_update_topic', 3, 1, 1, 0, '2022-02-25 13:45:24', 0, '2022-02-25 13:45:24');
复制代码
(2)商品B端搜索系统下的数据变更消息消费者
  1. @Configuration
  2. public class ConsumerBeanConfig {
  3.     //配置内容对象
  4.     @Autowired
  5.     private RocketMQProperties rocketMQProperties;
  6.    
  7.     //监听商品修改的MQ消息
  8.     @Bean("productUpdateTopic")
  9.     public DefaultMQPushConsumer productUpdateTopic(ProductUpdateListener productUpdateListener) throws MQClientException {
  10.         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMqConstant.PRODUCT_UPDATE_CONSUMER_GROUP);
  11.         consumer.setNamesrvAddr(rocketMQProperties.getNameServer());
  12.         consumer.subscribe(RocketMqConstant.PRODUCT_UPDATE_TOPIC, "*");
  13.         consumer.registerMessageListener(productUpdateListener);
  14.         consumer.start();
  15.         return consumer;
  16.     }
  17. }
  18. //搜索模块在商品变更的时候更新商品索引
  19. @Component
  20. public class ProductUpdateListener implements MessageListenerConcurrently {
  21.     @Autowired
  22.     private MessageHandlerManager messageHandlerManager;
  23.     @Override
  24.     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
  25.         try {
  26.             for (MessageExt messageExt : list) {
  27.                 String msg = new String(messageExt.getBody());
  28.                 log.info("执行商品索引数据更新逻辑,消息内容:{}", msg);
  29.   
  30.                 TableDataChangeDTO tableDataChangeDTO = JsonUtil.json2Object(msg, TableDataChangeDTO.class);
  31.                 //处理消息
  32.                 messageHandlerManager.handleMessage(tableDataChangeDTO);
  33.             }
  34.         } catch (Exception e){
  35.             log.error("consume error, 商品索引数据更新失败", e);
  36.             //本次消费失败,下次重新消费
  37.             return ConsumeConcurrentlyStatus.RECONSUME_LATER;
  38.         }
  39.         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  40.     }
  41. }
  42. @Component
  43. public class MessageHandlerManager {
  44.     //继承了MessageHandler的ItemInfoTableMessageHandler和SkuInfoTableMessageHandler都会被注入到这里
  45.     @Autowired
  46.     private List<MessageHandler> messageHandlers;
  47.    
  48.     public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
  49.         MessageHandler messageHandlerToUse = messageHandlers.stream()
  50.             .filter(e -> StringUtils.equals(e.tableName(), tableDataChangeDTO.getTableName()))
  51.             .findFirst()
  52.             .orElse(null);
  53.         if (messageHandlerToUse == null) {
  54.             return;
  55.         }
  56.         messageHandlerToUse.handleMessage(tableDataChangeDTO);
  57.     }
  58. }
复制代码
(3)sku表变更消息处理器
  1. @Component
  2. public class SkuInfoTableMessageHandler implements MessageHandler {
  3.     @Autowired
  4.     private ProductSearchRepository productSearchRepository;
  5.    
  6.     @Override
  7.     public String tableName() {
  8.         return "sku_info";
  9.     }
  10.    
  11.     @Override
  12.     public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
  13.         String skuId = String.valueOf(tableDataChangeDTO.getKeyId());
  14.         //到数据库查询索引相关的信息
  15.         ProductSearchDO productSearchDO = productSearchRepository.queryProductSearchInfo(skuId);
  16.         //保存索引数据到ES
  17.         productSearchRepository.saveProductSearchInfos(Collections.singletonList(productSearchDO));
  18.     }
  19. }
  20. @Repository
  21. public class ProductSearchRepository {
  22.     private static final String SKU_INFO_INDEX = "sku_info_index";
  23.    
  24.     @Autowired
  25.     private RestHighLevelClient restHighLevelClient;
  26.    
  27.     @Autowired
  28.     private SkuInfoMapper skuInfoMapper;
  29.    
  30.     //根据skuId查询和商品索引相关的信息
  31.     public ProductSearchDO queryProductSearchInfo(String skuId) {
  32.         return skuInfoMapper.queryProductSearchInfo(skuId);
  33.     }
  34.    
  35.     //批量保存商品索引数据
  36.     public void saveProductSearchInfos(List<ProductSearchDO> productSearchDOS) throws IOException {
  37.         BulkRequest bulkRequest = new BulkRequest();
  38.         for (ProductSearchDO productSearchDO : productSearchDOS) {
  39.             Map<String, Object> jsonMap = new HashMap<>();
  40.             jsonMap.put("skuName", productSearchDO.getSkuName());
  41.             jsonMap.put("basePrice", productSearchDO.getBasePrice());
  42.             jsonMap.put("vipPrice", productSearchDO.getVipPrice());
  43.             jsonMap.put("brandId", productSearchDO.getBrandId());
  44.             jsonMap.put("brandName", productSearchDO.getBrandName());
  45.             jsonMap.put("createTime", new Date());
  46.             jsonMap.put("updateTime", new Date());
  47.             IndexRequest indexRequest = new IndexRequest(SKU_INFO_INDEX).id(productSearchDO.getSkuId()).source(jsonMap);
  48.             bulkRequest.add(indexRequest);
  49.         }
  50.         restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  51.     }
  52.     ...
  53. }
复制代码
(4)item表变更消息处理器
  1. @Component
  2. public class ItemInfoTableMessageHandler implements MessageHandler {
  3.     @Autowired
  4.     private ProductSearchRepository productSearchRepository;
  5.    
  6.     @Override
  7.     public String tableName() {
  8.         return "item_info";
  9.     }
  10.    
  11.     @Override
  12.     public void handleMessage(TableDataChangeDTO tableDataChangeDTO) throws IOException {
  13.         String itemId = String.valueOf(tableDataChangeDTO.getKeyId());
  14.         List<ProductSearchDO> productSearchDOS = productSearchRepository.queryProductSearchInfos(itemId);
  15.         productSearchRepository.saveProductSearchInfos(productSearchDOS);
  16.     }
  17. }
  18. @Repository
  19. public class ProductSearchRepository {
  20.     private static final String SKU_INFO_INDEX = "sku_info_index";
  21.    
  22.     @Autowired
  23.     private RestHighLevelClient restHighLevelClient;
  24.    
  25.     @Autowired
  26.     private SkuInfoMapper skuInfoMapper;
  27.    
  28.     //根据itemId查询和商品索引相关的信息
  29.     public List<ProductSearchDO> queryProductSearchInfos(String itemId) {
  30.         return skuInfoMapper.queryProductSearchInfos(itemId);
  31.     }
  32.    
  33.     //批量保存商品索引数据
  34.     public void saveProductSearchInfos(List<ProductSearchDO> productSearchDOS) throws IOException {
  35.         BulkRequest bulkRequest = new BulkRequest();
  36.         for (ProductSearchDO productSearchDO : productSearchDOS) {
  37.             Map<String, Object> jsonMap = new HashMap<>();
  38.             jsonMap.put("skuName", productSearchDO.getSkuName());
  39.             jsonMap.put("basePrice", productSearchDO.getBasePrice());
  40.             jsonMap.put("vipPrice", productSearchDO.getVipPrice());
  41.             jsonMap.put("brandId", productSearchDO.getBrandId());
  42.             jsonMap.put("brandName", productSearchDO.getBrandName());
  43.             jsonMap.put("createTime", new Date());
  44.             jsonMap.put("updateTime", new Date());
  45.             IndexRequest indexRequest = new IndexRequest(SKU_INFO_INDEX).id(productSearchDO.getSkuId()).source(jsonMap);
  46.             bulkRequest.add(indexRequest);
  47.         }
  48.         restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  49.     }
  50.     ...
  51. }
复制代码
 
3.商品B端搜索系统的历史搜索词的实现
(1)商品B端保存历史搜索词的接⼝
(2)商品B端查询历史搜索词的接⼝
 
(1)商品B端保存历史搜索词的接⼝
  1. 使用场景:商家输入搜索词搜索商品的时候
  2. 接口说明:把商家搜索过的词保存到Redis的List数据结构中
复制代码
  1. //商品搜索服务
  2. @DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
  3. public class ProductSearchApiImpl implements ProductSearchApi {
  4.     @Resource
  5.     private RedisCache redisCache;
  6.    
  7.     @Resource
  8.     private ProductSearchRepository productSearchRepository;
  9.    
  10.     //保存历史搜索词接口
  11.     @Override
  12.     public JsonResult<HistorySearchWordResultDTO> saveHistorySearchWord(HistorySearchWordRequest request) {
  13.         //在队列头部添加新的历史搜索词
  14.         redisCache.lpush(HistorySearchWordConstants.getKey(request.getUserId()), request.getHistorySearchWord());
  15.         //修改队列只保存固定数量的搜索词
  16.         redisCache.ltrim(HistorySearchWordConstants.getKey(request.getUserId()), 0, HistorySearchWordConstants.HISTORY_WORD_COUNT_PER_USER - 1);
  17.         return JsonResult.buildSuccess(new HistorySearchWordResultDTO(true));
  18.     }
  19.     ...
  20. }
  21. //保存用户历史搜索词请求
  22. @Data
  23. public class HistorySearchWordRequest implements Serializable {
  24.     //用户id
  25.     private Long userId;
  26.     //新的历史搜索词
  27.     private String historySearchWord;
  28. }
复制代码
(2)商品B端查询历史搜索词的接⼝
  1. 使用场景:展示商家的搜索历史记录的时候
  2. 接口说明:从Redis列表中查询商家的历史搜索词
复制代码
  1. //商品搜索服务
  2. @DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
  3. public class ProductSearchApiImpl implements ProductSearchApi {
  4.     @Resource
  5.     private RedisCache redisCache;
  6.    
  7.     @Resource
  8.     private ProductSearchRepository productSearchRepository;
  9.    
  10.     //查询历史搜索词接口
  11.     @Override
  12.     public JsonResult<HistorySearchWordDTO> listHistorySearchWords(HistorySearchWordQuery request) {
  13.         List<String> result = redisCache.lrange(HistorySearchWordConstants.getKey(request.getUserId()), 0, HistorySearchWordConstants.HISTORY_WORD_COUNT_PER_USER - 1);
  14.         return JsonResult.buildSuccess(new HistorySearchWordDTO(result));
  15.     }
  16.     ...
  17. }
  18. //查询商家历史搜索词请求
  19. @Data
  20. public class HistorySearchWordQuery implements Serializable {
  21.     //用户id
  22.     private Long userId;
  23. }
复制代码
 
4.商品B端搜索系统的搜索词补全的实现
(1)商品B端搜索系统的添加搜索补全词的接⼝
(2)商品B端搜索系统查询搜索补全词的接口
(3)商品B端搜索词补全的接口
 
(1)商品B端搜索系统的添加搜索补全词的接⼝
  1. 使用场景:运营人员添加搜索补全词的时候
  2. 接口说明:把搜索补全词保存到ES的搜索补全词索引中
复制代码
  1. //搜索词
  2. @DubboService(version = "1.0.0", interfaceClass = CompletionSearchWordApi.class, retries = 0)
  3. public class CompletionSearchWordApiImpl implements CompletionSearchWordApi {
  4.     @Autowired
  5.     private CompletionSearchWordService completionSearchWordService;
  6.    
  7.     //保存搜索补全词接口
  8.     @Override
  9.     public JsonResult<CompletionSearchWordResultDTO> saveCompletionSearchWord(CompletionSearchWordRequest request) {
  10.         try {
  11.             CompletionSearchWordResultDTO resultDTO = completionSearchWordService.saveCompletionSearchWord(request);
  12.             return JsonResult.buildSuccess(resultDTO);
  13.         } catch (ProductBizException e) {
  14.             log.error("biz error: request={}", JSON.toJSONString(request), e);
  15.             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
  16.         } catch (Exception e) {
  17.             log.error("system error: request={}", JSON.toJSONString(request), e);
  18.             return JsonResult.buildError(e.getMessage());
  19.         }
  20.     }
  21.     ...
  22. }
  23. @Service
  24. public class CompletionSearchWordServiceImpl implements CompletionSearchWordService {
  25.     @Autowired
  26.     private CompletionSearchWordRepository completionSearchWordRepository;
  27.    
  28.     //保存搜索补全词
  29.     @Override
  30.     public CompletionSearchWordResultDTO saveCompletionSearchWord(CompletionSearchWordRequest request) throws IOException {
  31.         return completionSearchWordRepository.saveCompletionSearchWord(request);
  32.     }
  33.     ...
  34. }
  35. //运营添加搜索补全词请求
  36. @Data
  37. public class CompletionSearchWordRequest implements Serializable {
  38.     //索引名称
  39.     private String indexName;
  40.     //字段名称
  41.     private String fieldName;
  42.     //要添加的补全词
  43.     private List<String> completionSearchWords;
  44. }
  45. @Repository
  46. public class CompletionSearchWordRepository {
  47.     @Autowired
  48.     private RestHighLevelClient restHighLevelClient;
  49.    
  50.     //保存搜索补全词
  51.     public CompletionSearchWordResultDTO saveCompletionSearchWord(CompletionSearchWordRequest request) throws IOException {
  52.         BulkRequest bulkRequest = new BulkRequest(request.getIndexName());
  53.         List<String> completionSearchWords = request.getCompletionSearchWords();
  54.         for (String completionSearchWord : completionSearchWords) {
  55.             bulkRequest.add(new IndexRequest().source(XContentType.JSON, request.getFieldName(), completionSearchWord));
  56.         }
  57.         restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
  58.         return new CompletionSearchWordResultDTO(true);
  59.     }
  60.     ...
  61. }
复制代码
(2)商品B端搜索系统查询搜索补全词的接口
  1. 使用场景:后台展示搜索补全词列表的时候
  2. 接口说明:从ES的搜索补全词索引中分页查询数据
复制代码
  1. //搜索词
  2. @DubboService(version = "1.0.0", interfaceClass = CompletionSearchWordApi.class, retries = 0)
  3. public class CompletionSearchWordApiImpl implements CompletionSearchWordApi {
  4.     @Autowired
  5.     private CompletionSearchWordService completionSearchWordService;
  6.    
  7.     //查询补全词接口
  8.     @Override
  9.     public JsonResult<PageResult<CompletionSearchWordDTO>> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) {
  10.         try {
  11.             PageResult<CompletionSearchWordDTO> resultDTO = completionSearchWordService.listCompletionSearchWordPage(request);
  12.             return JsonResult.buildSuccess(resultDTO);
  13.         } catch (ProductBizException e) {
  14.             log.error("biz error: request={}", JSON.toJSONString(request), e);
  15.             return JsonResult.buildError(e.getErrorCode(), e.getErrorMsg());
  16.         } catch (Exception e) {
  17.             log.error("system error: request={}", JSON.toJSONString(request), e);
  18.             return JsonResult.buildError(e.getMessage());
  19.         }
  20.     }
  21.     ...
  22. }
  23. @Service
  24. public class CompletionSearchWordServiceImpl implements CompletionSearchWordService {
  25.     @Autowired
  26.     private CompletionSearchWordRepository completionSearchWordRepository;
  27.    
  28.     //查询搜索补全词
  29.     @Override
  30.     public PageResult<CompletionSearchWordDTO> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) throws IOException {
  31.         return completionSearchWordRepository.listCompletionSearchWordPage(request);
  32.     }
  33.     ...
  34. }
  35. //后台查询搜索词列表请求
  36. @Data
  37. public class QueryCompletionSearchWordPageRequest extends PageRequest {
  38.     //索引名称
  39.     private String indexName;
  40.     //字段名称
  41.     private String fieldName;
  42.     //补全词
  43.     private String completionSearchWord;
  44. }
  45. @Repository
  46. public class CompletionSearchWordRepository {
  47.     @Autowired
  48.     private RestHighLevelClient restHighLevelClient;
  49.     ...
  50.    
  51.     //查询搜索补全词
  52.     public PageResult<CompletionSearchWordDTO> listCompletionSearchWordPage(QueryCompletionSearchWordPageRequest request) throws IOException {
  53.         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  54.         if (StringUtils.isNotBlank(request.getCompletionSearchWord())) {
  55.             searchSourceBuilder.query(QueryBuilders.matchQuery(request.getFieldName(), request.getCompletionSearchWord()));
  56.         }
  57.   
  58.         int from = (request.getPageNum() - 1) * request.getPageSize();
  59.         searchSourceBuilder.from(from);
  60.         searchSourceBuilder.size(request.getPageSize());
  61.   
  62.         SearchRequest searchRequest = new SearchRequest(request.getIndexName());
  63.         searchRequest.source(searchSourceBuilder);
  64.   
  65.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  66.   
  67.         PageResult<CompletionSearchWordDTO> pageResult = new PageResult<>();
  68.         List<CompletionSearchWordDTO> pageContent = new ArrayList<>();
  69.         SearchHit[] hits = searchResponse.getHits().getHits();
  70.         for (SearchHit hit : hits) {
  71.             pageContent.add(new CompletionSearchWordDTO(((String) hit.getSourceAsMap().get(request.getFieldName()))));
  72.         }
  73.         pageResult.setContent(pageContent);
  74.         pageResult.setTotalElements(searchResponse.getHits().getTotalHits().value);
  75.         pageResult.setSize(request.getPageSize());
  76.         pageResult.setNumber(request.getPageNum());
  77.         return pageResult;
  78.     }
  79.     ...
  80. }
复制代码
(3)商品B端搜索词补全的接口
  1. 使用场景:商家在搜索框输入搜索词的时候
  2. 接口说明:根据输入的搜索词从ES的搜索补全词索引中查询对应的词
复制代码
  1. //商品搜索
  2. @DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
  3. public class ProductSearchApiImpl implements ProductSearchApi {
  4.     @Resource
  5.     private RedisCache redisCache;
  6.    
  7.     @Resource
  8.     private ProductSearchRepository productSearchRepository;
  9.     ...
  10.    
  11.     //搜索词补全接口
  12.     @Override
  13.     public JsonResult<CompletionSearchWordsDTO> listCompletionSearchWords(CompletionSearchWordQuery request) {
  14.         try {
  15.             CompletionSearchWordsDTO result = productSearchRepository.listCompletionSearchWords(request);
  16.             return JsonResult.buildSuccess(result);
  17.         } catch (Exception e) {
  18.             e.printStackTrace();
  19.             return JsonResult.buildError(e.getMessage());
  20.         }
  21.     }
  22.     ...
  23. }
  24. //补全用户搜索词请求
  25. @Data
  26. public class CompletionSearchWordQuery {
  27.     //索引名称
  28.     private String indexName;
  29.     //字段名称
  30.     private String fieldName;
  31.     //需要补全的词(用户输入的内容)
  32.     private String text;
  33.     //返回多少个补全后的词
  34.     private int count;
  35. }
  36. //商品搜索
  37. @Repository
  38. public class ProductSearchRepository {
  39.     private static final String MY_SUGGEST = "my_suggest";
  40.    
  41.     @Resource
  42.     private RestHighLevelClient restHighLevelClient;
  43.    
  44.     //搜索词补全
  45.     public CompletionSearchWordsDTO listCompletionSearchWords(CompletionSearchWordQuery request) throws IOException {
  46.         //1.构建CompletionSuggestion条件
  47.         CompletionSuggestionBuilder completionSuggestionBuilder = SuggestBuilders.completionSuggestion(request.getFieldName());
  48.         completionSuggestionBuilder.prefix(request.getText());
  49.         completionSuggestionBuilder.skipDuplicates(true);
  50.         completionSuggestionBuilder.size(request.getCount());
  51.   
  52.         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  53.         searchSourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
  54.         searchSourceBuilder.suggest(new SuggestBuilder().addSuggestion(MY_SUGGEST, completionSuggestionBuilder));
  55.   
  56.         //2.封装搜索请求
  57.         SearchRequest searchRequest = new SearchRequest();
  58.         searchRequest.indices(request.getIndexName());
  59.         searchRequest.source(searchSourceBuilder);
  60.   
  61.         //3.查询elasticsearch
  62.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  63.   
  64.         //4.获取响应中的补全的词的列表
  65.         CompletionSuggestion completionSuggestion = searchResponse.getSuggest().getSuggestion(MY_SUGGEST);
  66.         List<CompletionSuggestion.Entry.Option> options = completionSuggestion.getEntries().get(0).getOptions();
  67.   
  68.         List<String> result = new ArrayList<>();
  69.         for (CompletionSuggestion.Entry.Option option : options) {
  70.             result.add(option.getText().string());
  71.         }
  72.         return new CompletionSearchWordsDTO(result);
  73.     }
  74.     ...
  75. }
复制代码
 
5.商品B端搜索系统的搜索接口实现
(1)商品B端的搜索查询接口
(2)商品B端的结构化查询接口
 
(1)商品B端的搜索查询接口
  1. 使用场景:商家搜索商品的时候
  2. 接口说明:根据输入的搜索词从商品索引中查询skuId列表
复制代码
  1. //商品搜索
  2. @DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
  3. public class ProductSearchApiImpl implements ProductSearchApi {
  4.     ...
  5.     //商品搜索查询接口
  6.     @Override
  7.     public JsonResult<PorductSearchDTO> searchProducts(ProductSearchQuery request) {
  8.         try {
  9.             PorductSearchDTO result = productSearchRepository.searchProducts(request);
  10.             return JsonResult.buildSuccess(result);
  11.         } catch (Exception e) {
  12.             e.printStackTrace();
  13.             return JsonResult.buildError(e.getMessage());
  14.         }
  15.     }
  16.     ...
  17. }
  18. //商品搜索请求
  19. @Data
  20. public class ProductSearchQuery extends PageQuery {
  21.     //索引名字
  22.     private String indexName;
  23.     //查询参数
  24.     private Map<String, String> queryTexts;
  25.     //高亮字段
  26.     private String highLightField;
  27. }
  28. //商品搜索
  29. @Repository
  30. public class ProductSearchRepository {
  31.     private static final String MY_SUGGEST = "my_suggest";
  32.    
  33.     @Resource
  34.     private RestHighLevelClient restHighLevelClient;
  35.     ...
  36.    
  37.     //商品搜索查询接口
  38.     public PorductSearchDTO searchProducts(ProductSearchQuery request) throws IOException {
  39.         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  40.         searchSourceBuilder.trackTotalHits(true);
  41.   
  42.         //1.构建match条件
  43.         request.getQueryTexts().forEach((field, text) -> {
  44.             searchSourceBuilder.query(QueryBuilders.matchQuery(field, text));
  45.         });
  46.   
  47.         //2.设置搜索高亮配置
  48.         HighlightBuilder highlightBuilder = new HighlightBuilder();
  49.         highlightBuilder.field(request.getHighLightField());
  50.         highlightBuilder.preTags("");
  51.         highlightBuilder.postTags("");
  52.         highlightBuilder.numOfFragments(0);
  53.         searchSourceBuilder.highlighter(highlightBuilder);
  54.   
  55.         //3.设置搜索分页参数
  56.         int from = (request.getPageNum() - 1) * request.getPageSize();
  57.         searchSourceBuilder.from(from);
  58.         searchSourceBuilder.size(request.getPageSize());
  59.   
  60.         //4.封装搜索请求
  61.         SearchRequest searchRequest = new SearchRequest(request.getIndexName());
  62.         searchRequest.source(searchSourceBuilder);
  63.   
  64.         //5.查询elasticsearch
  65.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  66.   
  67.         //6.对结果进行高亮处理
  68.         SearchHits hits = searchResponse.getHits();
  69.         for (SearchHit hit : hits) {
  70.             HighlightField highlightField = hit.getHighlightFields().get(request.getHighLightField());
  71.             Map<String, Object> sourceAsMap = hit.getSourceAsMap();
  72.             Text[] fragments = highlightField.fragments();
  73.             StringBuilder builder = new StringBuilder();
  74.             for (Text fragment : fragments) {
  75.                 builder.append(fragment.string());
  76.             }
  77.             sourceAsMap.put(request.getHighLightField(), builder.toString());
  78.         }
  79.   
  80.         //7.封装返回结果
  81.         return buildPorductSearchDTO(hits, request.getPageNum(), request.getPageSize());
  82.     }
  83.     ...
  84. }
复制代码
(2)商品B端的结构化查询接口
  1. 使用场景:商家对搜索结果过滤和排序的时候
  2. 接口说明:根据用户输入的过滤和排序条件从商品索引中查询skuId列表
复制代码
  1. //商品搜索
  2. @DubboService(version = "1.0.0", interfaceClass = ProductSearchApi.class, retries = 0)
  3. public class ProductSearchApiImpl implements ProductSearchApi {
  4.     ...
  5.     //商品结构化查询接口
  6.     @Override
  7.     public JsonResult<PorductSearchDTO> structuredSearchProducts(ProductStructuredQuery request) {
  8.         try {
  9.             PorductSearchDTO result = productSearchRepository.structuredSearchProducts(request);
  10.             return JsonResult.buildSuccess(result);
  11.         } catch (Exception e) {
  12.             e.printStackTrace();
  13.             return JsonResult.buildError(e.getMessage());
  14.         }
  15.     }
  16.     ...
  17. }
  18. //商品结构化查询请求
  19. @Data
  20. public class ProductStructuredQuery extends PageQuery {
  21.     //索引名字
  22.     private String indexName;
  23.     //Query DSL
  24.     private Map<String, Object> queryDsl;
  25. }
  26. //商品搜索
  27. @Repository
  28. public class ProductSearchRepository {
  29.     private static final String MY_SUGGEST = "my_suggest";
  30.    
  31.     @Resource
  32.     private RestHighLevelClient restHighLevelClient;
  33.     ...
  34.    
  35.     //商品结构化查询
  36.     public PorductSearchDTO structuredSearchProducts(ProductStructuredQuery request) throws IOException {
  37.         SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
  38.         searchSourceBuilder.trackTotalHits(true);
  39.   
  40.         //1.解析queryDSL
  41.         String queryDsl = JSON.toJSONString(request.getQueryDsl());
  42.         SearchModule searchModule = new SearchModule(Settings.EMPTY, false, Collections.emptyList());
  43.         NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(searchModule.getNamedXContents());
  44.         XContent xContent = XContentFactory.xContent(XContentType.JSON);
  45.         XContentParser xContentParser = xContent.createParser(namedXContentRegistry, LoggingDeprecationHandler.INSTANCE, queryDsl);
  46.         searchSourceBuilder.parseXContent(xContentParser);
  47.   
  48.         //2.设置搜索分页参数
  49.         int from = (request.getPageNum() - 1) * request.getPageSize();
  50.         searchSourceBuilder.from(from);
  51.         searchSourceBuilder.size(request.getPageSize());
  52.   
  53.         //3.封装搜索请求
  54.         SearchRequest searchRequest = new SearchRequest(request.getIndexName());
  55.         searchRequest.source(searchSourceBuilder);
  56.   
  57.         //4.查询elasticsearch
  58.         SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
  59.   
  60.         //5.封装返回结果
  61.         return buildPorductSearchDTO(searchResponse.getHits(), request.getPageNum(), request.getPageSize());
  62.     }
  63.     ...
  64. }
复制代码
 
6.索引重建
(1)问题分析
(2)解决方案
(3)操作演示
(4)其他说明
 
(1)问题分析
在实际中可能会遇到正在使⽤的索引需要变化字段类型、增减字段等,这时可能就需要创建新的mappings。
 
因为索引正在被应⽤使⽤,在进⾏操作时就要考虑怎么降低对应⽤的影响,以及如何把⽬前的数据迁移到新的索引中。
 
(2)解决方案
可以使⽤ES的索引别名功能来降低对应⽤的影响,实现不停机重建索引。可以使⽤ES的Scroll API + Bulk API,实现把⽬前的数据迁移到新的索引中。
 
(3)操作演示
  1. 一.假设目前正在被使用的商品索引为sku_info_index
  2. 二.首先给sku_info_index索引起别名sku_info_index_alias
  3. 三.然后需要新建一个索引sku_info_index_v2
  4. 四.接着使用Scroll API + Bulk API迁移数据
  5. 五.最后把sku_info_index_alias索引别名指向sku_info_index_v2索引
复制代码
一.目前正在被使用的商品索引
sku_info_index现在正在被业务使⽤:
  1. PUT /sku_info_index
  2. {
  3.     "settings": {
  4.         "number_of_shards": 3,
  5.         "number_of_replicas": 1
  6.     },
  7.     "mappings":{
  8.         "properties": {
  9.             "skuName": {
  10.                 "type": "text",
  11.                 "analyzer": "ik_max_word",
  12.                 "search_analyzer": "ik_smart"
  13.             },
  14.             "basePrice": {
  15.                 "type": "integer"
  16.             },
  17.             "vipPrice": {
  18.                 "type": "integer"
  19.             },
  20.             "brandId": {
  21.                 "type": "keyword"
  22.             },
  23.             "brandName": {
  24.                 "type": "keyword"
  25.             },
  26.             "saleCount": {
  27.                 "type": "integer"
  28.             },
  29.             "createTime": {
  30.                 "type": "date",
  31.                 "format": "yyyy-MM-dd HH:mm:ss"
  32.             },
  33.             "updateTime": {
  34.                 "type": "date",
  35.                 "format": "yyyy-MM-dd HH:mm:ss"
  36.             }
  37.         }
  38.     }
  39. }
复制代码
二.给sku_info_index索引起别名
让应⽤使⽤sku_info_index_alias别名来操作数据:
  1. PUT /sku_info_index/_alias/sku_info_index_alias
复制代码
三.然后需要新建一个索引sku_info_index_v2
新建一个sku_info_index_v2索引:
  1. PUT /sku_info_index
  2. {
  3.     "settings": {
  4.         "number_of_shards": 3,
  5.         "number_of_replicas": 1
  6.     },
  7.     "mappings": {
  8.         "properties": {
  9.             "skuName": {
  10.                 "type": "text",
  11.                 "analyzer": "ik_max_word",
  12.                 "search_analyzer": "ik_smart"
  13.             },
  14.             "basePrice": {
  15.                 "type": "integer"
  16.             },
  17.             "vipPrice": {
  18.                 "type": "integer"
  19.             },
  20.             "brandId": {
  21.                 "type": "keyword"
  22.             },
  23.             "brandName": {
  24.                 "type": "keyword"
  25.             },
  26.             "saleCount": {
  27.                 "type": "integer"
  28.             },
  29.             "label": {
  30.                 "type": "integer"
  31.             },
  32.             "createTime": {
  33.                 "type": "date",
  34.                 "format": "yyyy-MM-dd HH:mm:ss"
  35.             },
  36.             "updateTime": {
  37.                 "type": "date",
  38.                 "format": "yyyy-MM-dd HH:mm:ss"
  39.             }
  40.         }
  41.     }
  42. }
复制代码
四.接着使用Scroll API + Bulk API迁移数据
  1. #https://www.elastic.co/guide/en/elasticsearch/reference/7.6/search-request-body.html#request-body-search-scroll
  2. POST /sku_info_index/_search?scroll=1m
  3. {
  4.     "size": 3,
  5.     "query": {
  6.         "match_all": { }
  7.     }
  8. }
  9. POST /_bulk
  10. {
  11.     "index": {
  12.         "_index": "sku_info_index_v2",
  13.         "_id": "8000177337"
  14.     }
  15. }
  16. {
  17.     "skuName": "Apple iPhone 13 Pro Max 256GB 苍岭绿⾊ ⽀持移动联通电信5G 双卡双待 ⼿机",
  18.     "brandName": "苹果",
  19.     "createTime": "2022-03-12 08:24:57",
  20.     "brandId": 4,
  21.     "vipPrice": 9799,
  22.     "updateTime": "2022-03-12 08:24:57",
  23.     "basePrice": 9999,
  24.     "label": "新品"
  25. }
  26. {
  27.     "index": {
  28.         "_index": "sku_info_index_v2",
  29.         "_id": "8000177338"
  30.     }
  31. }
  32. {
  33.     "skuName": "Apple iPhone 13 (A2634)128GB 绿⾊ ⽀持移动联通电信5G 双卡双待⼿ 机",
  34.     "brandName": "苹果",
  35.     "createTime": "2022-03-12 08:24:57",
  36.     "brandId": 4,
  37.     "vipPrice": 5798,
  38.     "updateTime": "2022-03-12 08:24:57",
  39.     "basePrice": 5999,
  40.     "label": "爆品"
  41. }
  42. {
  43.     "index": {
  44.         "_index": "sku_info_index_v2",
  45.         "_id": "8000177339"
  46.     }
  47. }
  48. {
  49.     "skuName": "苹果13mini Apple iphone 13 mini 5G新品⼿机 粉⾊ 128GB",
  50.     "brandName": "苹果",
  51.     "createTime": "2022-03-12 08:24:57",
  52.     "brandId": 4,
  53.     "vipPrice": 4900,
  54.     "updateTime": "2022-03-12 08:24:57",
  55.     "basePrice": 5100,
  56.     "label": "超值特惠"
  57. }
复制代码
五.最后把sku_info_index_alias索引别名指向sku_info_index_v2索引
  1. POST /_aliases
  2. {
  3.     "actions": [{
  4.         "remove": {
  5.             "index": "sku_info_index",
  6.             "alias": "sku_info_index_alias"
  7.         }
  8.     }, {
  9.         "add": {
  10.             "index": "sku_info_index_v2",
  11.             "alias": "sku_info_index_alias"
  12.         }
  13.     }]
  14. }
复制代码
(4)其他说明
一.如果在上ES前,就预计索引结构可能会发⽣变化。可以⼀开始就通过索引别名来操作数据,这样当索引结构需要变更时可按上⾯的⽅案及演示实现不停机重建索引。
 
二.当使⽤索引别名时,ES Java API的代码无需任何变化,⽐如下⾯是SearchRequest的构造法⽅法:
  1. public SearchRequest(String... indices) {
  2.     this(indices, new SearchSourceBuilder());
  3. }
复制代码
如果直接索引名是example_index,那么创建的SearchRequest对象如下,其中example_index为索引名:
  1. new SearchRequest("example_index")
复制代码
如果直接索引example_index对应的索引别名是example_index_alias,那么创建的SearchRequest对象就是:
  1. new SearchRequest("example_index_alias")
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册