Skip to content

Commit a733a56

Browse files
authored
[fix](nereids)Add catalog/db/table filter info in SchemaScanNode (apache#46864) (apache#47550)
backport: apache#46864
1 parent 5e95141 commit a733a56

File tree

15 files changed

+347
-21
lines changed

15 files changed

+347
-21
lines changed

be/src/exec/schema_scanner/schema_tables_scanner.cpp

+3
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ Status SchemaTablesScanner::_get_new_table() {
112112
if (nullptr != _param->common_param->wild) {
113113
table_params.__set_pattern(*(_param->common_param->wild));
114114
}
115+
if (nullptr != _param->common_param->table) {
116+
table_params.__set_table(*(_param->common_param->table));
117+
}
115118
if (nullptr != _param->common_param->current_user_ident) {
116119
table_params.__set_current_user_ident(*(_param->common_param->current_user_ident));
117120
} else {

fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java

+6-2
Original file line numberDiff line numberDiff line change
@@ -887,9 +887,13 @@ public PlanFragment visitPhysicalSchemaScan(PhysicalSchemaScan schemaScan, PlanT
887887
SchemaScanNode scanNode = null;
888888
if (BackendPartitionedSchemaScanNode.isBackendPartitionedSchemaTable(
889889
table.getName())) {
890-
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
890+
scanNode = new BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
891+
schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
892+
schemaScan.getSchemaTable().orElse(null));
891893
} else {
892-
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor);
894+
scanNode = new SchemaScanNode(context.nextPlanNodeId(), tupleDescriptor,
895+
schemaScan.getSchemaCatalog().orElse(null), schemaScan.getSchemaDatabase().orElse(null),
896+
schemaScan.getSchemaTable().orElse(null));
893897
}
894898
scanNode.setNereidsId(schemaScan.getId());
895899
SchemaScanNode finalScanNode = scanNode;

fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@
115115
import org.apache.doris.nereids.rules.rewrite.PushDownAggThroughJoinOneSide;
116116
import org.apache.doris.nereids.rules.rewrite.PushDownAggWithDistinctThroughJoinOneSide;
117117
import org.apache.doris.nereids.rules.rewrite.PushDownDistinctThroughJoin;
118+
import org.apache.doris.nereids.rules.rewrite.PushDownFilterIntoSchemaScan;
118119
import org.apache.doris.nereids.rules.rewrite.PushDownFilterThroughProject;
119120
import org.apache.doris.nereids.rules.rewrite.PushDownLimit;
120121
import org.apache.doris.nereids.rules.rewrite.PushDownLimitDistinctThroughJoin;
@@ -395,7 +396,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
395396
topDown(
396397
new PruneOlapScanPartition(),
397398
new PruneEmptyPartition(),
398-
new PruneFileScanPartition()
399+
new PruneFileScanPartition(),
400+
new PushDownFilterIntoSchemaScan()
399401
)
400402
),
401403
topic("MV optimization",

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java

+1
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ public enum RuleType {
291291
OLAP_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
292292

293293
FILE_SCAN_PARTITION_PRUNE(RuleTypeClass.REWRITE),
294+
PUSH_FILTER_INTO_SCHEMA_SCAN(RuleTypeClass.REWRITE),
294295
PUSH_CONJUNCTS_INTO_JDBC_SCAN(RuleTypeClass.REWRITE),
295296
PUSH_CONJUNCTS_INTO_ODBC_SCAN(RuleTypeClass.REWRITE),
296297
PUSH_CONJUNCTS_INTO_ES_SCAN(RuleTypeClass.REWRITE),

fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalSchemaScanToPhysicalSchemaScan.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ public Rule build() {
3434
scan.getTable(),
3535
scan.getQualifier(),
3636
Optional.empty(),
37-
scan.getLogicalProperties())
37+
scan.getLogicalProperties(),
38+
scan.getSchemaCatalog(),
39+
scan.getSchemaDatabase(),
40+
scan.getSchemaTable())
3841
).toRule(RuleType.LOGICAL_SCHEMA_SCAN_TO_PHYSICAL_SCHEMA_SCAN_RULE);
3942
}
4043
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.nereids.rules.rewrite;
19+
20+
import org.apache.doris.catalog.Column;
21+
import org.apache.doris.nereids.rules.Rule;
22+
import org.apache.doris.nereids.rules.RuleType;
23+
import org.apache.doris.nereids.trees.expressions.EqualTo;
24+
import org.apache.doris.nereids.trees.expressions.Expression;
25+
import org.apache.doris.nereids.trees.expressions.SlotReference;
26+
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
27+
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
28+
import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan;
29+
30+
import com.google.common.collect.ImmutableList;
31+
32+
import java.util.Optional;
33+
34+
/**
35+
* Used to push down catalog/db/table name to schema scan node.
36+
*/
37+
public class PushDownFilterIntoSchemaScan extends OneRewriteRuleFactory {
38+
39+
@Override
40+
public Rule build() {
41+
return logicalFilter(logicalSchemaScan()).when(p -> !p.child().isFilterPushed()).thenApply(ctx -> {
42+
LogicalFilter<LogicalSchemaScan> filter = ctx.root;
43+
LogicalSchemaScan scan = filter.child();
44+
Optional<String> schemaCatalog = Optional.empty();
45+
Optional<String> schemaDatabase = Optional.empty();
46+
Optional<String> schemaTable = Optional.empty();
47+
for (Expression expression : filter.getConjuncts()) {
48+
if (!(expression instanceof EqualTo)) {
49+
continue;
50+
}
51+
Expression slot = expression.child(0);
52+
if (!(slot instanceof SlotReference)) {
53+
continue;
54+
}
55+
Optional<Column> column = ((SlotReference) slot).getColumn();
56+
if (!column.isPresent()) {
57+
continue;
58+
}
59+
String columnName = column.get().getName();
60+
Expression slotValue = expression.child(1);
61+
if (!(slotValue instanceof VarcharLiteral)) {
62+
continue;
63+
}
64+
String columnValue = ((VarcharLiteral) slotValue).getValue();
65+
if ("TABLE_CATALOG".equals(columnName)) {
66+
schemaCatalog = Optional.of(columnValue);
67+
} else if ("TABLE_SCHEMA".equals(columnName)) {
68+
schemaDatabase = Optional.of(columnValue);
69+
} else if ("TABLE_NAME".equals(columnName)) {
70+
schemaTable = Optional.of(columnValue);
71+
}
72+
}
73+
LogicalSchemaScan rewrittenScan = scan.withSchemaIdentifier(schemaCatalog, schemaDatabase, schemaTable);
74+
return filter.withChildren(ImmutableList.of(rewrittenScan));
75+
}).toRule(RuleType.PUSH_FILTER_INTO_SCHEMA_SCAN);
76+
}
77+
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSchemaScan.java

+68-4
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,52 @@
2727
import org.apache.doris.nereids.util.Utils;
2828

2929
import java.util.List;
30+
import java.util.Objects;
3031
import java.util.Optional;
3132

3233
/**
3334
* LogicalSchemaScan.
3435
*/
3536
public class LogicalSchemaScan extends LogicalCatalogRelation {
3637

38+
private final boolean filterPushed;
39+
private final Optional<String> schemaCatalog;
40+
private final Optional<String> schemaDatabase;
41+
private final Optional<String> schemaTable;
42+
3743
public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier) {
3844
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier);
45+
this.filterPushed = false;
46+
this.schemaCatalog = Optional.empty();
47+
this.schemaDatabase = Optional.empty();
48+
this.schemaTable = Optional.empty();
3949
}
4050

