Runtime Filter

本文介绍云数据库 SelectDB 版提供的Runtime Filter的使用方式和注意事项,作为您进行Join优化的参考。

概述

Runtime Filter为某些Join查询在运行时动态生成过滤条件,来减少数据的扫描计算,避免不必要的I/O和网络传输,从而加速查询。它的设计、实现和效果详情请参见ISSUE 6116

名词解释

  • 左表:Join查询时左边的表,进行Probe操作。可被Join Reorder调整顺序。

  • 右表:Join查询时右边的表,进行Build操作。可被Join Reorder调整顺序。

  • Fragment:FE会将具体的SQL语句转化为对应的执行片段(Fragment),然后下发到分布式集群中的BE节点进行执行。BE上执行对应Fragment,并将结果汇聚返回给FE。

基本原理

Runtime Filter在查询规划时动态生成,由HashJoin算子(HashJoinNode)中将Join过程中的右表转换为过滤条件,下推给数据扫描算子(ScanNode),然后在左表扫描过程中进行裁剪过滤。这种方式大幅降低查询过程中的数据读取和计算,提升了查询性能。

例如当前存在T1表与T2表的Join查询,它的Join方式为HashJoin,T1是一张事实表,数据行数为1000000,T2是一张维度表,数据行数为200。在以上场景下,常规的HashJoin的实际情况如下。

|          >      HashJoinNode      <
|         |                          |
|         | 1000000                  | 200
|         |                          |
|   OlapScanNode              OlapScanNode
|         ^                          ^   
|         | 1000000                  | 200
|        T1                          T2
|

因此T1表需要扫描大量数据,并进行大量的Hash Join计算。

如果主动将T2将扫描的数据记录交给HashJoinNode后,HashJoinNode会根据T2的数据计算出一个过滤条件,比如T2数据的最大/最小值,或者构建一个Bloom Filter,接着将这个过滤条件发给等待扫描T1ScanNode。后者应用这个过滤条件,将过滤后的数据交给HashJoinNode,从而减少Probe Hashtable的次数和网络开销。这个过滤条件就是Runtime Filter,效果如下。

|          >      HashJoinNode     <
|         |                         |
|         | 6000                    | 200
|         |                         |
|   OlapScanNode              OlapScanNode
|         ^                         ^   
|         | 1000000                 | 200
|        T1                         T2
|

如果能进一步将过滤条件(Runtime Filter)下推到存储引擎,则某些情况下可以利用索引进行裁剪数据。大幅减少实际读取的数据量,从而大大降低扫描耗时,效果如下。

|          >      HashJoinNode     <
|         |                         |
|         | 6000                    | 200
|         |                         |
|   OlapScanNode              OlapScanNode
|         ^                         ^   
|         | 6000                    | 200
|        T1                        T2
|

通过上述分析,发现Runtime Filter和谓词下推、分区裁剪不同,Runtime Filter是在运行时动态生成的过滤条件,即在查询运行时解析Join on Clause确定过滤表达式,并将表达式广播给正在读取左表的ScanNode,从而减少查询过程中数据的读取和计算,大幅提升查询性能。

Runtime Filter类型

SelectDB提供了三种不同的Runtime Filter类型。

  • IN类型:利用HashSet结构实现IN过滤条件,下推到数据扫描节点。IN的优点是过滤效果明显且快速。缺点方面:首先,它只适用于BroadCast;其次,当它右表超过一定数据量的时候就会失效。当前SelectDB配置的数据量限制为1024,即右表如果大于1024,IN类型的Runtime Filter就直接失效。

  • Bloom Filter类型:利用哈希表的数据构造一个Bloom Filter,然后下推到查询数据的扫描节点。Bloom Filter的特点是通用,适用于各种类型、效果也较好。缺点是它的配置比较复杂且计算较高。

  • MinMax类型:通过右表数据确定一个Range范围后,下推给数据扫描节点。MinMax的优点是开销比较小。缺点是对于非数值列的效用不大。

适用场景

Runtime Filter主要用于大表Join小表的优化,如果左表的数据量太小,或者右表的数据量太大,则Runtime Filter可能不会取得预期效果。Runtime Filter适用的场景有以下两个要求。

  • 左表大右表小。因为构建Runtime Filter需要承担计算成本,包括一些内存的开销。

  • 左右表Join出来的结果很少。左右表Join出来的结果很少说明这个Join可以过滤掉左表的绝大部分数据。

使用方式

查询

SelectDB默认开启Runtime Filter功能。SelectDB在处理用户查询时,会自动根据表、查询语句情况,生成IN类型或Bloom Filter类型的Runtime Filter,进行查询优化。

Runtime Filter查询选项

参数名称

参数说明

runtime_filter_mode

