在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本文将为您介绍任务状态轮询和云工作流实现的具体步骤。
场景示例
当用户调用函数计算提交一个执行时间跨度较大(1分钟至数小时)的多媒体处理任务时,需要通过持续轮询API来获取任务状态。
操作步骤
步骤1:创建FC函数
本步骤是模拟场景示例中所需使用的函数。首先创建两个Python 3.10及以上版本的任务函数。详细步骤,请参见创建任务函数。
创建一个名为StartJob的函数:模拟通过调用API开始一个长时间的任务,返回一个任务ID。
import logging import uuid def handler(event, context): logger = logging.getLogger() id = uuid.uuid4() logger.info('Started job with ID %s' % id) return {"job_id": str(id)}
创建一个名为GetJobStatus函数:模拟通过调用API获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中
delay
的值,返回不同的状态:“success”或“running”。import logging import time import json start_time = int(time.time()) def handler(event, context): evt = json.loads(event) logger = logging.getLogger() job_id = evt["job_id"] logger.info('Started job with ID %s' % job_id) now = int(time.time()) status = "running" delay = 60 if "delay" in evt: delay = evt["delay"] if now - start_time > delay: status = "success" else: status = "running" try_count = 0 if "try_count" in evt: try_count = evt["try_count"] try_count = try_count + 1 logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count)) return {"job_id": job_id, "job_status":status, "try_count":try_count}
步骤2:创建工作流
本步骤是创建一个工作流,用于启动一个任务、轮询该任务的状态,并根据任务的最终状态决定如何结束流程。
在云工作流控制台创建工作流,参数默认即可。
在CloudFlow Studio页面,请根据下面步骤拖拽并配置对应的状态节点。
StartJob步骤:调用步骤1的
StartJob
函数开始一个任务。Wait步骤:设置等待时间10秒。
GetJobStatus步骤:调用步骤1的
GetJobStatus
函数获取任务的执行结果。CheckJobComplete步骤:根据
GetJobStatus
函数返回的结果选择下一个步骤:如果返回"success",则整个流程执行成功。
如果轮询尝试次数大于3次,认为任务执行失败,流程执行失败。
如果返回"running"则跳回到
Wait
步骤,继续执行。
下面是上述逻辑对应的YAML示例代码:
Type: StateMachine Name: MyWorkFlow SpecVersion: v1 StartAt: StartJob States: - Type: Task Name: StartJob Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync resourceArn: acs:fc:{region}:{accountID}:functions/StartJob/LATEST Next: Pass - Type: Pass Name: Pass Next: Wait OutputConstructor: $: $Input InputConstructor: try_count: 0 job_id.$: $Input.Body.job_id - Type: Wait Name: Wait Seconds: 10 Next: GetJobStatus - Type: Task Name: GetJobStatus Action: FC:InvokeFunction TaskMode: RequestComplete Parameters: invocationType: Sync resourceArn: acs:fc:{region}:{accountID}:functions/GetJobStatus/LATEST body: job_id.$: $Input.job_id try_count.$: $Input.try_count delay.$: $Context.Execution.Input.delay Next: CheckJobComplete OutputConstructor: $: jsonMerge($Input, $Output.Body) - Type: Choice Name: CheckJobComplete Branches: - Condition: $Input.job_status== "success" Next: JobSucceeded - Condition: $Input.try_count > 3 Next: FailJobFailed - Condition: $Input.job_status== "running" Next: Wait Default: FailJobFailed - Type: Fail Name: FailJobFailed Code: 超时 End: true - Type: Succeed Name: JobSucceeded End: true
步骤3:开始执行并查看结果
创建好工作流之后,选择工作流配置页签,设置流程角色信息。
依次单击保存和执行,并提供以下JSON对象作为输入,其中
delay
字段的值模拟任务完成需要的时间,这里预期任务在开始30秒后,GetJobStatus
函数返回“success”,在此之前均返回“running”,您可以调整delay
的值观察不同的执行结果。{ "delay": 30 }
下图展示的是轮询从开始到结束的流程执行可视化。
下图展示的是任务需要30秒完成,可以看到流程执行历史中
GetJobStatus
返回“running”因此CheckJobComplete
的后续步骤回到Wait
进行等待和下一次查询。