Skip to content

Commit

Permalink
[Improve][Jdbc] Use catalog table lookup instead of slow contains c…
Browse files Browse the repository at this point in the history
…heck
  • Loading branch information
hailin0 committed Jul 11, 2024
1 parent 49d397c commit 13b9ca6
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,16 @@ public String getDefaultDatabase() {
return defaultDatabase;
}

protected Connection getConnection(TablePath tablePath) {
String dbUrl;
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
} else {
dbUrl = getUrlFromDatabaseName(defaultDatabase);
}
return getConnection(dbUrl);
}

protected Connection getConnection(String url) {
if (connectionMap.containsKey(url)) {
return connectionMap.get(url);
Expand All @@ -126,7 +136,6 @@ protected Connection getConnection(String url) {

@Override
public void open() throws CatalogException {
getConnection(defaultUrl);
LOG.info("Catalog {} established connection to {}", catalogName, defaultUrl);
}

Expand Down Expand Up @@ -162,17 +171,7 @@ protected TableIdentifier getTableIdentifier(TablePath tablePath) {

public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}

String dbUrl;
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
} else {
dbUrl = getUrlFromDatabaseName(defaultDatabase);
}
Connection conn = getConnection(dbUrl);
Connection conn = getConnection(tablePath);
try {
DatabaseMetaData metaData = conn.getMetaData();
Optional<PrimaryKey> primaryKey = getPrimaryKey(metaData, tablePath);
Expand Down Expand Up @@ -286,6 +285,10 @@ protected String getListTableSql(String databaseName) {
throw new UnsupportedOperationException();
}

protected String getTableSQL(TablePath tablePath) {
throw new UnsupportedOperationException();
}

protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
String tableName = rs.getString(2);
Expand Down Expand Up @@ -317,11 +320,13 @@ public List<String> listTables(String databaseName)

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName()).contains(getTableName(tablePath));
} catch (DatabaseNotExistException e) {
return false;
try (Connection connection = getConnection(tablePath);
PreparedStatement ps = connection.prepareStatement(getTableSQL(tablePath));
ResultSet rs = ps.executeQuery()) {
return rs.next();
} catch (SQLException e) {
throw new CatalogException(
String.format("Failed to check table exists", tablePath.getFullName()), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand Down Expand Up @@ -95,6 +93,13 @@ protected String getListTableSql(String databaseName) {
return "SELECT OWNER, TABLE_NAME FROM ALL_TABLES";
}

@Override
protected String getTableSQL(TablePath tablePath) {
return String.format(
"SELECT OWNER, TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s' AND TABLE_NAME = '%s'",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
Expand Down Expand Up @@ -145,25 +150,6 @@ protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}
return listTables().contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
}

@Override
public List<String> listTables(String databaseName)
throws CatalogException, DatabaseNotExistException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeMapper;

import org.apache.commons.lang3.StringUtils;

import com.google.common.annotations.VisibleForTesting;
import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -90,6 +88,13 @@ protected String getListTableSql(String tableSchemaName) {
return String.format(LIST_TABLES_SQL_TEMPLATE, tableSchemaName);
}

@Override
protected String getTableSQL(TablePath tablePath) {
return String.format(
"SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
String schemaName = rs.getString(1);
Expand Down Expand Up @@ -142,16 +147,6 @@ public boolean databaseExists(String databaseName) throws CatalogException {
throw new SeaTunnelException("Not supported for list databases for iris");
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
return listTables(tablePath.getSchemaName())
.contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

@Override
public List<String> listTables(String schemaName)
throws CatalogException, DatabaseNotExistException {
Expand All @@ -172,18 +167,8 @@ public CatalogTable getTable(String sqlQuery) throws SQLException {
@Override
public CatalogTable getTable(TablePath tablePath)
throws CatalogException, TableNotExistException {
if (!tableExists(tablePath)) {
throw new TableNotExistException(catalogName, tablePath);
}

String dbUrl;
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
} else {
dbUrl = getUrlFromDatabaseName(defaultDatabase);
}
try {
Connection conn = getConnection(dbUrl);
Connection conn = getConnection(tablePath);
DatabaseMetaData metaData = conn.getMetaData();
try (ResultSet resultSet =
metaData.getColumns(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,13 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
public MySqlCatalog(
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
super(catalogName, username, pwd, urlInfo, null);
}

@Override
public void open() throws CatalogException {
this.version = resolveVersion();
this.typeConverter = new MySqlTypeConverter(version);
log.info("Open mysql catalog success, version: {}", version);
}

@Override
Expand All @@ -78,6 +83,13 @@ protected String getListTableSql(String databaseName) {
return "SHOW TABLES;";
}

@Override
protected String getTableSQL(TablePath tablePath) {
return String.format(
"SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'",
tablePath.getDatabaseName(), tablePath.getTableName());
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
return rs.getString(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleTypeMapper;

import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

import java.sql.Connection;
Expand Down Expand Up @@ -138,6 +134,13 @@ protected String getListTableSql(String databaseName) {
+ " AND (TABLE_NAME NOT LIKE 'SYS_IOT_OVER_%' AND IOT_NAME IS NULL)";
}

@Override
protected String getTableSQL(TablePath tablePath) {
return String.format(
"SELECT OWNER, TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s' AND TABLE_NAME = '%s'",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected String getTableName(ResultSet rs) throws SQLException {
if (EXCLUDED_SCHEMAS.contains(rs.getString(1))) {
Expand Down Expand Up @@ -191,25 +194,6 @@ protected String getOptionTableName(TablePath tablePath) {
return tablePath.getSchemaAndTableName();
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}
return listTables().contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

private List<String> listTables() {
List<String> databases = listDatabases();
return listTables(databases.get(0));
}

@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog;
Expand All @@ -30,7 +29,6 @@
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresTypeMapper;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -114,6 +112,13 @@ protected String getListTableSql(String databaseName) {
return "SELECT table_schema, table_name FROM information_schema.tables;";
}

@Override
protected String getTableSQL(TablePath tablePath) {
return String.format(
"SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s'",
tablePath.getSchemaName(), tablePath.getTableName());
}

@Override
protected String getSelectColumnsSql(TablePath tablePath) {
return String.format(
Expand Down Expand Up @@ -231,21 +236,6 @@ protected void dropDatabaseInternal(String databaseName) throws CatalogException
super.dropDatabaseInternal(databaseName);
}

@Override
public boolean tableExists(TablePath tablePath) throws CatalogException {
try {
if (StringUtils.isNotBlank(tablePath.getDatabaseName())) {
return databaseExists(tablePath.getDatabaseName())
&& listTables(tablePath.getDatabaseName())
.contains(tablePath.getSchemaAndTableName());
}

return listTables(defaultDatabase).contains(tablePath.getSchemaAndTableName());
} catch (DatabaseNotExistException e) {
return false;
}
}

@Override
public CatalogTable getTable(String sqlQuery) throws SQLException {
Connection defaultConnection = getConnection(defaultUrl);
Expand Down
Loading

0 comments on commit 13b9ca6

Please sign in to comment.