Saga 模式快速入门

在 Saga 模式的业务流程中,每个参与者都会提交本地事务。当某一个参与者失败,则补偿之前已经成功的参与者。一阶段正向服务和二阶段补偿服务均由业务开发实现。

本文将基于 Saga 模式的示例工程 引导您快速体验 Saga 模式的分布式应用事务开发。

  1. 依赖配置:在本地工程项目中,添加分布式事务依赖与相关配置项等。

  2. 参与者开发:进行本地事务参与者开发,实现相应的方法与接口。

  3. 业务开发:设计编排实际业务流程,通过状态机引擎启动该业务。

依赖与配置项

引入依赖

基于 SOFABoot 框架

SOFABoot 框架中已经默认集成了分布式事务 SDK,不需要手动添加分布式事务的 Maven 依赖。您只需确保在工程的主 pom.xml 文件中添加以下依赖配置:

<parent>
     <groupId>com.alipay.sofa</groupId>
     <artifactId>sofaboot-enterprise-dependencies</artifactId>
     <version>3.3.2</version>
</parent>

说明:SOFABoot 的最新版本信息,参见 SOFABoot 版本说明

基于 Spring 框架

在 Spring 框架中,您需要依次添加 Spring 依赖和分布式事务依赖。

  1. 在应用中添加如下 Spring Maven 依赖:

    <!-- spring boot and spring dependencies begin-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-autoconfigure</artifactId>
    </dependency>
    <!-- logback -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-logging</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot</artifactId>
    </dependency>
    <!-- test -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <!-- spring dependencies end -->
    说明

    Spring 使用默认版本即可,也可自定义需要的版本,如 2.1.13.RELEASE

  2. 在工程中添加如下分布式事务依赖:

    <dependency>
          <groupId>com.alipay.dtx</groupId>
          <artifactId>dtx-sofaboot</artifactId>
          <version>${dtx.version}</version>
    </dependency>
    说明

    ${dtx.version} 可自定义需要的版本或最新版本,如 2.3.0 版本。

配置工程属性

在 SOFAStack 云上环境中运行,需要将工程 application.properties 文件中 instanceidantvip.endpoint 等修改为对应环境的配置。详见 引入 SOFA 中间件 > properties 配置项

参与者开发

完成依赖与配置项操作后,即可进行参与者应用开发。此处提供两个参与者示例,分别是转账交易中的扣钱与加钱参与者。您可以根据实际业务,自定义正向和补偿操作方法名。

