diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 0608aba462b..f56b5297e17 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -23,7 +23,6 @@ import com.google.errorprone.annotations.FormatString; import java.io.IOException; import java.util.Collections; -import java.util.Map; import java.util.function.Consumer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; @@ -159,10 +158,6 @@ protected TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } - protected Map schemaOptions(String schemaName) { - return null; - } - protected void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { Preconditions.checkNotNull(catalog); @@ -170,7 +165,7 @@ protected void doWithSchema( try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog.asSchemas().createSchema(schemaName, null, schemaOptions(schemaName)); + catalog.asSchemas().createSchema(schemaName, null, null); } tableEnv.useDatabase(schemaName); action.accept(catalog); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 0534922ef70..bb7b25f6b20 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; import java.util.stream.Collectors; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.DataTypes; @@ -587,8 +588,27 @@ protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } - @Override - protected Map schemaOptions(String schemaName) { - return ImmutableMap.of("location", warehouse + "/" + schemaName); + protected void doWithSchema( + org.apache.gravitino.Catalog catalog, + String schemaName, + Consumer action, + boolean dropSchema) { + Preconditions.checkNotNull(catalog); + Preconditions.checkNotNull(schemaName); + try { + tableEnv.useCatalog(catalog.name()); + if (!catalog.asSchemas().schemaExists(schemaName)) { + catalog + .asSchemas() + .createSchema( + schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + } + tableEnv.useDatabase(schemaName); + action.accept(catalog); + } finally { + if (dropSchema) { + catalog.asSchemas().dropSchema(schemaName, true); + } + } } }