diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java index bfc1745c51f9a..03675c52682e4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java @@ -845,7 +845,6 @@ private Set dropPartition(long dbId, String partitionName, boolean isForce // drop partition info rangePartitionInfo.dropPartition(partition.getId()); - GlobalStateMgr.getCurrentAnalyzeMgr().dropPartition(partition.getId()); } return tabletIds; } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 010e6e77e5df2..010863077c658 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1542,6 +1542,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static boolean statistic_check_expire_partition = true; + /** + * Clear stale partition statistics data job work interval + */ + @ConfField(mutable = true) + public static long clear_stale_stats_interval_sec = 12 * 60 * 60L; // 12 hour + /** * The collect thread work interval */ diff --git a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java index f3b8f69a6a936..508fed2d63eca 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java @@ -1495,8 +1495,8 @@ public void dropPartition(Database db, Table table, DropPartitionClause clause) if (isTempPartition) { olapTable.dropTempPartition(partitionName, true); } else { + Partition partition = olapTable.getPartition(partitionName); if (!clause.isForceDrop()) { - Partition partition = olapTable.getPartition(partitionName); if (partition != null) { if (stateMgr.getGlobalTransactionMgr() 
.existCommittedTxns(db.getId(), olapTable.getId(), partition.getId())) { @@ -1508,6 +1508,9 @@ public void dropPartition(Database db, Table table, DropPartitionClause clause) } } } + if (partition != null) { + GlobalStateMgr.getCurrentState().getAnalyzeManager().recordDropPartition(partition.getId()); + } tabletIdSet = olapTable.dropPartition(db.getId(), partitionName, clause.isForceDrop()); if (olapTable instanceof MaterializedView) { @@ -4640,12 +4643,13 @@ public void truncateTable(TruncateTableStmt truncateTableStmt) throws DdlExcepti if (partition == null) { throw new DdlException("Partition " + partName + " does not exist"); } - origPartitions.put(partName, partition); + GlobalStateMgr.getCurrentState().getAnalyzeManager().recordDropPartition(partition.getId()); } } else { for (Partition partition : olapTable.getPartitions()) { origPartitions.put(partition.getName(), partition); + GlobalStateMgr.getCurrentState().getAnalyzeManager().recordDropPartition(partition.getId()); } } @@ -4973,7 +4977,9 @@ public void replaceTempPartition(Database db, String tableName, ReplacePartition throw new DdlException("Temp partition[" + partName + "] does not exist"); } } - + partitionNames.stream().forEach(e -> + GlobalStateMgr.getCurrentState().getAnalyzeManager() + .recordDropPartition(olapTable.getPartition(e).getId())); olapTable.replaceTempPartitions(partitionNames, tempPartitionNames, isStrictRange, useTempPartitionName); // write log diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeJob.java b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeJob.java index 4ec1c0f137fa4..5c274863aee57 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeJob.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeJob.java @@ -147,7 +147,7 @@ public void run(ConnectContext statsConnectContext, StatisticExecutor statisticE } if (!hasFailedCollectJob) { - setStatus(StatsConstants.ScheduleStatus.PENDING); + 
setStatus(ScheduleStatus.FINISH); setWorkTime(LocalDateTime.now()); GlobalStateMgr.getCurrentAnalyzeMgr().updateAnalyzeJobWithLog(this); } diff --git a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeManager.java b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeManager.java index f78f397f3f416..38ea0759780e4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/statistic/AnalyzeManager.java @@ -34,6 +34,7 @@ import java.io.DataOutputStream; import java.io.EOFException; import java.io.IOException; +import java.time.Duration; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.HashMap; @@ -63,6 +64,8 @@ public class AnalyzeManager implements Writable { private final Set dropPartitionIds = new ConcurrentSkipListSet<>(); private final List> checkTableIds = Lists.newArrayList(CHECK_ALL_TABLES); + private LocalDateTime lastCleanTime; + public AnalyzeManager() { analyzeJobMap = Maps.newConcurrentMap(); analyzeStatusMap = Maps.newConcurrentMap(); @@ -250,12 +253,13 @@ public void clearStatisticFromDroppedTable() { dropHistogramStatsMetaAndData(statsConnectCtx, tableIdHasDeleted); } - public void dropPartition(long partitionId) { + public void recordDropPartition(long partitionId) { dropPartitionIds.add(partitionId); } public void clearStatisticFromDroppedPartition() { - checkAndDropPartitionStatistics(); + clearStaleStatsWhenStarted(); + clearStalePartitionStats(); dropPartitionStatistics(); } @@ -277,7 +281,79 @@ private void dropPartitionStatistics() { } } - private void checkAndDropPartitionStatistics() { + /** + * Periodically removes stale partition statistics. The first call only records the current + * time; later calls are no-ops until Config.clear_stale_stats_interval_sec seconds have passed + * since the last cleanup. Tables whose analyze status is FINISH with an end time later than + * about 30 minutes before the last cleanup are selected, and their ids plus the ids of their + * current partitions are passed to StatisticExecutor#dropTableInvalidPartitionStatistics. + */ + private void clearStalePartitionStats() { + // It means FE is restarted, the previous step had cleared the stats. + if (lastCleanTime == null) { + lastCleanTime = LocalDateTime.now(); + return; + } + + // run the cleanup at most once per Config.clear_stale_stats_interval_sec (12 hours by default). 
+ if (Duration.between(lastCleanTime, LocalDateTime.now()).getSeconds() < Config.clear_stale_stats_interval_sec) { + return; + } + + List tables = Lists.newArrayList(); + LocalDateTime workTime = LocalDateTime.now(); + for (Map.Entry entry : analyzeStatusMap.entrySet()) { + AnalyzeStatus analyzeStatus = entry.getValue(); + LocalDateTime endTime = analyzeStatus.getEndTime(); + // After the last cleanup, if a table has successfully undergone a statistics collection, + // and the collection completion time is after the last cleanup time, + // then during the next cleanup process, the stale column statistics would be cleared. + if (analyzeStatus.getStatus() == StatsConstants.ScheduleStatus.FINISH + && Duration.between(endTime, lastCleanTime).toMinutes() < 30) { + Database db = GlobalStateMgr.getCurrentState().getDb(analyzeStatus.getDbId()); + if (db != null && db.getTable(analyzeStatus.getTableId()) != null) { + tables.add(db.getTable(analyzeStatus.getTableId())); + } + } + } + + if (tables.isEmpty()) { + lastCleanTime = workTime; + // nothing was analyzed recently: skip the delete instead of running it with empty id lists. + return; + } + + List tableIds = Lists.newArrayList(); + List partitionIds = Lists.newArrayList(); + int exprLimit = Config.expr_children_limit / 2; + for (Table table : tables) { + List pids = table.getPartitions().stream().map(Partition::getId).collect(Collectors.toList()); + if (pids.size() > exprLimit) { + tableIds.clear(); + partitionIds.clear(); + tableIds.add(table.getId()); + partitionIds.addAll(pids); + break; + } else if ((tableIds.size() + partitionIds.size() + pids.size()) > exprLimit) { + break; + } + tableIds.add(table.getId()); + partitionIds.addAll(pids); + } + + ConnectContext statsConnectCtx = StatisticUtils.buildConnectContext(); + statsConnectCtx.setStatisticsConnection(true); + statsConnectCtx.setThreadLocalInfo(); + StatisticExecutor executor = new StatisticExecutor(); + statsConnectCtx.getSessionVariable().setExprChildrenLimit(partitionIds.size() * 3); + boolean res = 
executor.dropTableInvalidPartitionStatistics(statsConnectCtx, tableIds, partitionIds); + if (!res) { + LOG.debug("failed to clean stale column statistics before time: {}", lastCleanTime); + } + lastCleanTime = LocalDateTime.now(); + } + + private void clearStaleStatsWhenStarted() { if (!Config.statistic_check_expire_partition || checkTableIds.isEmpty()) { return; } diff --git a/test/sql/test_refresh_statistics/R/test_clear_stats b/test/sql/test_refresh_statistics/R/test_clear_stats new file mode 100644 index 0000000000000..84c164d542c89 --- /dev/null +++ b/test/sql/test_refresh_statistics/R/test_clear_stats @@ -0,0 +1,81 @@ +-- name: test_clear_stats +CREATE DATABASE test_clear_stats; +-- result: +-- !result +use test_clear_stats; +-- result: +-- !result +CREATE TABLE tbl ( + `c1` int(11) NOT NULL COMMENT " ", + `c2` int(11) NULL COMMENT " " +) ENGINE=OLAP +PRIMARY KEY(`c1`) +COMMENT " " +DISTRIBUTED BY HASH(`c1`) +PROPERTIES ( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"compression" = "LZ4" +); +-- result: +-- !result +insert into tbl values (1, 1); +-- result: +-- !result +analyze table test_clear_stats.tbl WITH SYNC MODE; +-- result: +test_clear_stats.tbl analyze status OK +-- !result +insert into _statistics_.column_statistics +select +table_id, 1 , column_name, db_id, table_name, partition_name, row_count, data_size, ndv, null_count,max, min, '2024-05-01' +from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl' limit 1; +-- result: +-- !result +select sleep(2); +-- result: +1 +-- !result +select count(*) > 2 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; +-- result: +1 +-- !result +ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "20"); +-- result: +-- !result +ADMIN SET FRONTEND CONFIG ("statistic_manager_sleep_time_sec" = "2"); +-- result: +-- !result +insert into tbl values (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), 
(9, 9); +-- result: +-- !result +analyze table test_clear_stats.tbl WITH SYNC MODE; +-- result: +test_clear_stats.tbl analyze status OK +-- !result +truncate table tbl; +-- result: +-- !result +insert into tbl values (1, 1), (2, 2), (3, 3), (4, 4); +-- result: +-- !result +analyze table test_clear_stats.tbl WITH SYNC MODE; +-- result: +test_clear_stats.tbl analyze status OK +-- !result +select sleep(80); +-- result: +1 +-- !result +select count(*) <= 3 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; +-- result: +1 +-- !result +ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "43200"); +-- result: +-- !result +ADMIN SET FRONTEND CONFIG ("statistic_manager_sleep_time_sec" = "60"); +-- result: +-- !result \ No newline at end of file diff --git a/test/sql/test_refresh_statistics/T/test_clear_stats b/test/sql/test_refresh_statistics/T/test_clear_stats new file mode 100644 index 0000000000000..bc2a4a77c728c --- /dev/null +++ b/test/sql/test_refresh_statistics/T/test_clear_stats @@ -0,0 +1,43 @@ +-- name: test_clear_stats +CREATE DATABASE test_clear_stats; +use test_clear_stats; +CREATE TABLE tbl ( + `c1` int(11) NOT NULL COMMENT " ", + `c2` int(11) NULL COMMENT " " +) ENGINE=OLAP +PRIMARY KEY(`c1`) +COMMENT " " +DISTRIBUTED BY HASH(`c1`) +PROPERTIES ( +"replication_num" = "1", +"in_memory" = "false", +"storage_format" = "DEFAULT", +"enable_persistent_index" = "false", +"compression" = "LZ4" +); + +insert into tbl values (1, 1); +analyze table test_clear_stats.tbl WITH SYNC MODE; +insert into _statistics_.column_statistics +select +table_id, 1 , column_name, db_id, table_name, partition_name, row_count, data_size, ndv, null_count,max, min, '2024-05-01' +from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl' limit 1; +select sleep(2); +select count(*) > 2 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; + +ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "20"); 
+ADMIN SET FRONTEND CONFIG ("statistic_manager_sleep_time_sec" = "2"); + +insert into tbl values (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9); +analyze table test_clear_stats.tbl WITH SYNC MODE; +truncate table tbl; +insert into tbl values (1, 1), (2, 2), (3, 3), (4, 4); +analyze table test_clear_stats.tbl WITH SYNC MODE; +select sleep(80); +select count(*) <= 3 from _statistics_.column_statistics where table_name = 'test_clear_stats.tbl'; +ADMIN SET FRONTEND CONFIG ("clear_stale_stats_interval_sec" = "43200"); +ADMIN SET FRONTEND CONFIG ("statistic_manager_sleep_time_sec" = "60"); + + + +