找回密码
 立即注册
首页 业界区 业界 Seata源码—8.Seata Saga模式的事务处理

Seata源码—8.Seata Saga模式的事务处理

庞环 2025-6-3 00:01:04
大纲
1.Seata Saga案例简介
2.Seata Saga案例的状态机定义分析
3.Seata Saga分布式事务与状态机关系
4.Seata Saga案例的Dubbo服务调用配置分析
5.Seata Saga案例的状态机数据库和工程启动
6.基于数据库的状态机配置实例的初始化
7.状态机配置实例中包含的一些关键组件
8.默认的状态机配置类的初始化
9.状态机定义的仓储组件解析状态机定义文件
10.状态机定义的仓储组件注册StateMachine
11.状态机引擎接口的源码
12.状态机引擎创建状态机实例的源码
13.ProcessContextBuilder构建流程上下文
14.StateLogStore记录日志和开启Saga全局事务
15.状态操作组件获取状态和State状态类继承体系
16.启动状态机实例时发布流程上下文到事件总线
17.通过业务处理器处理状态机当前需要执行的状态
18.通过服务调用组件来执行State指定的服务方法
19.将业务处理器路由到状态机下一个要执行的状态
 
1.Seata Saga案例简介
seata-samples项目下的saga模块,会基于Seata Saga模式,实现了分布式事务的提交和回滚。
 
在dubbo-saga-sample模块中,一个分布式事务内会有两个Saga事务参与者,这两个Saga事务参与者分别是InventoryAction和BalanceAction。当分布式事务提交时,两者均提交。当分布式事务回滚时,两者均回滚。
 
这两个Saga事务参与者均是Dubbo服务。这两个Saga事务参与者都有一个reduce方法表示库存扣减或余额扣减,还有一个compensateReduce方法表示补偿扣减操作。
 
InventoryAction接口定义如下:
  1. public interface InventoryAction {
  2.     boolean reduce(String businessKey, int count);
  3.     boolean compensateReduce(String businessKey);
  4. }
复制代码
BalanceAction接口定义如下:
  1. public interface BalanceAction {
  2.     boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
  3.     boolean compensateReduce(String businessKey, Map<String, Object> params);
  4. }
复制代码
运行dubbo-saga-sample模块的Demo工程:
步骤一:启动Seata Server
步骤二:运行DubboSagaProviderStarter
步骤三:运行DubboSagaTransactionStarter
 
2.Seata Saga案例的状态机定义分析
(1)Saga状态机定义文件详情
(2)状态机定义的全流程输入输出和流转分析
 
(1)Saga状态机定义文件详情
Saga的本质就是一个状态机。
 
reduce_inventory_and_balance.json
  1. {
  2.     "Name": "reduceInventoryAndBalance",
  3.     "Comment": "reduce inventory then reduce balance in a transaction",
  4.     "StartState": "ReduceInventory",
  5.     "Version": "0.0.1",
  6.     "States": {
  7.         "ReduceInventory": {
  8.             "Type": "ServiceTask",
  9.             "ServiceName": "inventoryAction",
  10.             "ServiceMethod": "reduce",
  11.             "CompensateState": "CompensateReduceInventory",
  12.             "Next": "ChoiceState",
  13.             "Input": [
  14.                 "$.[businessKey]",
  15.                 "$.[count]"
  16.             ],
  17.             "Output": {
  18.                 "reduceInventoryResult": "$.#root"
  19.             },
  20.             "Status": {
  21.                 "#root == true": "SU",
  22.                 "#root == false": "FA",
  23.                 "$Exception{java.lang.Throwable}": "UN"
  24.             }
  25.         },
  26.         "ChoiceState": {
  27.             "Type": "Choice",
  28.             "Choices": [{
  29.                 "Expression": "[reduceInventoryResult] == true",
  30.                 "Next": "ReduceBalance"
  31.             }],
  32.             "Default": "Fail"
  33.         },
  34.         "ReduceBalance": {
  35.             "Type": "ServiceTask",
  36.             "ServiceName": "balanceAction",
  37.             "ServiceMethod": "reduce",
  38.             "CompensateState": "CompensateReduceBalance",
  39.             "Input": [
  40.                 "$.[businessKey]",
  41.                 "$.[amount]",
  42.                 {
  43.                     "throwException": "$.[mockReduceBalanceFail]"
  44.                 }
  45.             ],
  46.             "Output": {
  47.                 "compensateReduceBalanceResult": "$.#root"
  48.             },
  49.             "Status": {
  50.                 "#root == true": "SU",
  51.                 "#root == false": "FA",
  52.                 "$Exception{java.lang.Throwable}": "UN"
  53.             },
  54.             "Catch": [{
  55.                 "Exceptions": [
  56.                     "java.lang.Throwable"
  57.                 ],
  58.                 "Next": "CompensationTrigger"
  59.             }],
  60.             "Next": "Succeed"
  61.         },
  62.         "CompensateReduceInventory": {
  63.             "Type": "ServiceTask",
  64.             "ServiceName": "inventoryAction",
  65.             "ServiceMethod": "compensateReduce",
  66.             "Input": [
  67.                 "$.[businessKey]"
  68.             ]
  69.         },
  70.         "CompensateReduceBalance": {
  71.             "Type": "ServiceTask",
  72.             "ServiceName": "balanceAction",
  73.             "ServiceMethod": "compensateReduce",
  74.             "Input": [
  75.                 "$.[businessKey]"
  76.             ]
  77.         },
  78.         "CompensationTrigger": {
  79.             "Type": "CompensationTrigger",
  80.             "Next": "Fail"
  81.         },
  82.         "Succeed": {
  83.             "Type": "Succeed"
  84.         },
  85.         "Fail": {
  86.             "Type": "Fail",
  87.             "ErrorCode": "PURCHASE_FAILED",
  88.             "Message": "purchase failed"
  89.         }
  90.     }
  91. }
复制代码
(2)状态机定义的全流程输入输出和流转分析
1.png
2.png
 
3.Seata Saga分布式事务与状态机关系
Seata Saga分布式事务基本都是通过状态机一起配合起来使用的,通过状态机来调度编排每一个分布式事务的执行。编排的每一个服务执行时,可能会执行本地事务,也可能会调用远程服务。触发每一个服务的补偿时,可能会回滚本地事务,也可能会补偿远程服务。
3.png
 
4.Seata Saga案例的Dubbo服务调用配置分析
(1)Dubbo服务提供者的配置
(2)Dubbo服务调用者的配置
(3)Seata Saga状态机的Bean的配置
 
(1)Dubbo服务提供者的配置
seata-dubbo-provider.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  3.        xmlns="http://www.springframework.org/schema/beans"
  4.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5.        http://code.alibabatech.com/schema/dubbo
  6.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  7.     <dubbo:application name="saga-sample">
  8.         <dubbo:parameter key="qos.enable" value="true"/>
  9.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  10.         <dubbo:parameter key="qos.port" value="33333"/>
  11.     </dubbo:application>
  12.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  13.     <dubbo:protocol name="dubbo" port="-1"/>
  14.     <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
  15.    
  16.     <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
  17.     <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
  18.     <bean id="inventoryActionImpl" />
  19.     <bean id="balanceActionImpl" />
  20. </beans>
复制代码
4.png
(2)Dubbo服务调用者的配置
seata-dubbo-reference.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  3.        xmlns="http://www.springframework.org/schema/beans"
  4.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5.        http://code.alibabatech.com/schema/dubbo
  6.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  7.     <dubbo:application name="saga-sample-reference">
  8.         <dubbo:parameter key="qos.enable" value="true"/>
  9.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  10.         <dubbo:parameter key="qos.port" value="22222"/>
  11.     </dubbo:application>
  12.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  13.     <dubbo:protocol name="dubbo" port="-1"/>
  14.     <dubbo:reference id="inventoryAction" interface="io.seata.samples.saga.action.InventoryAction" check="false" lazy="true"/>
  15.     <dubbo:reference id="balanceAction" interface="io.seata.samples.saga.action.BalanceAction" check="false" lazy="true"/>
  16. </beans>
复制代码
5.png
(3)Seata Saga状态机的Bean的配置
seata-saga.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  3.        xmlns="http://www.springframework.org/schema/beans"
  4.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5.        http://code.alibabatech.com/schema/dubbo
  6.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  7.     <dubbo:application name="saga-sample">
  8.         <dubbo:parameter key="qos.enable" value="true"/>
  9.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  10.         <dubbo:parameter key="qos.port" value="33333"/>
  11.     </dubbo:application>
  12.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  13.     <dubbo:protocol name="dubbo" port="-1"/>
  14.     <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
  15.    
  16.     <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
  17.     <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
  18.     <bean id="inventoryActionImpl" />
  19.     <bean id="balanceActionImpl" />
  20. </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  21.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  22.        xmlns="http://www.springframework.org/schema/beans"
  23.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  24.        http://code.alibabatech.com/schema/dubbo
  25.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  26.     <dubbo:application name="saga-sample">
  27.         <dubbo:parameter key="qos.enable" value="true"/>
  28.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  29.         <dubbo:parameter key="qos.port" value="33333"/>
  30.     </dubbo:application>
  31.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  32.     <dubbo:protocol name="dubbo" port="-1"/>
  33.     <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
  34.    
  35.     <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
  36.     <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
  37.     <bean id="inventoryActionImpl" />
  38.     <bean id="balanceActionImpl" />
  39. </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  40.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  41.        xmlns="http://www.springframework.org/schema/beans"
  42.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  43.        http://code.alibabatech.com/schema/dubbo
  44.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  45.     <dubbo:application name="saga-sample">
  46.         <dubbo:parameter key="qos.enable" value="true"/>
  47.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  48.         <dubbo:parameter key="qos.port" value="33333"/>
  49.     </dubbo:application>
  50.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  51.     <dubbo:protocol name="dubbo" port="-1"/>
  52.     <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
  53.    
  54.     <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
  55.     <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
  56.     <bean id="inventoryActionImpl" />
  57.     <bean id="balanceActionImpl" />
  58. </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  59.        xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
  60.        xmlns="http://www.springframework.org/schema/beans"
  61.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  62.        http://code.alibabatech.com/schema/dubbo
  63.        http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
  64.     <dubbo:application name="saga-sample-reference">
  65.         <dubbo:parameter key="qos.enable" value="true"/>
  66.         <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
  67.         <dubbo:parameter key="qos.port" value="22222"/>
  68.     </dubbo:application>
  69.     <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
  70.     <dubbo:protocol name="dubbo" port="-1"/>
  71.     <dubbo:reference id="inventoryAction" interface="io.seata.samples.saga.action.InventoryAction" check="false" lazy="true"/>
  72.     <dubbo:reference id="balanceAction" interface="io.seata.samples.saga.action.BalanceAction" check="false" lazy="true"/>
  73. </beans>        
复制代码
6.png
 
5.Seata Saga案例的状态机数据库和工程启动
(1)状态机相关的数据库表
(2)Seata Saga案例的工程启动
 
