From 94b2e670262486f4d597d44b34c7b77a1b8e9c6e Mon Sep 17 00:00:00 2001 From: srlch Date: Mon, 30 Dec 2024 15:40:10 +0800 Subject: [PATCH 1/2] [Feature] Support Cluster Snapshot Backup: SQL Interface and meta data (part 1) Signed-off-by: srlch --- .../com/starrocks/journal/JournalEntity.java | 5 + .../lake/snapshot/ClusterSnapshotMgr.java | 332 +++++++++ .../starrocks/persist/ClusterSnapshotLog.java | 104 +++ .../java/com/starrocks/persist/EditLog.java | 9 + .../com/starrocks/persist/OperationType.java | 3 + .../persist/metablock/SRMetaBlockID.java | 2 + .../com/starrocks/qe/DDLStmtExecutor.java | 20 + .../com/starrocks/server/GlobalStateMgr.java | 11 + .../com/starrocks/sql/analyzer/Analyzer.java | 15 + .../sql/analyzer/ClusterSnapshotAnalyzer.java | 62 ++ .../ast/AdminSetAutomatedSnapshotOffStmt.java | 33 + .../ast/AdminSetAutomatedSnapshotOnStmt.java | 40 ++ .../com/starrocks/sql/ast/AstVisitor.java | 8 + .../com/starrocks/sql/parser/AstBuilder.java | 19 + .../com/starrocks/sql/parser/StarRocks.g4 | 16 +- .../com/starrocks/sql/parser/StarRocksLex.g4 | 3 + .../starrocks/lake/ClusterSnapshotTest.java | 638 ++++++++++++++++++ .../starrocks/persist/OperationTypeTest.java | 1 + 18 files changed, 1318 insertions(+), 3 deletions(-) create mode 100644 fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOffStmt.java create mode 100644 fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOnStmt.java create mode 100644 fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java index 168af2944afcb..5a75285881c82 100644 --- a/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java +++ b/fe/fe-core/src/main/java/com/starrocks/journal/JournalEntity.java @@ -76,6 +76,7 @@ import com.starrocks.persist.CancelDecommissionDiskInfo; import com.starrocks.persist.CancelDisableDiskInfo; import com.starrocks.persist.ChangeMaterializedViewRefreshSchemeLog; +import com.starrocks.persist.ClusterSnapshotLog; import com.starrocks.persist.ColocatePersistInfo; import com.starrocks.persist.ColumnRenameInfo; import com.starrocks.persist.ConsistencyCheckInfo; @@ -775,6 +776,10 @@ public void readFields(DataInput in) throws IOException { data = DropWarehouseLog.read(in); break; } + case OperationType.OP_CLUSTER_SNAPSHOT_LOG: { + data = ClusterSnapshotLog.read(in); + break; + } default: { if (Config.metadata_ignore_unknown_operation_type) { LOG.warn("UNKNOWN Operation Type {}", opCode); diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java new file mode 100644 index 0000000000000..d6ba74221b693 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java @@ -0,0 +1,332 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.backup.BlobStorage; +import com.starrocks.backup.Status; +import com.starrocks.common.Config; +import com.starrocks.ha.FrontendNodeType; +import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; +import com.starrocks.persist.ClusterSnapshotLog; +import com.starrocks.persist.ImageWriter; +import com.starrocks.persist.Storage; +import com.starrocks.persist.gson.GsonPostProcessable; +import com.starrocks.persist.metablock.SRMetaBlockEOFException; +import com.starrocks.persist.metablock.SRMetaBlockException; +import com.starrocks.persist.metablock.SRMetaBlockID; +import com.starrocks.persist.metablock.SRMetaBlockReader; +import com.starrocks.persist.metablock.SRMetaBlockWriter; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; +import com.starrocks.storagevolume.StorageVolume; +import com.starrocks.thrift.TClusterSnapshotJobsResponse; +import com.starrocks.thrift.TClusterSnapshotsResponse; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.TreeMap; + +// only used for AUTOMATED snapshot for now +public class ClusterSnapshotMgr implements GsonPostProcessable { + public static final Logger LOG = LogManager.getLogger(ClusterSnapshotMgr.class); + public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot"; + private static final String UPLOAD_SUB_PATH = "/meta/image/"; + + @SerializedName(value = "automatedSnapshotSvName") + private String automatedSnapshotSvName = ""; + @SerializedName(value = "automatedSnapshot") + private ClusterSnapshot automatedSnapshot = null; + @SerializedName(value = "historyAutomatedSnapshotJobs") + private TreeMap historyAutomatedSnapshotJobs = new TreeMap<>(); + + private BlobStorage remoteStorage; + private String locationWithServiceId; + + public ClusterSnapshotMgr() {} + + // Turn on automated snapshot, use stmt for extension in future + public void setAutomatedSnapshotOn(AdminSetAutomatedSnapshotOnStmt stmt) { + String storageVolumeName = stmt.getStorageVolumeName(); + setAutomatedSnapshotOn(storageVolumeName); + + ClusterSnapshotLog log = new ClusterSnapshotLog(); + log.setCreateSnapshotNamePrefix(AUTOMATED_NAME_PREFIX, storageVolumeName); + GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + } + + protected void setAutomatedSnapshotOn(String storageVolumeName) { + automatedSnapshotSvName = storageVolumeName; + } + + public void createAutomatedSnaphot(ClusterSnapshotJob job) { + ClusterSnapshot newAutomatedSnapshot = new ClusterSnapshot( + GlobalStateMgr.getCurrentState().getNextId(), job.getSnapshotName(), job.getStorageVolumeName(), + job.getCreateTime(), job.getSuccessTime(), job.getFeJournalId(), job.getStarMgrJournalId()); + + ClusterSnapshotLog log = new ClusterSnapshotLog(); + log.setCreateSnapshot(newAutomatedSnapshot); + GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + + if (automatedSnapshot != null && automatedSnapshot.getSnapshotName().startsWith(AUTOMATED_NAME_PREFIX)) { + deleteSnapshotFromRemote(automatedSnapshot.getSnapshotName()); + } + + automatedSnapshot = newAutomatedSnapshot; + + LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", job.getJobId(), + job.getSnapshotName()); + } + + public ClusterSnapshotJob createNewAutomatedSnapshotJob() { + long createTime = System.currentTimeMillis(); + long jobId = GlobalStateMgr.getCurrentState().getNextId(); + String snapshotNamePrefix = ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX; + String snapshotName = snapshotNamePrefix + '_' + String.valueOf(createTime); + String storageVolumeName = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName(); + ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotNamePrefix, snapshotName, storageVolumeName); + job.setState(ClusterSnapshotJobState.INITIALIZING, false); + + addJob(job); + + LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); + + return job; + } + + public String getAutomatedSnapshotSvName() { + return automatedSnapshotSvName; + } + + public StorageVolume getAutomatedSnapshotSv() { + if (automatedSnapshotSvName.isEmpty()) { + return null; + } + + return GlobalStateMgr.getCurrentState().getStorageVolumeMgr().getStorageVolumeByName(automatedSnapshotSvName); + } + + public ClusterSnapshot getAutomatedSnapshot() { + return automatedSnapshot; + } + + public boolean containsAutomatedSnapshot() { + return getAutomatedSnapshot() != null; + } + + public boolean isAutomatedSnapshotOn() { + return automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty(); + } + + public synchronized void addJob(ClusterSnapshotJob job) { + if (historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) { + historyAutomatedSnapshotJobs.pollFirstEntry(); + } + historyAutomatedSnapshotJobs.put(job.getJobId(), job); + } + + public String getLastFinishedAutomatedSnapshotJobName() { + ClusterSnapshot snapshot = getAutomatedSnapshot(); + if (snapshot == null) { + return ""; + } + return snapshot.getSnapshotName(); + } + + // Turn off automated snapshot, use stmt for extension in future + public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) { + setAutomatedSnapshotOff(); + + ClusterSnapshotLog log = new ClusterSnapshotLog(); + log.setDropSnapshot(AUTOMATED_NAME_PREFIX); + GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); + + // avoid network communication when replay log + if (automatedSnapshot != null) { + deleteSnapshotFromRemote(automatedSnapshot.getSnapshotName()); + } + } + + protected void setAutomatedSnapshotOff() { + // drop AUTOMATED snapshot + automatedSnapshotSvName = ""; + automatedSnapshot = null; + historyAutomatedSnapshotJobs.clear(); + } + + public TClusterSnapshotJobsResponse getAllJobsInfo() { + TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse(); + for (Map.Entry entry : historyAutomatedSnapshotJobs.entrySet()) { + response.addToItems(entry.getValue().getInfo()); + } + return response; + } + + public TClusterSnapshotsResponse getAllInfo() { + TClusterSnapshotsResponse response = new TClusterSnapshotsResponse(); + if (automatedSnapshot != null) { + response.addToItems(automatedSnapshot.getInfo()); + } + return response; + } + + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, String snapshotName, String localMetaDir) { + initRemoteStorageContext(); + String curRemoteSnapshotPath = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName; + + Status status = Status.OK; + try { + if (belongToGlobalStateMgr) { + do { + Storage localStorage = new Storage(localMetaDir); + Storage localStorageV2 = new Storage(localMetaDir + "/v2"); + long imageJournalId = localStorage.getImageJournalId(); + File curFile = null; + + curFile = localStorageV2.getCurrentImageFile(); + status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/v2/" + curFile.getName()); + if (!status.ok()) { + break; + } + + curFile = localStorageV2.getCurrentChecksumFile(); + status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/v2/" + curFile.getName()); + if (!status.ok()) { + break; + } + + curFile = localStorage.getRoleFile(); + status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/" + curFile.getName()); + if (!status.ok()) { + break; + } + + curFile = localStorage.getVersionFile(); + status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/" + curFile.getName()); + if (!status.ok()) { + break; + } + } while (false); + } else { + Storage localStorage = new Storage(localMetaDir); + File curFile = localStorage.getCurrentImageFile(); + + status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/starmgr/" + curFile.getName()); + } + } catch (IOException e) { + status = new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); + } + + return status; + } + + public void deleteSnapshotFromRemote(String snapshotName) { + if (snapshotName == null || snapshotName.isEmpty()) { + return; + } + + initRemoteStorageContext(); + String curRemoteSnapshotPath = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName + '/'; + remoteStorage.delete(curRemoteSnapshotPath); + } + + private void initRemoteStorageContext() { + if (this.remoteStorage == null || this.locationWithServiceId == null) { + StorageVolume sv = getAutomatedSnapshotSv(); + this.remoteStorage = new BlobStorage(null, sv.getProperties(), false); + this.locationWithServiceId = sv.getLocations().get(0) + "/" + + GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(); + } + } + + public void replayLog(ClusterSnapshotLog log) { + ClusterSnapshotLog.ClusterSnapshotLogType logType = log.getType(); + switch (logType) { + case CREATE_SNAPSHOT_PREFIX: { + String createSnapshotNamePrefix = log.getCreateSnapshotNamePrefix(); + String storageVolumeName = log.getStorageVolumeName(); + if (createSnapshotNamePrefix.equals(AUTOMATED_NAME_PREFIX)) { + setAutomatedSnapshotOn(storageVolumeName); + } + break; + } + case CREATE_SNAPSHOT: { + ClusterSnapshot snapshot = log.getSnapshot(); + automatedSnapshot = snapshot; + break; + } + case DROP_SNAPSHOT: { + String dropSnapshotName = log.getDropSnapshotName(); + if (dropSnapshotName.equals(AUTOMATED_NAME_PREFIX)) { + setAutomatedSnapshotOff(); + } + break; + } + case UPDATE_SNAPSHOT_JOB: { + FrontendNodeType feType = GlobalStateMgr.getCurrentState().getFeType(); + ClusterSnapshotJob job = log.getSnapshotJob(); + ClusterSnapshotJobState state = job.getState(); + + if ((state == ClusterSnapshotJobState.INITIALIZING || state == ClusterSnapshotJobState.SNAPSHOTING) && + (feType == FrontendNodeType.INIT)) { + job.setState(ClusterSnapshotJobState.ERROR, true); + job.setErrMsg("Snapshot job has been failed"); + } + + switch (state) { + case INITIALIZING: { + addJob(job); + break; + } + case SNAPSHOTING: + case FINISHED: + case ERROR: { + if (historyAutomatedSnapshotJobs.containsKey(job.getJobId())) { + historyAutomatedSnapshotJobs.remove(job.getJobId()); + historyAutomatedSnapshotJobs.put(job.getJobId(), job); + } + break; + } + default: { + LOG.warn("Invalid Cluster Snapshot Job state {}", state); + } + } + break; + } + default: { + LOG.warn("Invalid Cluster Snapshot Log Type {}", logType); + } + } + } + + public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException { + SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, 1); + writer.writeJson(this); + writer.close(); + } + + public void load(SRMetaBlockReader reader) + throws SRMetaBlockEOFException, IOException, SRMetaBlockException { + ClusterSnapshotMgr data = reader.readJson(ClusterSnapshotMgr.class); + } + + @Override + public void gsonPostProcess() throws IOException { + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java new file mode 100644 index 0000000000000..da5f7abb28dd6 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java @@ -0,0 +1,104 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.persist; + +import com.google.gson.annotations.SerializedName; +import com.starrocks.common.io.Text; +import com.starrocks.common.io.Writable; +import com.starrocks.lake.snapshot.ClusterSnapshot; +import com.starrocks.lake.snapshot.ClusterSnapshotJob; +import com.starrocks.persist.gson.GsonUtils; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +public class ClusterSnapshotLog implements Writable { + public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, CREATE_SNAPSHOT, DROP_SNAPSHOT, UPDATE_SNAPSHOT_JOB } + @SerializedName(value = "type") + private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE; + // For CREATE_SNAPSHOT_PREFIX + @SerializedName(value = "createSnapshotNamePrefix") + private String createSnapshotNamePrefix = ""; + @SerializedName(value = "storageVolumeName") + private String storageVolumeName = ""; + // For CREATE_SNAPSHOT + @SerializedName(value = "snapshot") + private ClusterSnapshot snapshot = null; + // For DROP_SNAPSHOT + @SerializedName(value = "dropSnapshotName") + private String dropSnapshotName = ""; + // For UPDATE_SNAPSHOT_JOB + @SerializedName(value = "snapshotJob") + private ClusterSnapshotJob snapshotJob = null; + + public ClusterSnapshotLog() {} + + public void setCreateSnapshotNamePrefix(String createSnapshotNamePrefix, String storageVolumeName) { + this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT_PREFIX; + this.createSnapshotNamePrefix = createSnapshotNamePrefix; + this.storageVolumeName = storageVolumeName; + } + + public void setCreateSnapshot(ClusterSnapshot snapshot) { + this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT; + this.snapshot = snapshot; + } + + public void setDropSnapshot(String dropSnapshotName) { + this.type = ClusterSnapshotLogType.DROP_SNAPSHOT; + this.dropSnapshotName = dropSnapshotName; + } + + public void setSnapshotJob(ClusterSnapshotJob job) { + this.type = ClusterSnapshotLogType.UPDATE_SNAPSHOT_JOB; + this.snapshotJob = job; + } + + public ClusterSnapshotLogType getType() { + return type; + } + + public String getCreateSnapshotNamePrefix() { + return this.createSnapshotNamePrefix; + } + + public String getStorageVolumeName() { + return this.storageVolumeName; + } + + public ClusterSnapshot getSnapshot() { + return this.snapshot; + } + + public String getDropSnapshotName() { + return this.dropSnapshotName; + } + + public ClusterSnapshotJob getSnapshotJob() { + return this.snapshotJob; + } + + public static ClusterSnapshotLog read(DataInput in) throws IOException { + String json = Text.readString(in); + return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class); + } + + @Override + public void write(DataOutput out) throws IOException { + String json = GsonUtils.GSON.toJson(this); + Text.writeString(out, json); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java index a7eb1428676cf..c6d42ee3b1f11 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/EditLog.java @@ -1110,6 +1110,11 @@ public void loadJournal(GlobalStateMgr globalStateMgr, JournalEntity journal) warehouseMgr.replayAlterWarehouse(wh); break; } + case OperationType.OP_CLUSTER_SNAPSHOT_LOG: { + ClusterSnapshotLog log = (ClusterSnapshotLog) journal.getData(); + globalStateMgr.getClusterSnapshotMgr().replayLog(log); + break; + } default: { if (Config.metadata_ignore_unknown_operation_type) { LOG.warn("UNKNOWN Operation Type {}", opCode); @@ -1957,4 +1962,8 @@ public void logCancelDisableDisk(CancelDisableDiskInfo info) { public void logRecoverPartitionVersion(PartitionVersionRecoveryInfo info) { logEdit(OperationType.OP_RECOVER_PARTITION_VERSION, info); } + + public void logClusterSnapshotLog(ClusterSnapshotLog info) { + logEdit(OperationType.OP_CLUSTER_SNAPSHOT_LOG, info); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java index aa94a62600343..892150ae323d5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/OperationType.java @@ -588,6 +588,9 @@ public class OperationType { @IgnorableOnReplayFailed public static final short OP_ADD_KEY = 13512; + @IgnorableOnReplayFailed + public static final short OP_CLUSTER_SNAPSHOT_LOG = 13513; + /** * NOTICE: OperationType cannot use a value exceeding 20000, please follow the above sequence number */ diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java index f46283654c213..f9a7505b9f31a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/metablock/SRMetaBlockID.java @@ -97,6 +97,8 @@ public int getId() { public static final SRMetaBlockID PIPE_MGR = new SRMetaBlockID(32); + public static final SRMetaBlockID CLUSTER_SNAPSHOT_MGR = new SRMetaBlockID(33); + /** * NOTICE: SRMetaBlockID cannot use a value exceeding 20000, please follow the above sequence number */ diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java index fa79e2a88a79c..9a601c5279dcf 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DDLStmtExecutor.java @@ -41,6 +41,8 @@ import com.starrocks.sql.ast.AdminCancelRepairTableStmt; import com.starrocks.sql.ast.AdminCheckTabletsStmt; import com.starrocks.sql.ast.AdminRepairTableStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; import com.starrocks.sql.ast.AdminSetConfigStmt; import com.starrocks.sql.ast.AdminSetPartitionVersionStmt; import com.starrocks.sql.ast.AdminSetReplicaStatusStmt; @@ -1185,6 +1187,24 @@ public ShowResultSet visitAlterWarehouseStatement(AlterWarehouseStmt stmt, Conne }); return null; } + + @Override + public ShowResultSet visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt stmt, + ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOn(stmt); + }); + return null; + } + + @Override + public ShowResultSet visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt stmt, + ConnectContext context) { + ErrorReport.wrapWithRuntimeException(() -> { + context.getGlobalStateMgr().getClusterSnapshotMgr().setAutomatedSnapshotOff(stmt); + }); + return null; + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java index c39534a9cc820..83ba6f6645a07 100644 --- a/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/server/GlobalStateMgr.java @@ -139,6 +139,7 @@ import com.starrocks.lake.StarOSAgent; import com.starrocks.lake.compaction.CompactionControlScheduler; import com.starrocks.lake.compaction.CompactionMgr; +import com.starrocks.lake.snapshot.ClusterSnapshotMgr; import com.starrocks.lake.vacuum.AutovacuumDaemon; import com.starrocks.leader.CheckpointController; import com.starrocks.leader.TaskRunStateSynchronizer; @@ -515,6 +516,8 @@ public class GlobalStateMgr { private final ExecutorService queryDeployExecutor; private final WarehouseIdleChecker warehouseIdleChecker; + private final ClusterSnapshotMgr clusterSnapshotMgr; + public NodeMgr getNodeMgr() { return nodeMgr; } @@ -815,7 +818,10 @@ public void transferToNonLeader(FrontendNodeType newType) { this.queryDeployExecutor = ThreadPoolManager.newDaemonFixedThreadPool(Config.query_deploy_threadpool_size, Integer.MAX_VALUE, "query-deploy", true); + this.warehouseIdleChecker = new WarehouseIdleChecker(); + + this.clusterSnapshotMgr = new ClusterSnapshotMgr(); } public static void destroyCheckpoint() { @@ -1058,6 +1064,10 @@ public GlobalConstraintManager getGlobalConstraintManager() { return globalConstraintManager; } + public ClusterSnapshotMgr getClusterSnapshotMgr() { + return clusterSnapshotMgr; + } + // Use tryLock to avoid potential deadlock public boolean tryLock(boolean mustLock) { while (true) { @@ -1544,6 +1554,7 @@ public void loadImage(String imageDir) throws IOException { .put(SRMetaBlockID.KEY_MGR, keyMgr::load) .put(SRMetaBlockID.PIPE_MGR, pipeManager.getRepo()::load) .put(SRMetaBlockID.WAREHOUSE_MGR, warehouseMgr::load) + .put(SRMetaBlockID.CLUSTER_SNAPSHOT_MGR, clusterSnapshotMgr::load) .build(); Set metaMgrMustExists = new HashSet<>(loadImages.keySet()); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java index 83595a2944fdc..23793cb2a651d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/Analyzer.java @@ -21,6 +21,8 @@ import com.starrocks.sql.ast.AdminCancelRepairTableStmt; import com.starrocks.sql.ast.AdminCheckTabletsStmt; import com.starrocks.sql.ast.AdminRepairTableStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; import com.starrocks.sql.ast.AdminSetConfigStmt; import com.starrocks.sql.ast.AdminSetPartitionVersionStmt; import com.starrocks.sql.ast.AdminSetReplicaStatusStmt; @@ -579,6 +581,19 @@ public Void visitPauseRoutineLoadStatement(PauseRoutineLoadStmt statement, Conne return null; } + @Override + public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt statement, ConnectContext context) { + ClusterSnapshotAnalyzer.analyze(statement, context); + return null; + } + + @Override + public Void visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt statement, + ConnectContext context) { + ClusterSnapshotAnalyzer.analyze(statement, context); + return null; + } + // ---------------------------------------- Catalog Statement ------------------------------------------- @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java new file mode 100644 index 0000000000000..c643c956add60 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/analyzer/ClusterSnapshotAnalyzer.java @@ -0,0 +1,62 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.analyzer; + +import com.starrocks.common.DdlException; +import com.starrocks.qe.ConnectContext; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.StorageVolumeMgr; +import com.starrocks.sql.analyzer.SemanticException; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; +import com.starrocks.sql.ast.AstVisitor; +import com.starrocks.sql.ast.StatementBase; + +public class ClusterSnapshotAnalyzer { + public static void analyze(StatementBase stmt, ConnectContext session) { + new ClusterSnapshotAnalyzer.ClusterSnapshotAnalyzerVisitor().visit(stmt, session); + } + + static class ClusterSnapshotAnalyzerVisitor implements AstVisitor { + @Override + public Void visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt statement, ConnectContext context) { + if (GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().isAutomatedSnapshotOn()) { + throw new SemanticException("Automated snapshot has been turn on"); + } + + String storageVolumeName = statement.getStorageVolumeName(); + StorageVolumeMgr storageVolumeMgr = GlobalStateMgr.getCurrentState().getStorageVolumeMgr(); + try { + if (!storageVolumeMgr.exists(storageVolumeName)) { + throw new SemanticException("Unknown storage volume: %s", storageVolumeName); + } + } catch (DdlException e) { + throw new SemanticException("Failed to get storage volume", e); + } + + return null; + } + + @Override + public Void visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt statement, + ConnectContext context) { + if (!GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().isAutomatedSnapshotOn()) { + throw new SemanticException("Automated snapshot has not been turn on"); + } + + return null; + } + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOffStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOffStmt.java new file mode 100644 index 0000000000000..3eafc0edd9c1e --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOffStmt.java @@ -0,0 +1,33 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast; + +import com.starrocks.sql.parser.NodePosition; + +public class AdminSetAutomatedSnapshotOffStmt extends DdlStmt { + + public AdminSetAutomatedSnapshotOffStmt() { + super(NodePosition.ZERO); + } + + public AdminSetAutomatedSnapshotOffStmt(NodePosition pos) { + super(pos); + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitAdminSetAutomatedSnapshotOffStatement(this, context); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOnStmt.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOnStmt.java new file mode 100644 index 0000000000000..b8f209ee75db4 --- /dev/null +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AdminSetAutomatedSnapshotOnStmt.java @@ -0,0 +1,40 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.sql.ast; + +import com.starrocks.sql.parser.NodePosition; + +public class AdminSetAutomatedSnapshotOnStmt extends DdlStmt { + private final String storageVolumeName; + + public AdminSetAutomatedSnapshotOnStmt(String storageVolumeName) { + super(NodePosition.ZERO); + this.storageVolumeName = storageVolumeName; + } + + public AdminSetAutomatedSnapshotOnStmt(String storageVolumeName, NodePosition pos) { + super(pos); + this.storageVolumeName = storageVolumeName; + } + + public String getStorageVolumeName() { + return storageVolumeName; + } + + @Override + public R accept(AstVisitor visitor, C context) { + return visitor.visitAdminSetAutomatedSnapshotOnStatement(this, context); + } +} diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java index 37aab4f36671e..8bf71a9220e4b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/ast/AstVisitor.java @@ -426,6 +426,14 @@ default R visitSyncStatement(SyncStmt statement, C context) { return visitDDLStatement(statement, context); } + default R visitAdminSetAutomatedSnapshotOnStatement(AdminSetAutomatedSnapshotOnStmt clause, C context) { + return visitDDLStatement(clause, context); + } + + default R visitAdminSetAutomatedSnapshotOffStatement(AdminSetAutomatedSnapshotOffStmt clause, C context) { + return visitDDLStatement(clause, context); + } + // ---------------------------------------- Cluster Management Statement ------------------------------------------- default R visitAlterSystemStatement(AlterSystemStmt statement, C context) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java index bf19cebcc5159..1d7ebd2304ff8 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/AstBuilder.java @@ -114,6 +114,7 @@ import com.starrocks.qe.OriginStatement; import com.starrocks.qe.SqlModeHelper; import com.starrocks.scheduler.persist.TaskSchedule; +import com.starrocks.server.StorageVolumeMgr; import com.starrocks.server.WarehouseManager; import com.starrocks.sql.ShowTemporaryTableStmt; import com.starrocks.sql.analyzer.AnalyzerUtils; @@ -136,6 +137,8 @@ import com.starrocks.sql.ast.AdminCancelRepairTableStmt; import com.starrocks.sql.ast.AdminCheckTabletsStmt; import com.starrocks.sql.ast.AdminRepairTableStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; import com.starrocks.sql.ast.AdminSetConfigStmt; import com.starrocks.sql.ast.AdminSetPartitionVersionStmt; import com.starrocks.sql.ast.AdminSetReplicaStatusStmt; @@ -2603,6 +2606,22 @@ public ParseNode visitSyncStatement(StarRocksParser.SyncStatementContext context return new SyncStmt(createPos(context)); } + @Override + public ParseNode visitAdminSetAutomatedSnapshotOnStatement( + StarRocksParser.AdminSetAutomatedSnapshotOnStatementContext context) { + String svName = StorageVolumeMgr.BUILTIN_STORAGE_VOLUME; + if (context.svName != null) { + svName = getIdentifierName(context.svName); + } + return new AdminSetAutomatedSnapshotOnStmt(svName, createPos(context)); + } + + @Override + public ParseNode visitAdminSetAutomatedSnapshotOffStatement( + StarRocksParser.AdminSetAutomatedSnapshotOffStatementContext context) { + return new AdminSetAutomatedSnapshotOffStmt(createPos(context)); + } + // ------------------------------------------- Cluster Management Statement ---------------------------------------- @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index 33e711552ff4d..5d629ee3ef067 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -126,6 +126,8 @@ statement | killStatement | syncStatement | executeScriptStatement + | adminSetAutomatedSnapshotOnStatement + | adminSetAutomatedSnapshotOffStatement // Cluster Management Statement | alterSystemStatement @@ -734,6 +736,14 @@ syncStatement : SYNC ; +adminSetAutomatedSnapshotOnStatement + : ADMIN SET AUTOMATED CLUSTER SNAPSHOT ON (STORAGE VOLUME svName=identifier)? + ; + +adminSetAutomatedSnapshotOffStatement + : ADMIN SET AUTOMATED CLUSTER SNAPSHOT OFF + ; + // ------------------------------------------- Cluster Management Statement --------------------------------------------- alterSystemStatement @@ -2932,7 +2942,7 @@ number ; nonReserved - : ACCESS | ACTIVE | ADVISOR | AFTER | AGGREGATE | APPLY | ASYNC | AUTHORS | AVG | ADMIN | ANTI | AUTHENTICATION | AUTO_INCREMENT + : ACCESS | ACTIVE | ADVISOR | AFTER | AGGREGATE | APPLY | ASYNC | AUTHORS | AVG | ADMIN | ANTI | AUTHENTICATION | AUTO_INCREMENT | AUTOMATED | ARRAY_AGG | ARRAY_AGG_DISTINCT | BACKEND | BACKENDS | BACKUP | BEGIN | BITMAP_UNION | BLACKLIST | BLACKHOLE | BINARY | BODY | BOOLEAN | BRANCH | BROKER | BUCKETS | BUILTIN | BASE | BEFORE @@ -2947,11 +2957,11 @@ nonReserved | HASH | HISTOGRAM | HELP | HLL_UNION | HOST | HOUR | HOURS | HUB | IDENTIFIED | IMAGE | IMPERSONATE | INACTIVE | INCREMENTAL | INDEXES | INSTALL | INTEGRATION | INTEGRATIONS | INTERMEDIATE | INTERVAL | ISOLATION - | JOB + | JOB | JOBS | LABEL | LAST | LESS | LEVEL | LIST | LOCAL | LOCATION | LOGS | LOGICAL | LOW_PRIORITY | LOCK | LOCATIONS | MANUAL | MAP | MAPPING | MAPPINGS | MASKING | MATCH | MAPPINGS | MATERIALIZED | MAX | META | MIN | MINUTE | MINUTES | MODE | MODIFY | MONTH | MERGE | MINUS | NAME | NAMES | NEGATIVE | NO | NODE | NODES | NONE | NULLS | NUMBER | NUMERIC - | OBSERVER | OF | OFFSET | ONLY | OPTIMIZER | OPEN | OPERATE | OPTION | OVERWRITE + | OBSERVER | OF | OFFSET | ONLY | OPTIMIZER | OPEN | OPERATE | OPTION | OVERWRITE | OFF | PARTITIONS | PASSWORD | PATH | PAUSE | PENDING | PERCENTILE_UNION | PIVOT | PLAN | PLUGIN | PLUGINS | POLICY | POLICIES | PERCENT_RANK | PREDICATE | PRECEDING | PRIORITY | PROC | PROCESSLIST | PROFILE | PROFILELIST | PRIVILEGES | PROBABILITY | PROPERTIES | PROPERTY | PIPE | PIPES | QUARTER | QUERY | QUERIES | QUEUE | QUOTA | QUALIFY diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 index 70e7d61af90d0..7f4b65ce020d5 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 @@ -45,6 +45,7 @@ ASC: 'ASC'; ASYNC: 'ASYNC'; AUTHORS: 'AUTHORS'; AUTHENTICATION: 'AUTHENTICATION'; +AUTOMATED: 'AUTOMATED'; AUTO_INCREMENT: 'AUTO_INCREMENT'; AVG: 'AVG'; BACKEND: 'BACKEND'; @@ -230,6 +231,7 @@ OVERWRITE: 'OVERWRITE'; IS: 'IS'; ISOLATION: 'ISOLATION'; JOB: 'JOB'; +JOBS: 'JOBS'; JOIN: 'JOIN'; JSON: 'JSON'; KEY: 'KEY'; @@ -292,6 +294,7 @@ NUMBER: 'NUMBER'; NUMERIC: 'NUMERIC'; OBSERVER: 'OBSERVER'; OF: 'OF'; +OFF: 'OFF'; OFFSET: 'OFFSET'; ON: 'ON'; ONLY: 'ONLY'; diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java new file mode 100644 index 0000000000000..f3ce44fba82ab --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java @@ -0,0 +1,638 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.lake.snapshot; + +import com.google.common.collect.Lists; +import com.starrocks.alter.AlterTest; +import com.starrocks.backup.BlobStorage; +import com.starrocks.backup.Status; +import com.starrocks.common.AlreadyExistsException; +import com.starrocks.common.DdlException; +import com.starrocks.common.MetaNotFoundException; +import com.starrocks.common.Pair; +import com.starrocks.journal.bdbje.BDBJEJournal; +import com.starrocks.lake.StarOSAgent; +import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; +import com.starrocks.persist.ClusterSnapshotLog; +import com.starrocks.persist.EditLog; +import com.starrocks.persist.Storage; +import com.starrocks.server.GlobalStateMgr; +import com.starrocks.server.RunMode; +import com.starrocks.server.StorageVolumeMgr; +import com.starrocks.sql.analyzer.AnalyzeTestUtil; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; +import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; +import mockit.Delegate; +import mockit.Expectations; +import mockit.Mock; +import mockit.MockUp; +import mockit.Mocked; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_ENDPOINT; +import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_REGION; +import static com.starrocks.connector.share.credential.CloudConfigurationConstants.AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR; +import static com.starrocks.sql.analyzer.AnalyzeTestUtil.analyzeFail; +import static com.starrocks.sql.analyzer.AnalyzeTestUtil.analyzeSuccess; + +public class ClusterSnapshotTest { + @Mocked + private EditLog editLog; + + private StarOSAgent starOSAgent = new StarOSAgent(); + + private String storageVolumeName = StorageVolumeMgr.BUILTIN_STORAGE_VOLUME; + private ClusterSnapshotMgr clusterSnapshotMgr = new ClusterSnapshotMgr(); + private boolean initSv = false; + + private File mockedFile = new File("/abc/abc"); + + private ClusterSnapshotCheckpointContext context = new ClusterSnapshotCheckpointContext(); + + @BeforeClass + public static void beforeClass() throws Exception { + AlterTest.beforeClass(); + AnalyzeTestUtil.init(); + } + + @Before + public void setUp() { + try { + initStorageVolume(); + } catch (Exception ignore) { + } + + new Expectations() { + { + editLog.logClusterSnapshotLog((ClusterSnapshotLog) any); + minTimes = 0; + result = new Delegate() { + public void logClusterSnapshotLog(ClusterSnapshotLog log) { + } + }; + } + }; + + new MockUp() { + @Mock + public EditLog getEditLog() { + return editLog; + } + + @Mock + public ClusterSnapshotMgr getClusterSnapshotMgr() { + return clusterSnapshotMgr; + } + + @Mock + public long getNextId() { + return 0L; + } + }; + + new MockUp() { + @Mock + public Status delete(String remotePath) { + return Status.OK; + } + }; + + new MockUp() { + @Mock + public File getCurrentImageFile() { + return mockedFile; + } + + @Mock + public File getCurrentChecksumFile() { + return mockedFile; + } + + @Mock + public File getRoleFile() { + return mockedFile; + } + + @Mock + public File getVersionFile() { + return mockedFile; + } + }; + + new MockUp() { + @Mock + public StarOSAgent getStarOSAgent() { + return starOSAgent; + } + }; + + new MockUp() { + @Mock + public String getRawServiceId() { + return "qwertty"; + } + }; + + setAutomatedSnapshotOff(false); + } + + private void setAutomatedSnapshotOn(boolean testReplay) { + if (!testReplay) { + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().setAutomatedSnapshotOn( + new AdminSetAutomatedSnapshotOnStmt(storageVolumeName)); + } else { + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().setAutomatedSnapshotOn(storageVolumeName); + } + } + + private void setAutomatedSnapshotOff(boolean testReplay) { + if (!testReplay) { + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().setAutomatedSnapshotOff( + new AdminSetAutomatedSnapshotOffStmt()); + } else { + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().setAutomatedSnapshotOff(); + } + } + + private void initStorageVolume() throws AlreadyExistsException, DdlException, MetaNotFoundException { + if (!initSv) { + List locations = Arrays.asList("s3://abc"); + Map storageParams = new HashMap<>(); + storageParams.put(AWS_S3_REGION, "region"); + storageParams.put(AWS_S3_ENDPOINT, "endpoint"); + storageParams.put(AWS_S3_USE_AWS_SDK_DEFAULT_BEHAVIOR, "true"); + String svKey = GlobalStateMgr.getCurrentState().getStorageVolumeMgr() + .createStorageVolume(storageVolumeName, "S3", locations, storageParams, Optional.empty(), ""); + Assert.assertEquals(true, GlobalStateMgr.getCurrentState().getStorageVolumeMgr().exists(storageVolumeName)); + Assert.assertEquals(storageVolumeName, + GlobalStateMgr.getCurrentState().getStorageVolumeMgr().getStorageVolumeName(svKey)); + initSv = true; + } + } + + @Test + public void testOperationOfAutomatedSnapshot() throws DdlException { + // 1. test analyer and execution + String turnOnSql = "ADMIN SET AUTOMATED CLUSTER SNAPSHOT ON"; + // no sv + analyzeFail(turnOnSql + " STORAGE VOLUME testSv"); + + analyzeSuccess(turnOnSql); + setAutomatedSnapshotOn(false); + // duplicate creation + analyzeFail(turnOnSql); + + setAutomatedSnapshotOff(false); + + String turnOFFSql = "ADMIN SET AUTOMATED CLUSTER SNAPSHOT OFF"; + analyzeFail(turnOFFSql); + setAutomatedSnapshotOn(false); + analyzeSuccess(turnOFFSql); + + // 2. test getInfo + setAutomatedSnapshotOn(false); + ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createNewAutomatedSnapshotJob(); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(job); + ClusterSnapshot snapshot = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot(); + Assert.assertTrue(job.getInfo() != null); + Assert.assertTrue(snapshot.getInfo() != null); + setAutomatedSnapshotOff(false); + + // 3. test network communication interface + setAutomatedSnapshotOn(false); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().actualUploadImageForSnapshot(true, "abc", "/abc/"); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().actualUploadImageForSnapshot(false, "abc", "/abc/"); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr() + .deleteSnapshotFromRemote(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX); + } + + @Test + public void testReplayClusterSnapshotLog() { + // create atuomated snapshot request log + ClusterSnapshotLog logCreate = new ClusterSnapshotLog(); + logCreate.setCreateSnapshotNamePrefix(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX, storageVolumeName); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); + + // create snapshot log + ClusterSnapshotLog logSnapshot = new ClusterSnapshotLog(); + clusterSnapshotMgr.createNewAutomatedSnapshotJob().setState(ClusterSnapshotJobState.FINISHED, false); + logSnapshot.setCreateSnapshot(clusterSnapshotMgr.getAutomatedSnapshot()); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); + + // drop automated snapshot request log + ClusterSnapshotLog logDrop = new ClusterSnapshotLog(); + logDrop.setDropSnapshot(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX); + GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logDrop); + } + + @Test + public void testCheckpointCoordination() { + new MockUp() { + @Mock + public long getMaxJournalId() { + return 123456L; + } + }; + + new MockUp() { + @Mock + public long getImageJournalId() { + return 1L; + } + }; + + new MockUp() { + @Mock + public boolean isSharedDataMode() { + return true; + } + }; + + new MockUp() { + @Mock + public void deleteSnapshotFromRemote(String snapshotName) { + return; + } + }; + + String feImageDir = "/root/meta/"; + String starMgrImageDir = "/root/meta/starmgr/"; + + context.setJournal(new BDBJEJournal(null, ""), true); + context.setJournal(new BDBJEJournal(null, "starmgr_"), false); + + Pair> coordinateRet1 = null; + Pair> coordinateRet2 = null; + Pair createImageRet = Pair.create(true, ""); + + List flags = Lists.newArrayList(); + flags.add(new Boolean(true)); + flags.add(new Boolean(false)); + for (Boolean f : flags) { + boolean f1 = f.booleanValue(); + boolean f2 = !f1; + + // case 1: normal case, cross execution + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + Assert.assertTrue(coordinateRet2.second.second); + + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + + // case 2: normal case, ordered execution + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + + // case 3: normal case, peer is super slow + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + + // case 4: error case, double upload failure, cross execution + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return new Status(Status.ErrCode.COMMON_ERROR, "test error"); + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + Assert.assertTrue(coordinateRet2.second.second); + + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + + // reset by f2, and begin a new round + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(!coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet2.second.second); + + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + + // retry and success + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + Assert.assertTrue(coordinateRet2.second.second); + + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + + // case 5: error case, leader upload failure, peer success + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return new Status(Status.ErrCode.COMMON_ERROR, "test error"); + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + // reset by f2, and begin a new round + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(!coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet2.second.second); + + // retry and success + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + Assert.assertTrue(coordinateRet2.second.second); + + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + + // case 6: error case, leader success, peer upload failure + { + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + boolean checkpointIsReady = true; + + setAutomatedSnapshotOn(false); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return new Status(Status.ErrCode.COMMON_ERROR, "test error"); + } + }; + + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.second); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(!coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet2.second.second); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(!coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet2.second.second); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(!coordinateRet2.first); + Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet2.second.second); + + + new MockUp() { + @Mock + public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, + String snapshotName, String localMetaDir) { + return Status.OK; + } + }; + + // reset by leader, and begin a new round + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + Assert.assertTrue(!coordinateRet1.first); + Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(!coordinateRet1.second.second); + + // retry and success + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); + coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); + coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); + Assert.assertTrue(coordinateRet1.first); + Assert.assertTrue(coordinateRet2.first); + Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); + Assert.assertTrue(coordinateRet1.second.second); + Assert.assertTrue(coordinateRet2.second.second); + + context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); + context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); + + Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); + setAutomatedSnapshotOff(false); + } + } + } +} diff --git a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java index 4d75611d49494..8bf1b7cd69683 100644 --- a/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/persist/OperationTypeTest.java @@ -151,6 +151,7 @@ public void testRecoverableOperations() { Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_REPLICATION_JOB)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DELETE_REPLICATION_JOB)); Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_RESET_FRONTENDS)); + Assert.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_CLUSTER_SNAPSHOT_LOG)); } @Test From 46d5c7817fc1be906a73a7476dd8c4993005a3ce Mon Sep 17 00:00:00 2001 From: srlch Date: Mon, 30 Dec 2024 16:00:11 +0800 Subject: [PATCH 2/2] fix Signed-off-by: srlch --- .../lake/snapshot/ClusterSnapshotMgr.java | 214 --------- .../starrocks/persist/ClusterSnapshotLog.java | 28 +- .../com/starrocks/sql/parser/StarRocks.g4 | 2 +- .../com/starrocks/sql/parser/StarRocksLex.g4 | 1 - .../starrocks/lake/ClusterSnapshotTest.java | 423 +----------------- 5 files changed, 7 insertions(+), 661 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java index d6ba74221b693..058ac4ff43a3e 100644 --- a/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java +++ b/fe/fe-core/src/main/java/com/starrocks/lake/snapshot/ClusterSnapshotMgr.java @@ -15,14 +15,8 @@ package com.starrocks.lake.snapshot; import com.google.gson.annotations.SerializedName; -import com.starrocks.backup.BlobStorage; -import com.starrocks.backup.Status; -import com.starrocks.common.Config; -import com.starrocks.ha.FrontendNodeType; -import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; import com.starrocks.persist.ClusterSnapshotLog; import com.starrocks.persist.ImageWriter; -import com.starrocks.persist.Storage; import com.starrocks.persist.gson.GsonPostProcessable; import com.starrocks.persist.metablock.SRMetaBlockEOFException; import com.starrocks.persist.metablock.SRMetaBlockException; @@ -32,32 +26,18 @@ import com.starrocks.server.GlobalStateMgr; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOnStmt; -import com.starrocks.storagevolume.StorageVolume; -import com.starrocks.thrift.TClusterSnapshotJobsResponse; -import com.starrocks.thrift.TClusterSnapshotsResponse; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.File; import java.io.IOException; -import java.util.Map; -import java.util.TreeMap; // only used for AUTOMATED snapshot for now public class ClusterSnapshotMgr implements GsonPostProcessable { public static final Logger LOG = LogManager.getLogger(ClusterSnapshotMgr.class); public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot"; - private static final String UPLOAD_SUB_PATH = "/meta/image/"; @SerializedName(value = "automatedSnapshotSvName") private String automatedSnapshotSvName = ""; - @SerializedName(value = "automatedSnapshot") - private ClusterSnapshot automatedSnapshot = null; - @SerializedName(value = "historyAutomatedSnapshotJobs") - private TreeMap historyAutomatedSnapshotJobs = new TreeMap<>(); - - private BlobStorage remoteStorage; - private String locationWithServiceId; public ClusterSnapshotMgr() {} @@ -75,80 +55,14 @@ protected void setAutomatedSnapshotOn(String storageVolumeName) { automatedSnapshotSvName = storageVolumeName; } - public void createAutomatedSnaphot(ClusterSnapshotJob job) { - ClusterSnapshot newAutomatedSnapshot = new ClusterSnapshot( - GlobalStateMgr.getCurrentState().getNextId(), job.getSnapshotName(), job.getStorageVolumeName(), - job.getCreateTime(), job.getSuccessTime(), job.getFeJournalId(), job.getStarMgrJournalId()); - - ClusterSnapshotLog log = new ClusterSnapshotLog(); - log.setCreateSnapshot(newAutomatedSnapshot); - GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); - - if (automatedSnapshot != null && automatedSnapshot.getSnapshotName().startsWith(AUTOMATED_NAME_PREFIX)) { - deleteSnapshotFromRemote(automatedSnapshot.getSnapshotName()); - } - - automatedSnapshot = newAutomatedSnapshot; - - LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", job.getJobId(), - job.getSnapshotName()); - } - - public ClusterSnapshotJob createNewAutomatedSnapshotJob() { - long createTime = System.currentTimeMillis(); - long jobId = GlobalStateMgr.getCurrentState().getNextId(); - String snapshotNamePrefix = ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX; - String snapshotName = snapshotNamePrefix + '_' + String.valueOf(createTime); - String storageVolumeName = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName(); - ClusterSnapshotJob job = new ClusterSnapshotJob(jobId, snapshotNamePrefix, snapshotName, storageVolumeName); - job.setState(ClusterSnapshotJobState.INITIALIZING, false); - - addJob(job); - - LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", jobId, snapshotName); - - return job; - } - public String getAutomatedSnapshotSvName() { return automatedSnapshotSvName; } - public StorageVolume getAutomatedSnapshotSv() { - if (automatedSnapshotSvName.isEmpty()) { - return null; - } - - return GlobalStateMgr.getCurrentState().getStorageVolumeMgr().getStorageVolumeByName(automatedSnapshotSvName); - } - - public ClusterSnapshot getAutomatedSnapshot() { - return automatedSnapshot; - } - - public boolean containsAutomatedSnapshot() { - return getAutomatedSnapshot() != null; - } - public boolean isAutomatedSnapshotOn() { return automatedSnapshotSvName != null && !automatedSnapshotSvName.isEmpty(); } - public synchronized void addJob(ClusterSnapshotJob job) { - if (historyAutomatedSnapshotJobs.size() == Config.max_historical_automated_cluster_snapshot_jobs) { - historyAutomatedSnapshotJobs.pollFirstEntry(); - } - historyAutomatedSnapshotJobs.put(job.getJobId(), job); - } - - public String getLastFinishedAutomatedSnapshotJobName() { - ClusterSnapshot snapshot = getAutomatedSnapshot(); - if (snapshot == null) { - return ""; - } - return snapshot.getSnapshotName(); - } - // Turn off automated snapshot, use stmt for extension in future public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) { setAutomatedSnapshotOff(); @@ -156,103 +70,11 @@ public void setAutomatedSnapshotOff(AdminSetAutomatedSnapshotOffStmt stmt) { ClusterSnapshotLog log = new ClusterSnapshotLog(); log.setDropSnapshot(AUTOMATED_NAME_PREFIX); GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log); - - // avoid network communication when replay log - if (automatedSnapshot != null) { - deleteSnapshotFromRemote(automatedSnapshot.getSnapshotName()); - } } protected void setAutomatedSnapshotOff() { // drop AUTOMATED snapshot automatedSnapshotSvName = ""; - automatedSnapshot = null; - historyAutomatedSnapshotJobs.clear(); - } - - public TClusterSnapshotJobsResponse getAllJobsInfo() { - TClusterSnapshotJobsResponse response = new TClusterSnapshotJobsResponse(); - for (Map.Entry entry : historyAutomatedSnapshotJobs.entrySet()) { - response.addToItems(entry.getValue().getInfo()); - } - return response; - } - - public TClusterSnapshotsResponse getAllInfo() { - TClusterSnapshotsResponse response = new TClusterSnapshotsResponse(); - if (automatedSnapshot != null) { - response.addToItems(automatedSnapshot.getInfo()); - } - return response; - } - - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, String snapshotName, String localMetaDir) { - initRemoteStorageContext(); - String curRemoteSnapshotPath = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName; - - Status status = Status.OK; - try { - if (belongToGlobalStateMgr) { - do { - Storage localStorage = new Storage(localMetaDir); - Storage localStorageV2 = new Storage(localMetaDir + "/v2"); - long imageJournalId = localStorage.getImageJournalId(); - File curFile = null; - - curFile = localStorageV2.getCurrentImageFile(); - status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/v2/" + curFile.getName()); - if (!status.ok()) { - break; - } - - curFile = localStorageV2.getCurrentChecksumFile(); - status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/v2/" + curFile.getName()); - if (!status.ok()) { - break; - } - - curFile = localStorage.getRoleFile(); - status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/" + curFile.getName()); - if (!status.ok()) { - break; - } - - curFile = localStorage.getVersionFile(); - status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/" + curFile.getName()); - if (!status.ok()) { - break; - } - } while (false); - } else { - Storage localStorage = new Storage(localMetaDir); - File curFile = localStorage.getCurrentImageFile(); - - status = remoteStorage.upload(curFile.getAbsolutePath(), curRemoteSnapshotPath + "/starmgr/" + curFile.getName()); - } - } catch (IOException e) { - status = new Status(Status.ErrCode.COMMON_ERROR, e.getMessage()); - } - - return status; - } - - public void deleteSnapshotFromRemote(String snapshotName) { - if (snapshotName == null || snapshotName.isEmpty()) { - return; - } - - initRemoteStorageContext(); - String curRemoteSnapshotPath = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName + '/'; - remoteStorage.delete(curRemoteSnapshotPath); - } - - private void initRemoteStorageContext() { - if (this.remoteStorage == null || this.locationWithServiceId == null) { - StorageVolume sv = getAutomatedSnapshotSv(); - this.remoteStorage = new BlobStorage(null, sv.getProperties(), false); - this.locationWithServiceId = sv.getLocations().get(0) + "/" + - GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId(); - } } public void replayLog(ClusterSnapshotLog log) { @@ -266,11 +88,6 @@ public void replayLog(ClusterSnapshotLog log) { } break; } - case CREATE_SNAPSHOT: { - ClusterSnapshot snapshot = log.getSnapshot(); - automatedSnapshot = snapshot; - break; - } case DROP_SNAPSHOT: { String dropSnapshotName = log.getDropSnapshotName(); if (dropSnapshotName.equals(AUTOMATED_NAME_PREFIX)) { @@ -278,37 +95,6 @@ public void replayLog(ClusterSnapshotLog log) { } break; } - case UPDATE_SNAPSHOT_JOB: { - FrontendNodeType feType = GlobalStateMgr.getCurrentState().getFeType(); - ClusterSnapshotJob job = log.getSnapshotJob(); - ClusterSnapshotJobState state = job.getState(); - - if ((state == ClusterSnapshotJobState.INITIALIZING || state == ClusterSnapshotJobState.SNAPSHOTING) && - (feType == FrontendNodeType.INIT)) { - job.setState(ClusterSnapshotJobState.ERROR, true); - job.setErrMsg("Snapshot job has been failed"); - } - - switch (state) { - case INITIALIZING: { - addJob(job); - break; - } - case SNAPSHOTING: - case FINISHED: - case ERROR: { - if (historyAutomatedSnapshotJobs.containsKey(job.getJobId())) { - historyAutomatedSnapshotJobs.remove(job.getJobId()); - historyAutomatedSnapshotJobs.put(job.getJobId(), job); - } - break; - } - default: { - LOG.warn("Invalid Cluster Snapshot Job state {}", state); - } - } - break; - } default: { LOG.warn("Invalid Cluster Snapshot Log Type {}", logType); } diff --git a/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java index da5f7abb28dd6..e62d116e6d18b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java +++ b/fe/fe-core/src/main/java/com/starrocks/persist/ClusterSnapshotLog.java @@ -17,8 +17,6 @@ import com.google.gson.annotations.SerializedName; import com.starrocks.common.io.Text; import com.starrocks.common.io.Writable; -import com.starrocks.lake.snapshot.ClusterSnapshot; -import com.starrocks.lake.snapshot.ClusterSnapshotJob; import com.starrocks.persist.gson.GsonUtils; import java.io.DataInput; @@ -26,7 +24,7 @@ import java.io.IOException; public class ClusterSnapshotLog implements Writable { - public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, CREATE_SNAPSHOT, DROP_SNAPSHOT, UPDATE_SNAPSHOT_JOB } + public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT } @SerializedName(value = "type") private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE; // For CREATE_SNAPSHOT_PREFIX @@ -34,15 +32,9 @@ public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, CREATE_SNAPSH private String createSnapshotNamePrefix = ""; @SerializedName(value = "storageVolumeName") private String storageVolumeName = ""; - // For CREATE_SNAPSHOT - @SerializedName(value = "snapshot") - private ClusterSnapshot snapshot = null; // For DROP_SNAPSHOT @SerializedName(value = "dropSnapshotName") private String dropSnapshotName = ""; - // For UPDATE_SNAPSHOT_JOB - @SerializedName(value = "snapshotJob") - private ClusterSnapshotJob snapshotJob = null; public ClusterSnapshotLog() {} @@ -52,21 +44,11 @@ public void setCreateSnapshotNamePrefix(String createSnapshotNamePrefix, String this.storageVolumeName = storageVolumeName; } - public void setCreateSnapshot(ClusterSnapshot snapshot) { - this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT; - this.snapshot = snapshot; - } - public void setDropSnapshot(String dropSnapshotName) { this.type = ClusterSnapshotLogType.DROP_SNAPSHOT; this.dropSnapshotName = dropSnapshotName; } - public void setSnapshotJob(ClusterSnapshotJob job) { - this.type = ClusterSnapshotLogType.UPDATE_SNAPSHOT_JOB; - this.snapshotJob = job; - } - public ClusterSnapshotLogType getType() { return type; } @@ -79,18 +61,10 @@ public String getStorageVolumeName() { return this.storageVolumeName; } - public ClusterSnapshot getSnapshot() { - return this.snapshot; - } - public String getDropSnapshotName() { return this.dropSnapshotName; } - public ClusterSnapshotJob getSnapshotJob() { - return this.snapshotJob; - } - public static ClusterSnapshotLog read(DataInput in) throws IOException { String json = Text.readString(in); return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class); diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 index 5d629ee3ef067..cd736caedf390 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocks.g4 @@ -2957,7 +2957,7 @@ nonReserved | HASH | HISTOGRAM | HELP | HLL_UNION | HOST | HOUR | HOURS | HUB | IDENTIFIED | IMAGE | IMPERSONATE | INACTIVE | INCREMENTAL | INDEXES | INSTALL | INTEGRATION | INTEGRATIONS | INTERMEDIATE | INTERVAL | ISOLATION - | JOB | JOBS + | JOB | LABEL | LAST | LESS | LEVEL | LIST | LOCAL | LOCATION | LOGS | LOGICAL | LOW_PRIORITY | LOCK | LOCATIONS | MANUAL | MAP | MAPPING | MAPPINGS | MASKING | MATCH | MAPPINGS | MATERIALIZED | MAX | META | MIN | MINUTE | MINUTES | MODE | MODIFY | MONTH | MERGE | MINUS | NAME | NAMES | NEGATIVE | NO | NODE | NODES | NONE | NULLS | NUMBER | NUMERIC diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 index 7f4b65ce020d5..3bebb0ca736e7 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 +++ b/fe/fe-core/src/main/java/com/starrocks/sql/parser/StarRocksLex.g4 @@ -231,7 +231,6 @@ OVERWRITE: 'OVERWRITE'; IS: 'IS'; ISOLATION: 'ISOLATION'; JOB: 'JOB'; -JOBS: 'JOBS'; JOIN: 'JOIN'; JSON: 'JSON'; KEY: 'KEY'; diff --git a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java index f3ce44fba82ab..57c785e744d68 100644 --- a/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/lake/ClusterSnapshotTest.java @@ -14,22 +14,22 @@ package com.starrocks.lake.snapshot; -import com.google.common.collect.Lists; +//import com.google.common.collect.Lists; import com.starrocks.alter.AlterTest; import com.starrocks.backup.BlobStorage; import com.starrocks.backup.Status; import com.starrocks.common.AlreadyExistsException; import com.starrocks.common.DdlException; import com.starrocks.common.MetaNotFoundException; -import com.starrocks.common.Pair; -import com.starrocks.journal.bdbje.BDBJEJournal; +//import com.starrocks.common.Pair; +//import com.starrocks.journal.bdbje.BDBJEJournal; import com.starrocks.lake.StarOSAgent; -import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; +//import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState; import com.starrocks.persist.ClusterSnapshotLog; import com.starrocks.persist.EditLog; import com.starrocks.persist.Storage; import com.starrocks.server.GlobalStateMgr; -import com.starrocks.server.RunMode; +//import com.starrocks.server.RunMode; import com.starrocks.server.StorageVolumeMgr; import com.starrocks.sql.analyzer.AnalyzeTestUtil; import com.starrocks.sql.ast.AdminSetAutomatedSnapshotOffStmt; @@ -69,8 +69,6 @@ public class ClusterSnapshotTest { private File mockedFile = new File("/abc/abc"); - private ClusterSnapshotCheckpointContext context = new ClusterSnapshotCheckpointContext(); - @BeforeClass public static void beforeClass() throws Exception { AlterTest.beforeClass(); @@ -210,22 +208,6 @@ public void testOperationOfAutomatedSnapshot() throws DdlException { analyzeFail(turnOFFSql); setAutomatedSnapshotOn(false); analyzeSuccess(turnOFFSql); - - // 2. test getInfo - setAutomatedSnapshotOn(false); - ClusterSnapshotJob job = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createNewAutomatedSnapshotJob(); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().createAutomatedSnaphot(job); - ClusterSnapshot snapshot = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot(); - Assert.assertTrue(job.getInfo() != null); - Assert.assertTrue(snapshot.getInfo() != null); - setAutomatedSnapshotOff(false); - - // 3. test network communication interface - setAutomatedSnapshotOn(false); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().actualUploadImageForSnapshot(true, "abc", "/abc/"); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().actualUploadImageForSnapshot(false, "abc", "/abc/"); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr() - .deleteSnapshotFromRemote(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX); } @Test @@ -235,404 +217,9 @@ public void testReplayClusterSnapshotLog() { logCreate.setCreateSnapshotNamePrefix(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX, storageVolumeName); GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); - // create snapshot log - ClusterSnapshotLog logSnapshot = new ClusterSnapshotLog(); - clusterSnapshotMgr.createNewAutomatedSnapshotJob().setState(ClusterSnapshotJobState.FINISHED, false); - logSnapshot.setCreateSnapshot(clusterSnapshotMgr.getAutomatedSnapshot()); - GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logCreate); - // drop automated snapshot request log ClusterSnapshotLog logDrop = new ClusterSnapshotLog(); logDrop.setDropSnapshot(ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX); GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().replayLog(logDrop); } - - @Test - public void testCheckpointCoordination() { - new MockUp() { - @Mock - public long getMaxJournalId() { - return 123456L; - } - }; - - new MockUp() { - @Mock - public long getImageJournalId() { - return 1L; - } - }; - - new MockUp() { - @Mock - public boolean isSharedDataMode() { - return true; - } - }; - - new MockUp() { - @Mock - public void deleteSnapshotFromRemote(String snapshotName) { - return; - } - }; - - String feImageDir = "/root/meta/"; - String starMgrImageDir = "/root/meta/starmgr/"; - - context.setJournal(new BDBJEJournal(null, ""), true); - context.setJournal(new BDBJEJournal(null, "starmgr_"), false); - - Pair> coordinateRet1 = null; - Pair> coordinateRet2 = null; - Pair createImageRet = Pair.create(true, ""); - - List flags = Lists.newArrayList(); - flags.add(new Boolean(true)); - flags.add(new Boolean(false)); - for (Boolean f : flags) { - boolean f1 = f.booleanValue(); - boolean f2 = !f1; - - // case 1: normal case, cross execution - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - Assert.assertTrue(coordinateRet2.second.second); - - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - - // case 2: normal case, ordered execution - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - - // case 3: normal case, peer is super slow - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - - // case 4: error case, double upload failure, cross execution - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return new Status(Status.ErrCode.COMMON_ERROR, "test error"); - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - Assert.assertTrue(coordinateRet2.second.second); - - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - - // reset by f2, and begin a new round - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(!coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet2.second.second); - - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - - // retry and success - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - Assert.assertTrue(coordinateRet2.second.second); - - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - - // case 5: error case, leader upload failure, peer success - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return new Status(Status.ErrCode.COMMON_ERROR, "test error"); - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - // reset by f2, and begin a new round - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(!coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet2.second.second); - - // retry and success - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - Assert.assertTrue(coordinateRet2.second.second); - - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - - // case 6: error case, leader success, peer upload failure - { - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - boolean checkpointIsReady = true; - - setAutomatedSnapshotOn(false); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return new Status(Status.ErrCode.COMMON_ERROR, "test error"); - } - }; - - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.second); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(!coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet2.second.second); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(!coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet2.second.second); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(!coordinateRet2.first); - Assert.assertTrue(coordinateRet2.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet2.second.second); - - - new MockUp() { - @Mock - public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, - String snapshotName, String localMetaDir) { - return Status.OK; - } - }; - - // reset by leader, and begin a new round - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - Assert.assertTrue(!coordinateRet1.first); - Assert.assertTrue(coordinateRet1.second.first == ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(!coordinateRet1.second.second); - - // retry and success - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() == null); - coordinateRet1 = context.coordinateTwoCheckpointsIfNeeded(f1, checkpointIsReady); - coordinateRet2 = context.coordinateTwoCheckpointsIfNeeded(f2, checkpointIsReady); - Assert.assertTrue(coordinateRet1.first); - Assert.assertTrue(coordinateRet2.first); - Assert.assertTrue(coordinateRet1.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet2.second.first != ClusterSnapshotCheckpointContext.INVALID_JOURANL_ID); - Assert.assertTrue(coordinateRet1.second.second); - Assert.assertTrue(coordinateRet2.second.second); - - context.handleImageUpload(createImageRet, checkpointIsReady, f1 ? feImageDir : starMgrImageDir, f1); - context.handleImageUpload(createImageRet, checkpointIsReady, f2 ? feImageDir : starMgrImageDir, f2); - - Assert.assertTrue(GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshot() != null); - setAutomatedSnapshotOff(false); - } - } - } }