[Enhancement] support pk/fk constraint in iceberg table (#54364)
Signed-off-by: yanz <[email protected]>
dirtysalt authored Dec 27, 2024
1 parent 13d0296 commit 90a3da1
Showing 7 changed files with 79 additions and 29 deletions.
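
StarRocks reads this constraint metadata from the native Iceberg table's properties (getNativeTable().properties() in the diff below). A minimal sketch of attaching the properties from the Iceberg side, assuming the keys consumed by PropertyAnalyzer are unique_constraints and foreign_key_constraints, with illustrative table names:

    -- e.g. via Spark SQL against the same Iceberg catalog
    ALTER TABLE sales_db.orders SET TBLPROPERTIES (
        'unique_constraints' = 'order_id'
    );
    ALTER TABLE sales_db.order_items SET TBLPROPERTIES (
        'foreign_key_constraints' = '(order_id) REFERENCES sales_db.orders(order_id)'
    );

As with native tables, these constraints are informational hints for the optimizer rather than enforced invariants.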

@@ -283,6 +283,11 @@ public String getTableLocation() {
         return getNativeTable().location();
     }
 
+    @Override
+    public Map<String, String> getProperties() {
+        return getNativeTable().properties();
+    }
+
     public PartitionField getPartitionFiled(String colName) {
         org.apache.iceberg.Table nativeTable = getNativeTable();
         return nativeTable.spec().fields().stream()

@@ -874,7 +874,7 @@ public static int analyzeCompressionLevel(Map<String, String> properties) throws
 
     // analyzeCompressionType will parse the compression type from properties
     public static Pair<TCompressionType, Integer> analyzeCompressionType(
-            Map<String, String> properties) throws AnalysisException {
+                Map<String, String> properties) throws AnalysisException {
         TCompressionType compressionType = TCompressionType.LZ4_FRAME;
         if (ConnectContext.get() != null) {
             String defaultCompression = ConnectContext.get().getSessionVariable().getDefaultTableCompression();

@@ -1131,7 +1131,7 @@ public static int analyzePrimaryIndexCacheExpireSecProp(Map<String, String> prop
         return val;
     }
 
-    public static List<UniqueConstraint> analyzeUniqueConstraint(Map<String, String> properties, Database db, OlapTable table) {
+    public static List<UniqueConstraint> analyzeUniqueConstraint(Map<String, String> properties, Database db, Table table) {
         List<UniqueConstraint> uniqueConstraints = Lists.newArrayList();
         List<UniqueConstraint> analyzedUniqueConstraints = Lists.newArrayList();

@@ -1170,11 +1170,11 @@ public static List<UniqueConstraint> analyzeUniqueConstraint(Map<String, String>
         return analyzedUniqueConstraints;
     }
 
-    private static Pair<BaseTableInfo, Table> analyzeForeignKeyConstraintTablePath(String tablePath,
+    private static Pair<BaseTableInfo, Table> analyzeForeignKeyConstraintTablePath(String catalogName,
+                                                                                   String tablePath,
                                                                                    String foreignKeyConstraintDesc,
                                                                                    Database db) {
         String[] parts = tablePath.split("\\.");
-        String catalogName = InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME;
         String dbName = db.getFullName();
         String tableName = "";
         if (parts.length == 3) {

@@ -1305,21 +1305,17 @@ public static List<ForeignKeyConstraint> analyzeForeignKeyConstraint(
                         " columns' size does not match", foreignKeyConstraintDesc));
             }
             // analyze table exist for foreign key constraint
-            Pair<BaseTableInfo, Table> parentTablePair = analyzeForeignKeyConstraintTablePath(targetTablePath,
-                    foreignKeyConstraintDesc, db);
+            Pair<BaseTableInfo, Table> parentTablePair = analyzeForeignKeyConstraintTablePath(analyzedTable.getCatalogName(),
+                    targetTablePath, foreignKeyConstraintDesc, db);
             BaseTableInfo parentTableInfo = parentTablePair.first;
             Table parentTable = parentTablePair.second;
             List<ColumnId> parentColumnIds = MetaUtils.getColumnIdsByColumnNames(parentTable, parentColumnNames);
             Pair<BaseTableInfo, Table> childTablePair = Pair.create(null, analyzedTable);
             Table childTable = analyzedTable;
             if (analyzedTable.isMaterializedView()) {
-                childTablePair = analyzeForeignKeyConstraintTablePath(sourceTablePath, foreignKeyConstraintDesc,
-                        db);
+                childTablePair = analyzeForeignKeyConstraintTablePath(analyzedTable.getCatalogName(),
+                        sourceTablePath, foreignKeyConstraintDesc, db);
                 childTable = childTablePair.second;
-            } else {
-                if (!analyzedTable.isNativeTable()) {
-                    throw new SemanticException("do not support add foreign key on external table");
-                }
             }
             List<ColumnId> childColumnIds = MetaUtils.getColumnIdsByColumnNames(childTable, childColumnNames);
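
With the catalog name now passed in, the parent-table path of a foreign key descriptor resolves against the referencing table's catalog instead of always defaulting to the internal catalog. A sketch of the path forms the split on "." above accepts (names illustrative):

    -- 3 parts: catalog.db.table
    'foreign_key_constraints' = '(order_id) REFERENCES iceberg_cat.sales_db.orders(order_id)'
    -- 2 parts: db.table, catalog taken from the analyzed table
    'foreign_key_constraints' = '(order_id) REFERENCES sales_db.orders(order_id)'
    -- 1 part: table, catalog and database both inherited
    'foreign_key_constraints' = '(order_id) REFERENCES orders(order_id)'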

@@ -24,6 +24,8 @@
 import com.starrocks.catalog.IcebergTable;
 import com.starrocks.catalog.PartitionKey;
 import com.starrocks.catalog.Table;
+import com.starrocks.catalog.constraint.ForeignKeyConstraint;
+import com.starrocks.catalog.constraint.UniqueConstraint;
 import com.starrocks.common.AlreadyExistsException;
 import com.starrocks.common.DdlException;
 import com.starrocks.common.ErrorCode;

@@ -33,6 +35,7 @@
 import com.starrocks.common.StarRocksException;
 import com.starrocks.common.profile.Timer;
 import com.starrocks.common.profile.Tracers;
+import com.starrocks.common.util.PropertyAnalyzer;
 import com.starrocks.common.util.TimeUtils;
 import com.starrocks.connector.ConnectorMetadatRequestContext;
 import com.starrocks.connector.ConnectorMetadata;

@@ -351,6 +354,15 @@ public void dropTable(DropTableStmt stmt) {
         asyncRefreshOthersFeMetadataCache(stmt.getDbName(), stmt.getTableName());
     }
 
+    public void updateTableProperty(Database db, IcebergTable icebergTable) {
+        Map<String, String> properties = new HashMap(icebergTable.getNativeTable().properties());
+        List<UniqueConstraint> uniqueConstraints = PropertyAnalyzer.analyzeUniqueConstraint(properties, db, icebergTable);
+        icebergTable.setUniqueConstraints(uniqueConstraints);
+        List<ForeignKeyConstraint> foreignKeyConstraints =
+                PropertyAnalyzer.analyzeForeignKeyConstraint(properties, db, icebergTable);
+        icebergTable.setForeignKeyConstraints(foreignKeyConstraints);
+    }
+
     @Override
     public Table getTable(String dbName, String tblName) {
         TableIdentifier identifier = TableIdentifier.of(dbName, tblName);

@@ -369,9 +381,12 @@ public Table getTable(String dbName, String tblName) {
                 dbName = dbName.toLowerCase();
                 tblName = tblName.toLowerCase();
             }
-            Table table = IcebergApiConverter.toIcebergTable(icebergTable, catalogName, dbName, tblName, catalogType.name());
+            Database db = getDb(dbName);
+            IcebergTable table =
+                    IcebergApiConverter.toIcebergTable(icebergTable, catalogName, dbName, tblName, catalogType.name());
             table.setComment(icebergTable.properties().getOrDefault(COMMENT, ""));
             tables.put(identifier, icebergTable);
+            updateTableProperty(db, table);
             return table;
         } catch (StarRocksConnectorException e) {
             LOG.error("Failed to get iceberg table {}", identifier, e);
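
Once getTable attaches the constraints, they can be inspected from the StarRocks side; a rough sketch, with catalog, database, and table names as assumptions:

    SET CATALOG iceberg_cat;
    SHOW CREATE TABLE sales_db.orders;
    -- the unique_constraints / foreign_key_constraints entries now show up
    -- in the PROPERTIES clause rendered by getExternalCatalogTableDdlStmt below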

@@ -146,6 +146,7 @@
 import com.starrocks.sql.parser.NodePosition;
 import com.starrocks.storagevolume.StorageVolume;
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.ArrayList;
import java.util.ArrayList;
Expand Down Expand Up @@ -1907,26 +1908,39 @@ public static String getExternalCatalogTableDdlStmt(Table table) {
createTableSql.append(String.join(", ", partitionNames)).append(")");
}

// Location
String location = null;
if (table.isHiveTable() || table.isHudiTable()) {
location = (table).getTableLocation();
} else if (table.isIcebergTable()) {
location = table.getTableLocation();
} else if (table.isDeltalakeTable()) {
location = table.getTableLocation();
} else if (table.isPaimonTable()) {
location = table.getTableLocation();
}

// Comment
if (!Strings.isNullOrEmpty(table.getComment())) {
createTableSql.append("\nCOMMENT (\"").append(table.getComment()).append("\")");
}

if (!Strings.isNullOrEmpty(location)) {
createTableSql.append("\nPROPERTIES (\"location\" = \"").append(location).append("\");");
// Properties
Map<String, String> properties = new HashMap<>();
try {
properties = new HashMap<>(table.getProperties());
} catch (NotImplementedException e) {
}

// Location
String location = null;
try {
location = table.getTableLocation();
if (!Strings.isNullOrEmpty(location)) {
properties.put("location", location);
}
} catch (NotImplementedException e) {
}

if (!properties.isEmpty()) {
createTableSql.append("\nPROPERTIES (");
for (Map.Entry<String, String> kv : properties.entrySet()) {
createTableSql.append("\"" + kv.getKey() + "\" = \"").append(kv.getValue()).append("\",");
}
if (createTableSql.charAt(createTableSql.length() - 1) == ',') {
createTableSql.deleteCharAt(createTableSql.length() - 1);
}
createTableSql.append(")");
}
createTableSql.append(";");

return createTableSql.toString();
}
Expand Down
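
getExternalCatalogTableDdlStmt now emits every table property rather than only the location, so the generated DDL for an external table changes shape roughly like this (values illustrative; compare the updated test expectations below):

    CREATE TABLE `orders` (
      `order_id` int(11) DEFAULT NULL
    )
    PROPERTIES ("owner" = "root","location" = "hdfs://127.0.0.1:10000/orders");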

@@ -290,7 +290,12 @@ public void testShowCreateHiveTbl() {
                         " `col1` int(11) DEFAULT NULL\n" +
                         ")\n" +
                         "PARTITION BY (col1)\n" +
-                        "PROPERTIES (\"location\" = \"hdfs://127.0.0.1:10000/hive\");",
+                        "PROPERTIES (\"hive.table.serde.lib\" = \"org.apache.hadoop.hive.ql.io.orc.OrcSerde\",\"totalSize\" = " +
+                        "\"100\"," +
+                        "\"hive.table.column.names\" = \"col2\",\"numRows\" = \"50\",\"hive.table.column.types\" = \"INT\"," +
+                        "\"hive.table" +
+                        ".input.format\" = \"org.apache.hadoop.hive.ql.io.orc.OrcInputFormat\",\"location\" = \"hdfs://127.0.0" +
+                        ".1:10000/hive\");",
                 AstToStringBuilder.getExternalCatalogTableDdlStmt(hiveTable));
     }

@@ -1055,6 +1055,11 @@ public void testListPartitionNames() {
             org.apache.iceberg.Table getTable(String dbName, String tableName) throws StarRocksConnectorException {
                 return mockedNativeTableB;
             }
+
+            @Mock
+            Database getDB(String dbName) {
+                return new Database(0, dbName);
+            }
         };
         IcebergHiveCatalog icebergHiveCatalog = new IcebergHiveCatalog(CATALOG_NAME, new Configuration(), DEFAULT_CONFIG);
         CachingIcebergCatalog cachingIcebergCatalog = new CachingIcebergCatalog(CATALOG_NAME, icebergHiveCatalog,

@@ -1297,6 +1302,11 @@ public void testGetMetaSpec(@Mocked LocalMetastore localMetastore, @Mocked Tempo
             org.apache.iceberg.Table getTable(String dbName, String tableName) throws StarRocksConnectorException {
                 return mockedNativeTableG;
             }
+
+            @Mock
+            Database getDB(String dbName) {
+                return new Database(0, dbName);
+            }
         };
 
         IcebergHiveCatalog icebergHiveCatalog = new IcebergHiveCatalog(CATALOG_NAME, new Configuration(), DEFAULT_CONFIG);

@@ -1336,6 +1346,11 @@ public void testGetMetaSpecWithDeleteFile(@Mocked LocalMetastore localMetastore,
             org.apache.iceberg.Table getTable(String dbName, String tableName) throws StarRocksConnectorException {
                 return mockedNativeTableA;
             }
+
+            @Mock
+            Database getDB(String dbName) {
+                return new Database(0, dbName);
+            }
         };
 
         Map<String, String> copiedMap = new HashMap<>(DEFAULT_CONFIG);

test/sql/test_iceberg/R/test_iceberg_show_stmt (2 changes: 1 addition & 1 deletion)
@@ -14,7 +14,7 @@ partition_transform_table CREATE TABLE `partition_transform_table` (
 `p2` varchar(1073741824) DEFAULT NULL
 )
 PARTITION BY (year(t1), month(t2), day(t3), hour(t4), truncate(p1, 5), bucket(p2, 3))
-PROPERTIES ("location" = "oss://starrocks-ci-test/iceberg_ci_db/partition_transform_table");
+PROPERTIES ("owner" = "root","location" = "oss://starrocks-ci-test/iceberg_ci_db/partition_transform_table");
 -- !result
 drop catalog iceberg_sql_test_${uuid0};
 -- result:
