SEATA的AT模式和XA模式一样,也是2阶段提交,但是AT模式没有利用数据库的XA协议,如下所示:
从上面的图可以看到,seata AT 模式分为以下5个步骤:
①、TM(事务管理器) 开启全局事务;
②、RM 向 TC(事务协调者) 注册分支事务;
③、RM(资源管理器,也就是要访问数据库的进程或线程)记录undo-log(数据快照);
④、RM执行业务sql并提交,并向TC报告状态;
⑤、TC根据所有RM分支报告的状态来决定本次全局事务中的各个RM是要Async Commit(异步提交)或者Auto Rollback(自动回滚),异步提交和自动回滚都需要以不同的方式对步骤③中记录的undo-log(数据快照)进行操作(异步提交的话删除undo-log,自动回滚的话根据undo-log的记录进行回滚)
一、数据源代理
seata 中的 XA 模式也是使用数据源代理来实现的,如下所示:
DataSourceProxy、ConnectionProxy、StatementProxy的UML关系图,如下所示:
- AbstractDataSourceProxy.class和 DataSourceProxy.class的部分源码
- ...省略导包代码...public abstract class AbstractDataSourceProxy implements SeataDataSourceProxy { ...省略部分代码... protected DataSource targetDataSource; ...省略部分代码...}
复制代码- ...省略导包代码...public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { //创建ConnectionProxy ...省略部分代码... @Override public ConnectionProxy getConnection() throws SQLException { Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection); } @Override public ConnectionProxy getConnection(String username, String password) throws SQLException { Connection targetConnection = targetDataSource.getConnection(username, password); return new ConnectionProxy(this, targetConnection); } ...省略部分代码...}
复制代码
- ConnectionProxy.class和 AbstractConnectionProxy.class的部分源码
- ...省略导包代码...public class ConnectionProxy extends AbstractConnectionProxy { ...省略部分代码... //提交SQL @Override public void commit() throws SQLException { try { lockRetryPolicy.execute(() -> { doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } } //创建保存点 @Override public Savepoint setSavepoint() throws SQLException { Savepoint savepoint = targetConnection.setSavepoint(); context.appendSavepoint(savepoint); return savepoint; } //将当前RM(资源管理器,也就是要访问数据库的进程或线程)的数据返回到保存点 @Override public void rollback(Savepoint savepoint) throws SQLException { targetConnection.rollback(savepoint); context.removeSavepoint(savepoint); } //删除(数据库)中某个保存点 @Override public void releaseSavepoint(Savepoint savepoint) throws SQLException { targetConnection.releaseSavepoint(savepoint); context.releaseSavepoint(savepoint); } ...省略部分代码...}
复制代码- ...省略导包代码...public abstract class AbstractConnectionProxy implements Connection { ...省略部分代码... //创建StatementProxy @Override public Statement createStatement() throws SQLException { Statement targetStatement = getTargetConnection().createStatement(); return new StatementProxy(this, targetStatement); } //创建StatementProxy @Override public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException { Statement statement = targetConnection.createStatement(resultSetType, resultSetConcurrency); return new StatementProxy(this, statement); } //创建StatementProxy @Override public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException { Statement statement = targetConnection.createStatement(resultSetType, resultSetConcurrency, resultSetHoldability); return new StatementProxy(this, statement); } ...省略部分代码...}
复制代码 1.1、RM(资源管理器,也就是要访问数据库的进程或线程)和TC(事务协调者)之间的交互
在初始化数据源的阶段,RM需要做2件事情:
①、RM需要建立和目标数据库的TCP连接,
②、RM并向TC注册RM资源,如下所示:

这2步都是RM进程中的DataSourceProxy.class中完成的,如下所示:- ...省略导包代码...public class DataSourceProxy extends AbstractDataSourceProxy implements Resource { ...省略部分代码... private static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT"; private String resourceId; public DataSourceProxy(DataSource targetDataSource) { this(targetDataSource, DEFAULT_RESOURCE_GROUP_ID); } public DataSourceProxy(DataSource targetDataSource, String resourceGroupId) { if (targetDataSource instanceof SeataDataSourceProxy) { LOGGER.info("Unwrap the target data source, because the type is: {}", targetDataSource.getClass().getName()); targetDataSource = ((SeataDataSourceProxy) targetDataSource).getTargetDataSource(); } this.targetDataSource = targetDataSource; init(targetDataSource, resourceGroupId); } private void init(DataSource dataSource, String resourceGroupId) { this.resourceGroupId = resourceGroupId; //RM进程获取与数据库进程的TCP连接 try (Connection connection = dataSource.getConnection()) { jdbcUrl = connection.getMetaData().getURL();//解析TCP连接中的URL dbType = JdbcUtils.getDbType(jdbcUrl);//解析TCP连接中的数据库类型,比如是mysql数据库还是oracle数据库 if (JdbcConstants.ORACLE.equals(dbType)) { userName = connection.getMetaData().getUserName(); } else if (JdbcConstants.MYSQL.equals(dbType)) { validMySQLVersion(connection);//校验mysql数据库的版本号 checkDerivativeProduct(); } //校验TCP连接的数据库中,undo_log表是否存在 checkUndoLogTableExist(connection); } catch (SQLException e) { throw new IllegalStateException("can not init dataSource", e); } if (JdbcConstants.SQLSERVER.equals(dbType)) { LOGGER.info("SQLServer support in AT mode is currently an experimental function, " + "if you have any problems in use, please feedback to us"); } initResourceId();//初始化String resourceId //向TC注册RM资源 DefaultResourceManager.get().registerResource(this); //放到这个RM进程的缓存里面,其实是一个ConcurrentHashMap对象 TableMetaCacheFactory.registerTableMeta(this); //Set the default branch type to 'AT' in the RootContext. //将AT(枚举值)刷新到RootContext对象中的BranchType DEFAULT_BRANCH_TYPE变量里面 RootContext.setDefaultBranchType(this.getBranchType()); } ...省略部分代码...}
复制代码 二、AT模式的两个阶段
2.1、AT第一阶段部分源码
在 AT 模式的第一阶段, Seata 会通过代理数据源,拦截用户执行的业务 SQL ,并获取全局锁。如果业务 SQL 是写操作(增、删、改操作)类型,AT 模式会解析业务 SQL 的语法,生成 SELECT SQL 语句,把要被修改的记录查出来,保存为 “beforeImage” 。然后执行业务 SQL ,执行完后用同样的原理,将已经被修改的记录查出来,保存为 “afterImage” ,至此一个 undo log 记录就完整了。随后 RM 会向 TC 注册分支事务(和1.1中向TC注册RM资源不同,此处是向TC注册分支事务), TC 侧会更新加锁记录,锁可以保证 AT 模式的读、写隔离。RM 再将 undo log(保存到undo_log表) 和业务 SQL 的本地事务提交,保证业务 SQL 和保存 undo log 记录 SQL 的原子性,如下所示:
 - ExecuteTemplate.class的源码package org.apache.seata.rm.datasource.exec;import java.sql.SQLException;import java.sql.Statement;import java.util.List;import org.apache.seata.common.exception.NotSupportYetException;import org.apache.seata.common.loader.EnhancedServiceLoader;import org.apache.seata.common.util.CollectionUtils;import org.apache.seata.core.context.RootContext;import org.apache.seata.core.model.BranchType;import org.apache.seata.rm.datasource.StatementProxy;import org.apache.seata.rm.datasource.exec.mariadb.MariadbInsertOnDuplicateUpdateExecutor;import org.apache.seata.rm.datasource.exec.mariadb.MariadbUpdateJoinExecutor;import org.apache.seata.rm.datasource.exec.mysql.MySQLInsertOnDuplicateUpdateExecutor;import org.apache.seata.rm.datasource.exec.mysql.MySQLUpdateJoinExecutor;import org.apache.seata.rm.datasource.exec.polardbx.PolarDBXInsertOnDuplicateUpdateExecutor;import org.apache.seata.rm.datasource.exec.polardbx.PolarDBXUpdateJoinExecutor;import org.apache.seata.rm.datasource.exec.sqlserver.SqlServerDeleteExecutor;import org.apache.seata.rm.datasource.exec.sqlserver.SqlServerSelectForUpdateExecutor;import org.apache.seata.rm.datasource.exec.sqlserver.SqlServerUpdateExecutor;import org.apache.seata.rm.datasource.sql.SQLVisitorFactory;import org.apache.seata.sqlparser.SQLRecognizer;import org.apache.seata.sqlparser.SQLType;import org.apache.seata.sqlparser.util.JdbcConstants;public class ExecuteTemplate { public static T execute(StatementProxy statementProxy, StatementCallback statementCallback, Object... args) throws SQLException { return execute(null, statementProxy, statementCallback, args); } public static T execute(List sqlRecognizers, StatementProxy statementProxy, StatementCallback statementCallback, Object... args) throws SQLException { if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) { // Just work as original statement //如果不需要globalLock并且不是AT模式,则直接执行SQL return statementCallback.execute(statementProxy.getTargetStatement(), args); } //获取数据库类型 String dbType = statementProxy.getConnectionProxy().getDbType(); if (CollectionUtils.isEmpty(sqlRecognizers)) { sqlRecognizers = SQLVisitorFactory.get( statementProxy.getTargetSQL(), dbType); } //获取恰当的SQL解析器 Executor executor; if (CollectionUtils.isEmpty(sqlRecognizers)) { //通用型SQL解析器 executor = new PlainExecutor(statementProxy, statementCallback); } else { if (sqlRecognizers.size() == 1) { SQLRecognizer sqlRecognizer = sqlRecognizers.get(0); switch (sqlRecognizer.getSQLType()) { //INSERT语句的SQL解析器,比如:insert into 表名(列名1,列名2,...列名n) values(值1,值2,...值n); case INSERT: executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType, new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class}, new Object[]{statementProxy, statementCallback, sqlRecognizer}); break; //UPDATE语句的SQL解析器,比如:update 表名 set 列名1 = 值1, 列名2 = 值2,... [where 条件] case UPDATE: if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); } else { executor = new UpdateExecutor(statementProxy, statementCallback, sqlRecognizer); } break; //DELETE语句的SQL解析器,比如:delete from 表名 where 列名="hello world" case DELETE: if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerDeleteExecutor(statementProxy, statementCallback, sqlRecognizer); } else { executor = new DeleteExecutor(statementProxy, statementCallback, sqlRecognizer); } break; //select for update语句的SQL解析器,比如select * from 表名 for update case SELECT_FOR_UPDATE: if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) { executor = new SqlServerSelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); } else { executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); } break; //INSERT ... ON DUPLICATE KEY UPDATE语句的SQL解析器,比如:INSERT INTO t1 (a(a是主键且唯一的列),b,c) VALUES (1,2,3) ON DUPLICATE KEY UPDATE c=c+1; case INSERT_ON_DUPLICATE_UPDATE: switch (dbType) { case JdbcConstants.MYSQL: executor = new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; case JdbcConstants.MARIADB: executor = new MariadbInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; case JdbcConstants.POLARDBX: executor = new PolarDBXInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE"); } break; //UPDATE JOIN语句的SQL解析器,比如:UPDATE employees e INNER JOIN departments d ON e.dept_id = d.id SET e.salary = e.salary * 100 WHERE d.name = '研发部'; case UPDATE_JOIN: switch (dbType) { case JdbcConstants.MYSQL: executor = new MySQLUpdateJoinExecutor(statementProxy, statementCallback, sqlRecognizer); break; case JdbcConstants.MARIADB: executor = new MariadbUpdateJoinExecutor(statementProxy, statementCallback, sqlRecognizer); break; case JdbcConstants.POLARDBX: executor = new PolarDBXUpdateJoinExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name()); } break; default: executor = new PlainExecutor(statementProxy, statementCallback); break; } } else { executor = new MultiExecutor(statementProxy, statementCallback, sqlRecognizers); } } T rs; try { rs = executor.execute(args); } catch (Throwable ex) { if (!(ex instanceof SQLException)) { // Turn other exception into SQLException ex = new SQLException(ex); } throw (SQLException) ex; } return rs; }}
复制代码 所有的类型的SQL解析器最后都会执行execute(args)函数,这个函数定义在BaseTransactionalExecutor.class中,如下所示:

- BaseTransactionalExecutor.class的部分源码
- ...省略导包代码...public abstract class BaseTransactionalExecutor implements Executor { ...省略部分代码... @Override public T execute(Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null) { statementProxy.getConnectionProxy().bind(xid); } //获取全局锁 statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args); } ...省略部分代码...}
复制代码
- AbstractDMLBaseExecutor.class的部分源码
- ...省略导包代码...public abstract class AbstractDMLBaseExecutor extends BaseTransactionalExecutor { ...省略部分代码... @Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); //查看当前RM连接的数据库自动提交的状态 if (connectionProxy.getAutoCommit()) { //RM连接的数据库自动提交的状态=true return executeAutoCommitTrue(args); } else { //RM连接的数据库自动提交的状态=false return executeAutoCommitFalse(args); } } protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { //必须将RM连接的数据库的自动提交状态设置为false connectionProxy.changeAutoCommit(); return new LockRetryPolicy(connectionProxy).execute(() -> { //先在这个函数里面获取执行DML语句前的镜像数据beforeImage,然后再执行SQL语句,最后再获取执行了DML语句后的镜像数据afterImage //最后将beforeImage和afterImage全部保存到数据库的undo_log表中 T result = executeAutoCommitFalse(args); connectionProxy.commit();//调用ConnectionProxy.class::commit()函数 return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); } } protected T executeAutoCommitFalse(Object[] args) throws Exception { try { //beforeImage()函数在子类中实现,比如DeleteExecutor TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args);//执行SQL语句 //afterImage()函数也在子类中实现,比如DeleteExecutor TableRecords afterImage = afterImage(beforeImage); //组装undo log日志的内容 prepareUndoLog(beforeImage, afterImage); return result; } catch (TableMetaException e) { LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}", e.getTableName(), e.getColumnName()); statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent(); throw e; } } //获取全局锁 private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy { LockRetryPolicy(final ConnectionProxy connection) { super(connection); } @Override public T execute(Callable callable) throws Exception { if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { return doRetryOnLockConflict(callable); } else { return callable.call(); } } @Override protected void onException(Exception e) throws Exception { ConnectionContext context = connection.getContext(); //UndoItems can't use the Set collection class to prevent ABA context.removeSavepoint(null); connection.getTargetConnection().rollback(); } public static boolean isLockRetryPolicyBranchRollbackOnConflict() { return LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT; } } ...省略部分代码...}
复制代码
- AbstractDMLBaseExecutor.class的部分源码
- ...省略导包代码...public class ConnectionProxy extends AbstractConnectionProxy { ...省略部分代码... @Override public void commit() throws SQLException { try { lockRetryPolicy.execute(() -> { doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } } private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); } } //此处是TM将全局事务提交,不应该算第二阶段的步骤,因为第二阶段指的是RM中的第二阶段,而此处是TM private void processGlobalTransactionCommit() throws SQLException { try { register();//注册分支事务,将xid绑定到context上 } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { //在flushUndoLogs()函数将组装好的undo log插入到undo_log表中 UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); targetConnection.commit();//提交undo log和目标SQL语句 } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { report(true); } context.reset(); } public static class LockRetryPolicy { protected static final boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = ConfigurationFactory .getInstance().getBoolean(ConfigurationKeys.CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT, DEFAULT_CLIENT_LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT); protected final ConnectionProxy connection; public LockRetryPolicy(ConnectionProxy connection) { this.connection = connection; } public T execute(Callable callable) throws Exception { // the only case that not need to retry acquire lock hear is // LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && connection#autoCommit == true // because it has retry acquire lock when AbstractDMLBaseExecutor#executeAutoCommitTrue if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && connection.getContext().isAutoCommitChanged()) { return callable.call(); } else { // LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == false // or LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT == true && autoCommit == false return doRetryOnLockConflict(callable); } } protected T doRetryOnLockConflict(Callable callable) throws Exception { LockRetryController lockRetryController = new LockRetryController(); while (true) { try { return callable.call(); } catch (LockConflictException lockConflict) { onException(lockConflict); // AbstractDMLBaseExecutor#executeAutoCommitTrue the local lock is released if (connection.getContext().isAutoCommitChanged() && lockConflict.getCode() == TransactionExceptionCode.LockKeyConflictFailFast) { lockConflict.setCode(TransactionExceptionCode.LockKeyConflict); } lockRetryController.sleep(lockConflict); } catch (Exception e) { onException(e); throw e; } } } protected void onException(Exception e) throws Exception { } } ...省略部分代码...}
复制代码 2.1.1、 beforeImage 和afterImage构造的undo log都有什么数据
比如有一张product表,该表的表结构如下:
FieldTypeKeyidbigint(20)PRInamevarchar(100)sincevarchar(100)AT模式中 有一个分支事务的业务逻辑如下所示:- update product set name = 'GTS' where name = 'TXC';
复制代码 在第一阶段中,执行以下8步:
①、解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
②、查询beforeImage(前镜像) :根据解析得到的条件信息,生成查询语句,定位数据。- select id, name, since from product where name = 'TXC';
复制代码 得到beforeImage (前镜像):
idnamesince1TXC2014③、执行业务 SQL:更新这条记录的 name 为 'GTS'。
④、查询afterImage(后镜像):根据beforeImage 的结果,通过 主键 定位数据。- select id, name, since from product where id = 1;
复制代码 得到afterImage(后镜像):
idnamesince1GTS2014⑤、插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。回滚日志记录如下所示:- { "branchId": 641789253, "undoItems": [{ "afterImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "GTS" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "beforeImage": { "rows": [{ "fields": [{ "name": "id", "type": 4, "value": 1 }, { "name": "name", "type": 12, "value": "TXC" }, { "name": "since", "type": 12, "value": "2014" }] }], "tableName": "product" }, "sqlType": "UPDATE" }], "xid": "xid:xxx"}
复制代码 ⑥、提交前,向 TC 注册分支:申请 product 表中,主键值等于 1 的记录的 全局锁 。
⑦、本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
⑧、将本地事务提交的结果上报给 TC。
2.2、AT第二阶段部分源码
2.2.1、二阶段提交的分支——异步提交(如有作者理解的有问题,请告知作者)
作者认为AT 模式的二阶段提交是指RM异步提交并删除内存中的 undo log 记录即可,并不是指TM向TC提交全局事务(如有作者理解的有问题,请告知作者)。
- ConnectionProxy.class的部分源码
- ...省略导包代码...public class ConnectionProxy extends AbstractConnectionProxy { ...省略部分代码... @Override public void commit() throws SQLException { try { lockRetryPolicy.execute(() -> {//异步提交 doCommit(); return null; }); } catch (SQLException e) { if (targetConnection != null && !getAutoCommit() && !getContext().isAutoCommitChanged()) { rollback(); } throw e; } catch (Exception e) { throw new SQLException(e); } } private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { processLocalCommitWithGlobalLocks();//RM的二阶段提交 } else { targetConnection.commit(); } } private void processLocalCommitWithGlobalLocks() throws SQLException { checkLock(context.buildLockKeys());//检查锁是否还在 try { targetConnection.commit();//提交 } catch (Throwable ex) { throw new SQLException(ex); } context.reset();//清除jvm缓存 } ...省略部分代码...}
复制代码
- ConnectionContext.class的部分源码
- ...省略导包代码...public class ConnectionContext { ...省略部分代码... public void reset() { this.reset(null); } void reset(String xid) { this.xid = xid; branchId = null; this.isGlobalLockRequire = false; savepoints.clear();//清除保存点 lockKeysBuffer.clear();//清除锁 sqlUndoItemsBuffer.clear();//清除jvm内存中的undo log this.autoCommitChanged = false;//表示autoCommit状态是否改变 applicationData.clear();// } ...省略部分代码...}
复制代码 另外一种说法是,AT 模式的二阶段提交,TC 侧会将该事务的锁删除,然后通知 RM 异步删除 undo log 记录即可。
2.2.2、二阶段回滚的分支——自动回滚
如果 AT 模式的二阶段是回滚,那么 RM 侧需要根据一阶段保存的 undo log 数据中的 beforeImage 记录,通过逆向 SQL 的方式,对在一阶段修改过的业务数据进行还原即可。
但是在还原数据之前,需要进行脏数据校验。因为在一阶段提交后,到现在进行回滚的中间这段时间,该记录有可能被别的业务改动过。校验的方式,就是用 undo log 的 afterImage 和现在数据库的数据做比较,假如数据一致,说明没有脏数据;不一致则说明有脏数据,出现脏数据就需要人工进行处理了。
三、用JDK中Callable接口的匿名内部类(SEATA-AT模式也是用Callable接口)模拟SEATA-AT模式获取全局锁(global lock)的过程
- package org.apache.seata.xxx1;import java.sql.Statement;import java.util.concurrent.Callable;public abstract class AbstractDMLBaseExecutor { protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = new ConnectionProxy(); long startTime = System.currentTimeMillis(); return new LockRetryPolicy(connectionProxy).execute(() -> { if (LockRetryPolicy.LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT && LockRetryPolicy.count > 0) { System.out.println(Thread.currentThread().getName() + "线程的executeAutoCommitTrue()函数第" + LockRetryPolicy.count + "次尝试获取锁" + ",时间:" + System.currentTimeMillis()); LockRetryPolicy.count--; throw new Exception(); } else { System.out.println(Thread.currentThread().getName() + "线程的executeAutoCommitTrue()函数获取到锁了,总耗时:" + (System.currentTimeMillis() - startTime) + "ms"); return (T) "获取到锁执行了"; } }); } private static class LockRetryPolicy extends ConnectionProxy.LockRetryPolicy { LockRetryPolicy(final ConnectionProxy connection) { super(connection, 10); } @Override public T execute(Callable callable) throws Exception { if (LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { return doRetryOnLockConflict(callable); } else { return callable.call(); } } @Override protected void onException(Exception e) throws Exception { } public static boolean isLockRetryPolicyBranchRollbackOnConflict() { return true; } }}
复制代码- package org.apache.seata.xxx1;import java.util.concurrent.Callable;public class ConnectionProxy { public static class LockRetryPolicy { protected static boolean LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT = true; public static int count; protected final ConnectionProxy connection; public LockRetryPolicy(ConnectionProxy connection, int count) { this.connection = connection; this.count = 10; } public T execute(Callable callable) throws Exception { if (!LOCK_RETRY_POLICY_BRANCH_ROLLBACK_ON_CONFLICT) { return callable.call(); } else { return doRetryOnLockConflict(callable); } } protected T doRetryOnLockConflict(Callable callable) throws Exception { while (true) { try { return callable.call(); } catch (Exception e) { Thread.sleep(1000);//此处模拟lockRetryController.sleep(lockConflict);函数的执行,lockRetryController.sleep()函数中也是使用Thread.sleep()函数停顿线程 } } } protected void onException(Exception e) throws Exception { } }}
复制代码- package org.apache.seata.xxx1;import java.sql.Statement;public class DeleteExecutor extends AbstractDMLBaseExecutor{}
复制代码- package org.apache.seata.xxx1;import java.sql.Connection;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.SQLWarning;public class PreparedStatementProxy extends StatementProxy { ...省略抽象函数的实现...}
复制代码- package org.apache.seata.xxx1;import java.sql.Statement;public abstract class StatementProxy implements Statement {}
复制代码 测试:- package org.apache.seata.xxx1;import java.sql.Statement;public class Test { public static void main(String[] args) throws Throwable { DeleteExecutor executor = new DeleteExecutor(); String s = executor.executeAutoCommitTrue(args); System.out.println(Thread.currentThread().getName()+"线程的main()函数:"+s); }}
复制代码 运行结果如下:
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |