Co2y's Blog

Phoenix Join和Filter

Apache Phoenix 的Join和Filter机制介绍。

Join

Phoenix中有两种Join,Hash Join和Sort Merge Join。

原理(单机)

Hash Join

Hash Join只能用于相等连接,而Phoenix目前也只支持相等连接。Hash Join适合两个表的数据量差别较大的时候,即一个大表,一个小表。散列连接是CBO做大数据集连接时的常用方式,优化器使用两个表中较小的表(或数据源)利用连接键在内存中建立散列表,然后扫描较大的表并探测散列表,找出与散列表匹配的行。

这种方式适用于较小的表完全可以放于内存中的情况,这样总成本就是访问两个表的成本之和。但是在表很大的情况下并不能完全放入内存,这时优化器会将它分割成若干不同的分区,不能放入内存的部分就把该分区写入磁盘的临时段,此时要有较大的临时段从而尽量提高I/O的性能。

步骤:将两个表中较小的一个在内存中构造一个HASH表(对JOIN KEY),扫描另一个表,同样对JOIN KEY进行HASH后探测是否可以JOIN。适用于记录集比较大的情况。需要注意的是:如果HASH表太大,无法一次构造在内存中,则分成若干个partition,写入磁盘的temporary segment,则会多一个写的代价,会降低效率。

Sort Merge Join

Merge Sort是分布式系统中最常用的排序方式。通常情况下散列连接的效果都比排序合并连接要好,然而如果行源已经被排过序,在执行排序合并连接时不需要再排序了,这时排序合并连接的性能会优于散列连接。

Sort Merge join 用在没有索引,并且数据已经排序的情况。步骤:将两个表排序,然后将两个表合并。通常情况下,只有在以下情况发生时,才会使用此种JOIN方式:

  1. RBO模式
  2. 不等价关联(>,<,>=,<=,<>)
  3. HASH_JOIN_ENABLED=false
  4. 数据源已排序

Nested Loop

这种方式Phoenix本身没有提到,但在Oracle中经常用到。对于被连接的数据子集较小的情况,嵌套循环连接是个较好的选择。在嵌套循环中,内表被外表驱动,外表返回的每一行都要在内表中检索找到与它匹配的行,因此整个查询返回的结果集不能太大(大于1万不适合),要把返回子集较小的表作为外表(CBO 默认外表是驱动表),而且在内表的连接字段上一定要有索引。

Nested loop一般用在连接的表中有索引,并且索引选择性较好的时候。
步骤:确定一个驱动表(outer table),另一个表为inner table,驱动表中的每一行与inner表中的相应记录JOIN。类似一个嵌套的循环。适用于驱动表的记录集比较小(<10000)而且inner表需要有有效的访问方法(Index)。需要注意的是:JOIN的顺序很重要,驱动表的记录集一定要小,返回结果集的响应时间是最快的。

Phoenix(分布式)

Hash Join

Phoenix中的Hash Join最大的问题就是内存不够,它好像没有分区的做法。要求一个父亲是小表,可以全部装载到内存中,并组织成map这样的数据结构。比如a join b其中a是小表,那么每个instance都要有一份a的全量数据,b不要求是有序的,随机分配到每个instance上即可,但是a表要广播到每个Instance上。对于等值连接,b的每一行都可以在内存中查找具有相等key的全部a,做笛卡尔积即可。对于非等值连接,b的每一行和a的全量做笛卡尔积然后按照on条件过滤即可。

在Phoenix中父表的顺序是这样确定的

  1. lhs INNER JOIN rhs

    • rhs will be built as hash table in server cache.
  2. lhs LEFT OUTER JOIN rhs

    • rhs will be built as hash table in server cache.
  3. lhs RIGHT OUTER JOIN rhs

    • lhs will be built as hash table in server cache.

Sort Merge Join

merge join要求左右表都是有序表,每次在两边同时读相等的等值连接的key到内存中(其实在内存的只有一边就可以了)做笛卡尔积。比如 a join b on a.key = b.key。a.key的值是1,2,3 b.key的值是1,3,4,那么先a.key = 1和所有行和b.key = 1的所有行join,忽略掉a.key = 2,再读key = 3。这样就需要左父亲的DAG节点(Task)按照a.key shuffle sort,右父亲的DAG节点按照b.key shuffle-sort。

易见这种算法理论上可以的无限的横向拓展,只要保证每个instance上的key都是有序的,且同一个key都在一个instance上。在Phoenix/HBase上利用MultipleTableInput,扫描A表然后用unique key输出并且和B表匹配。

步骤:在各个RS上对应的region里按列排好序,然后做笛卡尔积,结果放到client端,之后的操作就在client端进行。例如

1
2
3
4
5
6
7
8
9
10
11
SORT-MERGE-JOIN (INNER) TABLES
CLIENT 1-CHUNK 1 ROWS 203 BYTES PARALLEL 1-WAY POINT LOOKUP ON 1 KEY OVER XXX
SERVER SORTED BY [X.COL2]
CLIENT MERGE SORT
AND (SKIP MERGE)
CLIENT 1-CHUNK 0 ROWS 0 BYTES PARALLEL 1-WAY FULL SCAN OVER YYY
SERVER FILTER BY FIRST KEY ONLY
SERVER SORTED BY [Y.COL2]
CLIENT MERGE SORT
CLIENT SORTED BY [X.COL3]
CLIENT AGGREGATE INTO DISTINCT ROWS BY [X.COL3]

Filter

Server Filter

所有where子句后面的应该都是server filter

Client Filter

只有sort-merge join这种client端操作的才会是Client Filter。 例如通过sort join把结果放到client了,之后的聚集或者where过滤就会在client上了。