4151
public LogicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
42-
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) {
52+
Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties,
53+
boolean filterPushed, Optional<String> schemaCatalog, Optional<String> schemaDatabase,
54+
Optional<String> schemaTable) {
4355
super(id, PlanType.LOGICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
56+
this.filterPushed = filterPushed;
57+
this.schemaCatalog = schemaCatalog;
58+
this.schemaDatabase = schemaDatabase;
59+
this.schemaTable = schemaTable;
60+
}
61+
62+
public boolean isFilterPushed() {
63+
return filterPushed;
64+
}
65+
66+
public Optional<String> getSchemaCatalog() {
67+
return schemaCatalog;
68+
}
69+
70+
public Optional<String> getSchemaDatabase() {
71+
return schemaDatabase;
72+
}
73+
74+
public Optional<String> getSchemaTable() {
75+
return schemaTable;
4476
}
4577

4678
@Override
@@ -56,22 +88,54 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
5688
@Override
5789
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
5890
return new LogicalSchemaScan(relationId, table, qualifier,
59-
groupExpression, Optional.of(getLogicalProperties()));
91+
groupExpression, Optional.of(getLogicalProperties()), filterPushed,
92+
schemaCatalog, schemaDatabase, schemaTable);
6093
}
6194

6295
@Override
6396
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
6497
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
65-
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties);
98+
return new LogicalSchemaScan(relationId, table, qualifier, groupExpression, logicalProperties, filterPushed,
99+
schemaCatalog, schemaDatabase, schemaTable);
66100
}
67101

68102
@Override
69103
public LogicalSchemaScan withRelationId(RelationId relationId) {
70-
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty());
104+
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(), Optional.empty(), filterPushed,
105+
schemaCatalog, schemaDatabase, schemaTable);
106+
}
107+
108+
public LogicalSchemaScan withSchemaIdentifier(Optional<String> schemaCatalog, Optional<String> schemaDatabase,
109+
Optional<String> schemaTable) {
110+
return new LogicalSchemaScan(relationId, table, qualifier, Optional.empty(),
111+
Optional.of(getLogicalProperties()), true, schemaCatalog, schemaDatabase, schemaTable);
71112
}
72113

73114
@Override
74115
public String toString() {
75116
return Utils.toSqlString("LogicalSchemaScan");
76117
}
118+
119+
@Override
120+
public boolean equals(Object o) {
121+
if (this == o) {
122+
return true;
123+
}
124+
if (o == null || getClass() != o.getClass()) {
125+
return false;
126+
}
127+
if (!super.equals(o)) {
128+
return false;
129+
}
130+
LogicalSchemaScan that = (LogicalSchemaScan) o;
131+
return Objects.equals(schemaCatalog, that.schemaCatalog)
132+
&& Objects.equals(schemaDatabase, that.schemaDatabase)
133+
&& Objects.equals(schemaTable, that.schemaTable)
134+
&& filterPushed == that.filterPushed;
135+
}
136+
137+
@Override
138+
public int hashCode() {
139+
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable, filterPushed);
140+
}
77141
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSchemaScan.java

+55-5
Original file line numberDiff line numberDiff line change
@@ -29,23 +29,48 @@
2929
import org.apache.doris.statistics.Statistics;
3030

3131
import java.util.List;
32+
import java.util.Objects;
3233
import java.util.Optional;
3334

3435
/**
3536
* PhysicalSchemaScan.
3637
*/
3738
public class PhysicalSchemaScan extends PhysicalCatalogRelation {
3839

40+
private final Optional<String> schemaCatalog;
41+
private final Optional<String> schemaDatabase;
42+
private final Optional<String> schemaTable;
43+
3944
public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
40-
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties) {
45+
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
46+
Optional<String> schemaCatalog, Optional<String> schemaDatabase, Optional<String> schemaTable) {
4147
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression, logicalProperties);
48+
this.schemaCatalog = schemaCatalog;
49+
this.schemaDatabase = schemaDatabase;
50+
this.schemaTable = schemaTable;
4251
}
4352

