Skip to content

Commit

Permalink
feat: make grpc insert requests in a batch (#1687)
Browse files — browse the repository at this point in the history
* feat: make Prometheus remote write in a batch

* rebase

* fix: resolve PR comments

* fix: resolve PR comments

* fix: resolve PR comments
  • Loading branch information
MichaelScofield authored Jun 2, 2023
1 parent 8e69aef commit 5004cf6
Show file tree
Hide file tree
Showing 53 changed files with 1,133 additions and 441 deletions.
5 changes: 4 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ datafusion-sql = { git = "https://github.com/waynexia/arrow-datafusion.git", rev
datafusion-substrait = { git = "https://github.com/waynexia/arrow-datafusion.git", rev = "63e52dde9e44cac4b1f6c6e6b6bf6368ba3bd323" }
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "ff0a47b6462bf196cbcd01b589c5dddfa6bfbc45" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "4398d20c56d5f7939cc2960789cb1fa7dd18e6fe" }
itertools = "0.10"
parquet = "40.0"
paste = "1.0"
prost = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ arrow.workspace = true
clap = { version = "4.0", features = ["derive"] }
client = { path = "../src/client" }
indicatif = "0.17.1"
itertools = "0.10.5"
itertools.workspace = true
parquet.workspace = true
tokio.workspace = true
10 changes: 8 additions & 2 deletions benchmarks/src/bin/nyc-taxi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use arrow::datatypes::{DataType, Float64Type, Int64Type};
use arrow::record_batch::RecordBatch;
use clap::Parser;
use client::api::v1::column::Values;
use client::api::v1::{Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest};
use client::api::v1::{
Column, ColumnDataType, ColumnDef, CreateTableExpr, InsertRequest, InsertRequests,
};
use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
Expand Down Expand Up @@ -107,8 +109,12 @@ async fn write_data(
columns,
row_count,
};
let requests = InsertRequests {
inserts: vec![request],
};

let now = Instant::now();
db.insert(request).await.unwrap();
db.insert(requests).await.unwrap();
let elapsed = now.elapsed();
total_rpc_elapsed_ms += elapsed.as_millis();
progress_bar.inc(row_count as _);
Expand Down
2 changes: 1 addition & 1 deletion src/api/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ pub fn push_vals(column: &mut Column, origin_count: usize, vector: VectorRef) {
/// Returns the type name of the [Request].
pub fn request_type(request: &Request) -> &'static str {
match request {
Request::Insert(_) => "insert",
Request::Inserts(_) => "inserts",
Request::Query(query_req) => query_request_type(query_req),
Request::Ddl(ddl_req) => ddl_request_type(ddl_req),
Request::Delete(_) => "delete",
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[features]
testing = []

[dependencies]
api = { path = "../api" }
arc-swap = "1.0"
Expand Down Expand Up @@ -42,6 +45,7 @@ table = { path = "../table" }
tokio.workspace = true

[dev-dependencies]
catalog = { path = ".", features = ["testing"] }
common-test-util = { path = "../common/test-util" }
chrono.workspace = true
log-store = { path = "../log-store" }
Expand Down
3 changes: 3 additions & 0 deletions src/catalog/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use crate::error::Error;
mod client;
mod manager;

#[cfg(feature = "testing")]
pub mod mock;

#[derive(Debug, Clone)]
pub struct Kv(pub Vec<u8>, pub Vec<u8>);

Expand Down
7 changes: 4 additions & 3 deletions src/catalog/tests/mock.rs → src/catalog/src/remote/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ use std::str::FromStr;
use std::sync::Arc;

use async_stream::stream;
use catalog::error::Error;
use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::{Kv, KvBackend, ValueIter};
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_recordbatch::RecordBatch;
use common_telemetry::logging::info;
Expand All @@ -37,6 +34,10 @@ use table::test_util::MemTable;
use table::TableRef;
use tokio::sync::RwLock;

use crate::error::Error;
use crate::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use crate::remote::{Kv, KvBackend, ValueIter};

pub struct MockKvBackend {
map: RwLock<BTreeMap<Vec<u8>, Vec<u8>>>,
}
Expand Down
5 changes: 1 addition & 4 deletions src/catalog/tests/remote_catalog_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,14 @@

#![feature(assert_matches)]

mod mock;

#[cfg(test)]
mod tests {
use std::assert_matches::assert_matches;
use std::collections::HashSet;
use std::sync::Arc;

use catalog::helper::{CatalogKey, CatalogValue, SchemaKey, SchemaValue};
use catalog::remote::mock::{MockKvBackend, MockTableEngine};
use catalog::remote::{
CachedMetaKvBackend, KvBackend, KvBackendRef, RemoteCatalogManager, RemoteCatalogProvider,
RemoteSchemaProvider,
Expand All @@ -35,8 +34,6 @@ mod tests {
use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;

use crate::mock::{MockKvBackend, MockTableEngine};

#[tokio::test]
async fn test_backend() {
common_telemetry::init_default_ut_logging();
Expand Down
7 changes: 5 additions & 2 deletions src/client/src/client_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use common_grpc::channel_manager::ChannelManager;
use common_grpc::channel_manager::{ChannelConfig, ChannelManager};
use common_meta::peer::Peer;
use common_telemetry::info;
use moka::future::{Cache, CacheBuilder};
Expand All @@ -31,8 +31,11 @@ pub struct DatanodeClients {

impl Default for DatanodeClients {
fn default() -> Self {
// TODO(LFC): Make this channel config configurable.
let config = ChannelConfig::new().timeout(Duration::from_secs(8));

Self {
channel_manager: ChannelManager::new(),
channel_manager: ChannelManager::with_config(config),
clients: CacheBuilder::new(1024)
.time_to_live(Duration::from_secs(30 * 60))
.time_to_idle(Duration::from_secs(5 * 60))
Expand Down
6 changes: 3 additions & 3 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use api::v1::greptime_request::Request;
use api::v1::query_request::Query;
use api::v1::{
greptime_response, AffectedRows, AlterExpr, AuthHeader, CreateTableExpr, DdlRequest,
DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequest, PromRangeQuery,
DeleteRequest, DropTableExpr, FlushTableExpr, GreptimeRequest, InsertRequests, PromRangeQuery,
QueryRequest, RequestHeader,
};
use arrow_flight::{FlightData, Ticket};
Expand Down Expand Up @@ -109,9 +109,9 @@ impl Database {
});
}

pub async fn insert(&self, request: InsertRequest) -> Result<u32> {
pub async fn insert(&self, requests: InsertRequests) -> Result<u32> {
let _timer = timer!(metrics::METRIC_GRPC_INSERT);
self.handle(Request::Insert(request)).await
self.handle(Request::Inserts(requests)).await
}

pub async fn delete(&self, request: DeleteRequest) -> Result<u32> {
Expand Down
5 changes: 3 additions & 2 deletions src/common/grpc/src/channel_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use tower::make::MakeConnection;
use crate::error::{CreateChannelSnafu, InvalidConfigFilePathSnafu, InvalidTlsConfigSnafu, Result};

const RECYCLE_CHANNEL_INTERVAL_SECS: u64 = 60;
const DEFAULT_REQUEST_TIMEOUT_SECS: u64 = 2;

#[derive(Clone, Debug)]
pub struct ChannelManager {
Expand Down Expand Up @@ -236,7 +237,7 @@ pub struct ChannelConfig {
impl Default for ChannelConfig {
fn default() -> Self {
Self {
timeout: Some(Duration::from_secs(2)),
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(4)),
concurrency_limit: None,
rate_limit: None,
Expand Down Expand Up @@ -497,7 +498,7 @@ mod tests {
let default_cfg = ChannelConfig::new();
assert_eq!(
ChannelConfig {
timeout: Some(Duration::from_secs(2)),
timeout: Some(Duration::from_secs(DEFAULT_REQUEST_TIMEOUT_SECS)),
connect_timeout: Some(Duration::from_secs(4)),
concurrency_limit: None,
rate_limit: None,
Expand Down
31 changes: 26 additions & 5 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use api::v1::meta::{
};
use serde::{Deserialize, Serialize, Serializer};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use store_api::storage::{RegionId, RegionNumber};
use table::metadata::RawTableInfo;

use crate::error::{self, Result};
Expand Down Expand Up @@ -141,9 +141,23 @@ impl TryFrom<PbRouteResponse> for RouteResponse {
pub struct TableRoute {
pub table: Table,
pub region_routes: Vec<RegionRoute>,
region_leaders: HashMap<RegionNumber, Option<Peer>>,
}

impl TableRoute {
pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
let region_leaders = region_routes
.iter()
.map(|x| (x.region.id as RegionNumber, x.leader_peer.clone()))
.collect::<HashMap<_, _>>();

Self {
table,
region_routes,
region_leaders,
}
}

pub fn try_from_raw(peers: &[PbPeer], table_route: PbTableRoute) -> Result<Self> {
let table = table_route
.table
Expand Down Expand Up @@ -179,10 +193,7 @@ impl TableRoute {
});
}

Ok(Self {
table,
region_routes,
})
Ok(Self::new(table, region_routes))
}

pub fn try_into_raw(self) -> Result<(Vec<PbPeer>, PbTableRoute)> {
Expand Down Expand Up @@ -267,6 +278,12 @@ impl TableRoute {
})
.collect()
}

pub fn find_region_leader(&self, region_number: RegionNumber) -> Option<&Peer> {
self.region_leaders
.get(&region_number)
.and_then(|x| x.as_ref())
}
}

#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
Expand Down Expand Up @@ -670,6 +687,10 @@ mod tests {
follower_peers: vec![Peer::new(2, "a2"), Peer::new(3, "a3")],
},
],
region_leaders: HashMap::from([
(2, Some(Peer::new(1, "a1"))),
(1, Some(Peer::new(2, "a2"))),
]),
};

let from_raw = TableRoute::try_from_raw(&raw_peers, raw_table_route.clone()).unwrap();
Expand Down
23 changes: 19 additions & 4 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ pub enum Error {
source: servers::error::Error,
},

#[snafu(display("Failed to wait for GRPC serving, source: {}", source))]
WaitForGrpcServing {
source: servers::error::Error,
location: Location,
},

#[snafu(display("Failed to parse address {}, source: {}", addr, source))]
ParseAddr {
addr: String,
Expand Down Expand Up @@ -467,6 +473,12 @@ pub enum Error {

#[snafu(display("Missing WAL dir config"))]
MissingWalDirConfig { location: Location },

#[snafu(display("Failed to join task, source: {}", source))]
JoinTask {
source: common_runtime::JoinError,
location: Location,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand Down Expand Up @@ -537,16 +549,19 @@ impl ErrorExt for Error {
| GetTable { source, .. } => source.status_code(),

// TODO(yingwen): Further categorize http error.
StartServer { .. }
| ParseAddr { .. }
ParseAddr { .. }
| CreateDir { .. }
| RemoveDir { .. }
| Catalog { .. }
| MissingRequiredField { .. }
| IncorrectInternalState { .. }
| ShutdownServer { .. }
| ShutdownInstance { .. }
| CloseTableEngine { .. } => StatusCode::Internal,
| CloseTableEngine { .. }
| JoinTask { .. } => StatusCode::Internal,

StartServer { source }
| ShutdownServer { source }
| WaitForGrpcServing { source, .. } => source.status_code(),

InitBackend { .. } => StatusCode::StorageUnavailable,

Expand Down
Loading

0 comments on commit 5004cf6

Please sign in to comment.