diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs index 47012f777ad1..5fa39f681360 100644 --- a/datafusion/core/src/datasource/listing/helpers.rs +++ b/datafusion/core/src/datasource/listing/helpers.rs @@ -29,7 +29,8 @@ use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{BinaryExpr, Operator}; use arrow::{ - array::{Array, ArrayRef, AsArray, StringBuilder}, + array::{Array, ArrayRef, AsArray, BooleanArray, StringBuilder}, + buffer::BooleanBuffer, compute::{and, cast, prep_null_mask_filter}, datatypes::{DataType, Field, Schema}, record_batch::RecordBatch, @@ -272,26 +273,59 @@ async fn prune_partitions( .collect(); let schema = Arc::new(Schema::new(fields)); - let df_schema = DFSchema::from_unqualified_fields( - partition_cols - .iter() - .map(|(n, d)| Field::new(n, d.clone(), true)) - .collect(), - Default::default(), - )?; - let batch = RecordBatch::try_new(schema, arrays)?; // TODO: Plumb this down let props = ExecutionProps::new(); + // Don't retain partitions that evaluated to null + let prepared = apply_filters(&batch, filters, &props)?; + + // If all rows are retained, return all partitions + if prepared.true_count() == prepared.len() { + return Ok(partitions); + } + + // Sanity check + assert_eq!(prepared.len(), partitions.len()); + + let filtered = partitions + .into_iter() + .zip(prepared.values()) + .filter_map(|(p, f)| f.then_some(p)) + .collect(); + + Ok(filtered) +} + +/// Applies the given filters to the input batch and returns a boolean mask that represents +/// the result of the filters applied to each row. +pub(crate) fn apply_filters( + batch: &RecordBatch, + filters: &[Expr], + props: &ExecutionProps, +) -> Result { + if filters.is_empty() { + return Ok(BooleanArray::new( + BooleanBuffer::new_set(batch.num_rows()), + None, + )); + } + + let num_rows = batch.num_rows(); + + let df_schema = DFSchema::from_unqualified_fields( + batch.schema().fields().clone(), + HashMap::default(), + )?; + // Applies `filter` to `batch` returning `None` on error let do_filter = |filter| -> Result { - let expr = create_physical_expr(filter, &df_schema, &props)?; - expr.evaluate(&batch)?.into_array(partitions.len()) + let expr = create_physical_expr(filter, &df_schema, props)?; + expr.evaluate(batch)?.into_array(num_rows) }; - //.Compute the conjunction of the filters + // Compute the conjunction of the filters let mask = filters .iter() .map(|f| do_filter(f).map(|a| a.as_boolean().clone())) @@ -300,25 +334,16 @@ async fn prune_partitions( let mask = match mask { Some(Ok(mask)) => mask, Some(Err(err)) => return Err(err), - None => return Ok(partitions), + None => return Ok(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)), }; - // Don't retain partitions that evaluated to null + // Don't retain rows that evaluated to null let prepared = match mask.null_count() { 0 => mask, _ => prep_null_mask_filter(&mask), }; - // Sanity check - assert_eq!(prepared.len(), partitions.len()); - - let filtered = partitions - .into_iter() - .zip(prepared.values()) - .filter_map(|(p, f)| f.then_some(p)) - .collect(); - - Ok(filtered) + Ok(prepared) } #[derive(Debug)] diff --git a/datafusion/core/src/datasource/listing/metadata.rs b/datafusion/core/src/datasource/listing/metadata.rs new file mode 100644 index 000000000000..30ce70083546 --- /dev/null +++ b/datafusion/core/src/datasource/listing/metadata.rs @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Functions that support extracting metadata from files. + +use std::fmt; +use std::str::FromStr; +use std::sync::Arc; + +use super::PartitionedFile; +use crate::datasource::listing::helpers::apply_filters; +use datafusion_common::plan_err; +use datafusion_common::Result; + +use arrow::{ + array::{Array, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder}, + datatypes::{DataType, Field, Schema, TimeUnit}, + record_batch::RecordBatch, +}; +use arrow_schema::Fields; +use datafusion_common::ScalarValue; +use datafusion_expr::execution_props::ExecutionProps; + +use datafusion_common::DataFusionError; +use datafusion_expr::Expr; +use object_store::ObjectMeta; + +/// A metadata column that can be used to filter files +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum MetadataColumn { + /// The location of the file in object store + Location, + /// The last modified timestamp of the file + LastModified, + /// The size of the file in bytes + Size, +} + +impl fmt::Display for MetadataColumn { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.name()) + } +} + +impl MetadataColumn { + /// The name of the metadata column (one of `location`, `last_modified`, or `size`) + pub fn name(&self) -> &str { + match self { + MetadataColumn::Location => "location", + MetadataColumn::LastModified => "last_modified", + MetadataColumn::Size => "size", + } + } + + /// Returns the arrow type of this metadata column + pub fn arrow_type(&self) -> DataType { + match self { + MetadataColumn::Location => DataType::Utf8, + MetadataColumn::LastModified => { + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())) + } + MetadataColumn::Size => DataType::UInt64, + } + } + + /// Returns the arrow field for this metadata column + pub fn field(&self) -> Field { + Field::new(self.to_string(), self.arrow_type(), true) + } + + /// Returns the scalar value for this metadata column given an object meta + pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue { + match self { + MetadataColumn::Location => { + ScalarValue::Utf8(Some(meta.location.to_string())) + } + MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond( + Some(meta.last_modified.timestamp_micros()), + Some("UTC".into()), + ), + MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)), + } + } + + pub(crate) fn builder(&self, capacity: usize) -> MetadataBuilder { + match self { + MetadataColumn::Location => MetadataBuilder::Location( + StringBuilder::with_capacity(capacity, capacity * 10), + ), + MetadataColumn::LastModified => MetadataBuilder::LastModified( + TimestampMicrosecondBuilder::with_capacity(capacity).with_data_type( + DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())), + ), + ), + MetadataColumn::Size => { + 
+                MetadataBuilder::Size(UInt64Builder::with_capacity(capacity))
+            }
+        }
+    }
+}
+
+impl FromStr for MetadataColumn {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> Result<Self, Self::Err> {
+        match s {
+            "location" => Ok(MetadataColumn::Location),
+            "last_modified" => Ok(MetadataColumn::LastModified),
+            "size" => Ok(MetadataColumn::Size),
+            _ => plan_err!(
+                "Invalid metadata column: {}, expected: location, last_modified, or size",
+                s
+            ),
+        }
+    }
+}
+
+pub(crate) enum MetadataBuilder {
+    Location(StringBuilder),
+    LastModified(TimestampMicrosecondBuilder),
+    Size(UInt64Builder),
+}
+
+impl MetadataBuilder {
+    pub fn append(&mut self, meta: &ObjectMeta) {
+        match self {
+            Self::Location(builder) => builder.append_value(&meta.location),
+            Self::LastModified(builder) => {
+                builder.append_value(meta.last_modified.timestamp_micros())
+            }
+            Self::Size(builder) => builder.append_value(meta.size as u64),
+        }
+    }
+
+    pub fn finish(self) -> Arc<dyn Array> {
+        match self {
+            MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()),
+            MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()),
+            MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()),
+        }
+    }
+}
+
+/// Determine if the given file matches the input metadata filters.
+/// `filters` should only contain expressions that can be evaluated
+/// using only the metadata columns.
+pub(crate) fn apply_metadata_filters(
+    file: PartitionedFile,
+    filters: &[Expr],
+    metadata_cols: &[MetadataColumn],
+) -> Result<Option<PartitionedFile>> {
+    // if no metadata col => simply return all the files
+    if metadata_cols.is_empty() {
+        return Ok(Some(file));
+    }
+
+    let mut builders: Vec<_> = metadata_cols.iter().map(|col| col.builder(1)).collect();
+
+    for builder in builders.iter_mut() {
+        builder.append(&file.object_meta);
+    }
+
+    let arrays = builders
+        .into_iter()
+        .map(|builder| builder.finish())
+        .collect::<Vec<_>>();
+
+    let fields: Fields = metadata_cols
+        .iter()
+        .map(|col| Field::new(col.to_string(), col.arrow_type(), true))
+        .collect();
+    let schema = Arc::new(Schema::new(fields));
+
+    let batch = RecordBatch::try_new(schema, arrays)?;
+
+    // TODO: Plumb this down
+    let props = ExecutionProps::new();
+
+    // Don't retain rows that evaluated to null
+    let prepared = apply_filters(&batch, filters, &props)?;
+
+    // If the filter evaluates to true, return the file
+    if prepared.true_count() == 1 {
+        return Ok(Some(file));
+    }
+
+    Ok(None)
+}
diff --git a/datafusion/core/src/datasource/listing/mod.rs b/datafusion/core/src/datasource/listing/mod.rs
index c5a441aacf1d..696eeb273421 100644
--- a/datafusion/core/src/datasource/listing/mod.rs
+++ b/datafusion/core/src/datasource/listing/mod.rs
@@ -19,6 +19,7 @@
 //! to get the list of files to process.
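For orientation, a minimal sketch (not part of the patch) of the public `MetadataColumn` surface introduced in `metadata.rs`; the free-standing `example` function is hypothetical:

```rust
use std::str::FromStr;

use datafusion::datasource::listing::MetadataColumn;
use datafusion::error::Result;

fn example() -> Result<()> {
    // The three supported columns parse from their lowercase names.
    for name in ["location", "last_modified", "size"] {
        let col = MetadataColumn::from_str(name)?;
        // Each metadata column contributes one nullable field to the table schema.
        let field = col.field();
        println!("{} -> {:?}", field.name(), field.data_type());
    }
    // Anything else is rejected with a plan error.
    assert!(MetadataColumn::from_str("e_tag").is_err());
    Ok(())
}
```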
mod helpers; +mod metadata; mod table; mod url; @@ -31,6 +32,7 @@ use std::pin::Pin; use std::sync::Arc; pub use self::url::ListingTableUrl; +pub use metadata::MetadataColumn; pub use table::{ListingOptions, ListingTable, ListingTableConfig}; /// Stream of files get listed from object store diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 15125fe5a090..0497b72da187 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -21,6 +21,7 @@ use std::collections::HashMap; use std::{any::Any, str::FromStr, sync::Arc}; use super::helpers::{expr_applicable_for_cols, pruned_partition_list, split_files}; +use super::metadata::{apply_metadata_filters, MetadataColumn}; use super::{ListingTableUrl, PartitionedFile}; use crate::datasource::{ @@ -259,6 +260,9 @@ pub struct ListingOptions { /// multiple equivalent orderings, the outer `Vec` will have a /// single element. pub file_sort_order: Vec>, + /// Additional columns to include in the table schema, based on the metadata of the files. + /// See [Self::with_metadata_cols] for details. + pub metadata_cols: Vec, } impl ListingOptions { @@ -276,6 +280,7 @@ impl ListingOptions { collect_stat: true, target_partitions: 1, file_sort_order: vec![], + metadata_cols: vec![], } } @@ -394,6 +399,50 @@ impl ListingOptions { self } + /// Set metadata columns on [`ListingOptions`] and returns self. + /// + /// "metadata columns" are columns that are computed from the `ObjectMeta` of the files from object store. + /// + /// Available metadata columns: + /// - `location`: The full path to the object + /// - `last_modified`: The last modified time + /// - `size`: The size in bytes of the object + /// + /// For example, given the following files in object store: + /// + /// ```text + /// /mnt/nyctaxi/tripdata01.parquet + /// /mnt/nyctaxi/tripdata02.parquet + /// /mnt/nyctaxi/tripdata03.parquet + /// ``` + /// + /// If the `last_modified` field in the `ObjectMeta` for `tripdata01.parquet` is `2024-01-01 12:00:00`, + /// then the table schema will include a column named `last_modified` with the value `2024-01-01 12:00:00` + /// for all rows read from `tripdata01.parquet`. + /// + /// | | last_modified | + /// |-----------------|-----------------------| + /// | ... | 2024-01-01 12:00:00 | + /// | ... | 2024-01-02 15:30:00 | + /// | ... | 2024-01-03 09:15:00 | + /// + /// # Example + /// ``` + /// # use std::sync::Arc; + /// # use datafusion::datasource::{listing::ListingOptions, file_format::parquet::ParquetFormat}; + /// + /// let listing_options = ListingOptions::new(Arc::new( + /// ParquetFormat::default() + /// )) + /// .with_metadata_cols(vec![MetadataColumn::LastModified]); + /// + /// assert_eq!(listing_options.metadata_cols, vec![MetadataColumn::LastModified]); + /// ``` + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set stat collection on [`ListingOptions`] and returns self. /// /// ``` @@ -478,6 +527,17 @@ impl ListingOptions { Ok(schema) } + /// Validates that the metadata columns do not already exist in the schema, and are one of the allowed metadata columns. 
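Building on the `with_metadata_cols` documentation above, a hedged end-to-end sketch of registering a listing table that exposes metadata columns; the table name, path, and file layout are hypothetical:

```rust
use std::sync::Arc;

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, MetadataColumn,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical helper: path and table name are made up for illustration.
async fn register_with_metadata(ctx: &SessionContext) -> Result<()> {
    let options = ListingOptions::new(Arc::new(ParquetFormat::default()))
        .with_metadata_cols(vec![MetadataColumn::Location, MetadataColumn::Size]);

    let url = ListingTableUrl::parse("file:///mnt/nyctaxi/")?;
    let resolved_schema = options.infer_schema(&ctx.state(), &url).await?;

    let config = ListingTableConfig::new(url)
        .with_listing_options(options)
        .with_schema(resolved_schema);

    ctx.register_table("trips", Arc::new(ListingTable::try_new(config)?))?;
    Ok(())
}
```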
+ pub fn validate_metadata_cols(&self, schema: &SchemaRef) -> Result<()> { + for col in self.metadata_cols.iter() { + if schema.column_with_name(col.name()).is_some() { + return plan_err!("Column {} already exists in schema", col); + } + } + + Ok(()) + } + /// Infers the partition columns stored in `LOCATION` and compares /// them with the columns provided in `PARTITIONED BY` to help prevent /// accidental corrupts of partitioned tables. @@ -684,7 +744,7 @@ pub struct ListingTable { table_paths: Vec, /// File fields only file_schema: SchemaRef, - /// File fields + partition columns + /// File fields + partition columns + metadata columns table_schema: SchemaRef, options: ListingOptions, definition: Option, @@ -701,9 +761,8 @@ impl ListingTable { /// `ListingOptions` and `SchemaRef` are optional. If they are not /// provided the file type is inferred based on the file suffix. /// If the schema is provided then it must be resolved before creating the table - /// and should contain the fields of the file without the table - /// partitioning columns. - /// + /// and should contain the fields of the file without the extended table columns, + /// i.e. the partitioning and metadata columns. pub fn try_new(config: ListingTableConfig) -> Result { let file_schema = config .file_schema @@ -719,6 +778,12 @@ impl ListingTable { builder.push(Field::new(part_col_name, part_col_type.clone(), false)); } + // Validate and add metadata columns to the schema + options.validate_metadata_cols(&file_schema)?; + for col in &options.metadata_cols { + builder.push(col.field()); + } + let table_schema = Arc::new( builder .finish() @@ -786,16 +851,36 @@ impl ListingTable { fn try_create_output_ordering(&self) -> Result> { create_ordering(&self.table_schema, &self.options.file_sort_order) } + + fn partition_column_fields(&self) -> Result> { + self.options + .table_partition_cols + .iter() + .map(|col| Ok(self.table_schema.field_with_name(&col.0)?)) + .collect::>>() + } + + fn partition_column_names(&self) -> Result> { + Ok(self + .options + .table_partition_cols + .iter() + .map(|col| col.0.as_str())) + } + + fn metadata_column_names(&self) -> impl Iterator { + self.options.metadata_cols.iter().map(|col| col.name()) + } } -// Expressions can be used for parttion pruning if they can be evaluated using -// only the partiton columns and there are partition columns. -fn can_be_evaluted_for_partition_pruning( - partition_column_names: &[&str], +// Expressions can be used for extended columns (partition/metadata) pruning if they can be evaluated using +// only the extended columns and there are extended columns. 
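Below, a rough illustration of the split the scan performs between pruning filters and remaining filters, using `Expr::column_refs` as a stand-in for the crate-private `expr_applicable_for_cols` helper; the column names are illustrative only:

```rust
use datafusion::prelude::{col, lit, Expr};

fn example() {
    let filters = vec![
        col("year").eq(lit(2021)),    // partition column -> prunes the file listing
        col("size").gt(lit(1500u64)), // metadata column  -> prunes the file listing
        col("id").gt(lit(1)),         // data column      -> stays in the scan
    ];
    // Hypothetical set of partition + metadata ("extended") column names.
    let extended_cols = ["year", "month", "day", "location", "size", "last_modified"];

    let (pruning, remaining): (Vec<Expr>, Vec<Expr>) =
        filters.into_iter().partition(|f| {
            f.column_refs()
                .iter()
                .all(|c| extended_cols.contains(&c.name.as_str()))
        });

    assert_eq!(pruning.len(), 2);
    assert_eq!(remaining.len(), 1);
}
```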
+fn can_be_evaluated_for_extended_col_pruning( + extended_column_names: &[&str], expr: &Expr, ) -> bool { - !partition_column_names.is_empty() - && expr_applicable_for_cols(partition_column_names, expr) + !extended_column_names.is_empty() + && expr_applicable_for_cols(extended_column_names, expr) } #[async_trait] @@ -823,28 +908,29 @@ impl TableProvider for ListingTable { filters: &[Expr], limit: Option, ) -> Result> { - // extract types of partition columns - let table_partition_cols = self - .options - .table_partition_cols - .iter() - .map(|col| Ok(self.table_schema.field_with_name(&col.0)?.clone())) - .collect::>>()?; + // extract types of extended columns (partition + metadata columns) + let partition_col_names = self.partition_column_names()?.collect::>(); + let metadata_col_names = self.metadata_column_names().collect::>(); - let table_partition_col_names = table_partition_cols - .iter() - .map(|field| field.name().as_str()) - .collect::>(); - // If the filters can be resolved using only partition cols, there is no need to - // pushdown it to TableScan, otherwise, `unhandled` pruning predicates will be generated + // If the filters can be resolved using only partition/metadata cols, there is no need to + // push it down to the TableScan, otherwise, `unhandled` pruning predicates will be generated let (partition_filters, filters): (Vec<_>, Vec<_>) = filters.iter().cloned().partition(|filter| { - can_be_evaluted_for_partition_pruning(&table_partition_col_names, filter) + can_be_evaluated_for_extended_col_pruning(&partition_col_names, filter) + }); + let (metadata_filters, filters): (Vec<_>, Vec<_>) = + filters.iter().cloned().partition(|filter| { + can_be_evaluated_for_extended_col_pruning(&metadata_col_names, filter) }); // TODO (https://github.com/apache/datafusion/issues/11600) remove downcast_ref from here? let session_state = state.as_any().downcast_ref::().unwrap(); let (mut partitioned_file_lists, statistics) = self - .list_files_for_scan(session_state, &partition_filters, limit) + .list_files_for_scan( + session_state, + &partition_filters, + &metadata_filters, + limit, + ) .await?; // if no files need to be read, return an `EmptyExec` @@ -882,7 +968,7 @@ impl TableProvider for ListingTable { let filters = conjunction(filters.to_vec()) .map(|expr| -> Result<_> { - // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition columns. + // NOTE: Use the table schema (NOT file schema) here because `expr` may contain references to partition/metadata columns. let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; let filters = create_physical_expr( &expr, @@ -899,6 +985,17 @@ impl TableProvider for ListingTable { return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); }; + let table_partition_cols = self + .partition_column_fields()? 
+ .into_iter() + .cloned() + .collect(); + + let metadata_cols = self + .metadata_column_names() + .filter_map(|c| MetadataColumn::from_str(c).ok()) + .collect::>(); + // create the execution plan self.options .format @@ -910,7 +1007,8 @@ impl TableProvider for ListingTable { .with_projection(projection.cloned()) .with_limit(limit) .with_output_ordering(output_ordering) - .with_table_partition_cols(table_partition_cols), + .with_table_partition_cols(table_partition_cols) + .with_metadata_cols(metadata_cols), filters.as_ref(), ) .await @@ -920,18 +1018,19 @@ impl TableProvider for ListingTable { &self, filters: &[&Expr], ) -> Result> { - let partition_column_names = self - .options - .table_partition_cols - .iter() - .map(|col| col.0.as_str()) + let extended_column_names = self + .partition_column_names()? + .chain(self.metadata_column_names()) .collect::>(); + filters .iter() .map(|filter| { - if can_be_evaluted_for_partition_pruning(&partition_column_names, filter) - { - // if filter can be handled by partition pruning, it is exact + if can_be_evaluated_for_extended_col_pruning( + &extended_column_names, + filter, + ) { + // if filter can be handled by pruning from the extended columns, it is exact return Ok(TableProviderFilterPushDown::Exact); } @@ -1061,7 +1160,8 @@ impl ListingTable { async fn list_files_for_scan<'a>( &'a self, ctx: &'a SessionState, - filters: &'a [Expr], + partition_filters: &'a [Expr], + metadata_filters: &'a [Expr], limit: Option, ) -> Result<(Vec>, Statistics)> { let store = if let Some(url) = self.table_paths.first() { @@ -1075,17 +1175,36 @@ impl ListingTable { ctx, store.as_ref(), table_path, - filters, + partition_filters, &self.options.file_extension, &self.options.table_partition_cols, ) })) .await?; let file_list = stream::iter(file_list).flatten(); - // collect the statistics if required by the config + + let metadata_cols = self + .metadata_column_names() + .map(MetadataColumn::from_str) + .collect::>>()?; + + // collect the statistics if required by the config + filter out files that don't match the metadata filters let files = file_list + .filter_map(|par_file| async { + if metadata_cols.is_empty() { + return Some(par_file); + } + + let Ok(par_file) = par_file else { + return Some(par_file); + }; + + apply_metadata_filters(par_file, metadata_filters, &metadata_cols) + .transpose() + }) .map(|part_file| async { let part_file = part_file?; + if self.options.collect_stat { let statistics = self.do_collect_statistics(ctx, &store, &part_file).await?; @@ -1609,7 +1728,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); @@ -1646,7 +1767,9 @@ mod tests { let table = ListingTable::try_new(config)?; - let (file_list, _) = table.list_files_for_scan(&ctx.state(), &[], None).await?; + let (file_list, _) = table + .list_files_for_scan(&ctx.state(), &[], &[], None) + .await?; assert_eq!(file_list.len(), output_partitioning); diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs index 74ab0126a557..6c813ee08982 100644 --- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs +++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs @@ -20,11 +20,14 @@ use std::{ borrow::Cow, collections::HashMap, fmt::Debug, 
marker::PhantomData, mem::size_of, - sync::Arc, vec, + str::FromStr, sync::Arc, vec, }; use super::{get_projected_output_ordering, statistics::MinMaxStatistics}; -use crate::datasource::{listing::PartitionedFile, object_store::ObjectStoreUrl}; +use crate::datasource::{ + listing::{MetadataColumn, PartitionedFile}, + object_store::ObjectStoreUrl, +}; use crate::{error::Result, scalar::ScalarValue}; use arrow::array::{ArrayData, BufferBuilder}; @@ -38,6 +41,7 @@ use datafusion_physical_expr::LexOrdering; use datafusion_physical_expr_common::sort_expr::LexOrderingRef; use log::warn; +use object_store::ObjectMeta; /// Convert type to a type suitable for use as a [`ListingTable`] /// partition column. Returns `Dictionary(UInt16, val_type)`, which is @@ -119,13 +123,15 @@ pub struct FileScanConfig { /// Defaults to [`Statistics::new_unknown`]. pub statistics: Statistics, /// Columns on which to project the data. Indexes that are higher than the - /// number of columns of `file_schema` refer to `table_partition_cols`. + /// number of columns of `file_schema` refer to `table_partition_cols` and then `metadata_cols`. pub projection: Option>, /// The maximum number of records to read from this plan. If `None`, /// all records after filtering are returned. pub limit: Option, /// The partitioning columns pub table_partition_cols: Vec, + /// The metadata columns + pub metadata_cols: Vec, /// All equivalent lexicographical orderings that describe the schema. pub output_ordering: Vec, } @@ -151,6 +157,7 @@ impl FileScanConfig { projection: None, limit: None, table_partition_cols: vec![], + metadata_cols: vec![], output_ordering: vec![], } } @@ -205,6 +212,12 @@ impl FileScanConfig { self } + /// Set the metadata columns of the files + pub fn with_metadata_cols(mut self, metadata_cols: Vec) -> Self { + self.metadata_cols = metadata_cols; + self + } + /// Set the output ordering of the files pub fn with_output_ordering(mut self, output_ordering: Vec) -> Self { self.output_ordering = output_ordering; @@ -224,23 +237,19 @@ impl FileScanConfig { let proj_iter: Box> = match &self.projection { Some(proj) => Box::new(proj.iter().copied()), None => Box::new( - 0..(self.file_schema.fields().len() + self.table_partition_cols.len()), + 0..(self.file_schema.fields().len() + + self.table_partition_cols.len() + + self.metadata_cols.len()), ), }; let mut table_fields = vec![]; let mut table_cols_stats = vec![]; + for idx in proj_iter { - if idx < self.file_schema.fields().len() { - let field = self.file_schema.field(idx); - table_fields.push(field.clone()); - table_cols_stats.push(self.statistics.column_statistics[idx].clone()) - } else { - let partition_idx = idx - self.file_schema.fields().len(); - table_fields.push(self.table_partition_cols[partition_idx].to_owned()); - // TODO provide accurate stat for partition column (#1186) - table_cols_stats.push(ColumnStatistics::new_unknown()) - } + let (field, stats) = self.get_field_and_stats(idx); + table_fields.push(field); + table_cols_stats.push(stats); } let table_stats = Statistics { @@ -261,6 +270,38 @@ impl FileScanConfig { (projected_schema, table_stats, projected_output_ordering) } + /// Helper function to get field and statistics for a given index + fn get_field_and_stats(&self, idx: usize) -> (Field, ColumnStatistics) { + let file_schema_len = self.file_schema.fields().len(); + let partition_cols_len = self.table_partition_cols.len(); + + match idx { + // File schema columns + i if i < file_schema_len => ( + self.file_schema.field(i).clone(), + 
self.statistics.column_statistics[i].clone(), + ), + + // Partition columns + i if i < file_schema_len + partition_cols_len => { + let partition_idx = i - file_schema_len; + ( + self.table_partition_cols[partition_idx].to_owned(), + ColumnStatistics::new_unknown(), + ) + } + + // Metadata columns + i => { + let metadata_idx = i - file_schema_len - partition_cols_len; + ( + self.metadata_cols[metadata_idx].field(), + ColumnStatistics::new_unknown(), + ) + } + } + } + #[cfg_attr(not(feature = "avro"), allow(unused))] // Only used by avro pub(crate) fn projected_file_column_names(&self) -> Option> { self.projection.as_ref().map(|p| { @@ -272,7 +313,7 @@ impl FileScanConfig { }) } - /// Projects only file schema, ignoring partition columns + /// Projects only file schema, ignoring partition/metadata columns pub(crate) fn projected_file_schema(&self) -> SchemaRef { let fields = self.file_column_projection_indices().map(|indices| { indices @@ -368,13 +409,13 @@ impl FileScanConfig { } } -/// A helper that projects partition columns into the file record batches. +/// A helper that projects extended (i.e. partition/metadata) columns into the file record batches. /// /// One interesting trick is the usage of a cache for the key buffers of the partition column /// dictionaries. Indeed, the partition columns are constant, so the dictionaries that represent them /// have all their keys equal to 0. This enables us to re-use the same "all-zero" buffer across batches, /// which makes the space consumption of the partition columns O(batch_size) instead of O(record_count). -pub struct PartitionColumnProjector { +pub struct ExtendedColumnProjector { /// An Arrow buffer initialized to zeros that represents the key array of all partition /// columns (partition columns are materialized by dictionary arrays with only one /// value in the dictionary, thus all the keys are equal to zero). @@ -383,15 +424,22 @@ pub struct PartitionColumnProjector { /// schema. Sorted by index in the target schema so that we can iterate on it to /// insert the partition columns in the target record batch. projected_partition_indexes: Vec<(usize, usize)>, + /// Similar to `projected_partition_indexes` but only stores the indexes in the target schema + projected_metadata_indexes: Vec, /// The schema of the table once the projection was applied. 
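The index arithmetic in `get_field_and_stats` can be pictured with a small sketch (hypothetical schema, not from the patch): indexes past the file schema select partition columns, and indexes past those select metadata columns.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::listing::MetadataColumn;

fn example() {
    // Hypothetical two-column file plus one partition and two metadata columns.
    let file_schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int32, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let partition_cols = [Field::new("year", DataType::Int32, false)];
    let metadata_cols = [MetadataColumn::Location, MetadataColumn::Size];

    // Flat projection index space: file columns, then partition columns,
    // then metadata columns.
    let n_file = file_schema.fields().len();
    let n_part = partition_cols.len();
    let name_of = |idx: usize| -> String {
        if idx < n_file {
            file_schema.field(idx).name().clone()
        } else if idx < n_file + n_part {
            partition_cols[idx - n_file].name().clone()
        } else {
            metadata_cols[idx - n_file - n_part].name().to_string()
        }
    };

    // Index 4 resolves to the "size" metadata column.
    assert_eq!(name_of(4), "size");
}
```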
projected_schema: SchemaRef, } -impl PartitionColumnProjector { - // Create a projector to insert the partitioning columns into batches read from files - // - `projected_schema`: the target schema with both file and partitioning columns +impl ExtendedColumnProjector { + // Create a projector to insert the partitioning/metadata columns into batches read from files + // - `projected_schema`: the target schema with file, partitioning and metadata columns // - `table_partition_cols`: all the partitioning column names - pub fn new(projected_schema: SchemaRef, table_partition_cols: &[String]) -> Self { + // - `metadata_cols`: all the metadata column names + pub fn new( + projected_schema: SchemaRef, + table_partition_cols: &[String], + metadata_cols: &[MetadataColumn], + ) -> Self { let mut idx_map = HashMap::new(); for (partition_idx, partition_name) in table_partition_cols.iter().enumerate() { if let Ok(schema_idx) = projected_schema.index_of(partition_name) { @@ -402,25 +450,38 @@ impl PartitionColumnProjector { let mut projected_partition_indexes: Vec<_> = idx_map.into_iter().collect(); projected_partition_indexes.sort_by(|(_, a), (_, b)| a.cmp(b)); + let mut projected_metadata_indexes = vec![]; + for metadata_name in metadata_cols.iter() { + if let Ok(schema_idx) = projected_schema.index_of(metadata_name.name()) { + projected_metadata_indexes.push(schema_idx); + } + } + Self { - projected_partition_indexes, key_buffer_cache: Default::default(), + projected_partition_indexes, + projected_metadata_indexes, projected_schema, } } - // Transform the batch read from the file by inserting the partitioning columns + // Transform the batch read from the file by inserting both partitioning and metadata columns // to the right positions as deduced from `projected_schema` // - `file_batch`: batch read from the file, with internal projection applied // - `partition_values`: the list of partition values, one for each partition column + // - `metadata`: the metadata of the file containing information like location, size, etc. 
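For intuition on the metadata path of `project`, a small hedged sketch of how a single file-level value becomes a constant column; it assumes `object_store` and `chrono` as direct dependencies, and the file path is made up:

```rust
use chrono::Utc;
use object_store::{path::Path, ObjectMeta};

use datafusion::datasource::listing::MetadataColumn;
use datafusion::error::Result;

fn example() -> Result<()> {
    // Hypothetical object metadata for a single file.
    let meta = ObjectMeta {
        location: Path::from("data/file.parquet"),
        last_modified: Utc::now(),
        size: 1024,
        e_tag: None,
        version: None,
    };

    // One scalar per file, repeated for every row of the batch read from that file.
    let scalar = MetadataColumn::Location.to_scalar_value(&meta);
    let column = scalar.to_array_of_size(8)?; // 8 stands in for file_batch.num_rows()
    assert_eq!(column.len(), 8);
    Ok(())
}
```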
pub fn project( &mut self, file_batch: RecordBatch, partition_values: &[ScalarValue], + metadata: &ObjectMeta, ) -> Result { - let expected_cols = - self.projected_schema.fields().len() - self.projected_partition_indexes.len(); + // Calculate expected number of columns from the file (excluding partition and metadata columns) + let expected_cols = self.projected_schema.fields().len() + - self.projected_partition_indexes.len() + - self.projected_metadata_indexes.len(); + // Verify the file batch has the expected number of columns if file_batch.columns().len() != expected_cols { return exec_err!( "Unexpected batch schema from file, expected {} cols but got {}", @@ -429,18 +490,21 @@ impl PartitionColumnProjector { ); } + // Start with the columns from the file batch let mut cols = file_batch.columns().to_vec(); + + // Insert partition columns for &(pidx, sidx) in &self.projected_partition_indexes { - let p_value = - partition_values - .get(pidx) - .ok_or(DataFusionError::Execution( - "Invalid partitioning found on disk".to_string(), - ))?; + // Get the partition value from the provided values + let p_value = partition_values.get(pidx).ok_or_else(|| { + DataFusionError::Execution( + "Invalid partitioning found on disk".to_string(), + ) + })?; let mut partition_value = Cow::Borrowed(p_value); - // check if user forgot to dict-encode the partition value + // Check if user forgot to dict-encode the partition value and apply auto-fix if needed let field = self.projected_schema.field(sidx); let expected_data_type = field.data_type(); let actual_data_type = partition_value.data_type(); @@ -454,6 +518,7 @@ impl PartitionColumnProjector { } } + // Create array and insert at the correct schema position cols.insert( sidx, create_output_array( @@ -461,9 +526,25 @@ impl PartitionColumnProjector { partition_value.as_ref(), file_batch.num_rows(), )?, - ) + ); + } + + // Insert metadata columns + for &sidx in &self.projected_metadata_indexes { + // Get the metadata column type from the field name + let field_name = self.projected_schema.field(sidx).name(); + let metadata_col = MetadataColumn::from_str(field_name).map_err(|e| { + DataFusionError::Execution(format!("Invalid metadata column: {}", e)) + })?; + + // Convert metadata to scalar value based on the column type + let scalar_value = metadata_col.to_scalar_value(metadata); + + // Create array and insert at the correct schema position + cols.insert(sidx, scalar_value.to_array_of_size(file_batch.num_rows())?); } + // Create a new record batch with all columns in the correct order RecordBatch::try_new_with_options( Arc::clone(&self.projected_schema), cols, @@ -619,10 +700,21 @@ fn create_output_array( #[cfg(test)] mod tests { use arrow_array::Int32Array; + use object_store::path::Path; use super::*; use crate::{test::columns, test_util::aggr_test_schema}; + fn test_object_meta() -> ObjectMeta { + ObjectMeta { + location: Path::from("test"), + size: 100, + last_modified: chrono::Utc::now(), + e_tag: None, + version: None, + } + } + #[test] fn physical_plan_config_no_projection() { let file_schema = aggr_test_schema(); @@ -764,12 +856,13 @@ mod tests { ); let (proj_schema, ..) 
= conf.project(); // created a projector for that projected schema - let mut proj = PartitionColumnProjector::new( + let mut proj = ExtendedColumnProjector::new( proj_schema, &partition_cols .iter() .map(|x| x.0.clone()) .collect::>(), + &[], ); // project first batch @@ -782,6 +875,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("26")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -810,6 +904,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("27")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -840,6 +935,7 @@ mod tests { wrap_partition_value_in_dict(ScalarValue::from("10")), wrap_partition_value_in_dict(ScalarValue::from("28")), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ @@ -868,6 +964,7 @@ mod tests { ScalarValue::from("10"), ScalarValue::from("26"), ], + &test_object_meta(), ) .expect("Projection of partition columns into record batch failed"); let expected = [ diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs index 6f354b31ae87..91712efbebca 100644 --- a/datafusion/core/src/datasource/physical_plan/file_stream.rs +++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs @@ -27,7 +27,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use crate::datasource::listing::PartitionedFile; -use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector; +use crate::datasource::physical_plan::file_scan_config::ExtendedColumnProjector; use crate::datasource::physical_plan::{FileMeta, FileScanConfig}; use crate::error::Result; use crate::physical_plan::metrics::{ @@ -44,6 +44,7 @@ use datafusion_common::ScalarValue; use futures::future::BoxFuture; use futures::stream::BoxStream; use futures::{ready, FutureExt, Stream, StreamExt}; +use object_store::ObjectMeta; /// A fallible future that resolves to a stream of [`RecordBatch`] pub type FileOpenFuture = @@ -85,8 +86,8 @@ pub struct FileStream { /// A generic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`], /// which can be resolved to a stream of `RecordBatch`. file_opener: F, - /// The partition column projector - pc_projector: PartitionColumnProjector, + /// The extended (partitioning + metadata) column projector + col_projector: ExtendedColumnProjector, /// The stream state state: FileStreamState, /// File stream specific metrics @@ -115,19 +116,23 @@ enum FileStreamState { future: FileOpenFuture, /// The partition values for this file partition_values: Vec, + /// The object metadata for this file + object_meta: ObjectMeta, }, /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`] /// returned by [`FileOpener::open`] Scan { /// Partitioning column values for the current batch_iter partition_values: Vec, + /// The object metadata for the current file + object_meta: ObjectMeta, /// The reader instance reader: BoxStream<'static, Result>, /// A [`FileOpenFuture`] for the next file to be processed, /// and its corresponding partition column values, if any. /// This allows the next file to be opened in parallel while the /// current file is read. 
- next: Option<(NextOpen, Vec)>, + next: Option<(NextOpen, Vec, ObjectMeta)>, }, /// Encountered an error Error, @@ -251,13 +256,14 @@ impl FileStream { metrics: &ExecutionPlanMetricsSet, ) -> Result { let (projected_schema, ..) = config.project(); - let pc_projector = PartitionColumnProjector::new( + let col_projector = ExtendedColumnProjector::new( projected_schema.clone(), &config .table_partition_cols .iter() .map(|x| x.name().clone()) .collect::>(), + &config.metadata_cols, ); let files = config.file_groups[partition].clone(); @@ -267,7 +273,7 @@ impl FileStream { projected_schema, remain: config.limit, file_opener, - pc_projector, + col_projector, state: FileStreamState::Idle, file_stream_metrics: FileStreamMetrics::new(metrics, partition), baseline_metrics: BaselineMetrics::new(metrics, partition), @@ -288,19 +294,21 @@ impl FileStream { /// /// Since file opening is mostly IO (and may involve a /// bunch of sequential IO), it can be parallelized with decoding. - fn start_next_file(&mut self) -> Option)>> { + fn start_next_file( + &mut self, + ) -> Option, ObjectMeta)>> { let part_file = self.file_iter.pop_front()?; let file_meta = FileMeta { - object_meta: part_file.object_meta, + object_meta: part_file.object_meta.clone(), range: part_file.range, extensions: part_file.extensions, }; Some( - self.file_opener - .open(file_meta) - .map(|future| (future, part_file.partition_values)), + self.file_opener.open(file_meta).map(|future| { + (future, part_file.partition_values, part_file.object_meta) + }), ) } @@ -311,10 +319,11 @@ impl FileStream { self.file_stream_metrics.time_opening.start(); match self.start_next_file().transpose() { - Ok(Some((future, partition_values))) => { + Ok(Some((future, partition_values, object_meta))) => { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } Ok(None) => return Poll::Ready(None), @@ -327,9 +336,11 @@ impl FileStream { FileStreamState::Open { future, partition_values, + object_meta, } => match ready!(future.poll_unpin(cx)) { Ok(reader) => { let partition_values = mem::take(partition_values); + let object_meta = object_meta.clone(); // include time needed to start opening in `start_next_file` self.file_stream_metrics.time_opening.stop(); @@ -338,13 +349,19 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.start(); match next { - Ok(Some((next_future, next_partition_values))) => { + Ok(Some(( + next_future, + next_partition_values, + next_object_meta, + ))) => { self.state = FileStreamState::Scan { partition_values, + object_meta, reader, next: Some(( NextOpen::Pending(next_future), next_partition_values, + next_object_meta, )), }; } @@ -352,6 +369,7 @@ impl FileStream { self.state = FileStreamState::Scan { reader, partition_values, + object_meta, next: None, }; } @@ -379,9 +397,10 @@ impl FileStream { reader, partition_values, next, + object_meta, } => { // We need to poll the next `FileOpenFuture` here to drive it forward - if let Some((next_open_future, _)) = next { + if let Some((next_open_future, _, _)) = next { if let NextOpen::Pending(f) = next_open_future { if let Poll::Ready(reader) = f.as_mut().poll(cx) { *next_open_future = NextOpen::Ready(reader); @@ -393,8 +412,8 @@ impl FileStream { self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); let result = self - .pc_projector - .project(batch, partition_values) + .col_projector + .project(batch, partition_values, object_meta) .map_err(|e| ArrowError::ExternalError(e.into())) .map(|batch| 
match &mut self.remain { Some(remain) => { @@ -427,7 +446,7 @@ impl FileStream { match self.on_error { // If `OnError::Skip` we skip the file as soon as we hit the first error OnError::Skip => match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -435,6 +454,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } NextOpen::Ready(reader) => { @@ -443,6 +463,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } @@ -460,7 +481,7 @@ impl FileStream { self.file_stream_metrics.time_scanning_total.stop(); match mem::take(next) { - Some((future, partition_values)) => { + Some((future, partition_values, object_meta)) => { self.file_stream_metrics.time_opening.start(); match future { @@ -468,6 +489,7 @@ impl FileStream { self.state = FileStreamState::Open { future, partition_values, + object_meta, } } NextOpen::Ready(reader) => { @@ -476,6 +498,7 @@ impl FileStream { reader, )), partition_values, + object_meta, } } } diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index 919054e8330f..8438a5472245 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -24,7 +24,7 @@ use std::ops::Range; use std::sync::Arc; use arrow::datatypes::DataType; -use datafusion::datasource::listing::ListingTableUrl; +use datafusion::datasource::listing::{ListingTableUrl, MetadataColumn}; use datafusion::datasource::physical_plan::ParquetExec; use datafusion::{ assert_batches_sorted_eq, @@ -72,6 +72,7 @@ async fn parquet_partition_pruning_filter() -> Result<()> { ("month", DataType::Int32), ("day", DataType::Int32), ], + &[], "mirror:///", "alltypes_plain.parquet", ) @@ -574,6 +575,134 @@ async fn parquet_overlapping_columns() -> Result<()> { Ok(()) } +#[tokio::test] +async fn test_metadata_columns() -> Result<()> { + let ctx = SessionContext::new(); + + let table = create_partitioned_alltypes_parquet_table( + &ctx, + &[ + "year=2021/month=09/day=09/file.parquet", + "year=2021/month=10/day=09/file.parquet", + "year=2021/month=10/day=28/file.parquet", + ], + &[ + ("year", DataType::Int32), + ("month", DataType::Int32), + ("day", DataType::Int32), + ], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + "mirror:///", + "alltypes_plain.parquet", + ) + .await; + + ctx.register_table("t", table).unwrap(); + + let result = ctx + .sql("SELECT id, size, location, last_modified FROM t WHERE size > 1500 ORDER BY id LIMIT 10") + .await? 
+ .collect() + .await?; + + let expected = [ + "+----+------+----------------------------------------+----------------------+", + "| id | size | location | last_modified |", + "+----+------+----------------------------------------+----------------------+", + "| 0 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 0 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 0 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 1 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=09/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=10/day=09/file.parquet | 1970-01-01T00:00:00Z |", + "| 2 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "| 3 | 1851 | year=2021/month=10/day=28/file.parquet | 1970-01-01T00:00:00Z |", + "+----+------+----------------------------------------+----------------------+", + ]; + assert_batches_sorted_eq!(expected, &result); + + Ok(()) +} + +#[tokio::test] +async fn test_metadata_columns_pushdown() -> Result<()> { + let ctx = SessionContext::new(); + + let table = create_partitioned_alltypes_parquet_table( + &ctx, + &[ + "year=2021/month=09/day=09/file.parquet", + "year=2021/month=10/day=09/file.parquet", + "year=2021/month=10/day=28/file.parquet", + ], + &[ + ("year", DataType::Int32), + ("month", DataType::Int32), + ("day", DataType::Int32), + ], + &[ + MetadataColumn::Location, + MetadataColumn::Size, + MetadataColumn::LastModified, + ], + "mirror:///", + "alltypes_plain.parquet", + ) + .await; + + // The metadata filters can be resolved using only the metadata columns. 
+ let filters = [ + Expr::eq( + col("location"), + lit("year=2021/month=09/day=09/file.parquet"), + ), + Expr::gt(col("size"), lit(400u64)), + Expr::gt_eq( + col("last_modified"), + lit(ScalarValue::TimestampMicrosecond( + Some(0), + Some("UTC".into()), + )), + ), + Expr::gt(col("id"), lit(1)), + ]; + let exec = table.scan(&ctx.state(), None, &filters, None).await?; + let parquet_exec = exec.as_any().downcast_ref::().unwrap(); + let pred = parquet_exec.predicate().unwrap(); + // Only the last filter should be pushdown to TableScan + let expected = Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()), + Operator::Gt, + Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), + )); + + assert!(pred.as_any().is::()); + let pred = pred.as_any().downcast_ref::().unwrap(); + + assert_eq!(pred, expected.as_any()); + + // Only the first file should be scanned + let scan_files = parquet_exec + .base_config() + .file_groups + .iter() + .flat_map(|x| x.iter().map(|y| y.path()).collect::>()) + .collect::>(); + assert_eq!(scan_files.len(), 1); + assert_eq!( + scan_files[0].to_string(), + "year=2021/month=09/day=09/file.parquet" + ); + + Ok(()) +} + fn register_partitioned_aggregate_csv( ctx: &SessionContext, store_paths: &[&str], @@ -618,6 +747,7 @@ async fn register_partitioned_alltypes_parquet( ctx, store_paths, partition_cols, + &[], table_path, source_file, ) @@ -630,6 +760,7 @@ async fn create_partitioned_alltypes_parquet_table( ctx: &SessionContext, store_paths: &[&str], partition_cols: &[(&str, DataType)], + metadata_cols: &[MetadataColumn], table_path: &str, source_file: &str, ) -> Arc { @@ -647,7 +778,8 @@ async fn create_partitioned_alltypes_parquet_table( .iter() .map(|x| (x.0.to_owned(), x.1.clone())) .collect::>(), - ); + ) + .with_metadata_cols(metadata_cols.to_vec()); let table_path = ListingTableUrl::parse(table_path).unwrap(); let store_path =
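To round out the tests above, a hedged sketch of the end-user view once such a table is registered; the table name `t` and the predicate values are illustrative, not taken from the patch:

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

// Hypothetical query: metadata columns behave like ordinary columns in SQL.
async fn query_metadata_columns(ctx: &SessionContext) -> Result<()> {
    let batches = ctx
        .sql(
            "SELECT id, location, size, last_modified \
             FROM t \
             WHERE size > 1500 AND location LIKE '%month=09%'",
        )
        .await?
        .collect()
        .await?;
    datafusion::arrow::util::pretty::print_batches(&batches)?;
    Ok(())
}
```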