4453
public PhysicalSchemaScan(RelationId id, TableIf table, List<String> qualifier,
4554
Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties,
46-
PhysicalProperties physicalProperties, Statistics statistics) {
55+
PhysicalProperties physicalProperties, Statistics statistics,
56+
Optional<String> schemaCatalog, Optional<String> schemaDatabase, Optional<String> schemaTable) {
4757
super(id, PlanType.PHYSICAL_SCHEMA_SCAN, table, qualifier, groupExpression,
4858
logicalProperties, physicalProperties, statistics);
59+
this.schemaCatalog = schemaCatalog;
60+
this.schemaDatabase = schemaDatabase;
61+
this.schemaTable = schemaTable;
62+
}
63+
64+
public Optional<String> getSchemaCatalog() {
65+
return schemaCatalog;
66+
}
67+
68+
public Optional<String> getSchemaDatabase() {
69+
return schemaDatabase;
70+
}
71+
72+
public Optional<String> getSchemaTable() {
73+
return schemaTable;
4974
}
5075

5176
@Override
@@ -61,28 +86,53 @@ public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
6186
@Override
6287
public Plan withGroupExpression(Optional<GroupExpression> groupExpression) {
6388
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
64-
groupExpression, getLogicalProperties(), physicalProperties, statistics);
89+
groupExpression, getLogicalProperties(), physicalProperties, statistics,
90+
schemaCatalog, schemaDatabase, schemaTable);
6591
}
6692

6793
@Override
6894
public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression,
6995
Optional<LogicalProperties> logicalProperties, List<Plan> children) {
7096
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
71-
groupExpression, logicalProperties.get(), physicalProperties, statistics);
97+
groupExpression, logicalProperties.get(), physicalProperties, statistics,
98+
schemaCatalog, schemaDatabase, schemaTable);
7299
}
73100

74101
@Override
75102
public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
76103
Statistics statistics) {
77104
return new PhysicalSchemaScan(relationId, getTable(), qualifier,
78-
groupExpression, getLogicalProperties(), physicalProperties, statistics);
105+
groupExpression, getLogicalProperties(), physicalProperties, statistics,
106+
schemaCatalog, schemaDatabase, schemaTable);
79107
}
80108

81109
@Override
82110
public String toString() {
83111
return Utils.toSqlString("PhysicalSchemaScan");
84112
}
85113

114+
@Override
115+
public boolean equals(Object o) {
116+
if (this == o) {
117+
return true;
118+
}
119+
if (o == null || getClass() != o.getClass()) {
120+
return false;
121+
}
122+
if (!super.equals(o)) {
123+
return false;
124+
}
125+
PhysicalSchemaScan that = (PhysicalSchemaScan) o;
126+
return Objects.equals(schemaCatalog, that.schemaCatalog)
127+
&& Objects.equals(schemaDatabase, that.schemaDatabase)
128+
&& Objects.equals(schemaTable, that.schemaTable);
129+
}
130+
131+
@Override
132+
public int hashCode() {
133+
return Objects.hash(super.hashCode(), schemaCatalog, schemaDatabase, schemaTable);
134+
}
135+
86136
@Override
87137
public boolean canPushDownRuntimeFilter() {
88138
// currently be doesn't support schema scan rf

fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,9 @@ public static boolean isBackendPartitionedSchemaTable(String tableName) {
8989
private Map<Long, Long> partitionIDToBackendID;
9090
private Collection<Long> selectedPartitionIds = Lists.newArrayList();
9191

92-
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
93-
super(id, desc);
92+
public BackendPartitionedSchemaScanNode(PlanNodeId id, TupleDescriptor desc,
93+
String schemaCatalog, String schemaDatabase, String schemaTable) {
94+
super(id, desc, schemaCatalog, schemaDatabase, schemaTable);
9495
}
9596

9697
@Override

fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,13 @@ public class SchemaScanNode extends ScanNode {
6161
/**
6262
* Constructs node to scan given data files of table 'tbl'.
6363
*/
64-
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc) {
64+
public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
65+
String schemaCatalog, String schemaDb, String schemaTable) {
6566
super(id, desc, "SCAN SCHEMA", StatisticalType.SCHEMA_SCAN_NODE);
6667
this.tableName = desc.getTable().getName();
68+
this.schemaCatalog = schemaCatalog;
69+
this.schemaDb = schemaDb;
70+
this.schemaTable = schemaTable;
6771
}
6872

6973
public String getTableName() {

0 commit comments

Comments
 (0)