From a3fa170b1129bba0fc2244a61fb8af17a8462f57 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 25 Feb 2025 12:15:59 +0900 Subject: [PATCH 1/8] Initial work on metadata columns --- .../core/src/datasource/listing/table.rs | 179 ++++++++++++++---- 1 file changed, 146 insertions(+), 33 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 15125fe5a090..2f450597d7e1 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -40,7 +40,7 @@ use datafusion_expr::{SortExpr, TableType}; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; -use arrow_schema::Schema; +use arrow_schema::{Schema, TimeUnit}; use datafusion_common::{ config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt, ToDFSchema, @@ -59,6 +59,8 @@ use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; +const METADATA_COLUMNS: [&str; 3] = ["location", "last_modified", "size"]; + /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] pub struct ListingTableConfig { @@ -259,6 +261,9 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Additional columns to include in the table schema, based on the metadata of the files. + /// See [Self::with_metadata_cols] for details. + pub metadata_cols: Vec, } impl ListingOptions { @@ -276,6 +281,7 @@ impl ListingOptions { collect_stat: true, target_partitions: 1, file_sort_order: vec![], + metadata_cols: vec![], } } @@ -394,6 +400,50 @@ impl ListingOptions { self } + /// Set metadata columns on [`ListingOptions`] and returns self. + /// + /// "metadata columns" are columns that are computed from the `ObjectMeta` of the files from object store. + /// + /// Available metadata columns: + /// - `location`: The full path to the object + /// - `last_modified`: The last modified time + /// - `size`: The size in bytes of the object + /// + /// For example, given the following files in object store: + /// + /// ```text + /// /mnt/nyctaxi/tripdata01.parquet + /// /mnt/nyctaxi/tripdata02.parquet + /// /mnt/nyctaxi/tripdata03.parquet + /// ``` + /// + /// If the `last_modified` field in the `ObjectMeta` for `tripdata01.parquet` is `2024-01-01 12:00:00`, + /// then the table schema will include a column named `last_modified` with the value `2024-01-01 12:00:00` + /// for all rows read from `tripdata01.parquet`. + /// + /// | | last_modified | + /// |-----------------|-----------------------| + /// | ... | 2024-01-01 12:00:00 | + /// | ... | 2024-01-02 15:30:00 | + /// | ... | 2024-01-03 09:15:00 | + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_metadata_cols(vec!["last_modified".to_string()]); + /// + /// assert_eq!(listing_options.metadata_cols, vec!["last_modified".to_string()]); + /// ``` + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set stat collection on [`ListingOptions`] and returns self. 
/// /// ``` @@ -478,6 +528,25 @@ impl ListingOptions { Ok(schema) } + /// Validates that the metadata columns do not already exist in the schema, and are one of the allowed metadata columns. + pub fn validate_metadata_cols(&self, schema: &SchemaRef) -> Result<()> { + for col in self.metadata_cols.iter() { + if schema.column_with_name(col).is_some() { + return plan_err!("Column {} already exists in schema", col); + } + + if !METADATA_COLUMNS.contains(&col.as_str()) { + return plan_err!( + "Metadata column {} invalid; must be one of {:?}", + col, + METADATA_COLUMNS + ); + } + } + + Ok(()) + } + /// Infers the partition columns stored in `LOCATION` and compares /// them with the columns provided in `PARTITIONED BY` to help prevent /// accidental corrupts of partitioned tables. @@ -583,6 +652,15 @@ impl ListingOptions { } } } + + pub(crate) fn metadata_column_type(col: &str) -> Result { + match col { + "location" => Ok(DataType::Utf8), + "last_modified" => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), + "size" => Ok(DataType::UInt64), + _ => plan_err!("Invalid metadata column: {}", col), + } + } } /// Reads data from one or more files as a single table. @@ -684,7 +762,7 @@ pub struct ListingTable { table_paths: Vec, /// File fields only file_schema: SchemaRef, - /// File fields + partition columns + /// File fields + partition columns + metadata columns table_schema: SchemaRef, options: ListingOptions, definition: Option, @@ -701,9 +779,8 @@ impl ListingTable { /// `ListingOptions` and `SchemaRef` are optional. If they are not /// provided the file type is inferred based on the file suffix. /// If the schema is provided then it must be resolved before creating the table - /// and should contain the fields of the file without the table - /// partitioning columns. - /// + /// and should contain the fields of the file without the extended table columns, + /// i.e. the partitioning and metadata columns. pub fn try_new(config: ListingTableConfig) -> Result { let file_schema = config .file_schema @@ -719,6 +796,16 @@ impl ListingTable { builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } + // Validate and add metadata columns to the schema + options.validate_metadata_cols(&file_schema)?; + for col in &options.metadata_cols { + builder.push(Field::new( + col, + ListingOptions::metadata_column_type(col)?, + false, + )); + } + let table_schema = Arc::new( builder .finish() @@ -786,16 +873,38 @@ impl ListingTable { fn try_create_output_ordering(&self) -> Result> { create_ordering(&self.table_schema, &self.options.file_sort_order) } + + fn partition_column_fields(&self) -> Result> { + self.options + .table_partition_cols + .iter() + .map(|col| &col.0) + .chain(self.options.metadata_cols.iter()) + .map(|col| Ok(self.table_schema.field_with_name(col)?)) + .collect::>>() + } + + fn partition_column_names(&self) -> Result> { + Ok(self + .options + .table_partition_cols + .iter() + .map(|col| col.0.as_str())) + } + + fn metadata_column_names(&self) -> impl Iterator { + self.options.metadata_cols.iter().map(|col| col.as_str()) + } } -// Expressions can be used for parttion pruning if they can be evaluated using -// only the partiton columns and there are partition columns. -fn can_be_evaluted_for_partition_pruning( - partition_column_names: &[&str], +// Expressions can be used for extended columns (partition/metadata) pruning if they can be evaluated using +// only the extended columns and there are extended columns. 
+fn can_be_evaluated_for_extended_col_pruning( + extended_column_names: &[&str], expr: &Expr, ) -> bool { - !partition_column_names.is_empty() - && expr_applicable_for_cols(partition_column_names, expr) + !extended_column_names.is_empty() + && expr_applicable_for_cols(extended_column_names, expr) } #[async_trait] @@ -823,23 +932,20 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { - // extract types of partition columns - let table_partition_cols = self - .options - .table_partition_cols - .iter() - .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) - .collect::>>()?; - - let table_partition_col_names = table_partition_cols - .iter() - .map(|field| field.name().as_str()) + // extract types of extended columns (partition + metadata columns) + let extended_table_col_names = self + .partition_column_names()? + .chain(self.metadata_column_names()) .collect::>(); - // If the filters can be resolved using only partition cols, there is no need to - // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated + + // If the filters can be resolved using only partition/metadata cols, there is no need to + // push it down to the TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = filters.iter().cloned().partition(|filter| { - can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter) + can_be_evaluated_for_extended_col_pruning( + &extended_table_col_names, + filter, + ) }); // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here? let session_state = state.as_any().downcast_ref::().unwrap(); @@ -899,6 +1005,12 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; + let table_partition_cols = self + .partition_column_fields()? + .into_iter() + .cloned() + .collect(); + // create the execution plan self.options .format @@ -920,18 +1032,19 @@ impl TableProvider for ListingTable { &self, filters: &[&Expr], ) -> Result> { - let partition_column_names = self - .options - .table_partition_cols - .iter() - .map(|col| col.0.as_str()) + let extended_column_names = self + .partition_column_names()? 
+ .chain(self.metadata_column_names()) .collect::>(); + filters .iter() .map(|filter| { - if can_be_evaluted_for_partition_pruning(&partition_column_names, filter) - { - // if filter can be handled by partition pruning, it is exact + if can_be_evaluated_for_extended_col_pruning( + &extended_column_names, + filter, + ) { + // if filter can be handled by pruning from the extended columns, it is exact return Ok(TableProviderFilterPushDown::Exact); } From 371adb3683bd356d859f7330a55f47d4c20e215e Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 25 Feb 2025 17:14:18 +0900 Subject: [PATCH 2/8] Metadata filtering working --- .../core/src/datasource/listing/helpers.rs | 73 ++++--- .../core/src/datasource/listing/metadata.rs | 185 ++++++++++++++++++ datafusion/core/src/datasource/listing/mod.rs | 1 + .../core/src/datasource/listing/table.rs | 82 ++++---- 4 files changed, 282 insertions(+), 59 deletions(-) create mode 100644 datafusion/core/src/datasource/listing/metadata.rs diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 47012f777ad1..5fa39f681360 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -29,7 +29,8 @@ use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; use arrow::{ - array::{Array, ArrayRef, AsArray, StringBuilder}, + array::{Array, ArrayRef, AsArray, BooleanArray, StringBuilder}, + buffer::BooleanBuffer, compute::{and, cast, prep_null_mask_filter}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -272,26 +273,59 @@ async fn prune_partitions( .collect(); let schema = Arc::new(Schema::new(fields)); - let df_schema = DFSchema::from_unqualified_fields( - partition_cols - .iter() - .map(|(n, d)| Field::new(n, d.clone(), true)) - .collect(), - Default::default(), - )?; - let batch = RecordBatch::try_new(schema, arrays)?; // TODO: Plumb this down let props = ExecutionProps::new(); + // Don't retain partitions that evaluated to null + let prepared = apply_filters(&batch, filters, &props)?; + + // If all rows are retained, return all partitions + if prepared.true_count() == prepared.len() { + return Ok(partitions); + } + + // Sanity check + assert_eq!(prepared.len(), partitions.len()); + + let filtered = partitions + .into_iter() + .zip(prepared.values()) + .filter_map(|(p, f)| f.then_some(p)) + .collect(); + + Ok(filtered) +} + +/// Applies the given filters to the input batch and returns a boolean mask that represents +/// the result of the filters applied to each row. 
+pub(crate) fn apply_filters( + batch: &RecordBatch, + filters: &[Expr], + props: &ExecutionProps, +) -> Result { + if filters.is_empty() { + return Ok(BooleanArray::new( + BooleanBuffer::new_set(batch.num_rows()), + None, + )); + } + + let num_rows = batch.num_rows(); + + let df_schema = DFSchema::from_unqualified_fields( + batch.schema().fields().clone(), + HashMap::default(), + )?; + // Applies `filter` to `batch` returning `None` on error let do_filter = |filter| -> Result { - let expr = create_physical_expr(filter, &df_schema, &props)?; - expr.evaluate(&batch)?.into_array(partitions.len()) + let expr = create_physical_expr(filter, &df_schema, props)?; + expr.evaluate(batch)?.into_array(num_rows) }; - //.Compute the conjunction of the filters + // Compute the conjunction of the filters let mask = filters .iter() .map(|f| do_filter(f).map(|a| a.as_boolean().clone())) @@ -300,25 +334,16 @@ async fn prune_partitions( let mask = match mask { Some(Ok(mask)) => mask, Some(Err(err)) => return Err(err), - None => return Ok(partitions), + None => return Ok(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)), }; - // Don't retain partitions that evaluated to null + // Don't retain rows that evaluated to null let prepared = match mask.null_count() { 0 => mask, _ => prep_null_mask_filter(&mask), }; - // Sanity check - assert_eq!(prepared.len(), partitions.len()); - - let filtered = partitions - .into_iter() - .zip(prepared.values()) - .filter_map(|(p, f)| f.then_some(p)) - .collect(); - - Ok(filtered) + Ok(prepared) } #[derive(Debug)] diff --git a/datafusion/core/src/datasource/listing/metadata.rs b/datafusion/core/src/datasource/listing/metadata.rs new file mode 100644 index 000000000000..2a98186a6610 --- /dev/null +++ b/datafusion/core/src/datasource/listing/metadata.rs @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functions that support extracting metadata from files. 
+ +use std::fmt; +use std::str::FromStr; +use std::sync::Arc; + +use super::PartitionedFile; +use crate::datasource::listing::helpers::apply_filters; +use datafusion_common::plan_err; +use datafusion_common::Result; + +use arrow::{ + array::{Array, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema, TimeUnit}, + record_batch::RecordBatch, +}; +use arrow_schema::Fields; +use datafusion_common::ScalarValue; +use datafusion_expr::execution_props::ExecutionProps; + +use datafusion_common::DataFusionError; +use datafusion_expr::Expr; +use object_store::ObjectMeta; + +pub(crate) enum MetadataColumn { + Location, + LastModified, + Size, +} + +impl fmt::Display for MetadataColumn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + MetadataColumn::Location => write!(f, "location"), + MetadataColumn::LastModified => write!(f, "last_modified"), + MetadataColumn::Size => write!(f, "size"), + } + } +} + +pub(crate) enum MetadataBuilder { + Location(StringBuilder), + LastModified(TimestampMicrosecondBuilder), + Size(UInt64Builder), +} + +impl MetadataBuilder { + pub fn append(&mut self, meta: &ObjectMeta) { + match self { + Self::Location(builder) => builder.append_value(&meta.location), + Self::LastModified(builder) => { + builder.append_value(meta.last_modified.timestamp_micros()) + } + Self::Size(builder) => builder.append_value(meta.size as u64), + } + } + + pub fn finish(self) -> Arc { + match self { + MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()), + MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()), + MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()), + } + } +} + +impl MetadataColumn { + pub fn arrow_type(&self) -> DataType { + match self { + MetadataColumn::Location => DataType::Utf8, + MetadataColumn::LastModified => { + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + } + MetadataColumn::Size => DataType::UInt64, + } + } + + pub fn builder(&self, capacity: usize) -> MetadataBuilder { + match self { + MetadataColumn::Location => MetadataBuilder::Location( + StringBuilder::with_capacity(capacity, capacity * 10), + ), + MetadataColumn::LastModified => MetadataBuilder::LastModified( + TimestampMicrosecondBuilder::with_capacity(capacity).with_data_type( + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + ), + ), + MetadataColumn::Size => { + MetadataBuilder::Size(UInt64Builder::with_capacity(capacity)) + } + } + } + + pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue { + match self { + MetadataColumn::Location => { + ScalarValue::Utf8(Some(meta.location.to_string())) + } + MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond( + Some(meta.last_modified.timestamp_micros()), + Some("UTC".into()), + ), + MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)), + } + } +} + +impl FromStr for MetadataColumn { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + match s { + "location" => Ok(MetadataColumn::Location), + "last_modified" => Ok(MetadataColumn::LastModified), + "size" => Ok(MetadataColumn::Size), + _ => plan_err!( + "Invalid metadata column: {}, expected: location, last_modified, or size", + s + ), + } + } +} + +/// Determine if the given file matches the input metadata filters. +/// `filters` should only contain expressions that can be evaluated +/// using only the metadata columns. 
+pub(crate) fn apply_metadata_filters( + file: PartitionedFile, + filters: &[Expr], + metadata_cols: &[MetadataColumn], +) -> Result> { + // if no metadata col => simply return all the files + if metadata_cols.is_empty() { + return Ok(Some(file)); + } + + let mut builders: Vec<_> = metadata_cols.iter().map(|col| col.builder(1)).collect(); + + for builder in builders.iter_mut() { + builder.append(&file.object_meta); + } + + let arrays = builders + .into_iter() + .map(|builder| builder.finish()) + .collect::>(); + + let fields: Fields = metadata_cols + .iter() + .map(|col| Field::new(col.to_string(), col.arrow_type(), true)) + .collect(); + let schema = Arc::new(Schema::new(fields)); + + let batch = RecordBatch::try_new(schema, arrays)?; + + // TODO: Plumb this down + let props = ExecutionProps::new(); + + // Don't retain rows that evaluated to null + let prepared = apply_filters(&batch, filters, &props)?; + + // If the filter evaluates to true, return the file + if prepared.true_count() == 1 { + return Ok(Some(file)); + } + + Ok(None) +} diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index c5a441aacf1d..e9dbd17dc102 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -19,6 +19,7 @@ //! to get the list of files to process. mod helpers; +mod metadata; mod table; mod url; diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 2f450597d7e1..6f971541982d 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::{any::Any, str::FromStr, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use super::metadata::{apply_metadata_filters, MetadataColumn}; use super::{ListingTableUrl, PartitionedFile}; use crate::datasource::{ @@ -40,7 +41,7 @@ use datafusion_expr::{SortExpr, TableType}; use datafusion_physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}; use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; -use arrow_schema::{Schema, TimeUnit}; +use arrow_schema::Schema; use datafusion_common::{ config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt, ToDFSchema, @@ -59,8 +60,6 @@ use futures::{future, stream, StreamExt, TryStreamExt}; use itertools::Itertools; use object_store::ObjectStore; -const METADATA_COLUMNS: [&str; 3] = ["location", "last_modified", "size"]; - /// Configuration for creating a [`ListingTable`] #[derive(Debug, Clone)] pub struct ListingTableConfig { @@ -535,13 +534,7 @@ impl ListingOptions { return plan_err!("Column {} already exists in schema", col); } - if !METADATA_COLUMNS.contains(&col.as_str()) { - return plan_err!( - "Metadata column {} invalid; must be one of {:?}", - col, - METADATA_COLUMNS - ); - } + let _ = MetadataColumn::from_str(col)?; } Ok(()) @@ -652,15 +645,6 @@ impl ListingOptions { } } } - - pub(crate) fn metadata_column_type(col: &str) -> Result { - match col { - "location" => Ok(DataType::Utf8), - "last_modified" => Ok(DataType::Timestamp(TimeUnit::Nanosecond, None)), - "size" => Ok(DataType::UInt64), - _ => plan_err!("Invalid metadata column: {}", col), - } - } } /// Reads data from one or more files as a single table. 
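Not part of the patch, but possibly useful context for review: `apply_metadata_filters` above reduces to building a one-row `RecordBatch` from a file's `ObjectMeta` and evaluating the pushed-down filter against it. A self-contained sketch of that mechanism, using a single hypothetical `size` column and a made-up helper name, might look like this:

```rust
use std::sync::Arc;

use arrow::array::{BooleanArray, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DFSchema, Result};
use datafusion_expr::{execution_props::ExecutionProps, Expr};
use datafusion_physical_expr::create_physical_expr;

/// Sketch only: returns true when a file of `file_size` bytes passes `filter`
/// (e.g. `col("size").gt(lit(1500u64))`), mirroring the per-file evaluation
/// performed by `apply_metadata_filters`.
fn file_passes(filter: &Expr, file_size: u64) -> Result<bool> {
    // One row per file: the metadata columns are constants for that file.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "size",
        DataType::UInt64,
        true,
    )]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(UInt64Array::from(vec![file_size]))],
    )?;

    // Plan and evaluate the filter against the one-row metadata batch.
    let df_schema = DFSchema::try_from(schema.as_ref().clone())?;
    let physical = create_physical_expr(filter, &df_schema, &ExecutionProps::new())?;
    let mask = physical.evaluate(&batch)?.into_array(batch.num_rows())?;
    let mask = mask
        .as_any()
        .downcast_ref::<BooleanArray>()
        .expect("filter must evaluate to a boolean array");

    // The file is kept only when its single row evaluates to true.
    Ok(mask.value(0))
}
```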
@@ -801,7 +785,7 @@ impl ListingTable { for col in &options.metadata_cols { builder.push(Field::new( col, - ListingOptions::metadata_column_type(col)?, + MetadataColumn::from_str(col)?.arrow_type(), false, )); } @@ -933,24 +917,28 @@ impl TableProvider for ListingTable { limit: Option, ) -> Result> { // extract types of extended columns (partition + metadata columns) - let extended_table_col_names = self - .partition_column_names()? - .chain(self.metadata_column_names()) - .collect::>(); + let partition_col_names = self.partition_column_names()?.collect::>(); + let metadata_col_names = self.metadata_column_names().collect::>(); // If the filters can be resolved using only partition/metadata cols, there is no need to // push it down to the TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = filters.iter().cloned().partition(|filter| { - can_be_evaluated_for_extended_col_pruning( - &extended_table_col_names, - filter, - ) + can_be_evaluated_for_extended_col_pruning(&partition_col_names, filter) + }); + let (metadata_filters, filters): (Vec<_>, Vec<_>) = + filters.iter().cloned().partition(|filter| { + can_be_evaluated_for_extended_col_pruning(&metadata_col_names, filter) }); // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here? let session_state = state.as_any().downcast_ref::().unwrap(); let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(session_state, &partition_filters, limit) + .list_files_for_scan( + session_state, + &partition_filters, + &metadata_filters, + limit, + ) .await?; // if no files need to be read, return an `EmptyExec` @@ -988,7 +976,7 @@ impl TableProvider for ListingTable { let filters = conjunction(filters.to_vec()) .map(|expr| -> Result<_> { - // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. + // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition/metadata columns. 
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; let filters = create_physical_expr( &expr, @@ -1174,7 +1162,8 @@ impl ListingTable { async fn list_files_for_scan<'a>( &'a self, ctx: &'a SessionState, - filters: &'a [Expr], + partition_filters: &'a [Expr], + metadata_filters: &'a [Expr], limit: Option, ) -> Result<(Vec>, Statistics)> { let store = if let Some(url) = self.table_paths.first() { @@ -1188,17 +1177,36 @@ impl ListingTable { ctx, store.as_ref(), table_path, - filters, + partition_filters, &self.options.file_extension, &self.options.table_partition_cols, ) })) .await?; let file_list = stream::iter(file_list).flatten(); - // collect the statistics if required by the config + + let metadata_cols = self + .metadata_column_names() + .map(MetadataColumn::from_str) + .collect::>>()?; + + // collect the statistics if required by the config + filter out files that don't match the metadata filters let files = file_list + .filter_map(|par_file| async { + if metadata_cols.is_empty() { + return Some(par_file); + } + + let Ok(par_file) = par_file else { + return Some(par_file); + }; + + apply_metadata_filters(par_file, metadata_filters, &metadata_cols) + .transpose() + }) .map(|part_file| async { let part_file = part_file?; + if self.options.collect_stat { let statistics = self.do_collect_statistics(ctx, &store, &part_file).await?; @@ -1722,7 +1730,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); @@ -1759,7 +1769,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); From 226ce8f2cb2596fa7348ac5fa1c076613c051fcd Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 25 Feb 2025 17:31:20 +0900 Subject: [PATCH 3/8] Working on plumbing to file scan config --- .../core/src/datasource/listing/metadata.rs | 92 +++++++++++-------- datafusion/core/src/datasource/listing/mod.rs | 1 + .../physical_plan/file_scan_config.rs | 64 ++++++++++--- 3 files changed, 105 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/datasource/listing/metadata.rs b/datafusion/core/src/datasource/listing/metadata.rs index 2a98186a6610..212df4937eef 100644 --- a/datafusion/core/src/datasource/listing/metadata.rs +++ b/datafusion/core/src/datasource/listing/metadata.rs @@ -39,9 +39,14 @@ use datafusion_common::DataFusionError; use datafusion_expr::Expr; use object_store::ObjectMeta; -pub(crate) enum MetadataColumn { +/// A metadata column that can be used to filter files +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetadataColumn { + /// The location of the file in object store Location, + /// The last modified timestamp of the file LastModified, + /// The size of the file in bytes Size, } @@ -55,44 +60,38 @@ impl fmt::Display for MetadataColumn { } } -pub(crate) enum MetadataBuilder { - Location(StringBuilder), - LastModified(TimestampMicrosecondBuilder), - Size(UInt64Builder), -} - -impl MetadataBuilder { - pub fn append(&mut self, meta: &ObjectMeta) { +impl MetadataColumn { + /// Returns the arrow type of this metadata column + pub fn arrow_type(&self) -> DataType { match self { - Self::Location(builder) => 
builder.append_value(&meta.location), - Self::LastModified(builder) => { - builder.append_value(meta.last_modified.timestamp_micros()) + MetadataColumn::Location => DataType::Utf8, + MetadataColumn::LastModified => { + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) } - Self::Size(builder) => builder.append_value(meta.size as u64), + MetadataColumn::Size => DataType::UInt64, } } - pub fn finish(self) -> Arc { - match self { - MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()), - MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()), - MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()), - } + /// Returns the arrow field for this metadata column + pub fn field(&self) -> Field { + Field::new(self.to_string(), self.arrow_type(), true) } -} -impl MetadataColumn { - pub fn arrow_type(&self) -> DataType { + /// Returns the scalar value for this metadata column given an object meta + pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue { match self { - MetadataColumn::Location => DataType::Utf8, - MetadataColumn::LastModified => { - DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + MetadataColumn::Location => { + ScalarValue::Utf8(Some(meta.location.to_string())) } - MetadataColumn::Size => DataType::UInt64, + MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond( + Some(meta.last_modified.timestamp_micros()), + Some("UTC".into()), + ), + MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)), } } - pub fn builder(&self, capacity: usize) -> MetadataBuilder { + pub(crate) fn builder(&self, capacity: usize) -> MetadataBuilder { match self { MetadataColumn::Location => MetadataBuilder::Location( StringBuilder::with_capacity(capacity, capacity * 10), @@ -107,19 +106,6 @@ impl MetadataColumn { } } } - - pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue { - match self { - MetadataColumn::Location => { - ScalarValue::Utf8(Some(meta.location.to_string())) - } - MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond( - Some(meta.last_modified.timestamp_micros()), - Some("UTC".into()), - ), - MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)), - } - } } impl FromStr for MetadataColumn { @@ -138,6 +124,32 @@ impl FromStr for MetadataColumn { } } +pub(crate) enum MetadataBuilder { + Location(StringBuilder), + LastModified(TimestampMicrosecondBuilder), + Size(UInt64Builder), +} + +impl MetadataBuilder { + pub fn append(&mut self, meta: &ObjectMeta) { + match self { + Self::Location(builder) => builder.append_value(&meta.location), + Self::LastModified(builder) => { + builder.append_value(meta.last_modified.timestamp_micros()) + } + Self::Size(builder) => builder.append_value(meta.size as u64), + } + } + + pub fn finish(self) -> Arc { + match self { + MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()), + MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()), + MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()), + } + } +} + /// Determine if the given file matches the input metadata filters. /// `filters` should only contain expressions that can be evaluated /// using only the metadata columns. 
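Not part of the patch: a minimal sketch of how the public `MetadataColumn` helpers above are expected to be used. The `ObjectMeta` values are made up for illustration:

```rust
use std::str::FromStr;

use chrono::{TimeZone, Utc};
use object_store::{path::Path, ObjectMeta};

fn example() -> datafusion_common::Result<()> {
    // Hypothetical object-store entry for a single file.
    let meta = ObjectMeta {
        location: Path::from("year=2021/month=09/day=09/file.parquet"),
        last_modified: Utc.timestamp_opt(0, 0).unwrap(),
        size: 1851,
        e_tag: None,
        version: None,
    };

    // Column names parse into the enum, and each variant knows its Arrow type...
    let col = MetadataColumn::from_str("size")?;
    assert_eq!(col.arrow_type(), arrow_schema::DataType::UInt64);

    // ...and how to render itself as a per-file constant scalar.
    assert_eq!(
        col.to_scalar_value(&meta),
        datafusion_common::ScalarValue::UInt64(Some(1851))
    );
    Ok(())
}
```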
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs index e9dbd17dc102..696eeb273421 100644 --- a/datafusion/core/src/datasource/listing/mod.rs +++ b/datafusion/core/src/datasource/listing/mod.rs @@ -32,6 +32,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; +pub use metadata::MetadataColumn; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 74ab0126a557..9828c4642dd0 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -24,7 +24,10 @@ use std::{ }; use super::{get_projected_output_ordering, statistics::MinMaxStatistics}; -use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; +use crate::datasource::{ + listing::{MetadataColumn, PartitionedFile}, + object_store::ObjectStoreUrl, +}; use crate::{error::Result, scalar::ScalarValue}; use arrow::array::{ArrayData, BufferBuilder}; @@ -126,6 +129,8 @@ pub struct FileScanConfig { pub limit: Option, /// The partitioning columns pub table_partition_cols: Vec, + /// The metadata columns + pub metadata_cols: Vec, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec, } @@ -151,6 +156,7 @@ impl FileScanConfig { projection: None, limit: None, table_partition_cols: vec![], + metadata_cols: vec![], output_ordering: vec![], } } @@ -205,6 +211,12 @@ impl FileScanConfig { self } + /// Set the metadata columns of the files + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set the output ordering of the files pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { self.output_ordering = output_ordering; @@ -224,23 +236,19 @@ impl FileScanConfig { let proj_iter: Box> = match &self.projection { Some(proj) => Box::new(proj.iter().copied()), None => Box::new( - 0..(self.file_schema.fields().len() + self.table_partition_cols.len()), + 0..(self.file_schema.fields().len() + + self.table_partition_cols.len() + + self.metadata_cols.len()), ), }; let mut table_fields = vec![]; let mut table_cols_stats = vec![]; + for idx in proj_iter { - if idx < self.file_schema.fields().len() { - let field = self.file_schema.field(idx); - table_fields.push(field.clone()); - table_cols_stats.push(self.statistics.column_statistics[idx].clone()) - } else { - let partition_idx = idx - self.file_schema.fields().len(); - table_fields.push(self.table_partition_cols[partition_idx].to_owned()); - // TODO provide accurate stat for partition column (#1186) - table_cols_stats.push(ColumnStatistics::new_unknown()) - } + let (field, stats) = self.get_field_and_stats(idx); + table_fields.push(field); + table_cols_stats.push(stats); } let table_stats = Statistics { @@ -261,6 +269,38 @@ impl FileScanConfig { (projected_schema, table_stats, projected_output_ordering) } + /// Helper function to get field and statistics for a given index + fn get_field_and_stats(&self, idx: usize) -> (Field, ColumnStatistics) { + let file_schema_len = self.file_schema.fields().len(); + let partition_cols_len = self.table_partition_cols.len(); + + match idx { + // File schema columns + i if i < file_schema_len => ( + self.file_schema.field(i).clone(), + 
self.statistics.column_statistics[i].clone(), + ), + + // Partition columns + i if i < file_schema_len + partition_cols_len => { + let partition_idx = i - file_schema_len; + ( + self.table_partition_cols[partition_idx].to_owned(), + ColumnStatistics::new_unknown(), + ) + } + + // Metadata columns + i => { + let metadata_idx = i - file_schema_len - partition_cols_len; + ( + self.metadata_cols[metadata_idx].field(), + ColumnStatistics::new_unknown(), + ) + } + } + } + #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro pub(crate) fn projected_file_column_names(&self) -> Option> { self.projection.as_ref().map(|p| { From 34363f215968a06244d39cdecb54567de5c06e71 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Tue, 25 Feb 2025 23:24:18 +0900 Subject: [PATCH 4/8] wip --- .../core/src/datasource/listing/metadata.rs | 14 ++- .../core/src/datasource/listing/table.rs | 8 +- .../physical_plan/file_scan_config.rs | 102 ++++++++++++++---- .../datasource/physical_plan/file_stream.rs | 4 +- 4 files changed, 99 insertions(+), 29 deletions(-) diff --git a/datafusion/core/src/datasource/listing/metadata.rs b/datafusion/core/src/datasource/listing/metadata.rs index 212df4937eef..00d6038a0a49 100644 --- a/datafusion/core/src/datasource/listing/metadata.rs +++ b/datafusion/core/src/datasource/listing/metadata.rs @@ -52,15 +52,19 @@ pub enum MetadataColumn { impl fmt::Display for MetadataColumn { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - MetadataColumn::Location => write!(f, "location"), - MetadataColumn::LastModified => write!(f, "last_modified"), - MetadataColumn::Size => write!(f, "size"), - } + write!(f, "{}", self.name()) } } impl MetadataColumn { + pub fn name(&self) -> &str { + match self { + MetadataColumn::Location => "location", + MetadataColumn::LastModified => "last_modified", + MetadataColumn::Size => "size", + } + } + /// Returns the arrow type of this metadata column pub fn arrow_type(&self) -> DataType { match self { diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 6f971541982d..e008bbfac119 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -999,6 +999,11 @@ impl TableProvider for ListingTable { .cloned() .collect(); + let metadata_cols = self + .metadata_column_names() + .filter_map(|c| MetadataColumn::from_str(c).ok()) + .collect::>(); + // create the execution plan self.options .format @@ -1010,7 +1015,8 @@ impl TableProvider for ListingTable { .with_projection(projection.cloned()) .with_limit(limit) .with_output_ordering(output_ordering) - .with_table_partition_cols(table_partition_cols), + .with_table_partition_cols(table_partition_cols) + .with_metadata_cols(metadata_cols), filters.as_ref(), ) .await diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 9828c4642dd0..370c16b2bcc8 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -20,7 +20,7 @@ use std::{ borrow::Cow, collections::HashMap, fmt::Debug, marker::PhantomData, mem::size_of, - sync::Arc, vec, + str::FromStr, sync::Arc, vec, }; use super::{get_projected_output_ordering, statistics::MinMaxStatistics}; @@ -41,6 +41,7 @@ use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use log::warn; +use 
object_store::ObjectMeta; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -122,7 +123,7 @@ pub struct FileScanConfig { /// Defaults to [`Statistics::new_unknown`]. pub statistics: Statistics, /// Columns on which to project the data. Indexes that are higher than the - /// number of columns of `file_schema` refer to `table_partition_cols`. + /// number of columns of `file_schema` refer to `table_partition_cols` and then `metadata_cols`. pub projection: Option>, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. @@ -312,7 +313,7 @@ impl FileScanConfig { }) } - /// Projects only file schema, ignoring partition columns + /// Projects only file schema, ignoring partition/metadata columns pub(crate) fn projected_file_schema(&self) -> SchemaRef { let fields = self.file_column_projection_indices().map(|indices| { indices @@ -408,13 +409,13 @@ impl FileScanConfig { } } -/// A helper that projects partition columns into the file record batches. +/// A helper that projects extended (i.e. partition/metadata) columns into the file record batches. /// /// One interesting trick is the usage of a cache for the key buffers of the partition column /// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them /// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, /// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). -pub struct PartitionColumnProjector { +pub struct ExtendedColumnProjector { /// An Arrow buffer initialized to zeros that represents the key array of all partition /// columns (partition columns are materialized by dictionary arrays with only one /// value in the dictionary, thus all the keys are equal to zero). @@ -423,15 +424,21 @@ pub struct PartitionColumnProjector { /// schema. Sorted by index in the target schema so that we can iterate on it to /// insert the partition columns in the target record batch. projected_partition_indexes: Vec<(usize, usize)>, + /// Similar to `projected_partition_indexes` but only stores the indexes in the target schema + projected_metadata_indexes: Vec, /// The schema of the table once the projection was applied. 
projected_schema: SchemaRef, } -impl PartitionColumnProjector { - // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns +impl ExtendedColumnProjector { + // Create a projector to insert the partitioning/metadata columns into batches read from files + // - `projected_schema`: the target schema with file, partitioning and metadata columns // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + pub fn new( + projected_schema: SchemaRef, + table_partition_cols: &[String], + metadata_cols: &[MetadataColumn], + ) -> Self { let mut idx_map = HashMap::new(); for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { if let Ok(schema_idx) = projected_schema.index_of(partition_name) { @@ -442,25 +449,40 @@ impl PartitionColumnProjector { let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); + let mut projected_metadata_indexes = vec![]; + for metadata_name in metadata_cols.iter() { + if let Ok(schema_idx) = projected_schema.index_of(metadata_name.name()) { + projected_metadata_indexes.push(schema_idx); + } + } + + projected_metadata_indexes.sort_by(|a, b| a.cmp(b)); + Self { - projected_partition_indexes, key_buffer_cache: Default::default(), + projected_partition_indexes, + projected_metadata_indexes, projected_schema, } } - // Transform the batch read from the file by inserting the partitioning columns + // Transform the batch read from the file by inserting both partitioning and metadata columns // to the right positions as deduced from `projected_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column + // - `metadata`: the metadata of the file containing information like location, size, etc. 
pub fn project( &mut self, file_batch: RecordBatch, partition_values: &[ScalarValue], + metadata: &ObjectMeta, ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + // Calculate expected number of columns from the file (excluding partition and metadata columns) + let expected_cols = self.projected_schema.fields().len() + - self.projected_partition_indexes.len() + - self.projected_metadata_indexes.len(); + // Verify the file batch has the expected number of columns if file_batch.columns().len() != expected_cols { return exec_err!( "Unexpected batch schema from file, expected {} cols but got {}", @@ -469,18 +491,21 @@ impl PartitionColumnProjector { ); } + // Start with the columns from the file batch let mut cols = file_batch.columns().to_vec(); + + // Insert partition columns for &(pidx, sidx) in &self.projected_partition_indexes { - let p_value = - partition_values - .get(pidx) - .ok_or(DataFusionError::Execution( - "Invalid partitioning found on disk".to_string(), - ))?; + // Get the partition value from the provided values + let p_value = partition_values.get(pidx).ok_or_else(|| { + DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ) + })?; let mut partition_value = Cow::Borrowed(p_value); - // check if user forgot to dict-encode the partition value + // Check if user forgot to dict-encode the partition value and apply auto-fix if needed let field = self.projected_schema.field(sidx); let expected_data_type = field.data_type(); let actual_data_type = partition_value.data_type(); @@ -494,6 +519,7 @@ impl PartitionColumnProjector { } } + // Create array and insert at the correct schema position cols.insert( sidx, create_output_array( @@ -501,9 +527,25 @@ impl PartitionColumnProjector { partition_value.as_ref(), file_batch.num_rows(), )?, - ) + ); } + // Insert metadata columns + for &sidx in &self.projected_metadata_indexes { + // Get the metadata column type from the field name + let field_name = self.projected_schema.field(sidx).name(); + let metadata_col = MetadataColumn::from_str(field_name).map_err(|e| { + DataFusionError::Execution(format!("Invalid metadata column: {}", e)) + })?; + + // Convert metadata to scalar value based on the column type + let scalar_value = metadata_col.to_scalar_value(metadata); + + // Create array and insert at the correct schema position + cols.insert(sidx, scalar_value.to_array_of_size(file_batch.num_rows())?); + } + + // Create a new record batch with all columns in the correct order RecordBatch::try_new_with_options( Arc::clone(&self.projected_schema), cols, @@ -659,10 +701,21 @@ fn create_output_array( #[cfg(test)] mod tests { use arrow_array::Int32Array; + use object_store::path::Path; use super::*; use crate::{test::columns, test_util::aggr_test_schema}; + fn test_object_meta() -> ObjectMeta { + ObjectMeta { + location: Path::from("test"), + size: 100, + last_modified: chrono::Utc::now(), + e_tag: None, + version: None, + } + } + #[test] fn physical_plan_config_no_projection() { let file_schema = aggr_test_schema(); @@ -804,12 +857,13 @@ mod tests { ); let (proj_schema, ..) 
= conf.project(); // created a projector for that projected schema - let mut proj = PartitionColumnProjector::new( + let mut proj = ExtendedColumnProjector::new( proj_schema, &partition_cols .iter() .map(|x| x.0.clone()) .collect::>(), + &[], ); // project first batch @@ -822,6 +876,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("26")), ], + &ObjectMeta::default(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -850,6 +905,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("27")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -880,6 +936,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("28")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -908,6 +965,7 @@ mod tests { ScalarValue::from("10"), ScalarValue::from("26"), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 6f354b31ae87..e2badc7cd982 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -45,6 +45,8 @@ use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; +use super::file_scan_config::ExtendedColumnProjector; + /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = BoxFuture<'static, Result>>>; @@ -251,7 +253,7 @@ impl FileStream { metrics: &ExecutionPlanMetricsSet, ) -> Result { let (projected_schema, ..) 
= config.project(); - let pc_projector = PartitionColumnProjector::new( + let pc_projector = ExtendedColumnProjector::new( projected_schema.clone(), &config .table_partition_cols From 86bb24dde64a05a421c96405d76599ba7d7a14f7 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 26 Feb 2025 09:07:51 +0900 Subject: [PATCH 5/8] All wired up --- .../core/src/datasource/listing/metadata.rs | 1 + .../physical_plan/file_scan_config.rs | 5 +- .../datasource/physical_plan/file_stream.rs | 61 +++++++++++++------ 3 files changed, 44 insertions(+), 23 deletions(-) diff --git a/datafusion/core/src/datasource/listing/metadata.rs b/datafusion/core/src/datasource/listing/metadata.rs index 00d6038a0a49..30ce70083546 100644 --- a/datafusion/core/src/datasource/listing/metadata.rs +++ b/datafusion/core/src/datasource/listing/metadata.rs @@ -57,6 +57,7 @@ impl fmt::Display for MetadataColumn { } impl MetadataColumn { + /// The name of the metadata column (one of `location`, `last_modified`, or `size`) pub fn name(&self) -> &str { match self { MetadataColumn::Location => "location", diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 370c16b2bcc8..6c813ee08982 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -434,6 +434,7 @@ impl ExtendedColumnProjector { // Create a projector to insert the partitioning/metadata columns into batches read from files // - `projected_schema`: the target schema with file, partitioning and metadata columns // - `table_partition_cols`: all the partitioning column names + // - `metadata_cols`: all the metadata column names pub fn new( projected_schema: SchemaRef, table_partition_cols: &[String], @@ -456,8 +457,6 @@ impl ExtendedColumnProjector { } } - projected_metadata_indexes.sort_by(|a, b| a.cmp(b)); - Self { key_buffer_cache: Default::default(), projected_partition_indexes, @@ -876,7 +875,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("26")), ], - &ObjectMeta::default(), + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index e2badc7cd982..91712efbebca 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -27,7 +27,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; +use crate::datasource::physical_plan::file_scan_config::ExtendedColumnProjector; use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; use crate::physical_plan::metrics::{ @@ -44,8 +44,7 @@ use datafusion_common::ScalarValue; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; - -use super::file_scan_config::ExtendedColumnProjector; +use object_store::ObjectMeta; /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = @@ -87,8 +86,8 @@ pub struct FileStream { /// A generic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], /// which can be resolved to a stream of `RecordBatch`. 
file_opener: F, - /// The partition column projector - pc_projector: PartitionColumnProjector, + /// The extended (partitioning + metadata) column projector + col_projector: ExtendedColumnProjector, /// The stream state state: FileStreamState, /// File stream specific metrics @@ -117,19 +116,23 @@ enum FileStreamState { future: FileOpenFuture, /// The partition values for this file partition_values: Vec, + /// The object metadata for this file + object_meta: ObjectMeta, }, /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] /// returned by [`FileOpener::open`] Scan { /// Partitioning column values for the current batch_iter partition_values: Vec, + /// The object metadata for the current file + object_meta: ObjectMeta, /// The reader instance reader: BoxStream<'static, Result>, /// A [`FileOpenFuture`] for the next file to be processed, /// and its corresponding partition column values, if any. /// This allows the next file to be opened in parallel while the /// current file is read. - next: Option<(NextOpen, Vec)>, + next: Option<(NextOpen, Vec, ObjectMeta)>, }, /// Encountered an error Error, @@ -253,13 +256,14 @@ impl FileStream { metrics: &ExecutionPlanMetricsSet, ) -> Result { let (projected_schema, ..) = config.project(); - let pc_projector = ExtendedColumnProjector::new( + let col_projector = ExtendedColumnProjector::new( projected_schema.clone(), &config .table_partition_cols .iter() .map(|x| x.name().clone()) .collect::>(), + &config.metadata_cols, ); let files = config.file_groups[partition].clone(); @@ -269,7 +273,7 @@ impl FileStream { projected_schema, remain: config.limit, file_opener, - pc_projector, + col_projector, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), @@ -290,19 +294,21 @@ impl FileStream { /// /// Since file opening is mostly IO (and may involve a /// bunch of sequential IO), it can be parallelized with decoding. 
- fn start_next_file(&mut self) -> Option)>> { + fn start_next_file( + &mut self, + ) -> Option, ObjectMeta)>> { let part_file = self.file_iter.pop_front()?; let file_meta = FileMeta { - object_meta: part_file.object_meta, + object_meta: part_file.object_meta.clone(), range: part_file.range, extensions: part_file.extensions, }; Some( - self.file_opener - .open(file_meta) - .map(|future| (future, part_file.partition_values)), + self.file_opener.open(file_meta).map(|future| { + (future, part_file.partition_values, part_file.object_meta) + }), ) } @@ -313,10 +319,11 @@ impl FileStream { self.file_stream_metrics.time_opening.start(); match self.start_next_file().transpose() { - Ok(Some((future, partition_values))) => { + Ok(Some((future, partition_values, object_meta))) => { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } Ok(None) => return Poll::Ready(None), @@ -329,9 +336,11 @@ impl FileStream { FileStreamState::Open { future, partition_values, + object_meta, } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { let partition_values = mem::take(partition_values); + let object_meta = object_meta.clone(); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); @@ -340,13 +349,19 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.start(); match next { - Ok(Some((next_future, next_partition_values))) => { + Ok(Some(( + next_future, + next_partition_values, + next_object_meta, + ))) => { self.state = FileStreamState::Scan { partition_values, + object_meta, reader, next: Some(( NextOpen::Pending(next_future), next_partition_values, + next_object_meta, )), }; } @@ -354,6 +369,7 @@ impl FileStream { self.state = FileStreamState::Scan { reader, partition_values, + object_meta, next: None, }; } @@ -381,9 +397,10 @@ impl FileStream { reader, partition_values, next, + object_meta, } => { // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some((next_open_future, _)) = next { + if let Some((next_open_future, _, _)) = next { if let NextOpen::Pending(f) = next_open_future { if let Poll::Ready(reader) = f.as_mut().poll(cx) { *next_open_future = NextOpen::Ready(reader); @@ -395,8 +412,8 @@ impl FileStream { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); let result = self - .pc_projector - .project(batch, partition_values) + .col_projector + .project(batch, partition_values, object_meta) .map_err(|e| ArrowError::ExternalError(e.into())) .map(|batch| match &mut self.remain { Some(remain) => { @@ -429,7 +446,7 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -437,6 +454,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } NextOpen::Ready(reader) => { @@ -445,6 +463,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } @@ -462,7 +481,7 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -470,6 +489,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + 
object_meta, } } NextOpen::Ready(reader) => { @@ -478,6 +498,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } From a6283a3c12cbebac56bdd77187b7680a4b662506 Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 26 Feb 2025 10:05:57 +0900 Subject: [PATCH 6/8] Working! --- datafusion/core/src/datasource/listing/table.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index e008bbfac119..45cd066dbb7c 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -862,9 +862,7 @@ impl ListingTable { self.options .table_partition_cols .iter() - .map(|col| &col.0) - .chain(self.options.metadata_cols.iter()) - .map(|col| Ok(self.table_schema.field_with_name(col)?)) + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?)) .collect::>>() } From 6241e7a2fea5ee0fda930d6590804cc896c44f7a Mon Sep 17 00:00:00 2001 From: Phillip LeBlanc Date: Wed, 26 Feb 2025 10:13:46 +0900 Subject: [PATCH 7/8] Use MetadataColumn enum --- .../core/src/datasource/listing/table.rs | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 45cd066dbb7c..0497b72da187 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -262,7 +262,7 @@ pub struct ListingOptions { pub file_sort_order: Vec>, /// Additional columns to include in the table schema, based on the metadata of the files. /// See [Self::with_metadata_cols] for details. - pub metadata_cols: Vec, + pub metadata_cols: Vec, } impl ListingOptions { @@ -434,11 +434,11 @@ impl ListingOptions { /// let listing_options = ListingOptions::new(Arc::new( /// ParquetFormat::default() /// )) - /// .with_metadata_cols(vec!["last_modified".to_string()]); + /// .with_metadata_cols(vec![MetadataColumn::LastModified]); /// - /// assert_eq!(listing_options.metadata_cols, vec!["last_modified".to_string()]); + /// assert_eq!(listing_options.metadata_cols, vec![MetadataColumn::LastModified]); /// ``` - pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { self.metadata_cols = metadata_cols; self } @@ -530,11 +530,9 @@ impl ListingOptions { /// Validates that the metadata columns do not already exist in the schema, and are one of the allowed metadata columns. 
From 6241e7a2fea5ee0fda930d6590804cc896c44f7a Mon Sep 17 00:00:00 2001
From: Phillip LeBlanc
Date: Wed, 26 Feb 2025 10:13:46 +0900
Subject: [PATCH 7/8] Use MetadataColumn enum

---
 .../core/src/datasource/listing/table.rs     | 20 +++++++------------
 1 file changed, 7 insertions(+), 13 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 45cd066dbb7c..0497b72da187 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -262,7 +262,7 @@ pub struct ListingOptions {
     pub file_sort_order: Vec<Vec<SortExpr>>,
     /// Additional columns to include in the table schema, based on the metadata of the files.
     /// See [Self::with_metadata_cols] for details.
-    pub metadata_cols: Vec<String>,
+    pub metadata_cols: Vec<MetadataColumn>,
 }
 
 impl ListingOptions {
@@ -434,11 +434,11 @@ impl ListingOptions {
     /// let listing_options = ListingOptions::new(Arc::new(
     ///     ParquetFormat::default()
     /// ))
-    /// .with_metadata_cols(vec!["last_modified".to_string()]);
+    /// .with_metadata_cols(vec![MetadataColumn::LastModified]);
     ///
-    /// assert_eq!(listing_options.metadata_cols, vec!["last_modified".to_string()]);
+    /// assert_eq!(listing_options.metadata_cols, vec![MetadataColumn::LastModified]);
     /// ```
-    pub fn with_metadata_cols(mut self, metadata_cols: Vec<String>) -> Self {
+    pub fn with_metadata_cols(mut self, metadata_cols: Vec<MetadataColumn>) -> Self {
         self.metadata_cols = metadata_cols;
         self
     }
@@ -530,11 +530,9 @@ impl ListingOptions {
     /// Validates that the metadata columns do not already exist in the schema, and are one of the allowed metadata columns.
     pub fn validate_metadata_cols(&self, schema: &SchemaRef) -> Result<()> {
         for col in self.metadata_cols.iter() {
-            if schema.column_with_name(col).is_some() {
+            if schema.column_with_name(col.name()).is_some() {
                 return plan_err!("Column {} already exists in schema", col);
             }
-
-            let _ = MetadataColumn::from_str(col)?;
         }
 
         Ok(())
@@ -783,11 +781,7 @@ impl ListingTable {
         // Validate and add metadata columns to the schema
         options.validate_metadata_cols(&file_schema)?;
         for col in &options.metadata_cols {
-            builder.push(Field::new(
-                col,
-                MetadataColumn::from_str(col)?.arrow_type(),
-                false,
-            ));
+            builder.push(col.field());
         }
 
         let table_schema = Arc::new(
@@ -875,7 +869,7 @@ impl ListingTable {
     }
 
     fn metadata_column_names(&self) -> impl Iterator<Item = &str> {
-        self.options.metadata_cols.iter().map(|col| col.as_str())
+        self.options.metadata_cols.iter().map(|col| col.name())
     }
 }

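With the enum in place, the final patch adds end-to-end coverage. For orientation before the tests, a minimal user-level sketch of the feature is shown below; it assumes this patch series is applied, and the table path, table name, and column choices are placeholders:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, MetadataColumn,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Placeholder location; point this at a real directory of Parquet files.
    let table_path = ListingTableUrl::parse("file:///data/nyctaxi/")?;

    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_metadata_cols(vec![MetadataColumn::Location, MetadataColumn::Size]);

    // The schema passed to the config is the file schema only; partition and
    // metadata columns are appended by ListingTable::try_new.
    let file_schema = options.infer_schema(&ctx.state(), &table_path).await?;
    let config = ListingTableConfig::new(table_path)
        .with_listing_options(options)
        .with_schema(file_schema);
    let table = ListingTable::try_new(config)?;

    ctx.register_table("trips", Arc::new(table))?;

    // Metadata columns behave like ordinary columns in queries.
    ctx.sql("SELECT location, size FROM trips WHERE size > 1024")
        .await?
        .show()
        .await?;

    Ok(())
}
```
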
From 8066f4aaba4f09c0312f760f8d8b881a4035f75d Mon Sep 17 00:00:00 2001
From: Phillip LeBlanc
Date: Wed, 26 Feb 2025 11:53:06 +0900
Subject: [PATCH 8/8] Add integration tests for metadata selection + pushdown filtering

---
 datafusion/core/tests/sql/path_partition.rs | 136 +++++++++++++++++++-
 1 file changed, 134 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs
index 919054e8330f..8438a5472245 100644
--- a/datafusion/core/tests/sql/path_partition.rs
+++ b/datafusion/core/tests/sql/path_partition.rs
@@ -24,7 +24,7 @@ use std::ops::Range;
 use std::sync::Arc;
 
 use arrow::datatypes::DataType;
-use datafusion::datasource::listing::ListingTableUrl;
+use datafusion::datasource::listing::{ListingTableUrl, MetadataColumn};
 use datafusion::datasource::physical_plan::ParquetExec;
 use datafusion::{
     assert_batches_sorted_eq,
@@ -72,6 +72,7 @@ async fn parquet_partition_pruning_filter() -> Result<()> {
             ("month", DataType::Int32),
             ("day", DataType::Int32),
         ],
+        &[],
        "mirror:///",
        "alltypes_plain.parquet",
     )
     .await
@@ -574,6 +575,134 @@ async fn parquet_overlapping_columns() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn test_metadata_columns() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let table = create_partitioned_alltypes_parquet_table(
+        &ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &[
+            ("year", DataType::Int32),
+            ("month", DataType::Int32),
+            ("day", DataType::Int32),
+        ],
+        &[
+            MetadataColumn::Location,
+            MetadataColumn::Size,
+            MetadataColumn::LastModified,
+        ],
+        "mirror:///",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    ctx.register_table("t", table).unwrap();
+
+    let result = ctx
+        .sql("SELECT id, size, location, last_modified FROM t WHERE size > 1500 ORDER BY id LIMIT 10")
+        .await?
+        .collect()
+        .await?;
+
+    let expected = [
+        "+----+------+----------------------------------------+----------------------+",
+        "| id | size | location                               | last_modified        |",
+        "+----+------+----------------------------------------+----------------------+",
+        "| 0  | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 0  | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 0  | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 1  | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 1  | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 1  | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 2  | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 2  | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 2  | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |",
+        "| 3  | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |",
+        "+----+------+----------------------------------------+----------------------+",
+    ];
+    assert_batches_sorted_eq!(expected, &result);
+
+    Ok(())
+}
+
+#[tokio::test]
+async fn test_metadata_columns_pushdown() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let table = create_partitioned_alltypes_parquet_table(
+        &ctx,
+        &[
+            "year=2021/month=09/day=09/file.parquet",
+            "year=2021/month=10/day=09/file.parquet",
+            "year=2021/month=10/day=28/file.parquet",
+        ],
+        &[
+            ("year", DataType::Int32),
+            ("month", DataType::Int32),
+            ("day", DataType::Int32),
+        ],
+        &[
+            MetadataColumn::Location,
+            MetadataColumn::Size,
+            MetadataColumn::LastModified,
+        ],
+        "mirror:///",
+        "alltypes_plain.parquet",
+    )
+    .await;
+
+    // The metadata filters can be resolved using only the metadata columns.
+    let filters = [
+        Expr::eq(
+            col("location"),
+            lit("year=2021/month=09/day=09/file.parquet"),
+        ),
+        Expr::gt(col("size"), lit(400u64)),
+        Expr::gt_eq(
+            col("last_modified"),
+            lit(ScalarValue::TimestampMicrosecond(
+                Some(0),
+                Some("UTC".into()),
+            )),
+        ),
+        Expr::gt(col("id"), lit(1)),
+    ];
+    let exec = table.scan(&ctx.state(), None, &filters, None).await?;
+    let parquet_exec = exec.as_any().downcast_ref::<ParquetExec>().unwrap();
+    let pred = parquet_exec.predicate().unwrap();
+    // Only the last filter should be pushed down to the TableScan
+    let expected = Arc::new(BinaryExpr::new(
+        Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()),
+        Operator::Gt,
+        Arc::new(Literal::new(ScalarValue::Int32(Some(1)))),
+    ));
+
+    assert!(pred.as_any().is::<BinaryExpr>());
+    let pred = pred.as_any().downcast_ref::<BinaryExpr>().unwrap();
+
+    assert_eq!(pred, expected.as_any());
+
+    // Only the first file should be scanned
+    let scan_files = parquet_exec
+        .base_config()
+        .file_groups
+        .iter()
+        .flat_map(|x| x.iter().map(|y| y.path()).collect::<Vec<_>>())
+        .collect::<Vec<_>>();
+    assert_eq!(scan_files.len(), 1);
+    assert_eq!(
+        scan_files[0].to_string(),
+        "year=2021/month=09/day=09/file.parquet"
+    );
+
+    Ok(())
+}
+
 fn register_partitioned_aggregate_csv(
     ctx: &SessionContext,
     store_paths: &[&str],
@@ -618,6 +747,7 @@ async fn register_partitioned_alltypes_parquet(
         ctx,
         store_paths,
         partition_cols,
+        &[],
         table_path,
         source_file,
     )
@@ -630,6 +760,7 @@ async fn create_partitioned_alltypes_parquet_table(
     ctx: &SessionContext,
     store_paths: &[&str],
     partition_cols: &[(&str, DataType)],
+    metadata_cols: &[MetadataColumn],
     table_path: &str,
     source_file: &str,
 ) -> Arc<dyn TableProvider> {
@@ -647,7 +778,8 @@ async fn create_partitioned_alltypes_parquet_table(
         .iter()
         .map(|x| (x.0.to_owned(), x.1.clone()))
         .collect::<Vec<_>>(),
-    );
+    )
+    .with_metadata_cols(metadata_cols.to_vec());
 
     let table_path = ListingTableUrl::parse(table_path).unwrap();
     let store_path =