This topic walks through a complete example program that submits a Pi-calculation job to Data Lake Analytics (DLA) Serverless Spark, tracks its status, and queries the results of historical runs.
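The example assumes a DLA virtual cluster named MyCluster in the cn-hangzhou region, an AccessKey pair exposed through the RAM_AK and RAM_SK environment variables, and that the Alibaba Cloud Java SDK core library and the DLA (openanalytics) SDK providing the com.aliyuncs.openanalytics_open classes imported below are on the classpath.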

import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.IAcsClient;
import com.aliyuncs.exceptions.ClientException;
import com.aliyuncs.openanalytics_open.model.v20180619.*;
import com.aliyuncs.profile.DefaultProfile;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * Demonstrates how to use the Java SDK to operate Spark jobs on Data Lake Analytics.
 *
 * @author aliyun
 */
public class Demo {

    /**
     * Submits a SQL job to Data Lake Analytics Serverless Spark.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param sql                the SQL content
     * @return the Spark job ID, used for subsequent status tracking
     * @throws ClientException if the submission fails, for example due to network errors
     */
    public static String submitSparkSQL(IAcsClient client,
                                        String virtualClusterName,
                                        String sql) throws ClientException {
        // Initialize the request with the cluster name and the job content
        SubmitSparkSQLRequest request = new SubmitSparkSQLRequest();
        request.setVcName(virtualClusterName);
        request.setSql(sql);
        // Submit the job and return the Spark job ID
        SubmitSparkSQLResponse response = client.getAcsResponse(request);
        return response.getJobId();
    }

    /**
     * Submits a job to Data Lake Analytics Serverless Spark.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param jobConfig          the job description, which must be a valid JSON string
     * @return the Spark job ID, used for subsequent status tracking
     * @throws ClientException if the submission fails, for example due to network errors
     */
    public static String submitSparkJob(IAcsClient client,
                                        String virtualClusterName,
                                        String jobConfig) throws ClientException {
        // Initialize the request with the cluster name and the job content
        SubmitSparkJobRequest request = new SubmitSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setConfigJson(jobConfig);
        // Submit the job and return the Spark job ID
        SubmitSparkJobResponse response = client.getAcsResponse(request);
        return response.getJobId();
    }

    /**
     * Returns the current status of a Spark job.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param sparkJobId         the ID of the Spark job
     * @return the status of the Spark job as a String
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String getSparkJobStatus(IAcsClient client,
                                           String virtualClusterName,
                                           String sparkJobId) throws ClientException {
        // Initialize the request with the cluster name and the Spark job ID
        GetJobStatusRequest request = new GetJobStatusRequest();
        request.setJobId(sparkJobId);
        request.setVcName(virtualClusterName);
        // Send the request and return the job status
        GetJobStatusResponse response = client.getAcsResponse(request);
        return response.getStatus();
    }

    /**
     * Kills a running Spark job.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param sparkJobId         the ID of the Spark job
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static void killSparkJob(IAcsClient client,
                                    String virtualClusterName,
                                    String sparkJobId) throws ClientException {
        // Initialize the request with the cluster name and the Spark job ID
        KillSparkJobRequest request = new KillSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setJobId(sparkJobId);
        // Send the kill request; the response carries no data we need here
        client.getAcsResponse(request);
    }

    /**
     * Returns the log of a Spark job.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param sparkJobId         the ID of the Spark job
     * @return the log content of the Spark job as a String
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static String getSparkJobLog(IAcsClient client,
                                        String virtualClusterName,
                                        String sparkJobId) throws ClientException {

        // Initialize the request with the cluster name and the Spark job ID
        GetJobLogRequest request = new GetJobLogRequest();
        request.setJobId(sparkJobId);
        request.setVcName(virtualClusterName);
        // Send the request and return the job log
        GetJobLogResponse response = client.getAcsResponse(request);
        return response.getData();
    }

    /**
     * Lists the Spark jobs submitted to a virtual cluster; paging through the
     * results visits all historical jobs.
     *
     * @param client             the Alibaba Cloud API client
     * @param virtualClusterName the name of the DLA virtual cluster
     * @param pageNumber         the page to query, starting from 1
     * @param pageSize           the number of entries per page
     * @throws ClientException if the request fails, for example due to network errors
     */
    public static void listSparkJob(IAcsClient client,
                                    String virtualClusterName,
                                    int pageNumber,
                                    int pageSize) throws ClientException {
        // Initialize the request with the cluster name and the paging parameters
        ListSparkJobRequest request = new ListSparkJobRequest();
        request.setVcName(virtualClusterName);
        request.setPageNumber(pageNumber); // pageNumber starts from 1
        request.setPageSize(pageSize);
        // Send the request to list the jobs
        ListSparkJobResponse response = client.getAcsResponse(request);
        // Extract the job list from the response
        List<ListSparkJobResponse.DataResult.Data> sparkJobList = response.getDataResult().getJobList();
        for (ListSparkJobResponse.DataResult.Data job : sparkJobList) {
            System.out.println(String.format("JobName: %s, JobUI: %s, JobStatus: %s, JobConfig: %s",
                                             job.getJobName(),
                                             job.getStatus(),
                                             job.getSparkUI(),
                                             job.getDetail()));
        }
    }

    public static void main(String[] args) throws IOException, ClientException, InterruptedException {
        // Parameters required to submit a job
        String region = "cn-hangzhou";
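        // The region must match the region where the DLA virtual cluster lives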
        // Obtain a valid AccessKey ID and Secret, here from environment variables
        String accessKeyId = System.getenv("RAM_AK");
        String accessKeySecret = System.getenv("RAM_SK");
        String virtualClusterName = "MyCluster";

        // Must be a valid JSON string describing the job
        String jobConfig =
                "{\n" +
                "    \"name\": \"SparkPi\",\n" +
                "    \"file\": \"local:///tmp/spark-examples.jar\",\n" +
                "    \"className\": \"org.apache.spark.examples.SparkPi\",\n" +
                "    \"args\": [\n" +
                "        \"100\"\n" +
                "    ],\n" +
                "    \"conf\": {\n" +
                "        \"spark.driver.resourceSpec\": \"medium\",\n" +
                "        \"spark.executor.instances\": 5,\n" +
                "        \"spark.executor.resourceSpec\": \"medium\"\n" +
                "    }\n" +
                "}";

        String sql = "-- here is the spark conf\n"
                     + "set spark.driver.resourceSpec=medium;\n"
                     + "set spark.executor.instances=5;\n"
                     + "set spark.executor.resourceSpec=medium;\n"
                     + "set spark.app.name=sparksqltest;\n"
                     + "set spark.sql.hive.metastore.version=dla;\n"
                     + "-- here is your sql statement\n"
                     + "-- add your jar\n"
                     + "-- add jar oss://path/to/your/jar\n"
                     + "show databases;";

        // Initialize the Alibaba Cloud SDK client
        DefaultProfile profile = DefaultProfile.getProfile(region, accessKeyId, accessKeySecret);
        IAcsClient client = new DefaultAcsClient(profile);
        // Submit the job
        // Alternatively, you can submit a SQL job instead:
        // String sparkJobId = submitSparkSQL(client, virtualClusterName, sql);
        String sparkJobId = submitSparkJob(client, virtualClusterName, jobConfig);
        // Poll the job status; kill the job if it does not finish before the timeout
        long startTime = System.currentTimeMillis();
        List<String> finalStatusList = Arrays.asList("error", "success", "dead", "killed");
        while (true) {
            String status = getSparkJobStatus(client, virtualClusterName, sparkJobId);
            if (finalStatusList.contains(status)) {
                System.out.println("Job went to final status");
                break;
            } else if ((System.currentTimeMillis() - startTime) > 100000) {
                // The job exceeded the timeout (100 seconds here), so kill it
                System.out.println("Job timed out, killing it");
                killSparkJob(client, virtualClusterName, sparkJobId);
                break;
            }
            // Print the status, wait 5 seconds, then poll again
            System.out.println(String.format("Job %s status is %s", sparkJobId, status));
            Thread.sleep(5000);
        }
        // Print the job log
        String logDetail = getSparkJobLog(client, virtualClusterName, sparkJobId);
        System.out.println(logDetail);
        // Print the details of the 10 most recent jobs
        listSparkJob(client, virtualClusterName, 1, 10);
    }
}
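
If the polling pattern in main() is needed elsewhere, it can be factored into a reusable helper. The sketch below is not part of the SDK: the method name waitForJob and the configurable timeout are assumptions, but the logic mirrors the loop above, polling until a terminal status, killing the job on timeout, and returning the last observed status. It would be added as another static method of Demo:

    /**
     * Polls until the job reaches a terminal status or the timeout elapses;
     * on timeout the job is killed. Returns the last observed status.
     */
    public static String waitForJob(IAcsClient client,
                                    String virtualClusterName,
                                    String sparkJobId,
                                    long timeoutMillis) throws ClientException, InterruptedException {
        List<String> finalStatusList = Arrays.asList("error", "success", "dead", "killed");
        long startTime = System.currentTimeMillis();
        while (true) {
            String status = getSparkJobStatus(client, virtualClusterName, sparkJobId);
            if (finalStatusList.contains(status)) {
                return status;
            }
            if (System.currentTimeMillis() - startTime > timeoutMillis) {
                // Timed out: kill the job and return the status we last saw
                killSparkJob(client, virtualClusterName, sparkJobId);
                return status;
            }
            // Wait 5 seconds between polls, as in main()
            Thread.sleep(5000);
        }
    }

With this helper, the polling loop in main() collapses to a single call such as waitForJob(client, virtualClusterName, sparkJobId, 100000).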