Commit 5b68de5

924060929 authored and dataroaring committed
[refactor](nereids) New distribute planner (apache#36531)
## Proposed changes

The legacy coordinator acts not only as the scheduler but also as the distribute planner. The code is complex, hard to understand and extend, and has many limitations.

This PR extracts and refines the computation of the degree of parallelism (dop) into a new DistributePlanner and resolves those limitations.

## How to use this function

This function only works with nereids + pipelinex, and currently supports only query statements in non-cloud mode.

Set these session variables to use this function:
```sql
set enable_nereids_distribute_planner=true; -- default is false
set enable_nereids_planner=true; -- default is true
```

## Core process and concepts

```
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│                                                                                                                          │
│             ┌──────────────┐         ┌───────────────┐          ┌───────────────────┐        ┌─────────────────────────┐ │
│  Translate  │              │  Typed  │               │  Assign  │                   │  Wrap  │                         │ │
│ ──────────► │ PlanFragment │ ──────► │ UnassignedJob │ ───────► │ StaticAssignedJob │ ─────► │ PipelineDistributedPlan │ │
│             │              │         │               │          │                   │        │                         │ │
│             └──────────────┘         └───────────────┘          └───────────────────┘        └─────────────────────────┘ │
│                                                                                                                          │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
      │                                                                                                          │
      │                                                                                                          │
      └──────────────────────────────────────────────────┐ ┌─────────────────────────────────────────────────────┘
                                                         │ │
                                                         │ │
                   ┌──────────────┐              ┌─────────────────┐         ┌───────────────────┐
                   │              │  Distribute  │                 │  AdHoc  │                   │
                   │ PhysicalPlan │ ───────────► │ DistributedPlan │ ──────► │ PipelineScheduler │
                   │              │              │                 │         │                   │
                   └──────────────┘              └─────────────────┘         └───────────────────┘
```

DistributePlanner is a new planner that computes the dop and generates instances. It consumes PlanFragments and performs these tasks:

1. Use PlanFragment to generate `UnassignedJob`. It is a **Typed Fragment** that decides how to calculate the dop and how to select the datasource, but it has not yet been assigned any backends or datasources.
   These are some UnassignedJobs: UnassignedScanSingleOlapTableJob, UnassignedScanBucketOlapTableJob, UnassignedShuffleJob, UnassignedQueryConstantJob. Keeping the UnassignedJob types separate decouples unrelated logic and makes the planner easy to extend: just add a new type of UnassignedJob.
2. Use UnassignedJob to select the datasource, compute the dop, and generate `AssignedJob`. An AssignedJob is an instance that has already been assigned a datasource and a backend. There are StaticAssignedJob and LocalShuffleAssignedJob; we will add DynamicAssignedJob when StageScheduler and adaptive query execution are supported.
3. Wrap PlanFragment, UnassignedJob and AssignedJob into `PipelineDistributedPlan`. The coordinator consumes the DistributedPlan, translates it to TPlan, and schedules the instances.

## Resolve limitations

**1. left table shuffle to right table**

If the right table is distributed by `storage hash` and the left table is distributed by `compute hash`, we can shuffle the left side to the right side by `storage hash` to do a bucket shuffle join, and keep the right side in place.

```sql
select *
from
(
  select id2
  from test_shuffle_left
  group by id2
) a
inner join [shuffle]
test_shuffle_left b
on a.id2=b.id;

| PhysicalResultSink[288] ( outputExprs=[id2#1, id#2, id2#3] ) ...
| +--PhysicalHashJoin[285]@4 ( type=INNER_JOIN, stats=3, hashCondition=[(id2#1 = id#2)], otherCondition=[], markCondition=[], hint=[shuffle] ) ...
|    |--PhysicalDistribute[281]@2 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=STORAGE_BUCKETED, tableId=-1, selectedIndexId=-1, partitionIds=...
|    |  +--PhysicalHashAggregate[278]@2 ( aggPhase=GLOBAL, aggMode=BUFFER_TO_RESULT, maybeUseStreaming=false, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], requir...
|    |     +--PhysicalDistribute[275]@7 ( stats=1.5, distributionSpec=DistributionSpecHash ( orderedShuffledColumns=[1], shuffleType=EXECUTION_BUCKETED, tableId=-1, selectedIndexId=-1, parti...
|    |        +--PhysicalHashAggregate[272]@7 ( aggPhase=LOCAL, aggMode=INPUT_TO_BUFFER, maybeUseStreaming=true, groupByExpr=[id2#1], outputExpr=[id2#1], partitionExpr=Optional[[id2#1]], req...
|    |           +--PhysicalProject[269]@1 ( stats=3, projects=[id2#1] ) ...
|    |              +--PhysicalOlapScan[test_shuffle_left]@0 ( stats=3 ) ...
|    +--PhysicalOlapScan[test_shuffle_left]@3 ( stats=3 )
```

**2. support colocate union numbers function**

Support using one instance to union/join numbers. Note that this plan contains no PhysicalDistribute node:

```sql
explain physical plan
select * from numbers('number'='3')a
union all
select * from numbers('number'='4')b

PhysicalResultSink[98] ( outputExprs=[number#2] )
+--PhysicalUnion@ ( qualifier=ALL, outputs=[number#2], regularChildrenOutputs=[[number#0], [number#1]], constantExprsList=[], stats=7 )
   |--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#0], function=numbers('number' = '3') )
   +--PhysicalTVFRelation ( qualified=NumbersTableValuedFunction, output=[number#1], function=numbers('number' = '4') )
```

**3. support bucket prune with right outer bucket shuffle join**

Suppose the left table prunes some buckets and keeps only [bucket 1, bucket 3]. We should process the right outer bucket shuffle join like this:

```
[
  (left bucket 1) right outer join (exchange right table which should process by bucket 1),
  (empty bucket)  right outer join (exchange right table which should process by bucket 2),
  (left bucket 3) right outer join (exchange right table which should process by bucket 3)
]
```

Left bucket 2 is pruned, so the right table cannot be shuffled to it because the left instance does not exist; bucket 2 would then return empty rows, which is wrong. The new DistributePlanner can fill up this instance.

The case:

```sql
explain physical plan
SELECT * FROM
(select * from test_outer_join1 where c0 =1)a
RIGHT OUTER JOIN
(select * from test_outer_join2)b
ON a.c0 = b.c0
```

### New feature

Add an explain statement to show distributed plans:

```sql
explain distributed plan select ...
```

For example, you can use this function to check how many instances are generated, how many bytes each instance will scan, and which backend will process each instance:

```sql
MySQL [email protected]:test> explain distributed plan select * from test_shuffle_left2 a join [shuffle] test_shuffle_left2 b on a.id2=b.id;
Explain String(Nereids Planner)
-------------------------------------------------------------------------------------------------------
PipelineDistributedPlan(
  id: 0,
  parallel: 2,
  fragmentJob: UnassignedScanSingleOlapTableJob,
  fragment: {
    OUTPUT EXPRS:
      id[#8]
      id2[#9]
      id[#10]
      id2[#11]
    PARTITION: HASH_PARTITIONED: id2[#3]

    HAS_COLO_PLAN_NODE: false

    VRESULT SINK
       MYSQL_PROTOCAL

    3:VHASH JOIN(152)
    |  join op: INNER JOIN(PARTITIONED)[]
    |  equal join conjunct: (id2[#3] = id[#0])
    |  cardinality=3
    |  vec output tuple id: 3
    |  output tuple id: 3
    |  vIntermediate tuple ids: 2
    |  hash output slot ids: 0 1 2 3
    |  isMarkJoin: false
    |  final projections: id[#4], id2[#5], id[#6], id2[#7]
    |  final project output tuple id: 3
    |  distribute expr lists: id2[#3]
    |  distribute expr lists: id[#0]
    |  tuple ids: 1 0
    |
    |----0:VOlapScanNode(149)
    |       TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
    |       partitions=1/1 (test_shuffle_left2)
    |       tablets=10/10, tabletList=22038,22040,22042 ...
    |       cardinality=3, avgRowSize=0.0, numNodes=1
    |       pushAggOp=NONE
    |       tuple ids: 0
    |
    2:VEXCHANGE
       offset: 0
       distribute expr lists: id[#2]
       tuple ids: 1
  },
  instanceJobs: [
    LocalShuffleAssignedJob(
      index: 0,
      worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
      shareScanIndex: 0,
      scanSource: [
        {
          scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 400, ranges: [
            tablet 22038, bytes: 0,
            tablet 22042, bytes: 0,
            tablet 22046, bytes: 0,
            tablet 22050, bytes: 400,
            tablet 22054, bytes: 0
          ])
        }
      ]
    ),
    LocalShuffleAssignedJob(
      index: 1,
      worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
      shareScanIndex: 1,
      scanSource: [
        {
          scanNode: OlapScanNode{id=0, tid=0, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 796, ranges: [
            tablet 22040, bytes: 397,
            tablet 22044, bytes: 0,
            tablet 22048, bytes: 399,
            tablet 22052, bytes: 0,
            tablet 22056, bytes: 0
          ])
        }
      ]
    )
  ]
)
PipelineDistributedPlan(
  id: 1,
  parallel: 2,
  fragmentJob: UnassignedScanSingleOlapTableJob,
  fragment: {
    PARTITION: HASH_PARTITIONED: id[#2]

    HAS_COLO_PLAN_NODE: false

    STREAM DATA SINK
      EXCHANGE ID: 02
      HASH_PARTITIONED: id2[#3]

    1:VOlapScanNode(145)
       TABLE: test.test_shuffle_left2(test_shuffle_left2), PREAGGREGATION: ON
       partitions=1/1 (test_shuffle_left2)
       tablets=10/10, tabletList=22038,22040,22042 ...
       cardinality=3, avgRowSize=0.0, numNodes=1
       pushAggOp=NONE
       tuple ids: 1
  },
  instanceJobs: [
    LocalShuffleAssignedJob(
      index: 0,
      worker: BackendWorker(id: 10095, address: 192.168.126.1:9050),
      shareScanIndex: 0,
      scanSource: [
        {
          scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 400, ranges: [
            tablet 22038, bytes: 0,
            tablet 22042, bytes: 0,
            tablet 22046, bytes: 0,
            tablet 22050, bytes: 400,
            tablet 22054, bytes: 0
          ])
        }
      ]
    ),
    LocalShuffleAssignedJob(
      index: 1,
      worker: BackendWorker(id: 10096, address: 192.168.126.2:9051),
      shareScanIndex: 1,
      scanSource: [
        {
          scanNode: OlapScanNode{id=1, tid=1, tblName=test_shuffle_left2, keyRanges=, preds= limit=-1},
          scanRanges: ScanRanges(bytes: 796, ranges: [
            tablet 22040, bytes: 397,
            tablet 22044, bytes: 0,
            tablet 22048, bytes: 399,
            tablet 22052, bytes: 0,
            tablet 22056, bytes: 0
          ])
        }
      ]
    )
  ]
)

Hint log:
Used: [shuffle]_2
UnUsed:
SyntaxError:
```

## TODO

1. extract PipelineScheduler from Coordinator
2. move this framework into cascades and compute cost by dop
3. support StageScheduler, adaptive query execution and DynamicAssignedJob
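The three steps in "Core process and concepts" can be pictured with a small, self-contained sketch. Everything in it is hypothetical and simplified: the class names only echo the concepts in the commit message, the backends are plain strings, and the round-robin tablet assignment is an assumption made for illustration, not the selection strategy the real DistributePlanner uses.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-ins for the concepts in the commit message. */
final class DistributeSketch {
    /** Step 1: a typed fragment that knows how to compute the dop but has no backend yet. */
    interface UnassignedJob {
        List<AssignedJob> assign(List<String> backends);
    }

    /** Step 2: one instance whose backend and datasource are already chosen. */
    static final class AssignedJob {
        final int index;
        final String backend;
        final List<Integer> tablets;

        AssignedJob(int index, String backend, List<Integer> tablets) {
            this.index = index;
            this.backend = backend;
            this.tablets = tablets;
        }
    }

    /** One job type: scan a single table, spreading tablets round-robin (an assumption). */
    static final class ScanTableJob implements UnassignedJob {
        private final List<Integer> tablets;

        ScanTableJob(List<Integer> tablets) {
            this.tablets = tablets;
        }

        @Override
        public List<AssignedJob> assign(List<String> backends) {
            // dop here is simply bounded by the number of backends and tablets
            int dop = Math.min(backends.size(), tablets.size());
            List<List<Integer>> perInstance = new ArrayList<>();
            for (int i = 0; i < dop; i++) {
                perInstance.add(new ArrayList<>());
            }
            for (int i = 0; i < tablets.size(); i++) {
                perInstance.get(i % dop).add(tablets.get(i));
            }
            List<AssignedJob> jobs = new ArrayList<>();
            for (int i = 0; i < dop; i++) {
                jobs.add(new AssignedJob(i, backends.get(i), perInstance.get(i)));
            }
            return jobs;
        }
    }

    /** Step 3: wrap the fragment id, the typed job and its instances into one plan object. */
    static final class DistributedPlanSketch {
        final int fragmentId;
        final UnassignedJob job;
        final List<AssignedJob> instances;

        DistributedPlanSketch(int fragmentId, UnassignedJob job, List<AssignedJob> instances) {
            this.fragmentId = fragmentId;
            this.job = job;
            this.instances = instances;
        }
    }

    static DistributedPlanSketch plan(int fragmentId, UnassignedJob job, List<String> backends) {
        return new DistributedPlanSketch(fragmentId, job, job.assign(backends));
    }
}
```

Adding a new kind of fragment in this sketch means adding another `UnassignedJob` implementation, which is the extensibility argument the commit message makes for keeping the job types separate.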
1 parent f04c185 commit 5b68de5

76 files changed, +4438 -77 lines changed
fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4

+1
@@ -262,6 +262,7 @@ planType
     | OPTIMIZED | PHYSICAL // same type
     | SHAPE
     | MEMO
+    | DISTRIBUTED
     | ALL // default type
     ;

fe/fe-core/src/main/java/org/apache/doris/common/Id.java

+6 -1
@@ -25,7 +25,7 @@
 /**
  * Integer ids that cannot accidentally be compared with ints.
  */
-public class Id<IdType extends Id<IdType>> {
+public class Id<IdType extends Id<IdType>> implements Comparable<Id<IdType>> {
     protected final int id;

     public Id(int id) {
@@ -62,4 +62,9 @@ public ArrayList<IdType> asList() {
     public String toString() {
         return Integer.toString(id);
     }
+
+    @Override
+    public int compareTo(Id<IdType> idTypeId) {
+        return id - idTypeId.id;
+    }
 }
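The added `compareTo` orders ids by subtracting them. That gives the right sign whenever the difference fits in an `int`, which presumably holds for small non-negative ids; the sketch below only illustrates where subtraction and the JDK's `Integer.compare` diverge. `IdCompareSketch` is a hypothetical helper, not part of the commit.

```java
/** Hypothetical helper contrasting two ways to order int ids. */
final class IdCompareSketch {
    /** Mirrors the subtraction used in the diff above. */
    static int bySubtraction(int a, int b) {
        return a - b;
    }

    /** Overflow-safe comparison provided by the JDK. */
    static int byCompare(int a, int b) {
        return Integer.compare(a, b);
    }
}
```

For ordinary ids both methods agree; they differ only when the operands are far enough apart that `a - b` wraps around.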

fe/fe-core/src/main/java/org/apache/doris/common/TreeNode.java

+25
@@ -249,4 +249,29 @@ public void foreach(ThrowingConsumer<TreeNode<NodeType>> func) throws AnalysisEx
             child.foreach(func);
         }
     }
+
+    /** anyMatch */
+    public boolean anyMatch(Predicate<TreeNode<? extends NodeType>> func) {
+        if (func.apply(this)) {
+            return true;
+        }
+
+        for (NodeType child : children) {
+            if (child.anyMatch(func)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** foreachDown */
+    public void foreachDown(Predicate<TreeNode<NodeType>> visitor) {
+        if (!visitor.test(this)) {
+            return;
+        }
+
+        for (TreeNode<NodeType> child : getChildren()) {
+            child.foreachDown(visitor);
+        }
+    }
 }
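The two helpers added to `TreeNode` differ in how the predicate steers the walk: `anyMatch` stops at the first node that satisfies it, while `foreachDown` visits top-down and skips the subtree of any node for which the visitor returns false. A minimal sketch on a hypothetical `Node` class (using `java.util.function.Predicate`, and not the real `TreeNode`) shows the difference:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical miniature tree that mimics the two traversal helpers. */
final class TreeWalkSketch {
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(String name, Node... kids) {
            this.name = name;
            for (Node kid : kids) {
                children.add(kid);
            }
        }

        /** True as soon as this node or any descendant satisfies the predicate. */
        boolean anyMatch(Predicate<Node> predicate) {
            if (predicate.test(this)) {
                return true;
            }
            for (Node child : children) {
                if (child.anyMatch(predicate)) {
                    return true;
                }
            }
            return false;
        }

        /** Visits top-down; a false return prunes the subtree below that node. */
        void foreachDown(Predicate<Node> visitor) {
            if (!visitor.test(this)) {
                return;
            }
            for (Node child : children) {
                child.foreachDown(visitor);
            }
        }
    }

    /** Collects visited names, refusing to descend below the node called stopAt. */
    static List<String> visitedUntil(Node root, String stopAt) {
        List<String> seen = new ArrayList<>();
        root.foreachDown(node -> {
            seen.add(node.name);
            return !node.name.equals(stopAt);
        });
        return seen;
    }
}
```

Stopping at a boundary node in this way is how a walk can stay inside one fragment, for example by not descending past an exchange; whether the commit uses `foreachDown` for exactly that purpose is not visible in the files shown here.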

fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java

+10
@@ -20,6 +20,8 @@
 import org.apache.doris.common.util.ProfileManager;
 import org.apache.doris.common.util.RuntimeProfile;
 import org.apache.doris.nereids.NereidsPlanner;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.planner.Planner;

@@ -108,6 +110,14 @@ public synchronized void updateSummary(long startTime, Map<String, String> summa
                 }
                 summaryInfo.put(SummaryProfile.PHYSICAL_PLAN,
                         builder.toString().replace("\n", "\n "));
+
+                FragmentIdMapping<DistributedPlan> distributedPlans = nereidsPlanner.getDistributedPlans();
+                if (distributedPlans != null) {
+                    summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
+                            DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
+                                    .replace("\n", "\n ")
+                    );
+                }
             }
             summaryProfile.update(summaryInfo);
             for (ExecutionProfile executionProfile : executionProfiles) {

fe/fe-core/src/main/java/org/apache/doris/common/profile/SummaryProfile.java

+13
@@ -54,6 +54,7 @@ public class SummaryProfile {
     public static final String TRACE_ID = "Trace ID";
     public static final String WORKLOAD_GROUP = "Workload Group";
     public static final String PHYSICAL_PLAN = "Physical Plan";
+    public static final String DISTRIBUTED_PLAN = "Distributed Plan";
     // Execution Summary
     public static final String EXECUTION_SUMMARY_PROFILE_NAME = "Execution Summary";
     public static final String ANALYSIS_TIME = "Analysis Time";
@@ -86,6 +87,7 @@ public class SummaryProfile {
     public static final String NEREIDS_REWRITE_TIME = "Nereids Rewrite Time";
     public static final String NEREIDS_OPTIMIZE_TIME = "Nereids Optimize Time";
     public static final String NEREIDS_TRANSLATE_TIME = "Nereids Translate Time";
+    public static final String NEREIDS_DISTRIBUTE_TIME = "Nereids Distribute Time";

     public static final String FRAGMENT_COMPRESSED_SIZE = "Fragment Compressed Size";
     public static final String FRAGMENT_RPC_COUNT = "Fragment RPC Count";
@@ -109,6 +111,7 @@ public class SummaryProfile {
     public static final ImmutableList<String> SUMMARY_KEYS = new ImmutableList.Builder<String>()
             .addAll(SUMMARY_CAPTIONS)
             .add(PHYSICAL_PLAN)
+            .add(DISTRIBUTED_PLAN)
             .build();

     // The display order of execution summary items.
@@ -199,6 +202,7 @@ public class SummaryProfile {
     private long nereidsRewriteFinishTime = -1;
     private long nereidsOptimizeFinishTime = -1;
     private long nereidsTranslateFinishTime = -1;
+    private long nereidsDistributeFinishTime = -1;
     // timestamp of query begin
     private long queryBeginTime = -1;
     // Analysis end time
@@ -315,6 +319,7 @@ private void updateExecutionSummaryProfile() {
         executionSummaryProfile.addInfoString(NEREIDS_REWRITE_TIME, getPrettyNereidsRewriteTime());
         executionSummaryProfile.addInfoString(NEREIDS_OPTIMIZE_TIME, getPrettyNereidsOptimizeTime());
         executionSummaryProfile.addInfoString(NEREIDS_TRANSLATE_TIME, getPrettyNereidsTranslateTime());
+        executionSummaryProfile.addInfoString(NEREIDS_DISTRIBUTE_TIME, getPrettyNereidsDistributeTime());
         executionSummaryProfile.addInfoString(ANALYSIS_TIME,
                 getPrettyTime(queryAnalysisFinishTime, queryBeginTime, TUnit.TIME_MS));
         executionSummaryProfile.addInfoString(PLAN_TIME,
@@ -419,6 +424,10 @@ public void setNereidsTranslateTime() {
         this.nereidsTranslateFinishTime = TimeUtils.getStartTimeMs();
     }

+    public void setNereidsDistributeTime() {
+        this.nereidsDistributeFinishTime = TimeUtils.getStartTimeMs();
+    }
+
     public void setQueryBeginTime() {
         this.queryBeginTime = TimeUtils.getStartTimeMs();
     }
@@ -654,6 +663,10 @@ public String getPrettyNereidsTranslateTime() {
         return getPrettyTime(nereidsTranslateFinishTime, nereidsOptimizeFinishTime, TUnit.TIME_MS);
     }

+    public String getPrettyNereidsDistributeTime() {
+        return getPrettyTime(nereidsDistributeFinishTime, nereidsTranslateFinishTime, TUnit.TIME_MS);
+    }
+
     private String getPrettyGetPartitionVersionTime() {
         if (getPartitionVersionTime == 0) {
             return "N/A";

fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java

+54 -6
@@ -52,6 +52,9 @@
 import org.apache.doris.nereids.trees.expressions.literal.Literal;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
+import org.apache.doris.nereids.trees.plans.distribute.DistributePlanner;
+import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
@@ -70,6 +73,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.ResultSet;
 import org.apache.doris.qe.ResultSetMetaData;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.cache.CacheAnalyzer;

 import com.google.common.annotations.VisibleForTesting;
@@ -102,6 +106,7 @@ public class NereidsPlanner extends Planner {
     private Plan rewrittenPlan;
     private Plan optimizedPlan;
     private PhysicalPlan physicalPlan;
+    private FragmentIdMapping<DistributedPlan> distributedPlans;
     // The cost of optimized plan
     private double cost = 0;
     private LogicalPlanAdapter logicalPlanAdapter;
@@ -130,17 +135,18 @@ public void plan(StatementBase queryStmt, org.apache.doris.thrift.TQueryOptions
         LogicalPlan parsedPlan = logicalPlanAdapter.getLogicalPlan();
         NereidsTracer.logImportantTime("EndParsePlan");
         setParsedPlan(parsedPlan);
+
         PhysicalProperties requireProperties = buildInitRequireProperties();
         statementContext.getStopwatch().start();
         boolean showPlanProcess = showPlanProcess(queryStmt.getExplainOptions());
         Plan resultPlan = plan(parsedPlan, requireProperties, explainLevel, showPlanProcess);
         statementContext.getStopwatch().stop();
         setOptimizedPlan(resultPlan);
-        if (explainLevel.isPlanLevel) {
-            return;
+
+        if (resultPlan instanceof PhysicalPlan) {
+            physicalPlan = (PhysicalPlan) resultPlan;
+            distribute(physicalPlan, explainLevel);
         }
-        physicalPlan = (PhysicalPlan) resultPlan;
-        translate(physicalPlan);
     }

     @VisibleForTesting
@@ -315,7 +321,7 @@ private void optimize() {
         }
     }

-    private void translate(PhysicalPlan resultPlan) throws UserException {
+    private void splitFragments(PhysicalPlan resultPlan) throws UserException {
         if (resultPlan instanceof PhysicalSqlCache) {
             return;
         }
@@ -360,6 +366,27 @@ private void translate(PhysicalPlan resultPlan) throws UserException {
         ScanNode.setVisibleVersionForOlapScanNodes(getScanNodes());
     }

+    private void distribute(PhysicalPlan physicalPlan, ExplainLevel explainLevel) throws UserException {
+        boolean canUseNereidsDistributePlanner = SessionVariable.canUseNereidsDistributePlanner();
+        if ((!canUseNereidsDistributePlanner && explainLevel.isPlanLevel)) {
+            return;
+        } else if ((canUseNereidsDistributePlanner && explainLevel.isPlanLevel
+                && (explainLevel != ExplainLevel.ALL_PLAN && explainLevel != ExplainLevel.DISTRIBUTED_PLAN))) {
+            return;
+        }
+
+        splitFragments(physicalPlan);
+
+        if (!canUseNereidsDistributePlanner) {
+            return;
+        }
+
+        distributedPlans = new DistributePlanner(fragments).plan();
+        if (statementContext.getConnectContext().getExecutor() != null) {
+            statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsDistributeTime();
+        }
+    }
+
     private PhysicalPlan postProcess(PhysicalPlan physicalPlan) {
         return new PlanPostProcessors(cascadesContext).process(physicalPlan);
     }
@@ -498,6 +525,17 @@ public String getExplainString(ExplainOptions explainOptions) {
                         + "\n\n========== MATERIALIZATIONS ==========\n"
                         + materializationStringBuilder;
                 break;
+            case DISTRIBUTED_PLAN:
+                StringBuilder distributedPlanStringBuilder = new StringBuilder();
+
+                distributedPlanStringBuilder.append("========== DISTRIBUTED PLAN ==========\n");
+                if (distributedPlans == null || distributedPlans.isEmpty()) {
+                    plan = "Distributed plan not generated, please set enable_nereids_distribute_planner "
+                            + "and enable_pipeline_x_engine to true";
+                } else {
+                    plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
+                }
+                break;
             case ALL_PLAN:
                 plan = "========== PARSED PLAN "
                         + getTimeMetricString(SummaryProfile::getPrettyParseSqlTime) + " ==========\n"
@@ -510,7 +548,13 @@ public String getExplainString(ExplainOptions explainOptions) {
                         + rewrittenPlan.treeString() + "\n\n"
                         + "========== OPTIMIZED PLAN "
                         + getTimeMetricString(SummaryProfile::getPrettyNereidsOptimizeTime) + " ==========\n"
-                        + optimizedPlan.treeString();
+                        + optimizedPlan.treeString() + "\n\n";
+
+                if (distributedPlans != null && !distributedPlans.isEmpty()) {
+                    plan += "========== DISTRIBUTED PLAN "
+                            + getTimeMetricString(SummaryProfile::getPrettyNereidsDistributeTime) + " ==========\n";
+                    plan += DistributedPlan.toString(Lists.newArrayList(distributedPlans.values())) + "\n\n";
+                }
                 break;
             default:
                 plan = super.getExplainString(explainOptions)
@@ -681,6 +725,10 @@ public PhysicalPlan getPhysicalPlan() {
         return physicalPlan;
     }

+    public FragmentIdMapping<DistributedPlan> getDistributedPlans() {
+        return distributedPlans;
+    }
+
     public LogicalPlanAdapter getLogicalPlanAdapter() {
         return logicalPlanAdapter;
     }
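The early returns in the new `distribute` method are easier to read as a decision function. The sketch below restates them under simplifying assumptions: `Level` is a hypothetical subset of `ExplainLevel` (with `NONE` standing for an ordinary, non-explain query whose `isPlanLevel` is assumed false), and `Action` is an invented name for the three outcomes.

```java
/** Hypothetical restatement of the gating logic in NereidsPlanner.distribute. */
final class DistributeGateSketch {
    /** Assumed subset of explain levels; NONE models a normal query. */
    enum Level {
        NONE(false), PHYSICAL_PLAN(true), DISTRIBUTED_PLAN(true), ALL_PLAN(true);

        final boolean isPlanLevel;

        Level(boolean isPlanLevel) {
            this.isPlanLevel = isPlanLevel;
        }
    }

    enum Action { STOP_AFTER_OPTIMIZE, SPLIT_FRAGMENTS_ONLY, SPLIT_AND_DISTRIBUTE }

    static Action decide(boolean canUseDistributePlanner, Level level) {
        if (!canUseDistributePlanner && level.isPlanLevel) {
            // legacy path: plan-level explains never reach fragment splitting
            return Action.STOP_AFTER_OPTIMIZE;
        }
        if (canUseDistributePlanner && level.isPlanLevel
                && level != Level.ALL_PLAN && level != Level.DISTRIBUTED_PLAN) {
            // only the two explain levels that print distributed plans continue
            return Action.STOP_AFTER_OPTIMIZE;
        }
        return canUseDistributePlanner ? Action.SPLIT_AND_DISTRIBUTE : Action.SPLIT_FRAGMENTS_ONLY;
    }
}
```

One consequence visible in this table: with the planner disabled, `explain distributed plan` stops after optimization, which is the case where `getExplainString` reports that the distributed plan was not generated.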

fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java

+3
@@ -3451,6 +3451,9 @@ private ExplainLevel parseExplainPlanType(PlanTypeContext planTypeContext) {
         if (planTypeContext.MEMO() != null) {
             return ExplainLevel.MEMO_PLAN;
         }
+        if (planTypeContext.DISTRIBUTED() != null) {
+            return ExplainLevel.DISTRIBUTED_PLAN;
+        }
         return ExplainLevel.ALL_PLAN;
     }

fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java

+32 -20
@@ -46,6 +46,7 @@
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.util.JoinUtils;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;

 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -213,12 +214,12 @@ private boolean couldNotRightBucketShuffleJoin(JoinType joinType, DistributionSp
                 || joinType == JoinType.FULL_OUTER_JOIN);
         boolean isSpecInScope = (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                 || rightHashSpec.getShuffleType() == ShuffleType.NATURAL);
-        return isJoinTypeInScope && isSpecInScope;
+        return isJoinTypeInScope && isSpecInScope && !SessionVariable.canUseNereidsDistributePlanner();
     }

     @Override
-    public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin,
-            Void context) {
+    public Boolean visitPhysicalHashJoin(
+            PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void context) {
         Preconditions.checkArgument(children.size() == 2, "children.size() != 2");
         Preconditions.checkArgument(childrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
@@ -303,13 +304,24 @@ public Boolean visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ? extends
                         (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
             } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
                     && rightHashSpec.getShuffleType() == ShuffleType.NATURAL) {
-                // TODO: we must do shuffle on right because coordinator could not do right be selection in this case,
-                // since it always to check the left most node whether olap scan node.
-                // after we fix coordinator problem, we could do right to left bucket shuffle
-                updatedForRight = Optional.of(calAnotherSideRequired(
-                        ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
-                        (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
-                        (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
+                if (SessionVariable.canUseNereidsDistributePlanner()) {
+                    // nereids coordinator can exchange left side to right side to do bucket shuffle join
+                    // TODO: maybe we should check if left child is PhysicalDistribute.
+                    // If so add storage bucketed shuffle on left side. Other wise,
+                    // add execution bucketed shuffle on right side.
+                    updatedForLeft = Optional.of(calAnotherSideRequired(
+                            ShuffleType.STORAGE_BUCKETED, rightHashSpec, leftHashSpec,
+                            (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec(),
+                            (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec()));
+                } else {
+                    // legacy coordinator could not do right be selection in this case,
+                    // since it always to check the left most node whether olap scan node.
+                    // so we can only shuffle right to left side to do normal shuffle join
+                    updatedForRight = Optional.of(calAnotherSideRequired(
+                            ShuffleType.EXECUTION_BUCKETED, leftHashSpec, rightHashSpec,
+                            (DistributionSpecHash) requiredProperties.get(0).getDistributionSpec(),
+                            (DistributionSpecHash) requiredProperties.get(1).getDistributionSpec()));
+                }
             } else if (leftHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED
                     && rightHashSpec.getShuffleType() == ShuffleType.EXECUTION_BUCKETED) {
                 if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
@@ -537,20 +549,20 @@ private List<ExprId> calAnotherSideRequiredShuffleIds(DistributionSpecHash notSh
      * calAnotherSideRequiredShuffleIds's comment.
      *
      * @param shuffleType real output shuffle type
-     * @param notShuffleSideOutput not shuffle side real output used hash spec
-     * @param shuffleSideOutput shuffle side real output used hash spec
-     * @param notShuffleSideRequired not shuffle side required used hash spec
-     * @param shuffleSideRequired shuffle side required hash spec
+     * @param notNeedShuffleSideOutput not shuffle side real output used hash spec
+     * @param needShuffleSideOutput shuffle side real output used hash spec
+     * @param notNeedShuffleSideRequired not shuffle side required used hash spec
+     * @param needShuffleSideRequired shuffle side required hash spec
      * @return shuffle side new required hash spec
      */
     private PhysicalProperties calAnotherSideRequired(ShuffleType shuffleType,
-            DistributionSpecHash notShuffleSideOutput, DistributionSpecHash shuffleSideOutput,
-            DistributionSpecHash notShuffleSideRequired, DistributionSpecHash shuffleSideRequired) {
-        List<ExprId> shuffleSideIds = calAnotherSideRequiredShuffleIds(notShuffleSideOutput,
-                notShuffleSideRequired, shuffleSideRequired);
+            DistributionSpecHash notNeedShuffleSideOutput, DistributionSpecHash needShuffleSideOutput,
+            DistributionSpecHash notNeedShuffleSideRequired, DistributionSpecHash needShuffleSideRequired) {
+        List<ExprId> shuffleSideIds = calAnotherSideRequiredShuffleIds(notNeedShuffleSideOutput,
+                notNeedShuffleSideRequired, needShuffleSideRequired);
         return new PhysicalProperties(new DistributionSpecHash(shuffleSideIds, shuffleType,
-                shuffleSideOutput.getTableId(), shuffleSideOutput.getSelectedIndexId(),
-                shuffleSideOutput.getPartitionIds()));
+                needShuffleSideOutput.getTableId(), needShuffleSideOutput.getSelectedIndexId(),
+                needShuffleSideOutput.getPartitionIds()));
     }

     private void updateChildEnforceAndCost(int index, PhysicalProperties targetProperties) {
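The change to `couldNotRightBucketShuffleJoin` lifts the restriction on right outer and full outer bucket shuffle joins when the new planner is enabled, which relies on the "fill up this instance" behaviour described in the commit message. The files shown here do not include that logic, so the following is only an illustration of the idea under stated assumptions: buckets are numbered from 1 as in the commit message example, and `BucketFillUpSketch` with its string labels is entirely hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical illustration of filling instances for pruned left buckets. */
final class BucketFillUpSketch {
    /**
     * Returns one instance description per bucket 1..bucketNum. Buckets that
     * survived pruning scan the left table; pruned buckets still get an
     * instance with an empty left side, so exchanged right rows have a target.
     */
    static List<String> instances(int bucketNum, Set<Integer> survivingLeftBuckets) {
        List<String> result = new ArrayList<>();
        for (int bucket = 1; bucket <= bucketNum; bucket++) {
            if (survivingLeftBuckets.contains(bucket)) {
                result.add("scan left bucket " + bucket);
            } else {
                result.add("empty left for bucket " + bucket);
            }
        }
        return result;
    }
}
```

With three buckets and only buckets 1 and 3 surviving, the sketch still yields three instances, so right rows that hash to bucket 2 are joined against an empty left side instead of being dropped.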

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/TableValuedFunction.java

+4
@@ -31,6 +31,7 @@
 import org.apache.doris.nereids.trees.expressions.shape.UnaryExpression;
 import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor;
 import org.apache.doris.nereids.types.DataType;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.statistics.ColumnStatistic;
 import org.apache.doris.statistics.Statistics;
 import org.apache.doris.tablefunction.TableValuedFunctionIf;
@@ -113,6 +114,9 @@ public boolean nullable() {
     }

     public PhysicalProperties getPhysicalProperties() {
+        if (SessionVariable.canUseNereidsDistributePlanner()) {
+            return PhysicalProperties.ANY;
+        }
         return PhysicalProperties.STORAGE_ANY;
     }