扣钱参与者

  1. 实现扣钱方法和失败补偿方法,示例如下:

    /**
     * Saga扣钱参与者
     */
    public interface FirstSagaAction{
    
        /**
         * 扣钱操作
         * @param businessKey
         * @param accountNo
         * @param amount
         * @return
         */
        boolean amountMinus(String businessKey,String accountNo,BigDecimal amount,Map<String,Object> extParams);
    
        /**
         * 补偿(冲正)扣钱操作
         * @param businessKey
         * @param accountNo
         * @return
         */
        boolean compensateAmountMinus(String businessKey,String accountNo);
    }
    
  2. 实现接口,示例如下:

    public class FirstSagaActionImpl implements FirstSagaAction{
    
            protected final static Logger logger =LoggerFactory.getLogger(FirstSagaActionImpl.class);
    
            //账户dao
            private AccountDAO            firstAccountDAO;
            //业务流水dao
            private AccountTransactionDAO firstAccountTransactionDAO;
            //事务模版
            private TransactionTemplate   firstActionTransactionTemplate;
    
            @Override
            public boolean amountMinus(final String businessKey,final String accountNo,final BigDecimal amount,final Map<String,Object> extParams){
                try{
                    if(amount.compareTo(BigDecimal.ZERO)<0){
                        throw new RuntimeException("金额必须大小0");
                    }
    
                    AccountTransaction accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
                    if(accountTransaction !=null&&Status.SUCCESS.name().equals(accountTransaction.getStatus())){
                        //幂等控制:交易已成功,直接返回成功
                        return true;
                    }
    
                    return firstActionTransactionTemplate.execute(newTransactionCallback<Boolean>(){
                        @Override
                        public Boolean doInTransaction(TransactionStatus status){
                            try{
                                //校验账户余额
                                Account account = firstAccountDAO.getAccountForUpdate(accountNo);
                                if(account.getAmount().compareTo(amount)<0){
                                    throw new RuntimeException("余额不足");
                                }
                                //记录账户操作流水
                                AccountTransaction accountTransaction =new AccountTransaction();
                                accountTransaction.setBusinessKey(businessKey);
                                accountTransaction.setAccountNo(accountNo);
                                accountTransaction.setAmount(amount);
                                accountTransaction.setType(Type.MINUS.name());
                                accountTransaction.setStatus(Status.SUCCESS.name());
                                firstAccountTransactionDAO.addTransaction(accountTransaction);
    
                                //扣钱
                                BigDecimal amount = account.getAmount().subtract(accountTransaction.getAmount());
                                if(amount.compareTo(BigDecimal.ZERO)<0){
                                    throw new RuntimeException("余额不足");
                                }
                                account.setAmount(amount);
                                firstAccountDAO.updateAmount(account);
                            }catch(Exception e){
                                logger.error("扣钱操作失败", e);
                                throw new RuntimeException("扣钱操作失败", e);
                            }
                            return true;
                        }
                    });
                }catch(Exception e){
                    logger.error("扣钱操作失败", e);
                    return false;
                }
            }
    
            @Override
            public boolean compensateAmountMinus(final String businessKey,final String accountNo){
                AccountTransaction accountTransaction;
                try{
                    accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
                    if(accountTransaction ==null){
                        //原交易流水不存在, 记录防悬挂(后发先至)流水
                        accountTransaction =new AccountTransaction();
                        accountTransaction.setBusinessKey(businessKey);
                        accountTransaction.setAccountNo(accountNo);
                        accountTransaction.setType(Type.MINUS.name());
                        accountTransaction.setStatus(Status.COMPENSATED.name());
                        firstAccountTransactionDAO.addTransaction(accountTransaction);
                        return true;
                    }
                    if(Status.COMPENSATED.name().equals(accountTransaction.getStatus())){
                        //幂等控制:补偿已成功,直接返回成功
                        return true;
                    }
    
                    final AccountTransaction accountTransactionFinal = accountTransaction;
                    return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>(){
    
                        @Override
                        public Boolean doInTransaction(TransactionStatus status){
                            try{
                                //补偿已扣减金额
                                Account account = firstAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
                                BigDecimal amount = account.getAmount().add(accountTransactionFinal.getAmount());
                                account.setAmount(amount);
                                firstAccountDAO.updateAmount(account);
                                //更新流水状态
                                accountTransactionFinal.setStatus(Status.COMPENSATED.name());
                                firstAccountTransactionDAO.updateTransaction(accountTransactionFinal);
                                return true;
                            }catch(Exception e){
                                logger.error("扣钱操作补偿失败", e);
                                status.setRollbackOnly();
                                return false;
                            }
                        }
                    });
                }catch(SQLException e){
                    logger.error("扣钱操作补偿失败", e);
                    return false;
                }
            }
    
            public void setFirstAccountDAO(AccountDAO firstAccountDAO){
                this.firstAccountDAO = firstAccountDAO;
            }
    
            public void setFirstAccountTransactionDAO(AccountTransactionDAO firstAccountTransactionDAO){
                this.firstAccountTransactionDAO = firstAccountTransactionDAO;
            }
    
            public void setFirstActionTransactionTemplate(TransactionTemplate firstActionTransactionTemplate){
                this.firstActionTransactionTemplate = firstActionTransactionTemplate;
            }
        }
    

