feat: merge scan for distributed execution (#1660)
* generate exec plan

Signed-off-by: Ruihang Xia <[email protected]>

* move DatanodeClients to client crate

Signed-off-by: Ruihang Xia <[email protected]>

* wip MergeScanExec::to_stream

Signed-off-by: Ruihang Xia <[email protected]>

* fix compile errors

Signed-off-by: Ruihang Xia <[email protected]>

* fix default catalog

Signed-off-by: Ruihang Xia <[email protected]>

* fix expand order of new stage

Signed-off-by: Ruihang Xia <[email protected]>

* move sqlness cases containing plans out of common dir

Signed-off-by: Ruihang Xia <[email protected]>

* refactor information schema to allow duplicated scan call

Signed-off-by: Ruihang Xia <[email protected]>

* fix: ignore two cases due to substrait

Signed-off-by: Ruihang Xia <[email protected]>

* reorganise sqlness common cases

Signed-off-by: Ruihang Xia <[email protected]>

* fix typos

Signed-off-by: Ruihang Xia <[email protected]>

* redact round robin partition number

Signed-off-by: Ruihang Xia <[email protected]>

* Apply suggestions from code review

Co-authored-by: Yingwen <[email protected]>

* skip transforming projection

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result

Signed-off-by: Ruihang Xia <[email protected]>

* revert common/order

Signed-off-by: Ruihang Xia <[email protected]>

* fix clippy

Signed-off-by: Ruihang Xia <[email protected]>

* Update src/query/src/dist_plan/merge_scan.rs

Co-authored-by: JeremyHi <[email protected]>

* update sqlness result

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result again

Signed-off-by: Ruihang Xia <[email protected]>

* resolve CR comments

Signed-off-by: Ruihang Xia <[email protected]>

* ignore region failover IT

Signed-off-by: Ruihang Xia <[email protected]>

* update sqlness result again and again

Signed-off-by: Ruihang Xia <[email protected]>

* unignore some tests about projection

Signed-off-by: Ruihang Xia <[email protected]>

* enable failover tests

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
Co-authored-by: Yingwen <[email protected]>
Co-authored-by: JeremyHi <[email protected]>
3 people authored Jun 2, 2023
1 parent fe6e3da commit 2615718
Showing 61 changed files with 1,162 additions and 116 deletions.
9 changes: 8 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default.

53 changes: 26 additions & 27 deletions src/catalog/src/information_schema.rs
@@ -16,17 +16,18 @@ mod columns;
 mod tables;
 
 use std::any::Any;
-use std::sync::{Arc, Mutex};
+use std::sync::Arc;
 
 use async_trait::async_trait;
+use common_error::prelude::BoxedError;
 use common_query::physical_plan::PhysicalPlanRef;
 use common_query::prelude::Expr;
 use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
 use datatypes::schema::SchemaRef;
 use futures_util::StreamExt;
-use snafu::{OptionExt, ResultExt};
+use snafu::ResultExt;
 use store_api::storage::ScanRequest;
-use table::error::{DuplicatedExecuteCallSnafu, SchemaConversionSnafu};
+use table::error::{SchemaConversionSnafu, TablesRecordBatchSnafu};
 use table::{Result as TableResult, Table, TableRef};
 
 use self::columns::InformationSchemaColumns;
@@ -64,23 +65,21 @@ impl SchemaProvider for InformationSchemaProvider {
     }
 
     async fn table(&self, name: &str) -> Result<Option<TableRef>> {
-        let stream = match name.to_ascii_lowercase().as_ref() {
-            TABLES => InformationSchemaTables::new(
+        let stream_builder = match name.to_ascii_lowercase().as_ref() {
+            TABLES => Arc::new(InformationSchemaTables::new(
                 self.catalog_name.clone(),
                 self.catalog_provider.clone(),
-            )
-            .to_stream()?,
-            COLUMNS => InformationSchemaColumns::new(
+            )) as _,
+            COLUMNS => Arc::new(InformationSchemaColumns::new(
                 self.catalog_name.clone(),
                 self.catalog_provider.clone(),
-            )
-            .to_stream()?,
+            )) as _,
             _ => {
                 return Ok(None);
             }
         };
 
-        Ok(Some(Arc::new(InformationTable::new(stream))))
+        Ok(Some(Arc::new(InformationTable::new(stream_builder))))
     }
 
     async fn table_exist(&self, name: &str) -> Result<bool> {
@@ -89,18 +88,21 @@ impl SchemaProvider for InformationSchemaProvider {
     }
 }
 
+// TODO(ruihang): make it a more generic trait:
+// https://github.com/GreptimeTeam/greptimedb/pull/1639#discussion_r1205001903
+pub trait InformationStreamBuilder: Send + Sync {
+    fn to_stream(&self) -> Result<SendableRecordBatchStream>;
+
+    fn schema(&self) -> SchemaRef;
+}
+
 pub struct InformationTable {
-    schema: SchemaRef,
-    stream: Arc<Mutex<Option<SendableRecordBatchStream>>>,
+    stream_builder: Arc<dyn InformationStreamBuilder>,
 }
 
 impl InformationTable {
-    pub fn new(stream: SendableRecordBatchStream) -> Self {
-        let schema = stream.schema();
-        Self {
-            schema,
-            stream: Arc::new(Mutex::new(Some(stream))),
-        }
+    pub fn new(stream_builder: Arc<dyn InformationStreamBuilder>) -> Self {
+        Self { stream_builder }
     }
 }
 
@@ -111,7 +113,7 @@ impl Table for InformationTable {
     }
 
     fn schema(&self) -> SchemaRef {
-        self.schema.clone()
+        self.stream_builder.schema()
     }
 
     fn table_info(&self) -> table::metadata::TableInfoRef {
@@ -144,13 +146,10 @@ impl Table for InformationTable {
             self.schema()
         };
         let stream = self
-            .stream
-            .lock()
-            .unwrap()
-            .take()
-            .with_context(|| DuplicatedExecuteCallSnafu {
-                table: self.table_info().name.clone(),
-            })?
+            .stream_builder
+            .to_stream()
+            .map_err(BoxedError::new)
+            .context(TablesRecordBatchSnafu)?
             .map(move |batch| {
                 batch.and_then(|batch| {
                     if let Some(projection) = &projection {
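Note on the refactor above: the old InformationTable held a one-shot stream in Arc<Mutex<Option<SendableRecordBatchStream>>> and take()-ed it on scan, so a second scan failed with DuplicatedExecuteCallSnafu. Building the stream on demand makes scan repeatable, which distributed merge scan relies on. A minimal, self-contained sketch of the pattern — stand-in names only (StreamBuilder, Rows, and InfoTable are not GreptimeDB types, and a Vec<i32> stands in for the record-batch stream):

use std::sync::Arc;

// Stand-in for InformationStreamBuilder: produces a fresh stream per call.
trait StreamBuilder: Send + Sync {
    fn to_stream(&self) -> Vec<i32>;
}

struct Rows(Vec<i32>);

impl StreamBuilder for Rows {
    fn to_stream(&self) -> Vec<i32> {
        self.0.clone() // a new "stream" every time; nothing is consumed
    }
}

// Stand-in for InformationTable: stores the builder, not the stream itself.
struct InfoTable {
    stream_builder: Arc<dyn StreamBuilder>,
}

impl InfoTable {
    fn scan(&self) -> Vec<i32> {
        // The Mutex<Option<_>>.take() version errored here on the second
        // call; building per call keeps every scan valid.
        self.stream_builder.to_stream()
    }
}

fn main() {
    let table = InfoTable {
        stream_builder: Arc::new(Rows(vec![1, 2, 3])),
    };
    assert_eq!(table.scan(), table.scan()); // duplicated scan calls succeed
}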
13 changes: 10 additions & 3 deletions src/catalog/src/information_schema/columns.rs
@@ -31,6 +31,7 @@ use datatypes::schema::{ColumnSchema, Schema, SchemaRef};
 use datatypes::vectors::{StringVectorBuilder, VectorRef};
 use snafu::ResultExt;
 
+use super::InformationStreamBuilder;
 use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
 use crate::CatalogProviderRef;
 
@@ -71,9 +72,15 @@ impl InformationSchemaColumns {
             self.catalog_provider.clone(),
         )
     }
+}
+
+impl InformationStreamBuilder for InformationSchemaColumns {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
 
-    pub fn to_stream(&self) -> Result<SendableRecordBatchStream> {
-        let schema = self.schema().clone();
+    fn to_stream(&self) -> Result<SendableRecordBatchStream> {
+        let schema = self.schema.arrow_schema().clone();
         let mut builder = self.builder();
         let stream = Box::pin(DfRecordBatchStreamAdapter::new(
             schema,
@@ -190,7 +197,7 @@ impl DfPartitionStream for InformationSchemaColumns {
     }
 
     fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
-        let schema = self.schema().clone();
+        let schema = self.schema.arrow_schema().clone();
         let mut builder = self.builder();
         Box::pin(DfRecordBatchStreamAdapter::new(
             schema,
13 changes: 10 additions & 3 deletions src/catalog/src/information_schema/tables.rs
@@ -30,6 +30,7 @@ use snafu::ResultExt;
 use table::metadata::TableType;
 
 use crate::error::{CreateRecordBatchSnafu, InternalSnafu, Result};
+use crate::information_schema::InformationStreamBuilder;
 use crate::CatalogProviderRef;
 
 pub(super) struct InformationSchemaTables {
@@ -62,9 +63,15 @@ impl InformationSchemaTables {
             self.catalog_provider.clone(),
         )
     }
+}
+
+impl InformationStreamBuilder for InformationSchemaTables {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
 
-    pub fn to_stream(&self) -> Result<SendableRecordBatchStream> {
-        let schema = self.schema().clone();
+    fn to_stream(&self) -> Result<SendableRecordBatchStream> {
+        let schema = self.schema.arrow_schema().clone();
         let mut builder = self.builder();
         let stream = Box::pin(DfRecordBatchStreamAdapter::new(
             schema,
@@ -182,7 +189,7 @@ impl DfPartitionStream for InformationSchemaTables {
     }
 
     fn execute(&self, _: Arc<TaskContext>) -> DfSendableRecordBatchStream {
-        let schema = self.schema().clone();
+        let schema = self.schema.arrow_schema().clone();
         let mut builder = self.builder();
         Box::pin(DfRecordBatchStreamAdapter::new(
             schema,
5 changes: 5 additions & 0 deletions src/client/Cargo.toml
@@ -4,6 +4,9 @@ version.workspace = true
 edition.workspace = true
 license.workspace = true
 
+[features]
+testing = []
+
 [dependencies]
 api = { path = "../api" }
 arrow-flight.workspace = true
@@ -16,11 +19,13 @@ common-grpc-expr = { path = "../common/grpc-expr" }
 common-query = { path = "../common/query" }
 common-recordbatch = { path = "../common/recordbatch" }
 common-time = { path = "../common/time" }
+common-meta = { path = "../common/meta" }
 common-telemetry = { path = "../common/telemetry" }
 datafusion.workspace = true
 datatypes = { path = "../datatypes" }
 enum_dispatch = "0.3"
 futures-util.workspace = true
+moka = { version = "0.9", features = ["future"] }
 parking_lot = "0.12"
 prost.workspace = true
 rand.workspace = true
16 changes: 13 additions & 3 deletions src/frontend/src/datanode.rs → src/client/src/client_manager.rs
@@ -12,15 +12,17 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::{Debug, Formatter};
 use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
-use client::Client;
 use common_grpc::channel_manager::ChannelManager;
 use common_meta::peer::Peer;
 use common_telemetry::info;
 use moka::future::{Cache, CacheBuilder};
 
+use crate::Client;
+
 pub struct DatanodeClients {
     channel_manager: ChannelManager,
     clients: Cache<Peer, Client>,
@@ -40,8 +42,16 @@ impl Default for DatanodeClients {
     }
 }
 
+impl Debug for DatanodeClients {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DatanodeClients")
+            .field("channel_manager", &self.channel_manager)
+            .finish()
+    }
+}
+
 impl DatanodeClients {
-    pub(crate) fn start(&self) {
+    pub fn start(&self) {
         let mut started = self.started.lock().unwrap();
         if *started {
             return;
@@ -53,7 +63,7 @@ impl DatanodeClients {
         *started = true;
     }
 
-    pub(crate) async fn get_client(&self, datanode: &Peer) -> Client {
+    pub async fn get_client(&self, datanode: &Peer) -> Client {
         self.clients
             .get_with_by_ref(datanode, async move {
                 Client::with_manager_and_urls(
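The relocated DatanodeClients keeps one Client per datanode in a moka future cache, so concurrent get_client calls for the same peer share one connection setup and entries expire after a TTL. A hedged sketch of that caching pattern — Conn and connect are hypothetical stand-ins, not the client crate's API; moka 0.9 with the "future" feature and tokio are assumed:

use std::time::Duration;

use moka::future::{Cache, CacheBuilder};

// Hypothetical stand-in for client::Client.
#[derive(Clone)]
struct Conn {
    url: String,
}

// Hypothetical stand-in for building a real gRPC client.
async fn connect(url: String) -> Conn {
    Conn { url }
}

#[tokio::main]
async fn main() {
    // Same shape as DatanodeClients: a bounded, TTL-evicted cache keyed by peer.
    let clients: Cache<String, Conn> = CacheBuilder::new(1024)
        .time_to_live(Duration::from_secs(30 * 60))
        .build();

    let peer = "dn-1:3001".to_string();
    // The first call awaits the init future and caches the result; later
    // calls within the TTL return the cached client without reconnecting.
    let conn = clients.get_with_by_ref(&peer, connect(peer.clone())).await;
    assert_eq!(conn.url, "dn-1:3001");
}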
1 change: 1 addition & 0 deletions src/client/src/lib.rs
@@ -13,6 +13,7 @@
 // limitations under the License.
 
 mod client;
+pub mod client_manager;
 mod database;
 mod error;
 pub mod load_balance;
4 changes: 3 additions & 1 deletion src/cmd/src/cli/repl.rs
@@ -17,14 +17,14 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use catalog::remote::CachedMetaKvBackend;
+use client::client_manager::DatanodeClients;
 use client::{Client, Database, DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
 use common_error::prelude::ErrorExt;
 use common_query::Output;
 use common_recordbatch::RecordBatches;
 use common_telemetry::logging;
 use either::Either;
 use frontend::catalog::FrontendCatalogManager;
-use frontend::datanode::DatanodeClients;
 use meta_client::client::MetaClientBuilder;
 use partition::manager::PartitionRuleManager;
 use partition::route::TableRoutes;
@@ -269,6 +269,8 @@ async fn create_query_engine(meta_addr: &str) -> Result<DatafusionQueryEngine> {
     let state = Arc::new(QueryEngineState::new(
         catalog_list,
         false,
+        None,
+        None,
         Default::default(),
     ));
 
1 change: 1 addition & 0 deletions src/common/grpc/Cargo.toml
@@ -13,6 +13,7 @@ common-base = { path = "../base" }
 common-error = { path = "../error" }
 common-function-macro = { path = "../function-macro" }
 common-query = { path = "../query" }
+common-meta = { path = "../meta" }
 common-recordbatch = { path = "../recordbatch" }
 common-runtime = { path = "../runtime" }
 common-telemetry = { path = "../telemetry" }
2 changes: 1 addition & 1 deletion src/common/meta/Cargo.toml
@@ -12,8 +12,8 @@ common-runtime = { path = "../runtime" }
 common-telemetry = { path = "../telemetry" }
 common-time = { path = "../time" }
 serde.workspace = true
-snafu.workspace = true
 serde_json.workspace = true
+snafu.workspace = true
 store-api = { path = "../../store-api" }
 table = { path = "../../table" }
 tokio.workspace = true
9 changes: 9 additions & 0 deletions src/common/meta/src/peer.rs
@@ -12,11 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::fmt::{Display, Formatter};
+
 use api::v1::meta::Peer as PbPeer;
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Clone, Hash, Eq, PartialEq, Deserialize, Serialize)]
 pub struct Peer {
+    /// Node identifier. Unique in a cluster.
     pub id: u64,
     pub addr: String,
 }
@@ -47,3 +50,9 @@
         }
     }
 }
+
+impl Display for Peer {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "peer-{}({})", self.id, self.addr)
+    }
+}
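With the Display impl added above, a peer renders as peer-{id}({addr}). A standalone check with a local copy of the struct, so it runs without the common-meta crate:

use std::fmt::{Display, Formatter};

struct Peer {
    id: u64,
    addr: String,
}

// Same formatting as the impl in this commit.
impl Display for Peer {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "peer-{}({})", self.id, self.addr)
    }
}

fn main() {
    let peer = Peer {
        id: 42,
        addr: "10.0.0.7:3001".to_string(),
    };
    assert_eq!(peer.to_string(), "peer-42(10.0.0.7:3001)");
}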
