Commit c8e69fc: Merge branch 'main' into fix_hive_image_timeout

yuqi1129 authored May 30, 2024
2 parents 3663982 + 7da0ef9
Showing 76 changed files with 1,333 additions and 194 deletions.
9 changes: 6 additions & 3 deletions .github/workflows/spark-integration-test.yml
@@ -57,6 +57,7 @@ jobs:
matrix:
architecture: [linux/amd64]
java-version: [ 8, 11, 17 ]
scala-version: [ 2.12 ]
test-mode: [ embedded, deploy ]
env:
PLATFORM: ${{ matrix.architecture }}
@@ -91,9 +92,11 @@ jobs:
- name: Spark Integration Test
id: integrationTest
run: |
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
if [ "${{ matrix.scala-version }}" == "2.12" ];then
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.3:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
fi
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.4:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
./gradlew --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PscalaVersion=${{ matrix.scala-version }} :spark-connector:spark-3.5:test --tests "com.datastrato.gravitino.spark.connector.integration.test.**"
- name: Upload integrate tests reports
uses: actions/upload-artifact@v3
@@ -75,7 +75,7 @@ Metalake alterMetalake(String name, MetalakeChange... changes)
* Drop a metalake with specified identifier.
*
* @param name The identifier of the metalake.
* @return True if the metalake was dropped, false otherwise.
* @return True if the metalake was dropped, false if the metalake does not exist.
*/
boolean dropMetalake(String name);
}
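
The clarified return contract reads naturally at the call site. A minimal usage sketch, assuming "metalakes" is an initialized implementor of the interface above; the variable and metalake name are illustrative:

// Sketch only: "metalakes" stands in for an implementor of the interface above.
boolean dropped = metalakes.dropMetalake("sales");
if (!dropped) {
  // Per the updated Javadoc, false means the metalake did not exist, not that the call failed.
  System.out.println("Metalake 'sales' was already absent.");
}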
@@ -80,7 +80,7 @@ default boolean partitionExists(String partitionName) {
* Drop a partition with specified name.
*
* @param partitionName the name of the partition
* @return true if a partition was deleted.
* @return true if a partition was deleted successfully, false if the partition does not exist.
*/
boolean dropPartition(String partitionName);
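
The same idempotent reading applies here. A hedged sketch, assuming "partitions" implements the partition-support interface shown above; the partition name is illustrative:

// Sketch only: "partitions" and the partition name are illustrative.
boolean deleted = partitions.dropPartition("dt=2024-05-30");
// Per the updated Javadoc: true = deleted, false = the partition did not exist.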

3 changes: 3 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -34,6 +34,9 @@ dependencies {
testImplementation(project(":server"))
testImplementation(project(":server-common"))

testImplementation(libs.minikdc)
testImplementation(libs.hadoop3.minicluster)

testImplementation(libs.bundles.log4j)
testImplementation(libs.mockito.core)
testImplementation(libs.mysql.driver)
@@ -4,10 +4,13 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalog;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.capability.Capability;
import java.util.Map;
import java.util.Optional;

/**
* Hadoop catalog is a fileset catalog that can manage filesets on the Hadoop Compatible File
@@ -31,4 +34,13 @@ protected CatalogOperations newOps(Map<String, String> config) {
protected Capability newCapability() {
return new HadoopCatalogCapability();
}

@Override
protected Optional<ProxyPlugin> newProxyPlugin(Map<String, String> config) {
boolean impersonationEnabled = new KerberosConfig(config).isImpersonationEnabled();
if (!impersonationEnabled) {
return Optional.empty();
}
return Optional.of(new HadoopProxyPlugin());
}
}
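
newProxyPlugin only surfaces a plugin when impersonation is switched on. A minimal sketch of that gate, reusing the KerberosConfig calls from the diff; the property key is an assumption, since KerberosConfig itself is not among this commit's visible changes:

import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class ProxyPluginGateSketch {
  public static void main(String[] args) {
    // "authentication.impersonation-enable" is an assumed key name; the real key
    // is defined in KerberosConfig, which this diff does not show.
    Map<String, String> conf =
        ImmutableMap.of("authentication.impersonation-enable", "true");
    boolean enabled = new KerberosConfig(conf).isImpersonationEnabled();
    // Mirrors newProxyPlugin: Optional.empty() when disabled, a HadoopProxyPlugin otherwise.
    System.out.println(enabled ? "HadoopProxyPlugin created" : "Optional.empty()");
  }
}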
@@ -5,6 +5,7 @@
package com.datastrato.gravitino.catalog.hadoop;

import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;

import com.datastrato.gravitino.Entity;
import com.datastrato.gravitino.EntityStore;
@@ -14,9 +15,12 @@
import com.datastrato.gravitino.Schema;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.StringIdentifier;
import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosClient;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.connector.SupportsSchemas;
import com.datastrato.gravitino.exceptions.AlreadyExistsException;
import com.datastrato.gravitino.exceptions.FilesetAlreadyExistsException;
@@ -36,6 +40,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
@@ -47,6 +52,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -72,7 +78,15 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem

@VisibleForTesting Optional<Path> catalogStorageLocation;

// For testing only.
private Map<String, String> conf;

@SuppressWarnings("unused")
private ProxyPlugin proxyPlugin;

private String kerberosRealm;

private CatalogInfo catalogInfo;

HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -81,10 +95,16 @@ public HadoopCatalogOperations() {
this(GravitinoEnv.getInstance().entityStore());
}

public String getKerberosRealm() {
return kerberosRealm;
}

@Override
public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
// Initialize Hadoop Configuration.
this.conf = config;
this.hadoopConf = new Configuration();
this.catalogInfo = info;
Map<String, String> bypassConfigs =
config.entrySet().stream()
.filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
@@ -98,9 +118,31 @@ public void initialize(Map<String, String> config, CatalogInfo info) throws RuntimeException {
(String)
CATALOG_PROPERTIES_METADATA.getOrDefault(
config, HadoopCatalogPropertiesMetadata.LOCATION);
conf.forEach(hadoopConf::set);

initAuthentication(conf, hadoopConf);

this.catalogStorageLocation = Optional.ofNullable(catalogLocation).map(Path::new);
}

private void initAuthentication(Map<String, String> conf, Configuration hadoopConf) {
AuthenticationConfig config = new AuthenticationConfig(conf);
boolean enableAuth = config.isEnableAuth();
String authType = config.getAuthType();

if (enableAuth && StringUtils.equalsIgnoreCase(authType, "kerberos")) {
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
UserGroupInformation.setConfiguration(hadoopConf);
try {
KerberosClient kerberosClient = new KerberosClient(conf, hadoopConf);
File keytabFile = kerberosClient.saveKeyTabFileFromUri(catalogInfo.id());
this.kerberosRealm = kerberosClient.login(keytabFile.getAbsolutePath());
} catch (IOException e) {
throw new RuntimeException("Failed to login with kerberos", e);
}
}
}

@Override
public NameIdentifier[] listFilesets(Namespace namespace) throws NoSuchSchemaException {
try {
@@ -609,4 +651,8 @@ static Path formalizePath(Path path, Configuration configuration) throws IOException {
FileSystem defaultFs = FileSystem.get(configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

void setProxyPlugin(HadoopProxyPlugin hadoopProxyPlugin) {
this.proxyPlugin = hadoopProxyPlugin;
}
}
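
For reference, formalizePath qualifies a bare path against the default filesystem. A self-contained sketch of the same makeQualified call, assuming a Hadoop client on the classpath; the fs.defaultFS value is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FormalizePathSketch {
  public static void main(String[] args) throws Exception {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "hdfs://namenode:8020"); // illustrative address
    FileSystem defaultFs = FileSystem.get(configuration);
    Path qualified =
        new Path("/warehouse/fileset")
            .makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
    System.out.println(qualified); // hdfs://namenode:8020/warehouse/fileset
  }
}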
@@ -4,6 +4,8 @@
*/
package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.datastrato.gravitino.catalog.hadoop.kerberos.KerberosConfig;
import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
@@ -29,6 +31,9 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {
null,
false /* hidden */))
.putAll(BASIC_CATALOG_PROPERTY_ENTRIES)
// The following two are about authentication.
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();

@Override
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop;

import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.ProxyPlugin;
import com.datastrato.gravitino.utils.Executable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.Principal;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopProxyPlugin implements ProxyPlugin {
private HadoopCatalogOperations ops;
private UserGroupInformation realUser;

public HadoopProxyPlugin() {
try {
realUser = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
throw new IllegalStateException("Fail to init HadoopCatalogProxyPlugin");
}
}

@Override
public Object doAs(
Principal principal, Executable<Object, Exception> action, Map<String, String> properties)
throws Throwable {
try {
UserGroupInformation proxyUser;

if (UserGroupInformation.isSecurityEnabled() && ops != null) {
// The Gravitino server may use multiple KDC servers.
// The http authentication use one KDC server, the Hadoop catalog may use another KDC
// server.
// The KerberosAuthenticator will remove realm of principal.
// And then we add the realm of Hadoop catalog to the user.
String proxyKerberosPrincipalName = principal.getName();
if (!proxyKerberosPrincipalName.contains("@")) {
proxyKerberosPrincipalName =
String.format("%s@%s", proxyKerberosPrincipalName, ops.getKerberosRealm());
}

proxyUser = UserGroupInformation.createProxyUser(proxyKerberosPrincipalName, realUser);
} else {
proxyUser = UserGroupInformation.createProxyUser(principal.getName(), realUser);
}

return proxyUser.doAs((PrivilegedExceptionAction<Object>) action::execute);
} catch (UndeclaredThrowableException e) {
Throwable innerException = e.getCause();
if (innerException instanceof PrivilegedActionException) {
throw innerException.getCause();
} else if (innerException instanceof InvocationTargetException) {
throw innerException.getCause();
} else {
throw innerException;
}
}
}

@Override
public void bindCatalogOperation(CatalogOperations ops) {
this.ops = ((HadoopCatalogOperations) ops);
this.ops.setProxyPlugin(this);
}
}
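
The realm-completion branch in doAs can be exercised in isolation. A hedged sketch with made-up principal and realm values:

public class RealmCompletionSketch {
  public static void main(String[] args) {
    // KerberosAuthenticator strips the realm, so the name arrives as "alice".
    String principalName = "alice";
    String realm = "EXAMPLE.COM"; // stands in for ops.getKerberosRealm()
    if (!principalName.contains("@")) {
      principalName = String.format("%s@%s", principalName, realm);
    }
    System.out.println(principalName); // alice@EXAMPLE.COM, then passed to createProxyUser
  }
}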
@@ -0,0 +1,69 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.catalog.hadoop.kerberos;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.config.ConfigBuilder;
import com.datastrato.gravitino.config.ConfigConstants;
import com.datastrato.gravitino.config.ConfigEntry;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfig extends Config {
public static final String ENABLE_AUTH_KEY = "authentication.enable";
public static final String AUTH_TYPE_KEY = "authentication.type";

public AuthenticationConfig(Map<String, String> properties) {
super(false);
loadFromMap(properties, k -> true);
}

public static final ConfigEntry<Boolean> ENABLE_AUTH_ENTRY =
new ConfigBuilder(ENABLE_AUTH_KEY)
.doc("Whether to enable authentication for Hadoop catalog")
.version(ConfigConstants.VERSION_0_5_1)
.booleanConf()
.createWithDefault(false);

public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
new ConfigBuilder(AUTH_TYPE_KEY)
.doc("The type of authentication for Hadoop catalog, currently we only support kerberos")
.version(ConfigConstants.VERSION_0_5_1)
.stringConf()
.create();

public boolean isEnableAuth() {
return get(ENABLE_AUTH_ENTRY);
}

public String getAuthType() {
return get(AUTH_TYPE_ENTRY);
}

public static final Map<String, PropertyEntry<?>> AUTHENTICATION_PROPERTY_ENTRIES =
new ImmutableMap.Builder<String, PropertyEntry<?>>()
.put(
ENABLE_AUTH_KEY,
PropertyEntry.booleanPropertyEntry(
ENABLE_AUTH_KEY,
"Whether to enable authentication for Hadoop catalog",
false,
true,
false,
false,
false))
.put(
AUTH_TYPE_KEY,
PropertyEntry.stringImmutablePropertyEntry(
AUTH_TYPE_KEY,
"The type of authentication for Hadoop catalog, currently we only support kerberos",
false,
null,
false,
false))
.build();
}
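
A minimal sketch of how these two entries are consumed, using only the keys and accessors defined above; the property values are illustrative:

import com.datastrato.gravitino.catalog.hadoop.kerberos.AuthenticationConfig;
import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class AuthenticationConfigSketch {
  public static void main(String[] args) {
    Map<String, String> props =
        ImmutableMap.of(
            AuthenticationConfig.ENABLE_AUTH_KEY, "true",
            AuthenticationConfig.AUTH_TYPE_KEY, "kerberos");
    AuthenticationConfig auth = new AuthenticationConfig(props);
    System.out.println(auth.isEnableAuth()); // true
    System.out.println(auth.getAuthType()); // kerberos
  }
}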