Co2y's Blog

Phoenix SQL执行源码分析

在做TPC-C on phoenix的时候遇到了一个SQL优化的问题,主要表现为phoenix的join没有考虑where子句,见PHOENIX-3310

例子

看下面两条语句和他们的执行计划:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
0: jdbc:phoenix:slave206,slave207,slave208,sl> explain select * from XXX x join YYY y on x.col1=y.col1 where y.col1='1';
+----------------------------------------------------------------------------------------------------+
| PLAN |
+----------------------------------------------------------------------------------------------------+
| CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER XXX |
| PARALLEL INNER-JOIN TABLE 0 |
| CLIENT 1-CHUNK 1 ROWS 203 BYTES PARALLEL 1-WAY ROUND ROBIN POINT LOOKUP ON 1 KEY OVER YYY |
| DYNAMIC SERVER FILTER BY X.COL1 IN (Y.COL1) |
+----------------------------------------------------------------------------------------------------+
4 rows selected (0.017 seconds)
0: jdbc:phoenix:slave206,slave207,slave208,sl> explain select * from XXX x join YYY y on x.col1=y.col1 where x.col1='1';
+--------------------------------------------------------------------------------------------+
| PLAN |
+--------------------------------------------------------------------------------------------+
| CLIENT 1-CHUNK 1 ROWS 203 BYTES PARALLEL 1-WAY ROUND ROBIN POINT LOOKUP ON 1 KEY OVER XXX |
| PARALLEL INNER-JOIN TABLE 0 |
| CLIENT 1-CHUNK PARALLEL 1-WAY ROUND ROBIN FULL SCAN OVER YYY |
| DYNAMIC SERVER FILTER BY X.COL1 IN (Y.COL1) |
+--------------------------------------------------------------------------------------------+

其中where子句里的col1是两张表的primary key。但是join compiler得到的plan还是要做full scan。

调用过程

通常我们通过jdbc连接phoenix然后执行SQL的代码会这么写

1
2
3
4
Properties info = new Properties();
Connection con = DriverManager.getConnection(url, info);
PreparedStatement ps = con.prepareStatement("select * from XXX x join YYY y on x.col1=y.col1 where y.col1='1'");
ResultSet rs = ps.executeQuery();

这些都是通过java标准的jdbc操作的,getConnection()这里不进行追踪,主要看PreparedStatementexecuteQuery方法。

首先createStatement()返回的是PhoenixPreparedStatement对象,它实现了java.sql.PreparedStatement,继承自PhoenixStatement。调用executeQuery()方法,调用PhoenixStatementexecuteQuery(),下面主要看包含join的select where执行计划生成过程。executeQuery()的主要代码为:

1
2
3
4
5
6
7
8
9
10
11
12
QueryPlan plan = stmt.compilePlan(PhoenixStatement.this, Sequence.ValueOp.VALIDATE_SEQUENCE);
Iterator<TableRef> tableRefs = plan.getSourceRefs().iterator();
connection.getMutationState().sendUncommitted(tableRefs);
plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan);
……
ResultIterator resultIterator = plan.iterator();
StatementContext context = plan.getContext();
context.getOverallQueryMetrics().startQuery();
PhoenixResultSet rs = newResultSet(resultIterator, plan.getProjector(), context);
……

整个过程分为3步,根据statement进行compile得到plan,然后optimize plan,最后根据不同的plan获取iterator,iterator是一层层加上去的,最终得到resultset。我们主要关注前两步。

compile

跟踪上面代码第一行的compilePlan,它是CompilableStatement接口定义的,根据SQL这里的stmt应该是ExecutableSelectStatement,于是查看ExecutableSelectStatementcompilePlan方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public QueryPlan compilePlan(PhoenixStatement stmt, Sequence.ValueOp seqAction) throws SQLException {
if(!getUdfParseNodes().isEmpty()) {
stmt.throwIfUnallowedUserDefinedFunctions(getUdfParseNodes());
}
SelectStatement select = SubselectRewriter.flatten(this, stmt.getConnection());
ColumnResolver resolver = FromCompiler.getResolverForQuery(select, stmt.getConnection());
select = StatementNormalizer.normalize(select, resolver);
SelectStatement transformedSelect = SubqueryRewriter.transform(select, resolver, stmt.getConnection());
if (transformedSelect != select) {
resolver = FromCompiler.getResolverForQuery(transformedSelect, stmt.getConnection());
select = StatementNormalizer.normalize(transformedSelect, resolver);
}
QueryPlan plan = new QueryCompiler(stmt, select, resolver, Collections.<PDatum>emptyList(), stmt.getConnection().getIteratorFactory(), new SequenceManager(stmt), true).compile();
plan.getContext().getSequenceManager().validateSequences(seqAction);
return plan;
}

前面是做一个检查,看有没有UDF,然后重写statement,后面3行是关键,通过new一个QueryCompiler,调用其compile()方法得到plan,进入compile()方法。compile() -> compileSelect()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public QueryPlan compileSelect(SelectStatement select) throws SQLException{
List<Object> binds = statement.getParameters();
StatementContext context = new StatementContext(statement, resolver, scan, sequenceManager);
if (select.isJoin()) {
select = JoinCompiler.optimize(statement, select, resolver);
if (this.select != select) {
ColumnResolver resolver = FromCompiler.getResolverForQuery(select, statement.getConnection());
context = new StatementContext(statement, resolver, scan, sequenceManager);
}
JoinTable joinTable = JoinCompiler.compile(statement, select, context.getResolver());
return compileJoinQuery(context, binds, joinTable, false, false, null);
} else {
return compileSingleQuery(context, select, binds, false, true);
}
}

从上面可以看到,首先是通过JoinCompiler.optimize进行了优化,然后compileJoinQuery()JoinCompiler.optimize下部分优化时再详细看,先接着走。

compileJoinQuery()这部分很长,最终是通过转换然后compileSingleFlatQuery(),把groupby,orderby,where等合起来得到plan。

optimize

plan = connection.getQueryServices().getOptimizer().optimize(PhoenixStatement.this, plan)optimze()开始跟踪

optimize()->getApplicablePlans() -> orderPlansBestToWorst() 这些都是在QueryOptimizer里的,orderPlansBestToWorst返回一个排好序的plan列表,取第一个就是最终的plan。

优化方法

从上面的流程中可以看出,JoinCompiler.optimizecompileJoinQuery()compileSingleFlatQuery()还有后面的getApplicablePlans()orderPlansBestToWorst()这些地方可以进行优化。