大纲
1.Seata Saga案例简介
2.Seata Saga案例的状态机定义分析
3.Seata Saga分布式事务与状态机关系
4.Seata Saga案例的Dubbo服务调用配置分析
5.Seata Saga案例的状态机数据库和工程启动
6.基于数据库的状态机配置实例的初始化
7.状态机配置实例中包含的一些关键组件
8.默认的状态机配置类的初始化
9.状态机定义的仓储组件解析状态机定义文件
10.状态机定义的仓储组件注册StateMachine
11.状态机引擎接口的源码
12.状态机引擎创建状态机实例的源码
13.ProcessContextBuilder构建流程上下文
14.StateLogStore记录日志和开启Saga全局事务
15.状态操作组件获取状态和State状态类继承体系
16.启动状态机实例时发布流程上下文到事件总线
17.通过业务处理器处理状态机当前需要执行的状态
18.通过服务调用组件来执行State指定的服务方法
19.将业务处理器路由到状态机下一个要执行的状态
1.Seata Saga案例简介
seata-samples项目下的saga模块,会基于Seata Saga模式,实现了分布式事务的提交和回滚。
在dubbo-saga-sample模块中,一个分布式事务内会有两个Saga事务参与者,这两个Saga事务参与者分别是InventoryAction和BalanceAction。当分布式事务提交时,两者均提交。当分布式事务回滚时,两者均回滚。
这两个Saga事务参与者均是Dubbo服务。这两个Saga事务参与者都有一个reduce方法表示库存扣减或余额扣减,还有一个compensateReduce方法表示补偿扣减操作。
InventoryAction接口定义如下:- public interface InventoryAction {
- boolean reduce(String businessKey, int count);
- boolean compensateReduce(String businessKey);
- }
复制代码 BalanceAction接口定义如下:- public interface BalanceAction {
- boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params);
- boolean compensateReduce(String businessKey, Map<String, Object> params);
- }
复制代码 运行dubbo-saga-sample模块的Demo工程:
步骤一:启动Seata Server
步骤二:运行DubboSagaProviderStarter
步骤三:运行DubboSagaTransactionStarter
2.Seata Saga案例的状态机定义分析
(1)Saga状态机定义文件详情
(2)状态机定义的全流程输入输出和流转分析
(1)Saga状态机定义文件详情
Saga的本质就是一个状态机。
reduce_inventory_and_balance.json- {
- "Name": "reduceInventoryAndBalance",
- "Comment": "reduce inventory then reduce balance in a transaction",
- "StartState": "ReduceInventory",
- "Version": "0.0.1",
- "States": {
- "ReduceInventory": {
- "Type": "ServiceTask",
- "ServiceName": "inventoryAction",
- "ServiceMethod": "reduce",
- "CompensateState": "CompensateReduceInventory",
- "Next": "ChoiceState",
- "Input": [
- "$.[businessKey]",
- "$.[count]"
- ],
- "Output": {
- "reduceInventoryResult": "$.#root"
- },
- "Status": {
- "#root == true": "SU",
- "#root == false": "FA",
- "$Exception{java.lang.Throwable}": "UN"
- }
- },
- "ChoiceState": {
- "Type": "Choice",
- "Choices": [{
- "Expression": "[reduceInventoryResult] == true",
- "Next": "ReduceBalance"
- }],
- "Default": "Fail"
- },
- "ReduceBalance": {
- "Type": "ServiceTask",
- "ServiceName": "balanceAction",
- "ServiceMethod": "reduce",
- "CompensateState": "CompensateReduceBalance",
- "Input": [
- "$.[businessKey]",
- "$.[amount]",
- {
- "throwException": "$.[mockReduceBalanceFail]"
- }
- ],
- "Output": {
- "compensateReduceBalanceResult": "$.#root"
- },
- "Status": {
- "#root == true": "SU",
- "#root == false": "FA",
- "$Exception{java.lang.Throwable}": "UN"
- },
- "Catch": [{
- "Exceptions": [
- "java.lang.Throwable"
- ],
- "Next": "CompensationTrigger"
- }],
- "Next": "Succeed"
- },
- "CompensateReduceInventory": {
- "Type": "ServiceTask",
- "ServiceName": "inventoryAction",
- "ServiceMethod": "compensateReduce",
- "Input": [
- "$.[businessKey]"
- ]
- },
- "CompensateReduceBalance": {
- "Type": "ServiceTask",
- "ServiceName": "balanceAction",
- "ServiceMethod": "compensateReduce",
- "Input": [
- "$.[businessKey]"
- ]
- },
- "CompensationTrigger": {
- "Type": "CompensationTrigger",
- "Next": "Fail"
- },
- "Succeed": {
- "Type": "Succeed"
- },
- "Fail": {
- "Type": "Fail",
- "ErrorCode": "PURCHASE_FAILED",
- "Message": "purchase failed"
- }
- }
- }
复制代码 (2)状态机定义的全流程输入输出和流转分析
3.Seata Saga分布式事务与状态机关系
Seata Saga分布式事务基本都是通过状态机一起配合起来使用的,通过状态机来调度编排每一个分布式事务的执行。编排的每一个服务执行时,可能会执行本地事务,也可能会调用远程服务。触发每一个服务的补偿时,可能会回滚本地事务,也可能会补偿远程服务。
4.Seata Saga案例的Dubbo服务调用配置分析
(1)Dubbo服务提供者的配置
(2)Dubbo服务调用者的配置
(3)Seata Saga状态机的Bean的配置
(1)Dubbo服务提供者的配置
seata-dubbo-provider.xml- <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="33333"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
-
- <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
- <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
- <bean id="inventoryActionImpl" />
- <bean id="balanceActionImpl" />
- </beans>
复制代码
(2)Dubbo服务调用者的配置
seata-dubbo-reference.xml- <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample-reference">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="22222"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:reference id="inventoryAction" interface="io.seata.samples.saga.action.InventoryAction" check="false" lazy="true"/>
- <dubbo:reference id="balanceAction" interface="io.seata.samples.saga.action.BalanceAction" check="false" lazy="true"/>
- </beans>
复制代码
(3)Seata Saga状态机的Bean的配置
seata-saga.xml- <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="33333"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
-
- <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
- <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
- <bean id="inventoryActionImpl" />
- <bean id="balanceActionImpl" />
- </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="33333"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
-
- <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
- <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
- <bean id="inventoryActionImpl" />
- <bean id="balanceActionImpl" />
- </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="33333"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:provider timeout="10000" threads="10" threadpool="fixed" loadbalance="roundrobin"/>
-
- <dubbo:service ref="inventoryActionImpl" interface="io.seata.samples.saga.action.InventoryAction"/>
- <dubbo:service ref="balanceActionImpl" interface="io.seata.samples.saga.action.BalanceAction"/>
- <bean id="inventoryActionImpl" />
- <bean id="balanceActionImpl" />
- </beans><beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://code.alibabatech.com/schema/dubbo
- http://code.alibabatech.com/schema/dubbo/dubbo.xsd" default-autowire="byName">
- <dubbo:application name="saga-sample-reference">
- <dubbo:parameter key="qos.enable" value="true"/>
- <dubbo:parameter key="qos.accept.foreign.ip" value="false"/>
- <dubbo:parameter key="qos.port" value="22222"/>
- </dubbo:application>
- <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
- <dubbo:protocol name="dubbo" port="-1"/>
- <dubbo:reference id="inventoryAction" interface="io.seata.samples.saga.action.InventoryAction" check="false" lazy="true"/>
- <dubbo:reference id="balanceAction" interface="io.seata.samples.saga.action.BalanceAction" check="false" lazy="true"/>
- </beans>
复制代码
5.Seata Saga案例的状态机数据库和工程启动
(1)状态机相关的数据库表
(2)Seata Saga案例的工程启动
(1)状态机相关的数据库表
状态机定义 -> 状态机实例 -> 状态实例- -- PUBLIC.SEATA_STATE_INST definition
- -- 状态实例,每个状态机实例里会有多个状态实例
- CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_INST"(
- "ID" VARCHAR NOT NULL COMMENT 'id',
- "MACHINE_INST_ID" VARCHAR NOT NULL COMMENT 'state machine instance id,机器实例ID',
- "NAME" VARCHAR NOT NULL COMMENT 'state name',
- "TYPE" VARCHAR COMMENT 'state type',
- "SERVICE_NAME" VARCHAR COMMENT 'service name',
- "SERVICE_METHOD" VARCHAR COMMENT 'method name',
- "SERVICE_TYPE" VARCHAR COMMENT 'service type',
- "BUSINESS_KEY" VARCHAR COMMENT 'business key',
- "STATE_ID_COMPENSATED_FOR" VARCHAR COMMENT 'state compensated for',
- "STATE_ID_RETRIED_FOR" VARCHAR COMMENT 'state retried for',
- "GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
- "IS_FOR_UPDATE" TINYINT COMMENT 'is service for update',
- "INPUT_PARAMS" CLOB COMMENT 'input parameters',
- "OUTPUT_PARAMS" CLOB COMMENT 'output parameters',
- "STATUS" VARCHAR NOT NULL COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
- "EXCEP" BLOB COMMENT 'exception',
- "GMT_UPDATED" TIMESTAMP COMMENT 'update time',
- "GMT_END" TIMESTAMP COMMENT 'end time'
- );
- -- PUBLIC.SEATA_STATE_MACHINE_DEF definition
- -- 状态机定义,在json文件中定义好的状态流程会存放到这里
- CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_DEF"(
- "ID" VARCHAR NOT NULL COMMENT 'id',
- "NAME" VARCHAR NOT NULL COMMENT 'name',
- "TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
- "APP_NAME" VARCHAR NOT NULL COMMENT 'application name',
- "TYPE" VARCHAR COMMENT 'state language type',
- "COMMENT_" VARCHAR COMMENT 'comment',
- "VER" VARCHAR NOT NULL COMMENT 'version',
- "GMT_CREATE" TIMESTAMP NOT NULL COMMENT 'create time',
- "STATUS" VARCHAR NOT NULL COMMENT 'status(AC:active|IN:inactive)',
- "CONTENT" CLOB COMMENT 'content',
- "RECOVER_STRATEGY" VARCHAR COMMENT 'transaction recover strategy(compensate|retry)'
- );
- -- PUBLIC.SEATA_STATE_MACHINE_INST definition
- -- 状态机实例,每执行一次完整的分布式事务就会对应一个状态机实例,每个状态机实例里会有多个状态实例
- CREATE CACHED TABLE "PUBLIC"."SEATA_STATE_MACHINE_INST"(
- "ID" VARCHAR NOT NULL COMMENT 'id',
- "MACHINE_ID" VARCHAR NOT NULL COMMENT 'state machine definition id',
- "TENANT_ID" VARCHAR NOT NULL COMMENT 'tenant id',
- "PARENT_ID" VARCHAR COMMENT 'parent id',
- "GMT_STARTED" TIMESTAMP NOT NULL COMMENT 'start time',
- "BUSINESS_KEY" VARCHAR COMMENT 'business key',
- "START_PARAMS" CLOB COMMENT 'start parameters',
- "GMT_END" TIMESTAMP COMMENT 'end time',
- "EXCEP" BLOB COMMENT 'exception',
- "END_PARAMS" CLOB COMMENT 'end parameters',
- "STATUS" VARCHAR COMMENT 'status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
- "COMPENSATION_STATUS" VARCHAR COMMENT 'compensation status(SU succeed|FA failed|UN unknown|SK skipped|RU running)',
- "IS_RUNNING" TINYINT COMMENT 'is running(0 no|1 yes)',
- "GMT_UPDATED" TIMESTAMP NOT NULL
- );
复制代码 (2)Seata Saga案例的工程启动
ApplicationKeeper启动完之后就会被阻塞住。- public class ApplicationKeeper {
- private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationKeeper.class);
- private final ReentrantLock LOCK = new ReentrantLock();
- private final Condition STOP = LOCK.newCondition();
- //Instantiates a new Application keeper.
- public ApplicationKeeper(AbstractApplicationContext applicationContext) {
- addShutdownHook(applicationContext);
- }
- private void addShutdownHook(final AbstractApplicationContext applicationContext) {
- Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- applicationContext.close();
- LOGGER.info("ApplicationContext " + applicationContext + " is closed.");
- } catch (Exception e) {
- LOGGER.error("Failed to close ApplicationContext", e);
- }
- try {
- LOCK.lock();
- STOP.signal();
- } finally {
- LOCK.unlock();
- }
- }
- }));
- }
- public void keep() {
- synchronized (LOCK) {
- try {
- LOGGER.info("Application is keep running ... ");
- LOCK.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
复制代码 步骤一:运行DubboSagaProviderStarter,启动Dubbo Provider。- public class DubboSagaProviderStarter {
- private static TestingServer server;
- //The entry point of application.
- public static void main(String[] args) throws Exception {
- //mock zk server
- mockZKServer();
- ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
- new String[]{"spring/seata-dubbo-provider.xml"}
- );
- new ApplicationKeeper(applicationContext).keep();
- }
- private static void mockZKServer() throws Exception {
- //Mock zk server,作为Dubbo配置中心
- server = new TestingServer(2181, true);
- server.start();
- }
- }
- public class InventoryActionImpl implements InventoryAction {
- private static final Logger LOGGER = LoggerFactory.getLogger(InventoryActionImpl.class);
-
- @Override
- public boolean reduce(String businessKey, int count) {
- LOGGER.info("reduce inventory succeed, count: " + count + ", businessKey:" + businessKey);
- return true;
- }
-
- @Override
- public boolean compensateReduce(String businessKey) {
- LOGGER.info("compensate reduce inventory succeed, businessKey:" + businessKey);
- return true;
- }
- }
- public class BalanceActionImpl implements BalanceAction {
- private static final Logger LOGGER = LoggerFactory.getLogger(BalanceActionImpl.class);
-
- @Override
- public boolean reduce(String businessKey, BigDecimal amount, Map<String, Object> params) {
- if (params != null) {
- Object throwException = params.get("throwException");
- if (throwException != null && "true".equals(throwException.toString())) {
- throw new RuntimeException("reduce balance failed");
- }
- }
- LOGGER.info("reduce balance succeed, amount: " + amount + ", businessKey:" + businessKey);
- return true;
- }
- @Override
- public boolean compensateReduce(String businessKey, Map<String, Object> params) {
- if (params != null) {
- Object throwException = params.get("throwException");
- if (throwException != null && "true".equals(throwException.toString())) {
- throw new RuntimeException("compensate reduce balance failed");
- }
- }
- LOGGER.info("compensate reduce balance succeed, businessKey:" + businessKey);
- return true;
- }
- }
复制代码 步骤二:运行DubboSagaTransactionStarter,启动Demo工程。- public class DubboSagaTransactionStarter {
- private static volatile Object lock = new Object();
-
- private static AsyncCallback CALL_BACK = new AsyncCallback() {
- @Override
- public void onFinished(ProcessContext context, StateMachineInstance stateMachineInstance) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
-
- @Override
- public void onError(ProcessContext context, StateMachineInstance stateMachineInstance, Exception exp) {
- synchronized (lock) {
- lock.notifyAll();
- }
- }
- };
- public static void main(String[] args) {
- AbstractApplicationContext applicationContext = new ClassPathXmlApplicationContext(
- new String[]{"spring/seata-saga.xml", "spring/seata-dubbo-reference.xml"}
- );
- StateMachineEngine stateMachineEngine = (StateMachineEngine) applicationContext.getBean("stateMachineEngine");
- transactionCommittedDemo(stateMachineEngine);
- transactionCompensatedDemo(stateMachineEngine);
- new ApplicationKeeper(applicationContext).keep();
- }
- private static void transactionCommittedDemo(StateMachineEngine stateMachineEngine) {
- //设置状态机上下文
- Map<String, Object> startParams = new HashMap<>(3);
- String businessKey = String.valueOf(System.currentTimeMillis());
- startParams.put("businessKey", businessKey);
- startParams.put("count", 10);
- startParams.put("amount", new BigDecimal("100"));
- //sync test,同步启动状态机
- StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
- Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
- System.out.println("saga transaction commit succeed. XID: " + inst.getId());
- //async test,异步启动状态机
- businessKey = String.valueOf(System.currentTimeMillis());
- inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
- waittingForFinish(inst);
- Assert.isTrue(ExecutionStatus.SU.equals(inst.getStatus()), "saga transaction execute failed. XID: " + inst.getId());
- System.out.println("saga transaction commit succeed. XID: " + inst.getId());
- }
- private static void transactionCompensatedDemo(StateMachineEngine stateMachineEngine) {
- //设置状态机上下文
- Map<String, Object> startParams = new HashMap<>(4);
- String businessKey = String.valueOf(System.currentTimeMillis());
- startParams.put("businessKey", businessKey);
- startParams.put("count", 10);
- startParams.put("amount", new BigDecimal("100"));
- startParams.put("mockReduceBalanceFail", "true");
- //sync test,同步启动状态机
- StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("reduceInventoryAndBalance", null, businessKey, startParams);
- //async test,异步启动状态机
- businessKey = String.valueOf(System.currentTimeMillis());
- inst = stateMachineEngine.startWithBusinessKeyAsync("reduceInventoryAndBalance", null, businessKey, startParams, CALL_BACK);
- waittingForFinish(inst);
- Assert.isTrue(ExecutionStatus.SU.equals(inst.getCompensationStatus()), "saga transaction compensate failed. XID: " + inst.getId());
- System.out.println("saga transaction compensate succeed. XID: " + inst.getId());
- }
- private static void waittingForFinish(StateMachineInstance inst) {
- synchronized (lock) {
- if (ExecutionStatus.RU.equals(inst.getStatus())) {
- try {
- lock.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
复制代码
6.基于数据库的状态机配置实例的初始化
基于数据库的状态机配置实例DbStateMachineConfig会将状态机定义、状态机实例、状态实例放在数据库中。由seata-saga.xml配置文件可知,它会被注入到状态机引擎实例StateMachineEngine里。DbStateMachineConfig的初始化源码如下:
7.状态机配置实例中包含的一些关键组件
(1)状态机配置组件
(2)状态机实例和状态实例的仓储组件
(3)状态机实例日志和状态实例日志的存储组件
(4)状态机定义的存储组件
(5)表达式工厂管理器
(6)状态机定义的仓储组件
(1)状态机配置组件
StateMachineConfig表示的是状态机配置组件,可以从中获取状态机需要的各种组件。- //StateMachineConfig
- //状态机配置组件接口,可以获取到状态机需要的各种组件
- public interface StateMachineConfig {
- //Gets state log store.
- //获取状态机实例和状态实例的仓储组件
- //StateLogRepository可以根据各种条件查询状态机实例和状态实例
- StateLogRepository getStateLogRepository();
- //Gets get state log store.
- //获取状态机实例日志和状态实例日志的存储组件
- //StateLogStore可以对状态机日志和状态日志进行读写、也可以根据各种条件查询状态机实例和状态实例
- StateLogStore getStateLogStore();
- //Gets get state language definition store.
- //获取状态机定义的存储组件
- StateLangStore getStateLangStore();
- //Gets get expression factory manager.
- //获取表达式工厂管理器
- ExpressionFactoryManager getExpressionFactoryManager();
- //Gets get evaluator factory manager.
- //获取表达式计算工厂管理器
- EvaluatorFactoryManager getEvaluatorFactoryManager();
- //Gets get charset.
- //获取字符集编码
- String getCharset();
- //Gets get default tenant id.
- //获取默认的租户ID
- String getDefaultTenantId();
- //Gets get state machine repository.
- //获取状态机定义的仓储组件
- StateMachineRepository getStateMachineRepository();
- //Gets get status decision strategy.
- //获取State执行结果的判定策略组件
- StatusDecisionStrategy getStatusDecisionStrategy();
- //Gets get seq generator.
- //获取序号生成器
- SeqGenerator getSeqGenerator();
- //Gets get process ctrl event publisher.
- //获取流程控制事件的发布器(同步发布)
- ProcessCtrlEventPublisher getProcessCtrlEventPublisher();
- //Gets get async process ctrl event publisher.
- //获取流程控制事件的发布器(异步发布)
- ProcessCtrlEventPublisher getAsyncProcessCtrlEventPublisher();
- //Gets get application context.
- //获取Spring容器上下文
- ApplicationContext getApplicationContext();
- //Gets get thread pool executor.
- //获取异步执行的线程池
- ThreadPoolExecutor getThreadPoolExecutor();
- //Is enable async boolean.
- //是否启用异步
- boolean isEnableAsync();
- //get ServiceInvokerManager
- //获取服务调用的管理器
- ServiceInvokerManager getServiceInvokerManager();
- //get trans operation timeout
- //获取事务操作的超时时间
- int getTransOperationTimeout();
- //get service invoke timeout
- //获取服务调用的超时时间
- int getServiceInvokeTimeout();
- //get ScriptEngineManager
- //获取脚本引擎管理器
- ScriptEngineManager getScriptEngineManager();
- }
复制代码 (2)状态机实例和状态实例的仓储组件
StateLogRepository表示的是状态机实例和状态实例的仓储组件,可以根据不同的条件查询状态机实例和状态实例。- //State Log Repository
- //状态机实例和状态实例的仓储组件,可以根据各种条件查询状态机实例和状态实例
- public interface StateLogRepository {
- //Get state machine instance
- //根据状态机实例ID,获取到具体的状态机实例
- StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
- //Get state machine instance by businessKey
- //根据业务key获取状态机实例
- StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
- //Query the list of state machine instances by parent id
- //根据父ID查询状态机实例
- List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
- //Get state instance
- //根据状态实例ID查询状态实例
- StateInstance getStateInstance(String stateInstanceId, String machineInstId);
- //Get a list of state instances by state machine instance id
- //根据状态机实例ID查询所有状态实例
- List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
- }
复制代码 (3)状态机实例日志和状态实例日志的存储组件接口
StateLogStore表示的是状态机实例日志和状态实例日志的存储组件,可以记录各种状态机实例日志和状态实例日志。- //StateMachine engine log store
- //状态机实例日志和状态实例日志的存储组件
- public interface StateLogStore {
- //Record state machine startup events
- //记录状态机实例的启动事件日志
- void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context);
- //Record status end event
- //记录状态机实例的完成事件日志
- void recordStateMachineFinished(StateMachineInstance machineInstance, ProcessContext context);
- //Record state machine restarted
- //记录状态机实例的重启事件日志
- void recordStateMachineRestarted(StateMachineInstance machineInstance, ProcessContext context);
- //Record state start execution event
- //记录状态实例的启动事件日志
- void recordStateStarted(StateInstance stateInstance, ProcessContext context);
- //Record state execution end event
- //记录状态实例的完成事件日志
- void recordStateFinished(StateInstance stateInstance, ProcessContext context);
- //Get state machine instance
- //根据状态机实例ID获取状态机实例
- StateMachineInstance getStateMachineInstance(String stateMachineInstanceId);
- //Get state machine instance by businessKey
- //根据业务key获取状态机实例
- StateMachineInstance getStateMachineInstanceByBusinessKey(String businessKey, String tenantId);
- //Query the list of state machine instances by parent id
- //根据父ID查询状态机实例
- List<StateMachineInstance> queryStateMachineInstanceByParentId(String parentId);
- //Get state instance
- //根据状态实例ID和状态机ID获取状态实例
- StateInstance getStateInstance(String stateInstanceId, String machineInstId);
- //Get a list of state instances by state machine instance id
- //根据状态机实例ID查询状态实例
- List<StateInstance> queryStateInstanceListByMachineInstanceId(String stateMachineInstanceId);
- }
复制代码 (4)状态机定义的存储组件接口
StateLangStore表示的是状态机定义的存储组件,可以存储和获取不同的状态机定义。- //State language definition store
- //状态机定义的存储组件
- public interface StateLangStore {
- //Query the state machine definition by id
- //根据状态机ID获取状态机定义
- StateMachine getStateMachineById(String stateMachineId);
- //Get the latest version of the state machine by state machine name
- //根据状态机名称和租户ID,可以获取最新版本的状态机定义
- StateMachine getLastVersionStateMachine(String stateMachineName, String tenantId);
- //Storage state machine definition
- //存储状态机定义
- boolean storeStateMachine(StateMachine stateMachine);
- }
复制代码 (5)表达式工厂管理器
ExpressionFactoryManager表示的是表达式工厂管理器,可以根据不同的表达式类型获取不同的表达式工厂组件。- //Expression factory manager
- //表达式工厂管理器
- public class ExpressionFactoryManager {
- public static final String DEFAULT_EXPRESSION_TYPE = "Default";
- //表达式类型 -> 表达式工厂
- private Map<String, ExpressionFactory> expressionFactoryMap = new ConcurrentHashMap<>();
-
- //根据不同的表达式类型,去获取不同的表达式工厂组件
- public ExpressionFactory getExpressionFactory(String expressionType) {
- if (StringUtils.isBlank(expressionType)) {
- expressionType = DEFAULT_EXPRESSION_TYPE;
- }
- return expressionFactoryMap.get(expressionType);
- }
-
- public void setExpressionFactoryMap(Map<String, ExpressionFactory> expressionFactoryMap) {
- this.expressionFactoryMap.putAll(expressionFactoryMap);
- }
-
- public void putExpressionFactory(String type, ExpressionFactory factory) {
- this.expressionFactoryMap.put(type, factory);
- }
- }
复制代码 (6)状态机定义的仓储组件
StateMachineRepository表示的是状态机定义的仓储组件,可以注册和获取一个状态机定义。- //StateMachineRepository
- //状态机定义的仓储组件
- public interface StateMachineRepository {
- //Gets get state machine by id.
- //根据状态机ID获取一个状态机定义StateMachine
- StateMachine getStateMachineById(String stateMachineId);
- //Gets get state machine.
- //根据状态机名字、租户ID获取一个状态机定义StateMachine
- StateMachine getStateMachine(String stateMachineName, String tenantId);
- //Gets get state machine.
- //根据状态机名字、租户ID、版本号获取一个状态机定义StateMachine
- StateMachine getStateMachine(String stateMachineName, String tenantId, String version);
- //Register the state machine to the repository (if the same version already exists, return the existing version)
- //向状态机定义的仓储组件注册一个状态机定义
- StateMachine registryStateMachine(StateMachine stateMachine);
- //registry by resources
- //向状态机定义的仓储组件注册资源
- void registryByResources(Resource[] resources, String tenantId) throws IOException;
- }
复制代码
8.默认的状态机配置类的初始化
DefaultStateMachineConfig是默认的状态机配置类,基于数据库的状态机配置组件DbStateMachineConfig是它的子类。它可以支持不同种类的状态机配置,也就是状态机定义、状态机实例、状态实例,既可以放在数据库里,也可以放在其他存储里,默认是放在数据库里的。- //Default state machine configuration
- //DefaultStateMachineConfig会封装(注入)状态机运行时需要的所有组件
- //StateMachineConfig可以获取到状态机需要的各种组件
- //ApplicationContextAware可以感知Spring容器上下文
- //InitializingBean可以对Spring Bean进行初始化
- public class DefaultStateMachineConfig implements StateMachineConfig, ApplicationContextAware, InitializingBean {
- private static final int DEFAULT_TRANS_OPER_TIMEOUT = 60000 * 30;
- private static final int DEFAULT_SERVICE_INVOKE_TIMEOUT = 60000 * 5;
- //事务操作的超时时间,默认是30分钟
- private int transOperationTimeout = DEFAULT_TRANS_OPER_TIMEOUT;
- //服务调用的超时时间,默认是5分钟
- private int serviceInvokeTimeout = DEFAULT_SERVICE_INVOKE_TIMEOUT;
- //状态机实例和状态实例的仓储组件
- private StateLogRepository stateLogRepository;
- //状态机实例日志和状态实例日志的存储组件
- private StateLogStore stateLogStore;
- //状态机定义的存储组件
- private StateLangStore stateLangStore;
- //表达式工厂管理器
- private ExpressionFactoryManager expressionFactoryManager;
- //表达式计算的工厂管理器
- private EvaluatorFactoryManager evaluatorFactoryManager;
- //状态机定义的仓储组件
- private StateMachineRepository stateMachineRepository;
- //State执行结果的判定策略组件
- private StatusDecisionStrategy statusDecisionStrategy;
- //序号生成器
- private SeqGenerator seqGenerator;
- //流程控制的事件发布器(同步发布)
- private ProcessCtrlEventPublisher syncProcessCtrlEventPublisher;
- //流程控制的事件发布器(异步发布)
- private ProcessCtrlEventPublisher asyncProcessCtrlEventPublisher;
- //Spring容器上下文
- private ApplicationContext applicationContext;
- //异步执行的线程池
- private ThreadPoolExecutor threadPoolExecutor;
- //是否启用异步执行,默认是false
- private boolean enableAsync = false;
- //服务调用的管理器
- private ServiceInvokerManager serviceInvokerManager;
- //是否自动注册资源(自动注册流程定义),默认是true
- private boolean autoRegisterResources = true;
- //默认的状态机定义文件(json文件)
- private String[] resources = new String[]{"classpath*:seata/saga/statelang/**/*.json"};
- //字符集编码
- private String charset = "UTF-8";
- //默认租户ID
- private String defaultTenantId = "000001";
- //脚本引擎管理器
- private ScriptEngineManager scriptEngineManager;
- //状态机定义文件(json文件)解析器
- private String sagaJsonParser = DEFAULT_SAGA_JSON_PARSER;
- //是否更新Saga重试持久化的模式
- private boolean sagaRetryPersistModeUpdate = DEFAULT_CLIENT_SAGA_RETRY_PERSIST_MODE_UPDATE;
- //是否更新Saga补偿持久化的模式
- private boolean sagaCompensatePersistModeUpdate = DEFAULT_CLIENT_SAGA_COMPENSATE_PERSIST_MODE_UPDATE;
- //因为DefaultStateMachineConfig实现了InitializingBean接口
- //所以继承DefaultStateMachineConfig的Spring Bean初始化之后,就会回调afterPropertiesSet()方法
- @Override
- public void afterPropertiesSet() throws Exception {
- init();
- }
- //初始化方法
- protected void init() throws Exception {
- //创建表达式工厂管理器
- if (expressionFactoryManager == null) {
- //创建一个表达式工厂管理器
- expressionFactoryManager = new ExpressionFactoryManager();
- //创建一个Spring EL表达式工厂
- SpringELExpressionFactory springELExpressionFactory = new SpringELExpressionFactory();
- //注入(设置)Spring容器上下文到Spring EL表达式工厂中
- springELExpressionFactory.setApplicationContext(getApplicationContext());
- //将Spring EL表达式工厂放入表达式工厂管理器中
- expressionFactoryManager.putExpressionFactory(ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE, springELExpressionFactory);
- //创建一个序号表达式工厂
- SequenceExpressionFactory sequenceExpressionFactory = new SequenceExpressionFactory();
- //注入(设置)序号生成器到序号表达式工厂中
- sequenceExpressionFactory.setSeqGenerator(getSeqGenerator());
- //将序号表达式工厂放入表达式工厂管理器中
- expressionFactoryManager.putExpressionFactory(DomainConstants.EXPRESSION_TYPE_SEQUENCE, sequenceExpressionFactory);
- }
- //创建表达式计算工厂管理器
- if (evaluatorFactoryManager == null) {
- //创建一个表达式计算工厂管理器
- evaluatorFactoryManager = new EvaluatorFactoryManager();
- //创建一个表达式计算工厂,并将该表达式计算工厂放入表达式计算工厂管理器中
- ExpressionEvaluatorFactory expressionEvaluatorFactory = new ExpressionEvaluatorFactory();
- expressionEvaluatorFactory.setExpressionFactory(expressionFactoryManager.getExpressionFactory(ExpressionFactoryManager.DEFAULT_EXPRESSION_TYPE));
- evaluatorFactoryManager.putEvaluatorFactory(EvaluatorFactoryManager.EVALUATOR_TYPE_DEFAULT, expressionEvaluatorFactory);
- //创建一个异常匹配计算工厂,并将该异常匹配计算工厂放入表达式计算工厂管理器中
- evaluatorFactoryManager.putEvaluatorFactory(DomainConstants.EVALUATOR_TYPE_EXCEPTION, new ExceptionMatchEvaluatorFactory());
- }
- //创建状态机定义的仓储组件
- if (stateMachineRepository == null) {
- StateMachineRepositoryImpl stateMachineRepository = new StateMachineRepositoryImpl();
- stateMachineRepository.setCharset(charset);//设置字符集编码
- stateMachineRepository.setSeqGenerator(seqGenerator);//设置序号生成器
- stateMachineRepository.setStateLangStore(stateLangStore);//设置状态机定义的存储组件
- stateMachineRepository.setDefaultTenantId(defaultTenantId);//设置默认租户ID
- stateMachineRepository.setJsonParserName(sagaJsonParser);//设置状态机定义文件(json文件)解析器
- this.stateMachineRepository = stateMachineRepository;
- }
- //stateMachineRepository may be overridden, so move `stateMachineRepository.registryByResources()` here.
- //如果需要自动注册资源(比如数据库等),则获取资源并进行注册
- if (autoRegisterResources && ArrayUtils.isNotEmpty(resources)) {
- try {
- //读取默认的状态机定义文件(json文件),把这个状态机定义注册到状态机定义的仓储组件中
- Resource[] resources = ResourceUtil.getResources(this.resources);
- stateMachineRepository.registryByResources(resources, defaultTenantId);
- } catch (IOException e) {
- LOGGER.error("Load State Language Resources failed.", e);
- }
- }
- //创建状态机实例和状态实例的仓储组件
- if (stateLogRepository == null) {
- StateLogRepositoryImpl stateLogRepositoryImpl = new StateLogRepositoryImpl();
- stateLogRepositoryImpl.setStateLogStore(stateLogStore);
- this.stateLogRepository = stateLogRepositoryImpl;
- }
- //创建State执行结果的判定策略组件
- if (statusDecisionStrategy == null) {
- statusDecisionStrategy = new DefaultStatusDecisionStrategy();
- }
- //创建流程控制的事件发布器(同步发布)
- if (syncProcessCtrlEventPublisher == null) {
- //创建流程控制的事件发布器
- ProcessCtrlEventPublisher syncEventPublisher = new ProcessCtrlEventPublisher();
- //创建流程控制器
- ProcessControllerImpl processorController = createProcessorController(syncEventPublisher);
- //创建流程控制事件的消费者
- ProcessCtrlEventConsumer processCtrlEventConsumer = new ProcessCtrlEventConsumer();
- processCtrlEventConsumer.setProcessController(processorController);
- //创建事件总线
- DirectEventBus directEventBus = new DirectEventBus();
- syncEventPublisher.setEventBus(directEventBus);
- directEventBus.registerEventConsumer(processCtrlEventConsumer);
- syncProcessCtrlEventPublisher = syncEventPublisher;
- }
- //如果启用了异步化执行 且 流程控制的事件发布器(异步发布)为null
- if (enableAsync && asyncProcessCtrlEventPublisher == null) {
- ProcessCtrlEventPublisher asyncEventPublisher = new ProcessCtrlEventPublisher();
- ProcessControllerImpl processorController = createProcessorController(asyncEventPublisher);
- ProcessCtrlEventConsumer processCtrlEventConsumer = new ProcessCtrlEventConsumer();
- processCtrlEventConsumer.setProcessController(processorController);
- AsyncEventBus asyncEventBus = new AsyncEventBus();
- asyncEventBus.setThreadPoolExecutor(getThreadPoolExecutor());
- asyncEventPublisher.setEventBus(asyncEventBus);
- asyncEventBus.registerEventConsumer(processCtrlEventConsumer);
- asyncProcessCtrlEventPublisher = asyncEventPublisher;
- }
- //创建服务调用管理器
- if (this.serviceInvokerManager == null) {
- this.serviceInvokerManager = new ServiceInvokerManager();
- //创建Spring Bean的服务调用组件
- SpringBeanServiceInvoker springBeanServiceInvoker = new SpringBeanServiceInvoker();
- springBeanServiceInvoker.setApplicationContext(getApplicationContext());
- springBeanServiceInvoker.setThreadPoolExecutor(threadPoolExecutor);
- springBeanServiceInvoker.setSagaJsonParser(getSagaJsonParser());
- //将Spring Bean的服务调用组件放入到服务调用管理器中
- this.serviceInvokerManager.putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, springBeanServiceInvoker);
- }
- //创建脚本引擎管理器
- if (this.scriptEngineManager == null) {
- this.scriptEngineManager = new ScriptEngineManager();
- }
- }
- protected ProcessControllerImpl createProcessorController(ProcessCtrlEventPublisher eventPublisher) throws Exception {
- //创建状态机流程处理的路由器
- StateMachineProcessRouter stateMachineProcessRouter = new StateMachineProcessRouter();
- stateMachineProcessRouter.initDefaultStateRouters();
- //通过SPI机制加载状态机流程处理路由的拦截器
- loadStateRouterInterceptors(stateMachineProcessRouter.getStateRouters());
- //创建状态机流程处理器
- StateMachineProcessHandler stateMachineProcessHandler = new StateMachineProcessHandler();
- stateMachineProcessHandler.initDefaultHandlers();
- //通过SPI机制加载状态机流程处理器的拦截器
- loadStateHandlerInterceptors(stateMachineProcessHandler.getStateHandlers());
- //创建默认的路由处理器,并设置事件发布器
- DefaultRouterHandler defaultRouterHandler = new DefaultRouterHandler();
- defaultRouterHandler.setEventPublisher(eventPublisher);
- //创建状态机流程处理路由器对应的Map
- Map<String, ProcessRouter> processRouterMap = new HashMap<>(1);
- processRouterMap.put(ProcessType.STATE_LANG.getCode(), stateMachineProcessRouter);
- defaultRouterHandler.setProcessRouters(processRouterMap);
- //创建自定义的业务处理器
- CustomizeBusinessProcessor customizeBusinessProcessor = new CustomizeBusinessProcessor();
- //创建状态机流程处理器对应的Map
- Map<String, ProcessHandler> processHandlerMap = new HashMap<>(1);
- processHandlerMap.put(ProcessType.STATE_LANG.getCode(), stateMachineProcessHandler);
- customizeBusinessProcessor.setProcessHandlers(processHandlerMap);
- //创建路由处理器对应的Map
- Map<String, RouterHandler> routerHandlerMap = new HashMap<>(1);
- routerHandlerMap.put(ProcessType.STATE_LANG.getCode(), defaultRouterHandler);
- customizeBusinessProcessor.setRouterHandlers(routerHandlerMap);
- //创建流程控制器
- ProcessControllerImpl processorController = new ProcessControllerImpl();
- processorController.setBusinessProcessor(customizeBusinessProcessor);
- return processorController;
- }
- protected void loadStateHandlerInterceptors(Map<String, StateHandler> stateHandlerMap) {
- for (StateHandler stateHandler : stateHandlerMap.values()) {
- if (stateHandler instanceof InterceptableStateHandler) {
- InterceptableStateHandler interceptableStateHandler = (InterceptableStateHandler) stateHandler;
- List<StateHandlerInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateHandlerInterceptor.class);
- for (StateHandlerInterceptor interceptor : interceptorList) {
- if (interceptor.match(interceptableStateHandler.getClass())) {
- interceptableStateHandler.addInterceptor(interceptor);
- }
- if (interceptor instanceof ApplicationContextAware) {
- ((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
- }
- }
- }
- }
- }
- protected void loadStateRouterInterceptors(Map<String, StateRouter> stateRouterMap) {
- for (StateRouter stateRouter : stateRouterMap.values()) {
- if (stateRouter instanceof InterceptableStateRouter) {
- InterceptableStateRouter interceptableStateRouter = (InterceptableStateRouter) stateRouter;
- List<StateRouterInterceptor> interceptorList = EnhancedServiceLoader.loadAll(StateRouterInterceptor.class);
- for (StateRouterInterceptor interceptor : interceptorList) {
- if (interceptor.match(interceptableStateRouter.getClass())) {
- interceptableStateRouter.addInterceptor(interceptor);
- }
- if (interceptor instanceof ApplicationContextAware) {
- ((ApplicationContextAware) interceptor).setApplicationContext(getApplicationContext());
- }
- }
- }
- }
- }
- ...
- }
复制代码
9.状态机定义的仓储组件解析状态机定义文件
在DefaultStateMachineConfig的init()方法初始化逻辑中,会通过调用状态机定义的仓储组件StateMachineRepository.registryByResources()方法来读取和解析状态机定义文件,并把解析出来的状态机定义StateMachine注册到状态机定义的仓储组件中,也就是存储在stateMachineRepository变量里。
注意:StateMachine就是状态机定义。
- public class StateMachineRepositoryImpl implements StateMachineRepository {
- ...
- @Override
- public void registryByResources(Resource[] resources, String tenantId) throws IOException {
- if (resources != null) {
- for (Resource resource : resources) {
- //1.基于IO流把状态机定义文件(json文件)读取出来,放入json字符串
- String json;
- try (InputStream is = resource.getInputStream()) {
- json = IOUtils.toString(is, charset);
- }
- //2.从状态机解析器工厂中获取一个状态机解析器,然后解析json字符串得到一个状态机定义StateMachine
- StateMachine stateMachine = StateMachineParserFactory.getStateMachineParser(jsonParserName).parse(json);
- if (stateMachine != null) {
- stateMachine.setContent(json);
- if (StringUtils.isBlank(stateMachine.getTenantId())) {
- stateMachine.setTenantId(tenantId);
- }
- //3.注册状态机定义StateMachine
- registryStateMachine(stateMachine);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("===== StateMachine Loaded: \n{}", json);
- }
- }
- }
- }
- }
- ...
- }
复制代码
10.状态机定义的仓储组件注册StateMachine
- public class StateMachineRepositoryImpl implements StateMachineRepository {
- private Map<String/** Name_Tenant **/, Item> stateMachineMapByNameAndTenant = new ConcurrentHashMap<>();
- private Map<String/** Id **/, Item> stateMachineMapById = new ConcurrentHashMap<>();
- private StateLangStore stateLangStore;//状态机定义的存储组件
- ...
-
- @Override
- public StateMachine registryStateMachine(StateMachine stateMachine) {
- String stateMachineName = stateMachine.getName();
- String tenantId = stateMachine.getTenantId();
- if (stateLangStore != null) {
- //1.从状态机定义的存储组件中,根据状态机名称和租户ID,获取最新版本的状态机定义StateMachine
- StateMachine oldStateMachine = stateLangStore.getLastVersionStateMachine(stateMachineName, tenantId);
- if (oldStateMachine != null) {
- byte[] oldBytesContent = null;
- byte[] bytesContent = null;
- try {
- oldBytesContent = oldStateMachine.getContent().getBytes(charset);
- bytesContent = stateMachine.getContent().getBytes(charset);
- } catch (UnsupportedEncodingException e) {
- LOGGER.error(e.getMessage(), e);
- }
- if (Arrays.equals(bytesContent, oldBytesContent) && stateMachine.getVersion() != null && stateMachine.getVersion().equals(oldStateMachine.getVersion())) {
- LOGGER.info("StateMachine[{}] is already exist a same version", stateMachineName);
- stateMachine.setId(oldStateMachine.getId());
- stateMachine.setGmtCreate(oldStateMachine.getGmtCreate());
- Item item = new Item(stateMachine);
- stateMachineMapByNameAndTenant.put(stateMachineName + "_" + tenantId, item);
- stateMachineMapById.put(stateMachine.getId(), item);
- return stateMachine;
- }
- }
- if (StringUtils.isBlank(stateMachine.getId())) {
- //2.生成新的状态机定义StateMachine的ID
- stateMachine.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE));
- }
- stateMachine.setGmtCreate(new Date());
- //3.通过状态机定义的存储组件,把状态机定义StateMachine保存起来,默认就是把状态机定义StateMachine存储到DB中
- stateLangStore.storeStateMachine(stateMachine);
- }
- if (StringUtils.isBlank(stateMachine.getId())) {
- stateMachine.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE));
- }
- Item item = new Item(stateMachine);
- //4.把状态机定义放入内存中
- stateMachineMapByNameAndTenant.put(stateMachineName + "_" + tenantId, item);
- stateMachineMapById.put(stateMachine.getId(), item);
- return stateMachine;
- }
- ...
- }
- public class DbStateLangStore extends AbstractStore implements StateLangStore {
- ...
- @Override
- public boolean storeStateMachine(StateMachine stateMachine) {
- //把状态机定义StateMachine写入到DB中
- return executeUpdate(stateLangStoreSqls.getInsertStateMachineSql(dbType), STATE_MACHINE_TO_STATEMENT, stateMachine) > 0;
- }
- ...
- }
复制代码
11.状态机引擎接口的源码
从seata-saga.xml配置文件可知,基于数据库的状态机配置实例DbStateMachineConfig会被注入到基于流程控制的状态机引擎实例ProcessCtrlStateMachineEngine中。- <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:jdbc="http://www.springframework.org/schema/jdbc"
- xmlns="http://www.springframework.org/schema/beans"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd"
- default-autowire="byName">
- ...
- <bean id="stateMachineEngine" >
- <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
- </bean>
- ...
- </beans>
复制代码 基于流程控制的状态机引擎ProcessCtrlStateMachineEngine,会实现状态机引擎接口StateMachineEngine。- //ProcessCtrl-based state machine engine
- //基于流程控制的状态机引擎
- public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
- //需要在seata-saga.xml配置文件中注入状态机配置
- private StateMachineConfig stateMachineConfig;
- ...
- }
- //State machine engine
- //状态机引擎接口
- public interface StateMachineEngine {
- //start a state machine instance
- //根据状态机的名称、租户ID、启动参数来启动一个状态机实例
- StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException;
- //start a state machine instance with businessKey
- //根据状态机的名称、租户ID、业务key、启动参数来启动一个状态机实例
- StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException;
- //start a state machine instance asynchronously
- //根据状态机的名称、租户ID、启动参数来启动一个状态机实例
- //也就是状态机实例跑完之后,会回调传入的callback()方法
- StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
- //start a state machine instance asynchronously with businessKey
- //根据状态机的名称、租户ID、业务key、启动参数来异步化启动一个状态机实例
- //也就是状态机实例跑完之后,会回调传入的callback()方法
- StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException;
- //forward restart a failed state machine instance
- //重启一个失败的状态机实例
- StateMachineInstance forward(String stateMachineInstId, Map<String, Object> replaceParams) throws ForwardInvalidException;
- //forward restart a failed state machine instance asynchronously
- //异步化重启一个失败的状态机实例
- StateMachineInstance forwardAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws ForwardInvalidException;
- //compensate a state machine instance
- //对一个状态机实例进行补偿
- StateMachineInstance compensate(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
- //compensate a state machine instance asynchronously
- //对一个状态机实例进行异步化补偿
- StateMachineInstance compensateAsync(String stateMachineInstId, Map<String, Object> replaceParams, AsyncCallback callback) throws EngineExecutionException;
- //skip current failed state instance and forward restart state machine instance
- //跳过当前失败的状态机实例,同时重启一个状态机实例
- StateMachineInstance skipAndForward(String stateMachineInstId, Map<String, Object> replaceParams) throws EngineExecutionException;
- //skip current failed state instance and forward restart state machine instance asynchronously
- StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallback callback) throws EngineExecutionException;
- //get state machine configurations
- StateMachineConfig getStateMachineConfig();
- //Reload StateMachine Instance
- StateMachineInstance reloadStateMachineInstance(String instId);
- }
复制代码
12.状态机引擎创建状态机实例的源码
基于流程控制的状态机引擎实例ProcessCtrlStateMachineEngine的start()方法调用其startInternal()方法启动一个状态机实例时,会先创建一个状态机实例StateMachineInstance。
在调用ProcessCtrlStateMachineEngine的createMachineInstance()方法创建一个状态机实例StateMachineInstance的过程中,会先通过状态机定义的仓储组件StateMachineRepository来获取一个状态机定义StateMachine,然后将状态机定义StateMachine注入到状态机实例对象中,以此来完成状态机实例对象的实例化。
- //ProcessCtrl-based state machine engine
- //基于流程控制的状态机引擎
- public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
- //需要在seata-saga.xml配置文件中注入状态机配置实例
- private StateMachineConfig stateMachineConfig;
- ...
-
- @Override
- public StateMachineInstance start(String stateMachineName, String tenantId, Map<String, Object> startParams) throws EngineExecutionException {
- return startInternal(stateMachineName, tenantId, null, startParams, false, null);
- }
-
- @Override
- public StateMachineInstance startAsync(String stateMachineName, String tenantId, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException {
- return startInternal(stateMachineName, tenantId, null, startParams, true, callback);
- }
-
- @Override
- public StateMachineInstance startWithBusinessKey(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) throws EngineExecutionException {
- return startInternal(stateMachineName, tenantId, businessKey, startParams, false, null);
- }
-
- @Override
- public StateMachineInstance startWithBusinessKeyAsync(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, AsyncCallback callback) throws EngineExecutionException {
- return startInternal(stateMachineName, tenantId, businessKey, startParams, true, callback);
- }
- ...
-
- //启动状态机实例StateMachineInstance
- //@param stateMachineName 状态机名称
- //@param tenantId 租户ID
- //@param businessKey 业务key
- //@param startParams 状态机实例的启动参数
- //@param async 是否异步化运行
- //@param callback 异步化运行时的回调接口
- private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
- try {
- //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
- if (async && !stateMachineConfig.isEnableAsync()) {
- throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
- }
- if (StringUtils.isEmpty(tenantId)) {
- tenantId = stateMachineConfig.getDefaultTenantId();
- }
- //创建状态机实例StateMachineInstance
- StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
- ...
- }
- ...
- }
-
- private StateMachineInstance createMachineInstance(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams) {
- //通过状态机定义的仓储组件StateMachineRepository,来获取一个状态机定义StateMachine
- StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName, tenantId);
- if (stateMachine == null) {
- throw new EngineExecutionException("StateMachine[" + stateMachineName + "] is not exists", FrameworkErrorCode.ObjectNotExists);
- }
- //根据状态机定义StateMachine,实例化一个状态机实例对象StateMachineInstanceImpl
- StateMachineInstanceImpl inst = new StateMachineInstanceImpl();
- inst.setStateMachine(stateMachine);
- inst.setMachineId(stateMachine.getId());
- inst.setTenantId(tenantId);
- inst.setBusinessKey(businessKey);
- //设置状态机实例的启动参数
- inst.setStartParams(startParams);
- if (startParams != null) {
- if (StringUtils.hasText(businessKey)) {
- startParams.put(DomainConstants.VAR_NAME_BUSINESSKEY, businessKey);
- }
- String parentId = (String)startParams.get(DomainConstants.VAR_NAME_PARENT_ID);
- if (StringUtils.hasText(parentId)) {
- inst.setParentId(parentId);
- startParams.remove(DomainConstants.VAR_NAME_PARENT_ID);
- }
- }
- inst.setStatus(ExecutionStatus.RU);
- inst.setRunning(true);
- inst.setGmtStarted(new Date());
- inst.setGmtUpdated(inst.getGmtStarted());
- return inst;
- }
- ...
- }
复制代码
13.ProcessContextBuilder构建流程上下文
ProcessCtrlStateMachineEngine.startInternal()方法在执行过程中会创建一个流程上下文构造器ProcessContextBuilder实例,然后根据这个流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext。
- public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
- ...
- //启动状态机实例StateMachineInstance
- //@param stateMachineName 状态机名称
- //@param tenantId 租户ID
- //@param businessKey 业务key
- //@param startParams 状态机实例的启动参数
- //@param async 是否异步化运行
- //@param callback 异步化运行时的回调接口
- private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
- try {
- //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
- if (async && !stateMachineConfig.isEnableAsync()) {
- throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
- }
- if (StringUtils.isEmpty(tenantId)) {
- tenantId = stateMachineConfig.getDefaultTenantId();
- }
- //创建状态机实例StateMachineInstance
- StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
- //创建一个流程上下文构造器ProcessContextBuilder实例,用来构造流程运行时的上下文
- ProcessContextBuilder contextBuilder = ProcessContextBuilder.create()
- .withProcessType(ProcessType.STATE_LANG)//设置流程类型
- .withOperationName(DomainConstants.OPERATION_NAME_START)//设置操作名称
- .withAsyncCallback(callback)//设置异步化时的回调接口
- .withInstruction(new StateInstruction(stateMachineName, tenantId))//设置状态获取组件
- .withStateMachineInstance(instance)//设置状态机实例
- .withStateMachineConfig(getStateMachineConfig())//设置状态机配置
- .withStateMachineEngine(this);//设置状态机引擎
- //上下文变量Map
- Map<String, Object> contextVariables;
- if (startParams != null) {
- contextVariables = new ConcurrentHashMap<>(startParams.size());
- nullSafeCopy(startParams, contextVariables);
- } else {
- contextVariables = new ConcurrentHashMap<>();
- }
- instance.setContext(contextVariables);
- //设置流程上下文构造器ProcessContextBuilder实例
- contextBuilder.withStateMachineContextVariables(contextVariables);
- contextBuilder.withIsAsyncExecution(async);
- //通过流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext
- ProcessContext processContext = contextBuilder.build();
- //如果状态机定义StateMachine是支持持久化的 且 状态日志的存储组件不为null
- if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
- //通过状态机实例日志和状态实例日志的存储组件StateLogStore,记录状态机实例StateMachineInstance的启动事件日志 + 开启全局事务
- //比如在DB中更新状态机实例StateMachineInstance的启动状态
- stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
- }
- if (StringUtils.isEmpty(instance.getId())) {
- //生成状态机实例StateMachineInstance的序号
- instance.setId(stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
- }
- StateInstruction stateInstruction = processContext.getInstruction(StateInstruction.class);
- Loop loop = LoopTaskUtils.getLoopConfig(processContext, stateInstruction.getState(processContext));
- if (null != loop) {
- stateInstruction.setTemporaryState(new LoopStartStateImpl());
- }
- if (async) {
- stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
- } else {
- stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
- }
- return instance;
- } finally {
- if (stateMachineConfig.getStateLogStore() != null) {
- stateMachineConfig.getStateLogStore().clearUp();
- }
- }
- }
- ...
- }
复制代码
14.StateLogStore记录日志和开启Saga全局事务
在状态机引擎ProcessCtrlStateMachineEngine的startInternal()方法中,会通过调用StateLogStore的recordStateMachineStarted()方法,记录状态机实例StateMachineInstance的启动事件到DB,以及开启Saga全局事务。
- public class DbAndReportTcStateLogStore extends AbstractStore implements StateLogStore {
- private static final Logger LOGGER = LoggerFactory.getLogger(DbAndReportTcStateLogStore.class);
-
- //插入状态机实例到数据库的SQL语句
- private static final StateMachineInstanceToStatementForInsert STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT = new StateMachineInstanceToStatementForInsert();
- //更新数据库状态机实例的SQL语句
- private static final StateMachineInstanceToStatementForUpdate STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_UPDATE = new StateMachineInstanceToStatementForUpdate();
- //查询数据库中的状态机实例的SQL语句
- private static final ResultSetToStateMachineInstance RESULT_SET_TO_STATE_MACHINE_INSTANCE = new ResultSetToStateMachineInstance();
- //插入状态实例到数据库的SQL语句
- private static final StateInstanceToStatementForInsert STATE_INSTANCE_TO_STATEMENT_FOR_INSERT = new StateInstanceToStatementForInsert();
- //更新数据库中的状态实例的SQL语句
- private static final StateInstanceToStatementForUpdate STATE_INSTANCE_TO_STATEMENT_FOR_UPDATE = new StateInstanceToStatementForUpdate();
- //查询数据库中的状态实例的SQL语句
- private static final ResultSetToStateInstance RESULT_SET_TO_STATE_INSTANCE = new ResultSetToStateInstance();
- //Saga全局事务模版
- private SagaTransactionalTemplate sagaTransactionalTemplate;
- //参数序列化组件
- private Serializer<Object, String> paramsSerializer = new ParamsSerializer();
- //异常序列化组件
- private Serializer<Exception, byte[]> exceptionSerializer = new ExceptionSerializer();
- //状态日志的存储SQL语句
- private StateLogStoreSqls stateLogStoreSqls;
- //默认的租户ID
- private String defaultTenantId;
- //序号生成器
- private SeqGenerator seqGenerator;
- //记录状态机实例的启动事件日志到DB + 开启Saga全局事务
- @Override
- public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {
- if (machineInstance != null) {
- //if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction, use parent transaction instead.
- //如果parentId是空的,那么当前事务就是Saga分布式事务的启动入口;
- //如果parentId不是空的,则已经有服务使用状态机启动了Saga分布式事务;
- String parentId = machineInstance.getParentId();
- if (StringUtils.isEmpty(parentId)) {
- //1.开启Saga全局事务
- beginTransaction(machineInstance, context);
- }
- try {
- if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
- machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
- }
- //bind SAGA branch type
- RootContext.bindBranchType(BranchType.SAGA);
- //save to db
- machineInstance.setSerializedStartParams(paramsSerializer.serialize(machineInstance.getStartParams()));
- //2.记录日志到DB
- int effect = executeUpdate(stateLogStoreSqls.getRecordStateMachineStartedSql(dbType), STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_INSERT, machineInstance);
- if (effect < 1) {
- throw new StoreException("StateMachineInstance record start error, Xid: " + machineInstance.getId(), FrameworkErrorCode.OperationDenied);
- }
- } catch (StoreException e) {
- LOGGER.error("Record statemachine start error: {}, StateMachine: {}, XID: {}, Reason: {}", e.getErrcode(), machineInstance.getStateMachine().getName(), machineInstance.getId(), e.getMessage(), e);
- this.clearUp();
- throw e;
- }
- }
- }
- protected void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {
- if (sagaTransactionalTemplate != null) {
- //获取状态机配置
- StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
- //构建事务信息
- TransactionInfo transactionInfo = new TransactionInfo();
- transactionInfo.setTimeOut(stateMachineConfig.getTransOperationTimeout());
- transactionInfo.setName(Constants.SAGA_TRANS_NAME_PREFIX + machineInstance.getStateMachine().getName());
- try {
- //通过Saga事务模版开启全局事务
- GlobalTransaction globalTransaction = sagaTransactionalTemplate.beginTransaction(transactionInfo);
- machineInstance.setId(globalTransaction.getXid());
- context.setVariable(DomainConstants.VAR_NAME_GLOBAL_TX, globalTransaction);
- Map<String, Object> machineContext = machineInstance.getContext();
- if (machineContext != null) {
- machineContext.put(DomainConstants.VAR_NAME_GLOBAL_TX, globalTransaction);
- }
- } catch (ExecutionException e) {
- String xid = null;
- if (e.getTransaction() != null) {
- xid = e.getTransaction().getXid();
- }
- throw new EngineExecutionException(e, e.getCode() + ", TransName:" + transactionInfo.getName() + ", XID: " + xid + ", Reason: " + e.getMessage(), FrameworkErrorCode.TransactionManagerError);
- } finally {
- if (Boolean.TRUE.equals(context.getVariable(DomainConstants.VAR_NAME_IS_ASYNC_EXECUTION))) {
- RootContext.unbind();
- RootContext.unbindBranchType();
- }
- }
- }
- }
- ...
- }
- public class StateLogStoreSqls {
- private String recordStateMachineStartedSql;
- private static final String RECORD_STATE_MACHINE_STARTED_SQL = "INSERT INTO ${TABLE_PREFIX}state_machine_inst\n"
- + "(id, machine_id, tenant_id, parent_id, gmt_started, business_key, start_params, is_running, status, "
- + "gmt_updated)\n"
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
- ...
-
- public StateLogStoreSqls(String tablePrefix) {
- this.tablePrefix = tablePrefix;
- init();
- }
-
- private void init() {
- recordStateMachineStartedSql = RECORD_STATE_MACHINE_STARTED_SQL.replaceAll(TABLE_PREFIX_REGEX, tablePrefix);
- ...
- }
-
- public String getRecordStateMachineStartedSql(String dbType) {
- return recordStateMachineStartedSql;
- }
- ...
- }
- public class AbstractStore {
- protected DataSource dataSource;
- ...
-
- protected <T> int executeUpdate(String sql, ObjectToStatement<T> objectToStatement, T o) {
- Connection connection = null;
- PreparedStatement stmt = null;
- try {
- connection = dataSource.getConnection();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Preparing SQL: {}", sql);
- }
- stmt = connection.prepareStatement(sql);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("setting params to PreparedStatement: {}", BeanUtils.beanToString(o));
- }
- objectToStatement.toStatement(o, stmt);
- int count = stmt.executeUpdate();
- if (!connection.getAutoCommit()) {
- connection.commit();
- }
- return count;
- } catch (SQLException e) {
- throw new StoreException(e);
- } finally {
- closeSilent(stmt);
- closeSilent(connection);
- }
- }
- ...
- }
- public class DefaultSagaTransactionalTemplate implements SagaTransactionalTemplate, ApplicationContextAware, DisposableBean, InitializingBean {
- ...
- @Override
- public GlobalTransaction beginTransaction(TransactionInfo txInfo) throws TransactionalExecutor.ExecutionException {
- GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
- try {
- triggerBeforeBegin();
- //开启一个全局会话
- tx.begin(txInfo.getTimeOut(), txInfo.getName());
- triggerAfterBegin();
- } catch (TransactionException txe) {
- throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure);
- }
- return tx;
- }
- ...
- }
复制代码 执行GlobalTransaction的begin()方法开启一个全局会话:- //The type Default global transaction.
- //默认的全局事务
- public class DefaultGlobalTransaction implements GlobalTransaction {
- private TransactionManager transactionManager;
- private String xid;
- private GlobalStatus status;
- private GlobalTransactionRole role;
- ...
-
- DefaultGlobalTransaction() {
- //全局事务角色是全局事务发起者
- this(null, GlobalStatus.UnKnown, GlobalTransactionRole.Launcher);
- }
-
- //Instantiates a new Default global transaction.
- DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
- this.transactionManager = TransactionManagerHolder.get();//全局事务管理者
- this.xid = xid;
- this.status = status;
- this.role = role;
- }
- ...
-
- @Override
- public void begin(int timeout, String name) throws TransactionException {
- if (role != GlobalTransactionRole.Launcher) {
- assertXIDNotNull();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
- }
- return;
- }
- assertXIDNull();
- String currentXid = RootContext.getXID();
- if (currentXid != null) {
- throw new IllegalStateException("Global transaction already exists," + " can't begin a new global transaction, currentXid = " + currentXid);
- }
- //通过全局事务管理器去真正开启全局事务,一旦开启成功,就可以获取到一个xid
- xid = transactionManager.begin(null, null, name, timeout);
- status = GlobalStatus.Begin;
- //把xid绑定到RootContext的线程本地变量副本里去
- RootContext.bind(xid);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("Begin new global transaction [{}]", xid);
- }
- }
- ...
- }
- public class TransactionManagerHolder {
- private static final Logger LOGGER = LoggerFactory.getLogger(TransactionManagerHolder.class);
-
- private static class SingletonHolder {
- private static TransactionManager INSTANCE = null;
- static {
- try {
- INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
- LOGGER.info("TransactionManager Singleton {}", INSTANCE);
- } catch (Throwable anyEx) {
- LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
- }
- }
- }
-
- //Get transaction manager.
- public static TransactionManager get() {
- if (SingletonHolder.INSTANCE == null) {
- throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
- }
- return SingletonHolder.INSTANCE;
- }
-
- private TransactionManagerHolder() {
- }
- }
- //The type Default transaction manager.
- //默认的全局事务管理器
- public class DefaultTransactionManager implements TransactionManager {
- @Override
- public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {
- //构建一个全局事务开启请求GlobalBeginRequest
- GlobalBeginRequest request = new GlobalBeginRequest();
- request.setTransactionName(name);
- request.setTimeout(timeout);
- //发起一个同步调用
- GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
- if (response.getResultCode() == ResultCode.Failed) {
- throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
- }
- return response.getXid();
- }
- ...
-
- private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
- try {
- //TMNettyRemotingClient会和Seata Server基于Netty建立长连接
- return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
- } catch (TimeoutException toe) {
- throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
- }
- }
- }
- public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
- ...
- @Override
- public Object sendSyncRequest(Object msg) throws TimeoutException {
- //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡
- String serverAddress = loadBalance(getTransactionServiceGroup(), msg);
- //获取RPC调用的超时时间
- long timeoutMillis = this.getRpcRequestTimeout();
- //构建一个RPC消息
- RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
- //send batch message
- //put message into basketMap, @see MergedSendRunnable
- //默认是不开启批量消息发送
- if (this.isEnableClientBatchSendRequest()) {
- ...
- } else {
- //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel
- //然后通过网络连接Channel把RpcMessage发送出去
- Channel channel = clientChannelManager.acquireChannel(serverAddress);
- return super.sendSync(channel, rpcMessage, timeoutMillis);
- }
- }
- ...
- }
- public abstract class AbstractNettyRemoting implements Disposable {
- ...
- protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
- if (timeoutMillis <= 0) {
- throw new FrameworkException("timeout should more than 0ms");
- }
- if (channel == null) {
- LOGGER.warn("sendSync nothing, caused by null channel.");
- return null;
- }
- //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里
- MessageFuture messageFuture = new MessageFuture();
- messageFuture.setRequestMessage(rpcMessage);
- messageFuture.setTimeout(timeoutMillis);
- futures.put(rpcMessage.getId(), messageFuture);
- channelWritableCheck(channel, rpcMessage.getBody());
- //获取远程地址
- String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
- doBeforeRpcHooks(remoteAddr, rpcMessage);
- //异步化发送数据,同时对发送结果添加监听器
- //如果发送失败,则会对网络连接Channel进行销毁处理
- channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
- if (!future.isSuccess()) {
- MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
- if (messageFuture1 != null) {
- messageFuture1.setResultMessage(future.cause());
- }
- destroyChannel(future.channel());
- }
- });
- try {
- //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应
- Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
- doAfterRpcHooks(remoteAddr, rpcMessage, result);
- return result;
- } catch (Exception exx) {
- LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
- if (exx instanceof TimeoutException) {
- throw (TimeoutException) exx;
- } else {
- throw new RuntimeException(exx);
- }
- }
- }
- ...
- }
复制代码
15.状态操作组件获取状态和State状态类继承体系
(1)状态操作组件StateInstruction获取状态
(2)State状态类的继承体系
(1)状态操作组件StateInstruction获取状态
在状态机引擎ProcessCtrlStateMachineEngine的startInternal()方法中,会通过流程上下文ProcessContext来获取状态操作组件StateInstruction。
根据状态操作组件StateInstruction,可以获取状态机的开始State(状态)、也可以获取指定的某个State(状态)。- public class ProcessCtrlStateMachineEngine implements StateMachineEngine {
- ...
- //启动状态机实例StateMachineInstance
- //@param stateMachineName 状态机名称
- //@param tenantId 租户ID
- //@param businessKey 业务key
- //@param startParams 状态机实例的启动参数
- //@param async 是否异步化运行
- //@param callback 异步化运行时的回调接口
- private StateMachineInstance startInternal(String stateMachineName, String tenantId, String businessKey, Map<String, Object> startParams, boolean async, AsyncCallback callback) throws EngineExecutionException {
- try {
- //如果指定需要异步运行,但是状态机配置里是不允许异步运行的,则会抛异常
- if (async && !stateMachineConfig.isEnableAsync()) {
- throw new EngineExecutionException("Asynchronous start is disabled. please set StateMachineConfig.enableAsync=true first.", FrameworkErrorCode.AsynchronousStartDisabled);
- }
- if (StringUtils.isEmpty(tenantId)) {
- tenantId = stateMachineConfig.getDefaultTenantId();
- }
- //创建状态机实例StateMachineInstance
- StateMachineInstance instance = createMachineInstance(stateMachineName, tenantId, businessKey, startParams);
- //创建一个流程上下文构造器ProcessContextBuilder实例,用来构造流程运行时的上下文
- ProcessContextBuilder contextBuilder = ProcessContextBuilder.create()
- .withProcessType(ProcessType.STATE_LANG)//设置流程类型
- .withOperationName(DomainConstants.OPERATION_NAME_START)//设置操作名称
- .withAsyncCallback(callback)//设置异步化时的回调接口
- .withInstruction(new StateInstruction(stateMachineName, tenantId))//设置状态获取组件
- .withStateMachineInstance(instance)//设置状态机实例
- .withStateMachineConfig(getStateMachineConfig())//设置状态机配置
- .withStateMachineEngine(this);//设置状态机引擎
- //上下文变量Map
- Map<String, Object> contextVariables;
- if (startParams != null) {
- contextVariables = new ConcurrentHashMap<>(startParams.size());
- nullSafeCopy(startParams, contextVariables);
- } else {
- contextVariables = new ConcurrentHashMap<>();
- }
- instance.setContext(contextVariables);
- //设置流程上下文构造器ProcessContextBuilder实例
- contextBuilder.withStateMachineContextVariables(contextVariables);
- contextBuilder.withIsAsyncExecution(async);
- //通过流程上下文构造器ProcessContextBuilder构建出一个流程上下文ProcessContext
- ProcessContext processContext = contextBuilder.build();
- //如果状态机定义StateMachine是支持持久化的 且 状态日志的存储组件不为null
- if (instance.getStateMachine().isPersist() && stateMachineConfig.getStateLogStore() != null) {
- //通过状态机实例日志和状态实例日志的存储组件StateLogStore,记录状态机实例StateMachineInstance的启动事件日志 + 开启全局事务
- //比如在DB中更新状态机实例StateMachineInstance的启动状态
- stateMachineConfig.getStateLogStore().recordStateMachineStarted(instance, processContext);
- }
- if (StringUtils.isEmpty(instance.getId())) {
- //生成状态机实例StateMachineInstance的序号
- instance.setId(stateMachineConfig.getSeqGenerator().generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
- }
-
- //从流程上下文中获取状态操作组件StateInstruction
- StateInstruction stateInstruction = processContext.getInstruction(StateInstruction.class);
- //获取循环策略
- Loop loop = LoopTaskUtils.getLoopConfig(processContext, stateInstruction.getState(processContext));
- if (null != loop) {
- stateInstruction.setTemporaryState(new LoopStartStateImpl());
- }
- if (async) {
- stateMachineConfig.getAsyncProcessCtrlEventPublisher().publish(processContext);
- } else {
- stateMachineConfig.getProcessCtrlEventPublisher().publish(processContext);
- }
- return instance;
- } finally {
- if (stateMachineConfig.getStateLogStore() != null) {
- stateMachineConfig.getStateLogStore().clearUp();
- }
- }
- }
- ...
- }
- //State Instruction,状态操作组件StateInstruction
- public class StateInstruction implements Instruction {
- private String stateName;
- private String stateMachineName;
- private String tenantId;
- private boolean end;
- //Temporary state node for running temporary nodes in the state machine
- private State temporaryState;
-
- public StateInstruction() {
- }
- public StateInstruction(String stateMachineName, String tenantId) {
- this.stateMachineName = stateMachineName;
- this.tenantId = tenantId;
- }
- //根据流程上下文获取状态,可以是状态机的开始State(状态)、也可以是指定的某个State(状态)
- public State getState(ProcessContext context) {
- if (getTemporaryState() != null) {
- return temporaryState;
- }
- String stateName = getStateName();
- String stateMachineName = getStateMachineName();
- String tenantId = getTenantId();
- if (StringUtils.isEmpty(stateMachineName)) {
- throw new EngineExecutionException("StateMachineName is required", FrameworkErrorCode.ParameterRequired);
- }
- StateMachineConfig stateMachineConfig = (StateMachineConfig)context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
- StateMachine stateMachine = stateMachineConfig.getStateMachineRepository().getStateMachine(stateMachineName, tenantId);
- if (stateMachine == null) {
- throw new EngineExecutionException("StateMachine[" + stateMachineName + "] is not exist", FrameworkErrorCode.ObjectNotExists);
- }
- //默认就是获取状态机定义StateMachine中的第一个状态State
- if (StringUtils.isEmpty(stateName)) {
- stateName = stateMachine.getStartState();
- setStateName(stateName);
- }
- //根据状态名字stateName,从状态机定义中获取对应的状态State
- State state = stateMachine.getStates().get(stateName);
- if (state == null) {
- throw new EngineExecutionException("State[" + stateName + "] is not exist", FrameworkErrorCode.ObjectNotExists);
- }
- return state;
- }
- ...
- }
复制代码 (2)State状态类的继承体系
[code]//A State in StateMachinepublic interface State { String getName(); String getComment(); String getType(); String getNext(); Map getExtensions(); StateMachine getStateMachine();}public abstract class BaseState implements State { private transient String name;//状态名称 private String type;//状态类型 private String comment;//状态备注 private String next;//下一个状态名称 private Map extensions;//状态扩展属性 private transient StateMachine stateMachine;//状态所属的状态机 ...}//The state of the execution task (abstract class), the specific task to be executed is determined by the subclasspublic abstract class AbstractTaskState extends BaseState implements TaskState { private String compensateState;//补偿状态 private boolean isForCompensation;//是否用于补偿 private boolean isForUpdate;//是否用于更新 private List retry;//重试列表 private List catches;//异常匹配列表 private List input;//状态输入列表 private Map output;//状态输出数据 private Map status;//Map private List inputExpressions;//输入表达式列表 private Map outputExpressions;//输出表达式数据 private boolean isPersist = true;//是否持久化 private Boolean retryPersistModeUpdate;//是否更新Saga重试持久化的模式 private Boolean compensatePersistModeUpdate;//是否更新Saga补偿持久化的模式 private Loop loop;//循环对象 ... public static class RetryImpl implements Retry { private List exceptions;//重试的过程中遇到了哪些异常 private List |