(1)状态机相关的数据库表
状态机定义 -> 状态机实例 -> 状态实例
  1. -- PUBLIC.SEATA_STATE_INST definition
  2. -- 状态实例,每个状态机实例里会有多个状态实例
  3. CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_INST"(
  4.     "ID" VARCHAR NOT NULL COMMENT 'id',
  5.     "MACHINE_INST_ID" VARCHAR NOT NULL COMMENT 'state machine instance id,机器实例ID',
  6.     "NAME" VARCHAR NOT NULL COMMENT 'state name',
  7.     "TYPE" VARCHAR COMMENT 'state type',
  8.     "SERVICE_NAME" VARCHAR COMMENT 'service name',
  9.     "SERVICE_METHOD" VARCHAR COMMENT 'method name',
  10.     "SERVICE_TYPE" VARCHAR COMMENT 'service type',
  11.     "BUSINESS_KEY" VARCHAR COMMENT 'business key',
  12.     "STATE_ID_COMPENSATED_FOR" VARCHAR COMMENT 'state compensated for',
  13.     "STATE_ID_RETRIED_FOR" VARCHAR COMMENT 'state retried for',
  14.     "GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
  15.     "IS_FOR_UPDATE" TINYINT COMMENT 'is service for update',
  16.     "INPUT_PARAMS" CLOB COMMENT 'input parameters',
  17.     "OUTPUT_PARAMS" CLOB COMMENT 'output parameters',
  18.     "STATUS" VARCHAR NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  19.     "EXCEP" BLOB COMMENT 'exception',
  20.     "GMT_UPDATED" TIMESTAMP COMMENT 'update time',
  21.     "GMT_END" TIMESTAMP COMMENT 'end time'
  22. );
  23. -- PUBLIC.SEATA_STATE_MACHINE_DEF definition
  24. -- 状态机定义,在json文件中定义好的状态流程会存放到这里
  25. CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_DEF"(
  26.     "ID" VARCHAR NOT NULL COMMENT 'id',
  27.     "NAME" VARCHAR NOT NULL COMMENT 'name',
  28.     "TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
  29.     "APP_NAME" VARCHAR NOT NULL COMMENT 'application name',
  30.     "TYPE" VARCHAR COMMENT 'state language type',
  31.     "COMMENT_" VARCHAR COMMENT 'comment',
  32.     "VER" VARCHAR NOT NULL COMMENT 'version',
  33.     "GMT_CREATE" TIMESTAMP NOT NULL COMMENT 'create time',
  34.     "STATUS" VARCHAR NOT NULL COMMENT 'status(AC:active|IN:inactive)',
  35.     "CONTENT" CLOB COMMENT 'content',
  36.     "RECOVER_STRATEGY" VARCHAR COMMENT 'transaction recover strategy(compensate|retry)'
  37. );
  38. -- PUBLIC.SEATA_STATE_MACHINE_INST definition
  39. -- 状态机实例,每执行一次完整的分布式事务就会对应一个状态机实例,每个状态机实例里会有多个状态实例
  40. CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_INST"(
  41.     "ID" VARCHAR NOT NULL COMMENT 'id',
  42.     "MACHINE_ID" VARCHAR NOT NULL COMMENT 'state machine definition id',
  43.     "TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
  44.     "PARENT_ID" VARCHAR COMMENT 'parent id',
  45.     "GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
  46.     "BUSINESS_KEY" VARCHAR COMMENT 'business key',
  47.     "START_PARAMS" CLOB COMMENT 'start parameters',
  48.     "GMT_END" TIMESTAMP COMMENT 'end time',
  49.     "EXCEP" BLOB COMMENT 'exception',
  50.     "END_PARAMS" CLOB COMMENT 'end parameters',
  51.     "STATUS" VARCHAR COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  52.     "COMPENSATION_STATUS" VARCHAR COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
  53.     "IS_RUNNING" TINYINT COMMENT 'is running(0 no|1 yes)',
  54.     "GMT_UPDATED" TIMESTAMP NOT NULL
  55. );
复制代码
(2)Seata Saga案例的工程启动
ApplicationKeeper启动完之后就会被阻塞住。
  1. public class ApplicationKeeper {
  2.     private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationKeeper.class);
  3.     private final ReentrantLock LOCK = new ReentrantLock();
  4.     private final Condition STOP = LOCK.newCondition();
  5.     //Instantiates a new Application keeper.
  6.     public ApplicationKeeper(AbstractApplicationContext applicationContext) {
  7.         addShutdownHook(applicationContext);
  8.     }
  9.     private void addShutdownHook(final AbstractApplicationContext applicationContext) {
  10.         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
  11.             @Override
  12.             public void run() {
  13.                 try {
  14.                     applicationContext.close();
  15.                     LOGGER.info("ApplicationContext " + applicationContext + " is closed.");
  16.                 } catch (Exception e) {
  17.                     LOGGER.error("Failed to close ApplicationContext", e);
  18.                 }
  19.                 try {
  20.                     LOCK.lock();
  21.                     STOP.signal();
  22.                 } finally {
  23.                     LOCK.unlock();
  24.                 }
  25.             }
  26.         }));
  27.     }
  28.     public void keep() {
  29.         synchronized (LOCK) {
  30.             try {
  31.                 LOGGER.info("Application is keep running ... ");
  32.                 LOCK.wait();
  33.             } catch (InterruptedException e) {
  34.                 e.printStackTrace();
  35.             }
  36.         }
  37.     }
  38. }
复制代码
步骤一:运行DubboSagaProviderStarter,启动Dubbo Provider。
  1. public class DubboSagaProviderStarter {
  2.     private static TestingServer server;
  3.     //The entry point of application.
  4.     public static void main(String[] args) throws Exception {
  5.         //mock zk server
  6.         mockZKServer();
  7.         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
  8.             new String[]{"spring/seata-dubbo-provider.xml"}
  9.         );
  10.         new ApplicationKeeper(applicationContext).keep();
  11.     }
  12.     private static void mockZKServer() throws Exception {
  13.         //Mock zk server,作为Dubbo配置中心
  14.         server = new TestingServer(2181, true);
  15.         server.start();
  16.     }
  17. }
  18. public class InventoryActionImpl implements InventoryAction {
  19.     private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);
  20.    
  21.     @Override
  22.     public boolean reduce(String businessKey, int count) {
  23.         LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
  24.         return true;
  25.     }
  26.    
  27.     @Override
  28.     public boolean compensateReduce(String businessKey) {
  29.         LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
  30.         return true;
  31.     }
  32. }
  33. public class BalanceActionImpl implements BalanceAction {
  34.     private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class);
  35.    
  36.     @Override
  37.     public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
  38.         if (params != null) {
  39.             Object throwException = params.get("throwException");
  40.             if (throwException != null && "true".equals(throwException.toString())) {
  41.                 throw new RuntimeException("reduce balance failed");
  42.             }
  43.         }
  44.         LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
  45.         return true;
  46.     }
  47.     @Override
  48.     public boolean compensateReduce(String businessKey, Map<String, Object> params) {
  49.         if (params != null) {
  50.             Object throwException = params.get("throwException");
  51.             if (throwException != null && "true".equals(throwException.toString())) {
  52.                 throw new RuntimeException("compensate reduce balance failed");
  53.             }
  54.         }
  55.         LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
  56.         return true;
  57.     }
  58. }
复制代码
步骤二:运行DubboSagaTransactionStarter,启动Demo工程。
  1. public class DubboSagaTransactionStarter {
  2.     private static volatile Object lock = new Object();
  3.    
  4.     private static AsyncCallback CALL_BACK = new AsyncCallback() {
  5.         @Override
  6.         public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {
  7.             synchronized (lock) {
  8.                 lock.notifyAll();
  9.             }
  10.         }
  11.         
  12.         @Override
  13.         public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {
  14.             synchronized (lock) {
  15.                 lock.notifyAll();
  16.             }
  17.         }
  18.     };
  19.     public static void main(String[] args) {
  20.         AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext(
  21.             new String[]{"spring/seata-saga.xml", "spring/seata-dubbo-reference.xml"}
  22.         );
  23.         StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
  24.         transactionCommittedDemo(stateMachineEngine);
  25.         transactionCompensatedDemo(stateMachineEngine);
  26.         new ApplicationKeeper(applicationContext).keep();
  27.     }
  28.     private static void transactionCommittedDemo(StateMachineEngine stateMachineEngine) {
  29.         //设置状态机上下文
  30.         Map<String, Object> startParams = new HashMap<>(3);
  31.         String businessKey = String.valueOf(System.currentTimeMillis());
  32.         startParams.put("businessKey", businessKey);
  33.         startParams.put("count", 10);
  34.         startParams.put("amount", new BigDecimal("100"));
  35.         //sync test,同步启动状态机
  36.         StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
  37.         Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
  38.         System.out.println("saga transaction commit succeed. XID: " + inst.getId());
  39.         //async test,异步启动状态机
  40.         businessKey = String.valueOf(System.currentTimeMillis());
  41.         inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
  42.         waittingForFinish(inst);
  43.         Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
  44.         System.out.println("saga transaction commit succeed. XID: " + inst.getId());
  45.     }
  46.     private static void transactionCompensatedDemo(StateMachineEngine stateMachineEngine) {
  47.         //设置状态机上下文
  48.         Map<String, Object> startParams = new HashMap<>(4);
  49.         String businessKey = String.valueOf(System.currentTimeMillis());
  50.         startParams.put("businessKey", businessKey);
  51.         startParams.put("count", 10);
  52.         startParams.put("amount", new BigDecimal("100"));
  53.         startParams.put("mockReduceBalanceFail", "true");
  54.         //sync test,同步启动状态机
  55.         StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
  56.         //async test,异步启动状态机
  57.         businessKey = String.valueOf(System.currentTimeMillis());
  58.         inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
  59.         waittingForFinish(inst);
  60.         Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "saga transaction compensate failed. XID: " + inst.getId());
  61.         System.out.println("saga transaction compensate succeed. XID: " + inst.getId());
  62.     }
  63.     private static void waittingForFinish(StateMachineInstance inst) {
  64.         synchronized (lock) {
  65.             if (ExecutionStatus.RU.equals(inst.getStatus())) {
  66.                 try {
  67.                     lock.wait();
  68.                 } catch (InterruptedException e) {
  69.                     e.printStackTrace();
  70.                 }
  71.             }
  72.         }
  73.     }
  74. }
复制代码
 
6.基于数据库的状态机配置实例的初始化
基于数据库的状态机配置实例DbStateMachineConfig会将状态机定义、状态机实例、状态实例放在数据库中。由seata-saga.xml配置文件可知,它会被注入到状态机引擎实例StateMachineEngine里。DbStateMachineConfig的初始化源码如下:
  1. //DbStateMachineConfig表示状态机定义、状态机实例、状态实例会放在DB中
  2. public class DbStateMachineConfig extends DefaultStateMachineConfig implements DisposableBean {
  3.     //会通过seata-saga.xml配置文件,注入一个数据库连接池进来,这个DB会用来存放状态机定义、状态机实例、状态实例
  4.     private DataSource dataSource;
  5.     //应用程序ID
  6.     private String applicationId;
  7.     //分布式事务服务的分组
  8.     private String txServiceGroup;
  9.     //DB里存放状态机数据的表的前缀
  10.     private String tablePrefix = "seata_";
  11.     //DB的类型,seata-samples里用的是h2,一般都用MySQL
  12.     private String dbType;
  13.     //Saga分布式事务模版
  14.     private SagaTransactionalTemplate sagaTransactionalTemplate;
  15.     //是否启用RM资源管理器上报成功的机制,默认是不启用的
  16.     private boolean rmReportSuccessEnable = DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE;
  17.     //是否启用Saga分支事务的注册机制,默认是不启用的
  18.     private boolean sagaBranchRegisterEnable = DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE;
  19.     //初始化
  20.     public DbStateMachineConfig() {
  21.         try {
  22.             Configuration configuration = ConfigurationFactory.getInstance();
  23.             if (configuration != null) {
  24.                 this.rmReportSuccessEnable = configuration.getBoolean(ConfigurationKeys.CLIENT_REPORT_SUCCESS_ENABLE, DEFAULT_CLIENT_REPORT_SUCCESS_ENABLE);
  25.                 this.sagaBranchRegisterEnable = configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_BRANCH_REGISTER_ENABLE, DEFAULT_CLIENT_SAGA_BRANCH_REGISTER_ENABLE);
  26.                 //设置Saga状态机定义文件的解析器
  27.                 setSagaJsonParser(configuration.getConfig(ConfigurationKeys.CLIENT_SAGA_JSON_PARSER, DEFAULT_SAGA_JSON_PARSER));
  28.                 this.applicationId = configuration.getConfig(ConfigurationKeys.APPLICATION_ID);
  29.                 this.txServiceGroup = configuration.getConfig(ConfigurationKeys.TX_SERVICE_GROUP);
  30.                 //设置Saga重试持久化模式是否为更新模式
  31.                 setSagaRetryPersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE, DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE));
  32.                 //设置Saga补偿持久化模式是否为更新模式
  33.                 setSagaCompensatePersistModeUpdate(configuration.getBoolean(ConfigurationKeys.CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE, DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE));
  34.             }
  35.         } catch (Exception e) {
  36.             LOGGER.warn("Load SEATA configuration failed, use default configuration instead.", e);
  37.         }
  38.     }
  39.     //根据数据库连接池获取DB类型
  40.     public static String getDbTypeFromDataSource(DataSource dataSource) throws SQLException {
  41.         try (Connection con = dataSource.getConnection()) {
  42.             DatabaseMetaData metaData = con.getMetaData();
  43.             return metaData.getDatabaseProductName();
  44.         }
  45.     }
  46.     //初始化DB状态机配置DbStateMachineConfig
  47.     //因为DbStateMachineConfig本身就是Spring Bean,当该Bean初始化完毕后,就会执行afterPropertiesSet()方法
  48.     @Override
  49.     public void afterPropertiesSet() throws Exception {
  50.         //先根据数据库连接池获取DB类型
  51.         dbType = getDbTypeFromDataSource(dataSource);
  52.         //如果状态机实例日志和状态实例日志存储组件为null
  53.         if (getStateLogStore() == null) {
  54.             DbAndReportTcStateLogStore dbStateLogStore = new DbAndReportTcStateLogStore();
  55.             dbStateLogStore.setDataSource(dataSource);
  56.             dbStateLogStore.setTablePrefix(tablePrefix);
  57.             dbStateLogStore.setDbType(dbType);
  58.             dbStateLogStore.setDefaultTenantId(getDefaultTenantId());
  59.             dbStateLogStore.setSeqGenerator(getSeqGenerator());
  60.    
  61.             if (StringUtils.hasLength(getSagaJsonParser())) {
  62.                 ParamsSerializer paramsSerializer = new ParamsSerializer();
  63.                 paramsSerializer.setJsonParserName(getSagaJsonParser());
  64.                 dbStateLogStore.setParamsSerializer(paramsSerializer);
  65.             }
  66.             if (sagaTransactionalTemplate == null) {
  67.                 DefaultSagaTransactionalTemplate defaultSagaTransactionalTemplate = new DefaultSagaTransactionalTemplate();
  68.                 defaultSagaTransactionalTemplate.setApplicationContext(getApplicationContext());
  69.                 defaultSagaTransactionalTemplate.setApplicationId(applicationId);
  70.                 defaultSagaTransactionalTemplate.setTxServiceGroup(txServiceGroup);
  71.                 defaultSagaTransactionalTemplate.afterPropertiesSet();
  72.                 sagaTransactionalTemplate = defaultSagaTransactionalTemplate;
  73.             }
  74.             dbStateLogStore.setSagaTransactionalTemplate(sagaTransactionalTemplate);
  75.             setStateLogStore(dbStateLogStore);
  76.         }
  77.         if (getStateLangStore() == null) {
  78.             DbStateLangStore dbStateLangStore = new DbStateLangStore();
  79.             dbStateLangStore.setDataSource(dataSource);
  80.             dbStateLangStore.setTablePrefix(tablePrefix);
  81.             dbStateLangStore.setDbType(dbType);
  82.             setStateLangStore(dbStateLangStore);
  83.         }
  84.         super.afterPropertiesSet();//must execute after StateLangStore initialized
  85.     }
  86.     @Override
  87.     public void destroy() throws Exception {
  88.         if ((sagaTransactionalTemplate != null) && (sagaTransactionalTemplate instanceof DisposableBean)) {
  89.             ((DisposableBean) sagaTransactionalTemplate).destroy();
  90.         }
  91.     }
  92.     ...
  93. }
  94. //Default state machine configuration
  95. //默认的状态机配置,DB状态机配置DbStateMachineConfig是它的子类
  96. //DefaultStateMachineConfig可以支持不同种类的状态机配置
  97. //即状态机定义、状态机实例、状态实例,既可以放在DB里,也可以放在其他存储里,默认是放在DB里的
  98. //DefaultStateMachineConfig会封装(注入)状态机运行时需要的所有组件
  99. //StateMachineConfig可以获取到状态机需要的各种组件
  100. //ApplicationContextAware可以感知Spring容器上下文
  101. //InitializingBean可以对Spring Bean进行初始化
  102. public class DefaultStateMachineConfig implements StateMachineConfig, ApplicationContextAware, InitializingBean {
  103.     ...
  104.     ...
  105. }
复制代码
7.png
 
7.状态机配置实例中包含的一些关键组件
(1)状态机配置组件
(2)状态机实例和状态实例的仓储组件
(3)状态机实例日志和状态实例日志的存储组件
(4)状态机定义的存储组件
(5)表达式工厂管理器
(6)状态机定义的仓储组件
 
(1)状态机配置组件
StateMachineConfig表示的是状态机配置组件,可以从中获取状态机需要的各种组件。
  1. //StateMachineConfig
  2. //状态机配置组件接口,可以获取到状态机需要的各种组件
  3. public interface StateMachineConfig {
  4.     //Gets state log store.
  5.     //获取状态机实例和状态实例的仓储组件
  6.     //StateLogRepository可以根据各种条件查询状态机实例和状态实例
  7.     StateLogRepository getStateLogRepository();
  8.     //Gets get state log store.
  9.     //获取状态机实例日志和状态实例日志的存储组件
  10.     //StateLogStore可以对状态机日志和状态日志进行读写、也可以根据各种条件查询状态机实例和状态实例
  11.     StateLogStore getStateLogStore();
  12.     //Gets get state language definition store.
  13.     //获取状态机定义的存储组件
  14.     StateLangStore getStateLangStore();
  15.     //Gets get expression factory manager.
  16.     //获取表达式工厂管理器
  17.     ExpressionFactoryManager getExpressionFactoryManager();
  18.     //Gets get evaluator factory manager.
  19.     //获取表达式计算工厂管理器
  20.     EvaluatorFactoryManager getEvaluatorFactoryManager();
  21.     //Gets get charset.
  22.     //获取字符集编码
  23.     String getCharset();
  24.     //Gets get default tenant id.
  25.     //获取默认的租户ID
  26.     String getDefaultTenantId();
  27.     //Gets get state machine repository.
  28.     //获取状态机定义的仓储组件
  29.     StateMachineRepository getStateMachineRepository();
  30.     //Gets get status decision strategy.
  31.     //获取State执行结果的判定策略组件
  32.     StatusDecisionStrategy getStatusDecisionStrategy();
  33.     //Gets get seq generator.
  34.     //获取序号生成器
  35.     SeqGenerator getSeqGenerator();
  36.     //Gets get process ctrl event publisher.
  37.     //获取流程控制事件的发布器(同步发布)
  38.     ProcessCtrlEventPublisher getProcessCtrlEventPublisher();
  39.     //Gets get async process ctrl event publisher.
  40.     //获取流程控制事件的发布器(异步发布)
  41.     ProcessCtrlEventPublisher getAsyncProcessCtrlEventPublisher();
  42.     //Gets get application context.
  43.     //获取Spring容器上下文
  44.     ApplicationContext getApplicationContext();
  45.     //Gets get thread pool executor.
  46.     //获取异步执行的线程池
  47.     ThreadPoolExecutor getThreadPoolExecutor();
  48.     //Is enable async boolean.
  49.     //是否启用异步
  50.     boolean isEnableAsync();
  51.     //get ServiceInvokerManager
  52.     //获取服务调用的管理器
  53.     ServiceInvokerManager getServiceInvokerManager();
  54.     //get trans operation timeout
  55.     //获取事务操作的超时时间
  56.     int getTransOperationTimeout();
  57.     //get service invoke timeout
  58.     //获取服务调用的超时时间
  59.     int getServiceInvokeTimeout();
  60.     //get ScriptEngineManager
  61.     //获取脚本引擎管理器
  62.     ScriptEngineManager getScriptEngineManager();
  63. }
复制代码
(2)状态机实例和状态实例的仓储组件
StateLogRepository表示的是状态机实例和状态实例的仓储组件,可以根据不同的条件查询状态机实例和状态实例。
  1. //State Log Repository
  2. //状态机实例和状态实例的仓储组件,可以根据各种条件查询状态机实例和状态实例
  3. public interface StateLogRepository {
  4.     //Get state machine instance
  5.     //根据状态机实例ID,获取到具体的状态机实例
  6.     StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
  7.     //Get state machine instance by businessKey
  8.     //根据业务key获取状态机实例
  9.     StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
  10.     //Query the list of state machine instances by parent id
  11.     //根据父ID查询状态机实例
  12.     List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
  13.     //Get state instance
  14.     //根据状态实例ID查询状态实例
  15.     StateInstance getStateInstance(String stateInstanceId, String machineInstId);
  16.     //Get a list of state instances by state machine instance id
  17.     //根据状态机实例ID查询所有状态实例
  18.     List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
  19. }
复制代码
(3)状态机实例日志和状态实例日志的存储组件接口
StateLogStore表示的是状态机实例日志和状态实例日志的存储组件,可以记录各种状态机实例日志和状态实例日志。
  1. //StateMachine engine log store
  2. //状态机实例日志和状态实例日志的存储组件
  3. public interface StateLogStore {
  4.     //Record state machine startup events
  5.     //记录状态机实例的启动事件日志
  6.     void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context);
  7.     //Record status end event
  8.     //记录状态机实例的完成事件日志
  9.     void recordStateMachineFinished(StateMachineInstance machineInstance, ProcessContext context);
  10.     //Record state machine restarted
  11.     //记录状态机实例的重启事件日志
  12.     void recordStateMachineRestarted(StateMachineInstance machineInstance, ProcessContext context);
  13.     //Record state start execution event
  14.     //记录状态实例的启动事件日志
  15.     void recordStateStarted(StateInstance stateInstance, ProcessContext context);
  16.     //Record state execution end event
  17.     //记录状态实例的完成事件日志
  18.     void recordStateFinished(StateInstance stateInstance, ProcessContext context);
  19.     //Get state machine instance
  20.     //根据状态机实例ID获取状态机实例
  21.     StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
  22.     //Get state machine instance by businessKey
  23.     //根据业务key获取状态机实例
  24.     StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
  25.     //Query the list of state machine instances by parent id
  26.     //根据父ID查询状态机实例
  27.     List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
  28.     //Get state instance
  29.     //根据状态实例ID和状态机ID获取状态实例
  30.     StateInstance getStateInstance(String stateInstanceId, String machineInstId);
  31.     //Get a list of state instances by state machine instance id
  32.     //根据状态机实例ID查询状态实例
  33.     List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
  34. }
复制代码
(4)状态机定义的存储组件接口
StateLangStore表示的是状态机定义的存储组件,可以存储和获取不同的状态机定义。
  1. //State language definition store
  2. //状态机定义的存储组件
  3. public interface StateLangStore {
  4.     //Query the state machine definition by id
  5.     //根据状态机ID获取状态机定义
  6.     StateMachine getStateMachineById(String stateMachineId);
  7.     //Get the latest version of the state machine by state machine name
  8.     //根据状态机名称和租户ID,可以获取最新版本的状态机定义
  9.     StateMachine getLastVersionStateMachine(String stateMachineName, String tenantId);
  10.     //Storage state machine definition
  11.     //存储状态机定义
  12.     boolean storeStateMachine(StateMachine stateMachine);
  13. }
复制代码
(5)表达式工厂管理器
ExpressionFactoryManager表示的是表达式工厂管理器,可以根据不同的表达式类型获取不同的表达式工厂组件。
  1. //Expression factory manager
  2. //表达式工厂管理器
  3. public class ExpressionFactoryManager {
  4.     public static final String DEFAULT_EXPRESSION_TYPE = "Default";
  5.     //表达式类型 -> 表达式工厂
  6.     private Map<String, ExpressionFactory> expressionFactoryMap = new ConcurrentHashMap<>();
  7.    
  8.     //根据不同的表达式类型,去获取不同的表达式工厂组件
  9.     public ExpressionFactory getExpressionFactory(String expressionType) {
  10.         if (StringUtils.isBlank(expressionType)) {
  11.             expressionType = DEFAULT_EXPRESSION_TYPE;
  12.         }
  13.         return expressionFactoryMap.get(expressionType);
  14.     }
  15.    
  16.     public void setExpressionFactoryMap(Map<String, ExpressionFactory> expressionFactoryMap) {
  17.         this.expressionFactoryMap.putAll(expressionFactoryMap);
  18.     }
  19.    
  20.     public void putExpressionFactory(String type, ExpressionFactory factory) {
  21.         this.expressionFactoryMap.put(type, factory);
  22.     }
  23. }
复制代码
(6)状态机定义的仓储组件
StateMachineRepository表示的是状态机定义的仓储组件,可以注册和获取一个状态机定义。
  1. //StateMachineRepository
  2. //状态机定义的仓储组件
  3. public interface StateMachineRepository {
  4.     //Gets get state machine by id.
  5.     //根据状态机ID获取一个状态机定义StateMachine
  6.     StateMachine getStateMachineById(String stateMachineId);
  7.     //Gets get state machine.
  8.     //根据状态机名字、租户ID获取一个状态机定义StateMachine
  9.     StateMachine getStateMachine(String stateMachineName, String tenantId);
  10.     //Gets get state machine.
  11.     //根据状态机名字、租户ID、版本号获取一个状态机定义StateMachine
  12.     StateMachine getStateMachine(String stateMachineName, String tenantId, String version);
  13.     //Register the state machine to the repository (if the same version already exists, return the existing version)
  14.     //向状态机定义的仓储组件注册一个状态机定义
  15.     StateMachine registryStateMachine(StateMachine stateMachine);
  16.     //registry by resources
  17.     //向状态机定义的仓储组件注册资源
  18.     void registryByResources(Resource[] resources, String tenantId) throws IOException;
  19. }
复制代码
 
8.默认的状态机配置类的初始化
DefaultStateMachineConfig是默认的状态机配置类,基于数据库的状态机配置组件DbStateMachineConfig是它的子类。它可以支持不同种类的状态机配置,也就是状态机定义、状态机实例、状态实例,既可以放在数据库里,也可以放在其他存储里,默认是放在数据库里的。
  1. //Default state machine configuration
  2. //DefaultStateMachineConfig会封装(注入)状态机运行时需要的所有组件
  3. //StateMachineConfig可以获取到状态机需要的各种组件
  4. //ApplicationContextAware可以感知Spring容器上下文
  5. //InitializingBean可以对Spring Bean进行初始化
  6. public class DefaultStateMachineConfig implements StateMachineConfig, ApplicationContextAware, InitializingBean {
  7.     private static final int DEFAULT_TRANS_OPER_TIMEOUT     = 60000 * 30;
  8.     private static final int DEFAULT_SERVICE_INVOKE_TIMEOUT = 60000 * 5;
  9.     //事务操作的超时时间,默认是30分钟
  10.     private int transOperationTimeout = DEFAULT_TRANS_OPER_TIMEOUT;
  11.     //服务调用的超时时间,默认是5分钟
  12.     private int serviceInvokeTimeout  = DEFAULT_SERVICE_INVOKE_TIMEOUT;
  13.     //状态机实例和状态实例的仓储组件
  14.     private StateLogRepository stateLogRepository;
  15.     //状态机实例日志和状态实例日志的存储组件
  16.     private StateLogStore stateLogStore;
  17.     //状态机定义的存储组件
  18.     private StateLangStore stateLangStore;
  19.     //表达式工厂管理器
  20.     private ExpressionFactoryManager expressionFactoryManager;
  21.     //表达式计算的工厂管理器
  22.     private EvaluatorFactoryManager evaluatorFactoryManager;
  23.     //状态机定义的仓储组件
  24.     private StateMachineRepository stateMachineRepository;
  25.     //State执行结果的判定策略组件
  26.     private StatusDecisionStrategy statusDecisionStrategy;
  27.     //序号生成器
  28.     private SeqGenerator seqGenerator;
  29.     //流程控制的事件发布器(同步发布)
  30.     private ProcessCtrlEventPublisher syncProcessCtrlEventPublisher;
  31.     //流程控制的事件发布器(异步发布)
  32.     private ProcessCtrlEventPublisher asyncProcessCtrlEventPublisher;
  33.     //Spring容器上下文
  34.     private ApplicationContext applicationContext;
  35.     //异步执行的线程池
  36.     private ThreadPoolExecutor threadPoolExecutor;
  37.     //是否启用异步执行,默认是false
  38.     private boolean enableAsync = false;
  39.     //服务调用的管理器
  40.     private ServiceInvokerManager serviceInvokerManager;
  41.     //是否自动注册资源(自动注册流程定义),默认是true
  42.     private boolean autoRegisterResources = true;
  43.     //默认的状态机定义文件(json文件)
  44.     private String[] resources = new String[]{"classpath*:seata/saga/statelang/**/*.json"};
  45.     //字符集编码
  46.     private String charset = "UTF-8";
  47.     //默认租户ID
  48.     private String defaultTenantId = "000001";
  49.     //脚本引擎管理器
  50.     private ScriptEngineManager scriptEngineManager;
  51.     //状态机定义文件(json文件)解析器
  52.     private String sagaJsonParser = DEFAULT_SAGA_JSON_PARSER;
  53.     //是否更新Saga重试持久化的模式
  54.     private boolean sagaRetryPersistModeUpdate = DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE;
  55.     //是否更新Saga补偿持久化的模式
  56.     private boolean sagaCompensatePersistModeUpdate = DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE;
  57.     //因为DefaultStateMachineConfig实现了InitializingBean接口
  58.     //所以继承DefaultStateMachineConfig的Spring Bean初始化之后,就会回调afterPropertiesSet()方法
  59.     @Override
  60.     public void afterPropertiesSet() throws Exception {
  61.         init();
  62.     }
  63.     //初始化方法
  64.     protected void init() throws Exception {
  65.         //创建表达式工厂管理器
  66.         if (expressionFactoryManager == null) {
  67.             //创建一个表达式工厂管理器
  68.             expressionFactoryManager = new ExpressionFactoryManager();
  69.             //创建一个Spring EL表达式工厂
  70.             SpringELExpressionFactory springELExpressionFactory = new SpringELExpressionFactory();
  71.             //注入(设置)Spring容器上下文到Spring EL表达式工厂中
  72.             springELExpressionFactory.setApplicationContext(getApplicationContext());
  73.             //将Spring EL表达式工厂放入表达式工厂管理器中
  74.             expressionFactoryManager.putExpressionFactory(ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, springELExpressionFactory);
  75.             //创建一个序号表达式工厂
  76.             SequenceExpressionFactory sequenceExpressionFactory = new SequenceExpressionFactory();
  77.             //注入(设置)序号生成器到序号表达式工厂中
  78.             sequenceExpressionFactory.setSeqGenerator(getSeqGenerator());
  79.             //将序号表达式工厂放入表达式工厂管理器中
  80.             expressionFactoryManager.putExpressionFactory(DomainConstants.EXPRESSION_TYPE_SEQUENCE, sequenceExpressionFactory);
  81.         }
  82.         //创建表达式计算工厂管理器
  83.         if (evaluatorFactoryManager == null) {
  84.             //创建一个表达式计算工厂管理器
  85.             evaluatorFactoryManager = new EvaluatorFactoryManager();
  86.             //创建一个表达式计算工厂,并将该表达式计算工厂放入表达式计算工厂管理器中
  87.             ExpressionEvaluatorFactory expressionEvaluatorFactory = new ExpressionEvaluatorFactory();
  88.             expressionEvaluatorFactory.setExpressionFactory(expressionFactoryManager.getExpressionFactory(ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE));
  89.             evaluatorFactoryManager.putEvaluatorFactory(EvaluatorFactoryManager.EVALUATOR_TYPE_DEFAULT, expressionEvaluatorFactory);
  90.             //创建一个异常匹配计算工厂,并将该异常匹配计算工厂放入表达式计算工厂管理器中
  91.             evaluatorFactoryManager.putEvaluatorFactory(DomainConstants.EVALUATOR_TYPE_EXCEPTION, new ExceptionMatchEvaluatorFactory());
  92.         }
  93.         //创建状态机定义的仓储组件
  94.         if (stateMachineRepository == null) {
  95.             StateMachineRepositoryImpl stateMachineRepository = new StateMachineRepositoryImpl();
  96.             stateMachineRepository.setCharset(charset);//设置字符集编码
  97.             stateMachineRepository.setSeqGenerator(seqGenerator);//设置序号生成器
  98.             stateMachineRepository.setStateLangStore(stateLangStore);//设置状态机定义的存储组件
  99.             stateMachineRepository.setDefaultTenantId(defaultTenantId);//设置默认租户ID
  100.             stateMachineRepository.setJsonParserName(sagaJsonParser);//设置状态机定义文件(json文件)解析器
  101.             this.stateMachineRepository = stateMachineRepository;
  102.         }
  103.         //stateMachineRepository may be overridden, so move `stateMachineRepository.registryByResources()` here.
  104.         //如果需要自动注册资源(比如数据库等),则获取资源并进行注册
  105.         if (autoRegisterResources && ArrayUtils.isNotEmpty(resources)) {
  106.             try {
  107.                 //读取默认的状态机定义文件(json文件),把这个状态机定义注册到状态机定义的仓储组件中
  108.                 Resource[] resources = ResourceUtil.getResources(this.resources);
  109.                 stateMachineRepository.registryByResources(resources, defaultTenantId);
  110.             } catch (IOException e) {
  111.                 LOGGER.error("Load State Language Resources failed.", e);
  112.             }
  113.         }
  114.         //创建状态机实例和状态实例的仓储组件
  115.         if (stateLogRepository == null) {
  116.             StateLogRepositoryImpl stateLogRepositoryImpl = new StateLogRepositoryImpl();
  117.             stateLogRepositoryImpl.setStateLogStore(stateLogStore);
  118.             this.stateLogRepository = stateLogRepositoryImpl;
  119.         }
  120.         //创建State执行结果的判定策略组件
  121.         if (statusDecisionStrategy == null) {
  122.             statusDecisionStrategy = new DefaultStatusDecisionStrategy();
  123.         }
  124.         //创建流程控制的事件发布器(同步发布)
  125.         if (syncProcessCtrlEventPublisher == null) {
  126.             //创建流程控制的事件发布器
  127.             ProcessCtrlEventPublisher syncEventPublisher = new ProcessCtrlEventPublisher();
  128.             //创建流程控制器
  129.             ProcessControllerImpl processorController = createProcessorController(syncEventPublisher);
  130.             //创建流程控制事件的消费者
  131.             ProcessCtrlEventConsumer processCtrlEventConsumer = new ProcessCtrlEventConsumer();
  132.             processCtrlEventConsumer.setProcessController(processorController);
  133.             //创建事件总线
  134.             DirectEventBus directEventBus = new DirectEventBus();
  135.             syncEventPublisher.setEventBus(directEventBus);
  136.             directEventBus.registerEventConsumer(processCtrlEventConsumer);
  137.             syncProcessCtrlEventPublisher = syncEventPublisher;
  138.         }
  139.         //如果启用了异步化执行 且 流程控制的事件发布器(异步发布)为null
  140.         if (enableAsync && asyncProcessCtrlEventPublisher == null) {
  141.             ProcessCtrlEventPublisher asyncEventPublisher = new ProcessCtrlEventPublisher();
  142.             ProcessControllerImpl processorController = createProcessorController(asyncEventPublisher);
  143.             ProcessCtrlEventConsumer processCtrlEventConsumer = new ProcessCtrlEventConsumer();
  144.             processCtrlEventConsumer.setProcessController(processorController);
  145.             AsyncEventBus asyncEventBus = new AsyncEventBus();
  146.             asyncEventBus.setThreadPoolExecutor(getThreadPoolExecutor());
  147.             asyncEventPublisher.setEventBus(asyncEventBus);
  148.             asyncEventBus.registerEventConsumer(processCtrlEventConsumer);
  149.             asyncProcessCtrlEventPublisher = asyncEventPublisher;
  150.         }
  151.         //创建服务调用管理器
  152.         if (this.serviceInvokerManager == null) {
  153.             this.serviceInvokerManager = new ServiceInvokerManager();
  154.             //创建Spring Bean的服务调用组件
  155.             SpringBeanServiceInvoker springBeanServiceInvoker = new SpringBeanServiceInvoker();
  156.             springBeanServiceInvoker.setApplicationContext(getApplicationContext());
  157.             springBeanServiceInvoker.setThreadPoolExecutor(threadPoolExecutor);
  158.             springBeanServiceInvoker.setSagaJsonParser(getSagaJsonParser());
  159.             //将Spring Bean的服务调用组件放入到服务调用管理器中
  160.             this.serviceInvokerManager.putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, springBeanServiceInvoker);
  161.         }
  162.         //创建脚本引擎管理器
  163.         if (this.scriptEngineManager == null) {
  164.             this.scriptEngineManager = new ScriptEngineManager();
  165.         }
  166.     }
  167.     protected ProcessControllerImpl createProcessorController(ProcessCtrlEventPublisher eventPublisher) throws Exception {
  168.         //创建状态机流程处理的路由器
  169.         StateMachineProcessRouter stateMachineProcessRouter = new StateMachineProcessRouter();
  170.         stateMachineProcessRouter.initDefaultStateRouters();
  171.         //通过SPI机制加载状态机流程处理路由的拦截器
  172.         loadStateRouterInterceptors(stateMachineProcessRouter.getStateRouters());
  173.         //创建状态机流程处理器
  174.         StateMachineProcessHandler stateMachineProcessHandler = new StateMachineProcessHandler();
  175.         stateMachineProcessHandler.initDefaultHandlers();
  176.         //通过SPI机制加载状态机流程处理器的拦截器
  177.         loadStateHandlerInterceptors(stateMachineProcessHandler.getStateHandlers());
  178.         //创建默认的路由处理器,并设置事件发布器
  179.         DefaultRouterHandler defaultRouterHandler = new DefaultRouterHandler();
  180.         defaultRouterHandler.setEventPublisher(eventPublisher);
  181.         //创建状态机流程处理路由器对应的Map
  182.         Map<String, ProcessRouter> processRouterMap = new HashMap<>(1);
  183.         processRouterMap.put(ProcessType.STATE_LANG.getCode(), stateMachineProcessRouter);
  184.         defaultRouterHandler.setProcessRouters(processRouterMap);
  185.         //创建自定义的业务处理器
  186.         CustomizeBusinessProcessor customizeBusinessProcessor = new CustomizeBusinessProcessor();
  187.         //创建状态机流程处理器对应的Map
  188.         Map<String, ProcessHandler> processHandlerMap = new HashMap<>(1);
  189.         processHandlerMap.put(ProcessType.STATE_LANG.getCode(), stateMachineProcessHandler);
  190.         customizeBusinessProcessor.setProcessHandlers(processHandlerMap);
  191.         //创建路由处理器对应的Map
  192.         Map<String, RouterHandler> routerHandlerMap = new HashMap<>(1);
  193.         routerHandlerMap.put(ProcessType.STATE_LANG.getCode(), defaultRouterHandler);
  194.         customizeBusinessProcessor.setRouterHandlers(routerHandlerMap);
  195.         //创建流程控制器
  196.         ProcessControllerImpl processorController = new ProcessControllerImpl();
  197.         processorController.setBusinessProcessor(customizeBusinessProcessor);
  198.         return processorController;
  199.     }
  200.     protected void loadStateHandlerInterceptors(Map<String, StateHandler> stateHandlerMap) {
  201.         for (StateHandler stateHandler : stateHandlerMap.values()) {
  202.             if (stateHandler instanceof InterceptableStateHandler) {
  203.                 InterceptableStateHandler interceptableStateHandler = (InterceptableStateHandler) stateHandler;
  204.                 List<StateHandlerInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateHandlerInterceptor.class);
  205.                 for (StateHandlerInterceptor interceptor : interceptorList) {
  206.                     if (interceptor.match(interceptableStateHandler.getClass())) {
  207.                         interceptableStateHandler.addInterceptor(interceptor);
  208.                     }
  209.                     if (interceptor instanceof ApplicationContextAware) {
  210.                         ((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
  211.                     }
  212.                 }
  213.             }
  214.         }
  215.     }
  216.     protected void loadStateRouterInterceptors(Map<String, StateRouter> stateRouterMap) {
  217.         for (StateRouter stateRouter : stateRouterMap.values()) {
  218.             if (stateRouter instanceof InterceptableStateRouter) {
  219.                 InterceptableStateRouter interceptableStateRouter = (InterceptableStateRouter) stateRouter;
  220.                 List<StateRouterInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateRouterInterceptor.class);
  221.                 for (StateRouterInterceptor interceptor : interceptorList) {
  222.                     if (interceptor.match(interceptableStateRouter.getClass())) {
  223.                         interceptableStateRouter.addInterceptor(interceptor);
  224.                     }
  225.                     if (interceptor instanceof ApplicationContextAware) {
  226.                         ((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
  227.                     }
  228.                 }
  229.             }
  230.         }
  231.     }
  232.     ...
  233. }
复制代码
 
9.状态机定义的仓储组件解析状态机定义文件
在DefaultStateMachineConfig的init()方法初始化逻辑中,会通过调用状态机定义的仓储组件StateMachineRepository.registryByResources()方法来读取和解析状态机定义文件,并把解析出来的状态机定义StateMachine注册到状态机定义的仓储组件中,也就是存储在stateMachineRepository变量里。
 
注意:StateMachine就是状态机定义。
8.png
  1. public class StateMachineRepositoryImpl implements StateMachineRepository {
  2.     ...
  3.     @Override
  4.     public void registryByResources(Resource[] resources, String tenantId) throws IOException {
  5.         if (resources != null) {
  6.             for (Resource resource : resources) {
  7.                 //1.基于IO流把状态机定义文件(json文件)读取出来,放入json字符串
  8.                 String json;
  9.                 try (InputStream is = resource.getInputStream()) {
  10.                     json = IOUtils.toString(is, charset);
  11.                 }
  12.                 //2.从状态机解析器工厂中获取一个状态机解析器,然后解析json字符串得到一个状态机定义StateMachine
  13.                 StateMachine stateMachine = StateMachineParserFactory.getStateMachineParser(jsonParserName).parse(json);
  14.                 if (stateMachine != null) {
  15.                     stateMachine.setContent(json);
  16.                     if (StringUtils.isBlank(stateMachine.getTenantId())) {
  17.                         stateMachine.setTenantId(tenantId);
  18.                     }
  19.                     //3.注册状态机定义StateMachine
  20.                     registryStateMachine(stateMachine);
  21.                     if (LOGGER.isDebugEnabled()) {
  22.                         LOGGER.debug("===== StateMachine Loaded: \n{}", json);
  23.                     }
  24.                 }
  25.             }
  26.         }
  27.     }
  28.     ...
  29. }
复制代码
 
10.状态机定义的仓储组件注册StateMachine
9.png
  1. public class StateMachineRepositoryImpl implements StateMachineRepository {
  2.     private Map<String/** Name_Tenant **/, Item> stateMachineMapByNameAndTenant = new ConcurrentHashMap<>();
  3.     private Map<String/** Id **/, Item> stateMachineMapById = new ConcurrentHashMap<>();
  4.     private StateLangStore stateLangStore;//状态机定义的存储组件
  5.     ...
  6.    
  7.     @Override
  8.     public StateMachine registryStateMachine(StateMachine stateMachine) {
  9.         String stateMachineName = stateMachine.getName();
  10.         String tenantId = stateMachine.getTenantId();
  11.         if (stateLangStore != null) {
  12.             //1.从状态机定义的存储组件中,根据状态机名称和租户ID,获取最新版本的状态机定义StateMachine
  13.             StateMachine oldStateMachine = stateLangStore.getLastVersionStateMachine(stateMachineName, tenantId);
  14.             if (oldStateMachine != null) {
  15.                 byte[] oldBytesContent = null;
  16.                 byte[] bytesContent = null;
  17.                 try {
  18.                     oldBytesContent = oldStateMachine.getContent().getBytes(charset);
  19.                     bytesContent = stateMachine.getContent().getBytes(charset);
  20.                 } catch (UnsupportedEncodingException e) {
  21.                     LOGGER.error(e.getMessage(), e);
  22.                 }
  23.                 if (Arrays.equals(bytesContent, oldBytesContent) && stateMachine.getVersion() != null && stateMachine.getVersion().equals(oldStateMachine.getVersion())) {
  24.                     LOGGER.info("StateMachine[{}] is already exist a same version", stateMachineName);
  25.                     stateMachine.setId(oldStateMachine.getId());
  26.                     stateMachine.setGmtCreate(oldStateMachine.getGmtCreate());
  27.                     Item item = new Item(stateMachine);
  28.                     stateMachineMapByNameAndTenant.put(stateMachineName + "_" + tenantId, item);
  29.                     stateMachineMapById.put(stateMachine.getId(), item);
  30.                     return stateMachine;
  31.                 }
  32.             }
  33.             if (StringUtils.isBlank(stateMachine.getId())) {
  34.                 //2.生成新的状态机定义StateMachine的ID
  35.                 stateMachine.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE));
  36.             }
  37.             stateMachine.setGmtCreate(new Date());
  38.             //3.通过状态机定义的存储组件,把状态机定义StateMachine保存起来,默认就是把状态机定义StateMachine存储到DB中
  39.             stateLangStore.storeStateMachine(stateMachine);
  40.         }
  41.         if (StringUtils.isBlank(stateMachine.getId())) {
  42.             stateMachine.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE));
  43.         }
  44.         Item item = new Item(stateMachine);
  45.         //4.把状态机定义放入内存中
  46.         stateMachineMapByNameAndTenant.put(stateMachineName + "_" + tenantId, item);
  47.         stateMachineMapById.put(stateMachine.getId(), item);
  48.         return stateMachine;
  49.     }
  50.     ...
  51. }
  52. public class DbStateLangStore extends AbstractStore implements StateLangStore {
  53.     ...
  54.     @Override
  55.     public boolean storeStateMachine(StateMachine stateMachine) {
  56.         //把状态机定义StateMachine写入到DB中
  57.         return executeUpdate(stateLangStoreSqls.getInsertStateMachineSql(dbType), STATE_MACHINE_TO_STATEMENT, stateMachine) > 0;
  58.     }
  59.     ...
  60. }
复制代码
 
11.状态机引擎接口的源码
从seata-saga.xml配置文件可知,基于数据库的状态机配置实例DbStateMachineConfig会被注入到基于流程控制的状态机引擎实例ProcessCtrlStateMachineEngine中。
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns:jdbc="http://www.springframework.org/schema/jdbc"
  3.        xmlns="http://www.springframework.org/schema/beans"
  4.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  5.        http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd"
  6.        default-autowire="byName">
  7.     ...
  8.     <bean id="stateMachineEngine" >
  9.         <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
  10.     </bean>
  11.     ...
  12. </beans>
复制代码
基于流程控制的状态机引擎ProcessCtrlStateMachineEngine,会实现状态机引擎接口StateMachineEngine。
  1. //ProcessCtrl-based state machine engine
  2. //基于流程控制的状态机引擎
  3. public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
  4.     //需要在seata-saga.xml配置文件中注入状态机配置
  5.     private StateMachineConfig stateMachineConfig;
  6.     ...
  7. }
  8. //State machine engine
  9. //状态机引擎接口
  10. public interface StateMachineEngine {
  11.     //start a state machine instance
  12.     //根据状态机的名称、租户ID、启动参数来启动一个状态机实例
  13.     StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException;
  14.     //start a state machine instance with businessKey
  15.     //根据状态机的名称、租户ID、业务key、启动参数来启动一个状态机实例
  16.     StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException;
  17.     //start a state machine instance asynchronously
  18.     //根据状态机的名称、租户ID、启动参数来启动一个状态机实例
  19.     //也就是状态机实例跑完之后,会回调传入的callback()方法
  20.     StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
  21.     //start a state machine instance asynchronously with businessKey
  22.     //根据状态机的名称、租户ID、业务key、启动参数来异步化启动一个状态机实例
  23.     //也就是状态机实例跑完之后,会回调传入的callback()方法
  24.     StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
  25.     //forward restart a failed state machine instance
  26.     //重启一个失败的状态机实例
  27.     StateMachineInstance forward(String stateMachineInstId, Map<String, Object> replaceParams) throws ForwardInvalidException;
  28.     //forward restart a failed state machine instance asynchronously
  29.     //异步化重启一个失败的状态机实例
  30.     StateMachineInstance forwardAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws ForwardInvalidException;
  31.     //compensate a state machine instance
  32.     //对一个状态机实例进行补偿
  33.     StateMachineInstance compensate(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
  34.     //compensate a state machine instance asynchronously
  35.     //对一个状态机实例进行异步化补偿
  36.     StateMachineInstance compensateAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws EngineExecutionException;
  37.     //skip current failed state instance and forward restart state machine instance
  38.     //跳过当前失败的状态机实例,同时重启一个状态机实例
  39.     StateMachineInstance skipAndForward(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
  40.     //skip current failed state instance and forward restart state machine instance asynchronously
  41.     StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallback callback) throws EngineExecutionException;
  42.     //get state machine configurations
  43.     StateMachineConfig getStateMachineConfig();
  44.     //Reload StateMachine Instance
  45.     StateMachineInstance reloadStateMachineInstance(String instId);
  46. }
复制代码
 
12.状态机引擎创建状态机实例的源码
基于流程控制的状态机引擎实例ProcessCtrlStateMachineEngine的start()方法调用其startInternal()方法启动一个状态机实例时,会先创建一个状态机实例StateMachineInstance。
 
在调用ProcessCtrlStateMachineEngine的createMachineInstance()方法创建一个状态机实例StateMachineInstance的过程中,会先通过状态机定义的仓储组件StateMachineRepository来获取一个状态机定义StateMachine,然后将状态机定义StateMachine注入到状态机实例对象中,以此来完成状态机实例对象的实例化。
10.png
  1. //ProcessCtrl-based state machine engine
  2. //基于流程控制的状态机引擎
  3. public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
  4.     //需要在seata-saga.xml配置文件中注入状态机配置实例
  5.     private StateMachineConfig stateMachineConfig;
  6.     ...
  7.    
  8.     @Override
  9.     public StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException {
  10.         return startInternal(stateMachineName, tenantId, null, startParams, false, null);
  11.     }
  12.    
  13.     @Override
  14.     public StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException {
  15.         return startInternal(stateMachineName, tenantId, null, startParams, true, callback);
  16.     }
  17.    
  18.     @Override
  19.     public StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException {
  20.         return startInternal(stateMachineName, tenantId, businessKey, startParams, false, null);
  21.     }
  22.    
  23.     @Override
  24.     public StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException {
  25.         return startInternal(stateMachineName, tenantId, businessKey, startParams, true, callback);
  26.     }
  27.     ...
  28.    
  29.     //启动状态机实例StateMachineInstance
  30.     //@param stateMachineName 状态机名称
  31.     //@param tenantId 租户ID
  32.     //@param businessKey 业务key
  33.     //@param startParams 状态机实例的启动参数
  34.     //@param async 是否异步化运行
  35.     //@param callback 异步化运行时的回调接口
  36.     private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
  37.         try {
  38.             //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
  39.             if (async && !stateMachineConfig.isEnableAsync()) {
  40.                 throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
  41.             }
  42.             if (StringUtils.isEmpty(tenantId)) {
  43.                 tenantId = stateMachineConfig.getDefaultTenantId();
  44.             }
  45.             //创建状态机实例StateMachineInstance
  46.             StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
  47.             ...
  48.         }
  49.         ...
  50.     }
  51.    
  52.     private StateMachineInstance createMachineInstance(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) {
  53.         //通过状态机定义的仓储组件StateMachineRepository,来获取一个状态机定义StateMachine
  54.         StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName, tenantId);
  55.         if (stateMachine == null) {
  56.             throw new EngineExecutionException("StateMachine[" + stateMachineName + "] is not exists", FrameworkErrorCode.ObjectNotExists);
  57.         }
  58.         //根据状态机定义StateMachine,实例化一个状态机实例对象StateMachineInstanceImpl
  59.         StateMachineInstanceImpl inst = new StateMachineInstanceImpl();
  60.         inst.setStateMachine(stateMachine);
  61.         inst.setMachineId(stateMachine.getId());
  62.         inst.setTenantId(tenantId);
  63.         inst.setBusinessKey(businessKey);
  64.         //设置状态机实例的启动参数
  65.         inst.setStartParams(startParams);
  66.         if (startParams != null) {
  67.             if (StringUtils.hasText(businessKey)) {
  68.                 startParams.put(DomainConstants.VAR_NAME_BUSINESSKEY, businessKey);
  69.             }
  70.             String parentId = (String)startParams.get(DomainConstants.VAR_NAME_PARENT_ID);
  71.             if (StringUtils.hasText(parentId)) {
  72.                 inst.setParentId(parentId);
  73.                 startParams.remove(DomainConstants.VAR_NAME_PARENT_ID);
  74.             }
  75.         }
  76.         inst.setStatus(ExecutionStatus.RU);
  77.         inst.setRunning(true);
  78.         inst.setGmtStarted(new Date());
  79.         inst.setGmtUpdated(inst.getGmtStarted());
  80.         return inst;
  81.     }
  82.     ...
  83. }
复制代码
 
13.ProcessContextBuilder构建流程上下文
ProcessCtrlStateMachineEngine.startInternal()方法在执行过程中会创建一个流程上下文构造器ProcessContextBuilder实例,然后根据这个流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext。
11.png
  1. public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
  2.     ...
  3.     //启动状态机实例StateMachineInstance
  4.     //@param stateMachineName 状态机名称
  5.     //@param tenantId 租户ID
  6.     //@param businessKey 业务key
  7.     //@param startParams 状态机实例的启动参数
  8.     //@param async 是否异步化运行
  9.     //@param callback 异步化运行时的回调接口
  10.     private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
  11.         try {
  12.             //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
  13.             if (async && !stateMachineConfig.isEnableAsync()) {
  14.                 throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
  15.             }
  16.             if (StringUtils.isEmpty(tenantId)) {
  17.                 tenantId = stateMachineConfig.getDefaultTenantId();
  18.             }
  19.             //创建状态机实例StateMachineInstance
  20.             StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
  21.             //创建一个流程上下文构造器ProcessContextBuilder实例,用来构造流程运行时的上下文
  22.             ProcessContextBuilder contextBuilder = ProcessContextBuilder.create()
  23.                 .withProcessType(ProcessType.STATE_LANG)//设置流程类型
  24.                 .withOperationName(DomainConstants.OPERATION_NAME_START)//设置操作名称
  25.                 .withAsyncCallback(callback)//设置异步化时的回调接口
  26.                 .withInstruction(new StateInstruction(stateMachineName, tenantId))//设置状态获取组件
  27.                 .withStateMachineInstance(instance)//设置状态机实例
  28.                 .withStateMachineConfig(getStateMachineConfig())//设置状态机配置
  29.                 .withStateMachineEngine(this);//设置状态机引擎
  30.             //上下文变量Map
  31.             Map<String, Object> contextVariables;
  32.             if (startParams != null) {
  33.                 contextVariables = new ConcurrentHashMap<>(startParams.size());
  34.                 nullSafeCopy(startParams, contextVariables);
  35.             } else {
  36.                 contextVariables = new ConcurrentHashMap<>();
  37.             }
  38.             instance.setContext(contextVariables);
  39.             //设置流程上下文构造器ProcessContextBuilder实例
  40.             contextBuilder.withStateMachineContextVariables(contextVariables);
  41.             contextBuilder.withIsAsyncExecution(async);
  42.             //通过流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext
  43.             ProcessContext processContext = contextBuilder.build();
  44.             //如果状态机定义StateMachine是支持持久化的 且 状态日志的存储组件不为null
  45.             if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
  46.                 //通过状态机实例日志和状态实例日志的存储组件StateLogStore,记录状态机实例StateMachineInstance的启动事件日志 + 开启全局事务
  47.                 //比如在DB中更新状态机实例StateMachineInstance的启动状态
  48.                 stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
  49.             }
  50.             if (StringUtils.isEmpty(instance.getId())) {
  51.                 //生成状态机实例StateMachineInstance的序号
  52.                 instance.setId(stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
  53.             }
  54.             StateInstruction stateInstruction = processContext.getInstruction(StateInstruction.class);
  55.             Loop loop = LoopTaskUtils.getLoopConfig(processContext, stateInstruction.getState(processContext));
  56.             if (null != loop) {
  57.                 stateInstruction.setTemporaryState(new LoopStartStateImpl());
  58.             }
  59.             if (async) {
  60.                 stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
  61.             } else {
  62.                 stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
  63.             }
  64.             return instance;
  65.         } finally {
  66.             if (stateMachineConfig.getStateLogStore() != null) {
  67.                 stateMachineConfig.getStateLogStore().clearUp();
  68.             }
  69.         }
  70.     }
  71.     ...
  72. }
