Skip to content

Commit

Permalink
[Enhancement] enhance delete pruning for all kinds of partitions (#55400)
Browse files Browse the repository at this point in the history

Signed-off-by: Murphy <[email protected]>
(cherry picked from commit 4ae63bd)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
#	fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java
  • Loading branch information
murphyatwork authored and mergify[bot] committed Jan 26, 2025
1 parent 9738e2a commit 30af703
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 43 deletions.
102 changes: 59 additions & 43 deletions fe/fe-core/src/main/java/com/starrocks/load/DeleteMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,39 +40,28 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.BinaryPredicate;
import com.starrocks.analysis.BinaryType;
import com.starrocks.analysis.DateLiteral;
import com.starrocks.analysis.DecimalLiteral;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.FunctionCallExpr;
import com.starrocks.analysis.InPredicate;
import com.starrocks.analysis.IsNullPredicate;
import com.starrocks.analysis.LiteralExpr;
import com.starrocks.analysis.NullLiteral;
import com.starrocks.analysis.Predicate;
import com.starrocks.analysis.SlotRef;
import com.starrocks.analysis.StringLiteral;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ExpressionRangePartitionInfoV2;
import com.starrocks.catalog.FunctionSet;
import com.starrocks.catalog.KeysType;
import com.starrocks.catalog.ListPartitionInfo;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.MaterializedIndexMeta;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Partition;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.PartitionType;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Type;
import com.starrocks.common.AnalysisException;
Expand All @@ -84,7 +73,6 @@
import com.starrocks.common.Pair;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.ListComparator;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.lake.delete.LakeDeleteJob;
Expand All @@ -95,19 +83,35 @@
import com.starrocks.persist.metablock.SRMetaBlockID;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
<<<<<<< HEAD
import com.starrocks.planner.PartitionColumnFilter;
import com.starrocks.planner.RangePartitionPruner;
=======
import com.starrocks.qe.ConnectContext;
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
import com.starrocks.qe.QueryState;
import com.starrocks.qe.QueryStateException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.service.FrontendOptions;
import com.starrocks.sql.StatementPlanner;
import com.starrocks.sql.analyzer.Analyzer;
import com.starrocks.sql.analyzer.DeleteAnalyzer;
import com.starrocks.sql.ast.DeleteStmt;
<<<<<<< HEAD
import com.starrocks.transaction.BeginTransactionException;
=======
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.optimizer.Utils;
import com.starrocks.sql.optimizer.operator.physical.PhysicalOlapScanOperator;
import com.starrocks.sql.parser.SqlParser;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.transaction.RunningTxnExceedException;
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionState.TxnCoordinator;
import com.starrocks.transaction.TransactionState.TxnSourceType;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -116,11 +120,6 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.ResolverStyle;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -241,28 +240,11 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
Preconditions.checkState(partitionNames != null);
boolean noPartitionSpecified = partitionNames.isEmpty();
if (noPartitionSpecified) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
if (partitionInfo.isRangePartition()) {
partitionNames = extractPartitionNamesByCondition(olapTable, conditions);
if (partitionNames.isEmpty()) {
LOG.info("The delete statement [{}] prunes all partitions",
stmt.getOrigStmt().originStmt);
return null;
}
} else if (partitionInfo.getType() == PartitionType.UNPARTITIONED) {
// this is a unpartitioned table, use table name as partition name
partitionNames.add(olapTable.getName());
} else if (partitionInfo.getType() == PartitionType.LIST) {
// TODO: support list partition prune
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
List<Long> partitionIds = listPartitionInfo.getPartitionIds(false);
if (partitionIds.isEmpty()) {
return null;
}
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
partitionNames.add(partition.getName());
}
partitionNames = partitionPruneForDelete(stmt, olapTable);

if (partitionNames.isEmpty()) {
LOG.info("The delete statement [{}] prunes all partitions", stmt.getOrigStmt().originStmt);
return null;
}
}

Expand Down Expand Up @@ -316,11 +298,11 @@ private DeleteJob createJob(DeleteStmt stmt, List<Predicate> conditions, Databas
}

