本文记录Pipeline设计模式在业务流程编排中的应用
前言
Pipeline模式意为管道模式,又称为流水线模式。旨在通过预先设定好的一系列阶段来处理输入的数据,每个阶段的输出即是下一阶段的输入。
本案例通过定义PipelineProduct(管道产品),PipelineJob(管道任务),PipelineNode(管道节点),完成一整条流水线的组装,并将“原材料”加工为“商品”。其中管道产品负责承载各个阶段的产品信息;管道任务负责不同阶段对产品的加工;管道节点约束了管道产品及任务的关系,通过信号量定义了任务的执行方式。
依赖
工具依赖如下-
- <dependency>
- <groupId>cn.hutool</groupId>
- hutool-all</artifactId>
- <version>最新版本</version>
- </dependency>
复制代码 编程示例
1. 管道产品定义
- package com.example.demo.pipeline.model;
- /**
- * 管道产品接口
- *
- * @param <S> 信号量
- * @author
- * @date 2023/05/15 11:49
- */
- public interface PipelineProduct<S> {
- }
复制代码 2. 管道任务定义
- package com.example.demo.pipeline.model;
- /**
- * 管道任务接口
- *
- * @param <P> 管道产品
- * @author
- * @date 2023/05/15 11:52
- */
- @FunctionalInterface
- public interface PipelineJob<P> {
- /**
- * 执行任务
- *
- * @param product 管道产品
- * @return {@link P}
- */
- P execute(P product);
- }
复制代码 3. 管道节点定义
- package com.jd.baoxian.mall.market.service.pipeline.model;
- import java.util.function.Predicate;
- /**
- * 管道节点定义
- *
- * @param <S> 信号量
- * @param <P> 管道产品
- * @author
- * @date 2023/05/15 11:54
- */
- public interface PipelineNode<S, P extends PipelineProduct<S>> {
- /**
- * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
- *
- * @param pipelineJob 管道任务
- * @return {@link PipelineNode}<{@link S}, {@link P}>
- */
- PipelineNode<S, P> flax(PipelineJob<P> pipelineJob);
- /**
- * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
- *
- * @param signal 信号
- * @param pipelineJob 管道任务
- * @return {@link PipelineNode}<{@link S}, {@link P}>
- */
- PipelineNode<S, P> flax(S signal, PipelineJob<P> pipelineJob);
- /**
- * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
- *
- * @param predicate 信号
- * @param pipelineJob 管道任务
- * @return {@link PipelineNode}<{@link S}, {@link P}>
- */
- PipelineNode<S, P> flax(Predicate<S> predicate, PipelineJob<P> pipelineJob);
- /**
- * 管道节点-任务执行
- *
- * @param product 管道产品
- * @return {@link P}
- */
- P execute(P product);
- }
复制代码 4. 管道产品、任务,节点的实现
4.1 管道产品
- package com.example.demo.pipeline.factory;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.model.response.DemoResp;
- import com.example.demo.pipeline.model.PipelineProduct;
- import lombok.*;
- /**
- * 样例-管道产品
- *
- * @author
- * @date 2023/05/15 14:04
- */
- @Data
- @Builder
- @NoArgsConstructor
- @AllArgsConstructor
- public class DemoPipelineProduct implements PipelineProduct<DemoPipelineProduct.DemoSignalEnum> {
- /**
- * 信号量
- */
- private DemoSignalEnum signal;
- /**
- * 产品-入参及回参
- */
- private DemoProductData productData;
- /**
- * 异常信息
- */
- private Exception exception;
- /**
- * 流程Id
- */
- private String tradeId;
- @Data
- @Builder
- @NoArgsConstructor
- @AllArgsConstructor
- public static class DemoProductData {
- /**
- * 待验证入参
- */
- private DemoReq userRequestData;
- /**
- * 待验证回参
- */
- private DemoResp userResponseData;
- }
- /**
- * 产品-信号量
- *
- * @author
- * @date 2023/05/15 13:54
- */
- @Getter
- public enum DemoSignalEnum {
- /**
- *
- */
- NORMAL(0, "正常"),
- /**
- *
- */
- CHECK_NOT_PASS(1, "校验不通过"),
- /**
- *
- */
- BUSINESS_ERROR(2, "业务异常"),
- /**
- *
- */
- LOCK_ERROR(3, "锁处理异常"),
- /**
- *
- */
- DB_ERROR(4, "事务处理异常"),
- ;
- /**
- * 枚举码值
- */
- private final int code;
- /**
- * 枚举描述
- */
- private final String desc;
- /**
- * 构造器
- *
- * @param code
- * @param desc
- */
- DemoSignalEnum(int code, String desc) {
- this.code = code;
- this.desc = desc;
- }
- }
- }
复制代码 4.2 管道任务(抽象类)
- package com.example.demo.pipeline.factory.job;
- import cn.hutool.core.util.ClassUtil;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.pipeline.factory.DemoPipelineProduct;
- import com.example.demo.pipeline.model.PipelineJob;
- import lombok.extern.slf4j.Slf4j;
- /**
- * 管道任务-抽象层
- *
- * @author
- * @date 2023/05/15 19:48
- */
- @Slf4j
- public abstract class AbstractDemoJob implements PipelineJob<DemoPipelineProduct> {
- /**
- * 公共执行逻辑
- *
- * @param product 产品
- * @return
- */
- @Override
- public DemoPipelineProduct execute(DemoPipelineProduct product) {
- DemoPipelineProduct.DemoSignalEnum newSignal;
- try {
- newSignal = execute(product.getTradeId(), product.getProductData());
- } catch (Exception e) {
- product.setException(e);
- newSignal = DemoPipelineProduct.DemoSignalEnum.BUSINESS_ERROR;
- }
- product.setSignal(newSignal);
- defaultLogPrint(product.getTradeId(), product);
- return product;
- }
- /**
- * 子类执行逻辑
- *
- * @param tradeId 流程Id
- * @param productData 请求数据
- * @return
- * @throws Exception 异常
- */
- abstract DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception;
- /**
- * 默认的日志打印
- */
- public void defaultLogPrint(String tradeId, DemoPipelineProduct product) {
- if (!DemoPipelineProduct.DemoSignalEnum.NORMAL.equals(product.getSignal())) {
- log.info("流水线任务处理异常:流程Id=【{}】,信号量=【{}】,任务=【{}】,参数=【{}】", tradeId, product.getSignal(),
- ClassUtil.getClassName(this, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
- }
- }
- }
复制代码 4.3 管道节点
- package com.example.demo.pipeline.factory;
- import cn.hutool.core.util.ClassUtil;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.pipeline.model.PipelineJob;
- import com.example.demo.pipeline.model.PipelineNode;
- import lombok.extern.slf4j.Slf4j;
- import java.util.function.Predicate;
- /**
- * 审核-管道节点
- *
- * @author
- * @date 2023/05/15 14:32
- */
- @Slf4j
- public class DemoPipelineNode implements PipelineNode<DemoPipelineProduct.DemoSignalEnum, DemoPipelineProduct> {
- /**
- * 下一管道节点
- */
- private DemoPipelineNode next;
- /**
- * 当前管道任务
- */
- private PipelineJob<DemoPipelineProduct> job;
- /**
- * 节点组装,按照上个管道任务传递的信号,执行 pipelineJob
- *
- * @param pipelineJob 管道任务
- * @return {@link DemoPipelineNode}
- */
- @Override
- public DemoPipelineNode flax(PipelineJob<DemoPipelineProduct> pipelineJob) {
- return flax(DemoPipelineProduct.DemoSignalEnum.NORMAL, pipelineJob);
- }
- /**
- * 节点组装,按照传递的信号,判断当前管道的信号是否相等,执行 pipelineJob
- *
- * @param signal 信号
- * @param pipelineJob 管道任务
- * @return {@link DemoPipelineNode}
- */
- @Override
- public DemoPipelineNode flax(DemoPipelineProduct.DemoSignalEnum signal, PipelineJob<DemoPipelineProduct> pipelineJob) {
- return flax(signal::equals, pipelineJob);
- }
- /**
- * 节点组装,上个管道过来的信号运行 predicate 后是true的话,执行 pipelineJob
- *
- * @param predicate
- * @param pipelineJob
- * @return
- */
- @Override
- public DemoPipelineNode flax(Predicate<DemoPipelineProduct.DemoSignalEnum> predicate,
- PipelineJob<DemoPipelineProduct> pipelineJob) {
- this.next = new DemoPipelineNode();
- this.job = (job) -> {
- if (predicate.test(job.getSignal())) {
- return pipelineJob.execute(job);
- } else {
- return job;
- }
- };
- return next;
- }
- /**
- * 管道节点-任务执行
- *
- * @param product 管道产品
- * @return
- */
- @Override
- public DemoPipelineProduct execute(DemoPipelineProduct product) {
- // 执行当前任务
- try {
- product = job == null ? product : job.execute(product);
- return next == null ? product : next.execute(product);
- } catch (Exception e) {
- log.error("流水线处理异常:流程Id=【{}】,任务=【{}】,参数=【{}】", product.getTradeId(), ClassUtil.getClassName(job, true), JSONUtil.toJsonStr(product.getProductData()), product.getException());
- return null;
- }
- }
- }
复制代码 5. 业务实现
通过之前的定义,我们已经可以通过Pipeline完成流水线的搭建,接下来以“审核信息提交”这一业务场景,完成应用。
5.1 定义Api、入参、回参
- package com.example.demo.api;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.model.response.DemoResp;
- import com.example.demo.pipeline.factory.PipelineForManagerSubmit;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- /**
- * 演示-API
- *
- * @author
- * @date 2023/08/06 16:27
- */
- @Service
- public class DemoManagerApi {
- /**
- * 管道-审核提交
- */
- @Resource
- private PipelineForManagerSubmit pipelineForManagerSubmit;
- /**
- * 审核提交
- *
- * @param requestData 请求数据
- * @return {@link DemoResp}
- */
- public DemoResp managerSubmit(DemoReq requestData) {
- return pipelineForManagerSubmit.managerSubmitCheck(requestData);
- }
- }
- package com.example.demo.model.request;
- /**
- * 演示入参
- *
- * @author
- * @date 2023/08/06 16:33
- */
- public class DemoReq {
- }
- package com.example.demo.model.response;
- import lombok.Data;
- /**
- * 演示回参
- *
- * @author
- * @date 2023/08/06 16:33
- */
- @Data
- public class DemoResp {
- /**
- * 成功标识
- */
- private Boolean success = false;
- /**
- * 结果信息
- */
- private String resultMsg;
- /**
- * 构造方法
- *
- * @param message 消息
- * @return {@link DemoResp}
- */
- public static DemoResp buildRes(String message) {
- DemoResp response = new DemoResp();
- response.setResultMsg(message);
- return response;
- }
- }
复制代码 5.2 定义具体任务
假定审核提交的流程需要包含:参数验证、加锁、解锁、事务提交- package com.example.demo.pipeline.factory.job;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.pipeline.factory.DemoPipelineProduct;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- /**
- * 加锁-实现层
- *
- * @author
- * @date 2023/05/17 17:00
- */
- @Service
- @Slf4j
- public class CheckRequestLockJob extends AbstractDemoJob {
- /**
- * 子类执行逻辑
- *
- * @param tradeId 流程Id
- * @param productData 请求数据
- * @return
- * @throws Exception 异常
- */
- @Override
- DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
- DemoReq userRequestData = productData.getUserRequestData();
- log.info("任务[{}]加锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
- return DemoPipelineProduct.DemoSignalEnum.NORMAL;
- }
- }
- package com.example.demo.pipeline.factory.job;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.pipeline.factory.DemoPipelineProduct;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- /**
- * 解锁-实现层
- *
- * @author
- * @date 2023/05/17 17:00
- */
- @Service
- @Slf4j
- public class CheckRequestUnLockJob extends AbstractDemoJob {
- /**
- * 子类执行逻辑
- *
- * @param tradeId 流程Id
- * @param productData 请求数据
- * @return
- * @throws Exception 异常
- */
- @Override
- DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
- DemoReq userRequestData = productData.getUserRequestData();
- log.info("任务[{}]解锁,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
- return DemoPipelineProduct.DemoSignalEnum.NORMAL;
- }
- }
- package com.example.demo.pipeline.factory.job;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.pipeline.factory.DemoPipelineProduct;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
- /**
- * 审核-参数验证-实现类
- *
- * @author
- * @date 2023/05/15 19:50
- */
- @Slf4j
- @Component
- public class ManagerCheckParamJob extends AbstractDemoJob {
- /**
- * 执行基本入参验证
- *
- * @param tradeId
- * @param productData 请求数据
- * @return
- */
- @Override
- DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) {
- /*
- * 入参验证
- */
- DemoReq userRequestData = productData.getUserRequestData();
- log.info("任务[{}]入参验证,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
- // 非空验证
- // 有效验证
- // 校验通过,退出
- return DemoPipelineProduct.DemoSignalEnum.NORMAL;
- }
- }
- package com.example.demo.pipeline.factory.job;
- import cn.hutool.json.JSONUtil;
- import com.example.demo.model.request.DemoReq;
- import com.example.demo.model.response.DemoResp;
- import com.example.demo.pipeline.factory.DemoPipelineProduct;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Service;
- /**
- * 审核-信息提交-业务实现
- *
- * @author
- * @date 2023/05/12 14:36
- */
- @Service
- @Slf4j
- public class ManagerSubmitJob extends AbstractDemoJob {
- /**
- * 子类执行逻辑
- *
- * @param tradeId 流程Id
- * @param productData 请求数据
- * @return
- * @throws Exception 异常
- */
- @Override
- DemoPipelineProduct.DemoSignalEnum execute(String tradeId, DemoPipelineProduct.DemoProductData productData) throws Exception {
- DemoReq userRequestData = productData.getUserRequestData();
- try {
- /*
- * DB操作
- */
- log.info("任务[{}]信息提交,线程号:{}", JSONUtil.toJsonStr(userRequestData), tradeId);
- productData.setUserResponseData(DemoResp.buildRes("成功"));
- } catch (Exception ex) {
- log.error("审核-信息提交-DB操作失败,入参:{}", JSONUtil.toJsonStr(userRequestData), ex);
- throw ex;
- }
- return DemoPipelineProduct.DemoSignalEnum.NORMAL;
- }
- }
复制代码 5.3 完成流水线组装
针对入回参转换,管道任务执行顺序及执行信号量的构建总结
本文重点为管道模式的抽象与应用,上述示例仅为个人理解。实际应用中,此案例长于应对各种规则冗杂的业务场景,便于规则编排。
待改进点:
- 各个任务其实隐含了执行的先后顺序,此项内容可进一步实现;
- 针对最后“流水线组装”这一步,可通过配置描述的方式,进一步抽象,从而将变动控制在每个“管道任务”的描述上,针对规则项做到“可插拔”式处理。
作者:京东保险 侯亚东
来源:京东云开发者社区 转载请注明来源
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |