Saga 状态机配置

Saga 模式事务是基于 状态机引擎来实现的。分布式事务控制台提供了一个状态设计器,您可以通过该设计器自编排设计状态图,定义各节点的详细信息与属性配置等。

具体的实现机制如下:

  • 通过状态图定义服务调用的流程,并生成 JSON 状态语言定义文件。

  • 状态图中一个节点可以是调用一个服务,节点可以配置它的补偿节点。

  • JSON 状态定义文件由状态机引擎驱动执行。当出现异常时,状态引擎反向执行已成功节点对应的补偿节点将事务回滚。

    说明

    异常发生时,您也可自定义决定是否进行补偿。 可以实现服务编排需求,支持单项选择、并发、子流程、参数转换、参数映射、服务执行状态判断、异常捕获等功能。

本文详细介绍 状态机与 状态的各个属性,以便您进行业务流程的设计编排。

状态机的属性

状态机的属性如下:

{
     "Name":"reduceInventoryAndBalance",
     "Comment":"reduce inventory then reduce balance in a transaction",
     "StartState":"ReduceInventory",
     "Version":"0.0.1",
     "States":{}
}

属性说明

参数

说明

Name

表示状态机的名称,必须唯一。

Comment

状态机的描述。

Version

状态机的版本。

StartState

启动时运行的第一个 状态

States

状态列表,是一个 map 结构。key 是 状态的名称,在状态机内必须唯一;value 是一个 map 结构,表示 状态的属性列表。

状态的属性

ServiceTask

"States":{
            ...
            "ReduceBalance":{
                "Type":"ServiceTask",
                "ServiceName":"balanceAction",
                "ServiceMethod":"reduce",
                "CompensateState":"CompensateReduceBalance",
                "IsForUpdate":true,
                "IsPersist":true,
                "IsAsync":false,
                "Input":[
                    "$.[businessKey]",
                    "$.[amount]",
                    {
                    "throwException":"$.[mockReduceBalanceFail]"
                    }
                ],
                "Output":{
                    "compensateReduceBalanceResult":"$.#root"
                },
                "Status":{
                    "#root == true":"SU",
                    "#root == false":"FA",
                    "$Exception{java.lang.Throwable}":"UN"
                },
                "Retry":[
                    {
                    "Exceptions":["io.seata.saga.engine.mock.DemoException"],
                    "IntervalSeconds":1.5,
                    "MaxAttempts":3,
                    "BackoffRate":1.5
                    },
                    {
                    "IntervalSeconds":1,
                    "MaxAttempts":3,
                    "BackoffRate":1.5
                    }
                ],
                "Catch":[
                    {
                    "Exceptions":[
                    "java.lang.Throwable"
                    ],
                    "Next":"CompensationTrigger"
                    }
                ],
                "Next":"Succeed"
            }
            ...
        }

属性说明

参数

说明

ServiceName

服务名称,通常是服务的 beanId。

ServiceMethod

服务方法名称。

CompensateState

该状态的补偿状态。

IsForUpdate

标识该服务是否会更新数据,默认是 false。如果配置了 CompensateState,则默认是 true。有补偿服务的服务肯定是数据更新类服务。

IsPersist

执行日志是否进行存储,默认是 true。有一些查询类的服务可以配置为 false,执行日志不进行存储提高性能,因为当异常恢复时可以重复执行。

IsAsync

异步调用服务。因为异步调用服务会忽略服务的返回结果,所以用户定义的服务执行状态映射(下文的 status 属性)将被忽略,默认为服务调用成功。如果提交异步调用失败(比如线程池已满),则为服务执行状态失败。

Input

调用服务的输入参数列表,是一个数组,对应于服务方法的参数列表, $.表示使用表达式从状态机上下文中取参数,表达使用的 SpringEL。如果是常量,直接写值即可。关于复杂的参数如何传入,参见 复杂参数的 Input 定义

Output

将服务返回的参数赋值到状态机上下文中,是一个 map 结构。

  • key 为放入到状态机上文时的 key(状态机上下文也是一个 map)。

  • value 中 $.是表示 SpringEL 表达式,表示从服务的返回参数中取值,#root表示服务的整个返回参数。

Status

服务执行状态映射,框架定义了三个状态:SU成功、FA失败、UN未知。需要把服务执行的状态映射成这三个状态,帮助框架判断整个事务的一致性,是一个 map 结构。

  • key 是条件表达式,一般是取服务的返回值或抛出的异常进行判断,默认是 SpringEL 表达式,判断服务返回参数,带 $Exception{开头表示判断异常类型。

  • value 是当这个条件表达式成立时则将服务执行状态映射成这个值。

Catch

捕获到异常后的路由。

Retry

捕获异常后的重试策略,是个数组,可以配置多个规则。

  • Exceptions为匹配的异常列表。

  • IntervalSeconds为重试间隔。

  • MaxAttempts为最大重试次数。

  • BackoffRate为下一次重试间隔相对于上一次重试间隔的倍数。例如,如果上次一重试间隔是 2 秒,而 BackoffRate=1.5,则下一次重试间隔是 3 秒。

  • Exceptions属性可以不配置。不配置时,表示框架自动匹配网络超时异常。当在重试过程中发生了别的异常,框架会重新匹配规则,并按新规则进行重试,同一种规则的总重试次数不会超过该规则的 MaxAttempts。

Next

服务执行完成后下一个执行的状态。

当没有配置 Status 对服务执行状态进行映射,系统会自动判断状态:

  • 没有异常,则认为执行成功。

  • 如果有异常,则判断异常是不是网络连接超时,如果是则认为是 FA

  • 如果是其它异常,且服务 IsForUpdate=true,则状态为 UN,否则为 FA

整个状态机的执行状态是由框架自己判断的,状态机有两个状态:status(正向执行状态)与 compensateStatus(补偿状态)。

  • 如果所有服务执行成功(事务提交成功),则 status=SU,compensateStatus=null。

  • 如果有服务执行失败,且存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=UN,compensateStatus=null。

  • 如果有服务执行失败,且不存在更新类服务执行成功,且没有进行补偿(事务提交失败),则 status=FA,compensateStatus=null。

  • 如果补偿成功(事务回滚成功),则 status=FA/UN,compensateStatus=SU。

  • 发生补偿且有未补偿成功的服务(回滚失败),则 status=FA/UN,compensateStatus=UN。

  • 存在事务提交或回滚失败的情况,Seata Sever 都会不断发起重试。

Choice

"ChoiceState":{
        "Type":"Choice",
        "Choices":[
            {
            "Expression":"[reduceInventoryResult] == true",
            "Next":"ReduceBalance"
            }
        ],
        "Default":"Fail"
}

Choice 类型的状态是单项选择路由。

  • Choices:可选的分支列表,只会选择第一个条件成立的分支。

  • Expression:SpringEL 表达式。

  • Next:当 Expression 表达式成立时,执行的下一个状态。

Succeed

"Succeed":{
    "Type":"Succeed"
}

运行到 Succeed 状态,表示状态机正常结束。正常结束不代表成功结束,是否成功要看每个状态是否都成功。

Fail

"Fail":{
    "Type":"Fail",
    "ErrorCode":"PURCHASE_FAILED",
    "Message":"purchase failed"
}

运行到 Fail 状态,表示状态机异常结束。异常结束时,可以配置 ErrorCode 和 Message,表示错误码和错误信息,可以用于给调用方返回错误码和消息。

CompensationTrigger

"CompensationTrigger":{
    "Type":"CompensationTrigger",
    "Next":"Fail"
}

CompensationTrigger类型的 state 是用于触发补偿事件,回滚分布式事务。Next表示补偿成功后路由到的 state。

SubStateMachine

"CallSubStateMachine":{
        "Type":"SubStateMachine",
        "StateMachineName":"simpleCompensationStateMachine",
        "CompensateState":"CompensateSubMachine",
        "Input":[
            {
            "a":"$.1",
            "barThrowException":"$.[barThrowException]",
            "fooThrowException":"$.[fooThrowException]",
            "compensateFooThrowException":"$.[compensateFooThrowException]"
            }
        ],
        "Output":{
             "fooResult":"$.#root"
        },
        "Next":"Succeed"
}

SubStateMachine类型的“状态”是调用子状态机。

  • StateMachineName:要调用的子状态机名称。

  • CompensateState:子状态机的补偿 state,可以不配置,系统会自动创建它的补偿 state。子状态机的补偿实际就是调用子状态机的 compensate 方法。所以用户并不需要自己实现一个对子状态机的补偿服务。当配置这个属性时,可以利用 Input 属性自定义传入一些变量,参见下面的 CompensateSubMachine

CompensateSubMachine

"CompensateSubMachine":{
      "Type":"CompensateSubMachine",
      "Input":[
           {
                 "compensateFooThrowException":"$.[compensateFooThrowException]"
           }
       ]
}

CompensateSubMachine类型的 state 是专门用于补偿一个子状态机的 state,它会调用子状态机的 compensate 方法,可以利用 Input 属性传入一些自定义的变量,Status 属性自定判断补偿是否成功。

复杂参数的 Input 定义

"FirstState":{
        "Type":"ServiceTask",
        "ServiceName":"demoService",
        "ServiceMethod":"complexParameterMethod",
        "Next":"ChoiceState",
        "ParameterTypes":["java.lang.String","int","io.seata.saga.engine.mock.DemoService$People","[Lio.seata.saga.engine.mock.DemoService$People;","java.util.List","java.util.Map"],
        "Input":[
                "$.[people].name",
                "$.[people].age",
                 {
                    "name":"$.[people].name",
                    "age":"$.[people].age",
                    "childrenArray":[
                        {
                        "name":"$.[people].name",
                        "age":"$.[people].age"
                        },
                        {
                        "name":"$.[people].name",
                        "age":"$.[people].age"
                        }
                    ],
                    "childrenList":[
                        {
                        "name":"$.[people].name",
                        "age":"$.[people].age"
                        },
                        {
                        "name":"$.[people].name",
                        "age":"$.[people].age"
                        }
                    ],
                    "childrenMap":{
                        "lilei":{
                        "name":"$.[people].name",
                        "age":"$.[people].age"
                        }
                    }
                 },
                [
                    {
                    "name":"$.[people].name",
                    "age":"$.[people].age"
                    },
                    {
                    "name":"$.[people].name",
                    "age":"$.[people].age"
                    }
                ],
                [
                    {
                    "@type":"io.seata.saga.engine.mock.DemoService$People",
                    "name":"$.[people].name",
                    "age":"$.[people].age"
                    }
                ],
                {
                "lilei":{
                "@type":"io.seata.saga.engine.mock.DemoService$People",
                "name":"$.[people].name",
                "age":"$.[people].age"
                }
            }
        ],
        "Output":{
             "complexParameterMethodResult":"$.#root"
        }
}

上述的 complexParameterMethod方法定义如下:

People complexParameterMethod(String name, int age,People people,People[] peopleArray,List<People> peopleList,Map<String,People> peopleMap)
class People{
    private String name;
    private int    age;
    private People[] childrenArray;
    private List<People> childrenList;
    private Map<String,People> childrenMap;
...
}

启动状态机时,需传入参数:

Map<String,Object> paramMap =new HashMap<>(1);
People people =new People();
people.setName("lilei");
people.setAge(18);
paramMap.put("people", people);
String stateMachineName ="simpleStateMachineWithComplexParams";
StateMachineInstance inst = stateMachineEngine.start(stateMachineName,null, paramMap);
说明

ParameterTypes属性是可以不用传的,调用的方法的参数列表中有 Map、List 这种可以带泛型的集合类型。因为 Java 编译会丢失泛型,所以需要使用这个属性,同时在 Input 的 JSON 中对应的对这个 JSON 加 @type来申明泛型(集合的元素类型)。