Skip to content

Commit

Permalink
refine again
Browse files Browse the repository at this point in the history
Signed-off-by: before-Sunrise <[email protected]>
  • Loading branch information
before-Sunrise committed Feb 11, 2025
1 parent 177a8f2 commit 604b9cd
Show file tree
Hide file tree
Showing 10 changed files with 50 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ExchangeNode extends PlanNode {
private long offset;
// partitionType is used for BE's exchange source node to specify the input partition type
// the exchange source then decides whether a local shuffle is needed
// to be set in ExecutionDAG::connectXXXFragmentToDestFragments
private TPartitionType partitionType;
// this is the same as input fragment's output dataPartition, right now only used for explain
private DataPartition dataPartition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ public void init(Analyzer analyzer) {
}

protected void toThrift(TPlanNode msg, TPlanNodeType nodeType) {
Preconditions.checkState(materializedResultExprLists_.size() == children.size());
List<List<TExpr>> texprLists = Lists.newArrayList();
for (List<Expr> exprList : materializedResultExprLists_) {
texprLists.add(Expr.treesToThrift(exprList));
Expand Down
10 changes: 1 addition & 9 deletions fe/fe-core/src/main/java/com/starrocks/planner/UnionNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,20 @@

package com.starrocks.planner;

import com.starrocks.analysis.Expr;
import com.starrocks.analysis.TupleId;
import com.starrocks.common.LocalExchangerType;
import com.starrocks.thrift.TExplainLevel;
import com.starrocks.thrift.TPlanNode;
import com.starrocks.thrift.TPlanNodeType;

import java.util.List;

public class UnionNode extends SetOperationNode {
private LocalExchangerType localExchangeType;
/**
 * Creates a UNION node whose local exchange type starts out as
 * {@link LocalExchangerType#PASS_THROUGH}.
 *
 * @param id      plan node id, forwarded to the superclass constructor
 * @param tupleId tuple id, forwarded to the superclass constructor
 */
public UnionNode(PlanNodeId id, TupleId tupleId) {
    super(id, tupleId, "UNION");
    // pass through (i.e. round robin) is the default; setLocalExchangeType can replace it later
    localExchangeType = LocalExchangerType.PASS_THROUGH;
}

/**
 * Creates a UNION node by forwarding every argument, together with the fixed
 * "UNION" label, to the superclass constructor.
 *
 * NOTE(review): unlike the two-argument constructor, this one does not assign
 * localExchangeType, so the field keeps its default (null) on this path —
 * confirm that is intended.
 *
 * @param id               plan node id, forwarded to the superclass
 * @param tupleId          tuple id, forwarded to the superclass
 * @param setOpResultExprs result expressions, forwarded to the superclass
 * @param isInSubplan      sub-plan flag, forwarded to the superclass
 */
protected UnionNode(PlanNodeId id, TupleId tupleId,
List<Expr> setOpResultExprs, boolean isInSubplan) {
super(id, tupleId, "UNION", setOpResultExprs, isInSubplan);
}

/**
 * Replaces the local exchange type stored on this node.
 *
 * @param localExchangeType the value to store in the localExchangeType field
 */
public void setLocalExchangeType(LocalExchangerType localExchangeType) {
this.localExchangeType = localExchangeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public OperatorStr visitPhysicalStreamAgg(OptExpression optExpression, Integer s
}

@Override
public OperatorStr visitPhysicalMerge(OptExpression optExpression, Integer step) {
public OperatorStr visitPhysicalConcatenater(OptExpression optExpression, Integer step) {
return visitPhysicalUnion(optExpression, step);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ public R visitPhysicalSplitConsumer(OptExpression optExpression, C context) {
return visit(optExpression, context);
}

public R visitPhysicalMerge(OptExpression optExpression, C context) {
public R visitPhysicalConcatenater(OptExpression optExpression, C context) {
return visit(optExpression, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public boolean equals(Object o) {

@Override
public <R, C> R accept(OptExpressionVisitor<R, C> visitor, OptExpression optExpression, C context) {
return visitor.visitPhysicalMerge(optExpression, context);
return visitor.visitPhysicalConcatenater(optExpression, context);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ public class PhysicalSplitConsumeOperator extends PhysicalOperator {

private ScalarOperator splitPredicate;

private final Map<ColumnRefOperator, ColumnRefOperator> outputColumnRefMap;
private final List<ColumnRefOperator> outputColumnRefOp;

public PhysicalSplitConsumeOperator(int splitId, ScalarOperator splitPredicate, DistributionSpec distributionSpec,
Map<ColumnRefOperator, ColumnRefOperator> outputColumnRefMap) {
List<ColumnRefOperator> outputColumnRefOp) {
// distributionSpec specifies the distribution of the input of this operator
super(OperatorType.PHYSICAL_SPLIT_CONSUME, distributionSpec);
this.splitId = splitId;
this.splitPredicate = splitPredicate;
this.outputColumnRefMap = outputColumnRefMap;
this.outputColumnRefOp = outputColumnRefOp;
}

public int getSplitId() {
Expand All @@ -53,11 +53,9 @@ public ScalarOperator getSplitPredicate() {

@Override
public RowOutputInfo deriveRowOutputInfo(List<OptExpression> inputs) {
List<ColumnOutputInfo> entryList = Lists.newArrayList();
for (Map.Entry<ColumnRefOperator, ColumnRefOperator> entry : outputColumnRefMap.entrySet()) {
entryList.add(new ColumnOutputInfo(entry.getKey(), entry.getValue()));
}
return new RowOutputInfo(entryList);
List<ColumnOutputInfo> columnOutputInfoList = Lists.newArrayList();
outputColumnRefOp.stream().forEach(e -> columnOutputInfoList.add(new ColumnOutputInfo(e, e)));
return new RowOutputInfo(columnOutputInfoList, outputColumnRefOp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -136,8 +135,10 @@ public OptExpression visitPhysicalHashJoin(OptExpression opt, Void Context) {
return opt;
}

// rewrite plan
OptExpression leftExchangeOptExp = opt.inputAt(0);
PhysicalDistributionOperator leftExchangeOp = (PhysicalDistributionOperator) leftExchangeOptExp.getOp();
PhysicalDistributionOperator leftExchangeOpOfOriginalShuffleJoin =
(PhysicalDistributionOperator) leftExchangeOptExp.getOp();

OptExpression rightExchangeOptExp = opt.inputAt(1);
PhysicalDistributionOperator rightExchangeOpOfOriginalShuffleJoin =
Expand Down Expand Up @@ -166,31 +167,29 @@ public OptExpression visitPhysicalHashJoin(OptExpression opt, Void Context) {
Pair<ScalarOperator, ScalarOperator> rightTablePredicates =
generateInAndNotInPredicate(rightSkewColumn, skewValues);

Map<ColumnRefOperator, ColumnRefOperator> leftSplitOutputColumnRefMap =
generateOutputColumnRefMap(
leftExchangeOptExp.getOutputColumns().getColumnRefOperators(columnRefFactory));
Map<ColumnRefOperator, ColumnRefOperator> rightSplitOutputColumnRefMap =
generateOutputColumnRefMap(
rightExchangeOptExp.getOutputColumns().getColumnRefOperators(columnRefFactory));
List<ColumnRefOperator> leftSplitOutputColumns =
leftExchangeOptExp.getOutputColumns().getColumnRefOperators(columnRefFactory);
List<ColumnRefOperator> rightSplitOutputColumns =
rightExchangeOptExp.getOutputColumns().getColumnRefOperators(columnRefFactory);

PhysicalSplitConsumeOperator leftSplitConsumerOptForShuffleJoin =
new PhysicalSplitConsumeOperator(leftSplitProduceOperator.getSplitId(),
leftTablePredicates.second,
leftExchangeOp.getDistributionSpec(), leftSplitOutputColumnRefMap);
leftExchangeOpOfOriginalShuffleJoin.getDistributionSpec(), leftSplitOutputColumns);

PhysicalSplitConsumeOperator leftSplitConsumerOptForBroadcastJoin =
new PhysicalSplitConsumeOperator(leftSplitProduceOperator.getSplitId(), leftTablePredicates.first,
new RoundRobinDistributionSpec(), leftSplitOutputColumnRefMap);
new RoundRobinDistributionSpec(), leftSplitOutputColumns);

PhysicalSplitConsumeOperator rightSplitConsumerOptForShuffleJoin =
new PhysicalSplitConsumeOperator(rightSplitProduceOperator.getSplitId(),
rightTablePredicates.second,
rightExchangeOpOfOriginalShuffleJoin.getDistributionSpec(), rightSplitOutputColumnRefMap);
rightExchangeOpOfOriginalShuffleJoin.getDistributionSpec(), rightSplitOutputColumns);

PhysicalSplitConsumeOperator rightSplitConsumerOptForBroadcastJoin =
new PhysicalSplitConsumeOperator(rightSplitProduceOperator.getSplitId(),
rightTablePredicates.first,
DistributionSpec.createReplicatedDistributionSpec(), rightSplitOutputColumnRefMap);
DistributionSpec.createReplicatedDistributionSpec(), rightSplitOutputColumns);

Projection projectionOnJoin = originalShuffleJoinOperator.getProjection();

Expand Down Expand Up @@ -415,15 +414,6 @@ private SkewColumnAndValues findSkewColumns(OptExpression input) {
return new SkewColumnAndValues(skewValues, Pair.create(leftSkewColumn, rightSkewColumn));
}

/**
 * Builds an identity mapping over the given columns: each column reference is
 * inserted as both the key and the value of its own entry.
 *
 * @param outputColumns column references to map onto themselves
 * @return a new mutable map holding one identity entry per column reference
 *         (references that are equal as map keys collapse into a single entry)
 */
private Map<ColumnRefOperator, ColumnRefOperator> generateOutputColumnRefMap(
        List<ColumnRefOperator> outputColumns) {
    Map<ColumnRefOperator, ColumnRefOperator> identityMap = new HashMap<>();
    outputColumns.forEach(column -> identityMap.put(column, column));
    return identityMap;
}

private ScalarOperator replaceColumnRef(ScalarOperator scalarOperator,
Projection projection) {
if (projection == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3952,7 +3952,7 @@ public PlanFragment visitPhysicalSplitConsumer(OptExpression optExpression, Exec
}

@Override
public PlanFragment visitPhysicalMerge(OptExpression optExpr, ExecPlan context) {
public PlanFragment visitPhysicalConcatenater(OptExpression optExpr, ExecPlan context) {
PlanFragment leftChild = visit(optExpr.inputAt(0), context);
PlanFragment rightChild = visit(optExpr.inputAt(1), context);

Expand All @@ -3974,11 +3974,19 @@ public PlanFragment visitPhysicalMerge(OptExpression optExpr, ExecPlan context)
context.getColRefToExpr().put(columnRefOperator, new SlotRef(columnRefOperator.toString(), slotDesc));
}

// all children use union pass-through, which means each input chunk is output as-is
setOperationNode.setFirstMaterializedChildIdx_(optExpr.arity());

currentExecGroup.add(setOperationNode, true);

List<Map<Integer, Integer>> outputSlotIdToChildSlotIdMaps = new ArrayList<>();

ScalarOperatorToExpr.FormatterContext formatterContext =
new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr());
// materializedResultExprLists is not actually used, since every child is union pass-through;
// we populate it only so that the check passes
List<List<Expr>> materializedResultExprLists = Lists.newArrayList();

for (int childIdx = 0; childIdx < optExpr.arity(); ++childIdx) {
Map<Integer, Integer> slotIdMap = new HashMap<>();
List<ColumnRefOperator> childOutput = mergeOperator.getChildOutputColumns().get(childIdx);
Expand All @@ -3989,8 +3997,16 @@ public PlanFragment visitPhysicalMerge(OptExpression optExpr, ExecPlan context)
}
outputSlotIdToChildSlotIdMaps.add(slotIdMap);
Preconditions.checkState(slotIdMap.size() == mergeOperator.getOutputColumnRefOp().size());

List<Expr> materializedExpressions = Lists.newArrayList();
for (ColumnRefOperator ref : childOutput) {
materializedExpressions.add(ScalarOperatorToExpr.buildExecExpression(ref, formatterContext));
}

materializedResultExprLists.add(materializedExpressions);
}
setOperationNode.setOutputSlotIdToChildSlotIdMaps(outputSlotIdToChildSlotIdMaps);
setOperationNode.setMaterializedResultExprLists_(materializedResultExprLists);

Preconditions.checkState(optExpr.getInputs().size() == mergeOperator.getChildOutputColumns().size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public void testSkewJoinV2() throws Exception {
" RESULT SINK");
// this is a normal union, which means its local exchanger is PASS_THROUGH
assertCContains(sqlPlan, "10:UNION\n" +
" | child exprs:\n" +
" | [2: v2, BIGINT, true] | [5: v5, BIGINT, true]\n" +
" | [2: v2, BIGINT, true] | [5: v5, BIGINT, true]\n" +
" | pass-through-operands: all");

// shuffle join is UNION's left child, with global runtime filter
Expand Down Expand Up @@ -151,6 +154,9 @@ public void testSkewJoinV2WithOneAgg() throws Exception {

// union's local exchange type is DIRECT
assertCContains(sqlPlan, " 11:UNION\n" +
" | child exprs:\n" +
" | [4: v4, BIGINT, true]\n" +
" | [4: v4, BIGINT, true]\n" +
" | pass-through-operands: all\n" +
" | local exchange type: DIRECT");

Expand Down Expand Up @@ -181,7 +187,10 @@ public void testSkewJoinV2WithTwoAgg() throws Exception {
assertNotContains(sqlPlan, "local exchange type");

// union's right child is broadcast join instead of exchange node
assertCContains(sqlPlan, " 10:UNION\n" +
assertCContains(sqlPlan, "10:UNION\n" +
" | child exprs:\n" +
" | [3: v3, BIGINT, true]\n" +
" | [3: v3, BIGINT, true]\n" +
" | pass-through-operands: all");
assertCContains(sqlPlan, "9:Project\n" +
" | | output columns:\n" +
Expand Down

0 comments on commit 604b9cd

Please sign in to comment.