找回密码
 立即注册
首页 业界区 业界 Pipeline模式应用

Pipeline模式应用

都硎唷 2025-6-9 08:27:15
本文记录Pipeline设计模式在业务流程编排中的应用
前言

Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。
本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整条流水线的组装,并将“原材料”加工为“商品”。其中管道产品负责承载各个阶段的产品信息;管道任务负责不同阶段对产品的加工;管道节点约束了管道产品及任务的关系,通过信号量定义了任务的执行方式。
依赖

工具依赖如下
  1.             
  2.             <dependency>
  3.                 <groupId>cn.hutool</groupId>
  4.                 hutool-all</artifactId>
  5.                 <version>最新版本</version>
  6.             </dependency>
复制代码
编程示例

1. 管道产品定义
  1. package com.example.demo.pipeline.model;
  2. /**
  3. * 管道产品接口
  4. *
  5. * @param <S> 信号量
  6. * @author
  7. * @date 2023/05/15 11:49
  8. */
  9. public interface PipelineProduct<S> {
  10. }
复制代码
2. 管道任务定义
  1. package com.example.demo.pipeline.model;
  2. /**
  3. * 管道任务接口
  4. *
  5. * @param <P> 管道产品
  6. * @author
  7. * @date 2023/05/15 11:52
  8. */
  9. @FunctionalInterface
  10. public interface PipelineJob<P> {
  11.     /**
  12.      * 执行任务
  13.      *
  14.      * @param product 管道产品
  15.      * @return {@link P}
  16.      */
  17.     P execute(P product);
  18. }
复制代码
3. 管道节点定义
  1. package com.jd.baoxian.mall.market.service.pipeline.model;
  2. import java.util.function.Predicate;
  3. /**
  4. * 管道节点定义
  5. *
  6. * @param <S> 信号量
  7. * @param <P> 管道产品
  8. * @author
  9. * @date 2023/05/15 11:54
  10. */
  11. public interface PipelineNode<S, P extends PipelineProduct<S>> {
  12.     /**
  13.      * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
  14.      *
  15.      * @param pipelineJob 管道任务
  16.      * @return {@link PipelineNode}<{@link S},  {@link P}>
  17.      */
  18.     PipelineNode<S, P> flax(PipelineJob<P> pipelineJob);
  19.     /**
  20.      * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
  21.      *
  22.      * @param signal      信号
  23.      * @param pipelineJob 管道任务
  24.      * @return {@link PipelineNode}<{@link S},  {@link P}>
  25.      */
  26.     PipelineNode<S, P> flax(S signal, PipelineJob<P> pipelineJob);
  27.     /**
  28.      * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
  29.      *
  30.      * @param predicate   信号
  31.      * @param pipelineJob 管道任务
  32.      * @return {@link PipelineNode}<{@link S},  {@link P}>
  33.      */
  34.     PipelineNode<S, P> flax(Predicate<S> predicate, PipelineJob<P> pipelineJob);
  35.     /**
  36.      * 管道节点-任务执行
  37.      *
  38.      * @param product 管道产品
  39.      * @return {@link P}
  40.      */
  41.     P execute(P product);
  42. }
复制代码
4. 管道产品、任务,节点的实现

4.1 管道产品
  1. package com.example.demo.pipeline.factory;
  2. import com.example.demo.model.request.DemoReq;
  3. import com.example.demo.model.response.DemoResp;
  4. import com.example.demo.pipeline.model.PipelineProduct;
  5. import lombok.*;
  6. /**
  7. * 样例-管道产品
  8. *
  9. * @author
  10. * @date 2023/05/15 14:04
  11. */
  12. @Data
  13. @Builder
  14. @NoArgsConstructor
  15. @AllArgsConstructor
  16. public class DemoPipelineProduct implements PipelineProduct<DemoPipelineProduct.DemoSignalEnum> {
  17.     /**
  18.      * 信号量
  19.      */
  20.     private DemoSignalEnum signal;
  21.     /**
  22.      * 产品-入参及回参
  23.      */
  24.     private DemoProductData productData;
  25.     /**
  26.      * 异常信息
  27.      */
  28.     private Exception exception;
  29.     /**
  30.      * 流程Id
  31.      */
  32.     private String tradeId;
  33.     @Data
  34.     @Builder
  35.     @NoArgsConstructor
  36.     @AllArgsConstructor
  37.     public static class DemoProductData {
  38.         /**
  39.          * 待验证入参
  40.          */
  41.         private DemoReq userRequestData;
  42.         /**
  43.          * 待验证回参
  44.          */
  45.         private DemoResp userResponseData;
  46.     }
  47.     /**
  48.      * 产品-信号量
  49.      *
  50.      * @author
  51.      * @date 2023/05/15 13:54
  52.      */
  53.     @Getter
  54.     public enum DemoSignalEnum {
  55.         /**
  56.          *
  57.          */
  58.         NORMAL(0, "正常"),
  59.         /**
  60.          *
  61.          */
  62.         CHECK_NOT_PASS(1, "校验不通过"),
  63.         /**
  64.          *
  65.          */
  66.         BUSINESS_ERROR(2, "业务异常"),
  67.         /**
  68.          *
  69.          */
  70.         LOCK_ERROR(3, "锁处理异常"),
  71.         /**
  72.          *
  73.          */
  74.         DB_ERROR(4, "事务处理异常"),
  75.         ;
  76.         /**
  77.          * 枚举码值
  78.          */
  79.         private final int code;
  80.         /**
  81.          * 枚举描述
  82.          */
  83.         private final String desc;
  84.         /**
  85.          * 构造器
  86.          *
  87.          * @param code
  88.          * @param desc
  89.          */
  90.         DemoSignalEnum(int code, String desc) {
  91.             this.code = code;
  92.             this.desc = desc;
  93.         }
  94.     }
  95. }
