阿里开源分布式事务组件 seata :demo 环境搭建以及运行流程简析

案例设计

seata 官方给出了一系列 demo 样例,不过我在用的过程中发现总有这个那个的问题,所以自己维护了一份基于 dubbo 的 demo 在 github 上,适配的 seata 版本是 0.8.0。

案例的设计直接参考官方 quick start给出的案例:

整个案例分为三个服务,分别是存储服务、订单服务和账户服务,这些服务通过 dubbo 进行发布和调用,内部调用逻辑如上面图所示。  

整个 demo 的工程样例如下所示:

undo_log 表

这个案例除了在数据库需要建立业务表以外,还要额外建立一张 undo_log 表,这个表的主要作用是记录事务的前置镜像和后置镜像。

全局事务进行到提交阶段,则删除该表对应的记录,全局事务如果需要回滚,则会利用这个表里记录的镜像数据,恢复数据。

undo_log 表里的数据实际上是“朝生夕死”的,数据不需要在表里存活太久。表结构如下所示:

CREATE TABLE `undo_log` (

`id` bigint(20) NOT NULL AUTO_INCREMENT,

`branch_id` bigint(20) NOT NULL,

`xid` varchar(100) NOT NULL,

`context` varchar(128) NOT NULL,

`rollback_info` longblob NOT NULL,

`log_status` int(11) NOT NULL,

`log_created` datetime NOT NULL,

`log_modified` datetime NOT NULL,

`ext` varchar(100) DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)

) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

服务逻辑

每个服务都对应了一个 starter 类,这个类主要用来在 spring 环境下,将该服务启动,并通过 dubbo 发布出去,以账户服务为例:

/**

* The type Dubbo account service starter.

*/

public class DubboAccountServiceStarter {

/**

* 2. Account service is ready . A buyer register an account: U100001 on my e-commerce platform

*

* @param args the input arguments

*/

public static void main(String[] args) {

ClassPathXmlApplicationContext accountContext = new ClassPathXmlApplicationContext(new String[]{"spring/dubbo-account-service.xml"});

accountContext.getBean("service");

JdbcTemplate accountJdbcTemplate = (JdbcTemplate) accountContext.getBean("jdbcTemplate");

accountJdbcTemplate.update("delete from account_tbl where user_id = 'U100001'");

accountJdbcTemplate.update("insert into account_tbl(user_id, money) values ('U100001', 999)");


new ApplicationKeeper(accountContext).keep();

}

}

首先通过   ClassPathXmlApplicationContext   读取 dubbo-account-service.xml 这个 spring 配置文件并启动 spring 容器环境,并通过 spring 的 jdbc template 对账户表的数据进行初始化。  

dubbo-account-service.xml 配置文件中进行了各类 bean 的配置,包括 dubbo 与 spring 结合时的标准配置:

<bean id="accountDataSourceProxy" class="io.seata.rm.datasource.DataSourceProxy">

<constructor-arg ref="accountDataSource" />

</bean>


<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">

<property name="dataSource" ref="accountDataSourceProxy" />

</bean>


<dubbo:application name="dubbo-demo-account-service" />

<dubbo:registry address="zookeeper://localhost:2181" />

<dubbo:protocol name="dubbo" port="20881" />

<dubbo:service interface="io.seata.samples.dubbo.service.AccountService" ref="service" timeout="10000"/>


<bean id="service" class="io.seata.samples.dubbo.service.impl.AccountServiceImpl">

<property name="jdbcTemplate" ref="jdbcTemplate"/>

</bean>


<bean class="io.seata.spring.annotation.GlobalTransactionScanner">

<constructor-arg value="dubbo-demo-account-service"/>

<constructor-arg value="my_test_tx_group"/>

</bean>

这份配置里主要有两个需要引起注意的关键点

  1. jdbcTemplate 这个 bean 所依赖的数据源 bean,是一个类名为 io.seata.rm.datasource.DataSourceProxy 的数据源类,通过它的名字可以很明显地看出这是一个代理模式的应用,因为 seata 为完成全局事务的逻辑,需要在普通的 sql 操作前后添加一些逻辑,比如说 sql 执行前对 sql 进行语法解析,生成前置镜像,sql 执行后生成后置镜像,通过代理的方式,可以方便地对 connection,statement 等进行代理包装,在调用的时候进行拦截,加入自己的逻辑。

  2. 配置文件中还有一个 io.seata.spring.annotation.GlobalTransactionScanner 类型的 bean,这个 bean 是支撑 seata 能在 spring 环境中通过注解的方式来划定事务边界的基础。在 spring 容器启动时,会扫描 @GlobalTransactional   注解是否存在,这个注解标识了全局事务的开始和结束,也就是我们常说的“事务的边界”

业务逻辑

业务逻辑的具体详情在 BusinessServiceImpl   类中可以看到:

@Override

@GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")

public void purchase(String userId, String commodityCode, int orderCount) {

LOGGER.info("purchase begin ... xid: " + RootContext.getXID());

storageService.deduct(commodityCode, orderCount);

orderService.create(userId, commodityCode, orderCount);

// throw new RuntimeException("xxx");

}

先调用存储服务,减少库存,然后调用订单服务,新建订单。这两个动作属于一个整体的事务,任何一个动作失败,都需要撤销所有的操作。

