找回密码
 立即注册
首页 业界区 业界 Seata源码—7.Seata TCC模式的事务处理

Seata源码—7.Seata TCC模式的事务处理

匡菲 2025-6-3 00:02:07
大纲
1.Seata TCC分布式事务案例配置
2.Seata TCC案例服务提供者启动分析
3.@TwoPhaseBusinessAction注解扫描源码
4.Seata TCC案例分布式事务入口分析
5.TCC核心注解扫描与代理创建入口源码
6.TCC动态代理拦截器TccActionInterceptor
7.Action拦截处理器ActionInterceptorHandler
8.Seata TCC分布式事务的注册提交回滚处理源码
 
1.Seata TCC分布式事务案例配置
(1)位于seata-samples的tcc模块下的Demo工程
(2)Demo工程的配置文件
(3)Demo工程运行说明
 
(1)位于seata-samples的tcc模块下的Demo工程
dubbo-tcc-sample模块主要演示了TCC模式下分布式事务的提交和回滚。该Demo中一个分布式事务内会有两个TCC事务参与者,这两个TCC事务参与者分别是TccActionOne和TccActionTwo。分布式事务提交则两者均提交,分布式事务回滚则两者均回滚。
 
这两个TCC事务参与者均是Dubbo远程服务。一个应用作为服务提供方,会实现这两个TCC参与者,并将它们发布成Dubbo服务。另外一个应用作为事务发起方,会订阅Dubbo服务,然后调用编排TCC参与者,执行远程Dubbo服务。
 
TccActionOne接口定义如下:
  1. public interface TccActionOne {
  2.     @TwoPhaseBusinessAction(name = "DubboTccActionOne", commitMethod = "commit", rollbackMethod = "rollback")
  3.     public boolean prepare(BusinessActionContext actionContext,
  4.         @BusinessActionContextParameter(paramName = "a") int a);
  5.     public boolean commit(BusinessActionContext actionContext);
  6.     public boolean rollback(BusinessActionContext actionContext);
  7. }
复制代码
TccActionTwo接口定义如下:
  1. public interface TccActionTwo {
  2.     @TwoPhaseBusinessAction(name = "DubboTccActionTwo", commitMethod = "commit", rollbackMethod = "rollback")
  3.     public boolean prepare(BusinessActionContext actionContext,
  4.         @BusinessActionContextParameter(paramName = "b") String b,
  5.         @BusinessActionContextParameter(paramName = "c", index = 1) List list);
  6.     public boolean commit(BusinessActionContext actionContext);
  7.     public boolean rollback(BusinessActionContext actionContext);
  8. }
复制代码
(2)Demo工程的配置文件
一.seata-tcc.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns="http://www.springframework.org/schema/beans"
  3.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
  4.        default-autowire="byName">
  5.    
  6.     <bean >
  7.         <constructor-arg value="tcc-sample"/>
  8.         <constructor-arg value="my_test_tx_group"/>
  9.     </bean>
  10.     <bean id="tccActionOneImpl" />
  11.     <bean id="tccActionTwoImpl" />
  12.     <bean id="tccTransactionService" />
  13. </beans>
复制代码
二.seata-dubbo-provider.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns="http://www.springframework.org/schema/beans"
  3.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
  4.        default-autowire="byName">
  5.    
  6.     <bean >
  7.         <constructor-arg value="tcc-sample"/>
  8.         <constructor-arg value="my_test_tx_group"/>
  9.     </bean>
  10.     <bean id="tccActionOneImpl" />
  11.     <bean id="tccActionTwoImpl" />
  12.     <bean id="tccTransactionService" />
  13. </beans>        
复制代码
三.seata-dubbo-reference.xml
  1. <beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  2.        xmlns="http://www.springframework.org/schema/beans"
  3.        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
  4.        default-autowire="byName">
  5.    
  6.     <bean >
  7.         <constructor-arg value="tcc-sample"/>
  8.         <constructor-arg value="my_test_tx_group"/>
  9.     </bean>
  10.     <bean id="tccActionOneImpl" />
  11.     <bean id="tccActionTwoImpl" />
  12.     <bean id="tccTransactionService" />
  13. </beans>        
复制代码
(3)Demo工程运行指南
一.启动Seata Server
 
二.启动Dubbo服务应用
运行DubboTccProviderStarter。该应用会发布Dubbo服务,并且实现了两个TCC参与者。
  1. public class TccProviderStarter extends AbstractStarter {
  2.     public static void main(String[] args) throws Exception {
  3.         new TccProviderStarter().start0(args);
  4.     }
  5.    
  6.     @Override
  7.     protected void start0(String[] args) throws Exception {
  8.         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
  9.             new String[]{"spring/seata-tcc.xml", "spring/seata-dubbo-provider.xml"}
  10.         );
  11.         new ApplicationKeeper().keep();
  12.     }
  13. }
  14. public class TccActionOneImpl implements TccActionOne {
  15.     @Override
  16.     public boolean prepare(BusinessActionContext actionContext, int a) {
  17.         String xid = actionContext.getXid();
  18.         System.out.println("TccActionOne prepare, xid:" + xid + ", a:" + a);
  19.         return true;
  20.     }
  21.    
  22.     @Override
  23.     public boolean commit(BusinessActionContext actionContext) {
  24.         String xid = actionContext.getXid();
  25.         System.out.println("TccActionOne commit, xid:" + xid + ", a:" + actionContext.getActionContext("a"));
  26.         ResultHolder.setActionOneResult(xid, "T");
  27.         return true;
  28.     }
  29.    
  30.     @Override
  31.     public boolean rollback(BusinessActionContext actionContext) {
  32.         String xid = actionContext.getXid();
  33.         System.out.println("TccActionOne rollback, xid:" + xid + ", a:" + actionContext.getActionContext("a"));
  34.         ResultHolder.setActionOneResult(xid, "R");
  35.         return true;
  36.     }
  37. }
  38. public class TccActionTwoImpl implements TccActionTwo {
  39.     @Override
  40.     public boolean prepare(BusinessActionContext actionContext, String b, List list) {
  41.         String xid = actionContext.getXid();
  42.         System.out.println("TccActionTwo prepare, xid:" + xid + ", b:" + b + ", c:" + list.get(1));
  43.         return true;
  44.     }
  45.    
  46.     @Override
  47.     public boolean commit(BusinessActionContext actionContext) {
  48.         String xid = actionContext.getXid();
  49.         System.out.println("TccActionTwo commit, xid:" + xid + ", b:" + actionContext.getActionContext("b") + ", c:" + actionContext.getActionContext("c"));
  50.         ResultHolder.setActionTwoResult(xid, "T");
  51.         return true;
  52.     }
  53.    
  54.     @Override
  55.     public boolean rollback(BusinessActionContext actionContext) {
  56.         String xid = actionContext.getXid();
  57.         System.out.println("TccActionTwo rollback, xid:" + xid + ", b:" + actionContext.getActionContext("b") + ", c:" + actionContext.getActionContext("c"));
  58.         ResultHolder.setActionTwoResult(xid, "R");
  59.         return true;
  60.     }
  61. }
复制代码
三.启动事务应用
运行TccConsumerStarter。该应用会订阅Dubbo服务,发起分布式事务,调用上述两个TCC参与者,内含TCC事务提交场景和TCC事务回滚场景的演示。
  1. public class TccConsumerStarter extends AbstractStarter {
  2.     static TccTransactionService tccTransactionService = null;
  3.    
  4.     public static void main(String[] args) throws Exception {
  5.         new TccConsumerStarter().start0(args);
  6.     }
  7.    
  8.     @Override
  9.     protected void start0(String[] args) throws Exception {
  10.         ClassPathXmlApplicationContext applicationContext = new ClassPathXmlApplicationContext(
  11.             new String[]{"spring/seata-tcc.xml", "spring/seata-dubbo-reference.xml"}
  12.         );
  13.         tccTransactionService = (TccTransactionService) applicationContext.getBean("tccTransactionService");
  14.         //分布式事务提交demo
  15.         transactionCommitDemo();
  16.         //分布式事务回滚demo
  17.         transactionRollbackDemo();
  18.     }
  19.    
  20.     private static void transactionCommitDemo() throws InterruptedException {
  21.         String txId = tccTransactionService.doTransactionCommit();
  22.         System.out.println(txId);
  23.         Assert.isTrue(StringUtils.isNotEmpty(txId), "事务开启失败");
  24.         System.out.println("transaction commit demo finish.");
  25.     }
  26.    
  27.     private static void transactionRollbackDemo() throws InterruptedException {
  28.         Map map = new HashMap(16);
  29.         try {
  30.             tccTransactionService.doTransactionRollback(map);
  31.             Assert.isTrue(false, "分布式事务未回滚");
  32.         } catch (Throwable t) {
  33.             Assert.isTrue(true, "分布式事务异常回滚");
  34.         }
  35.         String txId = (String) map.get("xid");
  36.         System.out.println(txId);
  37.         System.out.println("transaction rollback demo finish.");
  38.     }
  39. }
  40. public class TccTransactionService {
  41.     private TccActionOne tccActionOne;
  42.     private TccActionTwo tccActionTwo;
  43.     //提交分布式事务
  44.     @GlobalTransactional
  45.     public String doTransactionCommit() {
  46.         //第一个TCC事务参与者
  47.         boolean result = tccActionOne.prepare(null, 1);
  48.         if (!result) {
  49.             throw new RuntimeException("TccActionOne failed.");
  50.         }
  51.         List list = new ArrayList();
  52.         list.add("c1");
  53.         list.add("c2");
  54.         //第二个TCC事务参与者
  55.         result = tccActionTwo.prepare(null, "two", list);
  56.         if (!result) {
  57.             throw new RuntimeException("TccActionTwo failed.");
  58.         }
  59.         return RootContext.getXID();
  60.     }
  61.    
  62.     //回滚分布式事务
  63.     @GlobalTransactional
  64.     public String doTransactionRollback(Map map) {
  65.         //第一个TCC事务参与者
  66.         boolean result = tccActionOne.prepare(null, 1);
  67.         if (!result) {
  68.             throw new RuntimeException("TccActionOne failed.");
  69.         }
  70.         List list = new ArrayList();
  71.         list.add("c1");
  72.         list.add("c2");
  73.         //第二个TCC事务参与者
  74.         result = tccActionTwo.prepare(null, "two", list);
  75.         if (!result) {
  76.             throw new RuntimeException("TccActionTwo failed.");
  77.         }
  78.         map.put("xid", RootContext.getXID());
  79.         throw new RuntimeException("transacton rollback");
  80.     }
  81.     public void setTccActionOne(TccActionOne tccActionOne) {
  82.         this.tccActionOne = tccActionOne;
  83.     }
  84.    
  85.     public void setTccActionTwo(TccActionTwo tccActionTwo) {
  86.         this.tccActionTwo = tccActionTwo;
  87.     }
  88. }
复制代码
 
2.Seata TCC案例服务提供者启动分析
添加了@TwoPhaseBusinessAction注解的接口发布成Dubbo服务:
1.png
 
3.@TwoPhaseBusinessAction注解扫描源码
(1)全局事务注解扫描器的wrapIfNecessary()方法扫描Spring Bean
(2)TCCBeanParserUtils的isTccAutoProxy()方法判断是否要创建TCC动态代理
 
(1)全局事务注解扫描器的wrapIfNecessary()方法扫描Spring Bean
全局事务注解扫描器GlobalTransactionScanner会在调用initClient()方法初始化Seata Client客户端后,通过wrapIfNecessary()方法扫描Spring Bean中含有@TwoPhaseBusinessAction注解的方法。
2.png
  1. //AbstractAutoProxyCreator:Spring的动态代理自动创建者
  2. //ConfigurationChangeListener:关注配置变更事件的监听器
  3. //InitializingBean:Spring Bean初始化回调
  4. //ApplicationContextAware:用来感知Spring容器
  5. //DisposableBean:支持可抛弃Bean
  6. public class GlobalTransactionScanner extends AbstractAutoProxyCreator
  7.         implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
  8.     ...
  9.    
  10.     //InitializingBean接口的回调方法
  11.     //Spring容器启动和初始化完毕后,会调用如下的afterPropertiesSet()方法进行回调
  12.     @Override
  13.     public void afterPropertiesSet() {
  14.         //是否禁用了全局事务,默认是false
  15.         if (disableGlobalTransaction) {
  16.             if (LOGGER.isInfoEnabled()) {
  17.                 LOGGER.info("Global transaction is disabled.");
  18.             }
  19.             ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
  20.             return;
  21.         }
  22.         //通过CAS操作确保initClient()初始化动作仅仅执行一次
  23.         if (initialized.compareAndSet(false, true)) {
  24.             //initClient()方法会对Seata Client进行初始化,比如和Seata Server建立长连接
  25.             //seata-samples的tcc模块的seata-tcc.xml配置文件里都配置了GlobalTransactionScanner这个Bean
  26.             //而GlobalTransactionScanner这个Bean伴随着Spring容器的初始化完毕,都会回调其初始化逻辑initClient()
  27.             initClient();
  28.         }
  29.     }
  30.     //initClient()是核心方法,负责对Seata Client客户端进行初始化
  31.     private void initClient() {
  32.         if (LOGGER.isInfoEnabled()) {
  33.             LOGGER.info("Initializing Global Transaction Clients ... ");
  34.         }
  35.         if (DEFAULT_TX_GROUP_OLD.equals(txServiceGroup)) {
  36.             LOGGER.warn("...", DEFAULT_TX_GROUP_OLD, DEFAULT_TX_GROUP);
  37.         }
  38.         if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
  39.             throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
  40.         }
  41.         //对于Seata Client来说,最重要的组件有两个:
  42.         //一个是TM,即Transaction Manager,用来管理全局事务
  43.         //一个是RM,即Resource Manager,用来管理各分支事务的数据源
  44.         //init TM
  45.         //TMClient.init()会对客户端的TM全局事务管理器进行初始化
  46.         TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
  47.         if (LOGGER.isInfoEnabled()) {
  48.             LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  49.         }
  50.         //init RM
  51.         //RMClient.init()会对客户端的RM分支事务资源管理器进行初始化
  52.         RMClient.init(applicationId, txServiceGroup);
  53.         if (LOGGER.isInfoEnabled()) {
  54.             LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
  55.         }
  56.         if (LOGGER.isInfoEnabled()) {
  57.             LOGGER.info("Global Transaction Clients are initialized. ");
  58.         }
  59.         //注册Spring容器被销毁时的回调钩子,释放TM和RM两个组件的一些资源
  60.         registerSpringShutdownHook();
  61.     }
  62.    
  63.     //The following will be scanned, and added corresponding interceptor:
  64.     //添加了如下注解的方法会被扫描到,然后方法会添加相应的拦截器进行拦截
  65.    
  66.     //TM:
  67.     //@see io.seata.spring.annotation.GlobalTransactional // TM annotation
  68.     //Corresponding interceptor:
  69.     //@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
  70.    
  71.     //GlobalLock:
  72.     //@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
  73.     //Corresponding interceptor:
  74.     //@see io.seata.spring.annotation.GlobalTransactionalInterceptor# handleGlobalLock(MethodInvocation, GlobalLock)  // GlobalLock handler
  75.    
  76.     //TCC mode:
  77.     //@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
  78.     //@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
  79.     //@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
  80.     //Corresponding interceptor:
  81.     //@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
  82.    
  83.     @Override
  84.     //由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,
  85.     //所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;
  86.     //让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,
  87.     //从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;
  88.     protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
  89.         //do checkers
  90.         if (!doCheckers(bean, beanName)) {
  91.             return bean;
  92.         }
  93.         try {
  94.             synchronized (PROXYED_SET) {
  95.                 if (PROXYED_SET.contains(beanName)) {
  96.                     return bean;
  97.                 }
  98.                 interceptor = null;
  99.                 //check TCC proxy
  100.                 //判断传递进来的Bean是否是TCC动态代理
  101.                 //服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理
  102.                 if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
  103.                     //init tcc fence clean task if enable useTccFence
  104.                     TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
  105.                     //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
  106.                     interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
  107.                     ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
  108.                 } else {
  109.                     //获取目标class的接口
  110.                     Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
  111.                     Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
  112.                     //existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解
  113.                     if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
  114.                         return bean;
  115.                     }
  116.                     if (globalTransactionalInterceptor == null) {
  117.                         //构建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器
  118.                         globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
  119.                         ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
  120.                     }
  121.                     interceptor = globalTransactionalInterceptor;
  122.                 }
  123.                 LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
  124.                 if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理
  125.                     //接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理
  126.                     //这样后续调用到目标Bean的方法,就会调用到TccActionInterceptor拦截器
  127.                     bean = super.wrapIfNecessary(bean, beanName, cacheKey);
  128.                 } else {
  129.                     AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
  130.                     Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
  131.                     int pos;
  132.                     for (Advisor avr : advisor) {
  133.                         //Find the position based on the advisor's order, and add to advisors by pos
  134.                         pos = findAddSeataAdvisorPosition(advised, avr);
  135.                         advised.addAdvisor(pos, avr);
  136.                     }
  137.                 }
  138.                 PROXYED_SET.add(beanName);
  139.                 return bean;
  140.             }
  141.         } catch (Exception exx) {
  142.             throw new RuntimeException(exx);
  143.         }
  144.     }
  145.     ...
  146. }
复制代码
(2)TCCBeanParserUtils的isTccAutoProxy()方法判断是否要创建TCC动态代理
  1. public class TCCBeanParserUtils {
  2.     private TCCBeanParserUtils() {
  3.     }
  4.     //is auto proxy TCC bean
  5.     public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
  6.         boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
  7.         //get RemotingBean description
  8.         RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
  9.         //is remoting bean
  10.         if (isRemotingBean) {
  11.             if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
  12.                 //LocalTCC
  13.                 //创建一个local tcc代理
  14.                 return isTccProxyTargetBean(remotingDesc);
  15.             } else {
  16.                 //sofa:reference / dubbo:reference, factory bean
  17.                 return false;
  18.             }
  19.         } else {
  20.             if (remotingDesc == null) {
  21.                 //check FactoryBean
  22.                 if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
  23.                     remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
  24.                     return isTccProxyTargetBean(remotingDesc);
  25.                 } else {
  26.                     return false;
  27.                 }
  28.             } else {
  29.                 return isTccProxyTargetBean(remotingDesc);
  30.             }
  31.         }
  32.     }
  33.     ...
  34.    
  35.     //is TCC proxy-bean/target-bean: LocalTCC , the proxy bean of sofa:reference/dubbo:reference
  36.     public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
  37.         if (remotingDesc == null) {
  38.             return false;
  39.         }
  40.         //check if it is TCC bean
  41.         boolean isTccClazz = false;
  42.         //针对我们的class拿到一个接口class
  43.         Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
  44.         //获取我们的接口里定义的所有的方法
  45.         Method[] methods = tccInterfaceClazz.getMethods();
  46.         TwoPhaseBusinessAction twoPhaseBusinessAction;
  47.         //遍历所有的方法
  48.         for (Method method : methods) {
  49.             //获取的方法是否加了@TwoPhaseBusinessAction注解
  50.             twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
  51.             if (twoPhaseBusinessAction != null) {
  52.                 isTccClazz = true;
  53.                 break;
  54.             }
  55.         }
  56.         if (!isTccClazz) {
  57.             return false;
  58.         }
  59.         short protocols = remotingDesc.getProtocol();
  60.         //LocalTCC
  61.         if (Protocols.IN_JVM == protocols) {
  62.             //in jvm TCC bean , AOP
  63.             return true;
  64.         }
  65.         //sofa:reference /  dubbo:reference, AOP
  66.         return remotingDesc.isReference();
  67.     }
  68.     ...
  69. }
复制代码
 
4.Seata TCC案例分布式事务入口分析
TccTransactionService作为分布式事务的入口,其提交事务和回滚事务的接口都会被添加上@GlobalTransactional注解。
 
所以应用启动时,TccTransactionService的Bean就会被GlobalTransactionScanner扫描,然后其下添加了@GlobalTransactional注解的接口就会被创建动态代理。
 
在TccTransactionService的提交分布式事务的接口中,会先后调用TccActionOne和TccActionTwo两个Dubbo服务。并且在调用两个Dubbo服务时,会通过ApacheDubboTransactionPropagationFilter传递xid。
3.png
  1. public class TccTransactionService {
  2.     private TccActionOne tccActionOne;
  3.     private TccActionTwo tccActionTwo;
  4.     //提交分布式事务
  5.     @GlobalTransactional
  6.     public String doTransactionCommit() {
  7.         //第一个TCC事务参与者
  8.         boolean result = tccActionOne.prepare(null, 1);
  9.         if (!result) {
  10.             throw new RuntimeException("TccActionOne failed.");
  11.         }
  12.         List list = new ArrayList();
  13.         list.add("c1");
  14.         list.add("c2");
  15.         //第二个TCC事务参与者
  16.         result = tccActionTwo.prepare(null, "two", list);
  17.         if (!result) {
  18.             throw new RuntimeException("TccActionTwo failed.");
  19.         }
  20.         return RootContext.getXID();
  21.     }
  22.     //回滚分布式事务
  23.     @GlobalTransactional
  24.     public String doTransactionRollback(Map map) {
  25.         //第一个TCC事务参与者
  26.         boolean result = tccActionOne.prepare(null, 1);
  27.         if (!result) {
  28.             throw new RuntimeException("TccActionOne failed.");
  29.         }
  30.         List list = new ArrayList();
  31.         list.add("c1");
  32.         list.add("c2");
  33.         //第二个TCC事务参与者
  34.         result = tccActionTwo.prepare(null, "two", list);
  35.         if (!result) {
  36.             throw new RuntimeException("TccActionTwo failed.");
  37.         }
  38.         map.put("xid", RootContext.getXID());
  39.         throw new RuntimeException("transacton rollback");
  40.     }
  41.     public void setTccActionOne(TccActionOne tccActionOne) {
  42.         this.tccActionOne = tccActionOne;
  43.     }
  44.    
  45.     public void setTccActionTwo(TccActionTwo tccActionTwo) {
  46.         this.tccActionTwo = tccActionTwo;
  47.     }
  48. }
复制代码
 
5.TCC核心注解扫描与代理创建入口源码
GlobalTransactionScanner的wrapIfNecessary()方法会扫描Spring Bean。TCCBeanParserUtils的isTccAutoProxy()方法会通过判断扫描的Spring Bean中的方法是否添加了TCC的注解,来决定是否要对Bean的方法进行TCC动态代理。
 
注意,其中TCC的注解有两个:@LocalTCC、@TwoPhaseBusinessAction
4.png
  1. public class GlobalTransactionScanner extends AbstractAutoProxyCreator        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {    ...    @Override    //由于GlobalTransactionScanner继承了Spring提供的AbstractAutoProxyCreator,    //所以Spring会把Spring Bean传递给GlobalTransactionScanner.wrapIfNecessary()进行判断;    //让GlobalTransactionScanner来决定是否要根据Bean的Class、或者Bean的方法是否有上述注解,    //从而决定是否需要针对Bean的Class创建动态代理,来对添加了注解的方法进行拦截;    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {        //do checkers        if (!doCheckers(bean, beanName)) {            return bean;        }        try {            synchronized (PROXYED_SET) {                if (PROXYED_SET.contains(beanName)) {                    return bean;                }                interceptor = null;                //check TCC proxy                //判断传递进来的Bean是否是TCC动态代理                //服务启动时会把所有的Bean传递进来这个wrapIfNecessary(),检查这个Bean是否需要创建TCC动态代理                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {                    //init tcc fence clean task if enable useTccFence                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);                } else {                    //获取目标class的接口                    Class serviceInterface = SpringProxyUtils.findTargetClass(bean);                    Class[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);                    //existsAnnotation()方法会判断Bean的Class或Method是否添加了@GlobalTransactional等注解                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {                        return bean;                    }                    if (globalTransactionalInterceptor == null) {                        //构建一个GlobalTransactionalInterceptor,即全局事务注解的拦截器                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);                    }                    interceptor = globalTransactionalInterceptor;                }                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());                if (!AopUtils.isAopProxy(bean)) {//如果这个Bean并不是AOP代理                    //接下来会基于Spring的AbstractAutoProxyCreator创建针对目标Bean接口的动态代理                    //这样后续调用到目标Bean的方法,就会调用到TccActionInterceptor拦截器                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);                } else {                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));                    int pos;                    for (Advisor avr : advisor) {                        //Find the position based on the advisor's order, and add to advisors by pos                        pos = findAddSeataAdvisorPosition(advised, avr);                        advised.addAdvisor(pos, avr);                    }                }                PROXYED_SET.add(beanName);                return bean;            }        } catch (Exception exx) {            throw new RuntimeException(exx);        }    }    ...}public class TCCBeanParserUtils {
  2.     private TCCBeanParserUtils() {
  3.     }
  4.     //is auto proxy TCC bean
  5.     public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
  6.         boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
  7.         //get RemotingBean description
  8.         RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
  9.         //is remoting bean
  10.         if (isRemotingBean) {
  11.             if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
  12.                 //LocalTCC
  13.                 //创建一个local tcc代理
  14.                 return isTccProxyTargetBean(remotingDesc);
  15.             } else {
  16.                 //sofa:reference / dubbo:reference, factory bean
  17.                 return false;
  18.             }
  19.         } else {
  20.             if (remotingDesc == null) {
  21.                 //check FactoryBean
  22.                 if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
  23.                     remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
  24.                     return isTccProxyTargetBean(remotingDesc);
  25.                 } else {
  26.                     return false;
  27.                 }
  28.             } else {
  29.                 return isTccProxyTargetBean(remotingDesc);
  30.             }
  31.         }
  32.     }
  33.     ...
  34.    
  35.     //is TCC proxy-bean/target-bean: LocalTCC , the proxy bean of sofa:reference/dubbo:reference
  36.     public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
  37.         if (remotingDesc == null) {
  38.             return false;
  39.         }
  40.         //check if it is TCC bean
  41.         boolean isTccClazz = false;
  42.         //针对我们的class拿到一个接口class
  43.         Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
  44.         //获取我们的接口里定义的所有的方法
  45.         Method[] methods = tccInterfaceClazz.getMethods();
  46.         TwoPhaseBusinessAction twoPhaseBusinessAction;
  47.         //遍历所有的方法
  48.         for (Method method : methods) {
  49.             //获取的方法是否加了@TwoPhaseBusinessAction注解
  50.             twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
  51.             if (twoPhaseBusinessAction != null) {
  52.                 isTccClazz = true;
  53.                 break;
  54.             }
  55.         }
  56.         if (!isTccClazz) {
  57.             return false;
  58.         }
  59.         short protocols = remotingDesc.getProtocol();
  60.         //LocalTCC
  61.         if (Protocols.IN_JVM == protocols) {
  62.             //in jvm TCC bean , AOP
  63.             return true;
  64.         }
  65.         //sofa:reference /  dubbo:reference, AOP
  66.         return remotingDesc.isReference();
  67.     }
  68.     ...
  69. }
复制代码
 
6.TCC动态代理拦截器TccActionInterceptor
如果调用添加了TCC的注解的方法,就会执行TccActionInterceptor的invoke()方法,此外只有分支事务的方法才会有可能被TCC动态代理。
 
在TccActionInterceptor的invoke()方法中,会通过ActionInterceptorHandler的proceed()方法来执行具体拦截逻辑。
  1. public class TccActionInterceptor implements MethodInterceptor, ConfigurationChangeListener, Ordered {
  2.     //Action拦截处理器
  3.     private ActionInterceptorHandler actionInterceptorHandler = new ActionInterceptorHandler();
  4.     private volatile boolean disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
  5.     ...
  6.    
  7.     //如果调用添加了TCC的注解的方法,就会执行如下invoke()方法
  8.     @Override
  9.     public Object invoke(final MethodInvocation invocation) throws Throwable {
  10.         //当前必须是在全局事务里,也就是说分支事务的方法才会有可能被TCC动态代理
  11.         if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
  12.             //not in transaction, or this interceptor is disabled
  13.             return invocation.proceed();
  14.         }
  15.         //本次调用的是哪个方法,在class接口里找到这个方法
  16.         Method method = getActionInterfaceMethod(invocation);
  17.         //然后才能找到接口里定义的那个方法上面加的一个注解
  18.         TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
  19.         //try method
  20.         if (businessAction != null) {
  21.             //save the xid
  22.             String xid = RootContext.getXID();
  23.             //save the previous branchType
  24.             BranchType previousBranchType = RootContext.getBranchType();
  25.             //if not TCC, bind TCC branchType
  26.             if (BranchType.TCC != previousBranchType) {
  27.                 RootContext.bindBranchType(BranchType.TCC);
  28.             }
  29.             try {
  30.                 //Handler the TCC Aspect, and return the business result
  31.                 //传入actionInterceptorHandler的参数分别是:方法、调用方法传递进来的参数、全局事务xid、分支事务注解、目标方法执行
  32.                 return actionInterceptorHandler.proceed(method, invocation.getArguments(), xid, businessAction, invocation::proceed);
  33.             } finally {
  34.                 //if not TCC, unbind branchType
  35.                 if (BranchType.TCC != previousBranchType) {
  36.                     RootContext.unbindBranchType();
  37.                 }
  38.                 //MDC remove branchId
  39.                 MDC.remove(RootContext.MDC_KEY_BRANCH_ID);
  40.             }
  41.         }
  42.         //not TCC try method
  43.         return invocation.proceed();
  44.     }
  45.    
  46.     //get the method from interface
  47.     protected Method getActionInterfaceMethod(MethodInvocation invocation) {
  48.         Class<?> interfaceType = null;
  49.         try {
  50.             if (remotingDesc == null) {
  51.                 interfaceType = getProxyInterface(invocation.getThis());
  52.             } else {
  53.                 interfaceType = remotingDesc.getInterfaceClass();
  54.             }
  55.             if (interfaceType == null && remotingDesc != null && remotingDesc.getInterfaceClassName() != null) {
  56.                 interfaceType = Class.forName(remotingDesc.getInterfaceClassName(), true, Thread.currentThread().getContextClassLoader());
  57.             }
  58.             if (interfaceType == null) {
  59.                 return invocation.getMethod();
  60.             }
  61.             return interfaceType.getMethod(invocation.getMethod().getName(), invocation.getMethod().getParameterTypes());
  62.         } catch (NoSuchMethodException e) {
  63.             if (interfaceType != null && !"toString".equals(invocation.getMethod().getName())) {
  64.                 LOGGER.warn("no such method '{}' from interface {}", invocation.getMethod().getName(), interfaceType.getName());
  65.             }
  66.             return invocation.getMethod();
  67.         } catch (Exception e) {
  68.             LOGGER.warn("get Method from interface failed", e);
  69.             return invocation.getMethod();
  70.         }
  71.     }
  72.     //get the interface of proxy
  73.     @Nullable
  74.     protected Class<?> getProxyInterface(Object proxyBean) throws Exception {
  75.         if (DubboUtil.isDubboProxyName(proxyBean.getClass().getName())) {
  76.             //dubbo javaassist proxy
  77.             return DubboUtil.getAssistInterface(proxyBean);
  78.         } else {
  79.             //jdk/cglib proxy
  80.             return SpringProxyUtils.getTargetInterface(proxyBean);
  81.         }
  82.     }
  83.     ...
  84. }
复制代码
 
7.Action拦截处理器ActionInterceptorHandler
Action拦截处理器ActionInterceptorHandler的主要工作是:设置业务动作上下文 + 注册分支事务 + 执行目标方法。
5.png
注意:设置业务动作上下文时,会判断执行方法入参中是否有业务动作上下文。注册分支事务前,会从执行方法入参中提取数据设置到业务动作上下文。
  1. //Handler the TCC Participant Aspect : Setting Context, Creating Branch Record
  2. public class ActionInterceptorHandler {
  3.     ...
  4.     public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction, Callback<Object> targetCallback) throws Throwable {
  5.         //1.设置业务动作上下文
  6.         //Get action context from arguments, or create a new one and then reset to arguments
  7.         BusinessActionContext actionContext = getOrCreateActionContextAndResetToArguments(method.getParameterTypes(), arguments);
  8.         //Set the xid
  9.         actionContext.setXid(xid);
  10.         //Set the action name,我们自己定义的tcc业务动作名称
  11.         String actionName = businessAction.name();
  12.         actionContext.setActionName(actionName);
  13.         //Set the delay report,延迟report上报
  14.         actionContext.setDelayReport(businessAction.isDelayReport());
  15.         //Creating Branch Record
  16.         //2.发起分支事务的注册,注册成功才会获取到一个branchId
  17.         String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
  18.         actionContext.setBranchId(branchId);
  19.         //MDC put branchId
  20.         MDC.put(RootContext.MDC_KEY_BRANCH_ID, branchId);
  21.         //save the previous action context
  22.         BusinessActionContext previousActionContext = BusinessActionContextUtil.getContext();
  23.         try {
  24.             //share actionContext implicitly
  25.             BusinessActionContextUtil.setContext(actionContext);
  26.             if (businessAction.useTCCFence()) {
  27.                 try {
  28.                     //Use TCC Fence, and return the business result
  29.                     return TCCFenceHandler.prepareFence(xid, Long.valueOf(branchId), actionName, targetCallback);
  30.                 } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
  31.                     Throwable originException = e.getCause();
  32.                     if (originException instanceof FrameworkException) {
  33.                         LOGGER.error("[{}] prepare TCC fence error: {}", xid, originException.getMessage());
  34.                     }
  35.                     throw originException;
  36.                 }
  37.             } else {
  38.                 //Execute business, and return the business result
  39.                 //3.执行目标方法
  40.                 return targetCallback.execute();
  41.             }
  42.         } finally {
  43.             try {
  44.                 //to report business action context finally if the actionContext.getUpdated() is true
  45.                 BusinessActionContextUtil.reportContext(actionContext);
  46.             } finally {
  47.                 if (previousActionContext != null) {
  48.                     //recovery the previous action context
  49.                     BusinessActionContextUtil.setContext(previousActionContext);
  50.                 } else {
  51.                     //clear the action context
  52.                     BusinessActionContextUtil.clear();
  53.                 }
  54.             }
  55.         }
  56.     }
  57.     ...
  58.    
  59.     //Get or create action context, and reset to arguments
  60.     @Nonnull
  61.     protected BusinessActionContext getOrCreateActionContextAndResetToArguments(Class<?>[] parameterTypes, Object[] arguments) {
  62.         BusinessActionContext actionContext = null;
  63.         //get the action context from arguments
  64.         int argIndex = 0;
  65.         //遍历方法调用时传入的参数类型
  66.         for (Class<?> parameterType : parameterTypes) {
  67.             //如果某个参数类型是BusinessActionContext,因为prepare方法是可以接收一个BusinessActionContext类型的入参的
  68.             if (BusinessActionContext.class.isAssignableFrom(parameterType)) {
  69.                 //尝试获取这个位置的参数对象,但基本是空的
  70.                 actionContext = (BusinessActionContext) arguments[argIndex];
  71.                 if (actionContext == null) {
  72.                     //If the action context exists in arguments but is null, create a new one and reset the action context to the arguments
  73.                     //创建一个BusinessActionContext对象,把一个空的上下文传递到方法入参里
  74.                     actionContext = new BusinessActionContext();
  75.                     arguments[argIndex] = actionContext;
  76.                 } else {
  77.                     //Reset the updated, avoid unnecessary reporting
  78.                     actionContext.setUpdated(null);
  79.                 }
  80.                 break;
  81.             }
  82.             argIndex++;
  83.         }
  84.         //if null, create a new one
  85.         if (actionContext == null) {
  86.             actionContext = new BusinessActionContext();
  87.         }
  88.         return actionContext;
  89.     }
  90.     ...
  91.    
  92.     //Creating Branch Record
  93.     protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction, BusinessActionContext actionContext) {
  94.         String actionName = actionContext.getActionName();
  95.         String xid = actionContext.getXid();
  96.         //region fetch context and init action context
  97.         //从方法入参里提取出来一些数据放入到上下文里去
  98.         Map<String, Object> context = fetchActionRequestContext(method, arguments);
  99.         context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());
  100.         //Init business context
  101.         initBusinessContext(context, method, businessAction);
  102.         //Init running environment context
  103.         initFrameworkContext(context);
  104.         Map<String, Object> originContext = actionContext.getActionContext();
  105.         if (CollectionUtils.isNotEmpty(originContext)) {
  106.             //Merge context and origin context if it exists.
  107.             //@since: above 1.4.2
  108.             originContext.putAll(context);
  109.             context = originContext;
  110.         } else {
  111.             actionContext.setActionContext(context);
  112.         }
  113.         //endregion
  114.         //Init applicationData
  115.         Map<String, Object> applicationContext = Collections.singletonMap(Constants.TCC_ACTION_CONTEXT, context);
  116.         String applicationContextStr = JSON.toJSONString(applicationContext);
  117.         try {
  118.             //registry branch record
  119.             //分支事务注册
  120.             Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid, applicationContextStr, null);
  121.             return String.valueOf(branchId);
  122.         } catch (Throwable t) {
  123.             String msg = String.format("TCC branch Register error, xid: %s", xid);
  124.             LOGGER.error(msg, t);
  125.             throw new FrameworkException(t, msg);
  126.         }
  127.     }
  128.     ...
  129.    
  130.     //Extracting context data from parameters, add them to the context
  131.     protected Map<String, Object> fetchActionRequestContext(Method method, Object[] arguments) {
  132.         Map<String, Object> context = new HashMap<>(8);
  133.         Annotation[][] parameterAnnotations = method.getParameterAnnotations();
  134.         //方法的入参也是可以添加注解的
  135.         for (int i = 0; i < parameterAnnotations.length; i++) {
  136.             for (int j = 0; j < parameterAnnotations[i].length; j++) {
  137.                 //如果某个入参添加了一个@BusinessActionContextParameter注解
  138.                 if (parameterAnnotations[i][j] instanceof BusinessActionContextParameter) {
  139.                     //get annotation
  140.                     BusinessActionContextParameter annotation = (BusinessActionContextParameter) parameterAnnotations[i][j];
  141.                     if (arguments[i] == null) {
  142.                         throw new IllegalArgumentException("@BusinessActionContextParameter 's params can not null");
  143.                     }
  144.                     //get param
  145.                     Object paramObject = arguments[i];
  146.                     if (paramObject == null) {
  147.                         continue;
  148.                     }
  149.                     //load param by the config of annotation, and then put into the context
  150.                     //根据注解的配置提取入参的名称和值,把这个名称和值放入到BusinessActionContext里
  151.                     ActionContextUtil.loadParamByAnnotationAndPutToContext(ParamType.PARAM, "", paramObject, annotation, context);
  152.                 }
  153.             }
  154.         }
  155.         return context;
  156.     }
  157. }
复制代码
 
8.Seata TCC分布式事务的注册提交回滚处理源码
(1)TCC分支事务的注册
(2)TCC分支事务的提交
(3)TCC分支事务的回滚
6.png
从TCC的分支事务注册提交回滚过程可知:TCC和AT是可以混合使用的。
 
(1)TCC分支事务的注册
Action拦截处理器在注册TCC的分支事务时,会调用DefaultResourceManager的branchRegister()方法。
  1. public class DefaultResourceManager implements ResourceManager {
  2.     //all resource managers
  3.     protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();
  4.    
  5.     private static class SingletonHolder {
  6.         private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
  7.     }
  8.    
  9.     public static DefaultResourceManager get() {
  10.         return SingletonHolder.INSTANCE;
  11.     }
  12.    
  13.     private DefaultResourceManager() {
  14.         initResourceManagers();
  15.     }
  16.    
  17.     protected void initResourceManagers() {
  18.         //init all resource managers
  19.         //通过SPI加载所有的ResourceManager资源管理器
  20.         //比如:DataSourceManager、TCCResourceManager、SagaResourceManager、ResourceManagerXA
  21.         List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
  22.         if (CollectionUtils.isNotEmpty(allResourceManagers)) {
  23.             for (ResourceManager rm : allResourceManagers) {
  24.                 resourceManagers.put(rm.getBranchType(), rm);
  25.             }
  26.         }
  27.     }
  28.     ...
  29.    
  30.     //注册分支事务
  31.     @Override
  32.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  33.         return getResourceManager(branchType).branchRegister(branchType, resourceId, clientId, xid, applicationData, lockKeys);
  34.     }
  35.    
  36.     public ResourceManager getResourceManager(BranchType branchType) {
  37.         ResourceManager rm = resourceManagers.get(branchType);
  38.         if (rm == null) {
  39.             throw new FrameworkException("No ResourceManager for BranchType:" + branchType.name());
  40.         }
  41.         return rm;
  42.     }
  43.     ...
  44. }
  45. public class TCCResourceManager extends AbstractResourceManager {
  46.     ...
  47.     ...
  48. }
  49. public abstract class AbstractResourceManager implements ResourceManager {
  50.     //registry branch record
  51.     @Override
  52.     public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String applicationData, String lockKeys) throws TransactionException {
  53.         try {
  54.             BranchRegisterRequest request = new BranchRegisterRequest();
  55.             request.setXid(xid);//xid是全局事务id
  56.             request.setLockKey(lockKeys);//这次分支事务要更新数据全局锁keys
  57.             request.setResourceId(resourceId);//分支事务对应的资源id
  58.             request.setBranchType(branchType);//分支事务类型
  59.             request.setApplicationData(applicationData);//应用数据
  60.             BranchRegisterResponse response = (BranchRegisterResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(request);
  61.             if (response.getResultCode() == ResultCode.Failed) {
  62.                 throw new RmTransactionException(response.getTransactionExceptionCode(), String.format("Response[ %s ]", response.getMsg()));
  63.             }
  64.             return response.getBranchId();
  65.         } catch (TimeoutException toe) {
  66.             throw new RmTransactionException(TransactionExceptionCode.IO, "RPC Timeout", toe);
  67.         } catch (RuntimeException rex) {
  68.             throw new RmTransactionException(TransactionExceptionCode.BranchRegisterFailed, "Runtime", rex);
  69.         }
  70.     }
  71.     ...
  72. }
复制代码
(2)TCC分支事务的提交
  1. public class TCCResourceManager extends AbstractResourceManager {
  2.     ...
  3.     //TCC branch commit,分支事务的提交
  4.     @Override
  5.     public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  6.         TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
  7.         if (tccResource == null) {
  8.             throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
  9.         }
  10.         //获取到目标的Bean
  11.         Object targetTCCBean = tccResource.getTargetBean();
  12.         //获取到目标Bean的commit方法
  13.         Method commitMethod = tccResource.getCommitMethod();
  14.         if (targetTCCBean == null || commitMethod == null) {
  15.             throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
  16.         }
  17.         try {
  18.             //BusinessActionContext
  19.             BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
  20.             Object[] args = this.getTwoPhaseCommitArgs(tccResource, businessActionContext);
  21.             Object ret;
  22.             boolean result;
  23.             //add idempotent and anti hanging
  24.             if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
  25.                 try {
  26.                     result = TCCFenceHandler.commitFence(commitMethod, targetTCCBean, xid, branchId, args);
  27.                 } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
  28.                     throw e.getCause();
  29.                 }
  30.             } else {
  31.                 ret = commitMethod.invoke(targetTCCBean, args);
  32.                 if (ret != null) {
  33.                     if (ret instanceof TwoPhaseResult) {
  34.                         result = ((TwoPhaseResult) ret).isSuccess();
  35.                     } else {
  36.                         result = (boolean) ret;
  37.                     }
  38.                 } else {
  39.                     result = true;
  40.                 }
  41.             }
  42.             LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
  43.             return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
  44.         } catch (Throwable t) {
  45.             String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
  46.             LOGGER.error(msg, t);
  47.             return BranchStatus.PhaseTwo_CommitFailed_Retryable;
  48.         }
  49.     }
  50.     ...
  51. }
复制代码
(3)TCC分支事务的回滚
  1. public class TCCResourceManager extends AbstractResourceManager {
  2.     ...
  3.     //TCC branch rollback,分支事务的回滚
  4.     @Override
  5.     public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
  6.         TCCResource tccResource = (TCCResource) tccResourceCache.get(resourceId);
  7.         if (tccResource == null) {
  8.             throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
  9.         }
  10.         Object targetTCCBean = tccResource.getTargetBean();
  11.         Method rollbackMethod = tccResource.getRollbackMethod();
  12.         if (targetTCCBean == null || rollbackMethod == null) {
  13.             throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
  14.         }
  15.         try {
  16.             //BusinessActionContext
  17.             BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId, applicationData);
  18.             Object[] args = this.getTwoPhaseRollbackArgs(tccResource, businessActionContext);
  19.             Object ret;
  20.             boolean result;
  21.             //add idempotent and anti hanging
  22.             if (Boolean.TRUE.equals(businessActionContext.getActionContext(Constants.USE_TCC_FENCE))) {
  23.                 try {
  24.                     result = TCCFenceHandler.rollbackFence(rollbackMethod, targetTCCBean, xid, branchId, args, tccResource.getActionName());
  25.                 } catch (SkipCallbackWrapperException | UndeclaredThrowableException e) {
  26.                     throw e.getCause();
  27.                 }
  28.             } else {
  29.                 ret = rollbackMethod.invoke(targetTCCBean, args);
  30.                 if (ret != null) {
  31.                     if (ret instanceof TwoPhaseResult) {
  32.                         result = ((TwoPhaseResult) ret).isSuccess();
  33.                     } else {
  34.                         result = (boolean) ret;
  35.                     }
  36.                 } else {
  37.                     result = true;
  38.                 }
  39.             }
  40.             LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", result, xid, branchId, resourceId);
  41.             return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
  42.         } catch (Throwable t) {
  43.             String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
  44.             LOGGER.error(msg, t);
  45.             return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
  46.         }
  47.     }
  48.     ...
  49. }
复制代码
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!
您需要登录后才可以回帖 登录 | 立即注册