复制代码
4.2 管道任务(抽象类)
  1. package com.example.demo.pipeline.factory.job;
  2. import cn.hutool.core.util.ClassUtil;
  3. import cn.hutool.json.JSONUtil;
  4. import com.example.demo.pipeline.factory.DemoPipelineProduct;
  5. import com.example.demo.pipeline.model.PipelineJob;
  6. import lombok.extern.slf4j.Slf4j;
  7. /**
  8. * 管道任务-抽象层
  9. *
  10. * @author
  11. * @date 2023/05/15 19:48
  12. */
  13. @Slf4j
  14. public abstract class AbstractDemoJob implements PipelineJob<DemoPipelineProduct> {
  15.     /**
  16.      * 公共执行逻辑
  17.      *
  18.      * @param product 产品
  19.      * @return
  20.      */
  21.     @Override
  22.     public DemoPipelineProduct execute(DemoPipelineProduct product) {
  23.         DemoPipelineProduct.DemoSignalEnum newSignal;
  24.         try {
  25.             newSignal = execute(product.getTradeId(), product.getProductData());
  26.         } catch (Exception e) {
  27.             product.setException(e);
  28.             newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR;
  29.         }
  30.         product.setSignal(newSignal);
  31.         defaultLogPrint(product.getTradeId(), product);
  32.         return product;
  33.     }
  34.     /**
  35.      * 子类执行逻辑
  36.      *
  37.      * @param tradeId     流程Id
  38.      * @param productData 请求数据
  39.      * @return
  40.      * @throws Exception 异常
  41.      */
  42.     abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception;
  43.     /**
  44.      * 默认的日志打印
  45.      */
  46.     public void defaultLogPrint(String tradeId, DemoPipelineProduct product) {
  47.         if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) {
  48.             log.info("流水线任务处理异常:流程Id=【{}】,信号量=【{}】,任务=【{}】,参数=【{}】", tradeId, product.getSignal(),
  49.                     ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
  50.         }
  51.     }
  52. }
