Asynchronous submission
Syntax:
submit job insert overwrite into xxx select ...
This command returns a job_id, for example:
mysql> submit job insert overwrite into test select * from test_external_table;
+---------------------------------------+
| job_id                                 |
+---------------------------------------+
| 2017112122202917203100908203303000715 |
+---------------------------------------+
1 row in set (1.77 sec)
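When submitting from a script rather than the interactive client, the returned job_id can be captured directly into a shell variable. A minimal sketch, assuming the same placeholder endpoint, credentials, and table names as the D2/DataWorks script later in this section:

# Capture the job_id of an asynchronous import (placeholder connection values).
jobid=$(mysql -h"xxx.petadata.rds.aliyuncs.com" -P3303 -ucstore -pcstore testdb -N -s \
    -e "submit job insert overwrite into cstore_test select * from odps_test_external_table")
echo "submitted job_id: ${jobid}"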
Querying job status
Syntax:
show job status where job='xxx'
For example:
mysql> show job status where job='2017112122202917203100908203303000715';
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+-------------------------------------+
| job_id                                 | schema_name | status  | fail_msg | create_time           | update_time           | definition                          |
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+-------------------------------------+
| 2017112122202917203100908203303000715 | test        | RUNNING | NULL     | 2017-11-21 22:20:31.0 | 2017-11-21 22:20:40.0 | insert into test select * from test |
+---------------------------------------+-------------+---------+----------+-----------------------+-----------------------+-------------------------------------+
1 row in set (0.35 sec)
Job states:
INIT -> SUBMITTED -> RUNNING -> FINISH | FAILED
queued -> scheduled and submitted -> executing in the background -> succeeded or failed
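A job stays in the sequence above until it reaches FINISH or FAILED, so monitoring it is just a matter of re-running the status query. For quick interactive monitoring (the scheduled script at the end of this section does the same thing in a loop), a sketch assuming `watch` is installed and the same placeholder connection values:

# Re-run the status query every 30 seconds until the job reaches FINISH or FAILED.
watch -n 30 "mysql -hxxx.petadata.rds.aliyuncs.com -P3303 -ucstore -pcstore testdb \
    -e \"show job status where job='2017112122202917203100908203303000715'\""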
Cancelling a job
Syntax:
cancel job 'xxx'
For example:
mysql> cancel job '2017112122202917203100908203303000715';
Query OK, 1 row affected (0.02 sec)
Notes:
- A job that has not yet been scheduled is removed from the queue.
- A running job is terminated, and any data it has already imported is rolled back.
- A job that has already completed (successfully or not) is also removed.
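After cancelling, the same job_id can be queried again to confirm the job is no longer running. A minimal sketch from the shell, using the same placeholder connection values and job_id as the examples above:

# Cancel a job and re-check its status (placeholder connection values and job_id).
mysql -hxxx.petadata.rds.aliyuncs.com -P3303 -ucstore -pcstore testdb \
    -e "cancel job '2017112122202917203100908203303000715'"
mysql -hxxx.petadata.rds.aliyuncs.com -P3303 -ucstore -pcstore testdb \
    -e "show job status where job='2017112122202917203100908203303000715'"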
How to schedule periodically with D2/DataWorks
Create a Shell task in D2/DataWorks and use that task to run the import on D2/DataWorks's periodic schedule.
The following example is a Shell script that performs the MaxCompute import:
#!/usr/bin/env bash
host="xxx.petadata.rds.aliyuncs.com" ## your instance dns
port="3303" ## your instance vport
user="cstore"
password="cstore"
database="testdb"
source_table="odps_test_external_table"
target_table="cstore_test"
timeout=86400 # time out after 24 hours
sql="submit job insert overwrite into ${target_table} select * from ${source_table}"
echo "exec SQL:$sql"
mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${sql}" | egrep "201[0-9]+" > import_${target_table}_jobid.txt
echo "Returned jobid:" `cat import_${target_table}_jobid.txt`
jobid=`cat import_${target_table}_jobid.txt`
check_finish_sql="show job status where job = '${jobid}'"
echo "Check Finish SQL:$check_finish_sql"
mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${check_finish_sql}" | egrep "${jobid}" | awk '{print $3}' > import_${target_table}_result.txt
result=`cat import_${target_table}_result.txt`
begin_time=$(date "+%s")
echo "begin_time = "$begin_time
# Poll the job status every 30 seconds until it finishes, fails, or times out.
while true
do
    if [[ "$result" == "FINISH" ]]; then
        break
    elif [[ "$result" == "FAILED" ]]; then
        echo "$jobid is failed, so exit"
        exit 1
    else
        echo "$jobid current status is $result"
    fi
    end_time=$(date "+%s")
    cost=$(($end_time - $begin_time))
    if [[ "$cost" -gt "$timeout" ]]
    then
        echo "$jobid has cost $cost seconds >= $timeout, so exit"
        exit 1
    else
        echo "$jobid is running using $cost seconds"
        sleep 30
    fi
    # Refresh the job status before the next iteration.
    mysql -h"${host}" -P"${port}" -u"${user}" -p"${password}" "${database}" -e "${check_finish_sql}" | egrep "${jobid}" | awk '{print $3}' > import_${target_table}_result.txt
    result=`cat import_${target_table}_result.txt`
done
end_time=$(date "+%s")
cost=$(($end_time - $begin_time))
echo "$jobid has finished, using $cost seconds"