用于调整Runtime Filter的下推策略,包括OFF、LOCAL、GLOBAL三种策略,默认设置为GLOBAL策略。

runtime_filter_type

指定使用的Runtime Filter类型。大多数情况下只需要调整这一个选项,其他选项保持默认即可。

包括Bloom Filter、MinMax Filter、IN Predicate、IN Or Bloom Filter、Bitmap Filter,默认会使用IN Or Bloom Filter。部分情况下同时使用Bloom Filter、MinMax Filter、IN Predicate时性能更高。

runtime_filter_wait_time_ms

左表的ScanNode等待每个Runtime Filter的时间,单位为ms,默认为1000。

runtime_filters_max_num

每个查询可应用的Runtime FilterBloom Filter的最大数量,默认10。

runtime_bloom_filter_min_size

Runtime FilterBloom Filter的最小长度,默认1048576(1 MiB)。

runtime_bloom_filter_max_size

Runtime FilterBloom Filter的最大长度,默认16777216(16 MiB)。

runtime_bloom_filter_size

Runtime FilterBloom Filter的默认长度,默认2097152(2 MiB)。

runtime_filter_max_in_num

如果join右表数据行数大于这个值,将不生成IN Predicate,默认1024。

runtime_filter_mode

用于控制Runtime Filter在查询执行的最小单元之间传输的范围。

取值范围:数字(0,1,2)或者相对应的助记符字符串(OFF,LOCAL,GLOBAL)。默认2(GLOBAL)。

  • LOCAL:相对保守,构建的Runtime Filter只能在同一个查询执行的最小单元的同一个Fragment中使用,即Runtime Filter生产者(构建FilterHashJoinNode)和消费者(使用Runtime FilterScanNode)在同一个Fragment,例如Broadcast Join的一般场景。

  • GLOBAL:相对激进,除满足LOCAL策略的场景外,还可以将Runtime Filter合并后通过网络传输到不同执行单元上的不同Fragment中使用,例如Runtime Filter生产者和消费者在不同Fragment,比如Shuffle Join。

通常,GLOBAL策略可以在更广泛的场景对查询进行优化。但在有些Shuffle Join中,生成和合并Runtime Filter的开销超过给查询带来的性能优势,可以更改为LOCAL策略。如果集群中涉及的Join查询不会因为Runtime Filter而提高性能,您可以将设置更改为OFF,从而完全关闭该功能。

在不同Fragment上构建和应用Runtime Filter时,需要合并Runtime Filter的原因和策略详情请参见ISSUE 6116(opens new window)

runtime_filter_type

指定使用的Runtime Filter类型。

取值范围:数字(1,2,4,8,16)或者相对应的助记符字符串(IN,BLOOM_FILTER,MIN_MAX,IN_OR_BLOOM_FILTER,BITMAP_FILTER)。默认值为8(IN_OR_BLOOM_FILTER)。使用多个时用逗号分隔,注意需要加引号,或者将任意多个类型的数字相加,示例如下。

set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX";

上述设置等价于如下设置。

set runtime_filter_type=7;

Runtime Filter类型的具体含义如下表。

参数名称

参数说明

IN Predicate

根据Join on ClauseKey列在右表上的所有值构建IN Predicate,使用构建的IN Predicate在左表上过滤,相比Bloom Filter构建和应用的开销更低,在右表数据量较少时性能更高。

  • IN Predicate已实现合并方法。

  • 当同时指定IN Predicate和其他Filter,并且in的过滤数值没达到runtime_filter_max_in_num时,会尝试把其他filter去除掉。原因在于IN Predicate是精确的过滤条件,即使没有其他filter也可以高效过滤。目前仅在Runtime filter的生产者和消费者处于同一个Fragment时才会有去除非In Filter的逻辑。

Bloom Filter

有一定的误判率,导致过滤的数据比预期少一点,但不会导致最终结果不准确,在大部分情况下Bloom Filter都可以提升性能或对性能没有显著影响,在少部分情况下会导致性能降低。

  • Bloom Filter构建和应用的开销较高,所以当过滤率较低时,或者左表数据量较少时,Bloom Filter可能会导致性能降低。

  • 目前只有左表的Key列应用Bloom Filter才能下推到存储引擎,并且测试结果显示Bloom Filter不下推到存储引擎时往往会导致性能降低。

  • 目前Bloom Filter仅在ScanNode上使用表达式过滤时有短路(short-circuit)逻辑,即当假阳性率过高时,不继续使用Bloom Filter,但当Bloom Filter下推到存储引擎后没有短路逻辑,所以当过滤率较低时可能导致性能降低。

MinMax Filter

