fix: clean up outdated dependencies & modules and fix tests #32

Merged
merged 8 commits on Dec 26, 2024
Changes from 6 commits

2 changes: 1 addition & 1 deletion .github/workflows/check.yml
@@ -10,4 +10,4 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-node@v4
- run: npm install prettier prettier-plugin-toml
- - run: npx prettier --check --no-config .
+ - run: npx prettier --write --no-config .
2 changes: 1 addition & 1 deletion datafusion-federation/src/optimize.rs
@@ -71,7 +71,7 @@ impl<'a> Rewriter<'a> {
}
}

- impl<'a> TreeNodeRewriter for Rewriter<'a> {
+ impl TreeNodeRewriter for Rewriter<'_> {
type Node = LogicalPlan;

fn f_down(&mut self, node: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
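
The Rewriter change above switches the impl header to the anonymous lifetime '_', the elision style that recent Clippy/rustc lints tend to suggest when a named lifetime adds no information. A minimal, self-contained sketch of the same pattern, using hypothetical Visitor/Collector types rather than this crate's own:

trait Visitor {
    fn visit(&mut self, value: i32);
}

struct Collector<'a> {
    // Borrowed sink the collector pushes into.
    sink: &'a mut Vec<i32>,
}

// Before: impl<'a> Visitor for Collector<'a> { ... }
// After: the elided lifetime `'_` lets the compiler infer it, mirroring the
// `impl TreeNodeRewriter for Rewriter<'_>` line in the diff above.
impl Visitor for Collector<'_> {
    fn visit(&mut self, value: i32) {
        self.sink.push(value);
    }
}

fn main() {
    let mut out = Vec::new();
    let mut collector = Collector { sink: &mut out };
    collector.visit(7);
    assert_eq!(out, vec![7]);
}
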
@@ -16,12 +16,15 @@
// under the License.

/// Copy of DataFusion's [`RequiredIndicies`](https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/optimize_projections/required_indices.rs) implementation.

use std::result;

- use datafusion::{common::{tree_node::TreeNodeRecursion, Column, DFSchemaRef}, error::DataFusionError, logical_expr::LogicalPlan, prelude::Expr};

use super::outer_columns;
+ use datafusion::{
+     common::{tree_node::TreeNodeRecursion, Column, DFSchemaRef},
+     error::DataFusionError,
+     logical_expr::LogicalPlan,
+     prelude::Expr,
+ };

type Result<T, E = DataFusionError> = result::Result<T, E>;

@@ -91,11 +94,7 @@ impl RequiredIndicies {
}

/// Add required indices for all `exprs` used in plan
- pub fn with_plan_exprs(
-     mut self,
-     plan: &LogicalPlan,
-     schema: &DFSchemaRef,
- ) -> Result<Self> {
+ pub fn with_plan_exprs(mut self, plan: &LogicalPlan, schema: &DFSchemaRef) -> Result<Self> {
// Add indices of the child fields referred to by the expressions in the
// parent
plan.apply_expressions(|e| {
@@ -168,8 +167,7 @@ impl RequiredIndicies {
where
F: Fn(usize) -> bool,
{
- let (l, r): (Vec<usize>, Vec<usize>) =
-     self.indices.iter().partition(|&&idx| f(idx));
+ let (l, r): (Vec<usize>, Vec<usize>) = self.indices.iter().partition(|&&idx| f(idx));
let projection_beneficial = self.projection_beneficial;

(
@@ -226,4 +224,4 @@ impl RequiredIndicies {
self.indices.dedup();
self
}
- }
+ }
26 changes: 20 additions & 6 deletions datafusion-federation/src/schema_cast/intervals_cast.rs
@@ -50,7 +50,7 @@ pub(crate) fn cast_interval_monthdaynano_to_daytime(
let interval_monthdaynano_array = interval_monthdaynano_array
.as_any()
.downcast_ref::<IntervalMonthDayNanoArray>()
- .ok_or_else(||
+ .ok_or_else(||
ArrowError::CastError("Failed to cast IntervalMonthDayNanoArray: Unable to downcast to IntervalMonthDayNanoArray".to_string()))?;

let mut interval_daytime_builder =
@@ -78,8 +78,10 @@ pub(crate) fn cast_interval_monthdaynano_to_daytime(
#[cfg(test)]
mod test {
use datafusion::arrow::{
- array::{RecordBatch, IntervalDayTimeArray, IntervalYearMonthArray},
- datatypes::{DataType, Field, Schema, SchemaRef, IntervalUnit, IntervalMonthDayNano, IntervalDayTime},
+ array::{IntervalDayTimeArray, IntervalYearMonthArray, RecordBatch},
+ datatypes::{
+     DataType, Field, IntervalDayTime, IntervalMonthDayNano, IntervalUnit, Schema, SchemaRef,
+ },
};

use crate::schema_cast::record_convert::try_cast_to;
@@ -88,9 +90,21 @@ mod test {

fn input_schema() -> SchemaRef {
Arc::new(Schema::new(vec![
Field::new("interval_daytime", DataType::Interval(IntervalUnit::MonthDayNano), false),
Field::new("interval_monthday_nano", DataType::Interval(IntervalUnit::MonthDayNano), false),
Field::new("interval_yearmonth", DataType::Interval(IntervalUnit::MonthDayNano), false),
Field::new(
"interval_daytime",
DataType::Interval(IntervalUnit::MonthDayNano),
false,
),
Field::new(
"interval_monthday_nano",
DataType::Interval(IntervalUnit::MonthDayNano),
false,
),
Field::new(
"interval_yearmonth",
DataType::Interval(IntervalUnit::MonthDayNano),
false,
),
]))
}

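The .ok_or_else(..) edit above is whitespace-only, but the idiom it touches, downcasting a type-erased Arrow array and turning a failed downcast into ArrowError::CastError, is the core of this file. A standalone sketch of that idiom, assuming only the arrow re-exports bundled with datafusion and using Int64Array as a stand-in for IntervalMonthDayNanoArray:

use std::sync::Arc;

use datafusion::arrow::{
    array::{Array, ArrayRef, Int64Array},
    error::ArrowError,
};

// Downcast-or-error: `downcast_ref` returns an Option, so a failed cast is
// reported as ArrowError::CastError instead of panicking.
fn sum_int64(array: &ArrayRef) -> Result<i64, ArrowError> {
    let typed = array
        .as_any()
        .downcast_ref::<Int64Array>()
        .ok_or_else(|| ArrowError::CastError("expected an Int64Array".to_string()))?;
    Ok(typed.iter().flatten().sum())
}

fn main() -> Result<(), ArrowError> {
    let array: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3]));
    assert_eq!(sum_int64(&array)?, 6);
    Ok(())
}
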
5 changes: 3 additions & 2 deletions datafusion-federation/src/schema_cast/record_convert.rs
@@ -1,15 +1,16 @@
use datafusion::arrow::{
array::{Array, RecordBatch},
compute::cast,
- datatypes::{DataType, IntervalUnit, SchemaRef}
+ datatypes::{DataType, IntervalUnit, SchemaRef},
};
use std::sync::Arc;

use super::{
intervals_cast::{
cast_interval_monthdaynano_to_daytime, cast_interval_monthdaynano_to_yearmonth,
},
- lists_cast::{cast_string_to_fixed_size_list, cast_string_to_large_list, cast_string_to_list}, struct_cast::cast_string_to_struct,
+ lists_cast::{cast_string_to_fixed_size_list, cast_string_to_large_list, cast_string_to_list},
+ struct_cast::cast_string_to_struct,
};

pub type Result<T, E = Error> = std::result::Result<T, E>;
2 changes: 1 addition & 1 deletion datafusion-federation/src/schema_cast/struct_cast.rs
@@ -50,7 +50,7 @@ pub(crate) fn cast_string_to_struct(
}
};
// struct_field is single struct column
- return Ok(Arc::clone(record.column(0)));
+ Ok(Arc::clone(record.column(0)))
}

#[cfg(test)]
6 changes: 3 additions & 3 deletions examples/Cargo.toml
@@ -6,18 +6,18 @@ license.workspace = true
readme.workspace = true

[dev-dependencies]
arrow-flight = { version = "52.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { version = "53.0.0", features = ["flight-sql-experimental"] }
tokio = "1.35.1"
async-trait.workspace = true
datafusion.workspace = true
datafusion-federation.path = "../datafusion-federation"
- datafusion-federation-sql.path = "../sources/sql"
+ datafusion-federation-sql = { path = "../sources/sql", features = ["connectorx"] }
datafusion-federation-flight-sql.path = "../sources/flight-sql"
connectorx = { git = "https://github.com/devinjdangelo/connector-x.git", features = [
"dst_arrow",
"src_sqlite"
] }
tonic = "0.11.0"
tonic = "0.12.2"

[dependencies]
async-std = "1.12.0"
17 changes: 12 additions & 5 deletions examples/examples/flight-sql.rs
@@ -1,8 +1,9 @@
use std::{sync::Arc, time::Duration};

use arrow_flight::sql::client::FlightSqlServiceClient;
+ use datafusion::execution::SessionStateBuilder;
use datafusion::{
- catalog::schema::SchemaProvider,
+ catalog::SchemaProvider,
error::{DataFusionError, Result},
execution::{
context::{SessionContext, SessionState},
@@ -39,14 +40,20 @@ async fn main() -> Result<()> {
sleep(Duration::from_secs(3)).await;

// Local context
- let state = SessionContext::new().state();
+ // let state = SessionContext::new().state();
let known_tables: Vec<String> = ["test"].iter().map(|&x| x.into()).collect();

// Register FederationAnalyzer
// TODO: Interaction with other analyzers & optimizers.
- let state = state
-     .add_analyzer_rule(Arc::new(FederationAnalyzerRule::new()))
-     .with_query_planner(Arc::new(FederatedQueryPlanner::new()));
+ // let mut state = state
+ //     .add_analyzer_rule(Arc::new(FederationAnalyzerRule::new()))
+ //     .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
+ //     .clone();
+
+ let mut state = SessionStateBuilder::new()
+     .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
+     .build();
+ state.add_analyzer_rule(Arc::new(FederationAnalyzerRule::new()));

// Register schema
// TODO: table inference
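
The flight-sql example above replaces the removed SessionContext::new().state() chain with DataFusion's SessionStateBuilder. For reference, a minimal sketch of that builder pattern reduced to DataFusion's defaults; the federation-specific pieces (FederatedQueryPlanner, FederationAnalyzerRule) are left out so the snippet stands alone, and it assumes the datafusion and tokio versions pinned by this workspace:

use datafusion::{error::Result, execution::SessionStateBuilder, prelude::SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Build the session state explicitly. In the example above, a custom planner
    // is attached here via .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
    // and state.add_analyzer_rule(..) is called on the built state afterwards.
    let state = SessionStateBuilder::new().with_default_features().build();

    // Wrap the state in a context and run a query against it.
    let ctx = SessionContext::new_with_state(state);
    let df = ctx.sql("SELECT 1 AS one").await?;
    df.show().await?;
    Ok(())
}
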
70 changes: 0 additions & 70 deletions examples/examples/postgres-partial.rs

This file was deleted.

107 changes: 0 additions & 107 deletions examples/examples/sqlite-partial.rs

This file was deleted.