复制代码
 
14.StateLogStore记录日志和开启Saga全局事务
在状态机引擎ProcessCtrlStateMachineEngine的startInternal()方法中,会通过调用StateLogStore的recordStateMachineStarted()方法,记录状态机实例StateMachineInstance的启动事件到DB,以及开启Saga全局事务。
12.png
  1. public class DbAndReportTcStateLogStore extends AbstractStore implements StateLogStore {
  2.     private static final Logger LOGGER = LoggerFactory.getLogger(DbAndReportTcStateLogStore.class);
  3.    
  4.                 //插入状态机实例到数据库的SQL语句
  5.     private static final StateMachineInstanceToStatementForInsert STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT = new StateMachineInstanceToStatementForInsert();
  6.     //更新数据库状态机实例的SQL语句
  7.     private static final StateMachineInstanceToStatementForUpdate STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_UPDATE = new StateMachineInstanceToStatementForUpdate();
  8.     //查询数据库中的状态机实例的SQL语句
  9.     private static final ResultSetToStateMachineInstance RESULT_SET_TO_STATE_MACHINE_INSTANCE = new ResultSetToStateMachineInstance();
  10.     //插入状态实例到数据库的SQL语句
  11.     private static final StateInstanceToStatementForInsert STATE_INSTANCE_TO_STATEMENT_FOR_INSERT = new StateInstanceToStatementForInsert();
  12.     //更新数据库中的状态实例的SQL语句
  13.     private static final StateInstanceToStatementForUpdate STATE_INSTANCE_TO_STATEMENT_FOR_UPDATE = new StateInstanceToStatementForUpdate();
  14.     //查询数据库中的状态实例的SQL语句
  15.     private static final ResultSetToStateInstance RESULT_SET_TO_STATE_INSTANCE = new ResultSetToStateInstance();
  16.     //Saga全局事务模版
  17.     private SagaTransactionalTemplate sagaTransactionalTemplate;
  18.     //参数序列化组件
  19.     private Serializer<Object, String> paramsSerializer = new ParamsSerializer();
  20.     //异常序列化组件
  21.     private Serializer<Exception, byte[]> exceptionSerializer = new ExceptionSerializer();
  22.     //状态日志的存储SQL语句
  23.     private StateLogStoreSqls stateLogStoreSqls;
  24.     //默认的租户ID
  25.     private String defaultTenantId;
  26.     //序号生成器
  27.     private SeqGenerator seqGenerator;
  28.     //记录状态机实例的启动事件日志到DB + 开启Saga全局事务
  29.     @Override
  30.     public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {
  31.         if (machineInstance != null) {
  32.             //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction, use parent transaction instead.
  33.             //如果parentId是空的,那么当前事务就是Saga分布式事务的启动入口;
  34.             //如果parentId不是空的,则已经有服务使用状态机启动了Saga分布式事务;
  35.             String parentId = machineInstance.getParentId();
  36.             if (StringUtils.isEmpty(parentId)) {
  37.                 //1.开启Saga全局事务
  38.                 beginTransaction(machineInstance, context);
  39.             }
  40.             try {
  41.                 if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
  42.                     machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
  43.                 }
  44.                 //bind SAGA branch type
  45.                 RootContext.bindBranchType(BranchType.SAGA);
  46.                 //save to db
  47.                 machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));
  48.                 //2.记录日志到DB
  49.                 int effect = executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType), STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);
  50.                 if (effect < 1) {
  51.                     throw new StoreException("StateMachineInstance record start error, Xid: " + machineInstance.getId(), FrameworkErrorCode.OperationDenied);
  52.                 }
  53.             } catch (StoreException e) {
  54.                 LOGGER.error("Record statemachine start error: {}, StateMachine: {}, XID: {}, Reason: {}", e.getErrcode(), machineInstance.getStateMachine().getName(), machineInstance.getId(), e.getMessage(), e);
  55.                 this.clearUp();
  56.                 throw e;
  57.             }
  58.         }
  59.     }
  60.     protected void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {
  61.         if (sagaTransactionalTemplate != null) {
  62.             //获取状态机配置
  63.             StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
  64.             //构建事务信息
  65.             TransactionInfo transactionInfo = new TransactionInfo();
  66.             transactionInfo.setTimeOut(stateMachineConfig.getTransOperationTimeout());
  67.             transactionInfo.setName(Constants.SAGA_TRANS_NAME_PREFIX + machineInstance.getStateMachine().getName());
  68.             try {
  69.                 //通过Saga事务模版开启全局事务
  70.                 GlobalTransaction globalTransaction = sagaTransactionalTemplate.beginTransaction(transactionInfo);
  71.                 machineInstance.setId(globalTransaction.getXid());
  72.                 context.setVariable(DomainConstants.VAR_NAME_GLOBAL_TX, globalTransaction);
  73.                 Map<String, Object> machineContext = machineInstance.getContext();
  74.                 if (machineContext != null) {
  75.                     machineContext.put(DomainConstants.VAR_NAME_GLOBAL_TX, globalTransaction);
  76.                 }
  77.             } catch (ExecutionException e) {
  78.                 String xid = null;
  79.                 if (e.getTransaction() != null) {
  80.                     xid = e.getTransaction().getXid();
  81.                 }
  82.                 throw new EngineExecutionException(e, e.getCode() + ", TransName:" + transactionInfo.getName() + ", XID: " + xid + ", Reason: " + e.getMessage(), FrameworkErrorCode.TransactionManagerError);
  83.             } finally {
  84.                 if (Boolean.TRUE.equals(context.getVariable(DomainConstants.VAR_NAME_IS_ASYNC_EXECUTION))) {
  85.                     RootContext.unbind();
  86.                     RootContext.unbindBranchType();
  87.                 }
  88.             }
  89.         }
  90.     }
  91.     ...
  92. }
  93. public class StateLogStoreSqls {
  94.     private String recordStateMachineStartedSql;
  95.     private static final String RECORD_STATE_MACHINE_STARTED_SQL = "INSERT INTO ${TABLE_PREFIX}state_machine_inst\n"
  96.         + "(id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, "
  97.         + "gmt_updated)\n"
  98.         + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
  99.     ...
  100.    
  101.     public StateLogStoreSqls(String tablePrefix) {
  102.         this.tablePrefix = tablePrefix;
  103.         init();
  104.     }
  105.    
  106.     private void init() {
  107.         recordStateMachineStartedSql = RECORD_STATE_MACHINE_STARTED_SQL.replaceAll(TABLE_PREFIX_REGEX, tablePrefix);
  108.         ...
  109.     }
  110.    
  111.     public String getRecordStateMachineStartedSql(String dbType) {
  112.         return recordStateMachineStartedSql;
  113.     }
  114.     ...
  115. }
  116. public class AbstractStore {
  117.     protected DataSource dataSource;
  118.     ...
  119.    
  120.     protected <T> int executeUpdate(String sql, ObjectToStatement<T> objectToStatement, T o) {
  121.         Connection connection = null;
  122.         PreparedStatement stmt = null;
  123.         try {
  124.             connection = dataSource.getConnection();
  125.             if (LOGGER.isDebugEnabled()) {
  126.                 LOGGER.debug("Preparing SQL: {}", sql);
  127.             }
  128.             stmt = connection.prepareStatement(sql);
  129.             if (LOGGER.isDebugEnabled()) {
  130.                 LOGGER.debug("setting params to PreparedStatement: {}", BeanUtils.beanToString(o));
  131.             }
  132.             objectToStatement.toStatement(o, stmt);
  133.             int count = stmt.executeUpdate();
  134.             if (!connection.getAutoCommit()) {
  135.                 connection.commit();
  136.             }
  137.             return count;
  138.         } catch (SQLException e) {
  139.             throw new StoreException(e);
  140.         } finally {
  141.             closeSilent(stmt);
  142.             closeSilent(connection);
  143.         }
  144.     }
  145.     ...
  146. }
  147. public class DefaultSagaTransactionalTemplate implements SagaTransactionalTemplate, ApplicationContextAware, DisposableBean, InitializingBean {
  148.     ...
  149.     @Override
  150.     public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws TransactionalExecutor.ExecutionException {
  151.         GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
  152.         try {
  153.             triggerBeforeBegin();
  154.             //开启一个全局会话
  155.             tx.begin(txInfo.getTimeOut(), txInfo.getName());
  156.             triggerAfterBegin();
  157.         } catch (TransactionException txe) {
  158.             throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
  159.         }
  160.         return tx;
  161.     }
  162.     ...
  163. }