加钱参与者

  1. 实现正向加钱方法和失败补偿方法,示例如下:

    /**
    * Saga 加钱参与者
    */
    public interface SecondSagaAction{
    
    /**
      * 加钱操作
      * @param businessKey
      * @param accountNo
      * @param amount
      * @return
      */
    boolean amountAdd(String businessKey,String accountNo,BigDecimal amount,Map<String,Object> extParams);
    
    /**
      * 补偿(冲正)加钱操作
      * @param businessKey
      * @param accountNo
      * @return
      */
    boolean compensateAmountAdd(String businessKey,String accountNo);
    }
  2. 实现接口,示例如下:

    @Override
        public boolean compensateAmountAdd(final String businessKey,final String accountNo){
    
            AccountTransaction accountTransaction;
            try{
                accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey);
                if(accountTransaction ==null){
                    //防悬挂:原交流流水不存在, 记录防悬挂(后发先至)流水
                    accountTransaction =new AccountTransaction();
                    accountTransaction.setBusinessKey(businessKey);
                    accountTransaction.setAccountNo(accountNo);
                    accountTransaction.setType(Type.ADD.name());
                    accountTransaction.setStatus(Status.COMPENSATED.name());
                    secondAccountTransactionDAO.addTransaction(accountTransaction);
    
                    //允许空补偿:返回补偿成功
                    return true;
                }
                if(Status.COMPENSATED.name().equals(accountTransaction.getStatus())){
                    //幂等控制:补偿已成功,直接返回成功
                    return true;
                }
    
                final AccountTransaction accountTransactionFinal = accountTransaction;
                return secondActionTransactionTemplate.execute(new TransactionCallback<Boolean>(){
    
                    @Override
                    public Boolean doInTransaction(TransactionStatus status){
                        try{
                            //扣回已加金额
                            Account account = secondAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
                            BigDecimal amount = account.getAmount().subtract(accountTransactionFinal.getAmount());
                            if(amount.compareTo(BigDecimal.ZERO)<0){
                                throw new RuntimeException("余额不足, 请工人核对");
                            }
                            account.setAmount(amount);
                            secondAccountDAO.updateAmount(account);
                            //更新流水状态
                            accountTransactionFinal.setStatus(Status.COMPENSATED.name());
                            secondAccountTransactionDAO.updateTransaction(accountTransactionFinal);
                            return true;
                        }catch(Exception e){
                            logger.error("加钱操作补偿失败", e);
                            status.setRollbackOnly();
                            return false;
                        }
                    }
                });
    
            }catch(SQLException e){
                logger.error("加钱操作补偿失败", e);
                return false;
            }
        }
    
        public void setSecondAccountDAO(AccountDAO secondAccountDAO){
            this.secondAccountDAO = secondAccountDAO;
        }
    
        public void setSecondAccountTransactionDAO(AccountTransactionDAO secondAccountTransactionDAO){
            this.secondAccountTransactionDAO = secondAccountTransactionDAO;
        }
    
        public void setSecondActionTransactionTemplate(TransactionTemplate secondActionTransactionTemplate){
            this.secondActionTransactionTemplate = secondActionTransactionTemplate;
        }
    }
    

业务开发

完成参与者开发后,您需要通过分布式事务控制台提供的状态设计器进行业务服务编排,如编排成转账交易服务。

