Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: srlch <[email protected]>
  • Loading branch information
srlch committed Dec 30, 2024
1 parent 6fa9062 commit 0f400f3
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 704 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,8 @@
package com.starrocks.lake.snapshot;

import com.google.gson.annotations.SerializedName;
import com.starrocks.backup.BlobStorage;
import com.starrocks.backup.Status;
import com.starrocks.common.Config;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.lake.snapshot.ClusterSnapshotJob.ClusterSnapshotJobState;
import com.starrocks.persist.ClusterSnapshotLog;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.Storage;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
import com.starrocks.persist.metablock.SRMetaBlockException;
Expand All @@ -32,16 +26,10 @@
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.AdminSetAutomatedSnapshotStmt;
import com.starrocks.sql.ast.AdminSetOffAutomatedSnapshotStmt;
import com.starrocks.storagevolume.StorageVolume;
import com.starrocks.thrift.TClusterSnapshotJobsResponse;
import com.starrocks.thrift.TClusterSnapshotsResponse;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.TreeMap;

// only used for AUTOMATED snapshot for now
public class ClusterSnapshotMgr implements GsonPostProcessable {
Expand All @@ -51,13 +39,6 @@ public class ClusterSnapshotMgr implements GsonPostProcessable {

@SerializedName(value = "automatedSnapshotSvName")
private String automatedSnapshotSvName = "";
@SerializedName(value = "automatedSnapshot")
private ClusterSnapshot automatedSnapshot = null;
@SerializedName(value = "historyAutomatedSnapshotJobs")
private TreeMap<Long, ClusterSnapshotJob> historyAutomatedSnapshotJobs = new TreeMap<>();

private BlobStorage remoteStorage;
private String locationWithServiceId;

/** No-arg constructor; every field keeps the initial value given at its declaration. */
public ClusterSnapshotMgr() {
}

Expand All @@ -76,80 +57,14 @@ protected void setAutomatedSnapshotOn(AdminSetAutomatedSnapshotStmt stmt, boolea
automatedSnapshotSvName = storageVolumeName;
}

/**
 * Promotes a finished snapshot job to the current automated snapshot.
 *
 * <p>A new {@link ClusterSnapshot} is built from the job's metadata and written to the edit log.
 * Afterwards the previously held automated snapshot (if its name starts with
 * {@code AUTOMATED_NAME_PREFIX}) is deleted from remote storage and the in-memory reference is replaced.
 *
 * @param job job supplying the snapshot name, storage volume name, create/success times and journal ids
 */
public void createAutomatedSnaphot(ClusterSnapshotJob job) {
    long snapshotId = GlobalStateMgr.getCurrentState().getNextId();
    ClusterSnapshot finishedSnapshot = new ClusterSnapshot(snapshotId, job.getSnapshotName(),
            job.getStorageVolumeName(), job.getCreateTime(), job.getSuccessTime(), job.getFeJournalId(),
            job.getStarMgrJournalId());

    // The edit log entry is written before the old snapshot is removed from remote storage.
    ClusterSnapshotLog createLog = new ClusterSnapshotLog();
    createLog.setCreateSnapshot(finishedSnapshot);
    GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(createLog);

    ClusterSnapshot previousSnapshot = automatedSnapshot;
    if (previousSnapshot != null) {
        String previousName = previousSnapshot.getSnapshotName();
        if (previousName.startsWith(AUTOMATED_NAME_PREFIX)) {
            deleteSnapshotFromRemote(previousName);
        }
    }
    automatedSnapshot = finishedSnapshot;

    LOG.info("Finish automated cluster snapshot job successfully, job id: {}, snapshot name: {}", job.getJobId(),
            job.getSnapshotName());
}

/**
 * Creates a new automated snapshot job in state {@code INITIALIZING} and registers it via
 * {@link #addJob(ClusterSnapshotJob)}.
 *
 * <p>The snapshot name is {@code AUTOMATED_NAME_PREFIX + '_' + <current time in millis>}; the storage
 * volume name is read from the cluster snapshot manager held by the current {@code GlobalStateMgr}.
 *
 * @return the newly created job
 */
public ClusterSnapshotJob createNewAutomatedSnapshotJob() {
    final long createdAtMs = System.currentTimeMillis();
    final long newJobId = GlobalStateMgr.getCurrentState().getNextId();
    final String namePrefix = ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX;
    final String newSnapshotName = namePrefix + '_' + createdAtMs;
    final String svName =
            GlobalStateMgr.getCurrentState().getClusterSnapshotMgr().getAutomatedSnapshotSvName();

    ClusterSnapshotJob newJob = new ClusterSnapshotJob(newJobId, namePrefix, newSnapshotName, svName);
    newJob.setState(ClusterSnapshotJobState.INITIALIZING, false);
    addJob(newJob);

    LOG.info("Create automated cluster snapshot job successfully, job id: {}, snapshot name: {}", newJobId,
            newSnapshotName);
    return newJob;
}

/**
 * @return the storage volume name configured for automated snapshots (initialised to the empty string)
 */
public String getAutomatedSnapshotSvName() {
    return this.automatedSnapshotSvName;
}

/**
 * Looks up the storage volume configured for automated snapshots.
 *
 * @return the storage volume registered under the configured name as returned by the storage volume
 *         manager, or {@code null} when no name is configured (name is {@code null} or empty)
 */
public StorageVolume getAutomatedSnapshotSv() {
    // Treat null the same way isAutomatedSnapshotOn() does, instead of failing with an NPE on isEmpty().
    if (automatedSnapshotSvName == null || automatedSnapshotSvName.isEmpty()) {
        return null;
    }

    return GlobalStateMgr.getCurrentState().getStorageVolumeMgr().getStorageVolumeByName(automatedSnapshotSvName);
}

/**
 * @return the currently held automated snapshot, or {@code null} if there is none
 */
public ClusterSnapshot getAutomatedSnapshot() {
    return this.automatedSnapshot;
}

/**
 * @return {@code true} when {@link #getAutomatedSnapshot()} yields a non-null snapshot
 */
public boolean containsAutomatedSnapshot() {
    ClusterSnapshot current = getAutomatedSnapshot();
    return current != null;
}

/**
 * @return {@code true} when a non-null, non-empty storage volume name is configured for automated snapshots
 */
public boolean isAutomatedSnapshotOn() {
    if (automatedSnapshotSvName == null) {
        return false;
    }
    return !automatedSnapshotSvName.isEmpty();
}

/**
 * Adds a job to the history of automated snapshot jobs, evicting the entries with the smallest job ids
 * so that the history stays within {@code Config.max_historical_automated_cluster_snapshot_jobs}.
 *
 * @param job the job to record; it is keyed by {@code job.getJobId()}
 */
public synchronized void addJob(ClusterSnapshotJob job) {
    // Use ">=" in a loop rather than a single "==" check: if the limit is lowered while the history is
    // already larger than the new limit (presumably the config can change at runtime -- confirm), an
    // equality test would never match again and the map would grow without bound.
    // The isEmpty() guard keeps the loop finite when the limit is zero or negative.
    while (!historyAutomatedSnapshotJobs.isEmpty()
            && historyAutomatedSnapshotJobs.size() >= Config.max_historical_automated_cluster_snapshot_jobs) {
        historyAutomatedSnapshotJobs.pollFirstEntry();
    }
    historyAutomatedSnapshotJobs.put(job.getJobId(), job);
}

/**
 * @return the name of the current automated snapshot, or the empty string when there is no snapshot
 */
public String getLastFinishedAutomatedSnapshotJobName() {
    ClusterSnapshot current = getAutomatedSnapshot();
    return current != null ? current.getSnapshotName() : "";
}

// Turn off automated snapshots; the stmt parameter is kept for future extension
public void setAutomatedSnapshotOff(AdminSetOffAutomatedSnapshotStmt stmt) {
setAutomatedSnapshotOff(stmt, false);
Expand All @@ -160,102 +75,10 @@ protected void setAutomatedSnapshotOff(AdminSetOffAutomatedSnapshotStmt stmt, bo
ClusterSnapshotLog log = new ClusterSnapshotLog();
log.setDropSnapshot(AUTOMATED_NAME_PREFIX);
GlobalStateMgr.getCurrentState().getEditLog().logClusterSnapshotLog(log);

// avoid network communication when replay log
if (automatedSnapshot != null) {
deleteSnapshotFromRemote(automatedSnapshot.getSnapshotName());
}
}

// drop AUTOMATED snapshot
automatedSnapshotSvName = "";
automatedSnapshot = null;
historyAutomatedSnapshotJobs.clear();
}

/**
 * Collects the info of every job in the automated snapshot job history, in ascending job-id order
 * (the iteration order of the backing {@code TreeMap}).
 *
 * @return a response holding one item per recorded job
 */
public TClusterSnapshotJobsResponse getAllJobsInfo() {
    TClusterSnapshotJobsResponse jobsResponse = new TClusterSnapshotJobsResponse();
    for (ClusterSnapshotJob recordedJob : historyAutomatedSnapshotJobs.values()) {
        jobsResponse.addToItems(recordedJob.getInfo());
    }
    return jobsResponse;
}

/**
 * Builds a response describing the current automated snapshot.
 *
 * @return a response with the snapshot's info as its single item, or with no item added when there is
 *         no automated snapshot
 */
public TClusterSnapshotsResponse getAllInfo() {
    TClusterSnapshotsResponse snapshotsResponse = new TClusterSnapshotsResponse();
    if (automatedSnapshot == null) {
        return snapshotsResponse;
    }
    snapshotsResponse.addToItems(automatedSnapshot.getInfo());
    return snapshotsResponse;
}

/**
 * Uploads the local image files of one snapshot to remote storage under
 * {@code locationWithServiceId + UPLOAD_SUB_PATH + snapshotName}.
 *
 * <p>When {@code belongToGlobalStateMgr} is {@code true}, four files are uploaded in order and the first
 * failure stops the sequence: the current image file and checksum file found under
 * {@code localMetaDir + "/v2"} (to {@code <remote>/v2/}), then the role file and version file found under
 * {@code localMetaDir} (to {@code <remote>/}). Otherwise only the current image file under
 * {@code localMetaDir} is uploaded, to {@code <remote>/starmgr/}.
 *
 * @param belongToGlobalStateMgr selects between the four-file layout and the single-file
 *                               {@code starmgr} layout described above
 * @param snapshotName           name of the snapshot; used as the last segment of the remote path
 * @param localMetaDir           local meta directory to read the files from
 * @return the status of the first failed upload, the status of the last upload when all succeed, or a
 *         {@code COMMON_ERROR} status carrying the exception message when an {@link IOException} occurs
 */
public Status actualUploadImageForSnapshot(boolean belongToGlobalStateMgr, String snapshotName, String localMetaDir) {
    initRemoteStorageContext();
    String remoteSnapshotPath = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName;

    try {
        if (!belongToGlobalStateMgr) {
            Storage localStorage = new Storage(localMetaDir);
            return uploadFileToRemote(localStorage.getCurrentImageFile(), remoteSnapshotPath + "/starmgr/");
        }

        // Open both local storages up front so a broken meta dir is reported before anything is uploaded.
        Storage localStorage = new Storage(localMetaDir);
        Storage localStorageV2 = new Storage(localMetaDir + "/v2");

        Status status = uploadFileToRemote(localStorageV2.getCurrentImageFile(), remoteSnapshotPath + "/v2/");
        if (!status.ok()) {
            return status;
        }

        status = uploadFileToRemote(localStorageV2.getCurrentChecksumFile(), remoteSnapshotPath + "/v2/");
        if (!status.ok()) {
            return status;
        }

        status = uploadFileToRemote(localStorage.getRoleFile(), remoteSnapshotPath + "/");
        if (!status.ok()) {
            return status;
        }

        return uploadFileToRemote(localStorage.getVersionFile(), remoteSnapshotPath + "/");
    } catch (IOException e) {
        // Keep the stack trace in the log; the returned Status can only carry the message.
        LOG.warn("Failed to upload image for cluster snapshot {} from local meta dir {}", snapshotName,
                localMetaDir, e);
        return new Status(Status.ErrCode.COMMON_ERROR, e.getMessage());
    }
}

// Uploads one local file into remoteDir (which must end with '/'), keeping the local file name.
private Status uploadFileToRemote(File localFile, String remoteDir) {
    return remoteStorage.upload(localFile.getAbsolutePath(), remoteDir + localFile.getName());
}

/**
 * Deletes the remote directory of the given snapshot, i.e.
 * {@code locationWithServiceId + UPLOAD_SUB_PATH + snapshotName + '/'}.
 * Does nothing when the name is {@code null} or empty.
 *
 * @param snapshotName name of the snapshot whose remote files should be removed
 */
public void deleteSnapshotFromRemote(String snapshotName) {
    boolean hasName = snapshotName != null && !snapshotName.isEmpty();
    if (hasName) {
        // The remote context must be initialised before locationWithServiceId is read.
        initRemoteStorageContext();
        String remoteSnapshotDir = locationWithServiceId + UPLOAD_SUB_PATH + snapshotName + '/';
        remoteStorage.delete(remoteSnapshotDir);
    }
}

/**
 * Lazily initialises {@code remoteStorage} and {@code locationWithServiceId} from the storage volume
 * configured for automated snapshots. Does nothing when both are already set.
 *
 * @throws IllegalStateException if no storage volume can be resolved for the configured name
 */
private void initRemoteStorageContext() {
    if (this.remoteStorage != null && this.locationWithServiceId != null) {
        return;
    }

    StorageVolume sv = getAutomatedSnapshotSv();
    if (sv == null) {
        // getAutomatedSnapshotSv() returns null when no volume name is configured; fail with a clear
        // message instead of an anonymous NullPointerException on the dereference below.
        throw new IllegalStateException(
                "Storage volume for automated cluster snapshot is not available, storage volume name: '"
                        + automatedSnapshotSvName + "'");
    }

    this.remoteStorage = new BlobStorage(null, sv.getProperties(), false);
    this.locationWithServiceId = sv.getLocations().get(0) + "/" +
            GlobalStateMgr.getCurrentState().getStarOSAgent().getRawServiceId();
}

public void replayLog(ClusterSnapshotLog log) {
Expand All @@ -269,49 +92,13 @@ public void replayLog(ClusterSnapshotLog log) {
}
break;
}
case CREATE_SNAPSHOT: {
ClusterSnapshot snapshot = log.getSnapshot();
automatedSnapshot = snapshot;
break;
}
case DROP_SNAPSHOT: {
String dropSnapshotName = log.getDropSnapshotName();
if (dropSnapshotName.equals(AUTOMATED_NAME_PREFIX)) {
setAutomatedSnapshotOff(new AdminSetOffAutomatedSnapshotStmt(), true);
}
break;
}
case UPDATE_SNAPSHOT_JOB: {
FrontendNodeType feType = GlobalStateMgr.getCurrentState().getFeType();
ClusterSnapshotJob job = log.getSnapshotJob();
ClusterSnapshotJobState state = job.getState();

if ((state == ClusterSnapshotJobState.INITIALIZING || state == ClusterSnapshotJobState.SNAPSHOTING) &&
(feType == FrontendNodeType.INIT)) {
job.setState(ClusterSnapshotJobState.ERROR, true);
job.setErrMsg("Snapshot job has been failed");
}

switch (state) {
case INITIALIZING: {
addJob(job);
break;
}
case SNAPSHOTING:
case FINISHED:
case ERROR: {
if (historyAutomatedSnapshotJobs.containsKey(job.getJobId())) {
historyAutomatedSnapshotJobs.remove(job.getJobId());
historyAutomatedSnapshotJobs.put(job.getJobId(), job);
}
break;
}
default: {
LOG.warn("Invalid Cluster Snapshot Job state {}", state);
}
}
break;
}
default: {
LOG.warn("Invalid Cluster Snapshot Log Type {}", logType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,24 @@
import com.google.gson.annotations.SerializedName;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.lake.snapshot.ClusterSnapshot;
import com.starrocks.lake.snapshot.ClusterSnapshotJob;
import com.starrocks.persist.gson.GsonUtils;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class ClusterSnapshotLog implements Writable {
public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, CREATE_SNAPSHOT, DROP_SNAPSHOT, UPDATE_SNAPSHOT_JOB }
public enum ClusterSnapshotLogType { NONE, CREATE_SNAPSHOT_PREFIX, DROP_SNAPSHOT }
@SerializedName(value = "type")
private ClusterSnapshotLogType type = ClusterSnapshotLogType.NONE;
// For CREATE_SNAPSHOT_PREFIX
@SerializedName(value = "createSnapshotNamePrefix")
private String createSnapshotNamePrefix = "";
@SerializedName(value = "storageVolumeName")
private String storageVolumeName = "";
// For CREATE_SNAPSHOT
@SerializedName(value = "snapshot")
private ClusterSnapshot snapshot = null;
// For DROP_SNAPSHOT
@SerializedName(value = "dropSnapshotName")
private String dropSnapshotName = "";
// For UPDATE_SNAPSHOT_JOB
@SerializedName(value = "snapshotJob")
private ClusterSnapshotJob snapshotJob = null;

/** No-arg constructor; the log starts with type {@code NONE} and the declared field defaults. */
public ClusterSnapshotLog() {
}

Expand All @@ -52,21 +44,11 @@ public void setCreateSnapshotNamePrefix(String createSnapshotNamePrefix, String
this.storageVolumeName = storageVolumeName;
}

/**
 * Turns this log into a {@code CREATE_SNAPSHOT} entry.
 *
 * @param snapshot the snapshot carried by the entry
 */
public void setCreateSnapshot(ClusterSnapshot snapshot) {
    this.snapshot = snapshot;
    this.type = ClusterSnapshotLogType.CREATE_SNAPSHOT;
}

/**
 * Turns this log into a {@code DROP_SNAPSHOT} entry.
 *
 * @param dropSnapshotName the snapshot name carried by the entry
 */
public void setDropSnapshot(String dropSnapshotName) {
    this.dropSnapshotName = dropSnapshotName;
    this.type = ClusterSnapshotLogType.DROP_SNAPSHOT;
}

/**
 * Turns this log into an {@code UPDATE_SNAPSHOT_JOB} entry.
 *
 * @param job the snapshot job carried by the entry
 */
public void setSnapshotJob(ClusterSnapshotJob job) {
    this.snapshotJob = job;
    this.type = ClusterSnapshotLogType.UPDATE_SNAPSHOT_JOB;
}

/**
 * @return the type of this log entry ({@code NONE} until one of the setters assigns another type)
 */
public ClusterSnapshotLogType getType() {
    return this.type;
}
Expand All @@ -79,18 +61,10 @@ public String getStorageVolumeName() {
return this.storageVolumeName;
}

/**
 * @return the snapshot stored by {@code setCreateSnapshot}, or {@code null} if none was set
 */
public ClusterSnapshot getSnapshot() {
    return snapshot;
}

/**
 * @return the snapshot name stored by {@code setDropSnapshot} (initialised to the empty string)
 */
public String getDropSnapshotName() {
    return dropSnapshotName;
}

/**
 * @return the job stored by {@code setSnapshotJob}, or {@code null} if none was set
 */
public ClusterSnapshotJob getSnapshotJob() {
    return snapshotJob;
}

public static ClusterSnapshotLog read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ClusterSnapshotLog.class);
Expand Down
Loading

0 comments on commit 0f400f3

Please sign in to comment.