diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 683cb809a5b1..6b4046dede87 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -156,7 +156,8 @@ impl Default for DataFrameWriteOptions { /// ``` #[derive(Debug, Clone)] pub struct DataFrame { - session_state: SessionState, + // Box the (large) SessionState to reduce the size of DataFrame on the stack + session_state: Box, plan: LogicalPlan, } @@ -168,7 +169,7 @@ impl DataFrame { /// `DataFrame` from an existing datasource. pub fn new(session_state: SessionState, plan: LogicalPlan) -> Self { Self { - session_state, + session_state: Box::new(session_state), plan, } } @@ -234,7 +235,10 @@ impl DataFrame { }; let project_plan = LogicalPlanBuilder::from(plan).project(expr_list)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Expand each list element of a column to multiple rows. @@ -273,7 +277,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .unnest_column_with_options(column, options)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a DataFrame with only rows for which `predicate` evaluates to @@ -298,7 +305,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .filter(predicate)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that aggregates the rows of the current @@ -329,7 +339,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .aggregate(group_expr, aggr_expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new DataFrame that adds the result of evaluating one or more @@ -338,7 +351,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .window(window_exprs)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Returns a new `DataFrame` with a limited number of rows. @@ -363,7 +379,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .limit(skip, fetch)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the union of two [`DataFrame`]s, preserving duplicate rows. @@ -387,7 +406,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .union(dataframe.plan)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the distinct union of two [`DataFrame`]s. @@ -409,12 +431,13 @@ impl DataFrame { /// # } /// ``` pub fn union_distinct(self, dataframe: DataFrame) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan) - .union_distinct(dataframe.plan)? - .build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan) + .union_distinct(dataframe.plan)? + .build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` with all duplicated rows removed. @@ -432,10 +455,11 @@ impl DataFrame { /// # } /// ``` pub fn distinct(self) -> Result { - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::from(self.plan).distinct()?.build()?, - )) + let plan = LogicalPlanBuilder::from(self.plan).distinct()?.build()?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a new `DataFrame` that has statistics for a DataFrame. @@ -603,15 +627,18 @@ impl DataFrame { describe_record_batch.schema(), vec![vec![describe_record_batch]], )?; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::scan( - UNNAMED_TABLE, - provider_as_source(Arc::new(provider)), - None, - )? - .build()?, - )) + + let plan = LogicalPlanBuilder::scan( + UNNAMED_TABLE, + provider_as_source(Arc::new(provider)), + None, + )? + .build()?; + + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Sort the DataFrame by the specified sorting expressions. @@ -637,7 +664,10 @@ impl DataFrame { /// ``` pub fn sort(self, expr: Vec) -> Result { let plan = LogicalPlanBuilder::from(self.plan).sort(expr)?.build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using explicitly specified @@ -691,7 +721,10 @@ impl DataFrame { filter, )? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Join this `DataFrame` with another `DataFrame` using the specified @@ -741,7 +774,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .join_on(right.plan, join_type, expr)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Repartition a DataFrame based on a logical partitioning scheme. @@ -762,7 +798,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .repartition(partitioning_scheme)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return the total number of rows in this `DataFrame`. @@ -867,7 +906,7 @@ impl DataFrame { /// Return a new [`TaskContext`] which would be used to execute this DataFrame pub fn task_ctx(&self) -> TaskContext { - TaskContext::from(&self.session_state) + TaskContext::from(self.session_state.as_ref()) } /// Executes this DataFrame and returns a stream over a single partition @@ -973,7 +1012,7 @@ impl DataFrame { /// Returns both the [`LogicalPlan`] and [`SessionState`] that comprise this [`DataFrame`] pub fn into_parts(self) -> (SessionState, LogicalPlan) { - (self.session_state, self.plan) + (*self.session_state, self.plan) } /// Return the [`LogicalPlan`] represented by this DataFrame without running @@ -1027,7 +1066,10 @@ impl DataFrame { let plan = LogicalPlanBuilder::from(self.plan) .explain(verbose, analyze)? .build()?; - Ok(DataFrame::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Return a `FunctionRegistry` used to plan udf's calls @@ -1046,7 +1088,7 @@ impl DataFrame { /// # } /// ``` pub fn registry(&self) -> &dyn FunctionRegistry { - &self.session_state + self.session_state.as_ref() } /// Calculate the intersection of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1066,10 +1108,11 @@ impl DataFrame { pub fn intersect(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::intersect(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::intersect(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Calculate the exception of two [`DataFrame`]s. The two [`DataFrame`]s must have exactly the same schema @@ -1089,11 +1132,11 @@ impl DataFrame { pub fn except(self, dataframe: DataFrame) -> Result { let left_plan = self.plan; let right_plan = dataframe.plan; - - Ok(DataFrame::new( - self.session_state, - LogicalPlanBuilder::except(left_plan, right_plan, true)?, - )) + let plan = LogicalPlanBuilder::except(left_plan, right_plan, true)?; + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Execute this `DataFrame` and write the results to `table_name`. @@ -1118,7 +1161,13 @@ impl DataFrame { write_options.overwrite, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to CSV file(s). @@ -1166,7 +1215,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Execute the `DataFrame` and write the results to JSON file(s). @@ -1215,7 +1270,13 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } /// Add an additional column to the DataFrame. @@ -1261,7 +1322,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(plan).project(fields)?.build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Rename one column by applying a new projection. This is a no-op if the column to be @@ -1326,7 +1390,10 @@ impl DataFrame { let project_plan = LogicalPlanBuilder::from(self.plan) .project(projection)? .build()?; - Ok(DataFrame::new(self.session_state, project_plan)) + Ok(DataFrame { + session_state: self.session_state, + plan: project_plan, + }) } /// Replace all parameters in logical plan with the specified @@ -1388,7 +1455,10 @@ impl DataFrame { /// ``` pub fn with_param_values(self, query_values: impl Into) -> Result { let plan = self.plan.with_param_values(query_values)?; - Ok(Self::new(self.session_state, plan)) + Ok(DataFrame { + session_state: self.session_state, + plan, + }) } /// Cache DataFrame as a memory table. @@ -1405,7 +1475,7 @@ impl DataFrame { /// # } /// ``` pub async fn cache(self) -> Result { - let context = SessionContext::new_with_state(self.session_state.clone()); + let context = SessionContext::new_with_state((*self.session_state).clone()); // The schema is consistent with the output let plan = self.clone().create_physical_plan().await?; let schema = plan.schema(); diff --git a/datafusion/core/src/dataframe/parquet.rs b/datafusion/core/src/dataframe/parquet.rs index 7cc3201bf7e4..0ec46df0ae5d 100644 --- a/datafusion/core/src/dataframe/parquet.rs +++ b/datafusion/core/src/dataframe/parquet.rs @@ -68,7 +68,12 @@ impl DataFrame { options.partition_by, )? .build()?; - DataFrame::new(self.session_state, plan).collect().await + DataFrame { + session_state: self.session_state, + plan, + } + .collect() + .await } } diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9b5a5fef8cb9..215bdbc3d579 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -471,24 +471,37 @@ impl SessionContext { /// [`SQLOptions::verify_plan`]. pub async fn execute_logical_plan(&self, plan: LogicalPlan) -> Result { match plan { - LogicalPlan::Ddl(ddl) => match ddl { - DdlStatement::CreateExternalTable(cmd) => { - self.create_external_table(&cmd).await - } - DdlStatement::CreateMemoryTable(cmd) => { - self.create_memory_table(cmd).await - } - DdlStatement::CreateView(cmd) => self.create_view(cmd).await, - DdlStatement::CreateCatalogSchema(cmd) => { - self.create_catalog_schema(cmd).await + LogicalPlan::Ddl(ddl) => { + // Box::pin avoids allocating the stack space within this function's frame + // for every one of these individual async functions, decreasing the risk of + // stack overflows. + match ddl { + DdlStatement::CreateExternalTable(cmd) => { + Box::pin(async move { self.create_external_table(&cmd).await }) + as std::pin::Pin + Send>> + } + DdlStatement::CreateMemoryTable(cmd) => { + Box::pin(self.create_memory_table(cmd)) + } + DdlStatement::CreateView(cmd) => Box::pin(self.create_view(cmd)), + DdlStatement::CreateCatalogSchema(cmd) => { + Box::pin(self.create_catalog_schema(cmd)) + } + DdlStatement::CreateCatalog(cmd) => { + Box::pin(self.create_catalog(cmd)) + } + DdlStatement::DropTable(cmd) => Box::pin(self.drop_table(cmd)), + DdlStatement::DropView(cmd) => Box::pin(self.drop_view(cmd)), + DdlStatement::DropCatalogSchema(cmd) => { + Box::pin(self.drop_schema(cmd)) + } + DdlStatement::CreateFunction(cmd) => { + Box::pin(self.create_function(cmd)) + } + DdlStatement::DropFunction(cmd) => Box::pin(self.drop_function(cmd)), } - DdlStatement::CreateCatalog(cmd) => self.create_catalog(cmd).await, - DdlStatement::DropTable(cmd) => self.drop_table(cmd).await, - DdlStatement::DropView(cmd) => self.drop_view(cmd).await, - DdlStatement::DropCatalogSchema(cmd) => self.drop_schema(cmd).await, - DdlStatement::CreateFunction(cmd) => self.create_function(cmd).await, - DdlStatement::DropFunction(cmd) => self.drop_function(cmd).await, - }, + .await + } // TODO what about the other statements (like TransactionStart and TransactionEnd) LogicalPlan::Statement(Statement::SetVariable(stmt)) => { self.set_variable(stmt).await