操作步骤如下:

  1. 进入分布式事务控制台页面,在左侧导航栏中选择 应用事务 > 事务配置

  2. 在当前页面中,点击 创建应用事务 > Saga 模式

  3. 创建 Saga 事务 页面,根据提示配置以下信息:

    • 应用名称:可从下拉菜单中选择现有应用或新建一个应用。

    • BizType:填写要创建的 Saga 事务的业务流程名称。

    • 描述:对该事务的备注信息。

  4. 单击 下一步,进入设计器页面,开始编排业务流程。

    1. 以便您快速体验,点击此处直接下载一个已经编排好的 JSON 文件

    2. 单击编辑区上方的 JSON View 图标,并将下载的 JSON 语句粘贴到设计器的 JSON View 框中。

    3. 单击 Designer View 图标,切换回设计器模式,即可查看具体的业务流程图。

  5. 将刚刚设计器生成的 JSON 保存到工程中。如在示例工程将其保存至 account-demo-saga/account-demo-service/src/main/resources/statelang/transfer.json

  6. 在工程中配置状态机引擎,参见示例工程文件 account-demo-saga/account-demo-service/src/main/resources/META-INF/spring/transfer-bean.xml

    <bean id="stateMachineEngine" class="io.seata.saga.engine.impl.ProcessCtrlStateMachineEngine">
        <property name="stateMachineConfig" ref="dbStateMachineConfig"></property>
    </bean>
    <bean id="dbStateMachineConfig" class="com.alipay.dtx.client.saga.config.DtxSagaStateMachineConfig">
        <property name="dataSource" ref="transferDataSource"></property>
        <property name="resources" value="classpath:statelang/*.json"></property>
        <property name="enableAsync" value="true"></property>
        <property name="threadPoolExecutor" ref="threadExecutor"></property>
    </bean>
    
    <bean id="threadExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolExecutorFactoryBean">
        <property name="threadNamePrefix" value="SAGA_ASYNC_EXE_"/>
        <property name="corePoolSize" value="20"/>
        <property name="maxPoolSize" value="20"/>
        <property name="queueCapacity" value="100"/>
        <property name="rejectedExecutionHandler" ref="callerRunsPolicy"/>
    </bean>
    
    <bean name="callerRunsPolicy" class="java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy"></bean>
    
    <bean class="io.seata.saga.rm.StateMachineEngineHolder">
        <property name="stateMachineEngine" ref="stateMachineEngine"/>
    </bean>
    说明

    本文的示例工程中是用的 h2 数据源,且自动建表。Saga 事务日志建表语句,可参考 Saga 模式建表语句示例

  7. 在 Java 代码中,通过状态机引擎启动业务流程:

    /**
    *  转账交易
    */
    public interface TransferService{
    
    /**
      * Saga转账
      *
      * @param from
      * @param to
      * @param amount
      * @return
      */
    boolean transferBySaga(final String from,final String to,final BigDecimal amount,Map<String,Object> extParams);
    }

    对应的接口实现代码示例如下:

    public class TransferServiceImpl implements TransferService{
    
            protected final static Logger logger =LoggerFactory.getLogger(TransferServiceImpl.class);
    
            @Autowired
            private StateMachineEngine stateMachineEngine;
    
            @Override
            public boolean transferBySaga(String from,String to,BigDecimal amount,Map<String,Object> extParams){
                try{
                    String businessKey = UUID.randomUUID().toString().replaceAll("-","");
    
                    Map<String,Object> params =new HashMap<>(4);
                    params.put("from", from);
                    params.put("to", to);
                    params.put("amount", amount);
                    if(extParams !=null){
                        params.put("extParams", extParams);
                    }
    
                    StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("transferBySaga",null, businessKey, params);
    
                    if(ExecutionStatus.SU.equals(inst.getStatus())
                            && inst.getCompensationStatus() == null){
                        //正向状态为成功, 补偿状态为空(没为触发回滚),则交易成功
                        return true;
                    }
                    else{
                        //如果执行到了Fail节点会有errorCode和errorMessage
                        String errorCode =(String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_CODE);
                        String errorMessage =(String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_MSG);
                        System.out.println("ErrorCode:"+ errorCode +", ErrorMsg:"+ errorMessage +", exception: "+ inst.getException());
                        return false;
                    }
    
                }catch(Throwable t){
                    logger.error("转账交易执行失败", t);
                    throw new RuntimeException(t);
                }
            }
    
            public void setStateMachineEngine(StateMachineEngine stateMachineEngine){
                this.stateMachineEngine = stateMachineEngine;
            }
        }
    

相关链接