This topic describes how to use computation pushdown, a feature that can effectively speed up slow queries against Tablestore.
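Background
In Presto, a query normally pulls data from the underlying data source and performs the computation on the Presto side. When a large volume of data has to be pulled, this severely degrades query efficiency. Computation pushdown moves part of the computation to the data source: the data source computes the result and returns only that result to Presto. In some scenarios, such as aggregation, this greatly reduces the amount of data transferred between the data source and Presto and therefore improves query efficiency.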
Supported operators
Operator | Supported types |
---|---|
FILTER | |
LIMIT | |
AGGREGATE | |
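Limits of computation pushdown
Computation pushdown has the following limits.
- Expressions must be simple. Examples that can be pushed down: a>10, sum(a), avg(a). Examples that cannot be pushed down: a+b>10, abs(a)>10, sum(a+b).
- All columns involved must belong to the same Tablestore index. Otherwise only some of the operators are pushed down, or none at all.
- A LIKE pattern must be at most 20 characters long, counting the % wildcards. For example, the pattern in a like '%123456789123456789%' is exactly 20 characters long.
- GROUP BY currently supports at most 4 columns, for example group by a1,a2,a3,a4. A GROUP BY with more than 4 columns cannot be pushed down.
- GROUP BY pushdown is disabled by default. When GROUP BY is pushed down, Tablestore can return at most 2,000 rows, so the result is incorrect if the aggregation produces more rows than that. For how to turn pushdown on, see the section on enabling computation pushdown below.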
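The limits above can be illustrated with a few query pairs against the lineitem table that is used in the rest of this topic. This is a minimal sketch rather than an exhaustive rule set: the column choices are illustrative assumptions, and the comments only restate the limits listed above. The hints at the start of each statement are described in the section that follows.

```sql
-- Simple comparison on a single column (a>10): eligible for pushdown.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_quantity > 10;

-- Arithmetic in the predicate (a+b>10): not eligible for pushdown.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_quantity + l_tax > 10;

-- Function call in the predicate (abs(a)>10): not eligible for pushdown.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where abs(l_quantity) > 10;

-- Aggregate over a plain column (sum(a)): eligible for pushdown.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select sum(l_quantity) from lineitem;

-- Aggregate over an expression (sum(a+b)): not eligible for pushdown.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select sum(l_quantity + l_tax) from lineitem;

-- LIKE pattern of exactly 20 characters including both % signs: within the limit.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_comment like '%123456789123456789%';

-- LIKE pattern of 21 characters including both % signs: over the limit.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_comment like '%1234567891234567890%';
```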
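Enable computation pushdown
Computation pushdown for Tablestore data sources is disabled by default. To enable it, add the hints ots-index-first=auto and ots-pushdown-enabled=true to the query, as in the following examples: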
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 limit 100;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select * from lineitem where l_orderkey > 100 order by l_commitdate desc limit 10;
/*+ots-index-first=auto,ots-pushdown-enabled=true*/ select count(l_orderkey),sum(l_partkey), avg(l_linenumber) from lineitem;
GROUP BY pushdown
To push down GROUP BY, add the hint ots-groupby-pushdown-enabled=true on top of the hints that enable computation pushdown, as in the following example:
/*+ots-index-first=auto,ots-pushdown-enabled=true,ots-groupby-pushdown-enabled=true*/
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-12-01'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
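Because Tablestore returns at most 2,000 rows for a pushed-down GROUP BY, it can be worth checking how many groups a query produces before adding ots-groupby-pushdown-enabled=true. The following is a hedged sketch of one way to do that; it is an illustrative check, not a documented procedure, and the alias group_count is a name chosen for this example. It uses only the basic pushdown hints, on the assumption that the grouping itself is then computed in Presto and is not subject to the row cap.

```sql
-- Count the distinct (l_returnflag, l_linestatus) groups that the query above would produce.
-- The GROUP BY pushdown hint is deliberately omitted so that the 2,000-row cap cannot affect the count.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/
select
  count(*) as group_count
from (
  select distinct
    l_returnflag,
    l_linestatus
  from
    lineitem
  where
    l_shipdate <= '1998-12-01'
) t;
```

If group_count is comfortably below 2,000, the row cap should not affect the pushed-down result; otherwise it is safer to leave GROUP BY pushdown off for that query.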
Check whether computation is pushed down
You can use EXPLAIN to check whether a SQL statement is pushed down to Tablestore. Example SQL:
explain select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-12-01'
group by
l_returnflag,
l_linestatus;
The following result is returned:
- Output[l_returnflag, l_linestatus, sum_qty, sum_base_price, avg_qty, avg_price, avg_disc, count_order] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
sum_qty := sum
sum_base_price := sum_7
avg_qty := avg
avg_price := avg_8
avg_disc := avg_9
count_order := count
- RemoteStreamingMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- LocalMerge[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- PartialSort[l_returnflag ASC_NULLS_FIRST, l_linestatus ASC_NULLS_FIRST] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
- RemoteStreamingExchange[REPARTITION] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: ?}
- TableScan[TableHandle {connectorId='ots', connectorHandle='table=ots20201208_mzl.lineitem', layout='Optional[table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]]'}] => [l_returnflag:varchar, l_linestatus:varchar, sum:double, sum_7:double, avg:double, avg_8:double, avg_9:double, count:bigint]
Estimates: {rows: ? (?), cpu: ?, memory: 0.00, network: 0.00}
LAYOUT: table=ots20201208_mzl.lineitem, OtsPushDownInfo = Optional[OtsQueryGeneratorContext{filter=L_SHIPDATE <= 1998-12-01, candidateIndexMap=Optional[{lineitem_index=[L_SHIPMODE, L_TAX, L_COMMITDATE, L_EXTENDEDPRICE, L_ORDERKEY, L_LINESTATUS, L_RETURNFLAG, L_RECEIPTDATE, L_PARTKEY, L_LINENUMBER, L_SHIPDATE, L_SHIPINSTRUCT, L_SUPPKEY, L_DISCOUNT, L_COMMENT, L_QUANTITY]}], aggregationLists=Optional[[SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_quantity), fieldName=L_QUANTITY), SumAggregation(aggregationType=AGG_SUM, aggName=sum(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_quantity), fieldName=L_QUANTITY), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_extendedprice), fieldName=L_EXTENDEDPRICE), AvgAggregation(aggregationType=AGG_AVG, aggName=avg(l_discount), fieldName=L_DISCOUNT), CountAggregation(aggregationType=AGG_COUNT, aggName=count(*), fieldName=_ID)]], groupByColumns={l_returnflag=l_returnflag, l_linestatus=l_linestatus}, orderBy={}, limit=OptionalInt.empty}]
avg_8 := OtsColumnHandle{columnName=avg(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
avg := OtsColumnHandle{columnName=avg(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
sum := OtsColumnHandle{columnName=sum(l_quantity), mappedName=L_QUANTITY, primaryKey=false, columnType=double}
avg_9 := OtsColumnHandle{columnName=avg(l_discount), mappedName=L_DISCOUNT, primaryKey=false, columnType=double}
count := OtsColumnHandle{columnName=count(*), mappedName=_ID, primaryKey=false, columnType=bigint}
l_returnflag := OtsColumnHandle{columnName=l_returnflag, mappedName=L_RETURNFLAG, primaryKey=false, columnType=varchar}
sum_7 := OtsColumnHandle{columnName=sum(l_extendedprice), mappedName=L_EXTENDEDPRICE, primaryKey=false, columnType=double}
l_linestatus := OtsColumnHandle{columnName=l_linestatus, mappedName=L_LINESTATUS, primaryKey=false, columnType=varchar}
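The execution plan above no longer contains an Aggregate node, and the OtsQueryGeneratorContext in TableScan carries the pushdown information (filter, aggregationLists, and groupByColumns). Together these show that the query has been pushed down to Tablestore.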
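For contrast, one could run EXPLAIN on a query that the limits exclude from pushdown, such as an aggregate over an expression. The sketch below assumes that the hint comment may precede EXPLAIN in the same way it precedes SELECT, which this topic does not confirm. The difference to look for in the plan is inferred from the limits listed earlier, not taken from captured output.

```sql
-- sum(l_quantity + l_tax) is an aggregate over an expression, which is listed as not supported for pushdown.
-- In the resulting plan, an aggregation node would be expected to remain above TableScan,
-- and the aggregationLists in OtsPushDownInfo would not be expected to include this sum.
/*+ots-index-first=auto,ots-pushdown-enabled=true*/
explain select
  sum(l_quantity + l_tax) as total
from
  lineitem;
```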