This topic demonstrates how to use the Python SDK to submit a job that computes π, check the job's status and log, kill the job when it times out, and query the job history of the virtual cluster.
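Before running the example, the Alibaba Cloud Python SDK must be available in your environment. The import paths used below (`aliyunsdkcore`, `aliyunsdkopenanalytics_open`) are typically provided by the pip packages `aliyun-python-sdk-core` and `aliyun-python-sdk-openanalytics-open` (exact package names may vary by SDK version), and you need an AccessKey pair with permission to access the target virtual cluster.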
"""
演示如何使用Python SDK操作数据湖分析的spark作业
author aliyun
"""
from aliyunsdkcore.client import AcsClient
from aliyunsdkopenanalytics_open.request.v20180619 import SubmitSparkJobRequest, GetJobStatusRequest, GetJobLogRequest, \
KillSparkJobRequest, ListSparkJobRequest, SubmitSparkSQLRequest
import json
import time


def submit_spark_sql(client: AcsClient, cluster_name, sql):
    """
    Submit a Spark SQL job and return its job id.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the job
    :param sql: the SQL content to execute
    :return: the job id of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkSQLRequest.SubmitSparkSQLRequest()
    request.set_VcName(cluster_name)
    request.set_Sql(sql)
    # Submit the job and read the response
    response = client.do_action_with_exception(request)
    # Return the job id
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def submit_spark_job(client: AcsClient, cluster_name, job_config):
    """
    Submit a Spark job and return its job id.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the job
    :param job_config: JSON string that defines the Spark job
    :return: the job id of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = SubmitSparkJobRequest.SubmitSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_ConfigJson(job_config)
    # Submit the job and read the response
    response = client.do_action_with_exception(request)
    # Return the job id
    r = json.loads(str(response, encoding='utf-8'))
    return r['JobId']


def get_job_status(client: AcsClient, cluster_name, job_id):
    """
    Query the execution status of a Spark job.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the job
    :param job_id: id of the Spark job
    :return: the status of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = GetJobStatusRequest.GetJobStatusRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Fetch the job status
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Status']


def get_job_log(client: AcsClient, cluster_name, job_id):
    """
    Fetch the log of a Spark job.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the job
    :param job_id: id of the Spark job
    :return: the log of the Spark job
    :rtype: str
    :exception ClientException
    """
    # Build the request
    request = GetJobLogRequest.GetJobLogRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Fetch the log content
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    return r['Data']


def kill_job(client: AcsClient, cluster_name, job_id):
    """
    Kill a running Spark job.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the job
    :param job_id: id of the Spark job
    :return: None
    :exception ClientException
    """
    # Build the request
    request = KillSparkJobRequest.KillSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_JobId(job_id)
    # Kill the running job
    client.do_action_with_exception(request)


def list_job(client: AcsClient, cluster_name: str, page_number: int,
             page_size: int):
    """
    Print the details of historical jobs in the Spark virtual cluster.

    :param client: Alibaba Cloud client
    :param cluster_name: name of the Spark virtual cluster that runs the jobs
    :param page_number: page number of the job history, starting from 1
    :param page_size: number of jobs returned per page
    :return: None
    :exception ClientException
    """
    # Build the request
    request = ListSparkJobRequest.ListSparkJobRequest()
    request.set_VcName(cluster_name)
    request.set_PageNumber(page_number)
    request.set_PageSize(page_size)
    # Print the job details
    response = client.do_action_with_exception(request)
    r = json.loads(str(response, encoding='utf-8'))
    for job_item in r["DataResult"]["JobList"]:
        job_detail = 'JobId:{}, JobStatus:{}, ' \
                     'JobUI:{}, SubmitTime:{} \n'.format(job_item['JobId'], job_item['Status'],
                                                         job_item['SparkUI'],
                                                         job_item['SubmitTime'])
        print(job_detail)


if __name__ == '__main__':
    # Read the AccessKey ID and AccessKey Secret from the command-line arguments
    access_key_id = sys.argv[1]
    access_key_secret = sys.argv[2]
    # Region ID and virtual cluster name
    region = "cn-hangzhou"
    cluster_name = "SparkCluster"
    # Configuration of a job that computes π
    job_config = '''
    {
        "name": "SparkPi",
        "file": "local:///tmp/spark-examples.jar",
        "className": "org.apache.spark.examples.SparkPi",
        "args": [
            "1000000"
        ],
        "conf": {
            "spark.driver.resourceSpec": "small",
            "spark.executor.instances": 1,
            "spark.executor.resourceSpec": "small",
            "spark.dla.job.log.oss.uri": "oss://test/spark-logs"
        }
    }
    '''
    # A SQL statement that shows the databases
    sql = '''
    -- here is the spark conf
    set spark.driver.resourceSpec=medium;
    set spark.executor.instances=5;
    set spark.executor.resourceSpec=medium;
    set spark.app.name=sparksqltest;
    set spark.sql.hive.metastore.version=dla;
    -- here is your sql statement
    -- add your jar
    -- add jar oss://path/to/your/jar
    show databases;
    '''
    # Create the client
    client = AcsClient(ak=access_key_id, secret=access_key_secret, region_id=region)
    # To submit the SQL job instead, uncomment the next line
    # job_id: str = submit_spark_sql(client, cluster_name, sql)
    # Submit the job
    job_id: str = submit_spark_job(client, cluster_name, job_config)
    # Poll the job status and kill the job if it times out
    submit_time = time.time()
    while True:
        job_status: str = get_job_status(client, cluster_name, job_id)
        # The job has finished
        if job_status in ["error", "success", "dead", "killed"]:
            break
        # Kill the job if it exceeds the timeout
        elif int(time.time() - submit_time) > 100:
            kill_job(client, cluster_name, job_id)
            break
        # Wait before the next poll
        else:
            time.sleep(5)
    # Print the job log
    print("log detail: {} \n".format(get_job_log(client, cluster_name, job_id)))
    # List the 10 most recent jobs
    list_job(client, cluster_name, 1, 10)
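To run the demo, save the code above as a script (for example `spark_demo.py`, a file name chosen here only for illustration) and pass the AccessKey ID and AccessKey Secret as the two command-line arguments, e.g. `python3 spark_demo.py <access_key_id> <access_key_secret>`. Replace the region, cluster name, JAR path, and OSS log URI in the example with values from your own environment before submitting the job.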