Skip to content

Commit

Permalink
Merge branch 'main' into feature/remove_default_cache
Browse files Browse the repository at this point in the history
# Conflicts:
#	datafusion/physical-plan/src/repartition/mod.rs
  • Loading branch information
mustafasrepo committed Feb 27, 2024
2 parents eced5bc + 38db3d8 commit 39e402a
Show file tree
Hide file tree
Showing 88 changed files with 3,040 additions and 996 deletions.
64 changes: 0 additions & 64 deletions .github/workflows/docs.yaml

This file was deleted.

1 change: 1 addition & 0 deletions ci/scripts/rust_example.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
set -ex
cd datafusion-examples/examples/
cargo fmt --all -- --check
cargo check --examples

files=$(ls .)
for filename in $files
Expand Down
3 changes: 1 addition & 2 deletions ci/scripts/rust_toml_fmt.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,4 @@
# without overwriting the file. If any errors occur, you may want to
# rerun `taplo format` to fix the formatting automatically.
set -ex
taplo format
done
taplo format --check
4 changes: 4 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
disallowed-methods = [
{ path = "tokio::task::spawn", reason = "To provide cancel-safety, use `SpawnedTask::spawn` instead (https://github.com/apache/arrow-datafusion/issues/6513)" },
{ path = "tokio::task::spawn_blocking", reason = "To provide cancel-safety, use `SpawnedTask::spawn` instead (https://github.com/apache/arrow-datafusion/issues/6513)" },
]
43 changes: 24 additions & 19 deletions datafusion-cli/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::object_storage::get_object_store;
use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::{plan_datafusion_err, DataFusionError};
use datafusion::datasource::listing::{
ListingTable, ListingTableConfig, ListingTableUrl,
};
Expand Down Expand Up @@ -145,16 +146,21 @@ impl SchemaProvider for DynamicFileSchemaProvider {
self.inner.register_table(name, table)
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let inner_table = self.inner.table(name).await;
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let inner_table = self.inner.table(name).await?;
if inner_table.is_some() {
return inner_table;
return Ok(inner_table);
}

// if the inner schema provider didn't have a table by
// that name, try to treat it as a listing table
let state = self.state.upgrade()?.read().clone();
let table_url = ListingTableUrl::parse(name).ok()?;
let state = self
.state
.upgrade()
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let table_url = ListingTableUrl::parse(name)?;
let url: &Url = table_url.as_ref();

// If the store is already registered for this URL then `get_store`
Expand All @@ -169,18 +175,20 @@ impl SchemaProvider for DynamicFileSchemaProvider {
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
.await
.unwrap();
.await?;
state.runtime_env().register_object_store(url, store);
}
}

let config = ListingTableConfig::new(table_url)
.infer(&state)
.await
.ok()?;
let config = match ListingTableConfig::new(table_url).infer(&state).await {
Ok(cfg) => cfg,
Err(_) => {
// treat as non-existing
return Ok(None);
}
};

