在 Saga 模式的业务流程中,每个参与者都会提交本地事务。当某一个参与者失败,则补偿之前已经成功的参与者。一阶段正向服务和二阶段补偿服务均由业务开发实现。
本文将基于 Saga 模式的示例工程 引导您快速体验 Saga 模式的分布式应用事务开发。
依赖与配置项
引入依赖
基于 SOFABoot 框架
SOFABoot 框架中已经默认集成了分布式事务 SDK,不需要手动添加分布式事务的 Maven 依赖。您只需确保在工程的主 pom.xml
文件中添加以下依赖配置:
<parent>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofaboot-enterprise-dependencies</artifactId>
<version>3.3.2</version>
</parent>
说明:SOFABoot 的最新版本信息,参见 SOFABoot 版本说明。
基于 Spring 框架
在 Spring 框架中,您需要依次添加 Spring 依赖和分布式事务依赖。
在应用中添加如下 Spring Maven 依赖:
<!-- spring boot and spring dependencies begin--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-autoconfigure</artifactId> </dependency> <!-- logback --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot</artifactId> </dependency> <!-- test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- spring dependencies end -->
说明Spring 使用默认版本即可,也可自定义需要的版本,如
2.1.13.RELEASE
。在工程中添加如下分布式事务依赖:
<dependency> <groupId>com.alipay.dtx</groupId> <artifactId>dtx-sofaboot</artifactId> <version>${dtx.version}</version> </dependency>
说明${dtx.version}
可自定义需要的版本或最新版本,如2.3.0
版本。
配置工程属性
在 SOFAStack 云上环境中运行,需要将工程 application.properties
文件中 instanceid
、antvip.endpoint
等修改为对应环境的配置。详见 引入 SOFA 中间件 > properties 配置项。
参与者开发
完成依赖与配置项操作后,即可进行参与者应用开发。此处提供两个参与者示例,分别是转账交易中的扣钱与加钱参与者。您可以根据实际业务,自定义正向和补偿操作方法名。
扣钱参与者
实现扣钱方法和失败补偿方法,示例如下:
/** * Saga扣钱参与者 */ public interface FirstSagaAction{ /** * 扣钱操作 * @param businessKey * @param accountNo * @param amount * @return */ boolean amountMinus(String businessKey,String accountNo,BigDecimal amount,Map<String,Object> extParams); /** * 补偿(冲正)扣钱操作 * @param businessKey * @param accountNo * @return */ boolean compensateAmountMinus(String businessKey,String accountNo); }
实现接口,示例如下:
public class FirstSagaActionImpl implements FirstSagaAction{ protected final static Logger logger =LoggerFactory.getLogger(FirstSagaActionImpl.class); //账户dao private AccountDAO firstAccountDAO; //业务流水dao private AccountTransactionDAO firstAccountTransactionDAO; //事务模版 private TransactionTemplate firstActionTransactionTemplate; @Override public boolean amountMinus(final String businessKey,final String accountNo,final BigDecimal amount,final Map<String,Object> extParams){ try{ if(amount.compareTo(BigDecimal.ZERO)<0){ throw new RuntimeException("金额必须大小0"); } AccountTransaction accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey); if(accountTransaction !=null&&Status.SUCCESS.name().equals(accountTransaction.getStatus())){ //幂等控制:交易已成功,直接返回成功 return true; } return firstActionTransactionTemplate.execute(newTransactionCallback<Boolean>(){ @Override public Boolean doInTransaction(TransactionStatus status){ try{ //校验账户余额 Account account = firstAccountDAO.getAccountForUpdate(accountNo); if(account.getAmount().compareTo(amount)<0){ throw new RuntimeException("余额不足"); } //记录账户操作流水 AccountTransaction accountTransaction =new AccountTransaction(); accountTransaction.setBusinessKey(businessKey); accountTransaction.setAccountNo(accountNo); accountTransaction.setAmount(amount); accountTransaction.setType(Type.MINUS.name()); accountTransaction.setStatus(Status.SUCCESS.name()); firstAccountTransactionDAO.addTransaction(accountTransaction); //扣钱 BigDecimal amount = account.getAmount().subtract(accountTransaction.getAmount()); if(amount.compareTo(BigDecimal.ZERO)<0){ throw new RuntimeException("余额不足"); } account.setAmount(amount); firstAccountDAO.updateAmount(account); }catch(Exception e){ logger.error("扣钱操作失败", e); throw new RuntimeException("扣钱操作失败", e); } return true; } }); }catch(Exception e){ logger.error("扣钱操作失败", e); return false; } } @Override public boolean compensateAmountMinus(final String businessKey,final String accountNo){ AccountTransaction accountTransaction; try{ accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey); if(accountTransaction ==null){ //原交易流水不存在, 记录防悬挂(后发先至)流水 accountTransaction =new AccountTransaction(); accountTransaction.setBusinessKey(businessKey); accountTransaction.setAccountNo(accountNo); accountTransaction.setType(Type.MINUS.name()); accountTransaction.setStatus(Status.COMPENSATED.name()); firstAccountTransactionDAO.addTransaction(accountTransaction); return true; } if(Status.COMPENSATED.name().equals(accountTransaction.getStatus())){ //幂等控制:补偿已成功,直接返回成功 return true; } final AccountTransaction accountTransactionFinal = accountTransaction; return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>(){ @Override public Boolean doInTransaction(TransactionStatus status){ try{ //补偿已扣减金额 Account account = firstAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo()); BigDecimal amount = account.getAmount().add(accountTransactionFinal.getAmount()); account.setAmount(amount); firstAccountDAO.updateAmount(account); //更新流水状态 accountTransactionFinal.setStatus(Status.COMPENSATED.name()); firstAccountTransactionDAO.updateTransaction(accountTransactionFinal); return true; }catch(Exception e){ logger.error("扣钱操作补偿失败", e); status.setRollbackOnly(); return false; } } }); }catch(SQLException e){ logger.error("扣钱操作补偿失败", e); return false; } } public void setFirstAccountDAO(AccountDAO firstAccountDAO){ this.firstAccountDAO = firstAccountDAO; } public void setFirstAccountTransactionDAO(AccountTransactionDAO firstAccountTransactionDAO){ this.firstAccountTransactionDAO = firstAccountTransactionDAO; } public void setFirstActionTransactionTemplate(TransactionTemplate firstActionTransactionTemplate){ this.firstActionTransactionTemplate = firstActionTransactionTemplate; } }
加钱参与者
实现正向加钱方法和失败补偿方法,示例如下:
/** * Saga 加钱参与者 */ public interface SecondSagaAction{ /** * 加钱操作 * @param businessKey * @param accountNo * @param amount * @return */ boolean amountAdd(String businessKey,String accountNo,BigDecimal amount,Map<String,Object> extParams); /** * 补偿(冲正)加钱操作 * @param businessKey * @param accountNo * @return */ boolean compensateAmountAdd(String businessKey,String accountNo); }
实现接口,示例如下:
@Override public boolean compensateAmountAdd(final String businessKey,final String accountNo){ AccountTransaction accountTransaction; try{ accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey); if(accountTransaction ==null){ //防悬挂:原交流流水不存在, 记录防悬挂(后发先至)流水 accountTransaction =new AccountTransaction(); accountTransaction.setBusinessKey(businessKey); accountTransaction.setAccountNo(accountNo); accountTransaction.setType(Type.ADD.name()); accountTransaction.setStatus(Status.COMPENSATED.name()); secondAccountTransactionDAO.addTransaction(accountTransaction); //允许空补偿:返回补偿成功 return true; } if(Status.COMPENSATED.name().equals(accountTransaction.getStatus())){ //幂等控制:补偿已成功,直接返回成功 return true; } final AccountTransaction accountTransactionFinal = accountTransaction; return secondActionTransactionTemplate.execute(new TransactionCallback<Boolean>(){ @Override public Boolean doInTransaction(TransactionStatus status){ try{ //扣回已加金额 Account account = secondAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo()); BigDecimal amount = account.getAmount().subtract(accountTransactionFinal.getAmount()); if(amount.compareTo(BigDecimal.ZERO)<0){ throw new RuntimeException("余额不足, 请工人核对"); } account.setAmount(amount); secondAccountDAO.updateAmount(account); //更新流水状态 accountTransactionFinal.setStatus(Status.COMPENSATED.name()); secondAccountTransactionDAO.updateTransaction(accountTransactionFinal); return true; }catch(Exception e){ logger.error("加钱操作补偿失败", e); status.setRollbackOnly(); return false; } } }); }catch(SQLException e){ logger.error("加钱操作补偿失败", e); return false; } } public void setSecondAccountDAO(AccountDAO secondAccountDAO){ this.secondAccountDAO = secondAccountDAO; } public void setSecondAccountTransactionDAO(AccountTransactionDAO secondAccountTransactionDAO){ this.secondAccountTransactionDAO = secondAccountTransactionDAO; } public void setSecondActionTransactionTemplate(TransactionTemplate secondActionTransactionTemplate){ this.secondActionTransactionTemplate = secondActionTransactionTemplate; } }
业务开发
完成参与者开发后,您需要通过分布式事务控制台提供的状态设计器进行业务服务编排,如编排成转账交易服务。
操作步骤如下:
进入分布式事务控制台页面,在左侧导航栏中选择 应用事务 > 事务配置。
在当前页面中,点击 创建应用事务 > Saga 模式。
在 创建 Saga 事务 页面,根据提示配置以下信息:
应用名称:可从下拉菜单中选择现有应用或新建一个应用。
BizType:填写要创建的 Saga 事务的业务流程名称。
描述:对该事务的备注信息。
单击 下一步,进入设计器页面,开始编排业务流程。
以便您快速体验,点击此处直接下载一个已经编排好的 JSON 文件。
单击编辑区上方的 JSON View 图标,并将下载的 JSON 语句粘贴到设计器的 JSON View 框中。
单击 Designer View 图标,切换回设计器模式,即可查看具体的业务流程图。
将刚刚设计器生成的 JSON 保存到工程中。如在示例工程将其保存至
account-demo-saga/account-demo-service/src/main/resources/statelang/transfer.json
。在工程中配置状态机引擎,参见示例工程文件
account-demo-saga/account-demo-service/src/main/resources/META-INF/spring/transfer-bean.xml
。<bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine"> <property name="stateMachineConfig" ref="dbStateMachineConfig"></property> </bean> <bean id="dbStateMachineConfig" class="com.alipay.dtx.client.saga.config.DtxSagaStateMachineConfig"> <property name="dataSource" ref="transferDataSource"></property> <property name="resources" value="classpath:statelang/*.json"></property> <property name="enableAsync" value="true"></property> <property name="threadPoolExecutor" ref="threadExecutor"></property> </bean> <bean id="threadExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean"> <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_"/> <property name="corePoolSize" value="20"/> <property name="maxPoolSize" value="20"/> <property name="queueCapacity" value="100"/> <property name="rejectedExecutionHandler" ref="callerRunsPolicy"/> </bean> <bean name="callerRunsPolicy" class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"></bean> <bean class="io.seata.saga.rm.StateMachineEngineHolder"> <property name="stateMachineEngine" ref="stateMachineEngine"/> </bean>
说明本文的示例工程中是用的 h2 数据源,且自动建表。Saga 事务日志建表语句,可参考 Saga 模式建表语句示例。
在 Java 代码中,通过状态机引擎启动业务流程:
/** * 转账交易 */ public interface TransferService{ /** * Saga转账 * * @param from * @param to * @param amount * @return */ boolean transferBySaga(final String from,final String to,final BigDecimal amount,Map<String,Object> extParams); }
对应的接口实现代码示例如下:
public class TransferServiceImpl implements TransferService{ protected final static Logger logger =LoggerFactory.getLogger(TransferServiceImpl.class); @Autowired private StateMachineEngine stateMachineEngine; @Override public boolean transferBySaga(String from,String to,BigDecimal amount,Map<String,Object> extParams){ try{ String businessKey = UUID.randomUUID().toString().replaceAll("-",""); Map<String,Object> params =new HashMap<>(4); params.put("from", from); params.put("to", to); params.put("amount", amount); if(extParams !=null){ params.put("extParams", extParams); } StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("transferBySaga",null, businessKey, params); if(ExecutionStatus.SU.equals(inst.getStatus()) && inst.getCompensationStatus() == null){ //正向状态为成功, 补偿状态为空(没为触发回滚),则交易成功 return true; } else{ //如果执行到了Fail节点会有errorCode和errorMessage String errorCode =(String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_CODE); String errorMessage =(String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_MSG); System.out.println("ErrorCode:"+ errorCode +", ErrorMsg:"+ errorMessage +", exception: "+ inst.getException()); return false; } }catch(Throwable t){ logger.error("转账交易执行失败", t); throw new RuntimeException(t); } } public void setStateMachineEngine(StateMachineEngine stateMachineEngine){ this.stateMachineEngine = stateMachineEngine; } }