这个方法也有两个需要注意的点:

  1. 该方法上声明了 @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") 这样的注解,用于让上文提到的 GlobalTransactionScanner 扫描的时候发现这是一个全局事务。

  2. 方法的最后有一行代码抛出了 RuntimeException,这主要是为了模仿全局事务的失败,并让 seata 走全局事务回滚逻辑。

事务扫描与边界定义

上文提到的 GlobalTransactionScanner 类,会在 spring 容器启动的时候,也被初始化。

在它的 afterPropertiesSet 方法被调用时,会触发 seata client 的初始化

@Override

public void afterPropertiesSet() {

if (disableGlobalTransaction) {

if (LOGGER.isInfoEnabled()) {

LOGGER.info("Global transaction is disabled.");

}

return;

}

initClient();


}

关于 seata client 的初始化的细节,可以看看我写的另外一篇文章 《阿里开源分布式事务组件 seata : seata client 通信层解析》  

初始化客户端做的事情主要是建立与 seata server 的连接,并注册 TM 和 RM。接下来,在 wrapIfNecessary 方法里,实现对注解的扫描,并对添加了注解的方法添加 interceptor。  

这篇文章里我们暂时不讨论 TCC 模式,只讨论 AT 模式,也暂不讨论全局事务锁 GlobalLock 的实现,先忽略这些有关的逻辑,只关注事务处理逻辑。

Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);

Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

if (!existsAnnotation(new Class[] {serviceInterface})

&& !existsAnnotation(interfacesIfJdk)) {

return bean;

}

if (interceptor == null) {

interceptor = new GlobalTransactionalInterceptor(failureHandlerHook);

}

在这里,interceptor 的实现是 GlobalTransactionalInterceptor,也就是说,以上文的案例为例子,当 BusinessServiceImpl 的 purchase 方法被调用的时候,实际上这个方法会被拦截器拦截,执行拦截器里的逻辑:

@Override

public Object invoke(final MethodInvocation methodInvocation) throws Throwable {

Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);

Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);

final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);


final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);

final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);

if (globalTransactionalAnnotation != null) {

return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);

} else if (globalLockAnnotation != null) {

return handleGlobalLock(methodInvocation);

} else {

return methodInvocation.proceed();

}

}


private Object handleGlobalTransaction(final MethodInvocation methodInvocation,

final GlobalTransactional globalTrxAnno) throws Throwable {

try {

return transactionalTemplate.execute(new TransactionalExecutor() {

@Override

public Object execute() throws Throwable {

return methodInvocation.proceed();

}


public String name() {

String name = globalTrxAnno.name();

if (!StringUtils.isNullOrEmpty(name)) {

return name;

}

return formatMethod(methodInvocation.getMethod());

}


@Override

public TransactionInfo getTransactionInfo() {

TransactionInfo transactionInfo = new TransactionInfo();

transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());

transactionInfo.setName(name());

Set<RollbackRule> rollbackRules = new LinkedHashSet<>();

for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.rollbackForClassName()) {

rollbackRules.add(new RollbackRule(rbRule));

}

for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

for (String rbRule : globalTrxAnno.noRollbackForClassName()) {

rollbackRules.add(new NoRollbackRule(rbRule));

}

transactionInfo.setRollbackRules(rollbackRules);

return transactionInfo;

}

});

} catch (TransactionalExecutor.ExecutionException e) {

TransactionalExecutor.Code code = e.getCode();

switch (code) {

case RollbackDone:

throw e.getOriginalException();

case BeginFailure:

failureHandler.onBeginFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case CommitFailure:

failureHandler.onCommitFailure(e.getTransaction(), e.getCause());

throw e.getCause();

case RollbackFailure:

failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());

throw e.getCause();

default:

throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);

}

}

}

在执行 handleGlobalTransaction 方法时,实际上采用模板模式,委托给了 TransactionalTemplate 类去执行标准的事务处理流程。如下所示:

/**

* Execute object.

*

* @param business the business

* @return the object

* @throws TransactionalExecutor.ExecutionException the execution exception

*/

public Object execute(TransactionalExecutor business) throws Throwable {

// 1. get or create a transaction

GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();


// 1.1 get transactionInfo

TransactionInfo txInfo = business.getTransactionInfo();

if (txInfo == null) {

throw new ShouldNeverHappenException("transactionInfo does not exist");

}

try {


// 2. begin transaction

beginTransaction(txInfo, tx);


Object rs = null;

try {


// Do Your Business

rs = business.execute();


} catch (Throwable ex) {


// 3.the needed business exception to rollback.

completeTransactionAfterThrowing(txInfo,tx,ex);

throw ex;

}


// 4. everything is fine, commit.

commitTransaction(tx);


return rs;

} finally {

//5. clear

triggerAfterCompletion();

cleanUp();

}

}

事务处理逻辑实际上是一种模板,将事务相关的处理逻辑放在 try 块里,发现异常后执行回滚,正常执行则执行提交。  

在这里有个需要注意的地方是,seata 不把提交这个动作放在 try 块里,因为在 seata 里,全局事务的提交实际上是可以异步执行的。  

因为全局事务如果进行到提交这一阶段,那么意味着各个分支事务已经执行过本地提交,全局事务的提交阶段仅仅是删除 undo_log 里的记录,这个记录删除或者不删除,实际上不会改变全局事务已经正常完成的事实。所以它可以用程序异步去做,或者以人工介入的方式去做,所以 seata 认为,全局事务提交失败,不需要执行回滚流程。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章