From 0b1ace7238ef1ddc58cddfd5bce00bb445992335 Mon Sep 17 00:00:00 2001 From: yangyang zhong <35210666+hdygxsj@users.noreply.github.com> Date: Wed, 15 Jan 2025 14:10:31 +0800 Subject: [PATCH] paimon table ddl --- .../flink/connector/catalog/BaseCatalog.java | 4 ++-- .../paimon/GravitinoPaimonCatalog.java | 24 +++++++++++++++++++ .../integration/test/FlinkEnvIT.java | 13 +++++----- .../test/hive/FlinkHiveCatalogIT.java | 5 ++++ .../test/paimon/FlinkPaimonCatalogIT.java | 10 -------- 5 files changed, 38 insertions(+), 18 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 1496742177f..fd8e118ee49 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -656,11 +656,11 @@ static SchemaChange[] getSchemaChange(CatalogDatabase current, CatalogDatabase u return schemaChanges.toArray(new SchemaChange[0]); } - private Catalog catalog() { + protected Catalog catalog() { return GravitinoCatalogManager.get().getGravitinoCatalogInfo(getName()); } - private String catalogName() { + protected String catalogName() { return getName(); } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java index 017ac6e7085..c22e00fa122 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalog.java @@ -19,10 +19,17 @@ package org.apache.gravitino.flink.connector.paimon; +import java.util.Optional; import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.flink.connector.PartitionConverter; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.gravitino.flink.connector.catalog.BaseCatalog; +import org.apache.paimon.flink.FlinkTableFactory; /** * The GravitinoPaimonCatalog class is an implementation of the BaseCatalog class that is used to @@ -45,4 +52,21 @@ protected GravitinoPaimonCatalog( protected AbstractCatalog realCatalog() { return paimonCatalog; } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + boolean dropped = + catalog() + .asTableCatalog() + .purgeTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); + if (!dropped && !ignoreIfNotExists) { + throw new TableNotExistException(catalogName(), tablePath); + } + } + + @Override + public Optional getFactory() { + return Optional.of(new FlinkTableFactory()); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 5ae8847c6c1..0608aba462b 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -19,11 +19,11 @@ package org.apache.gravitino.flink.connector.integration.test; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; import com.google.errorprone.annotations.FormatMethod; import com.google.errorprone.annotations.FormatString; import java.io.IOException; import java.util.Collections; +import java.util.Map; import java.util.function.Consumer; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.api.EnvironmentSettings; @@ -159,17 +159,18 @@ protected TableResult sql(@FormatString String sql, Object... args) { return tableEnv.executeSql(String.format(sql, args)); } - protected static void doWithSchema( + protected Map schemaOptions(String schemaName) { + return null; + } + + protected void doWithSchema( Catalog catalog, String schemaName, Consumer action, boolean dropSchema) { Preconditions.checkNotNull(catalog); Preconditions.checkNotNull(schemaName); try { tableEnv.useCatalog(catalog.name()); if (!catalog.asSchemas().schemaExists(schemaName)) { - catalog - .asSchemas() - .createSchema( - schemaName, null, ImmutableMap.of("location", warehouse + "/" + schemaName)); + catalog.asSchemas().createSchema(schemaName, null, schemaOptions(schemaName)); } tableEnv.useDatabase(schemaName); action.accept(catalog); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 333aa83f0b6..0534922ef70 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -586,4 +586,9 @@ public void testGetHiveTable() { protected org.apache.gravitino.Catalog currentCatalog() { return hiveCatalog; } + + @Override + protected Map schemaOptions(String schemaName) { + return ImmutableMap.of("location", warehouse + "/" + schemaName); + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 10fab3567a3..57a17c2a114 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -42,16 +42,6 @@ public class FlinkPaimonCatalogIT extends FlinkCommonIT { private static org.apache.gravitino.Catalog catalog; - @Override - protected boolean supportColumnOperation() { - return false; - } - - @Override - protected boolean supportTableOperation() { - return false; - } - @Override protected boolean supportSchemaOperationWithCommentAndOptions() { return false;