包含最大值和最小值,从而过滤小于最小值和大于最大值的数据,MinMax Filter的过滤效果与Join on ClauseKey列的类型和左右表数据分布有关。

  • Join on ClauseKey列的类型为int/bigint/double等时,极端情况下,如果左右表的最大最小值相同则没有效果,反之右表最大值小于左表最小值,或右表最小值大于左表最大值,则效果最好。

  • Join on ClauseKey列的类型为varchar等时,应用MinMax Filter往往会导致性能降低。

IN or Bloom Filter

根据右表在执行过程中的真实行数,由系统自动判断使用IN Predicate还是 Bloom Filter。

  • 默认在右表数据行数少于102400时会使用IN Predicate(可通过session变量中的runtime_filter_max_in_num调整),否则使用Bloom Filter。

Bitmap Filter

  • 当前仅当In Subquery操作中的子查询返回Bitmap列时会使用Bitmap Filter。

  • 当前仅在向量化引擎中支持Bitmap Filter。

runtime_filter_wait_time_ms

Runtime Filter的等待耗时。

取值范围:整数,单位ms,默认为1000。

在开启Runtime Filter后,左表的ScanNode会为每一个分配给自己的Runtime Filter等待一段时间再扫描数据,即如果ScanNode被分配了3Runtime Filter,那么它最多会等待3000ms。

因为Runtime Filter的构建和合并均需要时间,ScanNode会尝试将等待时间内到达的Runtime Filter下推到存储引擎,如果超过等待时间后,ScanNode会使用已经到达的Runtime Filter直接开始扫描数据。

如果Runtime FilterScanNode开始扫描之后到达,则ScanNode不会将该Runtime Filter下推到存储引擎,而是对已经从存储引擎扫描上来的数据,在ScanNode上基于该Runtime Filter使用表达式过滤。之前已经扫描的数据则不会应用该Runtime Filter。这样得到的中间数据规模会大于最优解,但可以避免严重的裂化。

如果集群比较繁忙,并且集群上有许多资源密集型或长耗时的查询,可以增加等待时间,以避免复杂查询错过优化机会。如果集群负载较轻,并且集群上有许多只需要几秒的小查询,可以减少等待时间,以避免每个查询增加1s的延迟。

runtime_filters_max_num

每个查询生成的Runtime FilterBloom Filter数量的上限。

取值范围:整数,默认10。

目前仅对Bloom Filter的数量进行限制,因为相比MinMax FilterIN Predicate,Bloom Filter构建和应用的代价更高。

如果生成的Bloom Filter超过允许的最大数量,则保留选择性大的Bloom Filter,选择性大意味着预期可以过滤更多的行。这个设置可以防止Bloom Filter耗费过多的内存开销而导致潜在的问题。

选择性=(HashJoinNode Cardinality / HashJoinNode left child Cardinality)
-- 因为目前FE拿到Cardinality不准,所以这里Bloom Filter计算的选择性与实际不准,因此最终可能只是随机保留了部分Bloom Filter。
说明

仅在对大表间Join的某些长耗时查询进行调优时,才需要调整此查询选项。

Bloom Filter长度相关参数

包括runtime_bloom_filter_min_sizeruntime_bloom_filter_max_sizeruntime_bloom_filter_size,用于确定Runtime Filter使用的Bloom Filter数据结构的大小(以字节为单位)。

取值范围:整数。

因为需要保证每个HashJoinNode构建的Bloom Filter长度相同才能合并,所以目前在FE查询规划时计算Bloom Filter的长度。

如果能拿到Join右表统计信息中的数据行数(Cardinality),会尝试根据Cardinality估计Bloom Filter的最佳大小,并四舍五入到最接近的2的幂(以2为底的log值)。如果无法拿到右表的Cardinality,则会使用默认的Bloom Filter长度runtime_bloom_filter_sizeruntime_bloom_filter_min_sizeruntime_bloom_filter_max_size用于限制最终使用的Bloom Filter长度最小和最大值。

更大的Bloom Filter在处理高基数的输入集时更有效,但需要消耗更多的内存。例如查询中需要过滤高基数列(比如含有数百万个不同的取值),可以增加runtime_bloom_filter_size的值进行一些基准测试,这有助于使Bloom Filter过滤的更加精准,从而获得预期的性能提升。

Bloom Filter的有效性取决于查询的数据分布,因此通常仅对一些特定查询额外调整其Bloom Filter长度,而不是全局修改。一般仅在对大表间join的某些长耗时查询进行调优时,才需要调整此查询选项。

查看Query生成的Runtime Filter

explain命令可以显示的查询计划中包括每个Fragment使用的Join on Clause信息,以及Fragment生成和使用Runtime Filter的注释,从而确认是否将Runtime Filter应用到了期望的Join on Clause上。

  • 生成Runtime FilterFragment包含的注释例如runtime filters: filter_id[type] <- table.column

  • 使用Runtime FilterFragment包含的注释例如runtime filters: filter_id[type] -> table.column

