Support for metadata columns (location, size, last_modified) in ListingTableProvider #74

Status: Open. Wants to merge 8 commits into base branch spiceai-43.
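This PR teaches ListingTableProvider to expose file metadata (location, size, last_modified) as queryable columns and to prune files against filters on them. As an illustrative sketch only (none of this appears in the diff below; the table name, data, and registration step are assumptions), a query against a listing table with the metadata columns enabled might look like:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

// Hypothetical end-to-end usage; how the metadata columns are enabled on
// the listing table is not shown in this section, so registration is elided.
async fn filter_by_file_metadata(ctx: &SessionContext) -> Result<()> {
    let df = ctx
        .sql(
            "SELECT location, size, last_modified \
             FROM logs \
             WHERE size > 1024 AND last_modified >= '2024-01-01T00:00:00Z'",
        )
        .await?;
    df.show().await?;
    Ok(())
}
```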
73 changes: 49 additions & 24 deletions datafusion/core/src/datasource/listing/helpers.rs
@@ -29,7 +29,8 @@ use datafusion_common::{Result, ScalarValue};
use datafusion_expr::{BinaryExpr, Operator};

use arrow::{
array::{Array, ArrayRef, AsArray, StringBuilder},
array::{Array, ArrayRef, AsArray, BooleanArray, StringBuilder},
buffer::BooleanBuffer,
compute::{and, cast, prep_null_mask_filter},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
@@ -272,26 +273,59 @@ async fn prune_partitions(
.collect();
let schema = Arc::new(Schema::new(fields));

let df_schema = DFSchema::from_unqualified_fields(
partition_cols
.iter()
.map(|(n, d)| Field::new(n, d.clone(), true))
.collect(),
Default::default(),
)?;

let batch = RecordBatch::try_new(schema, arrays)?;

// TODO: Plumb this down
let props = ExecutionProps::new();

// Don't retain partitions that evaluated to null
let prepared = apply_filters(&batch, filters, &props)?;

// If all rows are retained, return all partitions
if prepared.true_count() == prepared.len() {
return Ok(partitions);
}

// Sanity check
assert_eq!(prepared.len(), partitions.len());

let filtered = partitions
.into_iter()
.zip(prepared.values())
.filter_map(|(p, f)| f.then_some(p))
.collect();

Ok(filtered)
}

/// Applies the given filters to the input batch and returns a boolean mask that represents
/// the result of the filters applied to each row.
pub(crate) fn apply_filters(
batch: &RecordBatch,
filters: &[Expr],
props: &ExecutionProps,
) -> Result<BooleanArray> {
if filters.is_empty() {
return Ok(BooleanArray::new(
BooleanBuffer::new_set(batch.num_rows()),
None,
));
}

let num_rows = batch.num_rows();

let df_schema = DFSchema::from_unqualified_fields(
batch.schema().fields().clone(),
HashMap::default(),
)?;

// Applies `filter` to `batch`, propagating any evaluation error
let do_filter = |filter| -> Result<ArrayRef> {
let expr = create_physical_expr(filter, &df_schema, &props)?;
expr.evaluate(&batch)?.into_array(partitions.len())
let expr = create_physical_expr(filter, &df_schema, props)?;
expr.evaluate(batch)?.into_array(num_rows)
};

//.Compute the conjunction of the filters
// Compute the conjunction of the filters
let mask = filters
.iter()
.map(|f| do_filter(f).map(|a| a.as_boolean().clone()))
@@ -300,25 +334,16 @@ async fn prune_partitions(
let mask = match mask {
Some(Ok(mask)) => mask,
Some(Err(err)) => return Err(err),
None => return Ok(partitions),
None => return Ok(BooleanArray::new(BooleanBuffer::new_set(num_rows), None)),
};

// Don't retain partitions that evaluated to null
// Don't retain rows that evaluated to null
let prepared = match mask.null_count() {
0 => mask,
_ => prep_null_mask_filter(&mask),
};

// Sanity check
assert_eq!(prepared.len(), partitions.len());

let filtered = partitions
.into_iter()
.zip(prepared.values())
.filter_map(|(p, f)| f.then_some(p))
.collect();

Ok(filtered)
Ok(prepared)
}

#[derive(Debug)]
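The refactor above extracts the filter-evaluation logic from prune_partitions into a reusable apply_filters that returns a BooleanArray mask, so the metadata-filtering code added below can share it. A minimal sketch of the mask semantics it relies on, using the same arrow kernels (and, prep_null_mask_filter); the inputs are made up:

```rust
use arrow::array::BooleanArray;
use arrow::compute::{and, prep_null_mask_filter};

fn main() {
    // Per-row results of two filters; `None` models a predicate that
    // evaluated to null for that row.
    let a = BooleanArray::from(vec![Some(true), Some(true), None]);
    let b = BooleanArray::from(vec![Some(true), Some(false), Some(true)]);

    // Conjunction of the filters, as in the fold over `do_filter` above.
    let mask = and(&a, &b).unwrap();

    // Rows that evaluated to null are dropped, not kept: nulls become
    // `false` before the mask is used.
    let prepared = match mask.null_count() {
        0 => mask,
        _ => prep_null_mask_filter(&mask),
    };

    let kept: Vec<bool> = prepared.values().iter().collect();
    assert_eq!(kept, vec![true, false, false]);
}
```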
202 changes: 202 additions & 0 deletions datafusion/core/src/datasource/listing/metadata.rs
@@ -0,0 +1,202 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Functions that support extracting metadata from files.

use std::fmt;
use std::str::FromStr;
use std::sync::Arc;

use super::PartitionedFile;
use crate::datasource::listing::helpers::apply_filters;
use datafusion_common::plan_err;
use datafusion_common::Result;

use arrow::{
array::{Array, StringBuilder, TimestampMicrosecondBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema, TimeUnit},
record_batch::RecordBatch,
};
use arrow_schema::Fields;
use datafusion_common::ScalarValue;
use datafusion_expr::execution_props::ExecutionProps;

use datafusion_common::DataFusionError;
use datafusion_expr::Expr;
use object_store::ObjectMeta;

/// A metadata column that can be used to filter files
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetadataColumn {
/// The location of the file in object store
Location,
/// The last modified timestamp of the file
LastModified,
/// The size of the file in bytes
Size,
}

impl fmt::Display for MetadataColumn {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name())
}
}

impl MetadataColumn {
/// The name of the metadata column (one of `location`, `last_modified`, or `size`)
pub fn name(&self) -> &str {
match self {
MetadataColumn::Location => "location",
MetadataColumn::LastModified => "last_modified",
MetadataColumn::Size => "size",
}
}

/// Returns the arrow type of this metadata column
pub fn arrow_type(&self) -> DataType {
match self {
MetadataColumn::Location => DataType::Utf8,
MetadataColumn::LastModified => {
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
}
MetadataColumn::Size => DataType::UInt64,
}
}

/// Returns the arrow field for this metadata column
pub fn field(&self) -> Field {
Field::new(self.to_string(), self.arrow_type(), true)
}

/// Returns the scalar value for this metadata column given an object meta
pub fn to_scalar_value(&self, meta: &ObjectMeta) -> ScalarValue {
match self {
MetadataColumn::Location => {
ScalarValue::Utf8(Some(meta.location.to_string()))
}
MetadataColumn::LastModified => ScalarValue::TimestampMicrosecond(
Some(meta.last_modified.timestamp_micros()),
Some("UTC".into()),
),
MetadataColumn::Size => ScalarValue::UInt64(Some(meta.size as u64)),
}
}

pub(crate) fn builder(&self, capacity: usize) -> MetadataBuilder {
match self {
MetadataColumn::Location => MetadataBuilder::Location(
StringBuilder::with_capacity(capacity, capacity * 10),
),
MetadataColumn::LastModified => MetadataBuilder::LastModified(
TimestampMicrosecondBuilder::with_capacity(capacity).with_data_type(
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
),
),
MetadataColumn::Size => {
MetadataBuilder::Size(UInt64Builder::with_capacity(capacity))
}
}
}
}

impl FromStr for MetadataColumn {
type Err = DataFusionError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"location" => Ok(MetadataColumn::Location),
"last_modified" => Ok(MetadataColumn::LastModified),
"size" => Ok(MetadataColumn::Size),
_ => plan_err!(
"Invalid metadata column: {}, expected: location, last_modified, or size",
s
),
}
}
}

pub(crate) enum MetadataBuilder {
Location(StringBuilder),
LastModified(TimestampMicrosecondBuilder),
Size(UInt64Builder),
}

impl MetadataBuilder {
pub fn append(&mut self, meta: &ObjectMeta) {
match self {
Self::Location(builder) => builder.append_value(&meta.location),
Self::LastModified(builder) => {
builder.append_value(meta.last_modified.timestamp_micros())
}
Self::Size(builder) => builder.append_value(meta.size as u64),
}
}

pub fn finish(self) -> Arc<dyn Array> {
match self {
MetadataBuilder::Location(mut builder) => Arc::new(builder.finish()),
MetadataBuilder::LastModified(mut builder) => Arc::new(builder.finish()),
MetadataBuilder::Size(mut builder) => Arc::new(builder.finish()),
}
}
}

/// Determine if the given file matches the input metadata filters.
/// `filters` should only contain expressions that can be evaluated
/// using only the metadata columns.
pub(crate) fn apply_metadata_filters(
file: PartitionedFile,
filters: &[Expr],
metadata_cols: &[MetadataColumn],
) -> Result<Option<PartitionedFile>> {
// If there are no metadata columns, the file trivially matches
if metadata_cols.is_empty() {
return Ok(Some(file));
}

let mut builders: Vec<_> = metadata_cols.iter().map(|col| col.builder(1)).collect();

for builder in builders.iter_mut() {
builder.append(&file.object_meta);
}

let arrays = builders
.into_iter()
.map(|builder| builder.finish())
.collect::<Vec<_>>();

let fields: Fields = metadata_cols
.iter()
.map(|col| Field::new(col.to_string(), col.arrow_type(), true))
.collect();
let schema = Arc::new(Schema::new(fields));

let batch = RecordBatch::try_new(schema, arrays)?;

// TODO: Plumb this down
let props = ExecutionProps::new();

// Don't retain rows that evaluated to null
let prepared = apply_filters(&batch, filters, &props)?;

// If the filter evaluates to true, return the file
if prepared.true_count() == 1 {
return Ok(Some(file));
}

Ok(None)
}
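Putting the new API together: a short sketch of parsing a MetadataColumn and turning an ObjectMeta into a scalar, assuming the public re-export added in mod.rs below; the path, timestamp, and size are illustrative values:

```rust
use std::str::FromStr;

use chrono::{TimeZone, Utc};
use datafusion::arrow::datatypes::DataType;
use datafusion::common::ScalarValue;
use datafusion::datasource::listing::MetadataColumn;
use datafusion::error::Result;
use object_store::{path::Path, ObjectMeta};

fn main() -> Result<()> {
    // Illustrative file metadata, as an object store listing would return it.
    let meta = ObjectMeta {
        location: Path::from("data/part-0.parquet"),
        last_modified: Utc.timestamp_opt(1_700_000_000, 0).unwrap(),
        size: 4096,
        e_tag: None,
        version: None,
    };

    // Parsing accepts exactly "location", "last_modified", and "size";
    // anything else is a plan error.
    let col = MetadataColumn::from_str("size")?;
    assert_eq!(col.arrow_type(), DataType::UInt64);
    assert_eq!(col.to_scalar_value(&meta), ScalarValue::UInt64(Some(4096)));
    Ok(())
}
```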
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/listing/mod.rs
@@ -19,6 +19,7 @@
//! to get the list of files to process.
mod helpers;
mod metadata;
mod table;
mod url;

@@ -31,6 +32,7 @@ use std::pin::Pin;
use std::sync::Arc;

pub use self::url::ListingTableUrl;
pub use metadata::MetadataColumn;
pub use table::{ListingOptions, ListingTable, ListingTableConfig};

/// Stream of files get listed from object store