diff --git a/.github/workflows/spark-integration-test.yml b/.github/workflows/spark-integration-test.yml index f73a366ad78..39bf66c5876 100644 --- a/.github/workflows/spark-integration-test.yml +++ b/.github/workflows/spark-integration-test.yml @@ -57,6 +57,7 @@ jobs: matrix: architecture: [linux/amd64] java-version: [ 8, 11, 17 ] + scala-version: [ 2.12 ] test-mode: [ embedded, deploy ] env: PLATFORM: ${{ matrix.architecture }} @@ -91,9 +92,11 @@ jobs: - name: Spark Integration Test id: integrationTest run: | - ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" - ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" - ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + if [ "${{ matrix.scala-version }}" == "2.12" ];then + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + fi + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" + ./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**" 
- name: Upload integrate tests reports uses: actions/upload-artifact@v3 diff --git a/api/src/main/java/com/datastrato/gravitino/SupportsMetalakes.java b/api/src/main/java/com/datastrato/gravitino/SupportsMetalakes.java index 9c08e7fe9de..c8e4c10881f 100644 --- a/api/src/main/java/com/datastrato/gravitino/SupportsMetalakes.java +++ b/api/src/main/java/com/datastrato/gravitino/SupportsMetalakes.java @@ -75,7 +75,7 @@ Metalake alterMetalake(String name, MetalakeChange... changes) * Drop a metalake with specified identifier. * * @param name The identifier of the metalake. - * @return True if the metalake was dropped, false otherwise. + * @return True if the metalake was dropped, false if the metalake does not exist. */ boolean dropMetalake(String name); } diff --git a/api/src/main/java/com/datastrato/gravitino/rel/SupportsPartitions.java b/api/src/main/java/com/datastrato/gravitino/rel/SupportsPartitions.java index 024ea0473b0..9cc3c1b97d8 100644 --- a/api/src/main/java/com/datastrato/gravitino/rel/SupportsPartitions.java +++ b/api/src/main/java/com/datastrato/gravitino/rel/SupportsPartitions.java @@ -80,7 +80,7 @@ default boolean partitionExists(String partitionName) { * Drop a partition with specified name. * * @param partitionName the name of the partition - * @return true if a partition was deleted. + * @return true if a partition was deleted successfully, false if the partition does not exist. 
*/ boolean dropPartition(String partitionName); diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts index ccdc0519683..09f47dd075f 100644 --- a/catalogs/catalog-hadoop/build.gradle.kts +++ b/catalogs/catalog-hadoop/build.gradle.kts @@ -34,6 +34,9 @@ dependencies { testImplementation(project(":server")) testImplementation(project(":server-common")) + testImplementation(libs.minikdc) + testImplementation(libs.hadoop3.minicluster) + testImplementation(libs.bundles.log4j) testImplementation(libs.mockito.core) testImplementation(libs.mysql.driver) diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java index f9ecb37578f..174a237d3ec 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalog.java @@ -4,10 +4,13 @@ */ package com.datastrato.gravitino.catalog.hadoop; +import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BaseCatalog; import com.datastrato.gravitino.connector.CatalogOperations; +import com.datastrato.gravitino.connector.ProxyPlugin; import com.datastrato.gravitino.connector.capability.Capability; import java.util.Map; +import java.util.Optional; /** * Hadoop catalog is a fileset catalog that can manage filesets on the Hadoop Compatible File @@ -31,4 +34,13 @@ protected CatalogOperations newOps(Map config) { protected Capability newCapability() { return new HadoopCatalogCapability(); } + + @Override + protected Optional newProxyPlugin(Map config) { + boolean impersonationEnabled = new KerberosConfig(config).isImpersonationEnabled(); + if (!impersonationEnabled) { + return Optional.empty(); + } + return Optional.of(new HadoopProxyPlugin()); + } } diff --git 
a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java index 3f077b99560..9a1dd4d4629 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogOperations.java @@ -5,6 +5,7 @@ package com.datastrato.gravitino.catalog.hadoop; import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION; import com.datastrato.gravitino.Entity; import com.datastrato.gravitino.EntityStore; @@ -14,9 +15,12 @@ import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SchemaChange; import com.datastrato.gravitino.StringIdentifier; +import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosClient; import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.PropertiesMetadata; +import com.datastrato.gravitino.connector.ProxyPlugin; import com.datastrato.gravitino.connector.SupportsSchemas; import com.datastrato.gravitino.exceptions.AlreadyExistsException; import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException; @@ -36,6 +40,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; +import java.io.File; import java.io.IOException; import java.time.Instant; import java.util.Collections; @@ -47,6 +52,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.security.UserGroupInformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +78,15 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem @VisibleForTesting Optional catalogStorageLocation; - // For testing only. + private Map conf; + + @SuppressWarnings("unused") + private ProxyPlugin proxyPlugin; + + private String kerberosRealm; + + private CatalogInfo catalogInfo; + HadoopCatalogOperations(EntityStore store) { this.store = store; } @@ -81,10 +95,16 @@ public HadoopCatalogOperations() { this(GravitinoEnv.getInstance().entityStore()); } + public String getKerberosRealm() { + return kerberosRealm; + } + @Override public void initialize(Map config, CatalogInfo info) throws RuntimeException { // Initialize Hadoop Configuration. + this.conf = config; this.hadoopConf = new Configuration(); + this.catalogInfo = info; Map bypassConfigs = config.entrySet().stream() .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX)) @@ -98,9 +118,31 @@ public void initialize(Map config, CatalogInfo info) throws Runt (String) CATALOG_PROPERTIES_METADATA.getOrDefault( config, HadoopCatalogPropertiesMetadata.LOCATION); + conf.forEach(hadoopConf::set); + + initAuthentication(conf, hadoopConf); + this.catalogStorageLocation = Optional.ofNullable(catalogLocation).map(Path::new); } + private void initAuthentication(Map conf, Configuration hadoopConf) { + AuthenticationConfig config = new AuthenticationConfig(conf); + boolean enableAuth = config.isEnableAuth(); + String authType = config.getAuthType(); + + if (enableAuth && StringUtils.equalsIgnoreCase(authType, "kerberos")) { + hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos"); + UserGroupInformation.setConfiguration(hadoopConf); + try { + KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf); + File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id()); + this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath()); + } 
catch (IOException e) { + throw new RuntimeException("Failed to login with kerberos", e); + } + } + } + @Override public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException { try { @@ -609,4 +651,8 @@ static Path formalizePath(Path path, Configuration configuration) throws IOExcep FileSystem defaultFs = FileSystem.get(configuration); return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory()); } + + void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) { + this.proxyPlugin = hadoopProxyPlugin; + } } diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java index 4eb5c871625..37a5700e57c 100644 --- a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java @@ -4,6 +4,8 @@ */ package com.datastrato.gravitino.catalog.hadoop; +import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig; +import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig; import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata; import com.datastrato.gravitino.connector.PropertyEntry; import com.google.common.collect.ImmutableMap; @@ -29,6 +31,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada null, false /* hidden */)) .putAll(BASIC_CATALOG_PROPERTY_ENTRIES) + // The following two are about authentication. 
+ .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES) + .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES) + .build(); @Override diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java new file mode 100644 index 00000000000..471f6a165cd --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/HadoopProxyPlugin.java @@ -0,0 +1,74 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.hadoop; + +import com.datastrato.gravitino.connector.CatalogOperations; +import com.datastrato.gravitino.connector.ProxyPlugin; +import com.datastrato.gravitino.utils.Executable; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.UndeclaredThrowableException; +import java.security.Principal; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; +import java.util.Map; +import org.apache.hadoop.security.UserGroupInformation; + +public class HadoopProxyPlugin implements ProxyPlugin { + private HadoopCatalogOperations ops; + private UserGroupInformation realUser; + + public HadoopProxyPlugin() { + try { + realUser = UserGroupInformation.getCurrentUser(); + } catch (IOException ioe) { + throw new IllegalStateException("Fail to init HadoopCatalogProxyPlugin"); + } + } + + @Override + public Object doAs( + Principal principal, Executable action, Map properties) + throws Throwable { + try { + UserGroupInformation proxyUser; + + if (UserGroupInformation.isSecurityEnabled() && ops != null) { + // The Gravitino server may use multiple KDC servers. + // The http authentication uses one KDC server, the Hadoop catalog may use another KDC + // server. 
+ // The KerberosAuthenticator will remove realm of principal. + // And then we add the realm of Hadoop catalog to the user. + String proxyKerberosPrincipalName = principal.getName(); + if (!proxyKerberosPrincipalName.contains("@")) { + proxyKerberosPrincipalName = + String.format("%s@%s", proxyKerberosPrincipalName, ops.getKerberosRealm()); + } + + proxyUser = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, realUser); + } else { + proxyUser = UserGroupInformation.createProxyUser(principal.getName(), realUser); + } + + return proxyUser.doAs((PrivilegedExceptionAction) action::execute); + } catch (UndeclaredThrowableException e) { + Throwable innerException = e.getCause(); + if (innerException instanceof PrivilegedActionException) { + throw innerException.getCause(); + } else if (innerException instanceof InvocationTargetException) { + throw innerException.getCause(); + } else { + throw innerException; + } + } + } + + @Override + public void bindCatalogOperation(CatalogOperations ops) { + this.ops = ((HadoopCatalogOperations) ops); + this.ops.setProxyPlugin(this); + } +} diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java new file mode 100644 index 00000000000..2128231a3f9 --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/AuthenticationConfig.java @@ -0,0 +1,69 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog.hadoop.kerberos; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigConstants; +import com.datastrato.gravitino.config.ConfigEntry; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableMap; +import java.util.Map; + +public class AuthenticationConfig extends Config { + public static final String ENABLE_AUTH_KEY = "authentication.enable"; + public static final String AUTH_TYPE_KEY = "authentication.type"; + + public AuthenticationConfig(Map properties) { + super(false); + loadFromMap(properties, k -> true); + } + + public static final ConfigEntry ENABLE_AUTH_ENTRY = + new ConfigBuilder(ENABLE_AUTH_KEY) + .doc("Whether to enable authentication for Hadoop catalog") + .version(ConfigConstants.VERSION_0_5_1) + .booleanConf() + .createWithDefault(false); + + public static final ConfigEntry AUTH_TYPE_ENTRY = + new ConfigBuilder(AUTH_TYPE_KEY) + .doc("The type of authentication for Hadoop catalog, currently we only support kerberos") + .version(ConfigConstants.VERSION_0_5_1) + .stringConf() + .create(); + + public boolean isEnableAuth() { + return get(ENABLE_AUTH_ENTRY); + } + + public String getAuthType() { + return get(AUTH_TYPE_ENTRY); + } + + public static final Map> AUTHENTICATION_PROPERTY_ENTRIES = + new ImmutableMap.Builder>() + .put( + ENABLE_AUTH_KEY, + PropertyEntry.booleanPropertyEntry( + ENABLE_AUTH_KEY, + "Whether to enable authentication for Hadoop catalog", + false, + true, + false, + false, + false)) + .put( + AUTH_TYPE_KEY, + PropertyEntry.stringImmutablePropertyEntry( + AUTH_TYPE_KEY, + "The type of authentication for Hadoop catalog, currently we only support kerberos", + false, + null, + false, + false)) + .build(); +} diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java 
b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java new file mode 100644 index 00000000000..30f75687f6e --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/FetchFileUtils.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.hadoop.kerberos; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.util.Optional; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +public class FetchFileUtils { + + private FetchFileUtils() {} + + public static void fetchFileFromUri( + String fileUri, File destFile, int timeout, Configuration conf) throws IOException { + try { + URI uri = new URI(fileUri); + String scheme = Optional.ofNullable(uri.getScheme()).orElse("file"); + + switch (scheme) { + case "http": + case "https": + case "ftp": + FileUtils.copyURLToFile(uri.toURL(), destFile, timeout * 1000, timeout * 1000); + break; + + case "file": + Files.createSymbolicLink(destFile.toPath(), new File(uri.getPath()).toPath()); + break; + + case "hdfs": + FileSystem.get(conf).copyToLocalFile(new Path(uri), new Path(destFile.toURI())); + break; + + default: + throw new IllegalArgumentException( + String.format("Doesn't support the scheme %s", scheme)); + } + } catch (URISyntaxException ue) { + throw new IllegalArgumentException("The uri of file has the wrong format", ue); + } + } +} diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java new file mode 100644 index 
00000000000..67b9f2a86ae --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosClient.java @@ -0,0 +1,109 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.hadoop.kerberos; + +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class KerberosClient { + + public static final String GRAVITINO_KEYTAB_FORMAT = "keytabs/gravitino-%s-keytab"; + + private static final Logger LOG = LoggerFactory.getLogger(KerberosClient.class); + + private final ScheduledThreadPoolExecutor checkTgtExecutor; + private final Map conf; + private final Configuration hadoopConf; + + public KerberosClient(Map conf, Configuration hadoopConf) { + this.conf = conf; + this.hadoopConf = hadoopConf; + this.checkTgtExecutor = new ScheduledThreadPoolExecutor(1, getThreadFactory("check-tgt")); + } + + public String login(String keytabFilePath) throws IOException { + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + // Check the principal and keytab file + String catalogPrincipal = kerberosConfig.getPrincipalName(); + Preconditions.checkArgument( + StringUtils.isNotBlank(catalogPrincipal), "The principal can't be blank"); + @SuppressWarnings("null") + List principalComponents = 
Splitter.on('@').splitToList(catalogPrincipal); + Preconditions.checkArgument( + principalComponents.size() == 2, "The principal has the wrong format"); + + // Login + UserGroupInformation.setConfiguration(hadoopConf); + KerberosName.resetDefaultRealm(); + UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath); + UserGroupInformation kerberosLoginUgi = UserGroupInformation.getCurrentUser(); + + // Refresh the cache if it's out of date. + int checkInterval = kerberosConfig.getCheckIntervalSec(); + checkTgtExecutor.scheduleAtFixedRate( + () -> { + try { + kerberosLoginUgi.checkTGTAndReloginFromKeytab(); + } catch (Exception e) { + LOG.error("Fail to refresh ugi token: ", e); + } + }, + checkInterval, + checkInterval, + TimeUnit.SECONDS); + + return principalComponents.get(1); + } + + public File saveKeyTabFileFromUri(Long catalogId) throws IOException { + + KerberosConfig kerberosConfig = new KerberosConfig(conf); + + String keyTabUri = kerberosConfig.getKeytab(); + Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri can't be blank"); + // TODO: Support to download the file from Kerberos HDFS + Preconditions.checkArgument( + !keyTabUri.trim().startsWith("hdfs"), "Keytab uri doesn't support to use HDFS"); + + java.io.File keytabsDir = new File("keytabs"); + if (!keytabsDir.exists()) { + // Ignore the return value, because there exist many Hadoop catalog operations making + // this directory. 
+ keytabsDir.mkdir(); + } + + File keytabFile = new File(String.format(GRAVITINO_KEYTAB_FORMAT, catalogId)); + keytabFile.deleteOnExit(); + if (keytabFile.exists() && !keytabFile.delete()) { + throw new IllegalStateException( + String.format("Fail to delete keytab file %s", keytabFile.getAbsolutePath())); + } + + int fetchKeytabFileTimeout = kerberosConfig.getFetchTimeoutSec(); + FetchFileUtils.fetchFileFromUri(keyTabUri, keytabFile, fetchKeytabFileTimeout, hadoopConf); + + return keytabFile; + } + + private static ThreadFactory getThreadFactory(String factoryName) { + return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(factoryName + "-%d").build(); + } +} diff --git a/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java new file mode 100644 index 00000000000..7e2686a47aa --- /dev/null +++ b/catalogs/catalog-hadoop/src/main/java/com/datastrato/gravitino/catalog/hadoop/kerberos/KerberosConfig.java @@ -0,0 +1,128 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. 
+ */ + +package com.datastrato.gravitino.catalog.hadoop.kerberos; + +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigConstants; +import com.datastrato.gravitino.config.ConfigEntry; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; + +public class KerberosConfig extends AuthenticationConfig { + public static final String KEY_TAB_URI_KEY = "authentication.kerberos.keytab-uri"; + + public static final String PRINCIPAL_KEY = "authentication.kerberos.principal"; + + public static final String CHECK_INTERVAL_SEC_KEY = "authentication.kerberos.check-interval-sec"; + + public static final String FETCH_TIMEOUT_SEC_KEY = + "authentication.kerberos.keytab-fetch-timeout-sec"; + + public static final String IMPERSONATION_ENABLE_KEY = + "authentication.kerberos.impersonation-enable"; + + public static final boolean DEFAULT_IMPERSONATION_ENABLE = false; + + public static final ConfigEntry PRINCIPAL_ENTRY = + new ConfigBuilder(PRINCIPAL_KEY) + .doc("The principal of the kerberos connection") + .version(ConfigConstants.VERSION_0_5_1) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry KEYTAB_ENTRY = + new ConfigBuilder(KEY_TAB_URI_KEY) + .doc("The keytab of the kerberos connection") + .version(ConfigConstants.VERSION_0_5_1) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry CHECK_INTERVAL_SEC_ENTRY = + new ConfigBuilder(CHECK_INTERVAL_SEC_KEY) + .doc("The check interval of the kerberos connection for Hadoop catalog") + .version(ConfigConstants.VERSION_0_5_1) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2); + + public static final ConfigEntry FETCH_TIMEOUT_SEC_ENTRY = 
+ new ConfigBuilder(FETCH_TIMEOUT_SEC_KEY) + .doc("The fetch timeout of the kerberos connection") + .version(ConfigConstants.VERSION_0_5_1) + .intConf() + .checkValue(value -> value > 0, ConfigConstants.POSITIVE_NUMBER_ERROR_MSG) + .createWithDefault(2); + + public static final ConfigEntry ENABLE_IMPERSONATION_ENTRY = + new ConfigBuilder(IMPERSONATION_ENABLE_KEY) + .doc("Whether to enable impersonation for the Hadoop catalog") + .version(ConfigConstants.VERSION_0_5_1) + .booleanConf() + .createWithDefault(DEFAULT_IMPERSONATION_ENABLE); + + public KerberosConfig(Map properties) { + super(properties); + loadFromMap(properties, k -> true); + } + + public boolean isImpersonationEnabled() { + return get(ENABLE_IMPERSONATION_ENTRY); + } + + public String getPrincipalName() { + return get(PRINCIPAL_ENTRY); + } + + public String getKeytab() { + return get(KEYTAB_ENTRY); + } + + public int getCheckIntervalSec() { + return get(CHECK_INTERVAL_SEC_ENTRY); + } + + public int getFetchTimeoutSec() { + return get(FETCH_TIMEOUT_SEC_ENTRY); + } + + public static final Map> KERBEROS_PROPERTY_ENTRIES = + new ImmutableMap.Builder>() + .put( + IMPERSONATION_ENABLE_KEY, + PropertyEntry.booleanPropertyEntry( + IMPERSONATION_ENABLE_KEY, + "Whether to enable impersonation for the Hadoop catalog", + false, + true, + DEFAULT_IMPERSONATION_ENABLE, + false, + false)) + .put( + KEY_TAB_URI_KEY, + PropertyEntry.stringImmutablePropertyEntry( + KEY_TAB_URI_KEY, "The uri of key tab for the catalog", false, null, false, false)) + .put( + PRINCIPAL_KEY, + PropertyEntry.stringImmutablePropertyEntry( + PRINCIPAL_KEY, "The principal for the catalog", false, null, false, false)) + .put( + CHECK_INTERVAL_SEC_KEY, + PropertyEntry.integerOptionalPropertyEntry( + CHECK_INTERVAL_SEC_KEY, + "The interval to check validness of the principal", + true, + 60, + false)) + .put( + FETCH_TIMEOUT_SEC_KEY, + PropertyEntry.integerOptionalPropertyEntry( + FETCH_TIMEOUT_SEC_KEY, "The timeout to fetch key tab", true, 60, 
false)) + .build(); +} diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java index a46d1094d5a..b130ac39770 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java @@ -311,8 +311,8 @@ public void testDropSchema() throws IOException { Assertions.assertFalse(fs.exists(schemaPath)); // Test drop empty schema - Assertions.assertFalse(ops.dropSchema(id, true)); - Assertions.assertFalse(ops.dropSchema(id, false)); + Assertions.assertFalse(ops.dropSchema(id, true), "schema should be non-existent"); + Assertions.assertFalse(ops.dropSchema(id, false), "schema should be non-existent"); } } @@ -364,6 +364,9 @@ public void testCreateLoadAndDeleteFilesetWithLocations( } else { Assertions.assertTrue(fs.exists(expectedPath)); } + + // Test drop non-existent fileset + Assertions.assertFalse(ops.dropFileset(filesetIdent), "fileset should be non-existent"); } } diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HDFSKerberosIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HDFSKerberosIT.java index a8d3f657f24..a28407b7f08 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HDFSKerberosIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HDFSKerberosIT.java @@ -18,6 +18,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authentication.util.KerberosName; import 
org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -68,6 +69,7 @@ public static void setup() throws IOException { LOG.info("Kerberos kdc config:\n{}", content); System.setProperty("java.security.krb5.conf", krb5Path); + KerberosName.resetDefaultRealm(); System.setProperty("sun.security.krb5.debug", "true"); } @@ -88,6 +90,7 @@ public void testKerberosHDFS() throws IOException { conf.setBoolean("fs.hdfs.impl.disable.cache", true); conf.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.reset(); UserGroupInformation.setConfiguration(conf); clientUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(CLIENT_PRINCIPAL, keytabPath); PrivilegedAction action = diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java index 277652b8ed7..e79432f209d 100644 --- a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java +++ b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopCatalogIT.java @@ -67,8 +67,10 @@ public static void setup() throws IOException { @AfterAll public static void stop() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); - if (hdfs != null) { hdfs.close(); } diff --git a/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java new file mode 100644 index 00000000000..b7cd99f809e --- /dev/null +++ 
b/catalogs/catalog-hadoop/src/test/java/com/datastrato/gravitino/catalog/hadoop/integration/test/HadoopUserImpersonationIT.java @@ -0,0 +1,391 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.hadoop.integration.test; + +import static com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig.AUTH_TYPE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig.ENABLE_AUTH_KEY; +import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.IMPERSONATION_ENABLE_KEY; +import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.KEY_TAB_URI_KEY; +import static com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig.PRINCIPAL_KEY; +import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Schema; +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException; +import com.datastrato.gravitino.file.Fileset; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.datastrato.gravitino.integration.test.util.ITUtils; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.security.PrivilegedAction; +import java.security.PrivilegedExceptionAction; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import org.apache.commons.lang3.JavaVersion; +import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; 
+import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.minikdc.MiniKdc; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HadoopUserImpersonationIT extends AbstractIT { + private static final Logger LOG = LoggerFactory.getLogger(HadoopUserImpersonationIT.class); + + public static final String metalakeName = + GravitinoITUtils.genRandomName("CatalogFilesetIT_metalake"); + public static final String catalogName = + GravitinoITUtils.genRandomName("CatalogFilesetIT_catalog"); + public static final String SCHEMA_PREFIX = "CatalogFilesetIT_schema"; + public static final String schemaName = GravitinoITUtils.genRandomName(SCHEMA_PREFIX); + private static final String provider = "hadoop"; + private static GravitinoMetalake metalake; + private static Catalog catalog; + + private static MiniKdc kdc; + private static MiniDFSCluster hdfsCluster; + private static File kdcWorkDir; + + private static File serverKeytabFile; + private static File clientKeytabFile; + private static Configuration conf; + + private static final String HOSTNAME = "localhost"; + private static final String SERVER_PRINCIPAL = "hdfs/_HOST".replace("_HOST", HOSTNAME); + private static final String CLIENT_PRINCIPAL = "anonymous"; + + private static String hdfsUri; + private static UserGroupInformation clientUGI; + + @BeforeAll + public static void setup() throws Exception { + if (!isEmbedded()) { + return; + } + + System.setProperty("sun.security.krb5.debug", "true"); + // Start MiniKDC + kdcWorkDir = new File(System.getProperty("java.io.tmpdir"), "kdc"); + kdcWorkDir.mkdir(); 
+ Properties kdcConf = MiniKdc.createConf(); + kdc = new MiniKdc(kdcConf, kdcWorkDir); + kdc.start(); + + String krb5ConfFile = kdc.getKrb5conf().getAbsolutePath(); + System.setProperty("java.security.krb5.conf", krb5ConfFile); + + // Reload config when krb5 conf is setup + if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_1_8)) { + Class classRef; + if (System.getProperty("java.vendor").contains("IBM")) { + classRef = Class.forName("com.ibm.security.krb5.internal.Config"); + } else { + classRef = Class.forName("sun.security.krb5.Config"); + } + Method method = classRef.getDeclaredMethod("refresh"); + method.invoke(null); + } + + // Create a keytab file + serverKeytabFile = new File(kdcWorkDir, "server.keytab"); + kdc.createPrincipal(serverKeytabFile, SERVER_PRINCIPAL); + + clientKeytabFile = new File(kdcWorkDir, "client.keytab"); + kdc.createPrincipal(clientKeytabFile, CLIENT_PRINCIPAL); + + // Start MiniDFSCluster + conf = new HdfsConfiguration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, kdcWorkDir.getAbsolutePath()); + conf.setBoolean("dfs.block.access.token.enable", true); + conf.setBoolean("dfs.webhdfs.enabled", false); + conf.set("dfs.permissions", "true"); + conf.set("hadoop.security.authentication", "kerberos"); + conf.set("dfs.namenode.kerberos.principal", SERVER_PRINCIPAL + "@" + kdc.getRealm()); + conf.set("dfs.namenode.keytab.file", serverKeytabFile.getAbsolutePath()); + conf.set("dfs.datanode.kerberos.principal", SERVER_PRINCIPAL + "@" + kdc.getRealm()); + conf.set("dfs.datanode.keytab.file", serverKeytabFile.getAbsolutePath()); + conf.set("ignore.secure.ports.for.testing", "true"); + conf.set(DFS_DATA_TRANSFER_PROTECTION_KEY, "authentication"); + conf.set("dfs.web.authentication.kerberos.principal", SERVER_PRINCIPAL); + conf.set("dfs.namenode.http-address", "hdfs://localhost:64965"); + conf.set("hadoop.proxyuser.hdfs.hosts", "*"); + conf.set("hadoop.proxyuser.hdfs.groups", "*"); + conf.set("hadoop.proxyuser.hdfs.users", "*"); + conf.set( + 
"hadoop.security.auth_to_local", "RULE:[2:$1@$0](.*@EXAMPLE.COM)s/.*/hadoop/\nDEFAULT"); + UserGroupInformation.setConfiguration(conf); + UserGroupInformation.loginUserFromKeytab( + SERVER_PRINCIPAL.replaceAll("_HOST", HOSTNAME) + "@" + kdc.getRealm(), + serverKeytabFile.getAbsolutePath()); + + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + hdfsCluster = builder.build(); + hdfsCluster.waitActive(); + + // Hadoop user 'anonymous' to '/anonymous' rw permission + UserGroupInformation ugiSuperUser = UserGroupInformation.getCurrentUser(); + ugiSuperUser.doAs( + (PrivilegedExceptionAction) + () -> { + try (FileSystem fs = hdfsCluster.getFileSystem()) { + Path home = new Path("/anonymous"); + fs.mkdirs(home); + FsPermission newPerm = new FsPermission(FsPermission.createImmutable((short) 0777)); + // Set permission to "/" (root) for user "anonymous" + fs.setPermission(home, newPerm); + fs.setOwner(home, "anonymous", null); + } + return null; + }); + + hdfsUri = "hdfs://" + HOSTNAME + ":" + hdfsCluster.getNameNodePort() + "/"; + + createMetalake(); + createCatalog(); + createSchema(); + } + + @AfterAll + public static void stop() { + if (client != null) { + client.dropMetalake(metalakeName); + } + + try { + closer.close(); + } catch (Exception e) { + LOG.error("Failed to close CloseableGroup", e); + } + + // Shutdown MiniDFSCluster + if (hdfsCluster != null) { + hdfsCluster.shutdown(); + } + // Stop the MiniKDC + if (kdc != null) { + kdc.stop(); + } + // Delete KDC directory + if (kdcWorkDir != null) { + kdcWorkDir.delete(); + } + + System.clearProperty("sun.security.krb5.debug"); + } + + @Test + @EnabledIf("isEmbedded") + void testListFileSystem() throws Exception { + Configuration clientConf = hdfsCluster.getFileSystem().getConf(); + clientConf.set("fs.defaultFS", hdfsUri); + clientConf.setBoolean("fs.hdfs.impl.disable.cache", true); + clientConf.set("hadoop.security.authentication", "kerberos"); + UserGroupInformation.loginUserFromKeytab( + 
SERVER_PRINCIPAL + "@EXAMPLE.COM", serverKeytabFile.getAbsolutePath()); + clientUGI = UserGroupInformation.getCurrentUser(); + clientUGI.doAs( + (PrivilegedAction) + () -> { + try { + FileSystem fs = FileSystem.get(clientConf); + Path path = new Path("/anonymous"); + Assertions.assertTrue(fs.exists(path)); + return null; + } catch (IOException e) { + return Assertions.fail("Failed to list file system", e); + } + }); + } + + private static void createMetalake() { + GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetalakes.length); + + GravitinoMetalake createdMetalake = + client.createMetalake(metalakeName, "comment", Collections.emptyMap()); + GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private static void createCatalog() { + ImmutableMap catalogProperties = + new ImmutableMap.Builder() + .put(ENABLE_AUTH_KEY, "true") + .put(AUTH_TYPE_KEY, "kerberos") + .put(IMPERSONATION_ENABLE_KEY, "true") + .put(PRINCIPAL_KEY, SERVER_PRINCIPAL + "@" + kdc.getRealm()) + .put(KEY_TAB_URI_KEY, serverKeytabFile.getAbsolutePath()) + .build(); + + metalake.createCatalog( + catalogName, Catalog.Type.FILESET, provider, "comment", catalogProperties); + + catalog = metalake.loadCatalog(catalogName); + } + + private static void createSchema() { + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + String comment = "comment"; + + catalog.asSchemas().createSchema(schemaName, comment, properties); + Schema loadSchema = catalog.asSchemas().loadSchema(schemaName); + Assertions.assertEquals(schemaName, loadSchema.name()); + Assertions.assertEquals(comment, loadSchema.comment()); + Assertions.assertEquals("val1", loadSchema.properties().get("key1")); + Assertions.assertEquals("val2", loadSchema.properties().get("key2")); + } + + private String storageLocation(String 
filesetName) { + return new Path(hdfsUri + "anonymous/", filesetName).toString(); + } + + private Fileset createFileset( + String filesetName, + String comment, + Fileset.Type type, + String storageLocation, + Map properties) { + return catalog + .asFilesetCatalog() + .createFileset( + NameIdentifier.of(metalakeName, catalogName, schemaName, filesetName), + comment, + type, + storageLocation, + properties); + } + + private boolean checkFilePathExists(String pathString) throws Exception { + Configuration clientConf = new Configuration(); + clientConf.set("fs.defaultFS", hdfsUri); + clientConf.set("hadoop.security.authentication", "kerberos"); + clientConf.set("dfs.namenode.kerberos.principal", SERVER_PRINCIPAL + "@" + kdc.getRealm()); + + UserGroupInformation.loginUserFromKeytab( + SERVER_PRINCIPAL + "@EXAMPLE.COM", serverKeytabFile.getAbsolutePath()); + clientUGI = UserGroupInformation.getCurrentUser(); + return (Boolean) + clientUGI.doAs( + (PrivilegedAction) + () -> { + try { + FileSystem fs = FileSystem.get(clientConf); + Path path = new Path(pathString); + return fs.exists(path); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + + private static boolean isEmbedded() { + String mode = + System.getProperty(ITUtils.TEST_MODE) == null + ? ITUtils.EMBEDDED_TEST_MODE + : System.getProperty(ITUtils.TEST_MODE); + + return Objects.equals(mode, ITUtils.EMBEDDED_TEST_MODE); + } + + /** + * Only runs in embedded mode as miniKDC starts after Gravitino server. in the deploy mode, + * Gravitino server runs in a separate JVM and miniKDC is not accessible as we haven't get the KDC + * ip && port. 
+ */ + @Test + @EnabledIf("isEmbedded") + public void testCreateFileset() throws Exception { + // create fileset + String filesetName = "test_create_fileset"; + String storageLocation = storageLocation(filesetName); + Fileset fileset = + createFileset( + filesetName, + "comment", + Fileset.Type.MANAGED, + storageLocation, + ImmutableMap.of("k1", "v1")); + + Assertions.assertTrue( + checkFilePathExists(storageLocation), "storage location should be created"); + Assertions.assertNotNull(fileset, "fileset should be created"); + Assertions.assertEquals("comment", fileset.comment()); + Assertions.assertEquals(Fileset.Type.MANAGED, fileset.type()); + Assertions.assertEquals(storageLocation, fileset.storageLocation()); + Assertions.assertEquals(1, fileset.properties().size()); + Assertions.assertEquals("v1", fileset.properties().get("k1")); + + // test create a fileset that already exist + Assertions.assertThrows( + FilesetAlreadyExistsException.class, + () -> + createFileset( + filesetName, + "comment", + Fileset.Type.MANAGED, + storageLocation, + ImmutableMap.of("k1", "v1")), + "Should throw FilesetAlreadyExistsException when fileset already exists"); + + // create fileset with null storage location + String filesetName2 = "test_create_fileset_no_storage_location"; + Fileset fileset2 = + createFileset( + filesetName2, null, Fileset.Type.MANAGED, storageLocation(filesetName2), null); + Assertions.assertNotNull(fileset2, "fileset should be created"); + Assertions.assertNull(fileset2.comment(), "comment should be null"); + Assertions.assertEquals(Fileset.Type.MANAGED, fileset2.type(), "type should be MANAGED"); + Assertions.assertEquals( + storageLocation(filesetName2), + fileset2.storageLocation(), + "storage location should be created"); + Assertions.assertEquals(ImmutableMap.of(), fileset2.properties(), "properties should be empty"); + + // create fileset with null fileset name + Assertions.assertThrows( + IllegalArgumentException.class, + () -> + createFileset( + null, 
+ "comment", + Fileset.Type.MANAGED, + storageLocation, + ImmutableMap.of("k1", "v1")), + "Should throw IllegalArgumentException when fileset name is null"); + + // create fileset with null fileset type + String filesetName3 = "test_create_fileset_no_type"; + String storageLocation3 = storageLocation(filesetName3); + Fileset fileset3 = + createFileset(filesetName3, "comment", null, storageLocation3, ImmutableMap.of("k1", "v1")); + Assertions.assertTrue( + checkFilePathExists(storageLocation3), "storage location should be created"); + Assertions.assertEquals( + Fileset.Type.MANAGED, fileset3.type(), "fileset type should be MANAGED by default"); + } +} diff --git a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java index aea817adba5..c095e292189 100644 --- a/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java +++ b/catalogs/catalog-hive/src/main/java/com/datastrato/gravitino/catalog/hive/HiveCatalogOperations.java @@ -455,7 +455,7 @@ public HiveSchema alterSchema(NameIdentifier ident, SchemaChange... changes) * * @param ident The identifier of the schema to drop. * @param cascade If set to true, drops all the tables in the schema as well. - * @return true if the schema was dropped successfully, false otherwise. + * @return true if the schema was dropped successfully, false if the schema does not exist. * @throws NonEmptySchemaException If the schema is not empty and 'cascade' is set to false. 
*/ @Override @@ -987,10 +987,14 @@ private void doUpdateColumnType(List cols, TableChange.UpdateColumn */ @Override public boolean dropTable(NameIdentifier tableIdent) { - if (isExternalTable(tableIdent)) { - return dropHiveTable(tableIdent, false, false); - } else { - return dropHiveTable(tableIdent, true, false); + try { + if (isExternalTable(tableIdent)) { + return dropHiveTable(tableIdent, false, false); + } else { + return dropHiveTable(tableIdent, true, false); + } + } catch (NoSuchTableException e) { + return false; } } diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveSchema.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveSchema.java index 5dfc1679ba5..db067fc965a 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveSchema.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveSchema.java @@ -156,5 +156,7 @@ public void testDropSchema() { Assertions.assertTrue(catalogOperations.schemaExists(ident)); catalogOperations.dropSchema(ident, true); Assertions.assertFalse(catalogOperations.schemaExists(ident)); + Assertions.assertFalse( + catalogOperations.dropSchema(ident, true), "schema should not exist"); } } diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTable.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTable.java index edd5fb5f972..c4b9306db65 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTable.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTable.java @@ -320,6 +320,8 @@ public void testDropHiveTable() { Assertions.assertTrue(hiveCatalogOperations.tableExists(tableIdentifier)); hiveCatalogOperations.dropTable(tableIdentifier); Assertions.assertFalse(hiveCatalogOperations.tableExists(tableIdentifier)); + 
Assertions.assertFalse( + hiveCatalogOperations.dropTable(tableIdentifier), "table should not exist"); } @Test diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java index f2423c80405..501095a0ef4 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/TestHiveTableOperations.java @@ -271,7 +271,9 @@ public void testDropPartition() { Assertions.assertEquals( "Error partition format: does_not_exist_partition", exception5.getMessage()); - Assertions.assertFalse(hiveTable.supportPartitions().dropPartition("city=not_exist")); + Assertions.assertFalse( + hiveTable.supportPartitions().dropPartition("city=not_exist"), + "partition should be non-existent"); } @Test diff --git a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java index 2c59f3a365b..39d8a45aa65 100644 --- a/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java +++ b/catalogs/catalog-hive/src/test/java/com/datastrato/gravitino/catalog/hive/integration/test/CatalogHiveIT.java @@ -199,6 +199,17 @@ public static void startup() throws Exception { @AfterAll public static void stop() throws IOException { + Arrays.stream(catalog.asSchemas().listSchemas()) + .filter(ident -> !ident.name().equals("default")) + .forEach( + (ident -> { + catalog.asSchemas().dropSchema(ident.name(), true); + })); + Arrays.stream(metalake.listCatalogs()) + .forEach( + (ident -> { + metalake.dropCatalog(ident.name()); + })); if (client != null) { client.dropMetalake(metalakeName); } diff --git 
a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java index cd0b6197ba0..340347fa61f 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/JdbcCatalogOperations.java @@ -239,13 +239,12 @@ public JdbcSchema alterSchema(NameIdentifier ident, SchemaChange... changes) * * @param ident The identifier of the schema to drop. * @param cascade If set to true, drops all the tables in the schema as well. - * @return true if the schema was dropped successfully, false otherwise. + * @return true if the schema is successfully dropped; false if the schema does not exist. * @throws NonEmptySchemaException If the schema is not empty and 'cascade' is set to false. */ @Override public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException { - databaseOperation.delete(ident.name(), cascade); - return true; + return databaseOperation.delete(ident.name(), cascade); } /** @@ -342,8 +341,7 @@ public Table alterTable(NameIdentifier tableIdent, TableChange... 
changes) @Override public boolean dropTable(NameIdentifier tableIdent) { String databaseName = NameIdentifier.of(tableIdent.namespace().levels()).name(); - tableOperation.drop(databaseName, tableIdent.name()); - return true; + return tableOperation.drop(databaseName, tableIdent.name()); } /** diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java index a58c012e8b1..db424b86809 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/DatabaseOperation.java @@ -37,8 +37,9 @@ void create(String databaseName, String comment, Map properties) /** * @param databaseName The name of the database to check. * @param cascade If set to true, drops all the tables in the database as well. + * @return true if the database is successfully deleted; false if the database does not exist. */ - void delete(String databaseName, boolean cascade) throws NoSuchSchemaException; + boolean delete(String databaseName, boolean cascade); /** @return The list name of databases. 
*/ List listDatabases(); diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java index 6c0be7351c3..7b13d4f3541 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcDatabaseOperations.java @@ -49,14 +49,15 @@ public void create(String databaseName, String comment, Map prop } @Override - public void delete(String databaseName, boolean cascade) throws NoSuchSchemaException { + public boolean delete(String databaseName, boolean cascade) { LOG.info("Beginning to drop database {}", databaseName); - try (final Connection connection = getConnection()) { - JdbcConnectorUtils.executeUpdate(connection, generateDropDatabaseSql(databaseName, cascade)); + try { + dropDatabase(databaseName, cascade); LOG.info("Finished dropping database {}", databaseName); - } catch (final SQLException se) { - throw this.exceptionMapper.toGravitinoException(se); + } catch (NoSuchSchemaException e) { + return false; } + return true; } @Override @@ -77,6 +78,14 @@ public List listDatabases() { } } + protected void dropDatabase(String databaseName, boolean cascade) { + try (final Connection connection = getConnection()) { + JdbcConnectorUtils.executeUpdate(connection, generateDropDatabaseSql(databaseName, cascade)); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + /** * @param databaseName The name of the database. * @param comment The comment of the database. 
diff --git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java index 3f7e1cbda81..126e792905d 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/JdbcTableOperations.java @@ -96,14 +96,17 @@ public void create( } @Override - public void drop(String databaseName, String tableName) throws NoSuchTableException { + public boolean drop(String databaseName, String tableName) { LOG.info("Attempting to delete table {} from database {}", tableName, databaseName); - try (Connection connection = getConnection(databaseName)) { - JdbcConnectorUtils.executeUpdate(connection, generateDropTableSql(tableName)); + try { + dropTable(databaseName, tableName); LOG.info("Deleted table {} from database {}", tableName, databaseName); - } catch (final SQLException se) { - throw this.exceptionMapper.toGravitinoException(se); + } catch (NoSuchTableException e) { + return false; + } catch (NoSuchSchemaException e) { + return false; } + return true; } @Override @@ -259,6 +262,15 @@ public void purge(String databaseName, String tableName) throws NoSuchTableExcep } } + protected void dropTable(String databaseName, String tableName) { + LOG.info("Attempting to delete table {} from database {}", tableName, databaseName); + try (Connection connection = getConnection(databaseName)) { + JdbcConnectorUtils.executeUpdate(connection, generateDropTableSql(tableName)); + } catch (final SQLException se) { + throw this.exceptionMapper.toGravitinoException(se); + } + } + protected ResultSet getTables(Connection connection) throws SQLException { final DatabaseMetaData metaData = connection.getMetaData(); String catalogName = connection.getCatalog(); diff 
--git a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java index e8a4fb36dbc..8cd5bb68509 100644 --- a/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java +++ b/catalogs/catalog-jdbc-common/src/main/java/com/datastrato/gravitino/catalog/jdbc/operation/TableOperation.java @@ -61,8 +61,9 @@ void create( /** * @param databaseName The name of the database. * @param tableName The name of the table. + * @return true if the table is successfully dropped; false if the table does not exist. */ - void drop(String databaseName, String tableName) throws NoSuchTableException; + boolean drop(String databaseName, String tableName); /** * @param databaseName The name of the database. diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/integration/test/TestJdbcAbstractIT.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/integration/test/TestJdbcAbstractIT.java index 31c6b2b799e..4772dcd7c59 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/integration/test/TestJdbcAbstractIT.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/integration/test/TestJdbcAbstractIT.java @@ -82,12 +82,15 @@ protected void testBaseOperation( protected static void testDropDatabase(String databaseName) { List databases; - DATABASE_OPERATIONS.delete(databaseName, true); + Assertions.assertTrue( + DATABASE_OPERATIONS.delete(databaseName, true), "database should be dropped"); Assertions.assertThrows( NoSuchSchemaException.class, () -> DATABASE_OPERATIONS.load(databaseName)); databases = DATABASE_OPERATIONS.listDatabases(); Assertions.assertFalse(databases.contains(databaseName)); + Assertions.assertFalse( + 
DATABASE_OPERATIONS.delete(databaseName, true), "database should be non-existent"); } protected static void assertionsTableInfo( diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java index b8fd9ad9f06..57f11669d67 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/SqliteDatabaseOperations.java @@ -73,12 +73,12 @@ public JdbcSchema load(String databaseName) throws NoSuchSchemaException { } @Override - public void delete(String databaseName, boolean cascade) throws NoSuchSchemaException { - delete(databaseName); + public boolean delete(String databaseName, boolean cascade) throws NoSuchSchemaException { + return delete(databaseName); } - public void delete(String databaseName) { - FileUtils.deleteQuietly(new File(dbPath + "/" + databaseName)); + public boolean delete(String databaseName) { + return FileUtils.deleteQuietly(new File(dbPath + "/" + databaseName)); } @Override diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java index 2225548d6a1..7dcc634ea94 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcDatabaseOperations.java @@ -91,11 +91,15 @@ public void testOperationDatabase() { Assertions.assertTrue(listDatabases.contains(database2)); // drop database - 
JDBC_DATABASE_OPERATIONS.delete(database1); + Assertions.assertTrue(JDBC_DATABASE_OPERATIONS.delete(database1), "database should be dropped"); List databases = JDBC_DATABASE_OPERATIONS.listDatabases(); Assertions.assertFalse(databases.contains(database1)); Assertions.assertNotNull(JDBC_DATABASE_OPERATIONS.load(database2)); - JDBC_DATABASE_OPERATIONS.delete(database2); + Assertions.assertTrue(JDBC_DATABASE_OPERATIONS.delete(database2), "database should be dropped"); Assertions.assertNull(JDBC_DATABASE_OPERATIONS.load(database2)); + + // drop non-existent database + Assertions.assertFalse( + JDBC_DATABASE_OPERATIONS.delete(database1), "database should be non-existent"); } } diff --git a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java index 213aa17f549..27fbc9c4a1a 100644 --- a/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java +++ b/catalogs/catalog-jdbc-common/src/test/java/com/datastrato/gravitino/catalog/jdbc/operation/TestJdbcTableOperations.java @@ -187,9 +187,14 @@ public void testOperationTable() { () -> JDBC_TABLE_OPERATIONS.alterTable(DATABASE_NAME, newName, tableChange)); // delete table. - JDBC_TABLE_OPERATIONS.drop(DATABASE_NAME, newName); + Assertions.assertTrue( + JDBC_TABLE_OPERATIONS.drop(DATABASE_NAME, newName), "table should be dropped"); allTables = JDBC_TABLE_OPERATIONS.listTables(DATABASE_NAME); Assertions.assertEquals(0, allTables.size()); + + // delete non-existent table. 
+ Assertions.assertFalse( + JDBC_TABLE_OPERATIONS.drop(DATABASE_NAME, newName), "table should be non-existent"); } private static JdbcColumn[] generateRandomColumn(int minSize, int maxSize) { diff --git a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java index 60bcf61b29a..7b5758bb479 100644 --- a/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java +++ b/catalogs/catalog-jdbc-doris/src/main/java/com/datastrato/gravitino/catalog/doris/converter/DorisExceptionConverter.java @@ -25,6 +25,7 @@ public class DorisExceptionConverter extends JdbcExceptionConverter { static final int CODE_TABLE_EXISTS = 1050; static final int CODE_NO_SUCH_SCHEMA = 1049; static final int CODE_DATABASE_NOT_EXISTS = 1008; + static final int CODE_UNKNOWN_DATABASE = 1049; static final int CODE_NO_SUCH_TABLE = 1051; static final int CODE_UNAUTHORIZED = 1045; static final int CODE_NO_SUCH_COLUMN = 1054; @@ -35,6 +36,16 @@ public class DorisExceptionConverter extends JdbcExceptionConverter { private static final Pattern DATABASE_ALREADY_EXISTS_PATTERN = Pattern.compile(DATABASE_ALREADY_EXISTS_PATTERN_STRING); + private static final String DATABASE_NOT_EXISTS_PATTERN_STRING = + ".*?detailMessage = Can't drop database '.*?'; database doesn't exist"; + private static final Pattern DATABASE_NOT_EXISTS_PATTERN = + Pattern.compile(DATABASE_NOT_EXISTS_PATTERN_STRING); + + private static final String UNKNOWN_DATABASE_PATTERN_STRING = + ".*?detailMessage = Unknown database '.*?'"; + private static final Pattern UNKNOWN_DATABASE_PATTERN_PATTERN = + Pattern.compile(UNKNOWN_DATABASE_PATTERN_STRING); + private static final String TABLE_NOT_EXIST_PATTERN_STRING = ".*detailMessage = Unknown table '.*' in .*:.*"; @@ -55,6 +66,7 @@ public 
GravitinoRuntimeException toGravitinoException(SQLException se) { case CODE_TABLE_EXISTS: return new TableAlreadyExistsException(se, se.getMessage()); case CODE_DATABASE_NOT_EXISTS: + case CODE_UNKNOWN_DATABASE: return new NoSuchSchemaException(se, se.getMessage()); case CODE_NO_SUCH_TABLE: return new NoSuchTableException(se, se.getMessage()); @@ -76,6 +88,14 @@ static int getErrorCodeFromMessage(String message) { return CODE_DATABASE_EXISTS; } + if (DATABASE_NOT_EXISTS_PATTERN.matcher(message).matches()) { + return CODE_DATABASE_NOT_EXISTS; + } + + if (UNKNOWN_DATABASE_PATTERN_PATTERN.matcher(message).matches()) { + return CODE_UNKNOWN_DATABASE; + } + if (TABLE_NOT_EXIST_PATTERN.matcher(message).matches()) { return CODE_NO_SUCH_TABLE; } diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java index 88d484d35b3..3ae51acdb82 100644 --- a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/converter/TestDorisExceptionConverter.java @@ -16,6 +16,17 @@ public void testGetErrorCodeFromMessage() { DorisExceptionConverter.CODE_DATABASE_EXISTS, DorisExceptionConverter.getErrorCodeFromMessage(msg)); + msg = + "errCode = 2, detailMessage = Can't drop database 'default_cluster:test_schema'; database doesn't exist"; + Assertions.assertEquals( + DorisExceptionConverter.CODE_DATABASE_NOT_EXISTS, + DorisExceptionConverter.getErrorCodeFromMessage(msg)); + + msg = "errCode = 2, detailMessage = Unknown database 'default_cluster:no-exits'"; + Assertions.assertEquals( + DorisExceptionConverter.CODE_UNKNOWN_DATABASE, + DorisExceptionConverter.getErrorCodeFromMessage(msg)); + msg = "errCode = 2, detailMessage = Unknown table 
'table_name' in default_cluster:database_name"; Assertions.assertEquals( diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java index 5751ae6d817..12f4a5649c3 100644 --- a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/CatalogDorisIT.java @@ -8,7 +8,6 @@ import com.datastrato.gravitino.Catalog; import com.datastrato.gravitino.NameIdentifier; -import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.SupportsSchemas; import com.datastrato.gravitino.catalog.jdbc.config.JdbcConfig; @@ -109,6 +108,7 @@ public void startup() throws IOException { @AfterAll public void stop() { clearTableAndSchema(); + metalake.dropCatalog(catalogName); AbstractIT.client.dropMetalake(metalakeName); } @@ -119,11 +119,6 @@ public void resetSchema() { } private void clearTableAndSchema() { - NameIdentifier[] nameIdentifiers = - catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); - for (NameIdentifier nameIdentifier : nameIdentifiers) { - catalog.asTableCatalog().dropTable(nameIdentifier); - } catalog.asSchemas().dropSchema(schemaName, true); } @@ -262,7 +257,10 @@ void testDropDorisSchema() { null); // Try to drop a database, and cascade equals to false, it should not be allowed. 
- Assertions.assertFalse(catalog.asSchemas().dropSchema(schemaName, false)); + Throwable excep = + Assertions.assertThrows( + RuntimeException.class, () -> catalog.asSchemas().dropSchema(schemaName, false)); + Assertions.assertTrue(excep.getMessage().contains("the value of cascade should be true.")); // Check the database still exists catalog.asSchemas().loadSchema(schemaName); diff --git a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java index 1b6967f5c76..89c04124f99 100644 --- a/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java +++ b/catalogs/catalog-jdbc-doris/src/test/java/com/datastrato/gravitino/catalog/doris/integration/test/DorisTableOperationsIT.java @@ -6,7 +6,6 @@ import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; import com.datastrato.gravitino.catalog.jdbc.JdbcTable; -import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.NamedReference; @@ -107,13 +106,13 @@ public void testBasicTableOperation() { Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.rename(databaseName, tableName, newName)); Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.load(databaseName, newName)); - Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.drop(databaseName, newName)); + Assertions.assertTrue(TABLE_OPERATIONS.drop(databaseName, newName), "table should be dropped"); listTables = TABLE_OPERATIONS.listTables(databaseName); Assertions.assertFalse(listTables.contains(newName)); - Assertions.assertThrows( - NoSuchTableException.class, () -> TABLE_OPERATIONS.drop(databaseName, newName)); + Assertions.assertFalse( 
+ TABLE_OPERATIONS.drop(databaseName, newName), "table should be non-existent"); } @Test diff --git a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/AuditCatalogMysqlIT.java b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/AuditCatalogMysqlIT.java index d3459975463..51b8d531d76 100644 --- a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/AuditCatalogMysqlIT.java +++ b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/AuditCatalogMysqlIT.java @@ -75,9 +75,9 @@ public static void startIntegrationTest() throws Exception { @AfterAll public static void stopIntegrationTest() throws IOException, InterruptedException { - AbstractIT.stopIntegrationTest(); client.dropMetalake(metalakeName); mysqlService.close(); + AbstractIT.stopIntegrationTest(); } @Test @@ -92,6 +92,8 @@ public void testAuditCatalog() throws Exception { catalog = metalake.alterCatalog(catalogName, CatalogChange.setProperty("key1", "value1")); Assertions.assertEquals(expectUser, catalog.auditInfo().creator()); Assertions.assertEquals(expectUser, catalog.auditInfo().lastModifier()); + + metalake.dropCatalog(catalogName); } @Test @@ -103,6 +105,9 @@ public void testAuditSchema() throws Exception { Schema schema = catalog.asSchemas().createSchema(schemaName, null, prop); Assertions.assertEquals(expectUser, schema.auditInfo().creator()); Assertions.assertNull(schema.auditInfo().lastModifier()); + + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName); } @Test @@ -134,6 +139,12 @@ public void testAuditTable() throws Exception { TableChange.addColumn(new String[] {"col_4"}, Types.StringType.get())); Assertions.assertEquals(expectUser, table.auditInfo().creator()); Assertions.assertEquals(expectUser, table.auditInfo().lastModifier()); + + catalog + .asTableCatalog() + 
.dropTable(NameIdentifier.of(metalakeName, catalogName, schemaName, tableName)); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName); } private static Catalog createCatalog(String catalogName) throws SQLException { diff --git a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java index 425e33db939..c092aa26b28 100644 --- a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java +++ b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java @@ -138,6 +138,7 @@ public void startup() throws IOException, SQLException { @AfterAll public void stop() { clearTableAndSchema(); + metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); mysqlService.close(); } @@ -856,7 +857,11 @@ void testDropMySQLDatabase() { // Try to drop a database, and cascade equals to false, it should not be // allowed. 
- catalog.asSchemas().dropSchema(schemaName, false); + Throwable excep = + Assertions.assertThrows( + RuntimeException.class, () -> catalog.asSchemas().dropSchema(schemaName, false)); + Assertions.assertTrue(excep.getMessage().contains("the value of cascade should be true.")); + // Check the database still exists catalog.asSchemas().loadSchema(schemaName); diff --git a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java index bd0e4b04fdb..dd780a4c9ae 100644 --- a/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java +++ b/catalogs/catalog-jdbc-mysql/src/test/java/com/datastrato/gravitino/catalog/mysql/integration/test/MysqlTableOperationsIT.java @@ -10,7 +10,6 @@ import com.datastrato.gravitino.catalog.jdbc.JdbcColumn; import com.datastrato.gravitino.catalog.jdbc.JdbcTable; import com.datastrato.gravitino.exceptions.GravitinoRuntimeException; -import com.datastrato.gravitino.exceptions.NoSuchTableException; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.TableChange; import com.datastrato.gravitino.rel.expressions.distributions.Distributions; @@ -171,9 +170,10 @@ public void testOperationTable() { TEST_DB_NAME.toString(), newName, TableChange.deleteColumn(new String[] {newColumn.name()}, true)); - Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName)); - Assertions.assertThrows( - NoSuchTableException.class, () -> TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName)); + Assertions.assertTrue( + TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should be dropped"); + Assertions.assertFalse( + TABLE_OPERATIONS.drop(TEST_DB_NAME.toString(), newName), "table should be non-existent"); } @Test diff --git 
a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java index ca8d7714279..e0a8ddb0242 100644 --- a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java +++ b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java @@ -119,6 +119,11 @@ public void startup() throws IOException, SQLException { @AfterAll public void stop() { clearTableAndSchema(); + NameIdentifier[] schemaIdentifiers = catalog.asSchemas().listSchemas(); + for (NameIdentifier nameIdentifier : schemaIdentifiers) { + catalog.asSchemas().dropSchema(nameIdentifier.name(), true); + } + metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); postgreSqlService.close(); } @@ -337,7 +342,7 @@ void testOperationPostgreSqlSchema() { schemaMap = Arrays.stream(nameIdentifiers).collect(Collectors.toMap(NameIdentifier::name, v -> v)); Assertions.assertFalse(schemaMap.containsKey(testSchemaName)); - Assertions.assertFalse(schemas.dropSchema("no-exits", false)); + Assertions.assertFalse(schemas.dropSchema("no_exits", false)); TableCatalog tableCatalog = catalog.asTableCatalog(); // create failed check. 
diff --git a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java index 65e0e890020..7290b7b7940 100644 --- a/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java +++ b/catalogs/catalog-jdbc-postgresql/src/test/java/com/datastrato/gravitino/catalog/postgresql/integration/test/PostgreSqlTableOperationsIT.java @@ -246,8 +246,7 @@ public void testOperationTable() { newName, TableChange.deleteColumn(new String[] {newColumn.name()}, true))); Assertions.assertDoesNotThrow(() -> TABLE_OPERATIONS.drop(TEST_DB_NAME, newName)); - Assertions.assertThrows( - NoSuchTableException.class, () -> TABLE_OPERATIONS.drop(TEST_DB_NAME, newName)); + Assertions.assertFalse(TABLE_OPERATIONS.drop(TEST_DB_NAME, newName)); } @Test diff --git a/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java b/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java index 3aec55d8812..fda4b9d152d 100644 --- a/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java +++ b/catalogs/catalog-kafka/src/test/java/com/datastrato/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java @@ -99,6 +99,18 @@ public static void startUp() throws ExecutionException, InterruptedException { @AfterAll public static void shutdown() { + Catalog catalog = metalake.loadCatalog(CATALOG_NAME); + Arrays.stream(catalog.asSchemas().listSchemas()) + .filter(ident -> !ident.name().equals("default")) + .forEach( + (ident -> { + catalog.asSchemas().dropSchema(ident.name(), true); + })); + Arrays.stream(metalake.listCatalogs()) + .forEach( + (ident 
-> { + metalake.dropCatalog(ident.name()); + })); client.dropMetalake(METALAKE_NAME); if (adminClient != null) { adminClient.close(); @@ -144,6 +156,7 @@ public void testCatalog() throws ExecutionException, InterruptedException { Assertions.assertThrows( NoSuchCatalogException.class, () -> metalake.loadCatalog(catalogName)); Assertions.assertTrue(exception.getMessage().contains(catalogName)); + Assertions.assertFalse(metalake.dropCatalog(catalogName), "catalog should be non-existent"); // assert topic exists in Kafka after catalog dropped Assertions.assertFalse(adminClient.listTopics().names().get().isEmpty()); } @@ -226,8 +239,11 @@ public void testDefaultSchema() { Assertions.assertTrue(exception.getMessage().contains("Cannot alter the default schema")); // test drop default schema - boolean dropped = catalog.asSchemas().dropSchema(DEFAULT_SCHEMA_NAME, true); - Assertions.assertFalse(dropped); + Throwable excep = + Assertions.assertThrows( + RuntimeException.class, + () -> catalog.asSchemas().dropSchema(DEFAULT_SCHEMA_NAME, true)); + Assertions.assertTrue(excep.getMessage().contains("Cannot drop the default schema")); } @Test diff --git a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java index 1d52b578688..53840adcdab 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java +++ b/catalogs/catalog-lakehouse-iceberg/src/main/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java @@ -284,7 +284,7 @@ public IcebergSchema alterSchema(NameIdentifier ident, SchemaChange... changes) * * @param ident The identifier of the schema to drop. * @param cascade If set to true, drops all the tables in the schema as well. 
- * @return true if the schema was dropped successfully, false otherwise. + * @return true if the schema was dropped successfully, false if the schema does not exist. * @throws NonEmptySchemaException If the schema is not empty and 'cascade' is set to false. */ @Override diff --git a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java index bcadbe1758d..a2d0979ec74 100644 --- a/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java +++ b/catalogs/catalog-lakehouse-iceberg/src/test/java/com/datastrato/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java @@ -120,6 +120,7 @@ public void startup() throws Exception { @AfterAll public void stop() throws Exception { clearTableAndSchema(); + metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); spark.close(); AbstractIT.stopIntegrationTest(); @@ -159,12 +160,14 @@ private void initSparkEnv() { } private void clearTableAndSchema() { - NameIdentifier[] nameIdentifiers = - catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); - for (NameIdentifier nameIdentifier : nameIdentifiers) { - catalog.asTableCatalog().dropTable(nameIdentifier); + if (catalog.asSchemas().schemaExists(schemaName)) { + NameIdentifier[] nameIdentifiers = + catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); + for (NameIdentifier nameIdentifier : nameIdentifiers) { + catalog.asTableCatalog().dropTable(nameIdentifier); + } + catalog.asSchemas().dropSchema(schemaName, false); } - catalog.asSchemas().dropSchema(schemaName, false); } private void createMetalake() { @@ -324,7 +327,12 @@ void testOperationIcebergSchema() { 
Distributions.NONE, null)); // drop schema failed check. - Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), true)); + Throwable excep = + Assertions.assertThrows( + IllegalArgumentException.class, () -> schemas.dropSchema(schemaIdent.name(), true)); + Assertions.assertTrue( + excep.getMessage().contains("Iceberg does not support cascading delete operations.")); + Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), false)); Assertions.assertFalse(tableCatalog.dropTable(table)); icebergNamespaces = @@ -731,6 +739,7 @@ public void testAlterIcebergTable() { Table delColTable = catalog.asTableCatalog().loadTable(tableIdentifier); Assertions.assertEquals(1, delColTable.columns().length); Assertions.assertEquals(col1.name(), delColTable.columns()[0].name()); + catalog.asTableCatalog().dropTable(tableIdentifier); } @Test diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/BaseSchemaCatalog.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/BaseSchemaCatalog.java index 66ec720e795..16d372b4b26 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/BaseSchemaCatalog.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/BaseSchemaCatalog.java @@ -28,8 +28,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * BaseSchemaCatalog is the base abstract class for all the catalog with schema. It provides the @@ -37,8 +35,6 @@ * create, load, alter and drop a schema with specified identifier. */ abstract class BaseSchemaCatalog extends CatalogDTO implements Catalog, SupportsSchemas { - private static final Logger LOG = LoggerFactory.getLogger(BaseSchemaCatalog.class); - /** The REST client to send the requests. */ protected final RESTClient restClient; /** The namespace of current catalog, which is the metalake name. 
*/ @@ -180,24 +176,15 @@ public Schema alterSchema(String schemaName, SchemaChange... changes) */ @Override public boolean dropSchema(String schemaName, boolean cascade) throws NonEmptySchemaException { - - try { - DropResponse resp = - restClient.delete( - formatSchemaRequestPath(schemaNamespace) + "/" + RESTUtils.encodeString(schemaName), - Collections.singletonMap("cascade", String.valueOf(cascade)), - DropResponse.class, - Collections.emptyMap(), - ErrorHandlers.schemaErrorHandler()); - resp.validate(); - return resp.dropped(); - - } catch (NonEmptySchemaException e) { - throw e; - } catch (Exception e) { - LOG.warn("Failed to drop schema {}", schemaName, e); - return false; - } + DropResponse resp = + restClient.delete( + formatSchemaRequestPath(schemaNamespace) + "/" + RESTUtils.encodeString(schemaName), + Collections.singletonMap("cascade", String.valueOf(cascade)), + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.schemaErrorHandler()); + resp.validate(); + return resp.dropped(); } static String formatSchemaRequestPath(Namespace ns) { diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoAdminClient.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoAdminClient.java index 1ba99650744..aaff7ee9a56 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoAdminClient.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoAdminClient.java @@ -42,8 +42,6 @@ import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Gravitino Client for the administrator to interact with the Gravitino API, allowing the client to @@ -52,8 +50,6 @@ *

Normal users should use {@link GravitinoClient} to connect with the Gravitino server. */ public class GravitinoAdminClient extends GravitinoClientBase implements SupportsMetalakes { - - private static final Logger LOG = LoggerFactory.getLogger(GravitinoAdminClient.class); private static final String API_METALAKES_USERS_PATH = "api/metalakes/%s/users/%s"; private static final String API_METALAKES_GROUPS_PATH = "api/metalakes/%s/groups/%s"; private static final String API_METALAKES_ROLES_PATH = "api/metalakes/%s/roles/%s"; @@ -165,25 +161,19 @@ public GravitinoMetalake alterMetalake(String name, MetalakeChange... changes) * Drops a specific Metalake using the Gravitino API. * * @param name The name of the Metalake to be dropped. - * @return True if the Metalake was successfully dropped, false otherwise. + * @return True if the Metalake was successfully dropped, false if the Metalake does not exist. */ @Override public boolean dropMetalake(String name) { checkMetalakeName(name); - try { - DropResponse resp = - restClient.delete( - API_METALAKES_IDENTIFIER_PATH + name, - DropResponse.class, - Collections.emptyMap(), - ErrorHandlers.metalakeErrorHandler()); - resp.validate(); - return resp.dropped(); - - } catch (Exception e) { - LOG.warn("Failed to drop metadata {}", name, e); - return false; - } + DropResponse resp = + restClient.delete( + API_METALAKES_IDENTIFIER_PATH + name, + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.metalakeErrorHandler()); + resp.validate(); + return resp.dropped(); } /** diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoMetalake.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoMetalake.java index f207d4d9919..5b8cd070230 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoMetalake.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/GravitinoMetalake.java @@ -28,8 +28,6 @@ import java.util.Map; 
import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Gravitino Metalake is the top-level metadata repository for users. It contains a list of catalogs @@ -37,9 +35,6 @@ * alter and drop a catalog with specified identifier. */ public class GravitinoMetalake extends MetalakeDTO implements SupportsCatalogs { - - private static final Logger LOG = LoggerFactory.getLogger(GravitinoMetalake.class); - private static final String API_METALAKES_CATALOGS_PATH = "api/metalakes/%s/catalogs/%s"; private final RESTClient restClient; @@ -192,25 +187,19 @@ public Catalog alterCatalog(String catalogName, CatalogChange... changes) * Drop the catalog with specified identifier. * * @param catalogName the name of the catalog. - * @return true if the catalog is dropped successfully, false otherwise. + * @return true if the catalog is dropped successfully, false if the catalog does not exist. */ @Override public boolean dropCatalog(String catalogName) { - try { - DropResponse resp = - restClient.delete( - String.format(API_METALAKES_CATALOGS_PATH, this.name(), catalogName), - DropResponse.class, - Collections.emptyMap(), - ErrorHandlers.catalogErrorHandler()); - resp.validate(); - return resp.dropped(); - - } catch (Exception e) { - LOG.warn("Failed to drop catalog {}", catalogName, e); - return false; - } + DropResponse resp = + restClient.delete( + String.format(API_METALAKES_CATALOGS_PATH, this.name(), catalogName), + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.catalogErrorHandler()); + resp.validate(); + return resp.dropped(); } static class Builder extends MetalakeDTO.Builder { diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java index 2cd54fab6ad..02080961e46 100644 --- 
a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalCatalog.java @@ -199,28 +199,20 @@ public Table alterTable(NameIdentifier ident, TableChange... changes) * Drop the table with specified identifier. * * @param ident The identifier of the table. - * @return true if the table is dropped successfully, false otherwise. + * @return true if the table is dropped successfully, false if the table does not exist. */ @Override public boolean dropTable(NameIdentifier ident) { NameIdentifier.checkTable(ident); - try { - DropResponse resp = - restClient.delete( - formatTableRequestPath(ident.namespace()) - + "/" - + RESTUtils.encodeString(ident.name()), - DropResponse.class, - Collections.emptyMap(), - ErrorHandlers.tableErrorHandler()); - resp.validate(); - return resp.dropped(); - - } catch (Exception e) { - LOG.warn("Failed to drop table {}", ident, e); - return false; - } + DropResponse resp = + restClient.delete( + formatTableRequestPath(ident.namespace()) + "/" + RESTUtils.encodeString(ident.name()), + DropResponse.class, + Collections.emptyMap(), + ErrorHandlers.tableErrorHandler()); + resp.validate(); + return resp.dropped(); } /** diff --git a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalTable.java b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalTable.java index a551558d0e8..26628374b46 100644 --- a/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalTable.java +++ b/clients/client-java/src/main/java/com/datastrato/gravitino/client/RelationalTable.java @@ -222,7 +222,7 @@ public Partition addPartition(Partition partition) throws PartitionAlreadyExists * Drops the partition with the given name. * * @param partitionName The name of the partition. - * @return true if the partition is dropped, false otherwise. 
+ * @return true if the partition is dropped, false if the partition does not exist. */ @Override public boolean dropPartition(String partitionName) { diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoClient.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoClient.java index 7d150addbaa..98123eb338a 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoClient.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoClient.java @@ -223,17 +223,19 @@ public void testAlterMetalake() throws JsonProcessingException { public void testDropMetalake() throws JsonProcessingException { DropResponse resp = new DropResponse(true); buildMockResource(Method.DELETE, "/api/metalakes/mock", null, resp, HttpStatus.SC_OK); - Assertions.assertTrue(client.dropMetalake("mock")); + Assertions.assertTrue(client.dropMetalake("mock"), "metalake should be dropped"); DropResponse resp1 = new DropResponse(false); buildMockResource(Method.DELETE, "/api/metalakes/mock", null, resp1, HttpStatus.SC_OK); - Assertions.assertFalse(client.dropMetalake("mock")); + Assertions.assertFalse(client.dropMetalake("mock"), "metalake should be non-existent"); // Test return internal error ErrorResponse errorResp = ErrorResponse.internalError("mock error"); buildMockResource( Method.DELETE, "/api/metalakes/mock", null, errorResp, HttpStatus.SC_INTERNAL_SERVER_ERROR); - Assertions.assertFalse(client.dropMetalake("mock")); + Throwable excep = + Assertions.assertThrows(RuntimeException.class, () -> client.dropMetalake("mock")); + Assertions.assertTrue(excep.getMessage().contains("mock error")); // Test illegal metalake name identifier String badName = "mock.mock"; diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java index 
7cdf69a420b..1969b8fff0d 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestGravitinoMetalake.java @@ -367,13 +367,13 @@ public void testDropCatalog() throws JsonProcessingException { DropResponse resp = new DropResponse(true); buildMockResource(Method.DELETE, path, null, resp, HttpStatus.SC_OK); boolean dropped = gravitinoClient.dropCatalog(catalogName); - Assertions.assertTrue(dropped); + Assertions.assertTrue(dropped, "catalog should be dropped"); // Test return false DropResponse resp1 = new DropResponse(false); buildMockResource(Method.DELETE, path, null, resp1, HttpStatus.SC_OK); boolean dropped1 = gravitinoClient.dropCatalog(catalogName); - Assertions.assertFalse(dropped1); + Assertions.assertFalse(dropped1, "catalog should be non-existent"); } static GravitinoMetalake createMetalake(GravitinoAdminClient client, String metalakeName) diff --git a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java index 83705ba5c75..7997870dcdc 100644 --- a/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java +++ b/clients/client-java/src/test/java/com/datastrato/gravitino/client/TestRelationalCatalog.java @@ -1073,7 +1073,10 @@ public void testDropTable() throws JsonProcessingException { ErrorResponse errorResp = ErrorResponse.internalError("internal error"); buildMockResource(Method.DELETE, tablePath, null, errorResp, SC_INTERNAL_SERVER_ERROR); - Assertions.assertFalse(catalog.asTableCatalog().dropTable(tableId)); + Throwable excep = + Assertions.assertThrows( + RuntimeException.class, () -> catalog.asTableCatalog().dropTable(tableId)); + Assertions.assertTrue(excep.getMessage().contains("internal error")); } @Test diff --git 
a/core/src/main/java/com/datastrato/gravitino/catalog/SupportsCatalogs.java b/core/src/main/java/com/datastrato/gravitino/catalog/SupportsCatalogs.java index c1ce8cb586f..2e68d1fae80 100644 --- a/core/src/main/java/com/datastrato/gravitino/catalog/SupportsCatalogs.java +++ b/core/src/main/java/com/datastrato/gravitino/catalog/SupportsCatalogs.java @@ -104,7 +104,7 @@ Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes) * Drop a catalog with specified identifier. * * @param ident the identifier of the catalog. - * @return True if the catalog was dropped, false otherwise. + * @return True if the catalog was dropped, false if the catalog does not exist. */ boolean dropCatalog(NameIdentifier ident); } diff --git a/core/src/main/java/com/datastrato/gravitino/metalake/SupportsMetalakes.java b/core/src/main/java/com/datastrato/gravitino/metalake/SupportsMetalakes.java index 664f60e3f06..43b0b1eb11c 100644 --- a/core/src/main/java/com/datastrato/gravitino/metalake/SupportsMetalakes.java +++ b/core/src/main/java/com/datastrato/gravitino/metalake/SupportsMetalakes.java @@ -78,7 +78,7 @@ Metalake alterMetalake(NameIdentifier ident, MetalakeChange... changes) * Drop a metalake with specified identifier. * * @param ident The identifier of the metalake. - * @return True if the metalake was dropped, false otherwise. + * @return True if the metalake was dropped, false if the metalake does not exist. 
*/ boolean dropMetalake(NameIdentifier ident); } diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestFilesetOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestFilesetOperationDispatcher.java index 068bdeb9ba4..a29df9a2765 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestFilesetOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestFilesetOperationDispatcher.java @@ -141,5 +141,6 @@ public void testCreateAndDropFileset() { boolean dropped = filesetOperationDispatcher.dropFileset(filesetIdent1); Assertions.assertTrue(dropped); + Assertions.assertFalse(filesetOperationDispatcher.dropFileset(filesetIdent1)); } } diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java index adb8fc6c3d2..c04b8e1a72b 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestSchemaOperationDispatcher.java @@ -189,6 +189,7 @@ public void testCreateAndDropSchema() throws IOException { boolean dropped = dispatcher.dropSchema(schemaIdent, false); Assertions.assertTrue(dropped); + Assertions.assertFalse(dispatcher.dropSchema(schemaIdent, false)); // Test if entity store is failed to drop the schema entity dispatcher.createSchema(schemaIdent, "comment", props); diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java index d7cc7236e5c..83cee605232 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTableOperationDispatcher.java @@ -265,6 +265,7 @@ public void testCreateAndDropTable() throws IOException { boolean dropped = 
tableOperationDispatcher.dropTable(tableIdent); Assertions.assertTrue(dropped); + Assertions.assertFalse(tableOperationDispatcher.dropTable(tableIdent)); // Test if the entity store is failed to drop the table entity tableOperationDispatcher.createTable(tableIdent, columns, "comment", props, new Transform[0]); diff --git a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java index 7aeb39ec1e0..fd06db7614a 100644 --- a/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java +++ b/core/src/test/java/com/datastrato/gravitino/catalog/TestTopicOperationDispatcher.java @@ -193,6 +193,7 @@ public void testCreateAndDropTopic() throws IOException { boolean dropped = topicOperationDispatcher.dropTopic(topicIdent); Assertions.assertTrue(dropped); + Assertions.assertFalse(topicOperationDispatcher.dropTopic(topicIdent)); // Test if the entity store is failed to drop the topic entity topicOperationDispatcher.createTopic(topicIdent, "comment", null, props); diff --git a/core/src/test/java/com/datastrato/gravitino/metalake/TestMetalakeManager.java b/core/src/test/java/com/datastrato/gravitino/metalake/TestMetalakeManager.java index ad86b4709ff..dbfebfc1df7 100644 --- a/core/src/test/java/com/datastrato/gravitino/metalake/TestMetalakeManager.java +++ b/core/src/test/java/com/datastrato/gravitino/metalake/TestMetalakeManager.java @@ -160,12 +160,12 @@ public void testDropMetalake() { testProperties(props, metalake.properties()); boolean dropped = metalakeManager.dropMetalake(ident); - Assertions.assertTrue(dropped); + Assertions.assertTrue(dropped, "metalake should be dropped"); // Test with NoSuchMetalakeException NameIdentifier ident1 = NameIdentifier.of("test42"); boolean dropped1 = metalakeManager.dropMetalake(ident1); - Assertions.assertFalse(dropped1); + Assertions.assertFalse(dropped1, "metalake should be non-existent"); } 
private void testProperties(Map expectedProps, Map testProps) { diff --git a/docs/spark-connector/spark-catalog-iceberg.md b/docs/spark-connector/spark-catalog-iceberg.md index 70bede73cb8..34b5d756390 100644 --- a/docs/spark-connector/spark-catalog-iceberg.md +++ b/docs/spark-connector/spark-catalog-iceberg.md @@ -12,10 +12,8 @@ The Gravitino Spark connector offers the capability to read and write Iceberg ta #### Support DML and DDL operations: -- `CREATE TABLE` - -Supports basic create table clause including table schema, properties, partition, does not support distribution and sort orders. - +- `CREATE TABLE` + - `Supports basic create table clause including table schema, properties, partition, does not support distribution and sort orders.` - `DROP TABLE` - `ALTER TABLE` - `INSERT INTO&OVERWRITE` @@ -23,20 +21,26 @@ Supports basic create table clause including table schema, properties, partition - `MERGE INOT` - `DELETE FROM` - `UPDATE` +- `CALL` +- `TIME TRAVEL QUERY` +- `DESCRIBE TABLE` #### Not supported operations: - View operations. -- Branching and tagging operations. -- Spark procedures. +- Metadata tables, like: + - `{iceberg_catalog}.{iceberg_database}.{iceberg_table}.snapshots` - Other Iceberg extension SQL, like: - `ALTER TABLE prod.db.sample ADD PARTITION FIELD xx` - `ALTER TABLE ... 
WRITE ORDERED BY` + - `ALTER TABLE prod.db.sample CREATE BRANCH branchName` + - `ALTER TABLE prod.db.sample CREATE TAG tagName` +- AtomicCreateTableAsSelect&AtomicReplaceTableAsSelect ## SQL example ```sql -// Suppose iceberg_a is the Iceberg catalog name managed by Gravitino +-- Suppose iceberg_a is the Iceberg catalog name managed by Gravitino USE iceberg_a; CREATE DATABASE IF NOT EXISTS mydatabase; @@ -57,7 +61,7 @@ VALUES (2, 'Bob', 'Marketing', TIMESTAMP '2021-02-01 10:30:00'), (3, 'Charlie', 'Sales', TIMESTAMP '2021-03-01 08:45:00'); -SELECT * FROM employee WHERE date(hire_date) = '2021-01-01' +SELECT * FROM employee WHERE date(hire_date) = '2021-01-01'; UPDATE employee SET department = 'Jenny' WHERE id = 1; @@ -74,18 +78,57 @@ USING (SELECT 4 as id, 'David' as name, 'Engineering' as department, TIMESTAMP ' ON employee.id = new_employee.id WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT *; + +-- Suppose that the first snapshotId of employee is 1L and the second snapshotId is 2L +-- Rollback the snapshot for iceberg_a.mydatabase.employee to 1L +CALL iceberg_a.system.rollback_to_snapshot('iceberg_a.mydatabase.employee', 1); +-- Set the snapshot for iceberg_a.mydatabase.employee to 2L +CALL iceberg_a.system.set_current_snapshot('iceberg_a.mydatabase.employee', 2); + +-- Suppose that the commit timestamp of the first snapshot is older than '2024-05-27 01:01:00' +-- Time travel to '2024-05-27 01:01:00' +SELECT * FROM employee TIMESTAMP AS OF '2024-05-27 01:01:00'; +SELECT * FROM employee FOR SYSTEM_TIME AS OF '2024-05-27 01:01:00'; + +-- Show the details of employee, such as schema and reserved properties(like location, current-snapshot-id, provider, format, format-version, etc) +DESC EXTENDED employee; ``` -## Catalog properties +For more details about `CALL`, please refer to the [Spark Procedures description](https://iceberg.apache.org/docs/latest/spark-procedures/#spark-procedures) in Iceberg official document. 
\ + +## Iceberg backend-catalog support +- HiveCatalog +- JdbcCatalog +- RESTCatalog + +### Catalog properties Gravitino spark connector will transform below property names which are defined in catalog properties to Spark Iceberg connector configuration. -| Gravitino catalog property name | Spark Iceberg connector configuration | Description | Since Version | -|---------------------------------|---------------------------------------|---------------------------|---------------| -| `catalog-backend` | `type` | Catalog backend type | 0.5.0 | -| `uri` | `uri` | Catalog backend uri | 0.5.0 | -| `warehouse` | `warehouse` | Catalog backend warehouse | 0.5.0 | -| `jdbc-user` | `jdbc.user` | JDBC user name | 0.5.0 | -| `jdbc-password` | `jdbc.password` | JDBC password | 0.5.0 | +#### HiveCatalog + +| Gravitino catalog property name | Spark Iceberg connector configuration | Default Value | Required | Description | Since Version | +|---------------------------------|---------------------------------------|---------------|----------|---------------------------|---------------| +| `catalog-backend` | `type` | `hive` | Yes | Catalog backend type | 0.5.0 | +| `uri` | `uri` | (none) | Yes | Catalog backend uri | 0.5.0 | +| `warehouse` | `warehouse` | (none) | Yes | Catalog backend warehouse | 0.5.0 | + +#### JdbcCatalog + +| Gravitino catalog property name | Spark Iceberg connector configuration | Default Value | Required | Description | Since Version | +|---------------------------------|---------------------------------------|---------------|----------|---------------------------|---------------| +| `catalog-backend` | `type` | `jdbc` | Yes | Catalog backend type | 0.5.0 | +| `uri` | `uri` | (none) | Yes | Catalog backend uri | 0.5.0 | +| `warehouse` | `warehouse` | (none) | Yes | Catalog backend warehouse | 0.5.0 | +| `jdbc-user` | `jdbc.user` | (none) | Yes | JDBC user name | 0.5.0 | +| `jdbc-password` | `jdbc.password` | (none) | Yes | JDBC password | 0.5.0 | + +#### 
RESTCatalog + +| Gravitino catalog property name | Spark Iceberg connector configuration | Default Value | Required | Description | Since Version | +|---------------------------------|---------------------------------------|---------------|----------|---------------------------|---------------| +| `catalog-backend` | `type` | `rest` | Yes | Catalog backend type | 0.5.1 | +| `uri` | `uri` | (none) | Yes | Catalog backend uri | 0.5.1 | +| `warehouse` | `warehouse` | (none) | No | Catalog backend warehouse | 0.5.1 | Gravitino catalog property names with the prefix `spark.bypass.` are passed to Spark Iceberg connector. For example, using `spark.bypass.io-impl` to pass the `io-impl` to the Spark Iceberg connector. diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f4054616463..47e6ddbbf92 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -117,6 +117,7 @@ hadoop3-hdfs = { group = "org.apache.hadoop", name = "hadoop-hdfs", version.ref hadoop3-common = { group = "org.apache.hadoop", name = "hadoop-common", version.ref = "hadoop3"} hadoop3-client = { group = "org.apache.hadoop", name = "hadoop-client", version.ref = "hadoop3"} hadoop3-mapreduce-client-core = { group = "org.apache.hadoop", name = "hadoop-mapreduce-client-core", version.ref = "hadoop3"} +hadoop3-minicluster = { group = "org.apache.hadoop", name = "hadoop-minicluster", version.ref = "hadoop-minikdc"} airlift-units = { group = "io.airlift", name = "units", version.ref = "airlift-units"} airlift-log = { group = "io.airlift", name = "log", version.ref = "airlift-log"} httpclient5 = { group = "org.apache.httpcomponents.client5", name = "httpclient5", version.ref = "httpclient5" } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/AuditIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/AuditIT.java index d99d2d154e5..298d0c1a8c6 100644 --- 
a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/AuditIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/AuditIT.java @@ -46,6 +46,7 @@ public void testAuditMetalake() throws Exception { metaLake = client.alterMetalake(metalakeAuditName, changes); Assertions.assertEquals(expectUser, metaLake.auditInfo().creator()); Assertions.assertEquals(expectUser, metaLake.auditInfo().lastModifier()); - client.dropMetalake(newName); + Assertions.assertTrue(client.dropMetalake(newName), "metaLake should be dropped"); + Assertions.assertFalse(client.dropMetalake(newName), "metalake should be non-existent"); } } diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/CatalogIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/CatalogIT.java index e11f9252ef4..b7508c5dbe9 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/CatalogIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/CatalogIT.java @@ -89,6 +89,23 @@ public void testCreateCatalog() { metalake.dropCatalog(catalogName); } + @Test + public void testDropCatalog() { + String catalogName = GravitinoITUtils.genRandomName("catalog"); + Assertions.assertFalse(metalake.catalogExists(catalogName)); + + Map properties = Maps.newHashMap(); + properties.put("metastore.uris", hmsUri); + Catalog catalog = + metalake.createCatalog( + catalogName, Catalog.Type.RELATIONAL, "hive", "catalog comment", properties); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + Assertions.assertEquals(catalogName, catalog.name()); + + Assertions.assertTrue(metalake.dropCatalog(catalogName), "catalog should be dropped"); + Assertions.assertFalse(metalake.dropCatalog(catalogName), "catalog should be non-existent"); + } + @Test public void testCreateCatalogWithoutProperties() { String catalogName = 
GravitinoITUtils.genRandomName("catalog"); @@ -159,6 +176,9 @@ public void testListCatalogsInfo() { } Assertions.assertTrue(ArrayUtils.contains(catalogs, relCatalog)); Assertions.assertTrue(ArrayUtils.contains(catalogs, fileCatalog)); + + metalake.dropCatalog(relCatalogName); + metalake.dropCatalog(fileCatalogName); } private void assertCatalogEquals(Catalog catalog1, Catalog catalog2) { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/MetalakeIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/MetalakeIT.java index 33eda2fbc17..4096a59d799 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/MetalakeIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/MetalakeIT.java @@ -167,7 +167,7 @@ public void testCreateMetalakeWithChinese() { public void testDropMetalakes() { GravitinoMetalake metalakeA = client.createMetalake(metalakeNameA, "metalake A comment", Collections.emptyMap()); - assertTrue(client.dropMetalake(metalakeA.name())); + assertTrue(client.dropMetalake(metalakeA.name()), "metaLake should be dropped"); NameIdentifier id = NameIdentifier.of(metalakeNameA); assertThrows( NoSuchMetalakeException.class, @@ -176,7 +176,7 @@ public void testDropMetalakes() { }); // Metalake does not exist, so we return false - assertFalse(client.dropMetalake(metalakeA.name())); + assertFalse(client.dropMetalake(metalakeA.name()), "metalake should be non-existent"); } public void dropMetalakes() { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java index 5d6735c4fe6..fa1050341d2 100644 --- 
a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/client/filesystem/hadoop/GravitinoVirtualFileSystemIT.java @@ -75,6 +75,9 @@ public static void startUp() { @AfterAll public static void tearDown() throws IOException { + Catalog catalog = metalake.loadCatalog(catalogName); + catalog.asSchemas().dropSchema(schemaName, true); + metalake.dropCatalog(catalogName); client.dropMetalake(metalakeName); if (client != null) { diff --git a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java index 2574e6c7a54..d314fa056cf 100644 --- a/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java +++ b/integration-test/src/test/java/com/datastrato/gravitino/integration/test/trino/TrinoConnectorIT.java @@ -1081,8 +1081,11 @@ void testIcebergCatalogCreatedByGravitino() { } // Do not support the cascade drop - success = catalog.asSchemas().dropSchema(schemaName, true); - Assertions.assertFalse(success); + Throwable excep = + Assertions.assertThrows( + IllegalArgumentException.class, () -> catalog.asSchemas().dropSchema(schemaName, true)); + Assertions.assertTrue( + excep.getMessage().contains("Iceberg does not support cascading delete operations.")); final String sql3 = String.format("show schemas in %s like '%s'", catalogName, schemaName); success = checkTrinoHasLoaded(sql3, 30); if (!success) { diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestFilesetOperations.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestFilesetOperations.java index f998b4ce47c..96ebd196c2c 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestFilesetOperations.java +++ 
b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestFilesetOperations.java @@ -391,7 +391,7 @@ public void testDropFileset() { .accept("application/vnd.gravitino.v1+json") .delete(); - Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus()); + Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp1.getStatus()); DropResponse dropResponse1 = resp1.readEntity(DropResponse.class); Assertions.assertEquals(0, dropResponse1.getCode()); diff --git a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestMetalakeOperations.java b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestMetalakeOperations.java index fd2e9a6bafe..a00afd62166 100644 --- a/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestMetalakeOperations.java +++ b/server/src/test/java/com/datastrato/gravitino/server/web/rest/TestMetalakeOperations.java @@ -346,6 +346,18 @@ public void testDropMetalake() { boolean dropped = dropResponse.dropped(); Assertions.assertTrue(dropped); + // Test when failed to drop metalake + when(metalakeManager.dropMetalake(any())).thenReturn(false); + Response resp2 = + target("/metalakes/test") + .request(MediaType.APPLICATION_JSON_TYPE) + .accept("application/vnd.gravitino.v1+json") + .delete(); + Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp2.getStatus()); + DropResponse dropResponse2 = resp2.readEntity(DropResponse.class); + Assertions.assertEquals(0, dropResponse2.getCode()); + Assertions.assertFalse(dropResponse2.dropped()); + // Test throw an exception when deleting tenant. 
doThrow(new RuntimeException("Internal error")).when(metalakeManager).dropMetalake(any()); diff --git a/settings.gradle.kts b/settings.gradle.kts index d645efd0b02..7dc36f9a3b2 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,6 +8,9 @@ plugins { rootProject.name = "gravitino" +val scalaVersion: String = gradle.startParameter.projectProperties["scalaVersion"]?.toString() + ?: settings.extra["defaultScalaVersion"].toString() + include("api", "common", "core", "meta", "server", "integration-test", "server-common") include("catalogs:bundled-catalog") include("catalogs:catalog-hive") @@ -28,9 +31,14 @@ include( "clients:client-python" ) include("trino-connector") -include("spark-connector:spark-common", "spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3", "spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5") -project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark") -project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime") +include("spark-connector:spark-common") +// kyuubi hive connector doesn't support 2.13 for Spark3.3 +if (scalaVersion == "2.12") { + include("spark-connector:spark-3.3", "spark-connector:spark-runtime-3.3") + project(":spark-connector:spark-3.3").projectDir = file("spark-connector/v3.3/spark") + project(":spark-connector:spark-runtime-3.3").projectDir = file("spark-connector/v3.3/spark-runtime") +} +include("spark-connector:spark-3.4", "spark-connector:spark-runtime-3.4", "spark-connector:spark-3.5", "spark-connector:spark-runtime-3.5") project(":spark-connector:spark-3.4").projectDir = file("spark-connector/v3.4/spark") project(":spark-connector:spark-runtime-3.4").projectDir = file("spark-connector/v3.4/spark-runtime") project(":spark-connector:spark-3.5").projectDir = file("spark-connector/v3.5/spark") diff --git a/spark-connector/spark-common/build.gradle.kts 
b/spark-connector/spark-common/build.gradle.kts index 3b485942f28..804516b822b 100644 --- a/spark-connector/spark-common/build.gradle.kts +++ b/spark-connector/spark-common/build.gradle.kts @@ -17,7 +17,8 @@ val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extr val sparkVersion: String = libs.versions.spark33.get() val sparkMajorVersion: String = sparkVersion.substringBeforeLast(".") val icebergVersion: String = libs.versions.iceberg4spark.get() -val kyuubiVersion: String = libs.versions.kyuubi4spark33.get() +// kyuubi hive connector for Spark 3.3 doesn't support scala 2.13 +val kyuubiVersion: String = libs.versions.kyuubi4spark34.get() val scalaJava8CompatVersion: String = libs.versions.scala.java.compat.get() val scalaCollectionCompatVersion: String = libs.versions.scala.collection.compat.get() diff --git a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java index 033ac6d0f3a..8be80454745 100644 --- a/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java +++ b/spark-connector/spark-common/src/main/java/com/datastrato/gravitino/spark/connector/SparkTransformConverter.java @@ -239,6 +239,7 @@ public org.apache.spark.sql.connector.expressions.Transform[] toSparkTransform( return sparkTransforms.toArray(new org.apache.spark.sql.connector.expressions.Transform[0]); } + @SuppressWarnings("deprecation") private static Distribution toGravitinoDistribution(BucketTransform bucketTransform) { int bucketNum = (Integer) bucketTransform.numBuckets().value(); Expression[] expressions = @@ -249,6 +250,7 @@ private static Distribution toGravitinoDistribution(BucketTransform bucketTransf } // Spark datasourceV2 doesn't support specify sort order direction, use ASCENDING as default. 
+ @SuppressWarnings("deprecation") private static Pair toGravitinoDistributionAndSortOrders( SortedBucketTransform sortedBucketTransform) { int bucketNum = (Integer) sortedBucketTransform.numBuckets().value(); diff --git a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java index c75dcf91d84..5618b3795ac 100644 --- a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/TestSparkTransformConverter.java @@ -71,6 +71,7 @@ void testPartition(boolean supportsBucketPartition) { }); } + @SuppressWarnings("deprecation") @ParameterizedTest @ValueSource(booleans = {false, true}) void testGravitinoToSparkDistributionWithoutSortOrder(boolean supportsBucketPartition) { @@ -180,6 +181,7 @@ void testSparkToGravitinoDistributionWithSortOrder(boolean supportsBucketPartiti } } + @SuppressWarnings("deprecation") @ParameterizedTest @ValueSource(booleans = {false, true}) void testGravitinoToSparkDistributionWithSortOrder(boolean supportsBucketPartition) { diff --git a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java index f4251eb200c..09b5d653d16 100644 --- a/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java +++ b/spark-connector/spark-common/src/test/java/com/datastrato/gravitino/spark/connector/integration/test/SparkCommonIT.java @@ -140,9 +140,14 @@ void init() { @AfterAll void cleanUp() { - sql("USE " + getCatalogName()); - getDatabases() - .forEach(database -> sql(String.format("DROP 
DATABASE IF EXISTS %s CASCADE", database))); + getDatabases().stream() + .filter(database -> !database.equals("default")) + .forEach( + database -> { + sql("USE " + database); + listTableNames().forEach(table -> dropTableIfExists(table)); + dropDatabaseIfExists(database); + }); } @Test diff --git a/web/LICENSE b/web/LICENSE index 28d3111e34a..0a172274bd9 100644 --- a/web/LICENSE +++ b/web/LICENSE @@ -207,6 +207,12 @@ Apache Zeppelin ./web/WEB-INF/web.xml + Apache Hive + ./web/src/lib/icons/svg/hive.svg + + Apache Doris + ./web/src/lib/icons/svg/doris.svg + This product bundles various third-party components also under the MIT license. @@ -215,6 +221,3 @@ ./src/types/axios.d.ts ./src/lib/enums/httpEnum.ts ./src/lib/utils/index.js (parts of) - - Third party SIL Open Font License v1.1 (OFL-1.1) - (SIL OPEN FONT LICENSE Version 1.1) The Alata font family (https://github.com/SorkinType/Alata) diff --git a/web/src/app/login/page.js b/web/src/app/login/page.js index 9ed077ddfe3..867c2274c45 100644 --- a/web/src/app/login/page.js +++ b/web/src/app/login/page.js @@ -7,7 +7,7 @@ import { useRouter } from 'next/navigation' import Image from 'next/image' -import { Alata } from 'next/font/google' +import { Roboto } from 'next/font/google' import { Box, Card, Grid, Button, CardContent, Typography, TextField, FormControl, FormHelperText } from '@mui/material' @@ -19,7 +19,7 @@ import { yupResolver } from '@hookform/resolvers/yup' import { useAppDispatch } from '@/lib/hooks/useStore' import { loginAction } from '@/lib/store/auth' -const fonts = Alata({ subsets: ['latin'], weight: ['400'], display: 'swap' }) +const fonts = Roboto({ subsets: ['latin'], weight: ['400'], display: 'swap' }) const defaultValues = { grant_type: 'client_credentials', diff --git a/web/src/app/rootLayout/AppBar.js b/web/src/app/rootLayout/AppBar.js index eadc84de1eb..04d3a961fa5 100644 --- a/web/src/app/rootLayout/AppBar.js +++ b/web/src/app/rootLayout/AppBar.js @@ -7,7 +7,7 @@ import Link from 'next/link' 
import Image from 'next/image' -import { Alata } from 'next/font/google' +import { Roboto } from 'next/font/google' import { useState, useEffect } from 'react' @@ -32,7 +32,7 @@ import { useRouter } from 'next/navigation' import { useAppSelector, useAppDispatch } from '@/lib/hooks/useStore' import { fetchMetalakes } from '@/lib/store/metalakes' -const fonts = Alata({ subsets: ['latin'], weight: ['400'], display: 'swap' }) +const fonts = Roboto({ subsets: ['latin'], weight: ['400'], display: 'swap' }) const AppBar = () => { const searchParams = useSearchParams() diff --git a/web/src/lib/icons/svg/doris.svg b/web/src/lib/icons/svg/doris.svg index 7bd05ce60a8..9d877bf29d2 100644 --- a/web/src/lib/icons/svg/doris.svg +++ b/web/src/lib/icons/svg/doris.svg @@ -1,4 +1,25 @@ + + diff --git a/web/src/lib/icons/svg/hive.svg b/web/src/lib/icons/svg/hive.svg index 031ad38977a..1bef16ff263 100644 --- a/web/src/lib/icons/svg/hive.svg +++ b/web/src/lib/icons/svg/hive.svg @@ -1,4 +1,25 @@ + +