Co2y's Blog

Phoenix源码阅读 select scan filter

Phoenix 会把 SQL 翻译成 HBase 的 scan，但在一条简单查询的执行计划中并没有看到 filter。那么，Phoenix 会不会根据 select 的字段设置对应的 FamilyFilter 或 QualifierFilter？换句话说，它是否会在构建 scan 时调用 addFamily 或 addColumn？

select

考虑这样一条 SQL：

```sql
select a.cf1.cq1, b.cf2.cq1 from a,b where b.cf2.cq2 = 1;
```

这是一条 join 语句（注意这里没有连接 a 和 b 的条件，因此实际上是笛卡尔积）。预期的执行过程是：对 b 做 scan 并加上 filter，对 a 做 scan 也加上 filter，然后再进行 join。b 的 filter 由 where 条件和 select 字段共同决定，a 的 filter 只由 select 字段决定。也就是说，这条语句不应该读取 a 中除 cf1 之外的列族，也不应该读取 b 中除 cf2 之外的列族。

Phoenix 中的处理过程

在之前的一篇博客中提到过，一条 select 语句会依次经过 antlr → parser → statement，然后调用 statement 的 executeQuery() 方法。executeQuery() 的过程是：先得到 QueryPlan，再对其进行 optimize()，然后由 QueryPlan 得到对应的 Iterator，最后返回 ResultSet。调用 ResultSet 的 next() 方法时，会调用 iterator 的 next() 方法。下面看 /phoenix-core/src/main/java/org/apache/phoenix/iterate/BaseResultIterators.java 中的 initializeScan()，它负责给每个 scan 加上 select 字段对应的 projection。这里的 iterator 用来构建 scan，采用装饰器模式，一层一层地包装上去。

initializeScan() 以及它调用的 optimizeProjection() 的源码如下：
/**
 * Prepares the Scan held by the plan's StatementContext: decides which column families /
 * columns are read and attaches filters, based on the plan's RowProjector and the
 * WHERE-condition columns recorded in the context.
 *
 * @param plan the query plan; supplies the StatementContext (and its Scan), the TableRef,
 *             the FilterableStatement and the RowProjector
 * @param perScanLimit if non-null, a PageFilter with this limit is ANDed onto the end of the
 *                     scan's filter (only in the non-upgrade branch below)
 */
