Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: before-Sunrise <[email protected]>
  • Loading branch information
before-Sunrise committed Feb 7, 2025
1 parent 52691d3 commit eb2792e
Showing 1 changed file with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,7 @@ public PhysicalPropertySet visitPhysicalHashAggregate(PhysicalHashAggregateOpera
public PhysicalPropertySet visitPhysicalRepeat(PhysicalRepeatOperator node, ExpressionContext context) {
checkState(childrenOutputProperties.size() == 1);
PhysicalPropertySet childPropertySet = childrenOutputProperties.get(0);
//
List<ColumnRefOperator> subRefs = Lists.newArrayList(node.getRepeatColumnRef().get(0));
node.getRepeatColumnRef().forEach(subRefs::retainAll);
Set<ColumnRefOperator> allGroupingRefs = Sets.newHashSet();
Expand All @@ -358,19 +359,32 @@ public PhysicalPropertySet visitPhysicalRepeat(PhysicalRepeatOperator node, Expr
subRefs.forEach(allGroupingRefs::remove);

DistributionProperty childDistribution = childPropertySet.getDistributionProperty();
// update null distribution info to null relax for allGroupingRefs
if (!allGroupingRefs.isEmpty() && childDistribution.isShuffle()) {
HashDistributionSpec distributionSpec = (HashDistributionSpec) childDistribution.getSpec();
EquivalentDescriptor newEquivDesc = distributionSpec.getEquivDesc().copy();
newEquivDesc.clearNullStrictUnionFind();
HashDistributionSpec newDistributionSpec = distributionSpec.getNullRelaxSpec(newEquivDesc);
DistributionProperty newDistributionProperty = DistributionProperty.createProperty(
newDistributionSpec,
childPropertySet.getDistributionProperty().isCTERequired());
return new PhysicalPropertySet(newDistributionProperty, childPropertySet.getSortProperty(),
childPropertySet.getCteProperty());
// only if the Intersection of RepeatColumnRef is the superset of the childrenOutputProperties
// we can use childrenOutputProperties as RepeatNode's output property
// such as RepeatColumnRef is (cola,colb),(cola), and childrenOutputProperties is hash(cola)
// since cola won't be inserted with null value, it's safe to use childrenOutputProperties
// if RepeatColumnRef is (cola,colb),(cola), and childrenOutputProperties is hash(cola,colb)
// since cola will be inserted with null value, it's unsafe to use hash(cola,colb)
DistributionProperty outputDistribution = EmptyDistributionProperty.INSTANCE;
if (childDistribution.isShuffle()) {
boolean canFollowChild = true;
HashDistributionSpec childDistributionSpec = (HashDistributionSpec) childDistribution.getSpec();
Set<Integer> commonRefs = subRefs.stream().map(ColumnRefOperator::getId).collect(Collectors.toSet());

for (DistributionCol col : childDistributionSpec.getHashDistributionDesc().getDistributionCols()) {
if (!commonRefs.contains(col.getColId())) {
canFollowChild = false;
break;
}
}

if (canFollowChild) {
outputDistribution = childDistribution;
}
}
return childPropertySet;

return new PhysicalPropertySet(outputDistribution, childPropertySet.getSortProperty(),
childPropertySet.getCteProperty());
}

@Override
Expand Down

0 comments on commit eb2792e

Please sign in to comment.