Skip to content

Commit

Permalink
[feature](statistics)Support get row count for pg and sql server. (ap…
Browse files Browse the repository at this point in the history
…ache#42674)

Support fetching the row count for PostgreSQL and SQL Server tables. The row
count is read from each database's own statistics information.
  • Loading branch information
Jibing-Li authored Nov 3, 2024
1 parent 354a5ed commit 1349b86
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,7 @@ insert into doris_test.test1 values
(false, 'abc', 'efg', '2022-10-01', 4.5, 1, 2, 1024, 100000, 1.2, '2022-10-02 12:59:01', 24.000);

insert into doris_test.ex_tb0 values (111, 'abc'), (112, 'abd'), (113, 'abe'),(114, 'abf'),(115, 'abg');
analyze table doris_test.ex_tb0;

insert into doris_test.ex_tb1 values ('{"k1":"v1", "k2":"v2"}');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,7 @@ insert into doris_test.test1 values
(cast(0 as bit), 'abc', 'def', '2022-10-11', 1.234, 1, 2, 1022, '2022-10-22 10:59:59', 34.123),
(cast(0 as bit), 'abc', 'def', '2022-10-11', 1.234, 1, 2, 1023, '2022-10-22 10:59:59', 34.123),
(cast(0 as bit), 'abc', 'def', '2022-10-11', 1.234, 1, 2, 1024, '2022-10-22 10:59:59', 34.123);
analyze doris_test.test1;

insert into doris_test.test2 values
(123, 'zhangsan', '2022-01-01 01:02:03', 'zhangsan1', '2022-01-01 01:02:04', 111, 122, false, 'code', 'zhangsan2', 222, 'tag', 'remark'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use doris_test;

Insert into dbo.student values (1, 'doris', 18), (2, 'alice', 19), (3, 'bob', 20);
UPDATE STATISTICS dbo.student;

Insert into dbo.test_int values
(1, 0, 1, 1), (2, 1, -1, -1),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,24 @@
public class JdbcExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class);

public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from `${dbName}` like '${tblName}'\");";
public static final String MYSQL_ROW_COUNT_SQL = "SELECT max(row_count) as rows FROM ("
+ "(SELECT TABLE_ROWS AS row_count FROM INFORMATION_SCHEMA.TABLES "
+ "WHERE TABLE_SCHEMA = '${dbName}' AND TABLE_NAME = '${tblName}' "
+ "AND TABLE_TYPE = 'BASE TABLE') "
+ "UNION ALL "
+ "(SELECT CARDINALITY AS row_count FROM INFORMATION_SCHEMA.STATISTICS "
+ "WHERE TABLE_SCHEMA = '${dbName}' AND TABLE_NAME = '${tblName}' "
+ "AND CARDINALITY IS NOT NULL)) t";

// Row count query for PostgreSQL: reads pg_class.reltuples for the relation named ${tblName}
// in the namespace (schema) named ${dbName}, exposed under the column label "rows".
// NOTE(review): per the PostgreSQL catalog docs reltuples is a planner estimate kept up to date
// by ANALYZE/VACUUM, not an exact count - confirm that an estimate is acceptable for callers.
public static final String PG_ROW_COUNT_SQL = "SELECT reltuples as rows FROM pg_class "
+ "WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = '${dbName}') "
+ "AND relname = '${tblName}'";

// Row count query for SQL Server: sums sys.partitions.rows over all partitions of the object
// '${dbName}.${tblName}', restricted to index_id 0 or 1 and exposed under the column label "rows".
// NOTE(review): index_id 0/1 presumably selects the heap or clustered index so that rows are not
// counted once per secondary index - verify against the sys.partitions documentation.
public static final String SQLSERVER_ROW_COUNT_SQL = "SELECT sum(rows) as rows FROM sys.partitions "
+ "WHERE object_id = (SELECT object_id('${dbName}.${tblName}')) AND index_id IN (0, 1)";

// Wrapper that runs ${sql} through QUERY(...) against the catalog ${ctlName}.
// ${sql} is itself one of the *_ROW_COUNT_SQL templates, so its ${dbName}/${tblName}
// placeholders have to be resolved by the same substitution pass.
public static final String FETCH_ROW_COUNT_TEMPLATE = "SELECT * FROM QUERY"
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"${sql}\");";

private JdbcTable jdbcTable;

Expand Down Expand Up @@ -119,41 +135,55 @@ public long fetchRowCount() {
params.put("tblName", name);
switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
case JdbcResource.MYSQL:
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
if (resultRows == null || resultRows.size() != 1) {
LOG.info("No mysql status found for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
StatementBase parsedStmt = stmtExecutor.getParsedStmt();
if (parsedStmt == null || parsedStmt.getColLabels() == null) {
LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
ResultRow resultRow = resultRows.get(0);
List<String> colLabels = parsedStmt.getColLabels();
int index = colLabels.indexOf("TABLE_ROWS");
if (index == -1) {
LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
long rows = Long.parseLong(resultRow.get(index));
LOG.info("Get mysql table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
return rows;
} catch (Exception e) {
LOG.warn("Failed to fetch mysql row count for table {}.{}.{}. Reason [{}]",
catalog.getName(), dbName, name, e.getMessage());
return -1;
}
case JdbcResource.ORACLE:
params.put("sql", MYSQL_ROW_COUNT_SQL);
return getRowCount(params);
case JdbcResource.POSTGRESQL:
params.put("sql", PG_ROW_COUNT_SQL);
return getRowCount(params);
case JdbcResource.SQLSERVER:
params.put("sql", SQLSERVER_ROW_COUNT_SQL);
return getRowCount(params);
case JdbcResource.ORACLE:
default:
break;
}
return -1;
return UNKNOWN_ROW_COUNT;
}

/**
 * Fetches the row count of this table from the external database.
 *
 * <p>The variables in {@code params} are substituted into {@link #FETCH_ROW_COUNT_TEMPLATE},
 * the resulting statement is executed as an internal query, and the value of the column
 * labelled {@code rows} is parsed as a {@code long}.
 *
 * <p>This method never throws: every failure is logged and reported as
 * {@code UNKNOWN_ROW_COUNT}.
 *
 * @param params substitution variables for the template; must contain {@code sql} (one of the
 *               {@code *_ROW_COUNT_SQL} templates) plus the variables those templates use.
 *               NOTE(review): {@code ctlName} and {@code dbName} are presumably filled in by
 *               the caller before this point - confirm against {@code fetchRowCount}.
 * @return the row count when the remote database reports a positive value; otherwise
 *         {@code UNKNOWN_ROW_COUNT} (no single result row, missing {@code rows} column,
 *         null value, non-positive value, or any exception)
 */
protected long getRowCount(Map<String, String> params) {
    try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext(false)) {
        StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
        String sql = stringSubstitutor.replace(FETCH_ROW_COUNT_TEMPLATE);
        StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
        List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
        // Every row count query is expected to produce exactly one row.
        if (resultRows == null || resultRows.size() != 1) {
            LOG.info("No status found for table {}.{}.{}", catalog.getName(), dbName, name);
            return UNKNOWN_ROW_COUNT;
        }
        StatementBase parsedStmt = stmtExecutor.getParsedStmt();
        if (parsedStmt == null || parsedStmt.getColLabels() == null) {
            LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
            return UNKNOWN_ROW_COUNT;
        }
        ResultRow resultRow = resultRows.get(0);
        List<String> colLabels = parsedStmt.getColLabels();
        // All *_ROW_COUNT_SQL templates alias their result column as "rows".
        int index = colLabels.indexOf("rows");
        if (index == -1) {
            LOG.info("No rows column in row count result for table {}.{}.{}",
                    catalog.getName(), dbName, name);
            return UNKNOWN_ROW_COUNT;
        }
        String value = resultRow.get(index);
        // Aggregates such as max()/sum() yield NULL when no source row matches; report that
        // explicitly instead of letting Long.parseLong fail with an opaque message.
        if (value == null) {
            LOG.info("Row count value is null for table {}.{}.{}", catalog.getName(), dbName, name);
            return UNKNOWN_ROW_COUNT;
        }
        long rows = Long.parseLong(value);
        // Zero or negative values are treated as "statistics not available" rather than as a
        // genuinely empty table, so they are reported as unknown.
        if (rows <= 0) {
            LOG.info("Table {}.{}.{} row count is {}, discard it and use {} instead",
                    catalog.getName(), dbName, name, rows, UNKNOWN_ROW_COUNT);
            return UNKNOWN_ROW_COUNT;
        }
        LOG.info("Get table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
        return rows;
    } catch (Exception e) {
        // Best effort: row count is only a statistic, so any failure degrades to "unknown".
        LOG.warn("Failed to fetch row count for table {}.{}.{}. Reason [{}]",
                catalog.getName(), dbName, name, e.getMessage());
        return UNKNOWN_ROW_COUNT;
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Verifies that `show table stats` reports the row count taken from the external database's
// own statistics for MySQL, PostgreSQL and SQL Server JDBC catalogs.
suite("test_jdbc_row_count", "p0,external,mysql,external_docker,external_docker_mysql") {
    String enabled = context.config.otherConfigs.get("enableJdbcTest")
    logger.info("enabled " + enabled)
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
    String mysql_port = context.config.otherConfigs.get("mysql_57_port");
    String s3_endpoint = getS3Endpoint()
    String bucket = getS3BucketName()
    String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar"

    // Issues `show table stats` once, then polls it once per second (at most 60 times) until
    // the third column of the first row is no longer "-1". Returns that column's last value,
    // which the assertions below compare against the expected row count.
    def waitForRowCount = { String tableName ->
        def result = sql """show table stats ${tableName}"""
        Thread.sleep(1000)
        for (int i = 0; i < 60; i++) {
            result = sql """show table stats ${tableName}""";
            if (result[0][2] != "-1") {
                break;
            }
            logger.info("Table row count not ready yet. Wait 1 second.")
            Thread.sleep(1000)
        }
        return result[0][2]
    }

    if (enabled != null && enabled.equalsIgnoreCase("true")) {
        // Test mysql
        String catalog_name = "test_mysql_jdbc_row_count";
        sql """drop catalog if exists ${catalog_name}"""
        sql """create catalog if not exists ${catalog_name} properties(
            "type"="jdbc",
            "user"="root",
            "password"="123456",
            "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/doris_test?useSSL=false&zeroDateTimeBehavior=convertToNull",
            "driver_url" = "${driver_url}",
            "driver_class" = "com.mysql.cj.jdbc.Driver"
        );"""
        sql """use ${catalog_name}.doris_test"""
        assertEquals("5", waitForRowCount("ex_tb0"))
        sql """drop catalog ${catalog_name}"""

        // Test pg
        catalog_name = "test_pg_jdbc_row_count";
        driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar"
        String pg_port = context.config.otherConfigs.get("pg_14_port");
        sql """drop catalog if exists ${catalog_name} """
        sql """create catalog if not exists ${catalog_name} properties(
            "type"="jdbc",
            "user"="postgres",
            "password"="123456",
            "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/postgres?currentSchema=doris_test&useSSL=false",
            "driver_url" = "${driver_url}",
            "driver_class" = "org.postgresql.Driver"
        );"""
        sql """use ${catalog_name}.doris_test"""
        assertEquals("1026", waitForRowCount("test1"))
        sql """drop catalog ${catalog_name}"""

        // Test sqlserver
        catalog_name = "test_sqlserver_jdbc_row_count";
        driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mssql-jdbc-11.2.3.jre8.jar"
        String sqlserver_port = context.config.otherConfigs.get("sqlserver_2022_port");
        sql """drop catalog if exists ${catalog_name} """
        sql """ create catalog if not exists ${catalog_name} properties(
            "type"="jdbc",
            "user"="sa",
            "password"="Doris123456",
            "jdbc_url" = "jdbc:sqlserver://${externalEnvIp}:${sqlserver_port};encrypt=false;databaseName=doris_test;",
            "driver_url" = "${driver_url}",
            "driver_class" = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
        );"""
        sql """use ${catalog_name}.dbo"""
        assertEquals("3", waitForRowCount("student"))
        sql """drop catalog ${catalog_name}"""
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,6 @@ suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_

sql """use ${catalog_name}.doris_test"""

def result = sql """show table stats ex_tb0"""
Thread.sleep(1000)
for (int i = 0; i < 20; i++) {
result = sql """show table stats ex_tb0""";
if (result[0][2] != "-1") {
assertEquals("5", result[0][2])
break;
}
logger.info("Table row count not ready yet. Wait 1 second.")
Thread.sleep(1000)
}
sql """analyze table ex_tb0 with sync"""
result = sql """show column stats ex_tb0 (name)"""
assertEquals(result.size(), 1)
Expand Down

0 comments on commit 1349b86

Please sign in to comment.