Skip to content

Commit

Permalink
[Enhancement] Support only deleting shard meta from fe for shared-dat…
Browse files Browse the repository at this point in the history
…a cluster (#53141)

Signed-off-by: drake_wang <[email protected]>
(cherry picked from commit ffbd98b)
  • Loading branch information
wxl24life authored and mergify[bot] committed Dec 25, 2024
1 parent fb59c74 commit 0760fbe
Show file tree
Hide file tree
Showing 2 changed files with 262 additions and 40 deletions.
66 changes: 50 additions & 16 deletions fe/fe-core/src/main/java/com/starrocks/lake/StarMgrMetaSyncer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.staros.proto.ShardGroupInfo;
import com.staros.proto.ShardInfo;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
Expand All @@ -43,6 +44,7 @@
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TStatusCode;
import com.starrocks.warehouse.Warehouse;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -123,7 +125,6 @@ public static void dropTabletAndDeleteShard(List<Long> shardIds, StarOSAgent sta
DeleteTabletRequest request = new DeleteTabletRequest();
request.tabletIds = Lists.newArrayList(shards);

boolean forceDelete = Config.meta_sync_force_delete_shard_meta;
try {
LakeService lakeService = BrpcProxy.getLakeService(node.getHost(), node.getBrpcPort());
DeleteTabletResponse response = lakeService.deleteTablet(request).get();
Expand All @@ -132,7 +133,7 @@ public static void dropTabletAndDeleteShard(List<Long> shardIds, StarOSAgent sta
LOG.info("Fail to delete tablet. StatusCode: {}, failedTablets: {}", stCode, response.failedTablets);

// ignore INVALID_ARGUMENT error, treat it as success
if (stCode != TStatusCode.INVALID_ARGUMENT && !forceDelete) {
if (stCode != TStatusCode.INVALID_ARGUMENT) {
response.failedTablets.forEach(shards::remove);
}
}
Expand All @@ -141,9 +142,6 @@ public static void dropTabletAndDeleteShard(List<Long> shardIds, StarOSAgent sta
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
if (!forceDelete) {
continue;
}
}

// 2. delete shard
Expand All @@ -153,7 +151,6 @@ public static void dropTabletAndDeleteShard(List<Long> shardIds, StarOSAgent sta
}
} catch (DdlException e) {
LOG.warn("failed to delete shard from starMgr");
continue;
}
}
}
Expand Down Expand Up @@ -198,16 +195,7 @@ private void deleteUnusedShardAndShardGroup() {
List<Long> emptyShardGroup = new ArrayList<>();
for (long groupId : diffList) {
if (Config.shard_group_clean_threshold_sec * 1000L + Long.parseLong(groupToCreateTimeMap.get(groupId)) < nowMs) {
try {
List<Long> shardIds = starOSAgent.listShard(groupId);
if (shardIds.isEmpty()) {
emptyShardGroup.add(groupId);
} else {
dropTabletAndDeleteShard(shardIds, starOSAgent);
}
} catch (Exception e) {
continue;
}
cleanOneGroup(groupId, starOSAgent, emptyShardGroup);
}
}

Expand All @@ -217,6 +205,52 @@ private void deleteUnusedShardAndShardGroup() {
}
}

/**
 * Cleans up one shard group.
 *
 * <p>Lists the shards of {@code groupId} through {@code starOSAgent}. A group without shards is
 * only recorded in {@code emptyShardGroup}; nothing is deleted here for it. For a non-empty group,
 * the shards are either removed from star manager only (when
 * {@code Config.meta_sync_force_delete_shard_meta} is set) or handed to
 * {@link #dropTabletAndDeleteShard}.
 *
 * <p>Any exception is caught and logged, so a failure on one group does not propagate to the caller.
 *
 * @param groupId         id of the shard group to clean
 * @param starOSAgent     agent used to list and delete shards
 * @param emptyShardGroup output list; {@code groupId} is appended when the group has no shards
 */
private void cleanOneGroup(long groupId, StarOSAgent starOSAgent, List<Long> emptyShardGroup) {
    try {
        List<Long> shardIds = starOSAgent.listShard(groupId);
        if (shardIds.isEmpty()) {
            emptyShardGroup.add(groupId);
            return;
        }
        // delete shard from star manager only, not considering tablet data on be/cn
        if (Config.meta_sync_force_delete_shard_meta) {
            forceDeleteShards(groupId, starOSAgent, shardIds);
        } else {
            // drop meta and data
            long start = System.currentTimeMillis();
            dropTabletAndDeleteShard(shardIds, starOSAgent);
            LOG.debug("delete shards from starMgr and FE, shard group: {}, cost: {} ms",
                    groupId, (System.currentTimeMillis() - start));
        }
    } catch (Exception e) {
        // Pass the exception as a trailing argument (beyond the placeholders) so that the
        // exception type and stack trace are logged, not just a possibly-null message.
        LOG.warn("delete shards from starMgr and FE failed, shard group: {}, {}", groupId, e.getMessage(), e);
    }
}

/**
 * Deletes the given shards from star manager only.
 *
 * <p>Before deleting, the root directory of the first shard is looked up on a best-effort basis
 * so that it can be reported in the log afterwards; a failed lookup is logged and does not
 * prevent the deletion. Nothing in this method removes files under that directory.
 *
 * @param groupId     id of the shard group the shards belong to (used for logging)
 * @param starOSAgent agent used to query shard info and delete the shards
 * @param shardIds    ids of the shards to delete; a {@code null} or empty list is a no-op
 * @throws DdlException if deleting the shards through {@code starOSAgent} fails
 */
private static void forceDeleteShards(long groupId, StarOSAgent starOSAgent, List<Long> shardIds)
        throws DdlException {
    // Guard against null/empty input: there is nothing to delete and shardIds.get(0) would throw.
    if (shardIds == null || shardIds.isEmpty()) {
        LOG.debug("no shards to delete from starMgr, shard group: {}", groupId);
        return;
    }
    LOG.debug("delete shards from starMgr only, shard group: {}", groupId);
    // before deleting shardIds, let's record the root directory of this shard group first
    // root directory has the format like `s3://bucket/xx/db15570/15648/15944`
    String rootDirectory = null;
    long shardId = shardIds.get(0);
    try {
        // all shards have the same root directory
        ShardInfo shardInfo = starOSAgent.getShardInfo(shardId, StarOSAgent.DEFAULT_WORKER_GROUP_ID);
        if (shardInfo != null) {
            rootDirectory = shardInfo.getFilePath().getFullPath();
        }
    } catch (Exception e) {
        // Best effort only: the directory is used for the informational log below.
        // The trailing throwable argument makes the logger include the stack trace.
        LOG.warn("failed to get shard root directory from starMgr, shard id: {}, group id: {}, {}", shardId,
                groupId, e.getMessage(), e);
    }
    starOSAgent.deleteShards(new HashSet<>(shardIds));
    if (StringUtils.isNotEmpty(rootDirectory)) {
        LOG.info("shard group {} deleted from starMgr only, you may need to delete remote file path manually," +
                " file path is: {}", groupId, rootDirectory);
    }
}

// get snapshot of star mgr workers and fe backend/compute node,
// if worker not found in backend/compute node, remove it from star mgr
public int deleteUnusedWorker() {
Expand Down
Loading

0 comments on commit 0760fbe

Please sign in to comment.