Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/main' into relex_sort_merge_join…
Browse files Browse the repository at this point in the history
…_keys
  • Loading branch information
alamb committed Jan 29, 2024
2 parents 6572542 + 9bf0f68 commit 25040ec
Show file tree
Hide file tree
Showing 41 changed files with 2,403 additions and 687 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,4 @@ lto = false
opt-level = 3
overflow-checks = false
panic = 'unwind'
rpath = false
rpath = false
3 changes: 1 addition & 2 deletions benchmarks/queries/clickbench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ SELECT COUNT(DISTINCT "SearchPhrase"), COUNT(DISTINCT "MobilePhone"), COUNT(DIST
FROM hits;
```


### Q1: Data Exploration

**Question**: "How many distinct "hit color", "browser country" and "language" are there in the dataset?"
Expand All @@ -42,7 +41,7 @@ SELECT COUNT(DISTINCT "HitColor"), COUNT(DISTINCT "BrowserCountry"), COUNT(DISTI
FROM hits;
```

### Q2: Top 10 anaylsis
### Q2: Top 10 analysis

**Question**: "Find the top 10 "browser country" by number of distinct "social network"s,
including the distinct counts of "hit color", "browser language",
Expand Down
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

134 changes: 132 additions & 2 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,11 @@ pub(crate) const TABLES: &str = "tables";
pub(crate) const VIEWS: &str = "views";
pub(crate) const COLUMNS: &str = "columns";
pub(crate) const DF_SETTINGS: &str = "df_settings";
pub(crate) const SCHEMATA: &str = "schemata";

/// All information schema tables
pub const INFORMATION_SCHEMA_TABLES: &[&str] = &[TABLES, VIEWS, COLUMNS, DF_SETTINGS];
pub const INFORMATION_SCHEMA_TABLES: &[&str] =
&[TABLES, VIEWS, COLUMNS, DF_SETTINGS, SCHEMATA];

/// Implements the `information_schema` virtual schema and tables
///
Expand Down Expand Up @@ -115,6 +117,27 @@ impl InformationSchemaConfig {
DF_SETTINGS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
SCHEMATA,
TableType::View,
);
}
}

async fn make_schemata(&self, builder: &mut InformationSchemataBuilder) {
for catalog_name in self.catalog_list.catalog_names() {
let catalog = self.catalog_list.catalog(&catalog_name).unwrap();

for schema_name in catalog.schema_names() {
if schema_name != INFORMATION_SCHEMA {
if let Some(schema) = catalog.schema(&schema_name) {
let schema_owner = schema.owner_name();
builder.add_schemata(&catalog_name, &schema_name, schema_owner);
}
}
}
}
}

Expand Down Expand Up @@ -196,6 +219,7 @@ impl SchemaProvider for InformationSchemaProvider {
VIEWS.to_string(),
COLUMNS.to_string(),
DF_SETTINGS.to_string(),
SCHEMATA.to_string(),
]
}

Expand All @@ -209,6 +233,8 @@ impl SchemaProvider for InformationSchemaProvider {
Arc::new(InformationSchemaViews::new(config))
} else if name.eq_ignore_ascii_case("df_settings") {
Arc::new(InformationSchemaDfSettings::new(config))
} else if name.eq_ignore_ascii_case("schemata") {
Arc::new(InformationSchemata::new(config))
} else {
return None;
};
Expand All @@ -219,7 +245,10 @@ impl SchemaProvider for InformationSchemaProvider {
}

fn table_exist(&self, name: &str) -> bool {
matches!(name.to_ascii_lowercase().as_str(), TABLES | VIEWS | COLUMNS)
matches!(
name.to_ascii_lowercase().as_str(),
TABLES | VIEWS | COLUMNS | SCHEMATA
)
}
}

Expand Down Expand Up @@ -617,6 +646,107 @@ impl InformationSchemaColumnsBuilder {
}
}

struct InformationSchemata {
schema: SchemaRef,
config: InformationSchemaConfig,
}

impl InformationSchemata {
fn new(config: InformationSchemaConfig) -> Self {
let schema = Arc::new(Schema::new(vec![
Field::new("catalog_name", DataType::Utf8, false),
Field::new("schema_name", DataType::Utf8, false),
Field::new("schema_owner", DataType::Utf8, true),
Field::new("default_character_set_catalog", DataType::Utf8, true),
Field::new("default_character_set_schema", DataType::Utf8, true),
Field::new("default_character_set_name", DataType::Utf8, true),
Field::new("sql_path", DataType::Utf8, true),
]));
Self { schema, config }
}

fn builder(&self) -> InformationSchemataBuilder {
InformationSchemataBuilder {
schema: self.schema.clone(),
catalog_name: StringBuilder::new(),
schema_name: StringBuilder::new(),
schema_owner: StringBuilder::new(),
default_character_set_catalog: StringBuilder::new(),
default_character_set_schema: StringBuilder::new(),
default_character_set_name: StringBuilder::new(),
sql_path: StringBuilder::new(),
}
}
}

struct InformationSchemataBuilder {
schema: SchemaRef,
catalog_name: StringBuilder,
schema_name: StringBuilder,
schema_owner: StringBuilder,
default_character_set_catalog: StringBuilder,
default_character_set_schema: StringBuilder,
default_character_set_name: StringBuilder,
sql_path: StringBuilder,
}

impl InformationSchemataBuilder {
fn add_schemata(
&mut self,
catalog_name: &str,
schema_name: &str,
schema_owner: Option<&str>,
) {
self.catalog_name.append_value(catalog_name);
self.schema_name.append_value(schema_name);
match schema_owner {
Some(owner) => self.schema_owner.append_value(owner),
None => self.schema_owner.append_null(),
}
// refer to https://www.postgresql.org/docs/current/infoschema-schemata.html,
// these rows apply to a feature that is not implemented in DataFusion
self.default_character_set_catalog.append_null();
self.default_character_set_schema.append_null();
self.default_character_set_name.append_null();
self.sql_path.append_null();
}

fn finish(&mut self) -> RecordBatch {
RecordBatch::try_new(
self.schema.clone(),
vec![
Arc::new(self.catalog_name.finish()),
Arc::new(self.schema_name.finish()),
Arc::new(self.schema_owner.finish()),
Arc::new(self.default_character_set_catalog.finish()),
Arc::new(self.default_character_set_schema.finish()),
Arc::new(self.default_character_set_name.finish()),
Arc::new(self.sql_path.finish()),
],
)
.unwrap()
}
}

impl PartitionStream for InformationSchemata {
fn schema(&self) -> &SchemaRef {
&self.schema
}

fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
let mut builder = self.builder();
let config = self.config.clone();
Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
// TODO: Stream this
futures::stream::once(async move {
config.make_schemata(&mut builder).await;
Ok(builder.finish())
}),
))
}
}

struct InformationSchemaDfSettings {
schema: SchemaRef,
config: InformationSchemaConfig,
Expand Down
6 changes: 6 additions & 0 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ use crate::error::{DataFusionError, Result};
/// [`CatalogProvider`]: super::CatalogProvider
#[async_trait]
pub trait SchemaProvider: Sync + Send {
/// Returns the owner of the Schema, default is None. This value is reported
/// as part of `information_tables.schemata
fn owner_name(&self) -> Option<&str> {
None
}

/// Returns this `SchemaProvider` as [`Any`] so that it can be downcast to a
/// specific implementation.
fn as_any(&self) -> &dyn Any;
Expand Down
Loading

0 comments on commit 25040ec

Please sign in to comment.