feat: metadata columns #14057

Open · wants to merge 19 commits into main

Changes from 6 commits
2 changes: 1 addition & 1 deletion datafusion-examples/examples/metadata_columns.rs
@@ -39,7 +39,7 @@ use datafusion::physical_plan::{
use datafusion::prelude::*;

use datafusion::catalog::Session;
use datafusion_common::FieldId;
use datafusion::common::FieldId;
use itertools::Itertools;
use tokio::time::timeout;

240 changes: 189 additions & 51 deletions datafusion/common/src/dfschema.rs
@@ -26,7 +26,7 @@ use std::sync::Arc;
use crate::error::{DataFusionError, Result, _plan_err, _schema_err};
use crate::{
field_not_found, unqualified_field_not_found, Column, FunctionalDependencies,
SchemaError, TableReference,
JoinType, SchemaError, TableReference,
};

use arrow::compute::can_cast_types;
@@ -253,6 +253,11 @@ impl QualifiedSchema {
.expect("field qualifier length should match schema")
}

/// Get the underlying Arrow schema as a shared `SchemaRef` (clones the `Arc`, not the schema)
pub fn schema(&self) -> SchemaRef {
Arc::clone(&self.schema)
}

/// Checks if the schema is empty.
///
/// Returns:
@@ -366,6 +371,128 @@ impl QualifiedSchema {
pub fn field_qualifier(&self, i: usize) -> Option<&TableReference> {
self.field_qualifiers[i].as_ref()
}

/// Join two qualified schemas together by concatenating their fields.
///
/// This method creates a new schema by combining the fields from `self` followed by the fields from `schema`.
/// The metadata from both schemas is also merged.
///
/// # Arguments
///
/// * `schema` - The schema to join with this schema
///
/// # Returns
///
/// Returns a new `QualifiedSchema` containing all fields from both schemas, or an error if the join fails.
///
/// # Example
///
/// ```rust
/// use datafusion_common::{DFSchema, QualifiedSchema, TableReference};
/// use arrow_schema::{Field, Schema};
/// use arrow::datatypes::DataType;
/// use std::sync::Arc;
///
/// let schema1 = QualifiedSchema::new_with_table(
/// Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
/// &TableReference::from("t1")
/// );
///
/// let schema2 = QualifiedSchema::new_with_table(
/// Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)])),
/// &TableReference::from("t2")
/// );
///
/// let joined = schema1.join(&schema2).unwrap();
/// assert_eq!(joined.len(), 2);
/// ```
pub fn join(&self, schema: &QualifiedSchema) -> Result<Self> {
let mut schema_builder = SchemaBuilder::new();
schema_builder.extend(self.fields().iter().cloned());
schema_builder.extend(schema.fields().iter().cloned());
let new_schema = schema_builder.finish();

let mut new_metadata: HashMap<String, String> = self.schema.metadata.clone();
new_metadata.extend(schema.schema.metadata.clone());
let new_schema_with_metadata = new_schema.with_metadata(new_metadata);

let mut new_qualifiers = self.field_qualifiers.clone();
new_qualifiers.extend_from_slice(schema.field_qualifiers.as_slice());

QualifiedSchema::new(Arc::new(new_schema_with_metadata), new_qualifiers)
}

/// Merge another schema into this schema, ignoring any duplicate fields.
///
/// This method modifies the current schema by appending non-duplicate fields from the other schema.
/// Fields are considered duplicates if:
/// - For qualified fields: they have the same qualifier and field name
/// - For unqualified fields: they have the same field name
///
/// # Arguments
///
/// * `other_schema` - The schema to merge into this one
///
/// # Example
///
/// ```
/// use datafusion_common::{DFSchema, QualifiedSchema, TableReference};
/// use arrow_schema::{Field, Schema};
/// use arrow::datatypes::DataType;
/// use std::sync::Arc;
///
/// let mut schema1 = QualifiedSchema::new_with_table(
/// Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])),
/// &TableReference::from("t1")
/// );
///
/// let schema2 = QualifiedSchema::new_with_table(
/// Arc::new(Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// Field::new("name", DataType::Utf8, false)
/// ])),
/// &TableReference::from("t1")
/// );
///
/// schema1.merge(&schema2);
/// // Only "name" is added since "id" already exists
/// assert_eq!(schema1.len(), 2);
/// ```
pub fn merge(&mut self, other_schema: &QualifiedSchema) {
if other_schema.schema.fields.is_empty() {
return;
}

let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
self.iter().collect();
let self_unqualified_names: HashSet<&str> = self
.schema
.fields
.iter()
.map(|field| field.name().as_str())
.collect();

let mut schema_builder = SchemaBuilder::from(self.schema.fields.clone());
let mut qualifiers = Vec::new();
for (qualifier, field) in other_schema.iter() {
// skip duplicate columns
let duplicated_field = match qualifier {
Some(q) => self_fields.contains(&(Some(q), field)),
// for unqualified columns, check as unqualified name
None => self_unqualified_names.contains(field.name().as_str()),
};
if !duplicated_field {
schema_builder.push(Arc::clone(field));
qualifiers.push(qualifier.cloned());
}
}
let mut metadata = self.schema.metadata.clone();
metadata.extend(other_schema.schema.metadata.clone());

let finished = schema_builder.finish();
let finished_with_metadata = finished.with_metadata(metadata);
self.schema = finished_with_metadata.into();
self.field_qualifiers.extend(qualifiers);
}
}
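Beyond the field de-duplication shown in the doctest, `merge` also unions the schema-level metadata maps via `HashMap::extend`, so a key present on both sides takes the merged-in schema's value. A minimal sketch of that behavior, assuming the constructors from the doctests above and the `schema()` accessor added earlier in this diff:

```rust
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::DataType;
use arrow_schema::{Field, Schema};
use datafusion_common::{QualifiedSchema, TableReference};

fn main() {
    let left_meta = HashMap::from([("origin".to_string(), "left".to_string())]);
    let right_meta = HashMap::from([("origin".to_string(), "right".to_string())]);

    let mut s1 = QualifiedSchema::new_with_table(
        Arc::new(
            Schema::new(vec![Field::new("id", DataType::Int32, false)])
                .with_metadata(left_meta),
        ),
        &TableReference::from("t1"),
    );
    let s2 = QualifiedSchema::new_with_table(
        Arc::new(
            Schema::new(vec![Field::new("name", DataType::Utf8, false)])
                .with_metadata(right_meta),
        ),
        &TableReference::from("t1"),
    );

    s1.merge(&s2);
    assert_eq!(s1.len(), 2);
    // `extend` overwrites existing keys, so the merged-in value wins.
    assert_eq!(s1.schema().metadata()["origin"], "right");
}
```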

impl DFSchema {
@@ -555,27 +682,66 @@ impl DFSchema {
}
}

/// Build metadata schema for join operation based on join type
///
/// # Arguments
/// * `left` - Left input's qualified schema
/// * `right` - Right input's qualified schema
/// * `join_type` - Type of join operation
///
/// # Returns
/// * `Result<Option<QualifiedSchema>>` - The resulting metadata schema after join
///
/// # Details
/// For different join types:
/// - LeftSemi/LeftMark joins: use the left schema
/// - RightSemi joins: use the right schema
/// - All other join types: join both schemas when both are present, otherwise use whichever exists
pub fn join_metadata_schema(
left: &Option<QualifiedSchema>,
right: &Option<QualifiedSchema>,
join_type: &JoinType,
) -> Result<Option<QualifiedSchema>> {
match join_type {
JoinType::LeftSemi | JoinType::LeftMark => Ok(left.clone()),
JoinType::RightSemi => Ok(right.clone()),
_ => {
let ret = match (left, right) {
(Some(left), Some(right)) => Some(left.join(right)?),
(None, Some(right)) => Some(right.clone()),
(Some(left), None) => Some(left.clone()),
(None, None) => None,
};
Ok(ret)
}
}
}
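How the dispatch above plays out for a couple of join types — a sketch assuming the `QualifiedSchema` constructors shown earlier in this diff; the metadata column names are illustrative:

```rust
use std::sync::Arc;
use arrow::datatypes::DataType;
use arrow_schema::{Field, Schema};
use datafusion_common::{DFSchema, JoinType, QualifiedSchema, Result, TableReference};

fn main() -> Result<()> {
    let left = Some(QualifiedSchema::new_with_table(
        Arc::new(Schema::new(vec![Field::new("_rowid", DataType::UInt64, false)])),
        &TableReference::from("t1"),
    ));
    let right = Some(QualifiedSchema::new_with_table(
        Arc::new(Schema::new(vec![Field::new("_file", DataType::Utf8, false)])),
        &TableReference::from("t2"),
    ));

    // LeftSemi output carries only left-side rows, so only the left
    // metadata schema survives.
    let semi = DFSchema::join_metadata_schema(&left, &right, &JoinType::LeftSemi)?;
    assert_eq!(semi.map(|s| s.len()), Some(1));

    // Inner joins concatenate the two metadata schemas.
    let inner = DFSchema::join_metadata_schema(&left, &right, &JoinType::Inner)?;
    assert_eq!(inner.map(|s| s.len()), Some(2));
    Ok(())
}
```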

// Before (removed):
/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
    let mut schema_builder = SchemaBuilder::new();
    schema_builder.extend(self.inner.fields().iter().cloned());
    schema_builder.extend(schema.fields().iter().cloned());
    let new_schema = schema_builder.finish();

    let mut new_metadata: HashMap<String, String> =
        self.inner.schema.metadata.clone();
    new_metadata.extend(schema.inner.schema.metadata.clone());
    let new_schema_with_metadata = new_schema.with_metadata(new_metadata);

    let mut new_qualifiers = self.inner.field_qualifiers.clone();
    new_qualifiers.extend_from_slice(schema.inner.field_qualifiers.as_slice());

    let new_self = Self {
        inner: QualifiedSchema::new(
            Arc::new(new_schema_with_metadata),
            new_qualifiers,
        )?,
        functional_dependencies: FunctionalDependencies::empty(),
    };
    new_self.check_names()?;
    Ok(new_self)
}

// After (added):
pub fn join_with_type(
    &self,
    schema: &DFSchema,
    join_type: &JoinType,
) -> Result<Self> {
    let new_self = Self {
        inner: self.inner.join(&schema.inner)?,
        functional_dependencies: FunctionalDependencies::empty(),
        metadata: DFSchema::join_metadata_schema(
            &self.metadata,
            &schema.metadata,
            join_type,
        )?,
    };
    new_self.check_names()?;
    Ok(new_self)
}

/// Create a new schema that contains the fields from this schema followed by the fields
/// from the supplied schema. An error will be returned if there are duplicate field names.
pub fn join(&self, schema: &DFSchema) -> Result<Self> {
    let new_self = Self {
        inner: self.inner.join(&schema.inner)?,
        functional_dependencies: FunctionalDependencies::empty(),
        metadata: None,
    };
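A usage sketch for the new `join_with_type`, assuming the pre-existing `DFSchema::try_from_qualified_schema` constructor; with no metadata schemas on either input, the result matches plain `join()`:

```rust
use arrow::datatypes::DataType;
use arrow_schema::{Field, Schema};
use datafusion_common::{DFSchema, JoinType, Result};

fn example() -> Result<DFSchema> {
    let left = DFSchema::try_from_qualified_schema(
        "t1",
        &Schema::new(vec![Field::new("id", DataType::Int32, false)]),
    )?;
    let right = DFSchema::try_from_qualified_schema(
        "t2",
        &Schema::new(vec![Field::new("name", DataType::Utf8, false)]),
    )?;

    // Same field concatenation as join(); additionally computes the output
    // metadata schema from the two inputs according to the join type.
    let joined = left.join_with_type(&right, &JoinType::Inner)?;
    assert_eq!(joined.fields().len(), 2);
    Ok(joined)
}
```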
@@ -586,41 +752,13 @@ impl DFSchema {
/// Modify this schema by appending the fields from the supplied schema, ignoring any
/// duplicate fields.
// Before (removed):
pub fn merge(&mut self, other_schema: &DFSchema) {
    if other_schema.inner.schema.fields.is_empty() {
        return;
    }

    let self_fields: HashSet<(Option<&TableReference>, &FieldRef)> =
        self.iter().collect();
    let self_unqualified_names: HashSet<&str> = self
        .inner
        .schema
        .fields
        .iter()
        .map(|field| field.name().as_str())
        .collect();

    let mut schema_builder = SchemaBuilder::from(self.inner.schema.fields.clone());
    let mut qualifiers = Vec::new();
    for (qualifier, field) in other_schema.iter() {
        // skip duplicate columns
        let duplicated_field = match qualifier {
            Some(q) => self_fields.contains(&(Some(q), field)),
            // for unqualified columns, check as unqualified name
            None => self_unqualified_names.contains(field.name().as_str()),
        };
        if !duplicated_field {
            schema_builder.push(Arc::clone(field));
            qualifiers.push(qualifier.cloned());
        }
    }
    let mut metadata = self.inner.schema.metadata.clone();
    metadata.extend(other_schema.inner.schema.metadata.clone());

    let finished = schema_builder.finish();
    let finished_with_metadata = finished.with_metadata(metadata);
    self.inner.schema = finished_with_metadata.into();
    self.inner.field_qualifiers.extend(qualifiers);
}

// After (added):
pub fn merge(&mut self, other_schema: &DFSchema) {
    self.inner.merge(&other_schema.inner);
    if let Some(other_metadata) = &other_schema.metadata {
        match &mut self.metadata {
            Some(metadata) => metadata.merge(other_metadata),
            None => self.metadata = Some(other_metadata.clone()),
        }
    }
}
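A matching sketch for the rewritten `merge`, again assuming `try_from_qualified_schema`: field de-duplication is now delegated to `QualifiedSchema::merge`, and the metadata schema is merged only when the other side carries one.

```rust
use arrow::datatypes::DataType;
use arrow_schema::{Field, Schema};
use datafusion_common::{DFSchema, Result};

fn example() -> Result<()> {
    let mut s1 = DFSchema::try_from_qualified_schema(
        "t1",
        &Schema::new(vec![Field::new("id", DataType::Int32, false)]),
    )?;
    let s2 = DFSchema::try_from_qualified_schema(
        "t1",
        &Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, false),
        ]),
    )?;

    // Duplicate "id" is skipped; if s2 carried a metadata schema it would be
    // merged into (or become) s1's metadata schema.
    s1.merge(&s2);
    assert_eq!(s1.fields().len(), 2);
    Ok(())
}
```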

/// Get a list of fields
6 changes: 3 additions & 3 deletions datafusion/core/src/execution/session_state.rs
@@ -108,11 +108,11 @@ use uuid::Uuid;
/// # #[tokio::main]
/// # async fn main() -> Result<()> {
/// let state = SessionStateBuilder::new()
/// .with_config(SessionConfig::new())
/// .with_config(SessionConfig::new())
/// .with_runtime_env(Arc::new(RuntimeEnv::default()))
/// .with_default_features()
/// .build();
/// Ok(())
/// Ok(())
/// # }
/// ```
///
@@ -1330,7 +1330,7 @@ impl SessionStateBuilder {
/// let url = Url::try_from("file://").unwrap();
/// let object_store = object_store::local::LocalFileSystem::new();
/// let state = SessionStateBuilder::new()
/// .with_config(SessionConfig::new())
Contributor: The changes in this file add no value and should be reverted.

Author: Hi @Omega359, could you please review the latest commits? I think these changes have already been reverted.

/// .with_config(SessionConfig::new())
/// .with_object_store(&url, Arc::new(object_store))
/// .with_default_features()
/// .build();
9 changes: 7 additions & 2 deletions datafusion/core/src/physical_planner.rs
@@ -68,7 +68,7 @@ use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::{
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
ScalarValue,
FieldId, ScalarValue,
};
use datafusion_expr::dml::{CopyTo, InsertOp};
use datafusion_expr::expr::{
@@ -1979,7 +1979,12 @@ impl DefaultPhysicalPlanner {
match input_schema.index_of_column(col) {
Ok(idx) => {
// index physical field using logical field index
Ok(input_exec.schema().field(idx).name().to_string())
match FieldId::from(idx) {
FieldId::Normal(idx) => {
Ok(input_exec.schema().field(idx).name().to_string())
}
FieldId::Metadata(_) => Ok(String::from(col.name())),
}
}
// logical column is not a derived column, safe to pass along to
// physical_name
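For illustration, a self-contained sketch of the dispatch pattern above. The real `FieldId` and its index encoding live in `datafusion_common`; the `METADATA_OFFSET` value and `physical_column_name` helper below are hypothetical stand-ins, showing only the idea that metadata columns are encoded past an index offset and keep their logical names:

```rust
// Hypothetical stand-in for datafusion_common::FieldId; illustrative only.
const METADATA_OFFSET: usize = usize::MAX >> 1;

enum FieldId {
    Normal(usize),
    Metadata(usize),
}

impl From<usize> for FieldId {
    fn from(idx: usize) -> Self {
        if idx >= METADATA_OFFSET {
            FieldId::Metadata(idx - METADATA_OFFSET)
        } else {
            FieldId::Normal(idx)
        }
    }
}

/// Mirrors the match in the planner: normal columns resolve to the physical
/// field name by position; metadata columns keep the logical column name.
fn physical_column_name(idx: usize, physical: &[&str], logical: &str) -> String {
    match FieldId::from(idx) {
        FieldId::Normal(i) => physical[i].to_string(),
        FieldId::Metadata(_) => logical.to_string(),
    }
}

fn main() {
    let physical = ["id", "name"];
    assert_eq!(physical_column_name(1, &physical, "name"), "name");
    assert_eq!(physical_column_name(METADATA_OFFSET, &physical, "_rowid"), "_rowid");
}
```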