This article demonstrates how to use the Python SDK to submit a job that computes π, check the job's status and logs, kill the job when it times out, and list the virtual cluster's job history.
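
Before running the sample, make sure the Alibaba Cloud Python SDK packages that the script imports are installed. Judging from the import paths below, a likely install command is the following (the exact package names are an assumption and should be verified against the SDK documentation):

pip install aliyun-python-sdk-core aliyun-python-sdk-openanalytics-open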

"""
Demonstrates how to use the Python SDK to work with Spark jobs on Data Lake Analytics (DLA)

author aliyun
"""
from aliyunsdkcore.client import AcsClient
from aliyunsdkopenanalytics_open.request.v20180619 import SubmitSparkJobRequest, GetJobStatusRequest, GetJobLogRequest, \
    KillSparkJobRequest, ListSparkJobRequest, SubmitSparkSQLRequest
import json
import sys
import time


def submit_spark_sql(client: AcsClient, cluster_name, sql):
    """
    Submit a Spark SQL job and return its job_id.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the job
    :param sql:                the SQL text to execute
    :return:                   the job id of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = SubmitSparkSQLRequest.SubmitSparkSQLRequest()
    request.set_VcName(cluster_name)
    request.set_Sql(sql)

    # Submit the job and get the response
    response = client.do_action_with_exception(request)

    # Return the job id
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def submit_spark_job(client: AcsClient, cluster_name, job_config):
    """
    Submit a Spark job and return its job_id.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the job
    :param job_config:         JSON string that defines the Spark job
    :return:                   the job id of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = SubmitSparkJobRequest.SubmitSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_ConfigJson(job_config)

    # Submit the job and get the response
    response = client.do_action_with_exception(request)

    # Return the job id
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def get_job_status(client: AcsClient, cluster_name, job_id):
    """
    Query the execution status of a Spark job.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the job
    :param job_id:             id of the Spark job
    :return:                   the status of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = GetJobStatusRequest.GetJobStatusRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Fetch the job status
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Status']


def get_job_log(client: AcsClient, cluster_name, job_id):
    """
    Retrieve the log of a Spark job.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the job
    :param job_id:             id of the Spark job
    :return:                   the log of the Spark job
    :rtype:                    str
    :exception                 ClientException
    """

    # Initialize the request
    request = GetJobLogRequest.GetJobLogRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Fetch the job log
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Data']


def kill_job(client: AcsClient, cluster_name, job_id):
    """
    Kill a running Spark job.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the job
    :param job_id:             id of the Spark job
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request
    request = KillSparkJobRequest.KillSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)

    # Kill the running job
    client.do_action_with_exception(request)


def list_job(client: AcsClient, cluster_name: str, page_number: int,
             page_size: int):
    """
    Print details of the historical jobs in the Spark virtual cluster.

    :param client:             the Alibaba Cloud client
    :param cluster_name:       name of the Spark virtual cluster that runs the jobs
    :param page_number:        page number of the job history, starting from 1
    :param page_size:          number of jobs returned per page
    :return:                   None
    :exception                 ClientException
    """

    # Initialize the request
    request = ListSparkJobRequest.ListSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_PageNumber(page_number)
    request.set_PageSize(page_size)

    # Print details of each job
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    for job_item in r["DataResult"]["JobList"]:
        job_detail = 'JobId:{}, JobStatus:{}, ' \
                     'JobUI:{}, SubmitTime:{} \n'.format(job_item['JobId'], job_item['Status'],
                                                         job_item['SparkUI'],
                                                         job_item['SubmitTime'])
        print(job_detail)


if __name__ == '__main__':
    # The AccessKey ID and AccessKey Secret are passed in as command-line arguments
    access_key_id = sys.argv[1]
    access_key_secret = sys.argv[2]
    # Region id and virtual cluster name
    region = "cn-hangzhou"
    cluster_name = "SparkCluster"

    # Configuration for a job that computes π
    job_config = '''
     {
         "name": "SparkPi",
         "file": "local:///tmp/spark-examples.jar",
         "className": "org.apache.spark.examples.SparkPi",
         "args": [
            "1000000"
         ],
         "conf": {
             "spark.driver.resourceSpec": "small",
             "spark.executor.instances": 1,
             "spark.executor.resourceSpec": "small"
             "spark.dla.job.log.oss.uri": "oss://test/spark-logs"
         }
    }
    '''

    # A SQL script that runs "show databases"
    sql = '''
    -- here is the spark conf
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=5;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=sparksqltest;
    set spark.sql.hive.metastore.version=dla;
    -- here is your sql statement
    -- add your jar
    -- add jar oss://path/to/your/jar
    show databases;
    '''

    # Create the client
    client = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region)

    # To submit the SQL job instead, uncomment the following line
    #job_id: str = submit_spark_sql(client, cluster_name, sql)

    # Submit the job defined by job_config
    job_id: str = submit_spark_job(client, cluster_name, job_config)

    # Poll the job status and kill the job if it times out
    submit_time = time.time()
    while True:
        job_status: str = get_job_status(client, cluster_name, job_id)
        # The job has finished
        if job_status in ["error", "success", "dead", "killed"]:
            break
        # Kill the job if it has been running for more than 100 seconds
        elif int(time.time() - submit_time) > 100:
            kill_job(client, cluster_name, job_id)
            break
        # Wait before the next poll
        else:
            time.sleep(5)

    # Print the job log
    print("log detail: {} \n".format(get_job_log(client, cluster_name, job_id)))

    # List the 10 most recent jobs
    list_job(client, cluster_name, 1, 10)
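
To run the complete sample, pass the AccessKey ID and AccessKey Secret as the first and second command-line arguments, for example (the script file name here is only an illustration):

python submit_spark_pi.py <your-access-key-id> <your-access-key-secret>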