以下示例中的查询使用了一个IDRF000Runtime Filter。

CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2;
INSERT INTO test VALUES (1), (2), (3), (4);

CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2;
INSERT INTO test2 VALUES (3), (4), (5);

EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2;
+-------------------------------------------------------------------+
| Explain String                                                    |
+-------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                   |
|  OUTPUT EXPRS:`t1`                                                |
|                                                                   |
|   4:EXCHANGE                                                      |
|                                                                   |
| PLAN FRAGMENT 1                                                   |
|  OUTPUT EXPRS:                                                    |
|   PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test`.`t1`  |
|                                                                   |
|   2:HASH JOIN                                                     |
|   |  join op: INNER JOIN (BUCKET_SHUFFLE)                         |
|   |  equal join conjunct: `test`.`t1` = `test2`.`t2`              |
|   |  runtime filters: RF000[in] <- `test2`.`t2`                   |
|   |                                                               |
|   |----3:EXCHANGE                                                 |
|   |                                                               |
|   0:OlapScanNode                                                  |
|      TABLE: test                                                  |
|      runtime filters: RF000[in] -> `test`.`t1`                    |
|                                                                   |
| PLAN FRAGMENT 2                                                   |
|  OUTPUT EXPRS:                                                    |
|   PARTITION: HASH_PARTITIONED: `default_cluster:ssb`.`test2`.`t2` |
|                                                                   |
|   1:OlapScanNode                                                  |
|      TABLE: test2                                                 |
+-------------------------------------------------------------------+
-- 上面`runtime filters`的行显示了`PLAN FRAGMENT 1`的`2:HASH JOIN`生成了ID为RF000的IN Predicate,
-- 其中`test2`.`t2`的key values仅在运行时可知,
-- 在`0:OlapScanNode`使用了该IN Predicate用于在读取`test`.`t1`时过滤不必要的数据。

SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2; 
-- 返回2行结果[3, 4];

-- 通过query的profile(set enable_profile=true;)可以查看查询内部工作的详细信息,
-- 包括每个Runtime Filter是否下推、等待耗时、以及OLAP_SCAN_NODE从prepare到接收到Runtime Filter的总时长。
RuntimeFilter:in:
    -  HasPushDownToEngine:  true
    -  AWaitTimeCost:  0ns
    -  EffectTimeCost:  2.76ms

-- 此外,在profile的OLAP_SCAN_NODE中还可以查看Runtime Filter下推后的过滤效果和耗时。
    -  RowsVectorPredFiltered:  9.320008M  (9320008)
    -  VectorPredEvalTime:  364.39ms

Runtime Filter的规划规则

  • 只支持对Join on Clause中的等值条件生成Runtime Filter,不包括Null-safe条件,因为其可能会过滤掉Join左表的null值。

  • 不支持将Runtime Filter下推到Left Outer、Full Outer、Anti Join的左表。

  • 不支持Src exprTarget expr是常量的场景。

  • 不支持Src exprTarget expr相等的场景。

  • 不支持Src expr的类型等于HLL或者BITMAP

  • 仅支持将Runtime Filter下推给OlapScanNode。

  • 不支持Target expr包含NULL-checking表达式,例如COALESCE/IFNULL/CASE,因为当Outer Join上层其他JoinJoin on Clause包含NULL-checking表达式并生成Runtime Filter时,将这个Runtime Filter下推到Outer Join的左表时可能导致结果不正确。

  • 不支持Target expr中的列(slot)无法在原始表中找到某个等价列。

  • 在以下场景不支持列传导:

    • Join on Clause包含A.k = B.k and B.k = C.k时,目前C.k只可以下推给B.k,而不可以下推给A.k;

    • Join on Clause包含A.a + B.b = C.c,如果A.a可以列传导到B.a,即A.aB.a是等价的列,那么可以用B.a替换A.a,然后可以尝试将Runtime Filter下推给B(如果A.aB.a不是等价列,则不能下推给B,因为Target expr必须与唯一一个Join左表绑定);

  • Target exprSrc expr的类型必须相等,因为Bloom Filter基于hash,若类型不等则会尝试将Target expr的类型转换为Src expr的类型。

  • 不支持PlanNode.Conjuncts生成的Runtime Filter下推,与HashJoinNodeeqJoinConjunctsotherJoinConjuncts不同,PlanNode.Conjuncts生成的Runtime Filter在测试中发现可能会导致错误的结果,例如IN子查询转换为Join时,自动生成的Join on Clause将保存在PlanNode.Conjuncts中,此时应用Runtime Filter可能会导致结果缺少一些行。