任务状态轮询

在长时间任务的场景中如果任务结束后没有回调机制,开发者通常会采用轮询的方式来判断任务的结束。可靠的轮询实现需要维护状态的持久化以保证即使当前轮询进程失败退出,进程恢复后轮询也会继续进行。本文将为您介绍任务状态轮询和云工作流实现的具体步骤。

场景示例

当用户调用函数计算提交一个执行时间跨度较大(1分钟至数小时)的多媒体处理任务时,需要通过持续轮询API来获取任务状态。

操作步骤

步骤1:创建FC函数

本步骤是模拟场景示例中所需使用的函数。首先创建两个Python 3.10及以上版本的任务函数。详细步骤,请参见创建任务函数

  • 创建一个名为StartJob的函数:模拟通过调用API开始一个长时间的任务,返回一个任务ID。

    import logging
    import uuid
    
    def handler(event, context):
      logger = logging.getLogger()
      id = uuid.uuid4()
      logger.info('Started job with ID %s' % id)
      return {"job_id": str(id)}           
  • 创建一个名为GetJobStatus函数:模拟通过调用API获取指定任务的执行结果,比较当前的时间和函数第一次执行的时间的差值和输入中delay的值,返回不同的状态:“success”或“running”。

    import logging
    import time
    import json
    
    start_time = int(time.time())
    
    def handler(event, context):
      evt = json.loads(event)
      logger = logging.getLogger()
      job_id = evt["job_id"]
      logger.info('Started job with ID %s' % job_id)
    
      now = int(time.time())
      status = "running"
    
      delay = 60
      if "delay" in evt:
        delay = evt["delay"]
    
      if now - start_time > delay:
        status = "success"
      else:
        status = "running"
    
      try_count = 0
      if "try_count" in evt:
        try_count = evt["try_count"]
    
      try_count = try_count + 1
      logger.info('Job %s, status %s, try_count %d' % (job_id, status, try_count))
      return {"job_id": job_id, "job_status":status, "try_count":try_count}         

步骤2:创建工作流

本步骤是创建一个工作流,用于启动一个任务、轮询该任务的状态,并根据任务的最终状态决定如何结束流程。

  1. 云工作流控制台创建工作流,参数默认即可。

  2. CloudFlow Studio页面,请根据下面步骤拖拽并配置对应的状态节点。

    1. StartJob步骤:调用步骤1StartJob函数开始一个任务。

    2. Wait步骤:设置等待时间10秒。

    3. GetJobStatus步骤:调用步骤1GetJobStatus函数获取任务的执行结果。

    4. CheckJobComplete步骤:根据GetJobStatus函数返回的结果选择下一个步骤:

      • 如果返回"success",则整个流程执行成功。

      • 如果轮询尝试次数大于3次,认为任务执行失败,流程执行失败。

      • 如果返回"running"则跳回到Wait步骤,继续执行。

    下面是上述逻辑对应的YAML示例代码:

    Type: StateMachine
    Name: MyWorkFlow
    SpecVersion: v1
    StartAt: StartJob
    States:
      - Type: Task
        Name: StartJob
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          resourceArn: acs:fc:{region}:{accountID}:functions/StartJob/LATEST
        Next: Pass
      - Type: Pass
        Name: Pass
        Next: Wait
        OutputConstructor:
          $: $Input
        InputConstructor:
          try_count: 0
          job_id.$: $Input.Body.job_id
      - Type: Wait
        Name: Wait
        Seconds: 10
        Next: GetJobStatus
      - Type: Task
        Name: GetJobStatus
        Action: FC:InvokeFunction
        TaskMode: RequestComplete
        Parameters:
          invocationType: Sync
          resourceArn: acs:fc:{region}:{accountID}:functions/GetJobStatus/LATEST
          body:
            job_id.$: $Input.job_id
            try_count.$: $Input.try_count
            delay.$: $Context.Execution.Input.delay
        Next: CheckJobComplete
        OutputConstructor:
          $: jsonMerge($Input, $Output.Body)
      - Type: Choice
        Name: CheckJobComplete
        Branches:
          - Condition: $Input.job_status== "success"
            Next: JobSucceeded
          - Condition: $Input.try_count > 3
            Next: FailJobFailed
          - Condition: $Input.job_status== "running"
            Next: Wait
        Default: FailJobFailed
      - Type: Fail
        Name: FailJobFailed
        Code: 超时
        End: true
      - Type: Succeed
        Name: JobSucceeded
        End: true
    

步骤3:开始执行并查看结果

  1. 创建好工作流之后,选择工作流配置页签,设置流程角色信息。

  2. 依次单击保存执行,并提供以下JSON对象作为输入,其中delay字段的值模拟任务完成需要的时间,这里预期任务在开始30秒后,GetJobStatus函数返回“success”,在此之前均返回“running”,您可以调整delay的值观察不同的执行结果。

    {
      "delay": 30
    }
    • 下图展示的是轮询从开始到结束的流程执行可视化。image

    • 下图展示的是任务需要30秒完成,可以看到流程执行历史中GetJobStatus返回“running”因此CheckJobComplete的后续步骤回到Wait进行等待和下一次查询。image