在一些复杂的业务场景中,通常需要访问多个远程服务,并且对操作事务性语义有较高要求。小流量应用可通过关系型数据库的ACID特性满足,但在高流量场景下,为实现高可用性和可扩展性,通常采用微服务架构。这需要引入队列和数据库来保证事务一致性,增加了开发与运维成本。您可以使用云工作流的长流程分布式事务解决以上问题。本文将介绍如何使用云工作流提供长流程分布式事务保证,帮助您聚焦于自身业务逻辑。
场景示例
假设某应用为其用户提供预订火车票、航班和酒店的功能,要求三个步骤保证事务性。该功能需要三个远程调用实现(例如预订火车票需要调用12306接口),如果三个调用都成功则该订单成功。然而实际上任何一个远程调用都有可能会失败,因此该应用需要对不同的失败场景做出相应的补偿逻辑,回退已完成操作。如下图所示:
如果预订火车票(BuyTrainTicket)成功,而预订航班(ReserveFlight)失败,则需要取消已经购买的火车票(CancelTrainTicket),并告知您订单失败。
如果预订火车票(BuyTrainTicket)和预订航班(ReserveFlight)均成功,但是预订酒店(ReserveHotel)失败,则需要取消已经预订的航班(CancelFlight)和火车票(CancelTrainTicket),并告知您订单失败。
操作步骤
步骤1:创建FC函数
本步骤是模拟场景示例中提到的三个操作,即预订火车票、预订航班和预订酒店。首先创建一个名为Operation
的函数,要求使用Python 3.10及以上版本。详细步骤,请参见快速创建函数。
Operation
函数模拟各操作(例如预订航班、预订酒店)的实现,根据输入决定该操作执行结果(成功或失败)。示例代码如下:
import json
import logging
import uuid
def handler(event, context):
evt = json.loads(event)
logger = logging.getLogger()
id = uuid.uuid4()
op = "operation"
if 'operation' in evt:
op = evt['operation']
if op in evt:
result = evt[op]
if result == False:
logger.info("%s failed" % op)
exit()
logger.info("%s succeeded, id %s" % (op, id))
return '{"%s":"success", "%s_txnID": "%s"}' % (op, op, id)
步骤2:创建工作流
本步骤创建了一个针对旅行服务(包括火车票、航班和酒店)的预订工作流,并包含错误处理机制,以确保在遇到问题时能够执行适当的回滚操作。
在云工作流控制台创建工作流,参数默认即可。
在CloudFlow Studio页面,请根据下面步骤拖拽并配置对应的状态节点。
BuyTrainTicket步骤:调用步骤1的
Operation
函数购买火车票。配置错误处理,若遇到未知错误(FC.Unknown
),则跳转到OrderFailed
状态,表示整个订单过程失败,并结束流程。OrderFailed步骤:当
BuyTrainTicket
因未知错误而无法继续时,流程将终止于此,标记为失败。ReserveFlight步骤:调用
Operation
函数预订航班。配置错误处理,若遇到未知错误,则跳转至CancelTrainTicket
状态,试图取消之前购买的火车票。CancelTrainTicket步骤:当预定航班失败时,调用此函数取消已经购买的火车票。
ReserveHotel步骤:预订酒店。配置错误处理,若遇到未知错误,设定了最多三次的指数退避重试策略。如果出现未知错误且所有重试均失败,则跳转至
CancelFlight
状态,尝试取消已订的航班。说明预定航班、预定酒店等远程调用可能会因网络或服务错误等原因导致调用失败。通过增加对瞬时错误的重试机制,可以显著提高订单流程的成功率。
CancelFlight步骤:取消之前的航班预订。完成取消操作后,回到
CancelTrainTicket
再次确认火车票是否已被取消。OrderSucceed步骤:表示整个预订流程顺利完成。
下面是上述逻辑对应的YAML示例代码:
Type: StateMachine Name: MyWorkFlow SpecVersion: v1 StartAt: BuyTrainTicket States: - Type: Task Name: BuyTrainTicket Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync body: buy_train_ticket.$: $Context.Execution.Input.buy_train_ticket_result operation: buy_train_ticket resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST Catch: - Errors: - FC.Unknown Description: 购票失败 Next: OrderFailed Next: ReserveFlight - Type: Fail Name: OrderFailed Code: 购票失败 Detail: 购票失败 End: true - Type: Task Name: ReserveFlight Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync body: reserve_flight.$: $Context.Execution.Input.reserve_flight_result operation: reserve_flight resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST Catch: # 捕获ReserveFlight task抛出的FC.Unknown错误,跳转到CancelTrainTicket。 - Errors: - FC.Unknown Description: 订单失败处理 Next: CancelTrainTicket Next: ReserveHotel - Type: Task Name: CancelTrainTicket Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync body: reserve_flight_txnID.$: $Input.reserve_flight_txnID operation: cancel_train_ticket resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST Next: OrderFailed - Type: Task Name: ReserveHotel Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync body: reserve_hotel.$: $Context.Execution.Input.reserve_hotel_result operation: reserve_hotel resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST Catch: # 捕获ReserveHotel task抛出的FC.Unknown错误,跳转到CancelFlight。 - Errors: - FC.Unknown Description: 取消飞机 Next: CancelFlight OutputConstructor: reserve_flight_txnID.$: $Input.reserve_flight_txnID Next: OrderSucceed Retry: # 对FC.Unknown类型的错误最多指数退避重试3次,初始间隔1s,后续间隔=上次间隔*2。 - Errors: - FC.Unknown Description: '重试策略 #1' MaxAttempts: 3 IntervalSeconds: 1 BackoffRate: 2 - Type: Task Name: CancelFlight Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync body: reserve_flight_txnID.$: $Input.reserve_flight_txnID operation: cancel_flight resourceArn: acs:fc:{region}:{accountID}:functions/Operation/LATEST Next: CancelTrainTicket - Type: Succeed Name: OrderSucceed End: true
步骤3:执行并查看结果
创建好工作流之后,选择工作流配置页签,设置流程角色信息。
依次单击保存和执行,并提供以下JSON对象作为输入,您可以调整
result
的值观察不同的执行结果。{ "buy_train_ticket_result": true, "reserve_flight_result": true, "reserve_hotel_result": false }
下图展示的是预订酒店失败的可视化。从步骤信息中您可以看到,由于
ReserveHotel
函数调用失败,云工作流按照流程定义,依次取消航班(CancelFlight)、取消火车票(CancelTrainTicket)。云工作流每个步骤转换有持久化的保证,因此网络中断或进程崩溃等异常场景下,不会影响流程的事务性。