复制代码
4.3 管道节点
  1. package com.example.demo.pipeline.factory;
  2. import cn.hutool.core.util.ClassUtil;
  3. import cn.hutool.json.JSONUtil;
  4. import com.example.demo.pipeline.model.PipelineJob;
  5. import com.example.demo.pipeline.model.PipelineNode;
  6. import lombok.extern.slf4j.Slf4j;
  7. import java.util.function.Predicate;
  8. /**
  9. * 审核-管道节点
  10. *
  11. * @author
  12. * @date 2023/05/15 14:32
  13. */
  14. @Slf4j
  15. public class DemoPipelineNode implements PipelineNode<DemoPipelineProduct.DemoSignalEnum, DemoPipelineProduct> {
  16.     /**
  17.      * 下一管道节点
  18.      */
  19.     private DemoPipelineNode next;
  20.     /**
  21.      * 当前管道任务
  22.      */
  23.     private PipelineJob<DemoPipelineProduct> job;
  24.     /**
  25.      * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
  26.      *
  27.      * @param pipelineJob 管道任务
  28.      * @return {@link DemoPipelineNode}
  29.      */
  30.     @Override
  31.     public DemoPipelineNode flax(PipelineJob<DemoPipelineProduct> pipelineJob) {
  32.         return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob);
  33.     }
  34.     /**
  35.      * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
  36.      *
  37.      * @param signal      信号
  38.      * @param pipelineJob 管道任务
  39.      * @return {@link DemoPipelineNode}
  40.      */
  41.     @Override
  42.     public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob<DemoPipelineProduct> pipelineJob) {
  43.         return flax(signal::equals, pipelineJob);
  44.     }
  45.     /**
  46.      * 节点组装,上个管道过来的信号运行 predicate 后是true的话,执行 pipelineJob
  47.      *
  48.      * @param predicate
  49.      * @param pipelineJob
  50.      * @return
  51.      */
  52.     @Override
  53.     public DemoPipelineNode flax(Predicate<DemoPipelineProduct.DemoSignalEnum> predicate,
  54.                                  PipelineJob<DemoPipelineProduct> pipelineJob) {
  55.         this.next = new DemoPipelineNode();
  56.         this.job = (job) -> {
  57.             if (predicate.test(job.getSignal())) {
  58.                 return pipelineJob.execute(job);
  59.             } else {
  60.                 return job;
  61.             }
  62.         };
  63.         return next;
  64.     }
  65.     /**
  66.      * 管道节点-任务执行
  67.      *
  68.      * @param product 管道产品
  69.      * @return
  70.      */
  71.     @Override
  72.     public DemoPipelineProduct execute(DemoPipelineProduct product) {
  73.         // 执行当前任务
  74.         try {
  75.             product = job == null ? product : job.execute(product);
  76.             return next == null ? product : next.execute(product);
  77.         } catch (Exception e) {
  78.             log.error("流水线处理异常:流程Id=【{}】,任务=【{}】,参数=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
  79.             return null;
  80.         }
  81.     }
  82. }
复制代码
5. 业务实现

通过之前的定义,我们已经可以通过Pipeline完成流水线的搭建,接下来以“审核信息提交”这一业务场景,完成应用。
5.1 定义Api、入参、回参
  1. package com.example.demo.api;
  2. import com.example.demo.model.request.DemoReq;
  3. import com.example.demo.model.response.DemoResp;
  4. import com.example.demo.pipeline.factory.PipelineForManagerSubmit;
  5. import org.springframework.stereotype.Service;
  6. import javax.annotation.Resource;
  7. /**
  8. * 演示-API
  9. *
  10. * @author
  11. * @date 2023/08/06 16:27
  12. */
  13. @Service
  14. public class DemoManagerApi {
  15.     /**
  16.      * 管道-审核提交
  17.      */
  18.     @Resource
  19.     private PipelineForManagerSubmit pipelineForManagerSubmit;
  20.     /**
  21.      * 审核提交
  22.      *
  23.      * @param requestData 请求数据
  24.      * @return {@link DemoResp}
  25.      */
  26.     public DemoResp managerSubmit(DemoReq requestData) {
  27.         return pipelineForManagerSubmit.managerSubmitCheck(requestData);
  28.     }
  29. }
  30. package com.example.demo.model.request;
  31. /**
  32. * 演示入参
  33. *
  34. * @author
  35. * @date 2023/08/06 16:33
  36. */
  37. public class DemoReq {
  38. }
  39. package com.example.demo.model.response;
  40. import lombok.Data;
  41. /**
  42. * 演示回参
  43. *
  44. * @author
  45. * @date 2023/08/06 16:33
  46. */
  47. @Data
  48. public class DemoResp {
  49.     /**
  50.      * 成功标识
  51.      */
  52.     private Boolean success = false;
  53.     /**
  54.      * 结果信息
  55.      */
  56.     private String resultMsg;
  57.     /**
  58.      * 构造方法
  59.      *
  60.      * @param message 消息
  61.      * @return {@link DemoResp}
  62.      */
  63.     public static DemoResp buildRes(String message) {
  64.         DemoResp response = new DemoResp();
  65.         response.setResultMsg(message);
  66.         return response;
  67.     }
  68. }
复制代码
5.2 定义具体任务

