分布式多步骤事务

在一些复杂的业务场景中,通常需要访问多个远程服务,并且对操作事务性语义有较高要求。小流量应用可通过关系型数据库的ACID特性满足,但在高流量场景下,为实现高可用性和可扩展性,通常采用微服务架构。这需要引入队列和数据库来保证事务一致性,增加了开发与运维成本。您可以使用云工作流的长流程分布式事务解决以上问题。本文将介绍如何使用云工作流提供长流程分布式事务保证,帮助您聚焦于自身业务逻辑。

场景示例

假设某应用为其用户提供预订火车票、航班和酒店的功能,要求三个步骤保证事务性。该功能需要三个远程调用实现(例如预订火车票需要调用12306接口),如果三个调用都成功则该订单成功。然而实际上任何一个远程调用都有可能会失败,因此该应用需要对不同的失败场景做出相应的补偿逻辑,回退已完成操作。如下图所示:

image
  • 如果预订火车票(BuyTrainTicket)成功,而预订航班(ReserveFlight)失败,则需要取消已经购买的火车票(CancelTrainTicket),并告知您订单失败。

  • 如果预订火车票(BuyTrainTicket)和预订航班(ReserveFlight)均成功,但是预订酒店(ReserveHotel)失败,则需要取消已经预订的航班(CancelFlight)和火车票(CancelTrainTicket),并告知您订单失败。

操作步骤

步骤1:创建FC函数

本步骤是模拟场景示例中提到的三个操作,即预订火车票、预订航班和预订酒店。首先创建一个名为Operation的函数,要求使用Python 3.10及以上版本。详细步骤,请参见快速创建函数

Operation函数模拟各操作(例如预订航班、预订酒店)的实现,根据输入决定该操作执行结果(成功或失败)。示例代码如下:

import json
import logging
import uuid

def handler(event, context):
  evt = json.loads(event)
  logger = logging.getLogger()
  id = uuid.uuid4()
  op = "operation"
  if 'operation' in evt:
    op = evt['operation']
    if op in evt:
      result = evt[op]
      if result == False:
        logger.info("%s failed" % op)
        exit()
  logger.info("%s succeeded, id %s" % (op, id))
  return '{"%s":"success", "%s_txnID": "%s"}' % (op, op, id)         

步骤2:创建工作流

本步骤创建了一个针对旅行服务(包括火车票、航班和酒店)的预订工作流,并包含错误处理机制,以确保在遇到问题时能够执行适当的回滚操作。

  1. 云工作流控制台创建工作流,参数默认即可。

  2. CloudFlow Studio页面,请根据下面步骤拖拽并配置对应的状态节点。

    1. BuyTrainTicket步骤:调用步骤1Operation函数购买火车票。配置错误处理,若遇到未知错误(FC.Unknown),则跳转到OrderFailed状态,表示整个订单过程失败,并结束流程。

    2. OrderFailed步骤:当BuyTrainTicket因未知错误而无法继续时,流程将终止于此,标记为失败。

    3. ReserveFlight步骤:调用Operation函数预订航班。配置错误处理,若遇到未知错误,则跳转至CancelTrainTicket状态,试图取消之前购买的火车票。

    4. CancelTrainTicket步骤:当预定航班失败时,调用此函数取消已经购买的火车票。

    5. ReserveHotel步骤:预订酒店。配置错误处理,若遇到未知错误,设定了最多三次的指数退避重试策略。如果出现未知错误且所有重试均失败,则跳转至CancelFlight状态,尝试取消已订的航班。

      说明

      预定航班、预定酒店等远程调用可能会因网络或服务错误等原因导致调用失败。通过增加对瞬时错误的重试机制,可以显著提高订单流程的成功率。

    6. CancelFlight步骤:取消之前的航班预订。完成取消操作后,回到CancelTrainTicket再次确认火车票是否已被取消。

    7. OrderSucceed步骤:表示整个预订流程顺利完成。

    下面是上述逻辑对应的YAML示例代码:

    Type: StateMachine
    Name: MyWorkFlow
    SpecVersion: v1
    StartAt: BuyTrainTicket
    States:
      - Type: Task
        Name: BuyTrainTicket
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          body:
            buy_train_ticket.$: $Context.Execution.Input.buy_train_ticket_result
            operation: buy_train_ticket
          resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST
        Catch:
          - Errors:
              - FC.Unknown
            Description: 购票失败
            Next: OrderFailed
        Next: ReserveFlight
      - Type: Fail
        Name: OrderFailed
        Code: 购票失败
        Detail: 购票失败
        End: true
      - Type: Task
        Name: ReserveFlight
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          body:
            reserve_flight.$: $Context.Execution.Input.reserve_flight_result
            operation: reserve_flight
          resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST
        Catch:  # 捕获ReserveFlight task抛出的FC.Unknown错误,跳转到CancelTrainTicket。
          - Errors:
              - FC.Unknown
            Description: 订单失败处理
            Next: CancelTrainTicket
        Next: ReserveHotel
      - Type: Task
        Name: CancelTrainTicket
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          body:
            reserve_flight_txnID.$: $Input.reserve_flight_txnID
            operation: cancel_train_ticket
          resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST
        Next: OrderFailed
      - Type: Task
        Name: ReserveHotel
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          body:
            reserve_hotel.$: $Context.Execution.Input.reserve_hotel_result
            operation: reserve_hotel
          resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST
        Catch:   # 捕获ReserveHotel task抛出的FC.Unknown错误,跳转到CancelFlight。
          - Errors:
              - FC.Unknown
            Description: 取消飞机
            Next: CancelFlight
            OutputConstructor:
              reserve_flight_txnID.$: $Input.reserve_flight_txnID
        Next: OrderSucceed
        Retry:   # 对FC.Unknown类型的错误最多指数退避重试3次,初始间隔1s,后续间隔=上次间隔*2。
          - Errors:
              - FC.Unknown
            Description: '重试策略 #1'
            MaxAttempts: 3
            IntervalSeconds: 1
            BackoffRate: 2
      - Type: Task
        Name: CancelFlight
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          body:
            reserve_flight_txnID.$: $Input.reserve_flight_txnID
            operation: cancel_flight
          resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST
        Next: CancelTrainTicket
      - Type: Succeed
        Name: OrderSucceed
        End: true
    

步骤3:执行并查看结果

  1. 创建好工作流之后,选择工作流配置页签,设置流程角色信息。

  2. 依次单击保存执行,并提供以下JSON对象作为输入,您可以调整result的值观察不同的执行结果。

    {
      "buy_train_ticket_result": true,
      "reserve_flight_result": true,
      "reserve_hotel_result": false
    }

    下图展示的是预订酒店失败的可视化。从步骤信息中您可以看到,由于ReserveHotel函数调用失败,云工作流按照流程定义,依次取消航班(CancelFlight)、取消火车票(CancelTrainTicket)。云工作流每个步骤转换有持久化的保证,因此网络中断或进程崩溃等异常场景下,不会影响流程的事务性。image