复制代码
执行GlobalTransaction的begin()方法开启一个全局会话:
  1. //The type Default global transaction.
  2. //默认的全局事务
  3. public class DefaultGlobalTransaction implements GlobalTransaction {
  4.     private TransactionManager transactionManager;
  5.     private String xid;
  6.     private GlobalStatus status;
  7.     private GlobalTransactionRole role;
  8.     ...
  9.    
  10.     DefaultGlobalTransaction() {
  11.         //全局事务角色是全局事务发起者
  12.         this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
  13.     }
  14.    
  15.     //Instantiates a new Default global transaction.
  16.     DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
  17.         this.transactionManager = TransactionManagerHolder.get();//全局事务管理者
  18.         this.xid = xid;
  19.         this.status = status;
  20.         this.role = role;
  21.     }
  22.     ...
  23.    
  24.     @Override
  25.     public void begin(int timeout, String name) throws TransactionException {
  26.         if (role != GlobalTransactionRole.Launcher) {
  27.             assertXIDNotNull();
  28.             if (LOGGER.isDebugEnabled()) {
  29.                 LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
  30.             }
  31.             return;
  32.         }
  33.         assertXIDNull();
  34.         String currentXid = RootContext.getXID();
  35.         if (currentXid != null) {
  36.             throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid);
  37.         }
  38.         //通过全局事务管理器去真正开启全局事务,一旦开启成功,就可以获取到一个xid
  39.         xid = transactionManager.begin(null, null, name, timeout);
  40.         status = GlobalStatus.Begin;
  41.         //把xid绑定到RootContext的线程本地变量副本里去
  42.         RootContext.bind(xid);
  43.         if (LOGGER.isInfoEnabled()) {
  44.             LOGGER.info("Begin new global transaction [{}]", xid);
  45.         }
  46.     }
  47.     ...
  48. }
  49. public class TransactionManagerHolder {
  50.     private static final Logger LOGGER = LoggerFactory.getLogger(TransactionManagerHolder.class);
  51.    
  52.     private static class SingletonHolder {
  53.         private static TransactionManager INSTANCE = null;
  54.         static {
  55.             try {
  56.                 INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
  57.                 LOGGER.info("TransactionManager Singleton {}", INSTANCE);
  58.             } catch (Throwable anyEx) {
  59.                 LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
  60.             }
  61.         }
  62.     }
  63.    
  64.     //Get transaction manager.
  65.     public static TransactionManager get() {
  66.         if (SingletonHolder.INSTANCE == null) {
  67.             throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
  68.         }
  69.         return SingletonHolder.INSTANCE;
  70.     }
  71.    
  72.     private TransactionManagerHolder() {
  73.     }
  74. }
  75. //The type Default transaction manager.
  76. //默认的全局事务管理器
  77. public class DefaultTransactionManager implements TransactionManager {
  78.     @Override
  79.     public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
  80.         //构建一个全局事务开启请求GlobalBeginRequest
  81.         GlobalBeginRequest request = new GlobalBeginRequest();
  82.         request.setTransactionName(name);
  83.         request.setTimeout(timeout);
  84.         //发起一个同步调用
  85.         GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
  86.         if (response.getResultCode() == ResultCode.Failed) {
  87.             throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
  88.         }
  89.         return response.getXid();
  90.     }
  91.     ...
  92.    
  93.     private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
  94.         try {
  95.             //TMNettyRemotingClient会和Seata Server基于Netty建立长连接
  96.             return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
  97.         } catch (TimeoutException toe) {
  98.             throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
  99.         }
  100.     }
  101. }
  102. public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
  103.     ...
  104.     @Override
  105.     public Object sendSyncRequest(Object msg) throws TimeoutException {
  106.         //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡
  107.         String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
  108.         //获取RPC调用的超时时间
  109.         long timeoutMillis = this.getRpcRequestTimeout();
  110.         //构建一个RPC消息
  111.         RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
  112.         //send batch message
  113.         //put message into basketMap, @see MergedSendRunnable
  114.         //默认是不开启批量消息发送
  115.         if (this.isEnableClientBatchSendRequest()) {
  116.             ...
  117.         } else {
  118.             //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel
  119.             //然后通过网络连接Channel把RpcMessage发送出去
  120.             Channel channel = clientChannelManager.acquireChannel(serverAddress);
  121.             return super.sendSync(channel, rpcMessage, timeoutMillis);
  122.         }
  123.     }
  124.     ...
  125. }
  126. public abstract class AbstractNettyRemoting implements Disposable {
  127.     ...
  128.     protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
  129.         if (timeoutMillis <= 0) {
  130.             throw new FrameworkException("timeout should more than 0ms");
  131.         }
  132.         if (channel == null) {
  133.             LOGGER.warn("sendSync nothing, caused by null channel.");
  134.             return null;
  135.         }
  136.         //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里
  137.         MessageFuture messageFuture = new MessageFuture();
  138.         messageFuture.setRequestMessage(rpcMessage);
  139.         messageFuture.setTimeout(timeoutMillis);
  140.         futures.put(rpcMessage.getId(), messageFuture);
  141.         channelWritableCheck(channel, rpcMessage.getBody());
  142.         //获取远程地址
  143.         String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
  144.         doBeforeRpcHooks(remoteAddr, rpcMessage);
  145.         //异步化发送数据,同时对发送结果添加监听器
  146.         //如果发送失败,则会对网络连接Channel进行销毁处理
  147.         channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
  148.             if (!future.isSuccess()) {
  149.                 MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
  150.                 if (messageFuture1 != null) {
  151.                     messageFuture1.setResultMessage(future.cause());
  152.                 }
  153.                 destroyChannel(future.channel());
  154.             }
  155.         });
  156.         try {
  157.             //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应
  158.             Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
  159.             doAfterRpcHooks(remoteAddr, rpcMessage, result);
  160.             return result;
  161.         } catch (Exception exx) {
  162.             LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
  163.             if (exx instanceof TimeoutException) {
  164.                 throw (TimeoutException) exx;
  165.             } else {
  166.                 throw new RuntimeException(exx);
  167.             }
  168.         }
  169.     }
  170.     ...
  171. }
复制代码
 
15.状态操作组件获取状态和State状态类继承体系
(1)状态操作组件StateInstruction获取状态
(2)State状态类的继承体系
 
(1)状态操作组件StateInstruction获取状态
在状态机引擎ProcessCtrlStateMachineEngine的startInternal()方法中,会通过流程上下文ProcessContext来获取状态操作组件StateInstruction。
 
根据状态操作组件StateInstruction,可以获取状态机的开始State(状态)、也可以获取指定的某个State(状态)。
  1. public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
  2.     ...
  3.     //启动状态机实例StateMachineInstance
  4.     //@param stateMachineName 状态机名称
  5.     //@param tenantId 租户ID
  6.     //@param businessKey 业务key
  7.     //@param startParams 状态机实例的启动参数
  8.     //@param async 是否异步化运行
  9.     //@param callback 异步化运行时的回调接口
  10.     private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
  11.         try {
  12.             //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
  13.             if (async && !stateMachineConfig.isEnableAsync()) {
  14.                 throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
  15.             }
  16.             if (StringUtils.isEmpty(tenantId)) {
  17.                 tenantId = stateMachineConfig.getDefaultTenantId();
  18.             }
  19.             //创建状态机实例StateMachineInstance
  20.             StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
  21.             //创建一个流程上下文构造器ProcessContextBuilder实例,用来构造流程运行时的上下文
  22.             ProcessContextBuilder contextBuilder = ProcessContextBuilder.create()
  23.                 .withProcessType(ProcessType.STATE_LANG)//设置流程类型
  24.                 .withOperationName(DomainConstants.OPERATION_NAME_START)//设置操作名称
  25.                 .withAsyncCallback(callback)//设置异步化时的回调接口
  26.                 .withInstruction(new StateInstruction(stateMachineName, tenantId))//设置状态获取组件
  27.                 .withStateMachineInstance(instance)//设置状态机实例
  28.                 .withStateMachineConfig(getStateMachineConfig())//设置状态机配置
  29.                 .withStateMachineEngine(this);//设置状态机引擎
  30.             //上下文变量Map
  31.             Map<String, Object> contextVariables;
  32.             if (startParams != null) {
  33.                 contextVariables = new ConcurrentHashMap<>(startParams.size());
  34.                 nullSafeCopy(startParams, contextVariables);
  35.             } else {
  36.                 contextVariables = new ConcurrentHashMap<>();
  37.             }
  38.             instance.setContext(contextVariables);
  39.             //设置流程上下文构造器ProcessContextBuilder实例
  40.             contextBuilder.withStateMachineContextVariables(contextVariables);
  41.             contextBuilder.withIsAsyncExecution(async);
  42.             //通过流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext
  43.             ProcessContext processContext = contextBuilder.build();
  44.             //如果状态机定义StateMachine是支持持久化的 且 状态日志的存储组件不为null
  45.             if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
  46.                 //通过状态机实例日志和状态实例日志的存储组件StateLogStore,记录状态机实例StateMachineInstance的启动事件日志 + 开启全局事务
  47.                 //比如在DB中更新状态机实例StateMachineInstance的启动状态
  48.                 stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
  49.             }
  50.             if (StringUtils.isEmpty(instance.getId())) {
  51.                 //生成状态机实例StateMachineInstance的序号
  52.                 instance.setId(stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
  53.             }
  54.          
  55.             //从流程上下文中获取状态操作组件StateInstruction
  56.             StateInstruction stateInstruction = processContext.getInstruction(StateInstruction.class);
  57.             //获取循环策略
  58.             Loop loop = LoopTaskUtils.getLoopConfig(processContext, stateInstruction.getState(processContext));
  59.             if (null != loop) {
  60.                 stateInstruction.setTemporaryState(new LoopStartStateImpl());
  61.             }
  62.             if (async) {
  63.                 stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
  64.             } else {
  65.                 stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
  66.             }
  67.             return instance;
  68.         } finally {
  69.             if (stateMachineConfig.getStateLogStore() != null) {
  70.                 stateMachineConfig.getStateLogStore().clearUp();
  71.             }
  72.         }
  73.     }
  74.     ...
  75. }
  76. //State Instruction,状态操作组件StateInstruction
  77. public class StateInstruction implements Instruction {
  78.     private String stateName;
  79.     private String stateMachineName;
  80.     private String tenantId;
  81.     private boolean end;
  82.     //Temporary state node for running temporary nodes in the state machine
  83.     private State temporaryState;
  84.    
  85.     public StateInstruction() {
  86.     }
  87.     public StateInstruction(String stateMachineName, String tenantId) {
  88.         this.stateMachineName = stateMachineName;
  89.         this.tenantId = tenantId;
  90.     }
  91.     //根据流程上下文获取状态,可以是状态机的开始State(状态)、也可以是指定的某个State(状态)
  92.     public State getState(ProcessContext context) {
  93.         if (getTemporaryState() != null) {
  94.             return temporaryState;
  95.         }
  96.         String stateName = getStateName();
  97.         String stateMachineName = getStateMachineName();
  98.         String tenantId = getTenantId();
  99.         if (StringUtils.isEmpty(stateMachineName)) {
  100.             throw new EngineExecutionException("StateMachineName is required", FrameworkErrorCode.ParameterRequired);
  101.         }
  102.         StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
  103.         StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName, tenantId);
  104.         if (stateMachine == null) {
  105.             throw new EngineExecutionException("StateMachine[" + stateMachineName + "] is not exist", FrameworkErrorCode.ObjectNotExists);
  106.         }
  107.         //默认就是获取状态机定义StateMachine中的第一个状态State
  108.         if (StringUtils.isEmpty(stateName)) {
  109.             stateName = stateMachine.getStartState();
  110.             setStateName(stateName);
  111.         }
  112.         //根据状态名字stateName,从状态机定义中获取对应的状态State
  113.         State state = stateMachine.getStates().get(stateName);
  114.         if (state == null) {
  115.             throw new EngineExecutionException("State[" + stateName + "] is not exist", FrameworkErrorCode.ObjectNotExists);
  116.         }
  117.         return state;
  118.     }
  119.     ...
  120. }
复制代码
(2)State状态类的继承体系
13.png
[code]//A State in StateMachinepublic interface State {    String getName();    String getComment();    String getType();    String getNext();    Map getExtensions();    StateMachine getStateMachine();}public abstract class BaseState implements State {    private transient String name;//状态名称    private String type;//状态类型    private String comment;//状态备注    private String next;//下一个状态名称    private Map extensions;//状态扩展属性    private transient StateMachine stateMachine;//状态所属的状态机    ...}//The state of the execution task (abstract class), the specific task to be executed is determined by the subclasspublic abstract class AbstractTaskState extends BaseState implements TaskState {    private String compensateState;//补偿状态    private boolean isForCompensation;//是否用于补偿    private boolean isForUpdate;//是否用于更新    private List retry;//重试列表    private List catches;//异常匹配列表    private List input;//状态输入列表    private Map output;//状态输出数据    private Map status;//Map    private List inputExpressions;//输入表达式列表    private Map outputExpressions;//输出表达式数据    private boolean isPersist = true;//是否持久化    private Boolean retryPersistModeUpdate;//是否更新Saga重试持久化的模式    private Boolean compensatePersistModeUpdate;//是否更新Saga补偿持久化的模式    private Loop loop;//循环对象    ...        public static class RetryImpl implements Retry {        private List exceptions;//重试的过程中遇到了哪些异常        private List
您需要登录后才可以回帖 登录 | 立即注册