diff --git a/Cargo.lock b/Cargo.lock index 4fa808559b4d..f6d2fb42567c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,6 +1232,7 @@ dependencies = [ "async-stream", "async-trait", "backoff", + "catalog", "chrono", "common-catalog", "common-error", @@ -4054,7 +4055,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45#ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" +source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=4398d20c56d5f7939cc2960789cb1fa7dd18e6fe#4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" dependencies = [ "prost", "serde", @@ -6116,6 +6117,7 @@ dependencies = [ name = "partition" version = "0.2.0" dependencies = [ + "api", "async-trait", "common-catalog", "common-error", @@ -8426,6 +8428,7 @@ dependencies = [ "humantime-serde", "hyper", "influxdb_line_protocol", + "itertools", "metrics", "metrics-process", "mime_guess", diff --git a/Cargo.toml b/Cargo.toml index d4f669b3cae1..12c8bb884853 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,7 +71,8 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" } futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" } +greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" } +itertools = "0.10" parquet = "40.0" paste = "1.0" prost = "0.11" diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 579530784a00..172a060b30af 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -9,6 +9,6 @@ arrow.workspace = true clap = { version = "4.0", features = ["derive"] } client = { path = "../src/client" } indicatif = "0.17.1" -itertools = "0.10.5" +itertools.workspace = true parquet.workspace = true tokio.workspace = true diff --git a/benchmarks/src/bin/nyc-taxi.rs b/benchmarks/src/bin/nyc-taxi.rs index 8dfdd4befe14..9bc6a5fd2e15 100644 --- a/benchmarks/src/bin/nyc-taxi.rs +++ b/benchmarks/src/bin/nyc-taxi.rs @@ -26,7 +26,9 @@ use arrow::datatypes::{DataType, Float64Type, Int64Type}; use arrow::record_batch::RecordBatch; use clap::Parser; use client::api::v1::column::Values; -use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest}; +use client::api::v1::{ + Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests, +}; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -107,8 +109,12 @@ async fn write_data( columns, row_count, }; + let requests = InsertRequests { + inserts: vec![request], + }; + let now = Instant::now(); - db.insert(request).await.unwrap(); + db.insert(requests).await.unwrap(); let elapsed = now.elapsed(); total_rpc_elapsed_ms += elapsed.as_millis(); progress_bar.inc(row_count as _); diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index a48be91ab090..10ad7d5bb353 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -231,7 +231,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) { /// Returns the type name of the 
[Request]. pub fn request_type(request: &Request) -> &'static str { match request { - Request::Insert(_) => "insert", + Request::Inserts(_) => "inserts", Request::Query(query_req) => query_request_type(query_req), Request::Ddl(ddl_req) => ddl_request_type(ddl_req), Request::Delete(_) => "delete", diff --git a/src/catalog/Cargo.toml b/src/catalog/Cargo.toml index d6dbf30f597e..11c6c676e582 100644 --- a/src/catalog/Cargo.toml +++ b/src/catalog/Cargo.toml @@ -4,6 +4,9 @@ version.workspace = true edition.workspace = true license.workspace = true +[features] +testing = [] + [dependencies] api = { path = "../api" } arc-swap = "1.0" @@ -42,6 +45,7 @@ table = { path = "../table" } tokio.workspace = true [dev-dependencies] +catalog = { path = ".", features = ["testing"] } common-test-util = { path = "../common/test-util" } chrono.workspace = true log-store = { path = "../log-store" } diff --git a/src/catalog/src/remote.rs b/src/catalog/src/remote.rs index a4ebebf4fb3a..607a1cd56a24 100644 --- a/src/catalog/src/remote.rs +++ b/src/catalog/src/remote.rs @@ -27,6 +27,9 @@ use crate::error::Error; mod client; mod manager; +#[cfg(feature = "testing")] +pub mod mock; + #[derive(Debug, Clone)] pub struct Kv(pub Vec, pub Vec); diff --git a/src/catalog/tests/mock.rs b/src/catalog/src/remote/mock.rs similarity index 98% rename from src/catalog/tests/mock.rs rename to src/catalog/src/remote/mock.rs index 10a924e3dd50..e8bd73f93adb 100644 --- a/src/catalog/tests/mock.rs +++ b/src/catalog/src/remote/mock.rs @@ -20,9 +20,6 @@ use std::str::FromStr; use std::sync::Arc; use async_stream::stream; -use catalog::error::Error; -use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; -use catalog::remote::{Kv, KvBackend, ValueIter}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_recordbatch::RecordBatch; use common_telemetry::logging::info; @@ -37,6 +34,10 @@ use table::test_util::MemTable; use table::TableRef; use tokio::sync::RwLock; +use crate::error::Error; +use crate::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; +use crate::remote::{Kv, KvBackend, ValueIter}; + pub struct MockKvBackend { map: RwLock, Vec>>, } diff --git a/src/catalog/tests/remote_catalog_tests.rs b/src/catalog/tests/remote_catalog_tests.rs index f577844baeb8..324bae49f232 100644 --- a/src/catalog/tests/remote_catalog_tests.rs +++ b/src/catalog/tests/remote_catalog_tests.rs @@ -14,8 +14,6 @@ #![feature(assert_matches)] -mod mock; - #[cfg(test)] mod tests { use std::assert_matches::assert_matches; @@ -23,6 +21,7 @@ mod tests { use std::sync::Arc; use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue}; + use catalog::remote::mock::{MockKvBackend, MockTableEngine}; use catalog::remote::{ CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider, RemoteSchemaProvider, @@ -35,8 +34,6 @@ mod tests { use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; - use crate::mock::{MockKvBackend, MockTableEngine}; - #[tokio::test] async fn test_backend() { common_telemetry::init_default_ut_logging(); diff --git a/src/client/src/client_manager.rs b/src/client/src/client_manager.rs index 2f5bcc2f2638..d8e85785ad9d 100644 --- a/src/client/src/client_manager.rs +++ b/src/client/src/client_manager.rs @@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use common_grpc::channel_manager::ChannelManager; +use common_grpc::channel_manager::{ChannelConfig, 
ChannelManager}; use common_meta::peer::Peer; use common_telemetry::info; use moka::future::{Cache, CacheBuilder}; @@ -31,8 +31,11 @@ pub struct DatanodeClients { impl Default for DatanodeClients { fn default() -> Self { + // TODO(LFC): Make this channel config configurable. + let config = ChannelConfig::new().timeout(Duration::from_secs(8)); + Self { - channel_manager: ChannelManager::new(), + channel_manager: ChannelManager::with_config(config), clients: CacheBuilder::new(1024) .time_to_live(Duration::from_secs(30 * 60)) .time_to_idle(Duration::from_secs(5 * 60)) diff --git a/src/client/src/database.rs b/src/client/src/database.rs index 24541c5f76ce..6cb6e3aeef85 100644 --- a/src/client/src/database.rs +++ b/src/client/src/database.rs @@ -18,7 +18,7 @@ use api::v1::greptime_request::Request; use api::v1::query_request::Query; use api::v1::{ greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest, - DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery, + DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery, QueryRequest, RequestHeader, }; use arrow_flight::{FlightData, Ticket}; @@ -109,9 +109,9 @@ impl Database { }); } - pub async fn insert(&self, request: InsertRequest) -> Result<u32> { + pub async fn insert(&self, requests: InsertRequests) -> Result<u32> { let _timer = timer!(metrics::METRIC_GRPC_INSERT); - self.handle(Request::Insert(request)).await + self.handle(Request::Inserts(requests)).await } pub async fn delete(&self, request: DeleteRequest) -> Result<u32> { diff --git a/src/common/grpc/src/channel_manager.rs b/src/common/grpc/src/channel_manager.rs index 977ddb3f0c24..4d80a0464984 100644 --- a/src/common/grpc/src/channel_manager.rs +++ b/src/common/grpc/src/channel_manager.rs @@ -28,6 +28,7 @@ use tower::make::MakeConnection; use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result}; const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60; +const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 2; #[derive(Clone, Debug)] pub struct ChannelManager { @@ -236,7 +237,7 @@ pub struct ChannelConfig { impl Default for ChannelConfig { fn default() -> Self { Self { - timeout: Some(Duration::from_secs(2)), + timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)), connect_timeout: Some(Duration::from_secs(4)), concurrency_limit: None, rate_limit: None, @@ -497,7 +498,7 @@ mod tests { let default_cfg = ChannelConfig::new(); assert_eq!( ChannelConfig { - timeout: Some(Duration::from_secs(2)), + timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)), connect_timeout: Some(Duration::from_secs(4)), concurrency_limit: None, rate_limit: None, diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs index e50a50ba3556..3ffcb4c92bac 100644 --- a/src/common/meta/src/rpc/router.rs +++ b/src/common/meta/src/rpc/router.rs @@ -22,7 +22,7 @@ use api::v1::meta::{ }; use serde::{Deserialize, Serialize, Serializer}; use snafu::{OptionExt, ResultExt}; -use store_api::storage::RegionId; +use store_api::storage::{RegionId, RegionNumber}; use table::metadata::RawTableInfo; use crate::error::{self, Result}; @@ -141,9 +141,23 @@ impl TryFrom for RouteResponse { pub struct TableRoute { pub table: Table, pub region_routes: Vec<RegionRoute>, region_leaders: HashMap<RegionNumber, Option<Peer>>, } impl TableRoute { + pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self { + let region_leaders = region_routes + .iter() + .map(|x| (x.region.id as RegionNumber, x.leader_peer.clone())) + .collect::<HashMap<_, _>>(); + + Self { + table, + region_routes, + region_leaders, + } + } +
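The `TableRoute::new` constructor added in this change builds the `region_leaders` map once, up front, so that `find_region_leader` (added further below) is a single hash lookup instead of a linear scan over `region_routes` for every region of every insert. A minimal, self-contained sketch of that precomputed-index pattern; `Peer` and `RegionRoute` here are simplified stand-ins, not the real `common_meta` types:

```rust
use std::collections::HashMap;

type RegionNumber = u32;

#[derive(Clone, Debug, PartialEq)]
struct Peer {
    id: u64,
}

struct RegionRoute {
    region_id: u64,
    leader_peer: Option<Peer>,
}

struct Route {
    region_routes: Vec<RegionRoute>,
    // Precomputed once at construction; avoids scanning `region_routes`
    // on every leader lookup.
    region_leaders: HashMap<RegionNumber, Option<Peer>>,
}

impl Route {
    fn new(region_routes: Vec<RegionRoute>) -> Self {
        let region_leaders = region_routes
            .iter()
            .map(|r| (r.region_id as RegionNumber, r.leader_peer.clone()))
            .collect();
        Self { region_routes, region_leaders }
    }

    fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
        self.region_leaders
            .get(&region_number)
            .and_then(|leader| leader.as_ref())
    }
}

fn main() {
    let route = Route::new(vec![
        RegionRoute { region_id: 1, leader_peer: Some(Peer { id: 42 }) },
        RegionRoute { region_id: 2, leader_peer: None },
    ]);
    assert_eq!(route.find_region_leader(1), Some(&Peer { id: 42 }));
    assert!(route.find_region_leader(2).is_none());
    assert_eq!(route.region_routes.len(), 2);
}
```

The map keeps `Option<Peer>` as its value so a known-but-currently-leaderless region is still representable; `find_region_leader` flattens both "unknown region" and "no leader yet" to `None`, which callers treat as "no Datanode to send to".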
pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> { let table = table_route .table @@ -179,10 +193,7 @@ }); } - Ok(Self { - table, - region_routes, - }) + Ok(Self::new(table, region_routes)) } pub fn try_into_raw(self) -> Result<(Vec<PbPeer>, PbTableRoute)> { @@ -267,6 +278,12 @@ }) .collect() } + + pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> { + self.region_leaders + .get(&region_number) + .and_then(|x| x.as_ref()) + } } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] @@ -670,6 +687,10 @@ mod tests { follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")], }, ], + region_leaders: HashMap::from([ + (2, Some(Peer::new(1, "a1"))), + (1, Some(Peer::new(2, "a2"))), + ]), }; let from_raw = TableRoute::try_from_raw(&raw_peers, raw_table_route.clone()).unwrap(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 77d0817e38d3..88b59b4ff590 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -230,6 +230,12 @@ pub enum Error { source: servers::error::Error, }, + #[snafu(display("Failed to wait for GRPC serving, source: {}", source))] + WaitForGrpcServing { source: servers::error::Error, location: Location, }, + #[snafu(display("Failed to parse address {}, source: {}", addr, source))] ParseAddr { addr: String, source: @@ -467,6 +473,12 @@ #[snafu(display("Missing WAL dir config"))] MissingWalDirConfig { location: Location }, + + #[snafu(display("Failed to join task, source: {}", source))] + JoinTask { + source: common_runtime::JoinError, + location: Location, + }, } pub type Result<T> = std::result::Result<T, Error>; @@ -537,16 +549,19 @@ impl ErrorExt for Error { | GetTable { source, .. } => source.status_code(), // TODO(yingwen): Further categorize http error. - StartServer { .. } - | ParseAddr { .. } + ParseAddr { .. } | CreateDir { .. } | RemoveDir { .. } | Catalog { .. } | MissingRequiredField { .. } | IncorrectInternalState { .. } - | ShutdownServer { .. } | ShutdownInstance { .. } - | CloseTableEngine { .. } => StatusCode::Internal, + | CloseTableEngine { .. } + | JoinTask { .. } => StatusCode::Internal, + + StartServer { source } | ShutdownServer { source } | WaitForGrpcServing { source, .. } => source.status_code(), InitBackend { ..
} => StatusCode::StorageUnavailable, diff --git a/src/datanode/src/instance/grpc.rs b/src/datanode/src/instance/grpc.rs index 841cf5aaa8b4..a727bf27c1b5 100644 --- a/src/datanode/src/instance/grpc.rs +++ b/src/datanode/src/instance/grpc.rs @@ -18,15 +18,17 @@ use std::sync::Arc; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request as GrpcRequest; use api::v1::query_request::Query; -use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest}; +use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequests}; use async_trait::async_trait; use catalog::CatalogManagerRef; +use common_grpc_expr::insert::to_table_insert_request; use common_query::Output; use datafusion::catalog::catalog::{ CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider, }; use datafusion::catalog::schema::SchemaProvider; use datafusion::datasource::TableProvider; +use futures::future; use query::parser::{PromQuery, QueryLanguageParser, QueryStatement}; use query::plan::LogicalPlan; use query::query_engine::SqlStatementExecutor; @@ -41,8 +43,8 @@ use table::table::adapter::DfTableProviderAdapter; use crate::error::{ self, CatalogNotFoundSnafu, CatalogSnafu, DecodeLogicalPlanSnafu, DeleteExprToRequestSnafu, - DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertSnafu, PlanStatementSnafu, Result, - SchemaNotFoundSnafu, TableNotFoundSnafu, + DeleteSnafu, ExecuteLogicalPlanSnafu, ExecuteSqlSnafu, InsertDataSnafu, InsertSnafu, + JoinTaskSnafu, PlanStatementSnafu, Result, SchemaNotFoundSnafu, TableNotFoundSnafu, }; use crate::instance::Instance; @@ -119,31 +121,41 @@ impl Instance { } } - pub async fn handle_insert( + pub async fn handle_inserts( &self, - request: InsertRequest, - ctx: QueryContextRef, + requests: InsertRequests, + ctx: &QueryContextRef, ) -> Result { - let catalog = &ctx.current_catalog(); - let schema = &ctx.current_schema(); - let table_name = &request.table_name.clone(); - let table_ref = TableReference::full(catalog, schema, table_name); - - let table = self - .catalog_manager - .table(catalog, schema, table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request) - .context(error::InsertDataSnafu)?; + let results = future::try_join_all(requests.inserts.into_iter().map(|insert| { + let catalog_manager = self.catalog_manager.clone(); + let catalog = ctx.current_catalog(); + let schema = ctx.current_schema(); + + common_runtime::spawn_write(async move { + let table_name = &insert.table_name.clone(); + let table = catalog_manager + .table(&catalog, &schema, table_name) + .await + .context(CatalogSnafu)? 
+ .with_context(|| TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name( + &catalog, &schema, table_name, + ), + })?; - let affected_rows = table.insert(request).await.with_context(|_| InsertSnafu { - table_name: table_ref.to_string(), - })?; + let request = + to_table_insert_request(&catalog, &schema, insert).context(InsertDataSnafu)?; + + table.insert(request).await.with_context(|_| InsertSnafu { + table_name: common_catalog::format_full_table_name( + &catalog, &schema, table_name, + ), + }) + }) + })) + .await + .context(JoinTaskSnafu)?; + let affected_rows = results.into_iter().sum::>()?; Ok(Output::AffectedRows(affected_rows)) } @@ -191,7 +203,7 @@ impl GrpcQueryHandler for Instance { async fn do_query(&self, request: GrpcRequest, ctx: QueryContextRef) -> Result { match request { - GrpcRequest::Insert(request) => self.handle_insert(request, ctx).await, + GrpcRequest::Inserts(requests) => self.handle_inserts(requests, &ctx).await, GrpcRequest::Delete(request) => self.handle_delete(request, ctx).await, GrpcRequest::Query(query_request) => { let query = query_request @@ -296,7 +308,7 @@ mod test { use api::v1::column::{SemanticType, Values}; use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, - CreateDatabaseExpr, CreateTableExpr, QueryRequest, + CreateDatabaseExpr, CreateTableExpr, InsertRequest, InsertRequests, QueryRequest, }; use common_catalog::consts::MITO_ENGINE; use common_recordbatch::RecordBatches; @@ -481,7 +493,9 @@ mod test { ..Default::default() }; - let query = GrpcRequest::Insert(insert); + let query = GrpcRequest::Inserts(InsertRequests { + inserts: vec![insert], + }); let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); assert!(matches!(output, Output::AffectedRows(3))); diff --git a/src/datanode/src/server.rs b/src/datanode/src/server.rs index 3d51c271b5a9..db0c41e466db 100644 --- a/src/datanode/src/server.rs +++ b/src/datanode/src/server.rs @@ -17,17 +17,18 @@ use std::net::SocketAddr; use std::sync::Arc; use common_runtime::Builder as RuntimeBuilder; +use futures::future; use servers::grpc::GrpcServer; use servers::http::{HttpServer, HttpServerBuilder}; use servers::metrics_handler::MetricsHandler; use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor; use servers::server::Server; use snafu::ResultExt; -use tokio::select; use crate::datanode::DatanodeOptions; use crate::error::{ ParseAddrSnafu, Result, RuntimeResourceSnafu, ShutdownServerSnafu, StartServerSnafu, + WaitForGrpcServingSnafu, }; use crate::instance::InstanceRef; @@ -71,10 +72,14 @@ impl Services { })?; let grpc = self.grpc_server.start(grpc_addr); let http = self.http_server.start(http_addr); - select!( - v = grpc => v.context(StartServerSnafu)?, - v = http => v.context(StartServerSnafu)?, - ); + future::try_join_all(vec![grpc, http]) + .await + .context(StartServerSnafu)?; + + self.grpc_server + .wait_for_serve() + .await + .context(WaitForGrpcServingSnafu)?; Ok(()) } diff --git a/src/frontend/Cargo.toml b/src/frontend/Cargo.toml index 71828fbec530..4487b10e3743 100644 --- a/src/frontend/Cargo.toml +++ b/src/frontend/Cargo.toml @@ -37,7 +37,7 @@ datatypes = { path = "../datatypes" } file-table-engine = { path = "../file-table-engine" } futures = "0.3" futures-util.workspace = true -itertools = "0.10" +itertools.workspace = true meta-client = { path = "../meta-client" } meter-core.workspace = true meter-macros.workspace = true @@ -65,6 +65,7 @@ tokio.workspace = true tonic.workspace = true [dev-dependencies] 
+catalog = { path = "../catalog", features = ["testing"] } common-test-util = { path = "../common/test-util" } datanode = { path = "../datanode" } futures = "0.3" diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 70880063bf10..d91fd6eabb1f 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -279,9 +279,7 @@ impl CatalogManager for FrontendCatalogManager { Ok(self.backend.get(key.as_bytes()).await?.map(|_| { Arc::new(FrontendCatalogProvider { catalog_name: catalog.to_string(), - backend: self.backend.clone(), - partition_manager: self.partition_manager.clone(), - datanode_clients: self.datanode_clients.clone(), + catalog_manager: Arc::new(self.clone()), }) as Arc<_> })) } @@ -320,9 +318,7 @@ impl CatalogManager for FrontendCatalogManager { pub struct FrontendCatalogProvider { catalog_name: String, - backend: KvBackendRef, - partition_manager: PartitionRuleManagerRef, - datanode_clients: Arc, + catalog_manager: Arc, } #[async_trait::async_trait] @@ -333,7 +329,8 @@ impl CatalogProvider for FrontendCatalogProvider { async fn schema_names(&self) -> catalog::error::Result> { let key = build_schema_prefix(&self.catalog_name); - let mut iter = self.backend.range(key.as_bytes()); + let backend = self.catalog_manager.backend(); + let mut iter = backend.range(key.as_bytes()); let mut res = HashSet::new(); while let Some(r) = iter.next().await { let Kv(k, _) = r?; @@ -361,15 +358,17 @@ impl CatalogProvider for FrontendCatalogProvider { } .to_string(); - let val = self.backend.get(schema_key.as_bytes()).await?; + let val = self + .catalog_manager + .backend() + .get(schema_key.as_bytes()) + .await?; let provider = val.map(|_| { Arc::new(FrontendSchemaProvider { catalog_name: catalog.clone(), schema_name: name.to_string(), - backend: self.backend.clone(), - partition_manager: self.partition_manager.clone(), - datanode_clients: self.datanode_clients.clone(), + catalog_manager: self.catalog_manager.clone(), }) as Arc }); @@ -380,9 +379,7 @@ impl CatalogProvider for FrontendCatalogProvider { pub struct FrontendSchemaProvider { catalog_name: String, schema_name: String, - backend: KvBackendRef, - partition_manager: PartitionRuleManagerRef, - datanode_clients: Arc, + catalog_manager: Arc, } #[async_trait] @@ -397,7 +394,8 @@ impl SchemaProvider for FrontendSchemaProvider { tables.push("numbers".to_string()); } let key = build_table_global_prefix(&self.catalog_name, &self.schema_name); - let iter = self.backend.range(key.as_bytes()); + let backend = self.catalog_manager.backend(); + let iter = backend.range(key.as_bytes()); let result = iter .map(|r| { let Kv(k, _) = r?; @@ -424,7 +422,9 @@ impl SchemaProvider for FrontendSchemaProvider { schema_name: self.schema_name.clone(), table_name: name.to_string(), }; - let Some(kv) = self.backend.get(table_global_key.to_string().as_bytes()).await? else { return Ok(None) }; + let Some(kv) = self.catalog_manager.backend().get(table_global_key.to_string().as_bytes()).await? 
else { + return Ok(None); + }; let v = TableGlobalValue::from_bytes(kv.1).context(InvalidCatalogValueSnafu)?; let table_info = Arc::new( v.table_info @@ -434,9 +434,7 @@ impl SchemaProvider for FrontendSchemaProvider { let table = Arc::new(DistTable::new( TableName::new(&self.catalog_name, &self.schema_name, name), table_info, - self.partition_manager.clone(), - self.datanode_clients.clone(), - self.backend.clone(), + self.catalog_manager.clone(), )); Ok(Some(table)) } diff --git a/src/frontend/src/error.rs b/src/frontend/src/error.rs index a7d5ccc6fcb9..fbc545714ad0 100644 --- a/src/frontend/src/error.rs +++ b/src/frontend/src/error.rs @@ -19,7 +19,7 @@ use datafusion::parquet; use datatypes::arrow::error::ArrowError; use datatypes::value::Value; use snafu::Location; -use store_api::storage::RegionId; +use store_api::storage::RegionNumber; #[derive(Debug, Snafu)] #[snafu(visibility(pub))] @@ -96,6 +96,9 @@ pub enum Error { source: api::error::Error, }, + #[snafu(display("Failed to convert vector to GRPC column, reason: {}", reason))] + VectorToGrpcColumn { reason: String, location: Location }, + #[snafu(display( "Failed to convert column default constraint, column: {}, source: {}", column_name, @@ -115,7 +118,7 @@ pub enum Error { #[snafu(display("Failed to find Datanode by region: {:?}", region))] FindDatanode { - region: RegionId, + region: RegionNumber, location: Location, }, @@ -192,6 +195,12 @@ pub enum Error { source: partition::error::Error, }, + #[snafu(display("Failed to split insert request, source: {}", source))] + SplitInsert { + source: partition::error::Error, + location: Location, + }, + #[snafu(display("Failed to create table info, source: {}", source))] CreateTableInfo { #[snafu(backtrace)] @@ -582,7 +591,8 @@ impl ErrorExt for Error { | Error::CreateTableRoute { .. } | Error::FindRegionRoute { .. } | Error::BuildDfLogicalPlan { .. } - | Error::BuildTableMeta { .. } => StatusCode::Internal, + | Error::BuildTableMeta { .. } + | Error::VectorToGrpcColumn { .. } => StatusCode::Internal, Error::IncompleteGrpcResult { .. } | Error::ContextValueNotFound { .. } @@ -623,7 +633,9 @@ impl ErrorExt for Error { Error::External { source } => source.status_code(), Error::DeserializePartition { source, .. } | Error::FindTablePartitionRule { source, .. } - | Error::FindTableRoute { source, .. } => source.status_code(), + | Error::FindTableRoute { source, .. } + | Error::SplitInsert { source, .. } => source.status_code(), + Error::UnrecognizedTableOption { .. } => StatusCode::InvalidArguments, Error::StartScriptManager { source } => source.status_code(), diff --git a/src/frontend/src/heartbeat/handler.rs b/src/frontend/src/heartbeat/handler.rs index 8e4a6c80c353..24f15d115e66 100644 --- a/src/frontend/src/heartbeat/handler.rs +++ b/src/frontend/src/heartbeat/handler.rs @@ -13,5 +13,6 @@ // limitations under the License. 
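In the `catalog.rs` change above, `FrontendCatalogProvider` and `FrontendSchemaProvider` stop carrying their own clones of `backend`, `partition_manager`, and `datanode_clients` and instead hold one `Arc` to the catalog manager that owns all three, fetching each dependency on demand. A rough sketch of that consolidation pattern; the types below are hypothetical stand-ins, not the real frontend ones:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the dependencies the manager owns; the real
// frontend holds a KV backend, a partition-rule manager, and Datanode clients.
struct Backend;
struct PartitionManager;

struct CatalogManager {
    backend: Arc<Backend>,
    partition_manager: Arc<PartitionManager>,
}

impl CatalogManager {
    fn backend(&self) -> Arc<Backend> {
        self.backend.clone()
    }

    fn partition_manager(&self) -> Arc<PartitionManager> {
        self.partition_manager.clone()
    }
}

// The provider keeps a single handle and fetches dependencies on demand,
// so adding a dependency to the manager changes no provider constructors.
struct SchemaProvider {
    catalog_name: String,
    catalog_manager: Arc<CatalogManager>,
}

impl SchemaProvider {
    fn table_names(&self) -> Vec<String> {
        let _backend = self.catalog_manager.backend(); // would range-scan keys here
        let _rules = self.catalog_manager.partition_manager();
        vec![format!("{}.numbers", self.catalog_name)]
    }
}

fn main() {
    let manager = Arc::new(CatalogManager {
        backend: Arc::new(Backend),
        partition_manager: Arc::new(PartitionManager),
    });
    let provider = SchemaProvider {
        catalog_name: "greptime".to_string(),
        catalog_manager: manager,
    };
    assert_eq!(provider.table_names(), vec!["greptime.numbers".to_string()]);
}
```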
pub mod invalidate_table_cache; + #[cfg(test)] -mod tests; +pub(crate) mod tests; diff --git a/src/frontend/src/heartbeat/handler/tests.rs b/src/frontend/src/heartbeat/handler/tests.rs index b3fa1f2810ca..0e6d03782438 100644 --- a/src/frontend/src/heartbeat/handler/tests.rs +++ b/src/frontend/src/heartbeat/handler/tests.rs @@ -30,6 +30,7 @@ use tokio::sync::mpsc; use super::invalidate_table_cache::InvalidateTableCacheHandler; +#[derive(Default)] pub struct MockKvCacheInvalidator { inner: Mutex, i32>>, } diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 3bdd97fb7e8f..a231137b2dbc 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -28,7 +28,7 @@ use api::v1::alter_expr::Kind; use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::meta::Role; -use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest}; +use api::v1::{AddColumns, AlterExpr, Column, DdlRequest, InsertRequest, InsertRequests}; use async_trait::async_trait; use catalog::remote::CachedMetaKvBackend; use catalog::CatalogManagerRef; @@ -47,6 +47,7 @@ use datanode::instance::sql::table_idents_to_full_name; use datanode::instance::InstanceRef as DnInstanceRef; use datatypes::schema::Schema; use distributed::DistInstance; +use futures::future; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; use partition::manager::PartitionRuleManager; @@ -281,24 +282,21 @@ impl Instance { /// Handle batch inserts pub async fn handle_inserts( &self, - requests: Vec, + requests: InsertRequests, ctx: QueryContextRef, ) -> Result { - let mut success = 0; - for request in requests { - match self.handle_insert(request, ctx.clone()).await? { - Output::AffectedRows(rows) => success += rows, - _ => unreachable!("Insert should not yield output other than AffectedRows"), - } - } - Ok(Output::AffectedRows(success)) - } - - async fn handle_insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result { - self.create_or_alter_table_on_demand(ctx.clone(), &request) - .await?; + // TODO(LFC): Optimize concurrent table creation and table alteration. + // Currently table creation is guarded by a distributed lock in Metasrv. However, table + // alteration is not. We should all switch to procedures in Metasrv. + let _ = future::join_all( + requests + .inserts + .iter() + .map(|x| self.create_or_alter_table_on_demand(ctx.clone(), x)), + ) + .await; - let query = Request::Insert(request); + let query = Request::Inserts(requests); GrpcQueryHandler::do_query(&*self.grpc_query_handler, query, ctx).await } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index a808e21f6b96..ce357d5b6d00 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
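The new `handle_inserts` in `instance.rs` above fans out `create_or_alter_table_on_demand` over every insert in the batch with `future::join_all`, deliberately discarding the per-table results (a failed or raced creation surfaces later, when the batch itself is executed), and only then forwards the whole `InsertRequests` in a single request. A simplified sketch of that fan-out-then-batch shape, needing only the `futures` crate; `prepare_table` and `submit_batch` are toy stand-ins for the real calls:

```rust
use futures::executor::block_on;
use futures::future;

#[derive(Debug)]
struct Insert {
    table: String,
    rows: usize,
}

// Pretend to auto-create or alter the target table; failures are tolerated
// here because the subsequent batched insert will report the real error.
async fn prepare_table(insert: &Insert) -> Result<(), String> {
    if insert.table.is_empty() {
        return Err("empty table name".to_string());
    }
    Ok(())
}

// Pretend to execute the whole batch, returning total affected rows.
async fn submit_batch(inserts: &[Insert]) -> Result<usize, String> {
    Ok(inserts.iter().map(|i| i.rows).sum())
}

async fn handle_inserts(inserts: Vec<Insert>) -> Result<usize, String> {
    // Fan out preparation concurrently; individual outcomes are ignored.
    let _ = future::join_all(inserts.iter().map(prepare_table)).await;
    // Then submit the batch as one request.
    submit_batch(&inserts).await
}

fn main() {
    let inserts = vec![
        Insert { table: "metrics".into(), rows: 3 },
        Insert { table: "logs".into(), rows: 2 },
    ];
    let affected = block_on(handle_inserts(inserts)).unwrap();
    assert_eq!(affected, 5);
}
```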
+pub(crate) mod inserter; + use std::collections::HashMap; use std::sync::Arc; @@ -20,7 +22,7 @@ use api::v1::ddl_request::Expr as DdlExpr; use api::v1::greptime_request::Request; use api::v1::{ column_def, AlterExpr, CreateDatabaseExpr, CreateTableExpr, DeleteRequest, DropTableExpr, - FlushTableExpr, InsertRequest, TableId, + FlushTableExpr, InsertRequests, TableId, }; use async_trait::async_trait; use catalog::helper::{SchemaKey, SchemaValue}; @@ -38,7 +40,7 @@ use common_meta::rpc::router::{ use common_meta::rpc::store::CompareAndPutRequest; use common_meta::table_name::TableName; use common_query::Output; -use common_telemetry::debug; +use common_telemetry::{debug, info, warn}; use datanode::instance::sql::table_idents_to_full_name; use datanode::sql::SqlHandler; use datatypes::prelude::ConcreteDataType; @@ -68,9 +70,10 @@ use crate::error::{ DeserializePartitionSnafu, InvokeDatanodeSnafu, ParseSqlSnafu, PrimaryKeyNotFoundSnafu, RequestDatanodeSnafu, RequestMetaSnafu, Result, SchemaExistsSnafu, StartMetaClientSnafu, TableAlreadyExistSnafu, TableNotFoundSnafu, TableSnafu, ToTableDeleteRequestSnafu, - ToTableInsertRequestSnafu, UnrecognizedTableOptionSnafu, + UnrecognizedTableOptionSnafu, }; use crate::expr_factory; +use crate::instance::distributed::inserter::DistInserter; use crate::table::DistTable; const MAX_VALUE: &str = "MAXVALUE"; @@ -95,6 +98,17 @@ impl DistInstance { } } + async fn find_table(&self, table_name: &TableName) -> Result> { + self.catalog_manager + .table( + &table_name.catalog_name, + &table_name.schema_name, + &table_name.table_name, + ) + .await + .context(CatalogSnafu) + } + pub async fn create_table( &self, create_table: &mut CreateTableExpr, @@ -107,16 +121,7 @@ impl DistInstance { &create_table.table_name, ); - if let Some(table) = self - .catalog_manager - .table( - &table_name.catalog_name, - &table_name.schema_name, - &table_name.table_name, - ) - .await - .context(CatalogSnafu)? - { + if let Some(table) = self.find_table(&table_name).await? { return if create_table.create_if_not_exists { Ok(table) } else { @@ -131,7 +136,19 @@ impl DistInstance { let response = self .create_table_in_meta(create_table, partitions, &table_info) - .await?; + .await; + let response = match response { + Ok(response) => response, + Err(e) => { + return if let Some(table) = self.find_table(&table_name).await? { + warn!("Table '{table_name}' is created concurrently by other Frontend nodes!"); + Ok(table) + } else { + Err(e) + } + } + }; + let table_routes = response.table_routes; ensure!( table_routes.len() == 1, @@ -140,7 +157,7 @@ impl DistInstance { } ); let table_route = table_routes.first().unwrap(); - debug!( + info!( "Creating distributed table {table_name} with table routes: {}", serde_json::to_string_pretty(table_route) .unwrap_or_else(|_| format!("{table_route:#?}")) @@ -162,9 +179,7 @@ impl DistInstance { let table = Arc::new(DistTable::new( table_name.clone(), table_info, - self.catalog_manager.partition_manager(), - self.catalog_manager.datanode_clients(), - self.catalog_manager.backend(), + self.catalog_manager.clone(), )); let request = RegisterTableRequest { @@ -569,36 +584,21 @@ impl DistInstance { .context(RequestMetaSnafu) } - // TODO(LFC): Refactor GRPC insertion and deletion implementation here, - // Take insertion as an example. GRPC insertion is converted to Table InsertRequest here, - // than split the Table InsertRequest in DistTable, than assemble each GRPC InsertRequest there. - // Rather inefficient, should operate on GRPC InsertRequest directly. 
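The TODO removed above is addressed by the new `DistInserter` (the `inserter.rs` file added later in this diff): each table insert is split per region, and the per-region pieces are grouped by the region's leader Datanode, so every peer receives one batched request instead of one RPC per region. A toy sketch of just the grouping step; `Peer` and `Insert` are simplified stand-ins for the real peer and GRPC request types:

```rust
use std::collections::HashMap;

type RegionNumber = u32;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Peer(u64);

#[derive(Debug, PartialEq)]
struct Insert {
    region_number: RegionNumber,
    rows: usize,
}

/// Groups per-region inserts by the leader peer of their region, so each
/// Datanode receives one batched request instead of one request per region.
fn group_by_leader(
    splits: Vec<Insert>,
    leaders: &HashMap<RegionNumber, Peer>,
) -> Result<HashMap<Peer, Vec<Insert>>, String> {
    let mut batches: HashMap<Peer, Vec<Insert>> = HashMap::new();
    for insert in splits {
        let leader = leaders
            .get(&insert.region_number)
            .ok_or_else(|| format!("no leader for region {}", insert.region_number))?;
        batches.entry(leader.clone()).or_default().push(insert);
    }
    Ok(batches)
}

fn main() {
    let leaders = HashMap::from([(1, Peer(1)), (2, Peer(1)), (3, Peer(2))]);
    let splits = vec![
        Insert { region_number: 1, rows: 2 },
        Insert { region_number: 2, rows: 1 },
        Insert { region_number: 3, rows: 4 },
    ];
    let batches = group_by_leader(splits, &leaders).unwrap();
    // Regions 1 and 2 share a leader, so Peer(1) gets one batch of two inserts.
    assert_eq!(batches[&Peer(1)].len(), 2);
    assert_eq!(batches[&Peer(2)].len(), 1);
}
```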
async fn handle_dist_insert( &self, - request: InsertRequest, + requests: InsertRequests, ctx: QueryContextRef, ) -> Result { - let catalog = &ctx.current_catalog(); - let schema = &ctx.current_schema(); - let table_name = &request.table_name; - let table_ref = TableReference::full(catalog, schema, table_name); - - let table = self - .catalog_manager - .table(catalog, schema, table_name) - .await - .context(CatalogSnafu)? - .with_context(|| TableNotFoundSnafu { - table_name: table_ref.to_string(), - })?; - - let request = common_grpc_expr::insert::to_table_insert_request(catalog, schema, request) - .context(ToTableInsertRequestSnafu)?; - - let affected_rows = table.insert(request).await.context(TableSnafu)?; - Ok(Output::AffectedRows(affected_rows)) + let inserter = DistInserter::new( + ctx.current_catalog(), + ctx.current_schema(), + self.catalog_manager.clone(), + ); + let affected_rows = inserter.grpc_insert(requests).await?; + Ok(Output::AffectedRows(affected_rows as usize)) } + // TODO(LFC): Like insertions above, refactor GRPC deletion impl here. async fn handle_dist_delete( &self, request: DeleteRequest, @@ -650,7 +650,7 @@ impl GrpcQueryHandler for DistInstance { async fn do_query(&self, request: Request, ctx: QueryContextRef) -> Result { match request { - Request::Insert(request) => self.handle_dist_insert(request, ctx).await, + Request::Inserts(requests) => self.handle_dist_insert(requests, ctx).await, Request::Delete(request) => self.handle_dist_delete(request, ctx).await, Request::Query(_) => { unreachable!("Query should have been handled directly in Frontend Instance!") diff --git a/src/frontend/src/instance/distributed/inserter.rs b/src/frontend/src/instance/distributed/inserter.rs new file mode 100644 index 000000000000..896247193e30 --- /dev/null +++ b/src/frontend/src/instance/distributed/inserter.rs @@ -0,0 +1,380 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use api::v1::InsertRequests; +use catalog::CatalogManager; +use client::Database; +use common_grpc_expr::insert::to_table_insert_request; +use common_meta::peer::Peer; +use common_meta::table_name::TableName; +use futures::future; +use metrics::counter; +use snafu::{OptionExt, ResultExt}; +use table::metadata::TableInfoRef; +use table::meter_insert_request; +use table::requests::InsertRequest; + +use crate::catalog::FrontendCatalogManager; +use crate::error::{ + CatalogSnafu, FindDatanodeSnafu, FindTableRouteSnafu, JoinTaskSnafu, RequestDatanodeSnafu, + Result, SplitInsertSnafu, TableNotFoundSnafu, ToTableInsertRequestSnafu, +}; +use crate::table::insert::to_grpc_insert_request; + +/// A distributed inserter. It ingests GRPC [InsertRequests] or table [InsertRequest] (so it can be +/// used in protocol handlers or table insertion API). +/// +/// Table data partitioning and Datanode requests batching are handled inside. +/// +/// Note that the inserter is confined to a single catalog and schema. 
I.e., it cannot handle +/// multiple insert requests with a different catalog or schema (it will throw a "NotSupported" error). +/// This is because we currently do not have this kind of requirement. Let's keep it simple for now. +pub(crate) struct DistInserter { + catalog: String, + schema: String, + catalog_manager: Arc<FrontendCatalogManager>, +} + +impl DistInserter { + pub(crate) fn new( + catalog: String, + schema: String, + catalog_manager: Arc<FrontendCatalogManager>, + ) -> Self { + Self { + catalog, + schema, + catalog_manager, + } + } + + pub(crate) async fn grpc_insert(&self, requests: InsertRequests) -> Result<u32> { + let inserts = requests + .inserts + .into_iter() + .map(|x| { + to_table_insert_request(&self.catalog, &self.schema, x) + .context(ToTableInsertRequestSnafu) + }) + .collect::<Result<Vec<_>>>()?; + + self.insert(inserts).await + } + + pub(crate) async fn insert(&self, requests: Vec<InsertRequest>) -> Result<u32> { + debug_assert!(requests + .iter() + .all(|x| x.catalog_name == self.catalog && x.schema_name == self.schema)); + + let inserts = self.split_inserts(requests).await?; + + self.request_datanodes(inserts).await + } + + /// Splits multiple table [InsertRequest]s into GRPC [InsertRequests], grouped by the peer of + /// each region's leader Datanode, so that requests to the same Datanode can be batched + /// together in a single gRPC write call. + async fn split_inserts( + &self, + requests: Vec<InsertRequest>, + ) -> Result<HashMap<Peer, InsertRequests>> { + let partition_manager = self.catalog_manager.partition_manager(); + + let mut inserts = HashMap::new(); + + for request in requests { + meter_insert_request!(request); + + let table_name = TableName::new(&self.catalog, &self.schema, &request.table_name); + let table_info = self.find_table_info(&request.table_name).await?; + let table_meta = &table_info.meta; + + let split = partition_manager + .split_insert_request(&table_name, request, table_meta.schema.as_ref()) + .await + .context(SplitInsertSnafu)?; + + let table_route = partition_manager + .find_table_route(&table_name) + .await + .with_context(|_| FindTableRouteSnafu { + table_name: table_name.to_string(), + })?; + + for (region_number, insert) in split { + let datanode = + table_route + .find_region_leader(region_number) + .context(FindDatanodeSnafu { + region: region_number, + })?; + + let insert = to_grpc_insert_request(table_meta, region_number, insert)?; + + inserts + .entry(datanode.clone()) + .or_insert_with(|| InsertRequests { inserts: vec![] }) + .inserts + .push(insert); + } + } + Ok(inserts) + } + + async fn find_table_info(&self, table_name: &str) -> Result<TableInfoRef> { + let table = self + .catalog_manager + .table(&self.catalog, &self.schema, table_name) + .await + .context(CatalogSnafu)?
+ .with_context(|| TableNotFoundSnafu { + table_name: common_catalog::format_full_table_name( + &self.catalog, + &self.schema, + table_name, + ), + })?; + Ok(table.table_info()) + } + + async fn request_datanodes(&self, inserts: HashMap) -> Result { + let results = future::try_join_all(inserts.into_iter().map(|(peer, inserts)| { + let datanode_clients = self.catalog_manager.datanode_clients(); + let catalog = self.catalog.clone(); + let schema = self.schema.clone(); + + common_runtime::spawn_write(async move { + let client = datanode_clients.get_client(&peer).await; + let database = Database::new(&catalog, &schema, client); + database.insert(inserts).await.context(RequestDatanodeSnafu) + }) + })) + .await + .context(JoinTaskSnafu)?; + + let affected_rows = results.into_iter().sum::>()?; + counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64); + Ok(affected_rows) + } +} + +#[cfg(test)] +mod tests { + use api::v1::column::{SemanticType, Values}; + use api::v1::{Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; + use catalog::helper::{ + CatalogKey, CatalogValue, SchemaKey, SchemaValue, TableGlobalKey, TableGlobalValue, + }; + use catalog::remote::mock::MockKvBackend; + use catalog::remote::{KvBackend, KvBackendRef}; + use client::client_manager::DatanodeClients; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema, Schema}; + use datatypes::vectors::Int32Vector; + use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + + use super::*; + use crate::heartbeat::handler::tests::MockKvCacheInvalidator; + use crate::table::test::create_partition_rule_manager; + + async fn prepare_mocked_backend() -> KvBackendRef { + let backend = Arc::new(MockKvBackend::default()); + + let default_catalog = CatalogKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + } + .to_string(); + backend + .set( + default_catalog.as_bytes(), + CatalogValue.as_bytes().unwrap().as_slice(), + ) + .await + .unwrap(); + + let default_schema = SchemaKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + } + .to_string(); + backend + .set( + default_schema.as_bytes(), + SchemaValue.as_bytes().unwrap().as_slice(), + ) + .await + .unwrap(); + + backend + } + + async fn create_testing_table(backend: &KvBackendRef, table_name: &str) { + let table_global_key = TableGlobalKey { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + }; + + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true) + .with_default_constraint(Some(ColumnDefaultConstraint::Function( + "current_timestamp()".to_string(), + ))) + .unwrap(), + ColumnSchema::new("a", ConcreteDataType::int32_datatype(), true), + ])); + + let mut builder = TableMetaBuilder::default(); + builder.schema(schema); + builder.primary_key_indices(vec![]); + builder.next_column_id(1); + let table_meta = builder.build().unwrap(); + + let table_info = TableInfoBuilder::new(table_name, table_meta) + .build() + .unwrap(); + + let table_global_value = TableGlobalValue { + node_id: 1, + regions_id_map: HashMap::from([(1, vec![1]), (2, vec![2]), (3, vec![3])]), + table_info: table_info.into(), + }; + + backend + .set( + table_global_key.to_string().as_bytes(), + table_global_value.as_bytes().unwrap().as_slice(), + 
) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_split_inserts() { + let backend = prepare_mocked_backend().await; + + let table_name = "one_column_partitioning_table"; + create_testing_table(&backend, table_name).await; + + let catalog_manager = Arc::new(FrontendCatalogManager::new( + backend, + Arc::new(MockKvCacheInvalidator::default()), + create_partition_rule_manager().await, + Arc::new(DatanodeClients::default()), + )); + + let inserter = DistInserter::new( + DEFAULT_CATALOG_NAME.to_string(), + DEFAULT_SCHEMA_NAME.to_string(), + catalog_manager, + ); + + let new_insert_request = |vector: VectorRef| -> InsertRequest { + InsertRequest { + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + columns_values: HashMap::from([("a".to_string(), vector)]), + region_number: 0, + } + }; + let requests = vec![ + new_insert_request(Arc::new(Int32Vector::from(vec![ + Some(1), + None, + Some(11), + Some(101), + ]))), + new_insert_request(Arc::new(Int32Vector::from(vec![ + Some(2), + Some(12), + None, + Some(102), + ]))), + ]; + + let mut inserts = inserter.split_inserts(requests).await.unwrap(); + assert_eq!(inserts.len(), 3); + + let new_grpc_insert_request = |column_values: Vec, + null_mask: Vec, + row_count: u32, + region_number: u32| + -> GrpcInsertRequest { + GrpcInsertRequest { + table_name: table_name.to_string(), + columns: vec![Column { + column_name: "a".to_string(), + semantic_type: SemanticType::Field as i32, + values: Some(Values { + i32_values: column_values, + ..Default::default() + }), + null_mask, + datatype: ColumnDataType::Int32 as i32, + }], + row_count, + region_number, + } + }; + + // region to datanode placement: + // 1 -> 1 + // 2 -> 2 + // 3 -> 3 + // + // region value ranges: + // 1 -> [50, max) + // 2 -> [10, 50) + // 3 -> (min, 10) + + let datanode_inserts = inserts.remove(&Peer::new(1, "")).unwrap().inserts; + assert_eq!(datanode_inserts.len(), 2); + assert_eq!( + datanode_inserts[0], + new_grpc_insert_request(vec![101], vec![0], 1, 1) + ); + assert_eq!( + datanode_inserts[1], + new_grpc_insert_request(vec![102], vec![0], 1, 1) + ); + + let datanode_inserts = inserts.remove(&Peer::new(2, "")).unwrap().inserts; + assert_eq!(datanode_inserts.len(), 2); + assert_eq!( + datanode_inserts[0], + new_grpc_insert_request(vec![11], vec![0], 1, 2) + ); + assert_eq!( + datanode_inserts[1], + new_grpc_insert_request(vec![12], vec![0], 1, 2) + ); + + let datanode_inserts = inserts.remove(&Peer::new(3, "")).unwrap().inserts; + assert_eq!(datanode_inserts.len(), 2); + assert_eq!( + datanode_inserts[0], + new_grpc_insert_request(vec![1], vec![2], 2, 3) + ); + assert_eq!( + datanode_inserts[1], + new_grpc_insert_request(vec![2], vec![2], 2, 3) + ); + } +} diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index a2d46e3dbaf5..a20d0f78bc86 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -36,7 +36,7 @@ impl GrpcQueryHandler for Instance { interceptor.pre_execute(&request, ctx.clone())?; let output = match request { - Request::Insert(request) => self.handle_insert(request, ctx.clone()).await?, + Request::Inserts(requests) => self.handle_inserts(requests, ctx.clone()).await?, Request::Query(query_request) => { let query = query_request.query.context(IncompleteGrpcResultSnafu { err_msg: "Missing field 'QueryRequest.query'", diff --git a/src/frontend/src/instance/opentsdb.rs b/src/frontend/src/instance/opentsdb.rs index 
42c0ea7000cf..7e388408d978 100644 --- a/src/frontend/src/instance/opentsdb.rs +++ b/src/frontend/src/instance/opentsdb.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use api::v1::InsertRequests; use async_trait::async_trait; use common_error::prelude::BoxedError; use servers::error as server_error; @@ -25,8 +26,10 @@ use crate::instance::Instance; #[async_trait] impl OpentsdbProtocolHandler for Instance { async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> { - let request = data_point.as_grpc_insert(); - self.handle_insert(request, ctx) + let requests = InsertRequests { + inserts: vec![data_point.as_grpc_insert()], + }; + self.handle_inserts(requests, ctx) .await .map_err(BoxedError::new) .with_context(|_| server_error::ExecuteQuerySnafu { diff --git a/src/frontend/src/instance/prometheus.rs b/src/frontend/src/instance/prometheus.rs index 6dd1d0b04c6c..f06db0fcff49 100644 --- a/src/frontend/src/instance/prometheus.rs +++ b/src/frontend/src/instance/prometheus.rs @@ -21,6 +21,7 @@ use common_error::prelude::BoxedError; use common_query::Output; use common_recordbatch::RecordBatches; use common_telemetry::logging; +use metrics::counter; use prost::Message; use servers::error::{self, Result as ServerResult}; use servers::prometheus::{self, Metrics}; @@ -30,6 +31,7 @@ use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt}; use crate::instance::Instance; +use crate::metrics::PROMETHEUS_REMOTE_WRITE_SAMPLES; const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32; @@ -106,11 +108,13 @@ impl Instance { #[async_trait] impl PrometheusProtocolHandler for Instance { async fn write(&self, request: WriteRequest, ctx: QueryContextRef) -> ServerResult<()> { - let requests = prometheus::to_grpc_insert_requests(request.clone())?; + let (requests, samples) = prometheus::to_grpc_insert_requests(request)?; self.handle_inserts(requests, ctx) .await .map_err(BoxedError::new) .context(error::ExecuteGrpcQuerySnafu)?; + + counter!(PROMETHEUS_REMOTE_WRITE_SAMPLES, samples as u64); Ok(()) } diff --git a/src/frontend/src/metrics.rs b/src/frontend/src/metrics.rs index 508699e84088..61ffab4089a5 100644 --- a/src/frontend/src/metrics.rs +++ b/src/frontend/src/metrics.rs @@ -22,3 +22,6 @@ pub const DIST_CREATE_TABLE: &str = "frontend.dist.create_table"; pub const DIST_CREATE_TABLE_IN_META: &str = "frontend.dist.create_table.update_meta"; pub const DIST_CREATE_TABLE_IN_DATANODE: &str = "frontend.dist.create_table.invoke_datanode"; pub const DIST_INGEST_ROW_COUNT: &str = "frontend.dist.ingest_rows"; + +/// The samples count of Prometheus remote write. 
+pub const PROMETHEUS_REMOTE_WRITE_SAMPLES: &str = "frontend.prometheus.remote_write.samples"; diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index bc1a7391ccbc..87fa2622fb2e 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -20,8 +20,6 @@ use std::sync::Arc; use api::v1::AlterExpr; use async_trait::async_trait; use catalog::helper::{TableGlobalKey, TableGlobalValue}; -use catalog::remote::KvBackendRef; -use client::client_manager::DatanodeClients; use client::Database; use common_error::prelude::BoxedError; use common_meta::key::TableRouteKey; @@ -45,7 +43,7 @@ use datafusion::physical_plan::{ use datafusion_common::DataFusionError; use datatypes::schema::{ColumnSchema, Schema, SchemaRef}; use futures_util::{Stream, StreamExt}; -use partition::manager::{PartitionRuleManagerRef, TableRouteCacheInvalidator}; +use partition::manager::TableRouteCacheInvalidator; use partition::splitter::WriteSplitter; use snafu::prelude::*; use store_api::storage::{RegionNumber, ScanRequest}; @@ -53,12 +51,13 @@ use table::error::TableOperationSnafu; use table::metadata::{FilterPushDownType, TableInfo, TableInfoRef}; use table::requests::{AlterKind, AlterTableRequest, DeleteRequest, InsertRequest}; use table::table::AlterContext; -use table::{meter_insert_request, Table}; +use table::Table; use tokio::sync::RwLock; +use crate::catalog::FrontendCatalogManager; use crate::error::{self, FindDatanodeSnafu, FindTableRouteSnafu, Result}; +use crate::instance::distributed::inserter::DistInserter; use crate::table::delete::to_grpc_delete_request; -use crate::table::insert::to_grpc_insert_request; use crate::table::scan::{DatanodeInstance, TableScanPlan}; mod delete; @@ -69,10 +68,7 @@ pub(crate) mod scan; pub struct DistTable { table_name: TableName, table_info: TableInfoRef, - partition_manager: PartitionRuleManagerRef, - // TODO(ruihang): move this field into PartitionRuleManager - datanode_clients: Arc, - backend: KvBackendRef, + catalog_manager: Arc, } #[async_trait] @@ -90,29 +86,17 @@ impl Table for DistTable { } async fn insert(&self, request: InsertRequest) -> table::Result { - meter_insert_request!(request); - - let splits = self - .partition_manager - .split_insert_request(&self.table_name, request, &self.schema()) - .await - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - - let inserts = splits - .into_iter() - .map(|(region_number, insert)| to_grpc_insert_request(region_number, insert)) - .collect::>>() - .map_err(BoxedError::new) - .context(TableOperationSnafu)?; - - let output = self - .dist_insert(inserts) + let inserter = DistInserter::new( + request.catalog_name.clone(), + request.schema_name.clone(), + self.catalog_manager.clone(), + ); + let affected_rows = inserter + .insert(vec![request]) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let Output::AffectedRows(rows) = output else { unreachable!() }; - Ok(rows) + Ok(affected_rows as usize) } async fn scan( @@ -121,20 +105,20 @@ impl Table for DistTable { filters: &[Expr], limit: Option, ) -> table::Result { - let partition_rule = self - .partition_manager + let partition_manager = self.catalog_manager.partition_manager(); + let datanode_clients = self.catalog_manager.datanode_clients(); + + let partition_rule = partition_manager .find_table_partition_rule(&self.table_name) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let regions = self - .partition_manager + let regions = partition_manager .find_regions_by_filters(partition_rule, filters) 
.map_err(BoxedError::new) .context(TableOperationSnafu)?; - let datanodes = self - .partition_manager + let datanodes = partition_manager .find_region_datanodes(&self.table_name, regions) .await .map_err(BoxedError::new) @@ -143,7 +127,7 @@ impl Table for DistTable { let table_name = &self.table_name; let mut partition_execs = Vec::with_capacity(datanodes.len()); for (datanode, _regions) in datanodes.iter() { - let client = self.datanode_clients.get_client(datanode).await; + let client = datanode_clients.get_client(datanode).await; let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); @@ -169,20 +153,20 @@ impl Table for DistTable { &self, request: ScanRequest, ) -> table::Result { - let partition_rule = self - .partition_manager + let partition_manager = self.catalog_manager.partition_manager(); + let datanode_clients = self.catalog_manager.datanode_clients(); + + let partition_rule = partition_manager .find_table_partition_rule(&self.table_name) .await .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let regions = self - .partition_manager + let regions = partition_manager .find_regions_by_filters(partition_rule, &request.filters) .map_err(BoxedError::new) .context(TableOperationSnafu)?; - let datanodes = self - .partition_manager + let datanodes = partition_manager .find_region_datanodes(&self.table_name, regions) .await .map_err(BoxedError::new) @@ -191,7 +175,7 @@ impl Table for DistTable { let table_name = &self.table_name; let mut partition_execs = Vec::with_capacity(datanodes.len()); for (datanode, _regions) in datanodes.iter() { - let client = self.datanode_clients.get_client(datanode).await; + let client = datanode_clients.get_client(datanode).await; let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); let datanode_instance = DatanodeInstance::new(Arc::new(self.clone()) as _, db); @@ -247,8 +231,9 @@ impl Table for DistTable { } async fn delete(&self, request: DeleteRequest) -> table::Result { - let partition_rule = self - .partition_manager + let partition_manager = self.catalog_manager.partition_manager(); + + let partition_rule = partition_manager .find_table_partition_rule(&self.table_name) .await .map_err(BoxedError::new) @@ -276,7 +261,12 @@ impl Table for DistTable { requests .into_iter() .map(|(region_number, request)| { - to_grpc_delete_request(&self.table_name, region_number, request) + to_grpc_delete_request( + &table_info.meta, + &self.table_name, + region_number, + request, + ) }) .collect::>>() .map_err(BoxedError::new) @@ -297,16 +287,12 @@ impl DistTable { pub fn new( table_name: TableName, table_info: TableInfoRef, - partition_manager: PartitionRuleManagerRef, - datanode_clients: Arc, - backend: KvBackendRef, + catalog_manager: Arc, ) -> Self { Self { table_name, table_info, - partition_manager, - datanode_clients, - backend, + catalog_manager, } } @@ -315,7 +301,8 @@ impl DistTable { key: &TableGlobalKey, ) -> Result> { let raw = self - .backend + .catalog_manager + .backend() .get(key.to_string().as_bytes()) .await .context(error::CatalogSnafu)?; @@ -332,14 +319,16 @@ impl DistTable { value: TableGlobalValue, ) -> Result<()> { let value = value.as_bytes().context(error::CatalogEntrySerdeSnafu)?; - self.backend + self.catalog_manager + .backend() .set(key.to_string().as_bytes(), &value) .await .context(error::CatalogSnafu) } async fn delete_table_global_value(&self, key: TableGlobalKey) -> Result<()> { - 
self.backend + self.catalog_manager + .backend() .delete(key.to_string().as_bytes()) .await .context(error::CatalogSnafu) @@ -369,12 +358,14 @@ impl DistTable { } .key(); - self.backend + self.catalog_manager + .backend() .move_value(old_key.as_bytes(), new_key.as_bytes()) .await .context(error::CatalogSnafu)?; - self.partition_manager + self.catalog_manager + .partition_manager() .invalidate_table_route(&TableName { catalog_name: catalog_name.to_string(), schema_name: schema_name.to_string(), @@ -451,7 +442,8 @@ impl DistTable { /// [`table::requests::AlterTableRequest`] and [`AlterExpr`]. async fn alter_by_expr(&self, expr: &AlterExpr) -> Result<()> { let table_routes = self - .partition_manager + .catalog_manager + .partition_manager() .find_table_route(&self.table_name) .await .with_context(|_| error::FindTableRouteSnafu { @@ -467,8 +459,10 @@ impl DistTable { ) } ); + + let datanode_clients = self.catalog_manager.datanode_clients(); for datanode in leaders { - let client = self.datanode_clients.get_client(&datanode).await; + let client = datanode_clients.get_client(&datanode).await; let db = Database::new(&expr.catalog_name, &expr.schema_name, client); debug!("Sending {:?} to {:?}", expr, db); let result = db @@ -487,7 +481,8 @@ impl DistTable { ) -> Result> { let table_name = &self.table_name; let route = self - .partition_manager + .catalog_manager + .partition_manager() .find_table_route(table_name) .await .with_context(|_| FindTableRouteSnafu { @@ -497,24 +492,16 @@ impl DistTable { let datanodes = regions .iter() .map(|&n| { - let region_id = n as u64; route - .region_routes - .iter() - .find_map(|x| { - if x.region.id == region_id { - x.leader_peer.clone() - } else { - None - } - }) - .context(FindDatanodeSnafu { region: region_id }) + .find_region_leader(n) + .context(FindDatanodeSnafu { region: n }) }) .collect::>>()?; + let datanode_clients = self.catalog_manager.datanode_clients(); let mut instances = Vec::with_capacity(datanodes.len()); for datanode in datanodes { - let client = self.datanode_clients.get_client(&datanode).await; + let client = datanode_clients.get_client(datanode).await; let db = Database::new(&table_name.catalog_name, &table_name.schema_name, client); instances.push(DatanodeInstance::new(Arc::new(self.clone()) as _, db)); } @@ -622,12 +609,12 @@ impl PartitionExec { } #[cfg(test)] -mod test { +pub(crate) mod test { use std::collections::HashMap; use std::sync::atomic::{AtomicU32, Ordering}; - use catalog::error::Result; - use catalog::remote::{KvBackend, ValueIter}; + use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; + use common_meta::peer::Peer; use common_meta::rpc::router::{Region, RegionRoute, Table, TableRoute}; use datafusion_expr::expr_fn::{and, binary_expr, col, or}; use datafusion_expr::{lit, Operator}; @@ -637,7 +624,7 @@ mod test { use meter_core::global::global_registry; use meter_core::write_calc::WriteCalculator; use partition::columns::RangeColumnsPartitionRule; - use partition::manager::PartitionRuleManager; + use partition::manager::{PartitionRuleManager, PartitionRuleManagerRef}; use partition::partition::{PartitionBound, PartitionDef}; use partition::range::RangePartitionRule; use partition::route::TableRoutes; @@ -647,56 +634,38 @@ mod test { use super::*; - struct DummyKvBackend; - - #[async_trait] - impl KvBackend for DummyKvBackend { - fn as_any(&self) -> &dyn Any { - self - } - - fn range<'a, 'b>(&'a self, _key: &[u8]) -> ValueIter<'b, catalog::error::Error> - where - 'a: 'b, - { - unimplemented!() - } - - 
async fn set(&self, _key: &[u8], _val: &[u8]) -> Result<()> { - unimplemented!() - } - - async fn compare_and_set( - &self, - _key: &[u8], - _expect: &[u8], - _val: &[u8], - ) -> Result>>> { - unimplemented!() - } - - async fn move_value(&self, _from_key: &[u8], _to_key: &[u8]) -> Result<()> { - unimplemented!() - } - - async fn delete_range(&self, _key: &[u8], _end: &[u8]) -> Result<()> { - unimplemented!() - } - } - - #[tokio::test(flavor = "multi_thread")] - async fn test_find_partition_rule() { - let table_name = TableName::new("greptime", "public", "foo"); + /// Create a partition rule manager with two tables: one partitioned by a single column, and + /// the other by two. The tables are under the default catalog and schema. + /// + /// Table named "one_column_partitioning_table" is partitioned by column "a" like this: + /// PARTITION BY RANGE (a) ( + /// PARTITION r1 VALUES LESS THAN (10), + /// PARTITION r2 VALUES LESS THAN (50), + /// PARTITION r3 VALUES LESS THAN (MAXVALUE), + /// ) + /// + /// Table named "two_column_partitioning_table" is partitioned by columns "a" and "b" like this: + /// PARTITION BY RANGE (a, b) ( + /// PARTITION r1 VALUES LESS THAN (10, 'hz'), + /// PARTITION r2 VALUES LESS THAN (50, 'sh'), + /// PARTITION r3 VALUES LESS THAN (MAXVALUE, MAXVALUE), + /// ) + pub(crate) async fn create_partition_rule_manager() -> PartitionRuleManagerRef { let table_routes = Arc::new(TableRoutes::new(Arc::new(MetaClient::default()))); let partition_manager = Arc::new(PartitionRuleManager::new(table_routes.clone())); - let table_route = TableRoute { - table: Table { + let table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "one_column_partitioning_table", + ); + let table_route = TableRoute::new( + Table { id: 1, table_name: table_name.clone(), table_schema: vec![], }, - region_routes: vec![ + vec![ RegionRoute { region: Region { id: 3, @@ -711,7 +680,7 @@ mod test { ), attrs: HashMap::new(), }, - leader_peer: None, + leader_peer: Some(Peer::new(3, "")), follower_peers: vec![], }, RegionRoute { @@ -728,7 +697,7 @@ mod test { ), attrs: HashMap::new(), }, - leader_peer: None, + leader_peer: Some(Peer::new(2, "")), follower_peers: vec![], }, RegionRoute { @@ -745,34 +714,27 @@ mod test { ), attrs: HashMap::new(), }, - leader_peer: None, + leader_peer: Some(Peer::new(1, "")), follower_peers: vec![], }, ], - }; + ); table_routes .insert_table_route(table_name.clone(), Arc::new(table_route)) .await; - let partition_rule = partition_manager - .find_table_partition_rule(&table_name) - .await - .unwrap(); - let range_rule = partition_rule - .as_any() - .downcast_ref::() - .unwrap(); - assert_eq!(range_rule.column_name(), "a"); - assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); - assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); - - let table_route = TableRoute { - table: Table { + let table_name = TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "two_column_partitioning_table", + ); + let table_route = TableRoute::new( + Table { id: 1, table_name: table_name.clone(), table_schema: vec![], }, - region_routes: vec![ + vec![ RegionRoute { region: Region { id: 1, @@ -831,13 +793,40 @@ mod test { follower_peers: vec![], }, ], - }; + ); table_routes .insert_table_route(table_name.clone(), Arc::new(table_route)) .await; + partition_manager + } + + #[tokio::test(flavor = "multi_thread")] + async fn test_find_partition_rule() { + let partition_manager = create_partition_rule_manager().await; + + let partition_rule =
partition_manager + .find_table_partition_rule(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "one_column_partitioning_table", + )) + .await + .unwrap(); + let range_rule = partition_rule + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(range_rule.column_name(), "a"); + assert_eq!(range_rule.all_regions(), &vec![3, 2, 1]); + assert_eq!(range_rule.bounds(), &vec![10_i32.into(), 50_i32.into()]); + let partition_rule = partition_manager - .find_table_partition_rule(&table_name) + .find_table_partition_rule(&TableName::new( + DEFAULT_CATALOG_NAME, + DEFAULT_SCHEMA_NAME, + "two_column_partitioning_table", + )) .await .unwrap(); let range_columns_rule = partition_rule diff --git a/src/frontend/src/table/delete.rs b/src/frontend/src/table/delete.rs index 726090efb7b3..b620f7f3e5de 100644 --- a/src/frontend/src/table/delete.rs +++ b/src/frontend/src/table/delete.rs @@ -18,6 +18,7 @@ use common_query::Output; use futures::future; use snafu::ResultExt; use store_api::storage::RegionNumber; +use table::metadata::TableMeta; use table::requests::DeleteRequest; use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result}; @@ -48,11 +49,12 @@ impl DistTable { } pub(super) fn to_grpc_delete_request( + table_meta: &TableMeta, table_name: &TableName, region_number: RegionNumber, request: DeleteRequest, ) -> Result { - let (key_columns, row_count) = to_grpc_columns(&request.key_column_values)?; + let (key_columns, row_count) = to_grpc_columns(table_meta, &request.key_column_values)?; Ok(GrpcDeleteRequest { table_name: table_name.table_name.clone(), region_number, @@ -68,13 +70,27 @@ mod tests { use api::v1::column::{SemanticType, Values}; use api::v1::{Column, ColumnDataType}; - use datatypes::prelude::VectorRef; + use datatypes::prelude::{ConcreteDataType, VectorRef}; + use datatypes::schema::{ColumnSchema, Schema}; use datatypes::vectors::Int32Vector; + use table::metadata::TableMetaBuilder; use super::*; #[test] fn test_to_grpc_delete_request() { + let schema = Schema::new(vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true), + ColumnSchema::new("id", ConcreteDataType::int32_datatype(), false), + ]); + + let mut builder = TableMetaBuilder::default(); + builder.schema(Arc::new(schema)); + builder.primary_key_indices(vec![]); + builder.next_column_id(2); + + let table_meta = builder.build().unwrap(); let table_name = TableName { catalog_name: "greptime".to_string(), schema_name: "public".to_string(), @@ -88,7 +104,8 @@ mod tests { )]); let request = DeleteRequest { key_column_values }; - let result = to_grpc_delete_request(&table_name, region_number, request).unwrap(); + let result = + to_grpc_delete_request(&table_meta, &table_name, region_number, request).unwrap(); assert_eq!(result.table_name, "foo"); assert_eq!(result.region_number, region_number); diff --git a/src/frontend/src/table/insert.rs b/src/frontend/src/table/insert.rs index 3edce8c00464..4ee1ab84332f 100644 --- a/src/frontend/src/table/insert.rs +++ b/src/frontend/src/table/insert.rs @@ -15,49 +15,18 @@ use std::collections::HashMap; use api::helper::{push_vals, ColumnDataTypeWrapper}; -use api::v1::column::SemanticType; +use api::v1::column::{SemanticType, Values}; use api::v1::{Column, InsertRequest as GrpcInsertRequest}; -use common_query::Output; -use datatypes::prelude::{ConcreteDataType, VectorRef}; -use futures::future; -use metrics::counter; -use snafu::{ensure, ResultExt}; +use datatypes::prelude::*; +use snafu::{ensure, OptionExt, ResultExt}; use 
store_api::storage::RegionNumber; +use table::metadata::TableMeta; use table::requests::InsertRequest; -use super::DistTable; -use crate::error; -use crate::error::{JoinTaskSnafu, RequestDatanodeSnafu, Result}; - -impl DistTable { - pub async fn dist_insert(&self, inserts: Vec) -> Result { - let regions = inserts.iter().map(|x| x.region_number).collect::>(); - let instances = self.find_datanode_instances(®ions).await?; - - let results = future::try_join_all(instances.into_iter().zip(inserts.into_iter()).map( - |(instance, request)| { - common_runtime::spawn_write(async move { - instance - .grpc_insert(request) - .await - .context(RequestDatanodeSnafu) - }) - }, - )) - .await - .context(JoinTaskSnafu)?; - - let affected_rows = results.into_iter().sum::>()?; - counter!(crate::metrics::DIST_INGEST_ROW_COUNT, affected_rows as u64); - Ok(Output::AffectedRows(affected_rows as _)) - } -} - -pub fn insert_request_to_insert_batch(insert: &InsertRequest) -> Result<(Vec, u32)> { - to_grpc_columns(&insert.columns_values) -} +use crate::error::{self, ColumnDataTypeSnafu, NotSupportedSnafu, Result, VectorToGrpcColumnSnafu}; pub(crate) fn to_grpc_columns( + table_meta: &TableMeta, columns_values: &HashMap, ) -> Result<(Vec, u32)> { let mut row_count = None; @@ -76,27 +45,7 @@ pub(crate) fn to_grpc_columns( None => row_count = Some(vector.len()), } - let datatype: ColumnDataTypeWrapper = vector - .data_type() - .try_into() - .context(error::ColumnDataTypeSnafu)?; - - // TODO(hl): need refactor - let semantic_type = - if vector.data_type() == ConcreteDataType::timestamp_millisecond_datatype() { - SemanticType::Timestamp - } else { - SemanticType::Field - }; - - let mut column = Column { - column_name: column_name.clone(), - semantic_type: semantic_type.into(), - datatype: datatype.datatype() as i32, - ..Default::default() - }; - - push_vals(&mut column, 0, vector.clone()); + let column = vector_to_grpc_column(table_meta, column_name, vector.clone())?; Ok(column) }) .collect::>>()?; @@ -107,11 +56,12 @@ pub(crate) fn to_grpc_columns( } pub(crate) fn to_grpc_insert_request( + table_meta: &TableMeta, region_number: RegionNumber, insert: InsertRequest, ) -> Result { let table_name = insert.table_name.clone(); - let (columns, row_count) = insert_request_to_insert_batch(&insert)?; + let (columns, row_count) = to_grpc_columns(table_meta, &insert.columns_values)?; Ok(GrpcInsertRequest { table_name, region_number, @@ -120,23 +70,142 @@ pub(crate) fn to_grpc_insert_request( }) } +fn vector_to_grpc_column( + table_meta: &TableMeta, + column_name: &str, + vector: VectorRef, +) -> Result { + let time_index_column = &table_meta + .schema + .timestamp_column() + .context(NotSupportedSnafu { + feat: "Table without time index.", + })? 
+ .name; + let semantic_type = if column_name == time_index_column { + SemanticType::Timestamp + } else { + let column_index = table_meta + .schema + .column_index_by_name(column_name) + .context(VectorToGrpcColumnSnafu { + reason: format!("unable to find column {column_name} in table schema"), + })?; + if table_meta.primary_key_indices.contains(&column_index) { + SemanticType::Tag + } else { + SemanticType::Field + } + }; + + let datatype: ColumnDataTypeWrapper = + vector.data_type().try_into().context(ColumnDataTypeSnafu)?; + + let mut column = Column { + column_name: column_name.to_string(), + semantic_type: semantic_type as i32, + null_mask: vec![], + datatype: datatype.datatype() as i32, + values: Some(Values::default()), // vector values will be pushed into it below + }; + push_vals(&mut column, 0, vector); + Ok(column) +} + #[cfg(test)] mod tests { use std::collections::HashMap; + use std::sync::Arc; use api::v1::ColumnDataType; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use datatypes::prelude::ScalarVectorBuilder; - use datatypes::vectors::{Int16VectorBuilder, MutableVector, StringVectorBuilder}; + use datatypes::schema::{ColumnSchema, Schema}; + use datatypes::vectors::{ + Int16VectorBuilder, Int32Vector, Int64Vector, MutableVector, StringVector, + StringVectorBuilder, + }; + use table::metadata::TableMetaBuilder; use table::requests::InsertRequest; use super::*; + #[test] + fn test_vector_to_grpc_column() { + let schema = Arc::new(Schema::new(vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true), + ColumnSchema::new("k", ConcreteDataType::int32_datatype(), false), + ColumnSchema::new("v", ConcreteDataType::string_datatype(), true), + ])); + + let mut builder = TableMetaBuilder::default(); + builder.schema(schema); + builder.primary_key_indices(vec![1]); + builder.next_column_id(3); + let table_meta = builder.build().unwrap(); + + let column = vector_to_grpc_column( + &table_meta, + "ts", + Arc::new(Int64Vector::from_slice([1, 2, 3])), + ) + .unwrap(); + assert_eq!(column.column_name, "ts"); + assert_eq!(column.semantic_type, SemanticType::Timestamp as i32); + assert_eq!(column.values.unwrap().i64_values, vec![1, 2, 3]); + assert_eq!(column.null_mask, vec![0]); + assert_eq!(column.datatype, ColumnDataType::Int64 as i32); + + let column = vector_to_grpc_column( + &table_meta, + "k", + Arc::new(Int32Vector::from_slice([3, 2, 1])), + ) + .unwrap(); + assert_eq!(column.column_name, "k"); + assert_eq!(column.semantic_type, SemanticType::Tag as i32); + assert_eq!(column.values.unwrap().i32_values, vec![3, 2, 1]); + assert_eq!(column.null_mask, vec![0]); + assert_eq!(column.datatype, ColumnDataType::Int32 as i32); + + let column = vector_to_grpc_column( + &table_meta, + "v", + Arc::new(StringVector::from(vec![ + Some("hello"), + None, + Some("greptime"), + ])), + ) + .unwrap(); + assert_eq!(column.column_name, "v"); + assert_eq!(column.semantic_type, SemanticType::Field as i32); + assert_eq!( + column.values.unwrap().string_values, + vec!["hello", "greptime"] + ); + assert_eq!(column.null_mask, vec![2]); + assert_eq!(column.datatype, ColumnDataType::String as i32); + } + #[test] fn test_to_grpc_insert_request() { - let insert_request = mock_insert_request(); + let schema = Schema::new(vec![ + ColumnSchema::new("ts", ConcreteDataType::int64_datatype(), false) + .with_time_index(true), + ColumnSchema::new("id", ConcreteDataType::int16_datatype(), false), + ColumnSchema::new("host", 
ConcreteDataType::string_datatype(), false), + ]); - let request = to_grpc_insert_request(12, insert_request).unwrap(); + let mut builder = TableMetaBuilder::default(); + builder.schema(Arc::new(schema)); + builder.primary_key_indices(vec![]); + builder.next_column_id(3); + + let table_meta = builder.build().unwrap(); + let insert_request = mock_insert_request(); + let request = to_grpc_insert_request(&table_meta, 12, insert_request).unwrap(); verify_grpc_insert_request(request); } diff --git a/src/frontend/src/table/scan.rs b/src/frontend/src/table/scan.rs index f93389013540..7d8080bf5ff9 100644 --- a/src/frontend/src/table/scan.rs +++ b/src/frontend/src/table/scan.rs @@ -15,7 +15,7 @@ use std::fmt::Formatter; use std::sync::Arc; -use api::v1::{DeleteRequest, InsertRequest}; +use api::v1::DeleteRequest; use client::Database; use common_meta::table_name::TableName; use common_query::prelude::Expr; @@ -47,10 +47,6 @@ impl DatanodeInstance { Self { table, db } } - pub(crate) async fn grpc_insert(&self, request: InsertRequest) -> client::Result { - self.db.insert(request).await - } - pub(crate) async fn grpc_delete(&self, request: DeleteRequest) -> client::Result { self.db.delete(request).await } diff --git a/src/meta-srv/src/bootstrap.rs b/src/meta-srv/src/bootstrap.rs index 33018f81aa51..6b56b822a9e3 100644 --- a/src/meta-srv/src/bootstrap.rs +++ b/src/meta-srv/src/bootstrap.rs @@ -33,6 +33,7 @@ use tonic::transport::server::Router; use crate::cluster::MetaPeerClientBuilder; use crate::election::etcd::EtcdElection; use crate::lock::etcd::EtcdLock; +use crate::lock::memory::MemLock; use crate::metasrv::builder::MetaSrvBuilder; use crate::metasrv::{MetaSrv, MetaSrvOptions, SelectorRef}; use crate::selector::lease_based::LeaseBasedSelector; @@ -152,7 +153,11 @@ pub fn router(meta_srv: MetaSrv) -> Router { pub async fn build_meta_srv(opts: &MetaSrvOptions) -> Result { let (kv_store, election, lock) = if opts.use_memory_store { - (Arc::new(MemStore::new()) as _, None, None) + ( + Arc::new(MemStore::new()) as _, + None, + Some(Arc::new(MemLock::default()) as _), + ) } else { let etcd_endpoints = [&opts.store_addr]; let etcd_client = Client::connect(etcd_endpoints, None) diff --git a/src/meta-srv/src/error.rs b/src/meta-srv/src/error.rs index f26f9f5d73b6..6c33b3029633 100644 --- a/src/meta-srv/src/error.rs +++ b/src/meta-srv/src/error.rs @@ -283,6 +283,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Table already exists: {table_name}"))] + TableAlreadyExists { + table_name: String, + location: Location, + }, + #[snafu(display("Pusher not found: {pusher_id}"))] PusherNotFound { pusher_id: String, @@ -394,6 +400,7 @@ impl ErrorExt for Error { | Error::SendShutdownSignal { .. } | Error::ParseAddr { .. } | Error::SchemaAlreadyExists { .. } + | Error::TableAlreadyExists { .. } | Error::PusherNotFound { .. } | Error::PushMessage { .. } | Error::MailboxClosed { .. 
} diff --git a/src/meta-srv/src/lock.rs b/src/meta-srv/src/lock.rs index b4d0a3eafe30..8defc86b4e9a 100644 --- a/src/meta-srv/src/lock.rs +++ b/src/meta-srv/src/lock.rs @@ -18,6 +18,8 @@ pub(crate) mod memory; use std::sync::Arc; +use common_telemetry::error; + use crate::error::Result; pub type Key = Vec; @@ -49,3 +51,50 @@ pub trait DistLock: Send + Sync { } pub type DistLockRef = Arc; + +pub(crate) struct DistLockGuard<'a> { + lock: &'a DistLockRef, + name: Vec, + key: Option, +} + +impl<'a> DistLockGuard<'a> { + pub(crate) fn new(lock: &'a DistLockRef, name: Vec) -> Self { + Self { + lock, + name, + key: None, + } + } + + pub(crate) async fn lock(&mut self) -> Result<()> { + if self.key.is_some() { + return Ok(()); + } + let key = self + .lock + .lock( + self.name.clone(), + Opts { + expire_secs: Some(2), + }, + ) + .await?; + self.key = Some(key); + Ok(()) + } +} + +impl Drop for DistLockGuard<'_> { + fn drop(&mut self) { + if let Some(key) = self.key.take() { + let lock = self.lock.clone(); + let name = self.name.clone(); + common_runtime::spawn_bg(async move { + if let Err(e) = lock.unlock(key).await { + error!(e; "Failed to unlock '{}'", String::from_utf8_lossy(&name)); + } + }); + } + } +} diff --git a/src/meta-srv/src/lock/keys.rs b/src/meta-srv/src/lock/keys.rs index 513b308ac081..8020ab768b81 100644 --- a/src/meta-srv/src/lock/keys.rs +++ b/src/meta-srv/src/lock/keys.rs @@ -15,6 +15,7 @@ //! All keys used for distributed locking in the Metasrv. //! Place them in this unified module for better maintenance. +use common_meta::table_name::TableName; use common_meta::RegionIdent; use crate::lock::Key; @@ -30,3 +31,7 @@ pub(crate) fn table_metadata_lock_key(region: &RegionIdent) -> Key { ) .into_bytes() } + +pub(crate) fn table_creation_lock_key(table_name: &TableName) -> Key { + format!("table_creation_lock_({})", table_name).into_bytes() +} diff --git a/src/meta-srv/src/service/router.rs b/src/meta-srv/src/service/router.rs index 4dfa22714eb0..949fe6a7d9a0 100644 --- a/src/meta-srv/src/service/router.rs +++ b/src/meta-srv/src/service/router.rs @@ -21,16 +21,19 @@ use api::v1::meta::{ }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_meta::key::TableRouteKey; +use common_meta::table_name::TableName; use common_telemetry::{timer, warn}; -use snafu::{OptionExt, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use table::metadata::RawTableInfo; use tonic::{Request, Response}; use crate::error; use crate::error::Result; +use crate::lock::{keys, DistLockGuard}; use crate::metasrv::{Context, MetaSrv, SelectorContext, SelectorRef}; use crate::metrics::METRIC_META_ROUTE_REQUEST; use crate::sequence::SequenceRef; +use crate::service::store::ext::KvStoreExt; use crate::service::store::kv::KvStoreRef; use crate::service::GrpcResult; use crate::table_routes::{get_table_global_value, get_table_route_value}; @@ -43,6 +46,30 @@ impl router_server::Router for MetaSrv { let CreateRequest { header, table_name, .. } = &req; + let table_name: TableName = table_name + .as_ref() + .context(error::EmptyTableNameSnafu)? + .clone() + .into(); + + // TODO(LFC): Use procedure to create table, and get rid of locks here. 
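The `DistLockGuard` added above wraps the Metasrv distributed lock in an RAII guard: `lock()` acquires the named lock at most once (with a 2-second expiry via `Opts`), and because destructors cannot await, `Drop` hands the unlock to a background task through `common_runtime::spawn_bg`, logging unlock failures instead of propagating them. The guard is used immediately below to serialize concurrent creations of the same table. Here is a minimal sketch of the pattern, with a hypothetical in-process `LockService` standing in for the real `DistLock` trait:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the Metasrv DistLock service: `lock` returns a
// key identifying the held lock, and `unlock` releases it by that key.
struct LockService;

impl LockService {
    async fn lock(&self, name: Vec<u8>) -> Vec<u8> {
        name // pretend the lock key is derived from the lock name
    }
    async fn unlock(&self, _key: Vec<u8>) {}
}

struct Guard {
    lock: Arc<LockService>,
    name: Vec<u8>,
    key: Option<Vec<u8>>, // Some(..) once the lock is held
}

impl Guard {
    // Idempotent, mirroring DistLockGuard::lock: a second call is a no-op.
    async fn lock(&mut self) {
        if self.key.is_none() {
            self.key = Some(self.lock.lock(self.name.clone()).await);
        }
    }
}

impl Drop for Guard {
    // Drop is synchronous, so the async unlock is pushed to a background
    // task (the real guard uses common_runtime::spawn_bg and logs errors).
    fn drop(&mut self) {
        if let Some(key) = self.key.take() {
            let lock = self.lock.clone();
            tokio::spawn(async move { lock.unlock(key).await });
        }
    }
}

#[tokio::main]
async fn main() {
    let service = Arc::new(LockService);
    let mut guard = Guard {
        lock: service,
        name: b"table_creation_lock_(greptime.public.t)".to_vec(),
        key: None,
    };
    guard.lock().await;
    // ... critical section: check table existence, then create ...
} // guard dropped here; the unlock completes on a background task
```

Because the unlock is fire-and-forget, the 2-second expiry is what ultimately guarantees the lock cannot leak if the background unlock task is lost.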
+ let mut guard = DistLockGuard::new(self.lock(), keys::table_creation_lock_key(&table_name)); + guard.lock().await?; + + let table_global_key = TableGlobalKey { + catalog_name: table_name.catalog_name.clone(), + schema_name: table_name.schema_name.clone(), + table_name: table_name.table_name.clone(), + } + .to_string() + .into_bytes(); + ensure!( + self.kv_store().get(table_global_key).await?.is_none(), + error::TableAlreadyExistsSnafu { + table_name: table_name.to_string(), + } + ); + let cluster_id = header.as_ref().map_or(0, |h| h.cluster_id); let _timer = timer!( @@ -53,14 +80,13 @@ impl router_server::Router for MetaSrv { ] ); - let table_name = table_name.clone().context(error::EmptyTableNameSnafu)?; let ctx = SelectorContext { datanode_lease_secs: self.options().datanode_lease_secs, server_addr: self.options().server_addr.clone(), kv_store: self.kv_store(), - catalog: Some(table_name.catalog_name), - schema: Some(table_name.schema_name), - table: Some(table_name.table_name), + catalog: Some(table_name.catalog_name.clone()), + schema: Some(table_name.schema_name.clone()), + table: Some(table_name.table_name.clone()), }; let selector = self.selector(); @@ -138,7 +164,7 @@ async fn handle_create( return Ok(RouteResponse { header: Some(ResponseHeader::failed( cluster_id, - Error::not_enough_active_datanodes(peers.len() as _), + Error::not_enough_available_datanodes(partitions.len(), peers.len()), )), ..Default::default() }); diff --git a/src/partition/Cargo.toml b/src/partition/Cargo.toml index 0746da964679..b4f87f1235b0 100644 --- a/src/partition/Cargo.toml +++ b/src/partition/Cargo.toml @@ -7,6 +7,7 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +api = { path = "../api" } async-trait = "0.1" common-catalog = { path = "../common/catalog" } common-error = { path = "../common/error" } diff --git a/src/partition/src/manager.rs b/src/partition/src/manager.rs index 704fee00977e..c4189de309fb 100644 --- a/src/partition/src/manager.rs +++ b/src/partition/src/manager.rs @@ -88,21 +88,13 @@ impl PartitionRuleManager { let mut datanodes = HashMap::with_capacity(regions.len()); for region in regions.iter() { let datanode = route - .region_routes - .iter() - .find_map(|x| { - if x.region.id == *region as RegionId { - x.leader_peer.clone() - } else { - None - } - }) + .find_region_leader(*region) .context(error::FindDatanodeSnafu { table: table.to_string(), region: *region, })?; datanodes - .entry(datanode) + .entry(datanode.clone()) .or_insert_with(Vec::new) .push(*region); } diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index fa1a5985e08e..379b123e82ad 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -39,6 +39,7 @@ http-body = "0.4" humantime-serde = "1.1" hyper = { version = "0.14", features = ["full"] } influxdb_line_protocol = { git = "https://github.com/evenyag/influxdb_iox", branch = "feat/line-protocol" } +itertools.workspace = true metrics.workspace = true # metrics-process 1.0.10 depends on metrics-0.21 but opendal depends on metrics-0.20.1 metrics-process = "<1.0.10" diff --git a/src/servers/src/grpc.rs b/src/servers/src/grpc.rs index 87d13840ef5d..64cba0037874 100644 --- a/src/servers/src/grpc.rs +++ b/src/servers/src/grpc.rs @@ -28,10 +28,11 @@ use arrow_flight::flight_service_server::{FlightService, FlightServiceServer}; use async_trait::async_trait; use common_runtime::Runtime; use common_telemetry::logging::info; +use common_telemetry::{error, warn}; use 
futures::FutureExt; -use snafu::{ensure, ResultExt}; +use snafu::{ensure, OptionExt, ResultExt}; use tokio::net::TcpListener; -use tokio::sync::oneshot::{self, Sender}; +use tokio::sync::oneshot::{self, Receiver, Sender}; use tokio::sync::Mutex; use tokio_stream::wrappers::TcpListenerStream; use tonic::{Request, Response, Status}; @@ -39,7 +40,8 @@ use tonic::{Request, Response, Status}; use self::prom_query_gateway::PrometheusGatewayService; use crate::auth::UserProviderRef; use crate::error::{ - AlreadyStartedSnafu, GrpcReflectionServiceSnafu, Result, StartGrpcSnafu, TcpBindSnafu, + AlreadyStartedSnafu, GrpcReflectionServiceSnafu, InternalSnafu, Result, StartGrpcSnafu, + TcpBindSnafu, }; use crate::grpc::database::DatabaseService; use crate::grpc::flight::FlightHandler; @@ -55,6 +57,10 @@ pub struct GrpcServer { request_handler: Arc, /// Handler for Prometheus-compatible PromQL queries. Only present for frontend server. promql_handler: Option, + + /// gRPC serving state receiver. Only present if the gRPC server is started. + /// Used to wait for the server to stop, preserving the old blocking behavior. + serve_state: Mutex>>>, } impl GrpcServer { @@ -73,6 +79,7 @@ impl GrpcServer { shutdown_tx: Mutex::new(None), request_handler, promql_handler, + serve_state: Mutex::new(None), } } @@ -94,6 +101,22 @@ impl GrpcServer { ) -> PrometheusGatewayServer { PrometheusGatewayServer::new(PrometheusGatewayService::new(handler)) } + + pub async fn wait_for_serve(&self) -> Result<()> { + let mut serve_state = self.serve_state.lock().await; + let rx = serve_state.take().context(InternalSnafu { + err_msg: "gRPC serving state is unknown, maybe the server is not started, \ + or we have already waited for the serve result before.", + })?; + let Ok(result) = rx.await else { + warn!("Background gRPC serving task quit before we could receive the serve result."); + return Ok(()); + }; + if let Err(e) = result { + error!(e; "GRPC serve error"); + } + Ok(()) + } } pub struct HealthCheckHandler; @@ -151,7 +174,6 @@ impl Server for GrpcServer { .build() .context(GrpcReflectionServiceSnafu)?; - // Would block to serve requests.
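The removed comment above marks the behavioral change in `start()`: instead of blocking on tonic's serve future, the server is now spawned onto a background runtime, and its terminal `Result` is parked in a oneshot channel that `wait_for_serve()` consumes later. A minimal sketch of that handoff, with a stub `serve()` standing in for `serve_with_incoming_shutdown`:

```rust
use tokio::sync::oneshot;

// Stand-in for tonic's `serve_with_incoming_shutdown`: resolves only when
// the server stops, yielding the final serving result.
async fn serve() -> Result<(), String> {
    Ok(())
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();

    // What `start()` now does: run the server in the background and report
    // its terminal result through the channel instead of blocking the caller.
    tokio::spawn(async move {
        // The send fails only if nobody is waiting anymore; ignore that.
        let _ = tx.send(serve().await);
    });

    // ... `start()` has already returned the bound address at this point ...

    // What `wait_for_serve()` does: block until the server stops.
    match rx.await {
        Ok(Ok(())) => println!("server stopped cleanly"),
        Ok(Err(e)) => eprintln!("serve error: {e}"),
        Err(_) => println!("serving task quit before reporting a result"),
    }
}
```

This is also what lets `setup_grpc_server` in the integration tests (further down) bind to port 0 and read the actual address back from `start()`, rather than spawning the blocking serve call itself.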
let mut builder = tonic::transport::Server::builder() .add_service(self.create_flight_service()) .add_service(self.create_database_service()) @@ -160,12 +182,19 @@ impl Server for GrpcServer { builder = builder.add_service(self.create_prom_query_gateway_service(promql_handler.clone())) } - builder - .add_service(reflection_service) - .serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop)) - .await - .context(StartGrpcSnafu)?; + let builder = builder.add_service(reflection_service); + let (serve_state_tx, serve_state_rx) = oneshot::channel(); + let mut serve_state = self.serve_state.lock().await; + *serve_state = Some(serve_state_rx); + + common_runtime::spawn_bg(async move { + let result = builder + .serve_with_incoming_shutdown(TcpListenerStream::new(listener), rx.map(drop)) + .await + .context(StartGrpcSnafu); + serve_state_tx.send(result) + }); Ok(addr) } diff --git a/src/servers/src/influxdb.rs b/src/servers/src/influxdb.rs index 986d5ce6deb3..3086e36cbbed 100644 --- a/src/servers/src/influxdb.rs +++ b/src/servers/src/influxdb.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use api::v1::InsertRequest as GrpcInsertRequest; +use api::v1::{InsertRequest as GrpcInsertRequest, InsertRequests}; use common_grpc::writer::{LinesWriter, Precision}; use influxdb_line_protocol::{parse_lines, FieldValue}; use snafu::ResultExt; @@ -32,7 +32,7 @@ pub struct InfluxdbRequest { type TableName = String; -impl TryFrom<&InfluxdbRequest> for Vec { +impl TryFrom<&InfluxdbRequest> for InsertRequests { type Error = Error; fn try_from(value: &InfluxdbRequest) -> Result { @@ -103,7 +103,7 @@ impl TryFrom<&InfluxdbRequest> for Vec { writer.commit(); } - Ok(writers + let inserts = writers .into_iter() .map(|(table_name, writer)| { let (columns, row_count) = writer.finish(); @@ -114,7 +114,8 @@ impl TryFrom<&InfluxdbRequest> for Vec { row_count, } }) - .collect()) + .collect(); + Ok(InsertRequests { inserts }) } } @@ -140,10 +141,10 @@ monitor2,host=host4 cpu=66.3,memory=1029 1663840496400340003"; lines: lines.to_string(), }; - let requests: Vec = influxdb_req.try_into().unwrap(); - assert_eq!(2, requests.len()); + let requests: InsertRequests = influxdb_req.try_into().unwrap(); + assert_eq!(2, requests.inserts.len()); - for request in requests { + for request in requests.inserts { match &request.table_name[..] 
{ "monitor1" => assert_monitor_1(&request.columns), "monitor2" => assert_monitor_2(&request.columns), diff --git a/src/servers/src/prometheus.rs b/src/servers/src/prometheus.rs index 848a714ec315..17ab69053e02 100644 --- a/src/servers/src/prometheus.rs +++ b/src/servers/src/prometheus.rs @@ -21,7 +21,7 @@ use std::hash::{Hash, Hasher}; use api::prometheus::remote::label_matcher::Type as MatcherType; use api::prometheus::remote::{Label, Query, Sample, TimeSeries, WriteRequest}; use api::v1::column::SemanticType; -use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; +use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests}; use common_recordbatch::{RecordBatch, RecordBatches}; use common_time::timestamp::TimeUnit; use datatypes::prelude::{ConcreteDataType, Value}; @@ -283,15 +283,23 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result> { - let timeseries = std::mem::take(&mut request.timeseries); - timeseries.into_iter().map(to_grpc_insert_request).collect() +pub fn to_grpc_insert_requests(request: WriteRequest) -> Result<(InsertRequests, usize)> { + let (inserts, samples_counts) = itertools::process_results( + request.timeseries.into_iter().map(to_grpc_insert_request), + |x| x.unzip::<_, _, Vec<_>, Vec<_>>(), + )?; + Ok(( + InsertRequests { inserts }, + samples_counts.into_iter().sum::(), + )) } -fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result { +fn to_grpc_insert_request(timeseries: TimeSeries) -> Result<(GrpcInsertRequest, usize)> { + let samples_count = timeseries.samples.len(); + // TODO(dennis): save exemplars into a column - let labels = std::mem::take(&mut timeseries.labels); - let samples = std::mem::take(&mut timeseries.samples); + let labels = timeseries.labels; + let samples = timeseries.samples; let row_count = samples.len(); let mut columns = Vec::with_capacity(2 + labels.len()); @@ -344,14 +352,15 @@ fn to_grpc_insert_request(mut timeseries: TimeSeries) -> Result Result<()> { - let requests: Vec = request.try_into()?; - - for expr in requests { + let requests: InsertRequests = request.try_into()?; + for expr in requests.inserts { let _ = self.tx.send((ctx.current_schema(), expr.table_name)).await; } diff --git a/src/servers/tests/interceptor.rs b/src/servers/tests/interceptor.rs index 9aca93b56173..01ff05557b94 100644 --- a/src/servers/tests/interceptor.rs +++ b/src/servers/tests/interceptor.rs @@ -16,7 +16,7 @@ use std::borrow::Cow; use std::sync::Arc; use api::v1::greptime_request::Request; -use api::v1::InsertRequest; +use api::v1::{InsertRequest, InsertRequests}; use servers::error::{self, NotSupportedSnafu, Result}; use servers::interceptor::{GrpcQueryInterceptor, SqlQueryInterceptor}; use session::context::{QueryContext, QueryContextRef}; @@ -51,9 +51,9 @@ impl GrpcQueryInterceptor for NoopInterceptor { _query_ctx: QueryContextRef, ) -> std::result::Result<(), Self::Error> { match req { - Request::Insert(insert) => { + Request::Inserts(insert) => { ensure!( - insert.region_number == 0, + insert.inserts.iter().all(|x| x.region_number == 0), NotSupportedSnafu { feat: "region not 0" } @@ -72,15 +72,17 @@ fn test_grpc_interceptor() { let di = NoopInterceptor; let ctx = Arc::new(QueryContext::new()); - let req = Request::Insert(InsertRequest { - region_number: 1, - ..Default::default() + let req = Request::Inserts(InsertRequests { + inserts: vec![InsertRequest { + region_number: 1, + ..Default::default() + }], }); let fail = 
GrpcQueryInterceptor::pre_execute(&di, &req, ctx.clone()); assert!(fail.is_err()); - let req = Request::Insert(InsertRequest::default()); + let req = Request::Inserts(InsertRequests::default()); let success = GrpcQueryInterceptor::pre_execute(&di, &req, ctx); assert!(success.is_ok()); } diff --git a/src/servers/tests/mod.rs b/src/servers/tests/mod.rs index 171ef03e114a..e91eebb9d568 100644 --- a/src/servers/tests/mod.rs +++ b/src/servers/tests/mod.rs @@ -154,7 +154,7 @@ impl GrpcQueryHandler for DummyInstance { ctx: QueryContextRef, ) -> std::result::Result { let output = match request { - Request::Insert(_) | Request::Delete(_) => unimplemented!(), + Request::Inserts(_) | Request::Delete(_) => unimplemented!(), Request::Query(query_request) => { let query = query_request.query.unwrap(); match query { diff --git a/src/sql/Cargo.toml b/src/sql/Cargo.toml index 90528fd262fc..078e2bba0e32 100644 --- a/src/sql/Cargo.toml +++ b/src/sql/Cargo.toml @@ -15,7 +15,7 @@ common-time = { path = "../common/time" } datafusion-sql.workspace = true datatypes = { path = "../datatypes" } hex = "0.4" -itertools = "0.10" +itertools.workspace = true mito = { path = "../mito" } once_cell = "1.10" snafu = { version = "0.7", features = ["backtraces"] } diff --git a/src/storage/Cargo.toml b/src/storage/Cargo.toml index 51248a34b496..2aae3f549adb 100644 --- a/src/storage/Cargo.toml +++ b/src/storage/Cargo.toml @@ -25,7 +25,7 @@ datafusion-common.workspace = true datafusion-expr.workspace = true futures.workspace = true futures-util.workspace = true -itertools = "0.10" +itertools.workspace = true lazy_static = "1.4" metrics.workspace = true object-store = { path = "../object-store" } diff --git a/tests-integration/Cargo.toml b/tests-integration/Cargo.toml index f7d43fbabb4c..90bed17fa378 100644 --- a/tests-integration/Cargo.toml +++ b/tests-integration/Cargo.toml @@ -61,7 +61,7 @@ uuid.workspace = true common-procedure = { path = "../src/common/procedure" } datafusion.workspace = true datafusion-expr.workspace = true -itertools = "0.10" +itertools.workspace = true partition = { path = "../src/partition" } paste.workspace = true prost.workspace = true diff --git a/tests-integration/src/grpc.rs b/tests-integration/src/grpc.rs index 99a4cc8b3eb0..db16b171de99 100644 --- a/tests-integration/src/grpc.rs +++ b/tests-integration/src/grpc.rs @@ -23,7 +23,7 @@ mod test { use api::v1::{ alter_expr, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateDatabaseExpr, CreateTableExpr, DdlRequest, DeleteRequest, DropTableExpr, - FlushTableExpr, InsertRequest, QueryRequest, + FlushTableExpr, InsertRequest, InsertRequests, QueryRequest, }; use catalog::helper::{TableGlobalKey, TableGlobalValue}; use common_catalog::consts::MITO_ENGINE; @@ -482,7 +482,13 @@ CREATE TABLE {table_name} ( row_count: 16, ..Default::default() }; - let output = query(instance, Request::Insert(insert)).await; + let output = query( + instance, + Request::Inserts(InsertRequests { + inserts: vec![insert], + }), + ) + .await; assert!(matches!(output, Output::AffectedRows(16))); let request = Request::Query(QueryRequest { @@ -670,7 +676,9 @@ CREATE TABLE {table_name} ( }; // Test auto create not existed table upon insertion. - let request = Request::Insert(insert); + let request = Request::Inserts(InsertRequests { + inserts: vec![insert], + }); let output = query(instance, request).await; assert!(matches!(output, Output::AffectedRows(3))); @@ -703,7 +711,9 @@ CREATE TABLE {table_name} ( }; // Test auto add not existed column upon insertion. 
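The test hunks around this point, including the one immediately following, all make the same mechanical change: the gRPC protocol now carries a batch, so a lone `InsertRequest` has to be wrapped in `InsertRequests` before it becomes a `Request::Inserts`. A sketch of the shape change, with the generated greptime-proto messages reduced to assumed minimal stand-ins:

```rust
// Assumed, simplified stand-ins for the generated greptime-proto messages.
#[derive(Debug, Default)]
struct InsertRequest {
    table_name: String,
    region_number: u32,
}

#[derive(Debug, Default)]
struct InsertRequests {
    inserts: Vec<InsertRequest>,
}

#[derive(Debug)]
enum Request {
    Inserts(InsertRequests), // was: Insert(InsertRequest)
}

// The one-element batch these tests now build everywhere.
fn single(insert: InsertRequest) -> Request {
    Request::Inserts(InsertRequests {
        inserts: vec![insert],
    })
}

fn main() {
    let req = single(InsertRequest {
        table_name: "demo".to_string(),
        region_number: 0,
    });
    // Handlers and interceptors iterate the batch instead of matching a
    // single request, e.g. validating every region number:
    let Request::Inserts(batch) = req;
    assert!(batch.inserts.iter().all(|x| x.region_number == 0));
    assert_eq!(batch.inserts[0].table_name, "demo");
}
```

The converters above build real batches, one insert per table for InfluxDB lines and one per time series for Prometheus remote write, while call sites with a single request, like these tests, simply wrap it in a one-element vector.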
- let request = Request::Insert(insert); + let request = Request::Inserts(InsertRequests { + inserts: vec![insert], + }); let output = query(instance, request).await; assert!(matches!(output, Output::AffectedRows(3))); @@ -829,7 +839,9 @@ CREATE TABLE {table_name} ( ..Default::default() }; - let request = Request::Insert(insert); + let request = Request::Inserts(InsertRequests { + inserts: vec![insert], + }); let output = query(instance, request).await; assert!(matches!(output, Output::AffectedRows(8))); diff --git a/tests-integration/src/table.rs b/tests-integration/src/table.rs index 395f5ef5cde7..4a2d21a3f4cd 100644 --- a/tests-integration/src/table.rs +++ b/tests-integration/src/table.rs @@ -18,7 +18,9 @@ mod test { use std::sync::Arc; use api::v1::column::SemanticType; - use api::v1::{column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest}; + use api::v1::{ + column, Column, ColumnDataType, InsertRequest as GrpcInsertRequest, InsertRequests, + }; use common_meta::table_name::TableName; use common_query::logical_plan::Expr; use common_query::physical_plan::DfPhysicalPlanAdapter; @@ -46,7 +48,6 @@ mod test { #[tokio::test(flavor = "multi_thread")] async fn test_dist_table_scan() { - common_telemetry::init_default_ut_logging(); let table = Arc::new(new_dist_table("test_dist_table_scan").await); // should scan all regions // select a, row_id from numbers @@ -220,7 +221,6 @@ mod test { .downcast_ref::() .unwrap(); let partition_manager = catalog_manager.partition_manager(); - let datanode_clients = catalog_manager.datanode_clients(); let table_name = TableName::new("greptime", "public", "dist_numbers"); @@ -295,9 +295,7 @@ mod test { DistTable::new( table_name, Arc::new(table_info), - partition_manager, - datanode_clients, - catalog_manager.backend(), + Arc::new(catalog_manager.clone()), ) } @@ -345,8 +343,11 @@ mod test { row_count, region_number, }; + let requests = InsertRequests { + inserts: vec![request], + }; dn_instance - .handle_insert(request, QueryContext::arc()) + .handle_inserts(requests, &QueryContext::arc()) .await .unwrap(); } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index d01ad1de30fc..f4a7bc99a616 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -421,8 +421,6 @@ pub async fn setup_grpc_server( .unwrap(), ); - let fe_grpc_addr = format!("127.0.0.1:{}", ports::get_port()); - let fe_instance = FeInstance::try_new_standalone(instance.clone()) .await .unwrap(); @@ -434,13 +432,13 @@ pub async fn setup_grpc_server( None, runtime, )); - let grpc_server_clone = fe_grpc_server.clone(); - let fe_grpc_addr_clone = fe_grpc_addr.clone(); - tokio::spawn(async move { - let addr = fe_grpc_addr_clone.parse::().unwrap(); - grpc_server_clone.start(addr).await.unwrap() - }); + let fe_grpc_addr = "127.0.0.1:0".parse::().unwrap(); + let fe_grpc_addr = fe_grpc_server + .start(fe_grpc_addr) + .await + .unwrap() + .to_string(); // wait for GRPC server to start tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/tests-integration/tests/grpc.rs b/tests-integration/tests/grpc.rs index 9de95f1ba5b0..f07409e21c73 100644 --- a/tests-integration/tests/grpc.rs +++ b/tests-integration/tests/grpc.rs @@ -17,7 +17,8 @@ use api::v1::column::SemanticType; use api::v1::promql_request::Promql; use api::v1::{ column, AddColumn, AddColumns, AlterExpr, Column, ColumnDataType, ColumnDef, CreateTableExpr, - InsertRequest, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, TableId, + InsertRequest, 
InsertRequests, PromInstantQuery, PromRangeQuery, PromqlRequest, RequestHeader, + TableId, }; use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_catalog::consts::{MIN_USER_TABLE_ID, MITO_ENGINE}; @@ -86,7 +87,11 @@ pub async fn test_invalid_dbname(store_type: StorageType) { ], row_count: 4, }; - let result = db.insert(request).await; + let result = db + .insert(InsertRequests { + inserts: vec![request], + }) + .await; assert!(result.is_err()); let _ = fe_grpc_server.shutdown().await; @@ -230,7 +235,11 @@ async fn insert_and_assert(db: &Database) { ], row_count: 4, }; - let result = db.insert(request).await; + let result = db + .insert(InsertRequests { + inserts: vec![request], + }) + .await; assert_eq!(result.unwrap(), 4); let result = db