private static void initializeScan(QueryPlan plan, Integer perScanLimit) {
StatementContext context = plan.getContext();
TableRef tableRef = plan.getTableRef();
PTable table = tableRef.getTable();
Scan scan = context.getScan();
// Column families (and, per family, the qualifiers) currently projected into the scan.
// A null qualifier set for a family means that family is projected in its entirety.
Map<byte [], NavigableSet<byte []>> familyMap = scan.getFamilyMap();
// Hack for PHOENIX-2067 to force raw scan over all KeyValues to fix their row keys
if (context.getConnection().isDescVarLengthRowKeyUpgrade()) {
// We project *all* KeyValues across all column families as we make a pass over
// a physical table and we want to make sure we catch all KeyValues that may be
// dynamic or part of an updatable view.
familyMap.clear();
scan.setMaxVersions();
scan.setFilter(null); // Remove any filter
scan.setRaw(true); // Traverse (and subsequently clone) all KeyValues
// Pass over PTable so we can re-write rows according to the row key schema
scan.setAttribute(BaseScannerRegionObserver.UPGRADE_DESC_ROW_KEY, UngroupedAggregateRegionObserver.serialize(table));
// NOTE(review): perScanLimit is not applied in this branch (no PageFilter is added here).
} else {
FilterableStatement statement = plan.getStatement();
RowProjector projector = plan.getProjector();
// Set when the projection is narrowed per column; optimizeProjection() then runs at the end.
boolean optimizeProjection = false;
// True when nothing is projected into the scan yet and the WHERE clause references no
// key value columns. It is computed here, before the addFamily() call below, so it still
// triggers the FirstKeyOnlyFilter further down even after a family has been added.
boolean keyOnlyFilter = familyMap.isEmpty() && context.getWhereConditionColumns().isEmpty();
if (!projector.projectEverything()) {
// If nothing projected into scan and we only have one column family, just allow everything
// to be projected and use a FirstKeyOnlyFilter to skip from row to row. This turns out to
// be quite a bit faster.
// Where condition columns also will get added into familyMap
// When where conditions are present, we cannot add FirstKeyOnlyFilter at beginning.
// FIXME: we only enter this if the number of column families is 1 because otherwise
// local indexes break because it appears that the column families in the PTable do
// not match the actual column families of the table (which is bad).
if (keyOnlyFilter && table.getColumnFamilies().size() == 1) {
// Project the one column family. We must project a column family since it's possible
// that there are other non declared column families that we need to ignore.
scan.addFamily(table.getColumnFamilies().get(0).getName().getBytes());
} else {
optimizeProjection = true;
if (projector.projectEveryRow()) {
if (table.getViewType() == ViewType.MAPPED) {
// Since we don't have the empty key value in MAPPED tables,
// we must project all CFs in HRS. However, only the
// selected column values are returned back to client.
// The recorded WHERE columns are replaced by one whole-family entry (null qualifier)
// per column family of the table; optimizeProjection() then merges them into the scan.
// (addWhereCoditionColumn is the method's actual spelling on StatementContext.)
context.getWhereConditionColumns().clear();
for (PColumnFamily family : table.getColumnFamilies()) {
context.addWhereCoditionColumn(family.getName().getBytes(), null);
}
} else {
// Column family that holds the empty key value column (EMPTY_COLUMN_BYTES).
byte[] ecf = SchemaUtil.getEmptyColumnFamily(table);
// Project empty key value unless the column family containing it has
// been projected in its entirety.
// (familyMap.get(ecf) == null with the key present means the whole family is projected.)
if (!familyMap.containsKey(ecf) || familyMap.get(ecf) != null) {
scan.addColumn(ecf, QueryConstants.EMPTY_COLUMN_BYTES);
}
}
}
}
}
// Filter order: FirstKeyOnlyFilter goes first; the PageFilter and any filter added by
// optimizeProjection() are appended at the end.
// Add FirstKeyOnlyFilter if there are no references to key value columns
if (keyOnlyFilter) {
ScanUtil.andFilterAtBeginning(scan, new FirstKeyOnlyFilter());
}
if (perScanLimit != null) {
ScanUtil.andFilterAtEnd(scan, new PageFilter(perScanLimit));
}
if (optimizeProjection) {
optimizeProjection(context, scan, table, statement);
}
}
}
/**
 * Narrows the scan's projection and decides how the server should skip columns.
 * <p>
 * The WHERE-condition columns recorded in {@code context} are merged into the scan's family
 * map. The method then decides whether seeking to explicit columns must be prevented: because
 * of a SEEK_TO_COLUMN / NO_SEEK_TO_COLUMN hint, or because a single column family is
 * referenced on an HBase version below MIN_SEEK_TO_COLUMN_VERSION. For non-aggregate
 * statements where some filtered column is not part of the projection, it appends a
 * ColumnProjectionFilter built from the projection as it was before the WHERE columns
 * were merged in.
 *
 * @param context statement context; supplies the WHERE-condition columns and the connection
 *                used to read the lowest cluster HBase version
 * @param scan the scan to modify in place (families/columns and filter)
 * @param table table being scanned; used to look up its empty column family
 * @param statement statement being executed; supplies hints and whether it is an aggregate
 */