@VisibleForTesting
public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable)
throws DdlException, AnalysisException {
return extractPartitionNamesByCondition(olapTable, stmt.getDeleteConditions());
public List<String> extractPartitionNamesByCondition(DeleteStmt stmt, OlapTable olapTable) throws DdlException {
return partitionPruneForDelete(stmt, olapTable);
}

<<<<<<< HEAD
public List<String> extractPartitionNamesByCondition(OlapTable olapTable, List<Predicate> conditions)
throws DdlException, AnalysisException {
List<String> partitionNames = Lists.newArrayList();
Expand All @@ -343,8 +325,35 @@ public List<String> extractPartitionNamesByCondition(OlapTable olapTable, List<P
Partition partition = olapTable.getPartition(partitionId);
partitionNames.add(partition.getName());
}
=======
/**
* Construct a fake sql then leverage the optimizer to prune partitions
*
* @return pruned partitions with delete conditions
*/
private List<String> partitionPruneForDelete(DeleteStmt stmt, OlapTable table) {
String tableName = stmt.getTableName().toSql();
String predicate = stmt.getWherePredicate().toSql();
String fakeSql = String.format("SELECT * FROM %s WHERE %s", tableName, predicate);
PhysicalOlapScanOperator physicalOlapScanOperator;
try {
List<StatementBase> parse = SqlParser.parse(fakeSql, ConnectContext.get().getSessionVariable());
StatementBase selectStmt = parse.get(0);
Analyzer.analyze(selectStmt, ConnectContext.get());
ExecPlan plan = StatementPlanner.plan(selectStmt, ConnectContext.get());
List<PhysicalOlapScanOperator> physicalOlapScanOperators =
Utils.extractPhysicalOlapScanOperator(plan.getPhysicalPlan());
// it's supposed to be empty set
if (CollectionUtils.isEmpty(physicalOlapScanOperators)) {
return Lists.newArrayList();
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
}
physicalOlapScanOperator = physicalOlapScanOperators.get(0);
} catch (Exception e) {
LOG.warn("failed to do partition pruning for delete {}", stmt.toString(), e);
return Lists.newArrayList(table.getVisiblePartitionNames());
}
<<<<<<< HEAD
return partitionNames;
}

Expand Down Expand Up @@ -450,6 +459,13 @@ private Map<String, PartitionColumnFilter> extractColumnFilter(Table table, Rang

}
return columnFilters;
=======
List<Long> selectedPartitionId = physicalOlapScanOperator.getSelectedPartitionId();
return ListUtils.emptyIfNull(selectedPartitionId)
.stream()
.map(x -> table.getPartition(x).getName())
.collect(Collectors.toList());
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
}

/**
Expand Down
95 changes: 95 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/load/DeletePruneTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@

package com.starrocks.load;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.FeConstants;
import com.starrocks.qe.ConnectContext;
import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.utframe.StarRocksAssert;
Expand All @@ -41,6 +45,7 @@ public static void beforeClass() throws Exception {
deleteHandler = new DeleteMgr();

starRocksAssert.withDatabase("test").useDatabase("test")
<<<<<<< HEAD
.withTable("CREATE TABLE `test_delete` (\n" +
" `k1` date NULL COMMENT \"\",\n" +
" `k2` datetime NULL COMMENT \"\",\n" +
Expand All @@ -66,6 +71,33 @@ public static void beforeClass() throws Exception {
"\"replication_num\" = \"1\",\n" +
"\"in_memory\" = \"false\"\n" +
");")
=======
.withTable("CREATE TABLE `test_delete` (\n" +
" `k1` date NULL COMMENT \"\",\n" +
" `k2` datetime NULL COMMENT \"\",\n" +
" `k3` char(20) NULL COMMENT \"\",\n" +
" `k4` varchar(20) NULL COMMENT \"\",\n" +
" `k5` boolean NULL COMMENT \"\",\n" +
" `k6` tinyint(4) NULL COMMENT \"\",\n" +
" `k7` smallint(6) NULL COMMENT \"\",\n" +
" `k8` int(11) NULL COMMENT \"\",\n" +
" `k9` bigint(20) NULL COMMENT \"\",\n" +
" `k10` largeint(40) NULL COMMENT \"\",\n" +
" `k11` float NULL COMMENT \"\",\n" +
" `k12` double NULL COMMENT \"\",\n" +
" `k13` decimal128(27, 9) NULL COMMENT \"\"\n" +
") ENGINE=OLAP \n" +
"DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`)\n" +
"COMMENT \"OLAP\"\n" +
"PARTITION BY RANGE(`k1`) (\n" +
" START (\"2020-01-01\") END (\"2021-01-01\") EVERY (INTERVAL 1 day)\n" +
")\n" +
"DISTRIBUTED BY HASH(`k1`, `k2`, `k3`) BUCKETS 3 \n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\",\n" +
"\"in_memory\" = \"false\"\n" +
");")
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
.withTable("CREATE TABLE `test_delete2` (\n" +
" `date` date NULL COMMENT \"\",\n" +
" `id` int(11) NULL COMMENT \"\",\n" +
Expand All @@ -82,7 +114,27 @@ public static void beforeClass() throws Exception {
"DISTRIBUTED BY HASH(`date`, `id`) BUCKETS 3 \n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\"\n" +
<<<<<<< HEAD
");");
=======
");")
.withTable("CREATE TABLE `test_delete3` (" +
" `date` date NULL," +
" c1 int NULL" +
") PARTITION BY (`date`)" +
" PROPERTIES ('replication_num'='1') ")
.withTable("CREATE TABLE `test_delete4` (" +
" `date` date NULL," +
" c1 int NULL) " +
" PROPERTIES ('replication_num'='1') ");
UtFrameUtils.mockDML();
starRocksAssert.getCtx().executeSql("insert into test_delete3 values('2020-01-01', 1), ('2020-01-02', 2)");
starRocksAssert.getCtx()
.executeSql("alter table test_delete3 add partition p20200101 values in ('2020-01-01')");
starRocksAssert.getCtx()
.executeSql("alter table test_delete3 add partition p20200102 values in ('2020-01-02')");
FeConstants.runningUnitTest = true;
>>>>>>> 4ae63bdce7 ([Enhancement] enhance delete pruning for all kinds of partitions (#55400))
}

@Test
Expand Down Expand Up @@ -180,4 +232,47 @@ public void testDeletePruneMultiPartition() throws Exception {
Assert.assertEquals(3, res.size());
}

/**
 * Verifies partition pruning of DELETE statements against the list-partitioned
 * table {@code test_delete3} (partitions {@code p20200101} and {@code p20200102}).
 *
 * <p>Covers three cases: a predicate matching one partition, a predicate matching
 * both partitions, and a statement whose table name cannot be resolved, for which
 * every visible partition is expected to be returned.
 */
@Test
public void testDeletePruneListPartition() throws Exception {
    ConnectContext ctx = starRocksAssert.getCtx();
    OlapTable tbl = (OlapTable) starRocksAssert.getTable("test", "test_delete3");
    Assert.assertEquals(Sets.newHashSet("p20200101", "p20200102"), tbl.getVisiblePartitionNames());

    // delete one partition
    String deleteSQL = "delete from test_delete3 where date in ('2020-01-01') ";
    DeleteStmt deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx);
    List<String> res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl);
    Assert.assertEquals(1, res.size());
    // JUnit expects (expected, actual); the reversed order produces misleading failure messages.
    Assert.assertEquals("p20200101", res.get(0));

    // delete two partitions
    deleteSQL = "delete from test_delete3 where date in ('2020-01-01', '2020-01-02') ";
    deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx);
    res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl);
    Assert.assertEquals(Lists.newArrayList("p20200101", "p20200102"), res);

    // exceptional: the statement points at a table that does not exist, so pruning is
    // expected to fail and fall back to all visible partitions of the table passed in.
    deleteSQL = "delete from test_delete3 where date in ('2020-01-01') ";
    deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx);
    DeleteStmt exceptionStmt = new DeleteStmt(TableName.fromString("not_exists"), deleteStmt.getPartitionNames(),
            deleteStmt.getWherePredicate());
    exceptionStmt.setDeleteConditions(deleteStmt.getDeleteConditions());
    res = deleteHandler.extractPartitionNamesByCondition(exceptionStmt, tbl);
    // The visible partition names are compared as a Set at the top of this test, so the
    // order of the fallback list is not something to rely on; check size and membership.
    Assert.assertEquals(2, res.size());
    Assert.assertEquals(Sets.newHashSet("p20200101", "p20200102"), Sets.newHashSet(res));
}

/**
 * Verifies partition pruning of a DELETE statement against {@code test_delete4},
 * a table created without a partition clause: the single visible partition, which
 * carries the table's own name, must be returned.
 */
@Test
public void testDeleteUnPartitionTable() throws Exception {
    ConnectContext ctx = starRocksAssert.getCtx();
    OlapTable tbl = (OlapTable) starRocksAssert.getTable("test", "test_delete4");
    Assert.assertEquals(Sets.newHashSet("test_delete4"), tbl.getVisiblePartitionNames());

    // delete with a predicate on an unpartitioned table: the only partition is selected
    String deleteSQL = "delete from test_delete4 where date in ('2020-01-01') ";
    DeleteStmt deleteStmt = (DeleteStmt) UtFrameUtils.parseStmtWithNewParser(deleteSQL, ctx);
    List<String> res = deleteHandler.extractPartitionNamesByCondition(deleteStmt, tbl);
    Assert.assertEquals(1, res.size());
    // JUnit expects (expected, actual); the reversed order produces misleading failure messages.
    Assert.assertEquals("test_delete4", res.get(0));
}

}

0 comments on commit 30af703

Please sign in to comment.