Skip to content

Commit

Permalink
[fix](storage vault) Fix some bug and add more regression test
Browse files Browse the repository at this point in the history
* fix `set_as_default` not work
* fix alter name lack of checking
  • Loading branch information
SWJTU-ZhangLei committed Jan 6, 2025
1 parent a539e22 commit 9da130e
Show file tree
Hide file tree
Showing 20 changed files with 1,198 additions and 1,070 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,14 @@ public void createStorageVaultResource(CreateStorageVaultStmt stmt) throws Excep
ALTER_BE_SYNC_THREAD_POOL.execute(() -> alterSyncVaultTask());
}

public void refreshVaultMap(Map<String, String> vaultMap) {
rwLock.writeLock().lock();
vaultNameToVaultId = vaultMap;
rwLock.writeLock().unlock();
public void refreshVaultMap(Map<String, String> vaultMap, Pair<String, String> defaultVault) {
try {
rwLock.writeLock().lock();
vaultNameToVaultId = vaultMap;
defaultVaultInfo = defaultVault;
} finally {
rwLock.writeLock().unlock();
}
}

public String getVaultIdByName(String vaultName) {
Expand All @@ -105,7 +109,19 @@ public String getVaultNameById(String vaultId) {
}
}

private void updateVaultNameToIdCache(String oldVaultName, String newVaultName, String vaultId) {
private void addStorageVaultToCache(String vaultName, String vaultId, boolean defaultVault) {
try {
rwLock.writeLock().lock();
vaultNameToVaultId.put(vaultName, vaultId);
if (defaultVault) {
defaultVaultInfo = Pair.of(vaultName, vaultId);
}
} finally {
rwLock.writeLock().unlock();
}
}

private void updateStorageVaultCache(String oldVaultName, String newVaultName, String vaultId) {
try {
rwLock.writeLock().lock();
String cachedVaultId = vaultNameToVaultId.get(oldVaultName);
Expand All @@ -120,6 +136,15 @@ private void updateVaultNameToIdCache(String oldVaultName, String newVaultName,
}
}

private void updateDefaultStorageVaultCache(Pair<String, String> newDefaultVaultInfo) {
try {
rwLock.writeLock().lock();
defaultVaultInfo = newDefaultVaultInfo;
} finally {
rwLock.writeLock().unlock();
}
}

private Cloud.StorageVaultPB.Builder buildAlterS3VaultRequest(Map<String, String> properties, String name)
throws Exception {
Cloud.ObjectStoreInfoPB.Builder objBuilder = S3Properties.getObjStoreInfoPB(properties);
Expand Down Expand Up @@ -183,7 +208,9 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
request.setOp(Operation.ALTER_S3_VAULT);
} else if (type == StorageVaultType.HDFS) {
properties.keySet().stream()
.filter(HdfsStorageVault.FORBID_CHECK_PROPERTIES::contains)
.filter(key -> HdfsStorageVault.FORBID_CHECK_PROPERTIES.contains(key)
|| key.toLowerCase().contains(S3Properties.S3_PREFIX)
|| key.toLowerCase().contains(S3Properties.PROVIDER))
.findAny()
.ifPresent(key -> {
throw new IllegalArgumentException("Alter property " + key + " is not allowed.");
Expand All @@ -200,7 +227,7 @@ public void alterStorageVault(StorageVaultType type, Map<String, String> propert
}

if (request.hasVault() && request.getVault().hasAlterName()) {
updateVaultNameToIdCache(name, request.getVault().getAlterName(), response.getStorageVaultId());
updateStorageVaultCache(name, request.getVault().getAlterName(), response.getStorageVaultId());
LOG.info("Succeed to alter storage vault, old name:{} new name: {} id:{}", name,
request.getVault().getAlterName(), response.getStorageVaultId());
}
Expand Down Expand Up @@ -241,7 +268,7 @@ public void setDefaultStorageVault(String vaultName) throws DdlException {
throw new DdlException(e.getMessage());
}
LOG.info("succeed to set {} as default vault, vault id {}", vaultName, vaultId);
setDefaultStorageVault(Pair.of(vaultName, vaultId));
updateDefaultStorageVaultCache(Pair.of(vaultName, vaultId));
}

public void unsetDefaultStorageVault() throws DdlException {
Expand All @@ -258,29 +285,16 @@ public void unsetDefaultStorageVault() throws DdlException {
LOG.warn("failed to unset default storage vault");
throw new DdlException(e.getMessage());
}
defaultVaultInfo = null;
updateDefaultStorageVaultCache(null);
}

public void setDefaultStorageVault(Pair<String, String> vaultInfo) {
try {
rwLock.writeLock().lock();
defaultVaultInfo = vaultInfo;
} finally {
rwLock.writeLock().unlock();
}
}

public Pair getDefaultStorageVaultInfo() {
Pair vault = null;
public Pair<String, String> getDefaultStorageVault() {
try {
rwLock.readLock().lock();
if (defaultVaultInfo != null) {
vault = defaultVaultInfo;
}
return defaultVaultInfo;
} finally {
rwLock.readLock().unlock();
}
return vault;
}

@VisibleForTesting
Expand All @@ -304,12 +318,11 @@ public void createHdfsVault(StorageVault vault) throws Exception {
vault.getName(), response);
throw new DdlException(response.getStatus().getMsg());
}
rwLock.writeLock().lock();
vaultNameToVaultId.put(vault.getName(), response.getStorageVaultId());
rwLock.writeLock().unlock();

LOG.info("Succeed to create hdfs vault {}, id {}, origin default vault replaced {}",
vault.getName(), response.getStorageVaultId(),
response.getDefaultStorageVaultReplaced());
addStorageVaultToCache(vault.getName(), response.getStorageVaultId(), vault.setAsDefault());
} catch (RpcException e) {
LOG.warn("failed to alter storage vault due to RpcException: {}", e);
throw new DdlException(e.getMessage());
Expand Down Expand Up @@ -354,11 +367,10 @@ public void createS3Vault(StorageVault vault) throws Exception {
LOG.warn("failed to alter storage vault response: {} ", response);
throw new DdlException(response.getStatus().getMsg());
}
rwLock.writeLock().lock();
vaultNameToVaultId.put(vault.getName(), response.getStorageVaultId());
rwLock.writeLock().unlock();

LOG.info("Succeed to create s3 vault {}, id {}, origin default vault replaced {}",
vault.getName(), response.getStorageVaultId(), response.getDefaultStorageVaultReplaced());
addStorageVaultToCache(vault.getName(), response.getStorageVaultId(), vault.setAsDefault());
} catch (RpcException e) {
LOG.warn("failed to alter storage vault due to RpcException: {}", e);
throw new DdlException(e.getMessage());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ protected void runAfterCatalogReady() {
String id = response.getInstance().getResourceIds(i);
vaultMap.put(name, id);
}
Env.getCurrentEnv().getStorageVaultMgr().refreshVaultMap(vaultMap);
Env.getCurrentEnv().getStorageVaultMgr().setDefaultStorageVault(
Env.getCurrentEnv().getStorageVaultMgr().refreshVaultMap(vaultMap,
Pair.of(response.getInstance().getDefaultStorageVaultName(),
response.getInstance().getDefaultStorageVaultId()));
response.getInstance().getDefaultStorageVaultId()));
}
} catch (Exception e) {
LOG.warn("get instance from ms exception", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2742,7 +2742,7 @@ private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserEx
String storageVaultId = null;
// If user does not specify one storage vault then FE would use the default vault
if (Strings.isNullOrEmpty(storageVaultName)) {
Pair<String, String> info = env.getStorageVaultMgr().getDefaultStorageVaultInfo();
Pair<String, String> info = env.getStorageVaultMgr().getDefaultStorageVault();
if (info != null) {
storageVaultName = info.first;
storageVaultId = info.second;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
if (properties.containsKey(StorageVault.VAULT_NAME)) {
String newName = properties.get(StorageVault.VAULT_NAME);
FeNameFormat.checkStorageVaultName(newName);
Preconditions.checkArgument(!name.equalsIgnoreCase(newName), "vault name no change");
Preconditions.checkArgument(!name.equalsIgnoreCase(newName), "Vault name has not been changed");
}
Env.getCurrentEnv().getStorageVaultMgr().alterStorageVault(vaultType, properties, name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ public void testSetDefaultVault() throws Exception {
private HashSet<String> existed = new HashSet<>();

@Mock
public Pair getDefaultStorageVaultInfo() {
public Pair getDefaultStorageVault() {
return defaultVaultInfo;
}

Expand Down Expand Up @@ -210,8 +210,8 @@ public Pair getDefaultStorageVaultInfo() {
"type", "hdfs",
"path", "abs/"));
mgr.createHdfsVault(vault);
Assertions.assertTrue(mgr.getDefaultStorageVaultInfo() == null);
Assertions.assertTrue(mgr.getDefaultStorageVault() == null);
mgr.setDefaultStorageVault(new SetDefaultStorageVaultStmt(vault.getName()));
Assertions.assertTrue(mgr.getDefaultStorageVaultInfo().first.equals(vault.getName()));
Assertions.assertTrue(mgr.getDefaultStorageVault().first.equals(vault.getName()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,10 @@ class Suite implements GroovyInterceptable {
return "hdfs://" + host + ":" + port;
}

String getHmsUser() {
return context.config.otherConfigs.get("extHiveHmsUser")
}

String getHdfsUser() {
String hdfsUser = context.config.otherConfigs.get("hdfsUser")
return hdfsUser
Expand Down
105 changes: 77 additions & 28 deletions regression-test/suites/vault_p0/alter/test_alter_hdfs_vault.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -27,72 +27,121 @@ suite("test_alter_hdfs_vault", "nonConcurrent") {
return
}


def randomStr = UUID.randomUUID().toString().replace("-", "")
def hdfsVaultName = "hdfs_" + randomStr

sql """
CREATE STORAGE VAULT IF NOT EXISTS ${suiteName}
CREATE STORAGE VAULT IF NOT EXISTS ${hdfsVaultName}
PROPERTIES (
"type"="HDFS",
"fs.defaultFS"="${getHmsHdfsFs()}",
"path_prefix" = "${suiteName}",
"hadoop.username" = "hadoop"
"path_prefix" = "${hdfsVaultName}",
"hadoop.username" = "${getHmsUser()}"
);
"""

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"path_prefix" = "${suiteName}"
"path_prefix" = "error_path"
);
"""
}, "Alter property")

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${suiteName}
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"fs.defaultFS" = "not_exist_vault"
"fs.defaultFS" = "error_fs"
);
"""
}, "Alter property")

def vaultName = suiteName
String properties;
expectExceptionLike({
sql """
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"s3.endpoint" = "error_endpoint"
);
"""
}, "Alter property")

def vaultInfos = try_sql """show storage vault"""
expectExceptionLike({
sql """
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"s3.region" = "error_region"
);
"""
}, "Alter property")

for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
if (name.equals(vaultName)) {
properties = vaultInfos[i][2]
}
}
expectExceptionLike({
sql """
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"s3.access_key" = "error_access_key"
);
"""
}, "Alter property")

expectExceptionLike({
sql """
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"provider" = "error_provider"
);
"""
}, "Alter property")

sql """
CREATE TABLE ${hdfsVaultName} (
C_CUSTKEY INTEGER NOT NULL,
C_NAME INTEGER NOT NULL
)
DUPLICATE KEY(C_CUSTKEY, C_NAME)
DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"storage_vault_name" = ${hdfsVaultName}
)
"""
sql """ insert into ${hdfsVaultName} values(1, 1); """
sql """ sync;"""
def result = sql """ select * from ${hdfsVaultName}; """
assertEquals(result.size(), 1);

def newVaultName = suiteName + "_new";
def newHdfsVaultName = hdfsVaultName + "_new";
sql """
ALTER STORAGE VAULT ${vaultName}
ALTER STORAGE VAULT ${hdfsVaultName}
PROPERTIES (
"type"="hdfs",
"VAULT_NAME" = "${newVaultName}",
"VAULT_NAME" = "${newHdfsVaultName}",
"hadoop.username" = "hdfs"
);
"""

vaultInfos = sql """ SHOW STORAGE VAULT; """
boolean exist = false

def vaultInfos = sql """ SHOW STORAGE VAULT; """
boolean found = false
for (int i = 0; i < vaultInfos.size(); i++) {
def name = vaultInfos[i][0]
logger.info("name is ${name}, info ${vaultInfos[i]}")
if (name.equals(vaultName)) {
logger.info("info ${vaultInfos[i]}")
if (name.equals(hdfsVaultName)) {
assertTrue(false);
}
if (name.equals(newVaultName)) {
if (name.equals(newHdfsVaultName)) {
assertTrue(vaultInfos[i][2].contains("""user: "hdfs" """))
exist = true
found = true
}
}
assertTrue(exist)
expectExceptionLike({sql """insert into ${suiteName} values("2", "2");"""}, "")
assertTrue(found)

expectExceptionLike({sql """insert into ${hdfsVaultName} values("2", "2");"""}, "open file failed")
}
Loading

0 comments on commit 9da130e

Please sign in to comment.