Saga 模式事务是基于 状态机引擎来实现的。分布式事务控制台提供了一个状态设计器,您可以通过该设计器自编排设计状态图,定义各节点的详细信息与属性配置等。
具体的实现机制如下:
通过状态图定义服务调用的流程,并生成 JSON 状态语言定义文件。
状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点。
JSON 状态定义文件由状态机引擎驱动执行。当出现异常时,状态引擎反向执行已成功节点对应的补偿节点将事务回滚。
说明异常发生时,您也可自定义决定是否进行补偿。 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能。
本文详细介绍 状态机与 状态的各个属性,以便您进行业务流程的设计编排。
状态机的属性
状态机的属性如下:
{
"Name":"reduceInventoryAndBalance",
"Comment":"reduce inventory then reduce balance in a transaction",
"StartState":"ReduceInventory",
"Version":"0.0.1",
"States":{}
}
属性说明
参数 | 说明 |
| 表示状态机的名称,必须唯一。 |
| 状态机的描述。 |
| 状态机的版本。 |
| 启动时运行的第一个 状态。 |
| 状态列表,是一个 map 结构。key 是 状态的名称,在状态机内必须唯一;value 是一个 map 结构,表示 状态的属性列表。 |
状态的属性
ServiceTask
"States":{
...
"ReduceBalance":{
"Type":"ServiceTask",
"ServiceName":"balanceAction",
"ServiceMethod":"reduce",
"CompensateState":"CompensateReduceBalance",
"IsForUpdate":true,
"IsPersist":true,
"IsAsync":false,
"Input":[
"$.[businessKey]",
"$.[amount]",
{
"throwException":"$.[mockReduceBalanceFail]"
}
],
"Output":{
"compensateReduceBalanceResult":"$.#root"
},
"Status":{
"#root == true":"SU",
"#root == false":"FA",
"$Exception{java.lang.Throwable}":"UN"
},
"Retry":[
{
"Exceptions":["io.seata.saga.engine.mock.DemoException"],
"IntervalSeconds":1.5,
"MaxAttempts":3,
"BackoffRate":1.5
},
{
"IntervalSeconds":1,
"MaxAttempts":3,
"BackoffRate":1.5
}
],
"Catch":[
{
"Exceptions":[
"java.lang.Throwable"
],
"Next":"CompensationTrigger"
}
],
"Next":"Succeed"
}
...
}
属性说明
参数 | 说明 |
| 服务名称,通常是服务的 beanId。 |
| 服务方法名称。 |
| 该状态的补偿状态。 |
| 标识该服务是否会更新数据,默认是 false。如果配置了 |
| 执行日志是否进行存储,默认是 true。有一些查询类的服务可以配置为 false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行。 |
| 异步调用服务。因为异步调用服务会忽略服务的返回结果,所以用户定义的服务执行状态映射(下文的 status 属性)将被忽略,默认为服务调用成功。如果提交异步调用失败(比如线程池已满),则为服务执行状态失败。 |
| 调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, |
| 将服务返回的参数赋值到状态机上下文中,是一个 map 结构。
|
| 服务执行状态映射,框架定义了三个状态:
|
| 捕获到异常后的路由。 |
| 捕获异常后的重试策略,是个数组,可以配置多个规则。
|
| 服务执行完成后下一个执行的状态。 |
当没有配置 Status 对服务执行状态进行映射,系统会自动判断状态:
没有异常,则认为执行成功。
如果有异常,则判断异常是不是网络连接超时,如果是则认为是
FA
。如果是其它异常,且服务
IsForUpdate=true
,则状态为UN
,否则为FA
。
整个状态机的执行状态是由框架自己判断的,状态机有两个状态:status(正向执行状态)与 compensateStatus(补偿状态)。
如果所有服务执行成功(事务提交成功),则 status=SU,compensateStatus=null。
如果有服务执行失败,且存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=UN,compensateStatus=null。
如果有服务执行失败,且不存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=FA,compensateStatus=null。
如果补偿成功(事务回滚成功),则 status=FA/UN,compensateStatus=SU。
发生补偿且有未补偿成功的服务(回滚失败),则 status=FA/UN,compensateStatus=UN。
存在事务提交或回滚失败的情况,Seata Sever 都会不断发起重试。
Choice
"ChoiceState":{
"Type":"Choice",
"Choices":[
{
"Expression":"[reduceInventoryResult] == true",
"Next":"ReduceBalance"
}
],
"Default":"Fail"
}
Choice 类型的状态是单项选择路由。
Choices
:可选的分支列表,只会选择第一个条件成立的分支。Expression
:SpringEL 表达式。Next
:当 Expression 表达式成立时,执行的下一个状态。
Succeed
"Succeed":{
"Type":"Succeed"
}
运行到 Succeed 状态,表示状态机正常结束。正常结束不代表成功结束,是否成功要看每个状态是否都成功。
Fail
"Fail":{
"Type":"Fail",
"ErrorCode":"PURCHASE_FAILED",
"Message":"purchase failed"
}
运行到 Fail 状态,表示状态机异常结束。异常结束时,可以配置 ErrorCode 和 Message,表示错误码和错误信息,可以用于给调用方返回错误码和消息。
CompensationTrigger
"CompensationTrigger":{
"Type":"CompensationTrigger",
"Next":"Fail"
}
CompensationTrigger
类型的 state 是用于触发补偿事件,回滚分布式事务。Next
表示补偿成功后路由到的 state。
SubStateMachine
"CallSubStateMachine":{
"Type":"SubStateMachine",
"StateMachineName":"simpleCompensationStateMachine",
"CompensateState":"CompensateSubMachine",
"Input":[
{
"a":"$.1",
"barThrowException":"$.[barThrowException]",
"fooThrowException":"$.[fooThrowException]",
"compensateFooThrowException":"$.[compensateFooThrowException]"
}
],
"Output":{
"fooResult":"$.#root"
},
"Next":"Succeed"
}
SubStateMachine
类型的“状态”是调用子状态机。
StateMachineName
:要调用的子状态机名称。CompensateState
:子状态机的补偿 state,可以不配置,系统会自动创建它的补偿 state。子状态机的补偿实际就是调用子状态机的 compensate 方法。所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时,可以利用 Input 属性自定义传入一些变量,参见下面的CompensateSubMachine
。
CompensateSubMachine
"CompensateSubMachine":{
"Type":"CompensateSubMachine",
"Input":[
{
"compensateFooThrowException":"$.[compensateFooThrowException]"
}
]
}
CompensateSubMachine
类型的 state 是专门用于补偿一个子状态机的 state,它会调用子状态机的 compensate 方法,可以利用 Input 属性传入一些自定义的变量,Status 属性自定判断补偿是否成功。
复杂参数的 Input 定义
"FirstState":{
"Type":"ServiceTask",
"ServiceName":"demoService",
"ServiceMethod":"complexParameterMethod",
"Next":"ChoiceState",
"ParameterTypes":["java.lang.String","int","io.seata.saga.engine.mock.DemoService$People","[Lio.seata.saga.engine.mock.DemoService$People;","java.util.List","java.util.Map"],
"Input":[
"$.[people].name",
"$.[people].age",
{
"name":"$.[people].name",
"age":"$.[people].age",
"childrenArray":[
{
"name":"$.[people].name",
"age":"$.[people].age"
},
{
"name":"$.[people].name",
"age":"$.[people].age"
}
],
"childrenList":[
{
"name":"$.[people].name",
"age":"$.[people].age"
},
{
"name":"$.[people].name",
"age":"$.[people].age"
}
],
"childrenMap":{
"lilei":{
"name":"$.[people].name",
"age":"$.[people].age"
}
}
},
[
{
"name":"$.[people].name",
"age":"$.[people].age"
},
{
"name":"$.[people].name",
"age":"$.[people].age"
}
],
[
{
"@type":"io.seata.saga.engine.mock.DemoService$People",
"name":"$.[people].name",
"age":"$.[people].age"
}
],
{
"lilei":{
"@type":"io.seata.saga.engine.mock.DemoService$People",
"name":"$.[people].name",
"age":"$.[people].age"
}
}
],
"Output":{
"complexParameterMethodResult":"$.#root"
}
}
上述的 complexParameterMethod
方法定义如下:
People complexParameterMethod(String name, int age,People people,People[] peopleArray,List<People> peopleList,Map<String,People> peopleMap)
class People{
private String name;
private int age;
private People[] childrenArray;
private List<People> childrenList;
private Map<String,People> childrenMap;
...
}
启动状态机时,需传入参数:
Map<String,Object> paramMap =new HashMap<>(1);
People people =new People();
people.setName("lilei");
people.setAge(18);
paramMap.put("people", people);
String stateMachineName ="simpleStateMachineWithComplexParams";
StateMachineInstance inst = stateMachineEngine.start(stateMachineName,null, paramMap);
ParameterTypes
属性是可以不用传的,调用的方法的参数列表中有 Map、List 这种可以带泛型的集合类型。因为 Java 编译会丢失泛型,所以需要使用这个属性,同时在 Input 的 JSON 中对应的对这个 JSON 加 @type
来申明泛型(集合的元素类型)。