Skip to content

Commit

Permalink
[Feature] Support Cluster Snapshot Backup: SQL Interface and meta dat…
Browse files Browse the repository at this point in the history
…a (part 1) (backport #54447) (#54505)

Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch authored Dec 31, 2024
1 parent 1ab0d83 commit 1b0a9a5
Show file tree
Hide file tree
Showing 18 changed files with 663 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.starrocks.persist.CancelDecommissionDiskInfo;
import com.starrocks.persist.CancelDisableDiskInfo;
import com.starrocks.persist.ChangeMaterializedViewRefreshSchemeLog;
import com.starrocks.persist.ClusterSnapshotLog;
import com.starrocks.persist.ColocatePersistInfo;
import com.starrocks.persist.ColumnRenameInfo;
import com.starrocks.persist.ConsistencyCheckInfo;
Expand Down Expand Up @@ -775,6 +776,10 @@ public void readFields(DataInput in) throws IOException {
data = DropWarehouseLog.read(in);
break;
}
case OperationType.OP_CLUSTER_SNAPSHOT_LOG: {
data = ClusterSnapshotLog.read(in);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.persist.ClusterSnapshotLog;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
import com.starrocks.persist.metablock.SRMetaBlockID;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockWriter;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;

// only used for AUTOMATED snapshot for now
public class ClusterSnapshotMgr implements GsonPostProcessable {
public static final Logger LOG = LogManager.getLogger(ClusterSnapshotMgr.class);
public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot";

@SerializedName(value = "automatedSnapshotSvName")
private String automatedSnapshotSvName = "";

public ClusterSnapshotMgr() {}

// Turn on automated snapshot, use stmt for extension in future
public void setAutomatedSnapshotOn(AdminSetAutomatedSnapshotOnStmt stmt) {
String storageVolumeName = stmt.getStorageVolumeName();
setAutomatedSnapshotOn(storageVolumeName);

ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setCreateSnapshotNamePrefix(AUTOMATED_NAME_PREFIX, storageVolumeName);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);
}

protected void setAutomatedSnapshotOn(String storageVolumeName) {
automatedSnapshotSvName = storageVolumeName;
}

public String getAutomatedSnapshotSvName() {
return automatedSnapshotSvName;
}

public boolean isAutomatedSnapshotOn() {
return automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty();
}

// Turn off automated snapshot, use stmt for extension in future
public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) {
setAutomatedSnapshotOff();

ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setDropSnapshot(AUTOMATED_NAME_PREFIX);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);
}

protected void setAutomatedSnapshotOff() {
// drop AUTOMATED snapshot
automatedSnapshotSvName = "";
}

public void replayLog(ClusterSnapshotLog log) {
ClusterSnapshotLog.ClusterSnapshotLogType logType = log.getType();
switch (logType) {
case CREATE_SNAPSHOT_PREFIX: {
String createSnapshotNamePrefix = log.getCreateSnapshotNamePrefix();
String storageVolumeName = log.getStorageVolumeName();
if (createSnapshotNamePrefix.equals(AUTOMATED_NAME_PREFIX)) {
setAutomatedSnapshotOn(storageVolumeName);
}
break;
}
case DROP_SNAPSHOT: {
String dropSnapshotName = log.getDropSnapshotName();
if (dropSnapshotName.equals(AUTOMATED_NAME_PREFIX)) {
setAutomatedSnapshotOff();
}
break;
}
default: {
LOG.warn("Invalid Cluster Snapshot Log Type {}", logType);
}
}
}

public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, 1);
writer.writeJson(this);
writer.close();
}

public void load(SRMetaBlockReader reader)
throws SRMetaBlockEOFException, IOException, SRMetaBlockException {
ClusterSnapshotMgr data = reader.readJson(ClusterSnapshotMgr.class);
}

@Override
public void gsonPostProcess() throws IOException {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.persist;

import com.google.gson.annotations.SerializedName;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.gson.GsonUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ClusterSnapshotLog implements Writable {
public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT }
@SerializedName(value = "type")
private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE;
// For CREATE_SNAPSHOT_PREFIX
@SerializedName(value = "createSnapshotNamePrefix")
private String createSnapshotNamePrefix = "";
@SerializedName(value = "storageVolumeName")
private String storageVolumeName = "";
// For DROP_SNAPSHOT
@SerializedName(value = "dropSnapshotName")
private String dropSnapshotName = "";

public ClusterSnapshotLog() {}

public void setCreateSnapshotNamePrefix(String createSnapshotNamePrefix, String storageVolumeName) {
this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT_PREFIX;
this.createSnapshotNamePrefix = createSnapshotNamePrefix;
this.storageVolumeName = storageVolumeName;
}

public void setDropSnapshot(String dropSnapshotName) {
this.type = ClusterSnapshotLogType.DROP_SNAPSHOT;
this.dropSnapshotName = dropSnapshotName;
}

public ClusterSnapshotLogType getType() {
return type;
}

public String getCreateSnapshotNamePrefix() {
return this.createSnapshotNamePrefix;
}

public String getStorageVolumeName() {
return this.storageVolumeName;
}

public String getDropSnapshotName() {
return this.dropSnapshotName;
}

public static ClusterSnapshotLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class);
}

@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}
9 changes: 9 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal)
warehouseMgr.replayAlterWarehouse(wh);
break;
}
case OperationType.OP_CLUSTER_SNAPSHOT_LOG: {
ClusterSnapshotLog log = (ClusterSnapshotLog) journal.getData();
globalStateMgr.getClusterSnapshotMgr().replayLog(log);
break;
}
default: {
if (Config.metadata_ignore_unknown_operation_type) {
LOG.warn("UNKNOWN Operation Type {}", opCode);
Expand Down Expand Up @@ -1957,4 +1962,8 @@ public void logCancelDisableDisk(CancelDisableDiskInfo info) {
public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) {
logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info);
}

public void logClusterSnapshotLog(ClusterSnapshotLog info) {
logEdit(OperationType.OP_CLUSTER_SNAPSHOT_LOG, info);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,9 @@ public class OperationType {
@IgnorableOnReplayFailed
public static final short OP_ADD_KEY = 13512;

@IgnorableOnReplayFailed
public static final short OP_CLUSTER_SNAPSHOT_LOG = 13513;

/**
* NOTICE: OperationType cannot use a value exceeding 20000, please follow the above sequence number
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ public int getId() {

public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32);

public static final SRMetaBlockID CLUSTER_SNAPSHOT_MGR = new SRMetaBlockID(33);

/**
* NOTICE: SRMetaBlockID cannot use a value exceeding 20000, please follow the above sequence number
*/
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import com.starrocks.sql.ast.AdminCancelRepairTableStmt;
import com.starrocks.sql.ast.AdminCheckTabletsStmt;
import com.starrocks.sql.ast.AdminRepairTableStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import com.starrocks.sql.ast.AdminSetConfigStmt;
import com.starrocks.sql.ast.AdminSetPartitionVersionStmt;
import com.starrocks.sql.ast.AdminSetReplicaStatusStmt;
Expand Down Expand Up @@ -1174,6 +1176,24 @@ public ShowResultSet visitDropWarehouseStatement(DropWarehouseStmt stmt, Connect
});
return null;
}

@Override
public ShowResultSet visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt stmt,
ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOn(stmt);
});
return null;
}

@Override
public ShowResultSet visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt stmt,
ConnectContext context) {
ErrorReport.wrapWithRuntimeException(() -> {
context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOff(stmt);
});
return null;
}
}

}
11 changes: 11 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@
import com.starrocks.lake.StarOSAgent;
import com.starrocks.lake.compaction.CompactionControlScheduler;
import com.starrocks.lake.compaction.CompactionMgr;
import com.starrocks.lake.snapshot.ClusterSnapshotMgr;
import com.starrocks.lake.vacuum.AutovacuumDaemon;
import com.starrocks.leader.CheckpointController;
import com.starrocks.leader.TaskRunStateSynchronizer;
Expand Down Expand Up @@ -514,6 +515,8 @@ public class GlobalStateMgr {
private final ExecutorService queryDeployExecutor;
private final WarehouseIdleChecker warehouseIdleChecker;

private final ClusterSnapshotMgr clusterSnapshotMgr;

public NodeMgr getNodeMgr() {
return nodeMgr;
}
Expand Down Expand Up @@ -814,7 +817,10 @@ public void transferToNonLeader(FrontendNodeType newType) {
this.queryDeployExecutor =
ThreadPoolManager.newDaemonFixedThreadPool(Config.query_deploy_threadpool_size, Integer.MAX_VALUE,
"query-deploy", true);

this.warehouseIdleChecker = new WarehouseIdleChecker();

this.clusterSnapshotMgr = new ClusterSnapshotMgr();
}

public static void destroyCheckpoint() {
Expand Down Expand Up @@ -1057,6 +1063,10 @@ public GlobalConstraintManager getGlobalConstraintManager() {
return globalConstraintManager;
}

public ClusterSnapshotMgr getClusterSnapshotMgr() {
return clusterSnapshotMgr;
}

// Use tryLock to avoid potential deadlock
public boolean tryLock(boolean mustLock) {
while (true) {
Expand Down Expand Up @@ -1541,6 +1551,7 @@ public void loadImage(String imageDir) throws IOException {
.put(SRMetaBlockID.KEY_MGR, keyMgr::load)
.put(SRMetaBlockID.PIPE_MGR, pipeManager.getRepo()::load)
.put(SRMetaBlockID.WAREHOUSE_MGR, warehouseMgr::load)
.put(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, clusterSnapshotMgr::load)
.build();

Set<SRMetaBlockID> metaMgrMustExists = new HashSet<>(loadImages.keySet());
Expand Down
15 changes: 15 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.starrocks.sql.ast.AdminCancelRepairTableStmt;
import com.starrocks.sql.ast.AdminCheckTabletsStmt;
import com.starrocks.sql.ast.AdminRepairTableStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt;
import com.starrocks.sql.ast.AdminSetConfigStmt;
import com.starrocks.sql.ast.AdminSetPartitionVersionStmt;
import com.starrocks.sql.ast.AdminSetReplicaStatusStmt;
Expand Down Expand Up @@ -592,6 +594,19 @@ public Void visitPauseRoutineLoadStatement(PauseRoutineLoadStmt statement, Conne
return null;
}

@Override
public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt statement, ConnectContext context) {
ClusterSnapshotAnalyzer.analyze(statement, context);
return null;
}

@Override
public Void visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt statement,
ConnectContext context) {
ClusterSnapshotAnalyzer.analyze(statement, context);
return null;
}

// ---------------------------------------- Catalog Statement -------------------------------------------

@Override
Expand Down
Loading

0 comments on commit 1b0a9a5

Please sign in to comment.