Some(Arc::new(ListingTable::try_new(config).ok()?))
Ok(Some(Arc::new(ListingTable::try_new(config)?)))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
Expand Down Expand Up @@ -227,7 +235,7 @@ mod tests {
let (ctx, schema) = setup_context();

// That's a non registered table so expecting None here
let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

// It should still create an object store for the location in the SessionState
Expand All @@ -251,7 +259,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

let store = ctx
Expand All @@ -273,7 +281,7 @@ mod tests {

let (ctx, schema) = setup_context();

let table = schema.table(&location).await;
let table = schema.table(&location).await.unwrap();
assert!(table.is_none());

let store = ctx
Expand All @@ -289,13 +297,10 @@ mod tests {
}

#[tokio::test]
#[should_panic]
async fn query_invalid_location_test() {
let location = "ts://file.parquet";
let (_ctx, schema) = setup_context();

// This will panic, we cannot prevent that because `schema.table`
// returns an Option
schema.table(location).await;
assert!(schema.table(location).await.is_err());
}
}
27 changes: 26 additions & 1 deletion datafusion-examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,30 @@ license = { workspace = true }
authors = { workspace = true }
rust-version = { workspace = true }

[[example]]
name = "flight_sql_server"
path = "examples/flight/flight_sql_server.rs"

[[example]]
name = "flight_server"
path = "examples/flight/flight_server.rs"

[[example]]
name = "flight_client"
path = "examples/flight/flight_client.rs"

[[example]]
name = "catalog"
path = "examples/external_dependency/catalog.rs"

[[example]]
name = "dataframe_to_s3"
path = "examples/external_dependency/dataframe-to-s3.rs"

[[example]]
name = "query_aws_s3"
path = "examples/external_dependency/query-aws-s3.rs"

[dev-dependencies]
arrow = { workspace = true }
arrow-flight = { workspace = true }
Expand All @@ -54,6 +78,7 @@ serde = { version = "1.0.136", features = ["derive"] }
serde_json = { workspace = true }
tempfile = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot"] }
tonic = "0.11"
# 0.10 and 0.11 are incompatible. Need to upgrade tonic to 0.11 when upgrading to arrow 51
tonic = "0.10"
url = { workspace = true }
uuid = "1.2"
8 changes: 4 additions & 4 deletions datafusion-examples/examples/external_dependency/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use datafusion::{
arrow::util::pretty,
catalog::{
schema::SchemaProvider,
{CatalogProviderList, CatalogProvider},
{CatalogProvider, CatalogProviderList},
},
datasource::{
file_format::{csv::CsvFormat, parquet::ParquetFormat, FileFormat},
Expand Down Expand Up @@ -53,7 +53,7 @@ async fn main() -> Result<()> {
.unwrap();
let mut ctx = SessionContext::new();
let state = ctx.state();
let catlist = Arc::new(CustomCatalogProvderList::new());
let catlist = Arc::new(CustomCatalogProviderList::new());
// use our custom catalog list for context. each context has a single catalog list.
// context will by default have [`MemoryCatalogProviderList`]
ctx.register_catalog_list(catlist.clone());
Expand Down Expand Up @@ -180,9 +180,9 @@ impl SchemaProvider for DirSchema {
tables.keys().cloned().collect::<Vec<_>>()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let tables = self.tables.read().unwrap();
tables.get(name).cloned()
Ok(tables.get(name).cloned())
}

fn table_exist(&self, name: &str) -> bool {
Expand Down
4 changes: 2 additions & 2 deletions datafusion-examples/examples/flight/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
info!("getting results for {handle}");
let result = self.get_result(&handle)?;
// if we get an empty result, create an empty schema
let (schema, batches) = match result.get(0) {
let (schema, batches) = match result.first() {
None => (Arc::new(Schema::empty()), vec![]),
Some(batch) => (batch.schema(), result.clone()),
};
Expand Down Expand Up @@ -287,7 +287,7 @@ impl FlightSqlService for FlightSqlServiceImpl {
.map_err(|e| status!("Error executing query", e))?;

// if we get an empty result, create an empty schema
let schema = match result.get(0) {
let schema = match result.first() {
None => Schema::empty(),
Some(batch) => (*batch.schema()).clone(),
};
Expand Down
66 changes: 56 additions & 10 deletions datafusion-examples/examples/parquet_sql_multiple_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,35 @@

use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::ListingOptions;
use datafusion::error::Result;
use datafusion::prelude::*;
use datafusion_common::{FileType, GetExt};
use object_store::local::LocalFileSystem;
use std::path::Path;
use std::sync::Arc;

/// This example demonstrates executing a simple query against an Arrow data source (a directory
/// with multiple Parquet files) and fetching results
/// with multiple Parquet files) and fetching results. The query is run twice, once showing
/// how to use `register_listing_table` with an absolute path, and once registering an
/// ObjectStore to use a relative path.
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// create local execution context
let ctx = SessionContext::new();

let testdata = datafusion::test_util::parquet_test_data();
let test_data = datafusion::test_util::parquet_test_data();

// Configure listing options
let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext());
// This is a workaround for this example since `test_data` contains
// many different parquet files,
// in practice use FileType::PARQUET.get_ext().
.with_file_extension("alltypes_plain.parquet");

// Register a listing table - this will use all files in the directory as data sources
// for the query
// First example where we use an absolute path, which requires no additional setup.
ctx.register_listing_table(
"my_table",
&format!("file://{testdata}/alltypes_plain.parquet"),
listing_options,
&format!("file://{test_data}/"),
listing_options.clone(),
None,
None,
)
Expand All @@ -60,5 +64,47 @@ async fn main() -> Result<()> {
// print the results
df.show().await?;

// Second example where we temporarily move into the test data's parent directory and
// simulate a relative path, this requires registering an ObjectStore.
let cur_dir = std::env::current_dir()?;

let test_data_path = Path::new(&test_data);
let test_data_path_parent = test_data_path
.parent()
.ok_or("test_data path needs a parent")?;

std::env::set_current_dir(test_data_path_parent)?;

let local_fs = Arc::new(LocalFileSystem::default());

let u = url::Url::parse("file://./")?;
ctx.runtime_env().register_object_store(&u, local_fs);

// Register a listing table - this will use all files in the directory as data sources
// for the query
ctx.register_listing_table(
"relative_table",
"./data",
listing_options.clone(),
None,
None,
)
.await?;

// execute the query
let df = ctx
.sql(
"SELECT * \
FROM relative_table \
LIMIT 1",
)
.await?;

// print the results
df.show().await?;

// Reset the current directory
std::env::set_current_dir(cur_dir)?;

Ok(())
}
5 changes: 3 additions & 2 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,13 @@ tokio = { workspace = true }
tokio-util = { version = "0.7.4", features = ["io"], optional = true }
url = { workspace = true }
uuid = { version = "1.0", features = ["v4"] }
xz2 = { version = "0.1", optional = true }
xz2 = { version = "0.1", optional = true, features = ["static"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
async-trait = { workspace = true }
bigdecimal = { workspace = true }
cargo = "0.77.0"
criterion = { version = "0.5", features = ["async_tokio"] }
csv = "1.1.6"
ctor = { workspace = true }
Expand All @@ -141,7 +142,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "parking_lot", "fs"] }
tokio-postgres = "0.7.7"
[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
nix = { version = "0.27.1", features = ["fs"] }
nix = { version = "0.28.0", features = ["fs"] }

[[bench]]
harness = false
Expand Down
Loading

0 comments on commit 39e402a

Please sign in to comment.