private static void optimizeProjection(StatementContext context, Scan scan, PTable table, FilterableStatement statement) {
Map<byte[], NavigableSet<byte[]>> familyMap = scan.getFamilyMap();
// columnsTracker contain cf -> qualifiers which should get returned.
Map<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>> columnsTracker =
new TreeMap<ImmutableBytesPtr, NavigableSet<ImmutableBytesPtr>>();
// Families read only because of WHERE conditions (filled only when seeking is prevented).
Set<byte[]> conditionOnlyCfs = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
int referencedCfCount = familyMap.size();
// Becomes true, and then stays true, when some WHERE column is read without being part of
// the projection. That happens when its family is not projected at all, when it is a
// whole-family condition (null qualifier) against explicitly projected columns, or when its
// qualifier is not among the projected ones. A family projected in its entirety (null
// qualifier set) never sets it.
boolean filteredColumnNotInProjection = false;
// NOTE(review): referencedCfCount is incremented once per WHERE column whose family is not
// projected, not once per distinct family, so two conditions on the same unprojected family
// count it twice. This affects the "== 1" check below; confirm whether that is intended.
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] filteredFamily = whereCol.getFirst();
if (!(familyMap.containsKey(filteredFamily))) {
referencedCfCount++;
filteredColumnNotInProjection = true;
} else if (!filteredColumnNotInProjection) {
NavigableSet<byte[]> projectedColumns = familyMap.get(filteredFamily);
if (projectedColumns != null) {
byte[] filteredColumn = whereCol.getSecond();
if (filteredColumn == null) {
filteredColumnNotInProjection = true;
} else {
filteredColumnNotInProjection = !projectedColumns.contains(filteredColumn);
}
}
}
}
boolean preventSeekToColumn;
if (statement.getHint().hasHint(Hint.SEEK_TO_COLUMN)) {
// Allow seeking to column during filtering
preventSeekToColumn = false;
} else if (statement.getHint().hasHint(Hint.NO_SEEK_TO_COLUMN)) {
// Prevent seeking to column during filtering
preventSeekToColumn = true;
} else {
int hbaseServerVersion = context.getConnection().getQueryServices().getLowestClusterHBaseVersion();
// When only a single column family is referenced, there are no hints, and HBase server version
// is less than when the fix for HBASE-13109 went in (0.98.12), then we prevent seeking to a
// column.
preventSeekToColumn = referencedCfCount == 1 && hbaseServerVersion < MIN_SEEK_TO_COLUMN_VERSION;
}
// Snapshot the projection into columnsTracker BEFORE the WHERE columns are added to the scan
// below, so it describes only what the query projects. A null value keeps the
// "whole family" meaning.
for (Entry<byte[], NavigableSet<byte[]>> entry : familyMap.entrySet()) {
ImmutableBytesPtr cf = new ImmutableBytesPtr(entry.getKey());
NavigableSet<byte[]> qs = entry.getValue();
NavigableSet<ImmutableBytesPtr> cols = null;
if (qs != null) {
cols = new TreeSet<ImmutableBytesPtr>();
for (byte[] q : qs) {
cols.add(new ImmutableBytesPtr(q));
}
}
columnsTracker.put(cf, cols);
}
// Making sure that where condition CFs are getting scanned at HRS.
for (Pair<byte[], byte[]> whereCol : context.getWhereConditionColumns()) {
byte[] family = whereCol.getFirst();
if (preventSeekToColumn) {
// Remember families that are read only for the WHERE clause, then read each whole family
// so that no explicit column list is used for it.
if (!(familyMap.containsKey(family))) {
conditionOnlyCfs.add(family);
}
scan.addFamily(family);
} else {
if (familyMap.containsKey(family)) {
// where column's CF is present. If there are some specific columns added against this CF, we
// need to ensure this where column also getting added in it.
// If the select was like select cf1.*, then that itself will select the whole CF. So no need to
// specifically add the where column. Adding that will remove the cf1.* stuff and only this
// where condition column will get returned!
NavigableSet<byte[]> cols = familyMap.get(family);
// cols is null means the whole CF will get scanned.
if (cols != null) {
if (whereCol.getSecond() == null) {
scan.addFamily(family);
} else {
scan.addColumn(family, whereCol.getSecond());
}
}
} else if (whereCol.getSecond() == null) {
scan.addFamily(family);
} else {
// where column's CF itself is not present in family map. We need to add the column
scan.addColumn(family, whereCol.getSecond());
}
}
}
if (!columnsTracker.isEmpty()) {
if (preventSeekToColumn) {
// NOTE(review): the ColumnProjectionFilter below is only added when
// filteredColumnNotInProjection is true. When it is false, the whole families added here
// are not trimmed back to the projected columns on the server; confirm that is intended.
for (ImmutableBytesPtr f : columnsTracker.keySet()) {
// This addFamily will remove explicit cols in scan familyMap and make it as entire row.
// We don't want the ExplicitColumnTracker to be used. Instead we have the ColumnProjectionFilter
scan.addFamily(f.get());
}
}
// We don't need this filter for aggregates, as we're not returning back what's
// in the scan in this case. We still want the other optimization that causes
// the ExplicitColumnTracker not to be used, though.
if (!statement.isAggregate() && filteredColumnNotInProjection) {
ScanUtil.andFilterAtEnd(scan, new ColumnProjectionFilter(SchemaUtil.getEmptyColumnFamily(table),
columnsTracker, conditionOnlyCfs));
}
}
}