本文模拟了转账的业务场景,分别用分布式事务的 TCC 模式、SAGA 模式与 FMT 模式实现了转账功能,并对 TCC 模式、SAGA 模式与 FMT 模式进行了压测。通过本文,您可对 TCC、SAGA、FMT 模式的性能有一个大致了解。
压测模型介绍
如下图所示,“应用 A”对外发布“转账服务”,服务完成“账号 A”向“账号 B”转账的功能。
实现流程如下:
“转账服务”内部开启分布式事务。
“参与者 A”负责从“账号 A”扣款,“参与者 A”由“应用 A”提供(与发起方为同一应用)。
“参与者 B”负责向“账号 B”加钱,“参与者 B”服务由“应用 B”提供。
A 的账号数据存储在“数据库 A”中,此数据库仅由“应用 A”访问;B 的账号数据存储“数据库 B”中,此数据库由“应用 B”访问。
实现转账
表结构设计
数据库表 A 结构为:
## 账户余额表,存储账户余额信息
create table account(
account_no varchar(256)notnull comment '账户',
amount DOUBLE comment ‘账户余额’,
freezed_amount DOUBLE comment ‘账户冻结金额,TCC才会用到的字段’,
primary key (account_no)
);
## 账号操作流水表,记录每一笔分布式事务操作的账号、操作金额、操作类型(扣钱/加钱),TCC模式下才会使用到此表
create table account_transaction(
tx_id varchar(256)notnull,
account_no varchar(256)notnull,
amount DOUBLE,
type varchar(256)notnull,
primary key (tx_id)
);
数据库表 B 结构与数据库表 A 结构相同。
实现 TCC 模式
发起方实现:
@DtxTransaction(bizType = "single-transfer-by-tcc")
public boolean transferByTcc(String from, String to, double amount) {
try {
//第一个参与者
boolean ret = firstTccActionRef.prepare_minus(null, from, amount);
if (!ret) {
//事务回滚
RuntimeContext.setRollBack();
return false;
}
//第二个参与者
ret = secondTccActionRef.prepare_add(null, to, amount);
if (!ret) {
//事务回滚
RuntimeContext.setRollBack();
return false;
}
return ret;
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
参与者 A(扣钱)实现:
public interface FirstTccAction {
@TwoPhaseBusinessAction(name = "firstTccAction", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare_minus(BusinessActionContext businessActionContext, String accountNo,
double amount);
public boolean commit(BusinessActionContext businessActionContext);
public boolean rollback(BusinessActionContext businessActionContext);
}
参与者 B(加钱)实现:
public interface SecondTccAction {
@TwoPhaseBusinessAction(name = "secondTccAction", commitMethod = "commit", rollbackMethod = "rollback")
public boolean prepare_add(BusinessActionContext businessActionContext, String accountNo, double amount);
public boolean commit(BusinessActionContext businessActionContext);
public boolean rollback(BusinessActionContext businessActionContext);
}
实现 FMT 模式
发起方实现:
@DtxTransaction(bizType = "transfer-by-auto")
public boolean transferByAuto(String from, String to, double amount) {
boolean ret = false;
try {
//第一个参与者
ret = firstAutoAction.amount_minus(from, amount);
if (!ret) {
//事务回滚
RuntimeContext.setRollBack();
return false;
}
//第二个参与者
ret = secondAutoAction.amount_add(to, amount);
if (!ret) {
//事务回滚
RuntimeContext.setRollBack();
return false;
}
return ret;
} catch (Throwable t) {
throw new RuntimeException(t);
}
}
参与者 A(扣钱)实现:
public class FirstAutoActionImpl implements FirstAutoAction {
......
@Override
public boolean amount_minus(final String accountNo, final double amount) {
try {
return tccFirstActionTransactionTemplateAuto.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
Account account = firstAccountDAOAuto.getAccountForUpdate(accountNo);
if (account == null) {
throw new RuntimeException("账号不存在");
}
//扣钱
double newAmount = account.getAmount() - amount;
if (amount < 0) {
throw new RuntimeException("余额不足");
}
account.setAmount(newAmount);
int n = firstAccountDAOAuto.updateAmount(account);
if (n == 1) {
return true;
} else {
status.setRollbackOnly();
return false;
}
} catch (Exception e) {
logger.error("amount_minus error", e);
status.setRollbackOnly();
return false;
}
}
});
} catch (Exception e) {
logger.error("amount_minus failed", e);
return false;
}
}
}
参与者 B(加钱)实现:
public class SecodeAutoActionImpl implements SecondAutoAction {
......
@Override
public boolean amount_add(final String accountNo, final double amount) {
try {
return tccSecondActionTransactionTemplateAuto.execute(new TransactionCallback < Boolean > () {
@Override
public Boolean doInTransaction (TransactionStatus status){
try {
Account account = secondAccountDAOAuto.getAccountForUpdate(accountNo);
if (account == null) {
throw new RuntimeException("账号不存在");
}
//加钱
double newAmount = account.getAmount() + amount;
account.setAmount(newAmount);
secondAccountDAOAuto.updateAmount(account);
} catch (Exception e) {
logger.error("amount_add error", e);
status.setRollbackOnly();
return false;
}
return true;
}
});
} catch (Exception e) {
logger.error("amount_add failed", e);
return false;
}
}
}
实现 SAGA 模式
使用 SAGA 状态机设计业务流程,如下图所示:
amountMinus
是余额扣减服务。amountAdd
是加钱服务。compensateMinus
是余额扣减服务的补偿(冲正)服务,执行的事务就是取消扣减。compensateAdd
是加钱服务的补偿服务,执行的事务是取消加钱。
发起方实现:
public class TransferServiceImpl implements TransferService {
protected final static Logger logger = LoggerFactory.getLogger(TransferServiceImpl.class);
@Autowired
private StateMachineEngine stateMachineEngine;
@Override
public boolean transferBySaga(String from, String to, BigDecimal amount) {
try {
//生产业务ID
String businessKey = UUID.randomUUID().toString().replaceAll("-", "");
Map<String, Object> params = new HashMap<>(4);
params.put("from", from);//转出帐户
params.put("to", to);//转入帐户
params.put("amount", amount);//转帐金额
StateMachineInstance inst = stateMachineEngine.startWithBusinessKey("transferBySaga", null, businessKey, params);
if (ExecutionStatus.SU.equals(inst.getStatus())
&& inst.getCompensationStatus() == null) {
//正向状态为成功, 补偿状态为空(没为触发回滚),则交易成功
return true;
} else {
//如果执行到了Fail节点会有errorCode和errorMessage
String errorCode = (String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_CODE);
String errorMessage = (String) inst.getContext().get(DomainConstants.VAR_NAME_STATEMACHINE_ERROR_MSG);
System.out.println("ErrorCode:" + errorCode + ", ErrorMsg:" + errorMessage + ", exception: " + inst.getException());
return false;
}
} catch (Throwable t) {
logger.error("转账交易执行失败", t);
throw new RuntimeException(t);
}
}
}
参与者 A(扣钱)实现:
public class FirstSagaActionImpl implements FirstSagaAction {
protected final staticLogger logger = LoggerFactory.getLogger(FirstSagaActionImpl.class);
//账户dao
private AccountDAO firstAccountDAO;
//业务流水dao
private AccountTransactionDAO firstAccountTransactionDAO;
//事务模版
private TransactionTemplate firstActionTransactionTemplate;
/**
* 扣钱操作
**/
@Override
public boolean amountMinus(final String businessKey, final String accountNo, final BigDecimal amount, final Map<String, Object> extParams) {
try {
if (amount.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("金额必须大小0");
}
AccountTransaction accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
if (accountTransaction != null && Status.SUCCESS.name().equals(accountTransaction.getStatus())) {
//幂等控制: 交易已成功,直接返回成功
return true;
}
return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
//校验账户余额
Account account = firstAccountDAO.getAccountForUpdate(accountNo);
if (account.getAmount().compareTo(amount) < 0) {
throw new RuntimeException("余额不足");
}
//记录账户操作流水
AccountTransaction accountTransaction = new AccountTransaction();
accountTransaction.setBusinessKey(businessKey);
accountTransaction.setAccountNo(accountNo);
accountTransaction.setAmount(amount);
accountTransaction.setType(Type.MINUS.name());
accountTransaction.setStatus(Status.SUCCESS.name());
firstAccountTransactionDAO.addTransaction(accountTransaction);
//扣钱
BigDecimal amount = account.getAmount().subtract(accountTransaction.getAmount());
if (amount.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("余额不足");
}
account.setAmount(amount);
firstAccountDAO.updateAmount(account);
} catch (Exception e) {
logger.error("扣钱操作失败", e);
throw new RuntimeException("扣钱操作失败", e);
}
return true;
}
});
} catch (Exception e) {
logger.error("扣钱操作失败", e);
return false;
}
}
/**
* 补偿(冲正)扣钱操作
**/
@Override
public boolean compensateAmountMinus(final String businessKey, final String accountNo) {
AccountTransaction accountTransaction;
try {
accountTransaction = firstAccountTransactionDAO.findTransaction(businessKey);
if (accountTransaction == null) {
//原交易流水不存在, 记录防悬挂(后发先至)流水
accountTransaction = new AccountTransaction();
accountTransaction.setBusinessKey(businessKey);
accountTransaction.setAccountNo(accountNo);
accountTransaction.setType(Type.MINUS.name());
accountTransaction.setStatus(Status.COMPENSATED.name());
firstAccountTransactionDAO.addTransaction(accountTransaction);
return true;
}
if (Status.COMPENSATED.name().equals(accountTransaction.getStatus())) {
//幂等控制: 补偿已成功,直接返回成功
return true;
}
final AccountTransaction accountTransactionFinal = accountTransaction;
return firstActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
//补偿已扣减金额
Account account = firstAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
BigDecimal amount = account.getAmount().add(accountTransactionFinal.getAmount());
account.setAmount(amount);
firstAccountDAO.updateAmount(account);
//更新流水状态
accountTransactionFinal.setStatus(Status.COMPENSATED.name());
firstAccountTransactionDAO.updateTransaction(accountTransactionFinal);
return true;
} catch (Exception e) {
logger.error("扣钱操作补偿失败", e);
status.setRollbackOnly();
return false;
}
}
});
} catch (SQLException e) {
logger.error("扣钱操作补偿失败", e);
return false;
}
}
}
参与者 B(加钱)实现:
public class SecondSagaActionImpl implements SecondSagaAction {
protected final static Logger logger = LoggerFactory.getLogger(SecondSagaActionImpl.class);
private AccountDAO secondAccountDAO;
private AccountTransactionDAO secondAccountTransactionDAO;
private TransactionTemplate secondActionTransactionTemplate;
/**
* 加钱操作
**/
@Override
public boolean amountAdd(finalString businessKey, final String accountNo, final BigDecimal amount, final Map<String, Object> extParams) {
try {
if (amount.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("金额必须大于0");
}
AccountTransaction accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey);
if (accountTransaction != null && Status.SUCCESS.name().equals(accountTransaction.getStatus())) {
//幂等控制: 交易已成功,直接返回成功
return true;
}
return secondActionTransactionTemplate.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
try {
//记录账户操作流水
AccountTransaction accountTransaction = new AccountTransaction();
accountTransaction.setBusinessKey(businessKey);
accountTransaction.setAccountNo(accountNo);
accountTransaction.setAmount(amount);
accountTransaction.setType(Type.ADD.name());
accountTransaction.setStatus(Status.SUCCESS.name());
secondAccountTransactionDAO.addTransaction(accountTransaction);
Account account = secondAccountDAO.getAccountForUpdate(accountNo);
//加钱
BigDecimal amount = account.getAmount().add(accountTransaction.getAmount());
account.setAmount(amount);
secondAccountDAO.updateAmount(account);
} catch (Exception e) {
logger.error("加钱操作失败", e);
throw new RuntimeException("加钱操作失败", e);
}
return true;
}
});
} catch (Exception e) {
logger.error("加钱操作失败", e);
return false;
}
}
/**
* 补偿(冲正)加钱操作
**/
@Override
public boolean compensateAmountAdd(final String businessKey, final String accountNo) {
AccountTransaction accountTransaction;
try {
accountTransaction = secondAccountTransactionDAO.findTransaction(businessKey);
if (accountTransaction == null) {
//防悬挂: 原交流流水不存在, 记录防悬挂(后发先至)流水
accountTransaction = new AccountTransaction();
accountTransaction.setBusinessKey(businessKey);
accountTransaction.setAccountNo(accountNo);
accountTransaction.setType(Type.ADD.name());
accountTransaction.setStatus(Status.COMPENSATED.name());
secondAccountTransactionDAO.addTransaction(accountTransaction);
//允许空补偿: 返回补偿成功
return true;
}
if (Status.COMPENSATED.name().equals(accountTransaction.getStatus())) {
//幂等控制: 补偿已成功,直接返回成功
return true;
}
final AccountTransaction accountTransactionFinal = accountTransaction;
return secondActionTransactionTemplate.execute(new TransactionCallback < Boolean > () {
@Override
public Boolean doInTransaction (TransactionStatus status){
try {
//扣回已加金额
Account account = secondAccountDAO.getAccountForUpdate(accountTransactionFinal.getAccountNo());
BigDecimal amount = account.getAmount().subtract(accountTransactionFinal.getAmount());
if (amount.compareTo(BigDecimal.ZERO) < 0) {
throw new RuntimeException("余额不足, 请工人核对");
}
account.setAmount(amount);
secondAccountDAO.updateAmount(account);
//更新流水状态
accountTransactionFinal.setStatus(Status.COMPENSATED.name());
secondAccountTransactionDAO.updateTransaction(accountTransactionFinal);
return true;
} catch (Exception e) {
logger.error("加钱操作补偿失败", e);
status.setRollbackOnly();
return false;
}
}
});
} catch (SQLException e) {
logger.error("加钱操作补偿失败", e);
return false;
}
}
}
硬件环境
本场景使用的硬件配置如下:
RDS 为 MySql 5.6 数据库。
压测样本数据策略
用户数据规模(账号数量):A 组转账至 B 组,以验证在不同业务数据规模的场景下,分布式事务的 TCC 模式、SAGA 模式和 FMT 模式的性能表现。
压测模式 | A 组账号数 | B 组账号数 |
TCC,FMT,SAGA | 5000 | 5000 |
压测结果
如下图所示,用户数据规模 A 组转账至 B 组的场景下,TCC、SAGA、FMT 模式的 TPS 与 Response Time 数据如下:
TCC(总)为 TCC 模式下 TPS 总数,TCC(成)为 TCC 模式下成功 TPS 总数; FMT(总) 为 FMT 模式下 TPS 总数,FMT(成)为 FMT 模式下成功 TPS 总数; SAGA(总) 为 SAGA 模式下 TPS 总数,SAGA(成)为 SAGA 模式下成功 TPS 总数。
TPS
用户量(A 组转账至 B 组)
TPS
A 组:5000 B 组:5000
并发数
TCC(总)
TCC(成)
FMT(总)
FMT(成)
SAGA(总)
SAGA(成)
1
50
50
40
40
48
48
5
250
250
200
200
240
240
10
460
460
380
380
450
450
20
800
800
695
690
780
780
50
1450
1450
1220
1200
1300
1300
100
1500
1500
1550
1500
1480
1480
200
1500
1500
1600
1450
1550
1550
500
1600
1600
1500
1400
1580
1580
700
1500
1500
1500
1300
1550
1550
Response Time
Response Time(成功)
并发数
TCC
FMT
SAGA
1
19
26
21
5
21
25
21
10
23
27
23
20
27
30
26
50
35
45
38
100
70
75
68
200
135
160
129
500
225
380
316
700
490
520
451