假定审核提交的流程需要包含:参数验证、加锁、解锁、事务提交
  1. package com.example.demo.pipeline.factory.job;
  2. import cn.hutool.json.JSONUtil;
  3. import com.example.demo.model.request.DemoReq;
  4. import com.example.demo.pipeline.factory.DemoPipelineProduct;
  5. import lombok.extern.slf4j.Slf4j;
  6. import org.springframework.stereotype.Service;
  7. /**
  8. * 加锁-实现层
  9. *
  10. * @author
  11. * @date 2023/05/17 17:00
  12. */
  13. @Service
  14. @Slf4j
  15. public class CheckRequestLockJob extends AbstractDemoJob {
  16.     /**
  17.      * 子类执行逻辑
  18.      *
  19.      * @param tradeId     流程Id
  20.      * @param productData 请求数据
  21.      * @return
  22.      * @throws Exception 异常
  23.      */
  24.     @Override
  25.     DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
  26.         DemoReq userRequestData = productData.getUserRequestData();
  27.         log.info("任务[{}]加锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
  28.         return DemoPipelineProduct.DemoSignalEnum.NORMAL;
  29.     }
  30. }
  31. package com.example.demo.pipeline.factory.job;
  32. import cn.hutool.json.JSONUtil;
  33. import com.example.demo.model.request.DemoReq;
  34. import com.example.demo.pipeline.factory.DemoPipelineProduct;
  35. import lombok.extern.slf4j.Slf4j;
  36. import org.springframework.stereotype.Service;
  37. /**
  38. * 解锁-实现层
  39. *
  40. * @author
  41. * @date 2023/05/17 17:00
  42. */
  43. @Service
  44. @Slf4j
  45. public class CheckRequestUnLockJob extends AbstractDemoJob {
  46.     /**
  47.      * 子类执行逻辑
  48.      *
  49.      * @param tradeId     流程Id
  50.      * @param productData 请求数据
  51.      * @return
  52.      * @throws Exception 异常
  53.      */
  54.     @Override
  55.     DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
  56.         DemoReq userRequestData = productData.getUserRequestData();
  57.         log.info("任务[{}]解锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
  58.         return DemoPipelineProduct.DemoSignalEnum.NORMAL;
  59.     }
  60. }
  61. package com.example.demo.pipeline.factory.job;
  62. import cn.hutool.json.JSONUtil;
  63. import com.example.demo.model.request.DemoReq;
  64. import com.example.demo.pipeline.factory.DemoPipelineProduct;
  65. import lombok.extern.slf4j.Slf4j;
  66. import org.springframework.stereotype.Component;
  67. /**
  68. * 审核-参数验证-实现类
  69. *
  70. * @author
  71. * @date 2023/05/15 19:50
  72. */
  73. @Slf4j
  74. @Component
  75. public class ManagerCheckParamJob extends AbstractDemoJob {
  76.     /**
  77.      * 执行基本入参验证
  78.      *
  79.      * @param tradeId
  80.      * @param productData 请求数据
  81.      * @return
  82.      */
  83.     @Override
  84.     DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) {
  85.         /*
  86.          * 入参验证
  87.          */
  88.         DemoReq userRequestData = productData.getUserRequestData();
  89.         log.info("任务[{}]入参验证,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
  90.         // 非空验证
  91.         // 有效验证
  92.         // 校验通过,退出
  93.         return DemoPipelineProduct.DemoSignalEnum.NORMAL;
  94.     }
  95. }
  96. package com.example.demo.pipeline.factory.job;
  97. import cn.hutool.json.JSONUtil;
  98. import com.example.demo.model.request.DemoReq;
  99. import com.example.demo.model.response.DemoResp;
  100. import com.example.demo.pipeline.factory.DemoPipelineProduct;
  101. import lombok.extern.slf4j.Slf4j;
  102. import org.springframework.stereotype.Service;
  103. /**
  104. * 审核-信息提交-业务实现
  105. *
  106. * @author
  107. * @date 2023/05/12 14:36
  108. */
  109. @Service
  110. @Slf4j
  111. public class ManagerSubmitJob extends AbstractDemoJob {
  112.     /**
  113.      * 子类执行逻辑
  114.      *
  115.      * @param tradeId     流程Id
  116.      * @param productData 请求数据
  117.      * @return
  118.      * @throws Exception 异常
  119.      */
  120.     @Override
  121.     DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
  122.         DemoReq userRequestData = productData.getUserRequestData();
  123.         try {
  124.             /*
  125.              * DB操作
  126.              */
  127.             log.info("任务[{}]信息提交,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
  128.             productData.setUserResponseData(DemoResp.buildRes("成功"));
  129.         } catch (Exception ex) {
  130.             log.error("审核-信息提交-DB操作失败,入参:{}", JSONUtil.toJsonStr(userRequestData), ex);
  131.             throw ex;
  132.         }
  133.         return DemoPipelineProduct.DemoSignalEnum.NORMAL;
  134.     }
  135. }
复制代码
5.3 完成流水线组装

针对入回参转换,管道任务执行顺序及执行信号量的构建
  1. package com.example.demo.pipeline.factory;
  2. import com.example.demo.model.request.DemoReq;
  3. import com.example.demo.model.response.DemoResp;
  4. import com.example.demo.pipeline.factory.job.CheckRequestLockJob;
  5. import com.example.demo.pipeline.factory.job.CheckRequestUnLockJob;
  6. import com.example.demo.pipeline.factory.job.ManagerCheckParamJob;
  7. import com.example.demo.pipeline.factory.job.ManagerSubmitJob;
  8. import lombok.RequiredArgsConstructor;
  9. import lombok.extern.slf4j.Slf4j;
  10. import org.springframework.stereotype.Service;
  11. import javax.annotation.PostConstruct;
  12. import java.util.Objects;
  13. import java.util.UUID;
  14. /**
  15. * 管道工厂入口-审核流水线
  16. *
  17. * @author
  18. * @date 2023/05/15 19:52
  19. */
  20. @Slf4j
  21. @Service
  22. @RequiredArgsConstructor
  23. public class PipelineForManagerSubmit {
  24.     /**
  25.      * 审核-管道节点
  26.      */
  27.     private final DemoPipelineNode managerSubmitNode = new DemoPipelineNode();
  28.     /**
  29.      * 审核-管道任务-提交-防刷锁-加锁
  30.      */
  31.     private final CheckRequestLockJob checkRequestLockJob;
  32.     /**
  33.      * 审核-管道任务-提交-防刷锁-解锁
  34.      */
  35.     private final CheckRequestUnLockJob checkRequestUnLockJob;
  36.     /**
  37.      * 审核-管道任务-参数验证
  38.      */
  39.     private final ManagerCheckParamJob managerCheckParamJob;
  40.     /**
  41.      * 审核-管道任务-事务操作
  42.      */
  43.     private final ManagerSubmitJob managerSubmitJob;
  44.     /**
  45.      * 组装审核的处理链
  46.      */
  47.     @PostConstruct
  48.     private void assembly() {
  49.         assemblyManagerSubmit();
  50.     }
  51.     /**
  52.      * 组装处理链
  53.      */
  54.     private void assemblyManagerSubmit() {
  55.         managerSubmitNode
  56.                 // 参数验证及填充
  57.                 .flax(managerCheckParamJob)
  58.                 // 防刷锁
  59.                 .flax(checkRequestLockJob)
  60.                 // 事务操作
  61.                 .flax(managerSubmitJob)
  62.                 // 锁释放
  63.                 .flax((ignore) -> true, checkRequestUnLockJob);
  64.     }
  65.     /**
  66.      * 审核-提交处理
  67.      *
  68.      * @param requestData 入参
  69.      * @return
  70.      */
  71.     public DemoResp managerSubmitCheck(DemoReq requestData) {
  72.         DemoPipelineProduct initialProduct = managerSubmitCheckInitial(requestData);
  73.         DemoPipelineProduct finalProduct = managerSubmitNode.execute(initialProduct);
  74.         if (Objects.isNull(finalProduct) || Objects.nonNull(finalProduct.getException())) {
  75.             return DemoResp.buildRes("未知异常");
  76.         }
  77.         return finalProduct.getProductData().getUserResponseData();
  78.     }
  79.     /**
  80.      * 审核-初始化申请的流水线数据
  81.      *
  82.      * @param requestData 入参
  83.      * @return 初始的流水线数据
  84.      */
  85.     private DemoPipelineProduct managerSubmitCheckInitial(DemoReq requestData) {
  86.         // 初始化
  87.         return DemoPipelineProduct.builder()
  88.                 .signal(DemoPipelineProduct.DemoSignalEnum.NORMAL)
  89.                 .tradeId(UUID.randomUUID().toString())
  90.                 .productData(DemoPipelineProduct.DemoProductData.builder().userRequestData(requestData).build())
  91.                 .build();
  92.     }
  93. }
复制代码
总结

本文重点为管道模式的抽象与应用,上述示例仅为个人理解。实际应用中,此案例长于应对各种规则冗杂的业务场景,便于规则编排。
待改进点:

  • 各个任务其实隐含了执行的先后顺序,此项内容可进一步实现;
  • 针对最后“流水线组装”这一步,可通过配置描述的方式,进一步抽象,从而将变动控制在每个“管道任务”的描述上,针对规则项做到“可插拔”式处理。
作者:京东保险 侯亚东
来源:京东云开发者社区 转载请注明来源

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册