Skip to content

Commit

Permalink
[fix](vault) Fix bugs about altering storage vault name
Browse files Browse the repository at this point in the history
* fix altering storage name but not writing disk in meta-service
* check vault if existed when altering stoarge vault name
  • Loading branch information
SWJTU-ZhangLei committed Dec 20, 2024
1 parent b124c56 commit baa357d
Show file tree
Hide file tree
Showing 7 changed files with 273 additions and 28 deletions.
73 changes: 51 additions & 22 deletions cloud/src/meta-service/meta_service_resource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,9 +523,18 @@ static void set_default_vault_log_helper(const InstanceInfoPB& instance,
LOG(INFO) << vault_msg;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static bool vault_exist(const InstanceInfoPB& instance, const std::string& new_vault_name) {
for (auto& name : instance.storage_vault_names()) {
if (new_vault_name == name) {
return true;
}
}
return false;
}

static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_hdfs_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -591,6 +600,13 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -623,19 +639,22 @@ static int alter_hdfs_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tr
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new_vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

// err = txn->commit();
// if (err != TxnErrorCode::TXN_OK) {
// code = cast_as<ErrCategory::COMMIT>(err);
// msg = fmt::format("failed to commit kv txn, err={}", err);
// LOG(WARNING) << msg;
// }

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
response->set_storage_vault_name(new_vault.name());
return 0;
}

static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction> txn,
static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Transaction>& txn,
const StorageVaultPB& vault, MetaServiceCode& code,
std::string& msg) {
std::string& msg, AlterObjStoreInfoResponse* response) {
if (!vault.has_obj_info()) {
code = MetaServiceCode::INVALID_ARGUMENT;
std::stringstream ss;
Expand Down Expand Up @@ -708,6 +727,13 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
msg = ss.str();
return -1;
}

if (vault_exist(instance, vault.alter_name())) {
code = MetaServiceCode::ALREADY_EXISTED;
msg = fmt::format("vault_name={} already existed", vault.alter_name());
return -1;
}

new_vault.set_name(vault.alter_name());
*name_itr = vault.alter_name();
}
Expand Down Expand Up @@ -747,13 +773,16 @@ static int alter_s3_storage_vault(InstanceInfoPB& instance, std::unique_ptr<Tran
txn->put(vault_key, val);
LOG(INFO) << "put vault_id=" << vault_id << ", vault_key=" << hex(vault_key)
<< ", origin vault=" << origin_vault_info << ", new vault=" << new_vault_info;
err = txn->commit();
if (err != TxnErrorCode::TXN_OK) {
code = cast_as<ErrCategory::COMMIT>(err);
msg = fmt::format("failed to commit kv txn, err={}", err);
LOG(WARNING) << msg;
}

// err = txn->commit();
// if (err != TxnErrorCode::TXN_OK) {
// code = cast_as<ErrCategory::COMMIT>(err);
// msg = fmt::format("failed to commit kv txn, err={}", err);
// LOG(WARNING) << msg;
// }

DCHECK_EQ(new_vault.id(), vault_id);
response->set_storage_vault_id(new_vault.id());
response->set_storage_vault_name(new_vault.name());
return 0;
}

