diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java index c98421e27725..e3983a6d9d77 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java @@ -362,7 +362,16 @@ public void testInformationSchema() throws SQLException { statement.executeQuery("show tables"), "TableName,TTL(ms),", new HashSet<>( - Arrays.asList("databases,INF,", "tables,INF,", "columns,INF,", "queries,INF,"))); + Arrays.asList( + "databases,INF,", + "tables,INF,", + "columns,INF,", + "queries,INF,", + "regions,INF,", + "topics,INF,", + "pipe_plugins,INF,", + "pipes,INF,", + "subscriptions,INF,"))); TestUtils.assertResultSetEqual( statement.executeQuery("desc databases"), @@ -405,7 +414,44 @@ public void testInformationSchema() throws SQLException { "start_time,TIMESTAMP,ATTRIBUTE,", "datanode_id,INT32,ATTRIBUTE,", "elapsed_time,FLOAT,ATTRIBUTE,", - "statement,STRING,ATTRIBUTE,"))); + "statement,STRING,ATTRIBUTE,", + "user,STRING,ATTRIBUTE,"))); + TestUtils.assertResultSetEqual( + statement.executeQuery("desc pipes"), + "ColumnName,DataType,Category,", + new HashSet<>( + Arrays.asList( + "id,STRING,TAG,", + "creation_time,TIMESTAMP,ATTRIBUTE,", + "state,STRING,ATTRIBUTE,", + "pipe_source,STRING,ATTRIBUTE,", + "pipe_processor,STRING,ATTRIBUTE,", + "pipe_sink,STRING,ATTRIBUTE,", + "exception_message,STRING,ATTRIBUTE,", + "remaining_event_count,INT64,ATTRIBUTE,", + "estimated_remaining_seconds,DOUBLE,ATTRIBUTE,"))); + TestUtils.assertResultSetEqual( + statement.executeQuery("desc pipe_plugins"), + "ColumnName,DataType,Category,", + new HashSet<>( + Arrays.asList( + "plugin_name,STRING,TAG,", + "plugin_type,STRING,ATTRIBUTE,", + "class_name,STRING,ATTRIBUTE,", + "plugin_jar,STRING,ATTRIBUTE,"))); + TestUtils.assertResultSetEqual( + statement.executeQuery("desc topics"), + "ColumnName,DataType,Category,", + new HashSet<>( + Arrays.asList("topic_name,STRING,TAG,", "topic_configs,STRING,ATTRIBUTE,"))); + TestUtils.assertResultSetEqual( + statement.executeQuery("desc subscriptions"), + "ColumnName,DataType,Category,", + new HashSet<>( + Arrays.asList( + "topic_name,STRING,TAG,", + "consumer_group_name,STRING,TAG,", + "subscribed_consumers,STRING,ATTRIBUTE,"))); // Test table query statement.execute("create database test"); @@ -427,11 +473,16 @@ public void testInformationSchema() throws SQLException { "information_schema,tables,INF,USING,", "information_schema,columns,INF,USING,", "information_schema,queries,INF,USING,", + "information_schema,regions,INF,USING,", + "information_schema,topics,INF,USING,", + "information_schema,pipe_plugins,INF,USING,", + "information_schema,pipes,INF,USING,", + "information_schema,subscriptions,INF,USING,", "test,test,INF,USING,"))); TestUtils.assertResultSetEqual( statement.executeQuery("count devices from tables where status = 'USING'"), "count(devices),", - Collections.singleton("5,")); + Collections.singleton("10,")); TestUtils.assertResultSetEqual( statement.executeQuery( "select * from columns where table_name = 'queries' or database = 'test'"), @@ -443,10 +494,36 @@ public void testInformationSchema() throws SQLException { "information_schema,queries,datanode_id,INT32,ATTRIBUTE,USING,", "information_schema,queries,elapsed_time,FLOAT,ATTRIBUTE,USING,", "information_schema,queries,statement,STRING,ATTRIBUTE,USING,", + "information_schema,queries,user,STRING,ATTRIBUTE,USING,", "test,test,time,TIMESTAMP,TIME,USING,", "test,test,a,STRING,TAG,USING,", "test,test,b,STRING,ATTRIBUTE,USING,", "test,test,c,INT32,FIELD,USING,"))); + + statement.execute( + "create pipe a2b with source('double-living'='true') with sink ('sink'='write-back-sink')"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select id, pipe_sink from pipes where creation_time > 0"), + "id,pipe_sink,", + Collections.singleton("a2b,{sink=write-back-sink},")); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from pipe_plugins"), + "plugin_name,plugin_type,class_name,plugin_jar,", + new HashSet<>( + Arrays.asList( + "IOTDB-THRIFT-SSL-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftSslConnector,null,", + "IOTDB-AIR-GAP-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.airgap.IoTDBAirGapConnector,null,", + "DO-NOTHING-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.donothing.DoNothingConnector,null,", + "DO-NOTHING-PROCESSOR,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.processor.donothing.DoNothingProcessor,null,", + "IOTDB-THRIFT-SINK,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.connector.iotdb.thrift.IoTDBThriftConnector,null,", + "IOTDB-SOURCE,Builtin,org.apache.iotdb.commons.pipe.agent.plugin.builtin.extractor.iotdb.IoTDBExtractor,null,"))); + + statement.execute("create topic tp with ('start-time'='2025-01-13T10:03:19.229+08:00')"); + TestUtils.assertResultSetEqual( + statement.executeQuery("select * from topics where topic_name = 'tp'"), + "topic_name,topic_configs,", + Collections.singleton( + "tp,{__system.sql-dialect=table, start-time=2025-01-13T10:03:19.229+08:00},")); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java index ccb75ebb22f9..afd89a407be0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java @@ -19,7 +19,11 @@ package org.apache.iotdb.db.queryengine.execution.operator.source.relational; +import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.exception.auth.AccessDeniedException; +import org.apache.iotdb.commons.pipe.agent.plugin.builtin.BuiltinPipePlugin; +import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta; import org.apache.iotdb.commons.schema.table.InformationSchema; import org.apache.iotdb.commons.schema.table.TableNodeStatus; import org.apache.iotdb.commons.schema.table.TsTable; @@ -28,15 +32,25 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseInfo; import org.apache.iotdb.confignode.rpc.thrift.TDescTable4InformationSchemaResp; import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq; -import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp; +import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowSubscriptionReq; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicInfo; +import org.apache.iotdb.confignode.rpc.thrift.TShowTopicReq; import org.apache.iotdb.confignode.rpc.thrift.TTableInfo; +import org.apache.iotdb.db.pipe.metric.PipeDataNodeRemainingEventAndTimeMetrics; import org.apache.iotdb.db.protocol.client.ConfigNodeClient; import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.protocol.session.IClientSession; import org.apache.iotdb.db.queryengine.plan.Coordinator; import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.schemaengine.table.InformationSchemaUtils; +import org.apache.iotdb.db.utils.TimestampPrecisionUtils; import org.apache.tsfile.block.column.ColumnBuilder; import org.apache.tsfile.common.conf.TSFileConfig; @@ -48,7 +62,6 @@ import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.Pair; -import java.security.AccessControlException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -57,12 +70,15 @@ import java.util.NoSuchElementException; import java.util.Objects; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.TTL_INFINITE; import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_SCOPE; import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_RESULT_NODES; import static org.apache.iotdb.commons.schema.table.TsTable.TTL_PROPERTY; +import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_BUILTIN; +import static org.apache.iotdb.db.queryengine.plan.execution.config.metadata.ShowPipePluginsTask.PIPE_PLUGIN_TYPE_EXTERNAL; public class InformationSchemaContentSupplierFactory { private InformationSchemaContentSupplierFactory() {} @@ -71,13 +87,23 @@ public static Iterator getSupplier( final String tableName, final List dataTypes, final String userName) { switch (tableName) { case InformationSchema.QUERIES: - return new QueriesSupplier(dataTypes); + return new QueriesSupplier(dataTypes, userName); case InformationSchema.DATABASES: return new DatabaseSupplier(dataTypes, userName); case InformationSchema.TABLES: return new TableSupplier(dataTypes, userName); case InformationSchema.COLUMNS: return new ColumnSupplier(dataTypes, userName); + case InformationSchema.REGIONS: + return new RegionSupplier(dataTypes, userName); + case InformationSchema.PIPES: + return new PipeSupplier(dataTypes, userName); + case InformationSchema.PIPE_PLUGINS: + return new PipePluginSupplier(dataTypes, userName); + case InformationSchema.TOPICS: + return new TopicSupplier(dataTypes, userName); + case InformationSchema.SUBSCRIPTIONS: + return new SubscriptionSupplier(dataTypes, userName); default: throw new UnsupportedOperationException("Unknown table: " + tableName); } @@ -88,29 +114,40 @@ private static class QueriesSupplier extends TsBlockSupplier { // We initialize it later for the convenience of data preparation protected int totalSize; protected int nextConsumedIndex; - private final List queryExecutions = - Coordinator.getInstance().getAllQueryExecutions(); + private List queryExecutions; - private QueriesSupplier(final List dataTypes) { + private QueriesSupplier(final List dataTypes, final String userName) { super(dataTypes); + queryExecutions = Coordinator.getInstance().getAllQueryExecutions(); + try { + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + } catch (final AccessDeniedException e) { + queryExecutions = + queryExecutions.stream() + .filter(iQueryExecution -> userName.equals(iQueryExecution.getUser())) + .collect(Collectors.toList()); + } this.totalSize = queryExecutions.size(); } @Override protected void constructLine() { - IQueryExecution queryExecution = queryExecutions.get(nextConsumedIndex); + final IQueryExecution queryExecution = queryExecutions.get(nextConsumedIndex); if (queryExecution.getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) { - String[] splits = queryExecution.getQueryId().split("_"); - int dataNodeId = Integer.parseInt(splits[splits.length - 1]); + final String[] splits = queryExecution.getQueryId().split("_"); + final int dataNodeId = Integer.parseInt(splits[splits.length - 1]); columnBuilders[0].writeBinary(BytesUtils.valueOf(queryExecution.getQueryId())); - columnBuilders[1].writeLong(queryExecution.getStartExecutionTime()); + columnBuilders[1].writeLong( + TimestampPrecisionUtils.convertToCurrPrecision( + queryExecution.getStartExecutionTime(), TimeUnit.MILLISECONDS)); columnBuilders[2].writeInt(dataNodeId); columnBuilders[3].writeFloat( (float) (currTime - queryExecution.getStartExecutionTime()) / 1000); columnBuilders[4].writeBinary( BytesUtils.valueOf(queryExecution.getExecuteSQL().orElse("UNKNOWN"))); + columnBuilders[5].writeBinary(BytesUtils.valueOf(queryExecution.getUser())); resultBuilder.declarePosition(); } nextConsumedIndex++; @@ -133,11 +170,15 @@ private DatabaseSupplier(final List dataTypes, final String userName this.userName = userName; try (final ConfigNodeClient client = ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { - final TShowDatabaseResp resp = - client.showDatabase( - new TGetDatabaseReq(Arrays.asList(ALL_RESULT_NODES), ALL_MATCH_SCOPE.serialize()) - .setIsTableModel(true)); - iterator = resp.getDatabaseInfoMap().entrySet().iterator(); + iterator = + client + .showDatabase( + new TGetDatabaseReq( + Arrays.asList(ALL_RESULT_NODES), ALL_MATCH_SCOPE.serialize()) + .setIsTableModel(true)) + .getDatabaseInfoMap() + .entrySet() + .iterator(); } catch (final Exception e) { lastException = e; } @@ -193,6 +234,7 @@ public boolean hasNext() { private static class TableSupplier extends TsBlockSupplier { private Iterator>> dbIterator; private Iterator tableInfoIterator = null; + private TTableInfo currentTable; private String dbName; private final String userName; @@ -224,20 +266,29 @@ private TableSupplier(final List dataTypes, final String userName) { @Override protected void constructLine() { - final TTableInfo info = tableInfoIterator.next(); columnBuilders[0].writeBinary(new Binary(dbName, TSFileConfig.STRING_CHARSET)); - columnBuilders[1].writeBinary(new Binary(info.getTableName(), TSFileConfig.STRING_CHARSET)); - columnBuilders[2].writeBinary(new Binary(info.getTTL(), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeBinary( + new Binary(currentTable.getTableName(), TSFileConfig.STRING_CHARSET)); + columnBuilders[2].writeBinary(new Binary(currentTable.getTTL(), TSFileConfig.STRING_CHARSET)); columnBuilders[3].writeBinary( new Binary( - TableNodeStatus.values()[info.getState()].toString(), TSFileConfig.STRING_CHARSET)); + TableNodeStatus.values()[currentTable.getState()].toString(), + TSFileConfig.STRING_CHARSET)); resultBuilder.declarePosition(); + currentTable = null; } @Override public boolean hasNext() { // Get next table info iterator - while (Objects.isNull(tableInfoIterator) || !tableInfoIterator.hasNext()) { + while (Objects.isNull(currentTable)) { + while (Objects.nonNull(tableInfoIterator) && tableInfoIterator.hasNext()) { + final TTableInfo info = tableInfoIterator.next(); + if (canShowTable(userName, dbName, info.getTableName())) { + currentTable = info; + return true; + } + } if (!dbIterator.hasNext()) { return false; } @@ -329,19 +380,257 @@ public boolean hasNext() { tableInfoIterator = entry.getValue().entrySet().iterator(); } - final Map.Entry>> tableEntry = tableInfoIterator.next(); - tableName = tableEntry.getKey(); - preDeletedColumns = tableEntry.getValue().getRight(); - columnSchemaIterator = tableEntry.getValue().getLeft().getColumnList().iterator(); + Map.Entry>> tableEntry; + while (tableInfoIterator.hasNext()) { + tableEntry = tableInfoIterator.next(); + if (canShowTable(userName, dbName, tableEntry.getKey())) { + tableName = tableEntry.getKey(); + preDeletedColumns = tableEntry.getValue().getRight(); + columnSchemaIterator = tableEntry.getValue().getLeft().getColumnList().iterator(); + break; + } + } } return true; } } + private static class RegionSupplier extends TsBlockSupplier { + private Iterator iterator; + + private RegionSupplier(final List dataTypes, final String userName) { + super(dataTypes); + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + iterator = + client + .showRegion(new TShowRegionReq().setIsTableModel(true).setDatabases(null)) + .getRegionInfoListIterator(); + } catch (final Exception e) { + lastException = e; + } + } + + @Override + protected void constructLine() { + final TRegionInfo regionInfo = iterator.next(); + columnBuilders[0].writeInt(regionInfo.getConsensusGroupId().getId()); + columnBuilders[1].writeInt(regionInfo.getDataNodeId()); + if (regionInfo.getConsensusGroupId().getType().ordinal() + == TConsensusGroupType.SchemaRegion.ordinal()) { + columnBuilders[2].writeBinary( + BytesUtils.valueOf(String.valueOf(TConsensusGroupType.SchemaRegion))); + } else if (regionInfo.getConsensusGroupId().getType().ordinal() + == TConsensusGroupType.DataRegion.ordinal()) { + columnBuilders[2].writeBinary( + BytesUtils.valueOf(String.valueOf(TConsensusGroupType.DataRegion))); + } + columnBuilders[3].writeBinary( + BytesUtils.valueOf(regionInfo.getStatus() == null ? "" : regionInfo.getStatus())); + columnBuilders[4].writeBinary(BytesUtils.valueOf(regionInfo.getDatabase())); + columnBuilders[5].writeInt(regionInfo.getSeriesSlots()); + columnBuilders[6].writeLong(regionInfo.getTimeSlots()); + columnBuilders[7].writeBinary(BytesUtils.valueOf(regionInfo.getClientRpcIp())); + columnBuilders[8].writeInt(regionInfo.getClientRpcPort()); + columnBuilders[9].writeBinary(BytesUtils.valueOf(regionInfo.getInternalAddress())); + columnBuilders[10].writeBinary(BytesUtils.valueOf(regionInfo.getRoleType())); + columnBuilders[11].writeLong(regionInfo.getCreateTime()); + if (regionInfo.getConsensusGroupId().getType().ordinal() + == TConsensusGroupType.DataRegion.ordinal()) { + columnBuilders[12].writeLong(regionInfo.getTsFileSize()); + } else { + columnBuilders[12].appendNull(); + } + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + + private static class PipeSupplier extends TsBlockSupplier { + private Iterator iterator; + + private PipeSupplier(final List dataTypes, final String userName) { + super(dataTypes); + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + iterator = + client.showPipe(new TShowPipeReq().setIsTableModel(true)).getPipeInfoListIterator(); + } catch (final Exception e) { + lastException = e; + } + } + + @Override + protected void constructLine() { + final TShowPipeInfo tPipeInfo = iterator.next(); + columnBuilders[0].writeBinary(new Binary(tPipeInfo.getId(), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeLong( + TimestampPrecisionUtils.convertToCurrPrecision( + tPipeInfo.getCreationTime(), TimeUnit.MILLISECONDS)); + columnBuilders[2].writeBinary(new Binary(tPipeInfo.getState(), TSFileConfig.STRING_CHARSET)); + columnBuilders[3].writeBinary( + new Binary(tPipeInfo.getPipeExtractor(), TSFileConfig.STRING_CHARSET)); + columnBuilders[4].writeBinary( + new Binary(tPipeInfo.getPipeProcessor(), TSFileConfig.STRING_CHARSET)); + columnBuilders[5].writeBinary( + new Binary(tPipeInfo.getPipeConnector(), TSFileConfig.STRING_CHARSET)); + columnBuilders[6].writeBinary( + new Binary(tPipeInfo.getExceptionMessage(), TSFileConfig.STRING_CHARSET)); + + // Optional, default 0/0.0 + long remainingEventCount = tPipeInfo.getRemainingEventCount(); + double remainingTime = tPipeInfo.getEstimatedRemainingTime(); + + if (remainingEventCount == -1 && remainingTime == -1) { + final Pair remainingEventAndTime = + PipeDataNodeRemainingEventAndTimeMetrics.getInstance() + .getRemainingEventAndTime(tPipeInfo.getId(), tPipeInfo.getCreationTime()); + remainingEventCount = remainingEventAndTime.getLeft(); + remainingTime = remainingEventAndTime.getRight(); + } + + columnBuilders[7].writeLong(tPipeInfo.isSetRemainingEventCount() ? remainingEventCount : -1); + columnBuilders[8].writeDouble(tPipeInfo.isSetEstimatedRemainingTime() ? remainingTime : -1); + + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + + private static class PipePluginSupplier extends TsBlockSupplier { + private Iterator iterator; + + private PipePluginSupplier(final List dataTypes, final String userName) { + super(dataTypes); + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + iterator = + client.getPipePluginTable().getAllPipePluginMeta().stream() + .map(PipePluginMeta::deserialize) + .filter( + pipePluginMeta -> + !BuiltinPipePlugin.SHOW_PIPE_PLUGINS_BLACKLIST.contains( + pipePluginMeta.getPluginName())) + .iterator(); + } catch (final Exception e) { + lastException = e; + } + } + + @Override + protected void constructLine() { + final PipePluginMeta pipePluginMeta = iterator.next(); + columnBuilders[0].writeBinary(BytesUtils.valueOf(pipePluginMeta.getPluginName())); + columnBuilders[1].writeBinary( + pipePluginMeta.isBuiltin() ? PIPE_PLUGIN_TYPE_BUILTIN : PIPE_PLUGIN_TYPE_EXTERNAL); + columnBuilders[2].writeBinary(BytesUtils.valueOf(pipePluginMeta.getClassName())); + if (Objects.nonNull(pipePluginMeta.getJarName())) { + columnBuilders[3].writeBinary(BytesUtils.valueOf(pipePluginMeta.getJarName())); + } else { + columnBuilders[3].appendNull(); + } + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + + private static class TopicSupplier extends TsBlockSupplier { + private Iterator iterator; + + private TopicSupplier(final List dataTypes, final String userName) { + super(dataTypes); + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + iterator = client.showTopic(new TShowTopicReq()).getTopicInfoList().iterator(); + } catch (final Exception e) { + lastException = e; + } + } + + @Override + protected void constructLine() { + final TShowTopicInfo topicInfo = iterator.next(); + columnBuilders[0].writeBinary( + new Binary(topicInfo.getTopicName(), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeBinary( + new Binary(topicInfo.getTopicAttributes(), TSFileConfig.STRING_CHARSET)); + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + + private static class SubscriptionSupplier extends TsBlockSupplier { + private Iterator iterator; + + private SubscriptionSupplier(final List dataTypes, final String userName) { + super(dataTypes); + Coordinator.getInstance().getAccessControl().checkUserHasMaintainPrivilege(userName); + try (final ConfigNodeClient client = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + iterator = + client + .showSubscription(new TShowSubscriptionReq()) + .getSubscriptionInfoList() + .iterator(); + } catch (final Exception e) { + lastException = e; + } + } + + @Override + protected void constructLine() { + final TShowSubscriptionInfo tSubscriptionInfo = iterator.next(); + columnBuilders[0].writeBinary( + new Binary(tSubscriptionInfo.getTopicName(), TSFileConfig.STRING_CHARSET)); + columnBuilders[1].writeBinary( + new Binary(tSubscriptionInfo.getConsumerGroupId(), TSFileConfig.STRING_CHARSET)); + columnBuilders[2].writeBinary( + new Binary(tSubscriptionInfo.getConsumerIds().toString(), TSFileConfig.STRING_CHARSET)); + resultBuilder.declarePosition(); + } + + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + } + private static boolean canShowDB(final String userName, final String dbName) { try { Coordinator.getInstance().getAccessControl().checkCanShowOrUseDatabase(userName, dbName); - } catch (final AccessControlException e) { + } catch (final AccessDeniedException e) { + return false; + } + return true; + } + + private static boolean canShowTable( + final String userName, final String dbName, final String tableName) { + try { + Coordinator.getInstance() + .getAccessControl() + .checkCanShowOrDescTable(userName, new QualifiedObjectName(dbName, tableName)); + } catch (final AccessDeniedException e) { return false; } return true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java index 366af9fdfca8..d5a35ff8ca8c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java @@ -72,4 +72,6 @@ public interface IQueryExecution { String getStatementType(); IClientSession.SqlDialect getSQLDialect(); + + String getUser(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java index aefe524d2acd..682f4cb08618 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java @@ -695,6 +695,11 @@ public IClientSession.SqlDialect getSQLDialect() { return context.getSession().getSqlDialect(); } + @Override + public String getUser() { + return context.getSession().getUserName(); + } + public MPPQueryContext getContext() { return context; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java index 92a8a979d653..dcd71692fd3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/ConfigExecution.java @@ -308,4 +308,9 @@ public String getStatementType() { public IClientSession.SqlDialect getSQLDialect() { return context.getSession().getSqlDialect(); } + + @Override + public String getUser() { + return context.getSession().getUserName(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java index 3292d838e206..f186702d595c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/metadata/ShowPipePluginsTask.java @@ -46,8 +46,8 @@ public class ShowPipePluginsTask implements IConfigTask { - private static final Binary PIPE_PLUGIN_TYPE_BUILTIN = BytesUtils.valueOf("Builtin"); - private static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External"); + public static final Binary PIPE_PLUGIN_TYPE_BUILTIN = BytesUtils.valueOf("Builtin"); + public static final Binary PIPE_PLUGIN_TYPE_EXTERNAL = BytesUtils.valueOf("External"); private static final Binary PIPE_JAR_NAME_EMPTY_FIELD = BytesUtils.valueOf(""); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java index b91c9eecaf42..c8d92110d7c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/DataNodeLocationSupplierFactory.java @@ -87,14 +87,20 @@ private static InformationSchemaTableDataNodeLocationSupplier getInstance() { @Override public List getDataNodeLocations(final String tableName) { - if (tableName.equals(InformationSchema.QUERIES)) { - return getReadableDataNodeLocations(); - } else if (tableName.equals(InformationSchema.DATABASES) - || tableName.equals(InformationSchema.TABLES) - || tableName.equals(InformationSchema.COLUMNS)) { - return Collections.singletonList(DataNodeEndPoints.getLocalDataNodeLocation()); - } else { - throw new UnsupportedOperationException("Unknown table: " + tableName); + switch (tableName) { + case InformationSchema.QUERIES: + return getReadableDataNodeLocations(); + case InformationSchema.DATABASES: + case InformationSchema.TABLES: + case InformationSchema.COLUMNS: + case InformationSchema.REGIONS: + case InformationSchema.PIPES: + case InformationSchema.PIPE_PLUGINS: + case InformationSchema.TOPICS: + case InformationSchema.SUBSCRIPTIONS: + return Collections.singletonList(DataNodeEndPoints.getLocalDataNodeLocation()); + default: + throw new UnsupportedOperationException("Unknown table: " + tableName); } } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java index b479a5787907..6041f37bcec4 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/MergeTreeSortOperatorTest.java @@ -55,6 +55,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.reader.series.SeriesReaderTestUtil; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.SortKey; +import org.apache.iotdb.isession.SessionConfig; import com.google.common.collect.ImmutableList; import com.google.common.util.concurrent.ListenableFuture; @@ -1844,6 +1845,11 @@ public IClientSession.SqlDialect getSQLDialect() { return IClientSession.SqlDialect.TREE; } + @Override + public String getUser() { + return SessionConfig.DEFAULT_USER; + } + @Override public void start() {} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java index bdf44401aade..63b7b783d747 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/ShowQueriesTest.java @@ -32,6 +32,7 @@ import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.QUERY_ID_TABLE_MODEL; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.START_TIME_TABLE_MODEL; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.STATEMENT; +import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.USER; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanAssert.assertPlan; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.collect; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.exchange; @@ -60,7 +61,8 @@ public void testNormal() { START_TIME_TABLE_MODEL, DATA_NODE_ID_TABLE_MODEL, ELAPSED_TIME_TABLE_MODEL, - STATEMENT.toLowerCase(Locale.ENGLISH))))); + STATEMENT.toLowerCase(Locale.ENGLISH), + USER.toLowerCase(Locale.ENGLISH))))); // - Exchange // Output - Collect - Exchange @@ -137,6 +139,7 @@ public void testNonSelectAll() { START_TIME_TABLE_MODEL, DATA_NODE_ID_TABLE_MODEL, ELAPSED_TIME_TABLE_MODEL, - STATEMENT.toLowerCase(Locale.ENGLISH)))))); + STATEMENT.toLowerCase(Locale.ENGLISH), + USER.toLowerCase(Locale.ENGLISH)))))); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java index e2858e8dc44a..e475fbe14542 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/column/ColumnHeaderConstant.java @@ -212,6 +212,36 @@ private ColumnHeaderConstant() { public static final String SCHEMA_REGION_GROUP_NUM_TABLE_MODEL = "schema_region_group_num"; public static final String DATA_REGION_GROUP_NUM_TABLE_MODEL = "data_region_group_num"; + public static final String REGION_ID_TABLE_MODEL = "region_id"; + public static final String DATANODE_ID_TABLE_MODEL = "datanode_id"; + public static final String SERIES_SLOT_NUM_TABLE_MODEL = "series_slot_num"; + public static final String TIME_SLOT_NUM_TABLE_MODEL = "time_slot_num"; + public static final String RPC_ADDRESS_TABLE_MODEL = "rpc_address"; + public static final String RPC_PORT_TABLE_MODEL = "rpc_port"; + public static final String INTERNAL_ADDRESS_TABLE_MODEL = "internal_address"; + public static final String CREATE_TIME_TABLE_MODEL = "create_time"; + public static final String TS_FILE_SIZE_BYTES_TABLE_MODEL = "tsfile_size_bytes"; + + public static final String CREATION_TIME_TABLE_MODEL = "creation_time"; + public static final String PIPE_SOURCE_TABLE_MODEL = "pipe_source"; + public static final String PIPE_PROCESSOR_TABLE_MODEL = "pipe_processor"; + public static final String PIPE_SINK_TABLE_MODEL = "pipe_sink"; + public static final String EXCEPTION_MESSAGE_TABLE_MODEL = "exception_message"; + public static final String REMAINING_EVENT_COUNT_TABLE_MODEL = "remaining_event_count"; + public static final String ESTIMATED_REMAINING_SECONDS_TABLE_MODEL = + "estimated_remaining_seconds"; + + public static final String PLUGIN_NAME_TABLE_MODEL = "plugin_name"; + public static final String PLUGIN_TYPE_TABLE_MODEL = "plugin_type"; + public static final String CLASS_NAME_TABLE_MODEL = "class_name"; + public static final String PLUGIN_JAR_TABLE_MODEL = "plugin_jar"; + + public static final String TOPIC_NAME_TABLE_MODEL = "topic_name"; + public static final String TOPIC_CONFIGS_TABLE_MODEL = "topic_configs"; + + public static final String CONSUMER_GROUP_NAME_TABLE_MODEL = "consumer_group_name"; + public static final String SUBSCRIBED_CONSUMERS_TABLE_MODEL = "subscribed_consumers"; + // column names for show space quota public static final String QUOTA_TYPE = "QuotaType"; public static final String LIMIT = "Limit"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java index f2f4e548d4ce..6fdf456ab76a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/schema/table/InformationSchema.java @@ -37,6 +37,11 @@ public class InformationSchema { public static final String DATABASES = "databases"; public static final String TABLES = "tables"; public static final String COLUMNS = "columns"; + public static final String REGIONS = "regions"; + public static final String PIPES = "pipes"; + public static final String PIPE_PLUGINS = "pipe_plugins"; + public static final String TOPICS = "topics"; + public static final String SUBSCRIPTIONS = "subscriptions"; static { final TsTable queriesTable = new TsTable(QUERIES); @@ -52,6 +57,9 @@ public class InformationSchema { queriesTable.addColumnSchema( new AttributeColumnSchema( ColumnHeaderConstant.STATEMENT.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + queriesTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.USER.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); queriesTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(QUERIES, queriesTable); @@ -114,6 +122,107 @@ public class InformationSchema { ColumnHeaderConstant.STATUS.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); columnTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); schemaTables.put(COLUMNS, columnTable); + + final TsTable regionTable = new TsTable(REGIONS); + regionTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.REGION_ID_TABLE_MODEL, TSDataType.INT32)); + regionTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.DATANODE_ID_TABLE_MODEL, TSDataType.INT32)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.TYPE.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.STATUS.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.DATABASE.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.SERIES_SLOT_NUM_TABLE_MODEL, TSDataType.INT32)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.TIME_SLOT_NUM_TABLE_MODEL, TSDataType.INT64)); + regionTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.RPC_ADDRESS_TABLE_MODEL, TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.RPC_PORT_TABLE_MODEL, TSDataType.INT32)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.INTERNAL_ADDRESS_TABLE_MODEL, TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.ROLE.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.CREATE_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + regionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.TS_FILE_SIZE_BYTES_TABLE_MODEL, TSDataType.INT64)); + regionTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(REGIONS, regionTable); + + final TsTable pipeTable = new TsTable(PIPES); + pipeTable.addColumnSchema( + new TagColumnSchema( + ColumnHeaderConstant.ID.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.CREATION_TIME_TABLE_MODEL, TSDataType.TIMESTAMP)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.STATE.toLowerCase(Locale.ENGLISH), TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PIPE_SOURCE_TABLE_MODEL, TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.PIPE_PROCESSOR_TABLE_MODEL, TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PIPE_SINK_TABLE_MODEL, TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.EXCEPTION_MESSAGE_TABLE_MODEL, TSDataType.STRING)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.REMAINING_EVENT_COUNT_TABLE_MODEL, TSDataType.INT64)); + pipeTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.ESTIMATED_REMAINING_SECONDS_TABLE_MODEL, TSDataType.DOUBLE)); + pipeTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(PIPES, pipeTable); + + final TsTable pipePluginTable = new TsTable(PIPE_PLUGINS); + pipePluginTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.PLUGIN_NAME_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_TYPE_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.CLASS_NAME_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.addColumnSchema( + new AttributeColumnSchema(ColumnHeaderConstant.PLUGIN_JAR_TABLE_MODEL, TSDataType.STRING)); + pipePluginTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(PIPE_PLUGINS, pipePluginTable); + + final TsTable topicTable = new TsTable(TOPICS); + topicTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.TOPIC_NAME_TABLE_MODEL, TSDataType.STRING)); + topicTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.TOPIC_CONFIGS_TABLE_MODEL, TSDataType.STRING)); + topicTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(TOPICS, topicTable); + + final TsTable subscriptionTable = new TsTable(SUBSCRIPTIONS); + subscriptionTable.addColumnSchema( + new TagColumnSchema(ColumnHeaderConstant.TOPIC_NAME_TABLE_MODEL, TSDataType.STRING)); + subscriptionTable.addColumnSchema( + new TagColumnSchema( + ColumnHeaderConstant.CONSUMER_GROUP_NAME_TABLE_MODEL, TSDataType.STRING)); + subscriptionTable.addColumnSchema( + new AttributeColumnSchema( + ColumnHeaderConstant.SUBSCRIBED_CONSUMERS_TABLE_MODEL, TSDataType.STRING)); + subscriptionTable.removeColumnSchema(TsTable.TIME_COLUMN_NAME); + schemaTables.put(SUBSCRIPTIONS, subscriptionTable); } public static Map getSchemaTables() {