Expand Down Expand Up @@ -1100,12 +1129,12 @@ void MetaServiceImpl::alter_storage_vault(google::protobuf::RpcController* contr
break;
}
case AlterObjStoreInfoRequest::ALTER_S3_VAULT: {
alter_s3_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_s3_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::ALTER_HDFS_VAULT: {
alter_hdfs_storage_vault(instance, std::move(txn), request->vault(), code, msg);
return;
alter_hdfs_storage_vault(instance, txn, request->vault(), code, msg, response);
break;
}
case AlterObjStoreInfoRequest::DROP_S3_VAULT:
[[fallthrough]];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public abstract class StorageVault {
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";

public static final String VAULT_NAME = "VAULT_NAME";

public enum StorageVaultType {
UNKNOWN,
S3,
Expand All @@ -60,7 +62,6 @@ public static StorageVaultType fromString(String storageVaultTypeType) {
}
}

protected static final String VAULT_NAME = "VAULT_NAME";
protected String name;
protected StorageVaultType type;
protected String id;
Expand Down Expand Up @@ -146,7 +147,7 @@ public void setId(String id) {
break;
case S3:
if (!stmt.getProperties().containsKey(PropertyConverter.USE_PATH_STYLE)) {
stmt.getProperties().put(PropertyConverter.USE_PATH_STYLE, "true");
stmt.getProperties().put(PropertyConverter.USE_PATH_STYLE, "false");
}
CreateResourceStmt resourceStmt =
new CreateResourceStmt(false, ifNotExists, name, stmt.getProperties());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.apache.doris.thrift.TNetworkAddress;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -88,6 +90,21 @@ public String getVaultIdByName(String name) {
return vaultId;
}

private void updateVaultNameToIdCache(String oldVaultName, String newVaultName, String vaultId) {
try {
rwLock.writeLock().lock();
String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
vaultNameToVaultId.remove(oldVaultName);
Preconditions.checkArgument(!Strings.isNullOrEmpty(cachedVaultId), cachedVaultId,
"Cached vault id is null or empty");
Preconditions.checkArgument(cachedVaultId.equals(vaultId),
"Cached vault id not equal to remote storage." + cachedVaultId + " - " + vaultId);
vaultNameToVaultId.put(newVaultName, vaultId);
} finally {
rwLock.writeLock().unlock();
}
}

private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
throws Exception {
Cloud.ObjectStoreInfoPB.Builder objBuilder = S3Properties.getObjStoreInfoPB(properties);
Expand Down Expand Up @@ -166,8 +183,10 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
LOG.warn("failed to alter storage vault response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
LOG.info("Succeed to alter storage vault {}, id {}, origin default vault replaced {}",
name, response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());

updateVaultNameToIdCache(name, response.getStorageVaultName(), response.getStorageVaultId());
LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
response.getStorageVaultName(), response.getStorageVaultId());

// Make BE eagerly fetch the storage vault info from Meta Service
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@
import org.apache.doris.catalog.StorageVault;
import org.apache.doris.catalog.StorageVault.StorageVaultType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;

import com.google.common.base.Preconditions;

import java.util.Map;

/**
Expand All @@ -48,6 +51,13 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (vaultType == StorageVault.StorageVaultType.UNKNOWN) {
throw new AnalysisException("Unsupported Storage Vault type: " + type);
}

FeNameFormat.checkStorageVaultName(name);
if (properties.containsKey(StorageVault.VAULT_NAME)) {
String newName = properties.get(StorageVault.VAULT_NAME);
FeNameFormat.checkStorageVaultName(newName);
Preconditions.checkArgument(!name.equalsIgnoreCase(newName), "vault name no change");
}
Env.getCurrentEnv().getStorageVaultMgr().alterStorageVault(vaultType, properties, name);
}

Expand Down
3 changes: 3 additions & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,9 @@ message AlterObjStoreInfoResponse {
optional MetaServiceResponseStatus status = 1;
optional string storage_vault_id = 2;
optional bool default_storage_vault_replaced = 3;

// storage_vault_name maybe changed, so return new storage_vault_name
optional string storage_vault_name = 4;
}

message UpdateAkSkRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -812,11 +812,12 @@ class Suite implements GroovyInterceptable {
return randomBoolean ? "true" : "false"
}

void expectExceptionLike(Closure userFunction, String errorMessage = null) {
void expectExceptionLike(Closure userFunction, String errMsg = null) {
try {
userFunction()
throw new Exception("Expect exception, but success");
} catch (Exception e) {
if (!e.getMessage().contains(errorMessage)) {
if (!Strings.isNullOrEmpty(errMsg) && !e.getMessage().contains(errMsg)) {
throw e
}
}
Expand Down
Loading

0 comments on commit baa357d

Please sign in to comment.