diff --git a/ballista/core/src/execution_plans/shuffle_reader.rs b/ballista/core/src/execution_plans/shuffle_reader.rs
index 7a20f1215..981032a92 100644
--- a/ballista/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/core/src/execution_plans/shuffle_reader.rs
@@ -48,7 +48,7 @@ use crate::error::BallistaError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
-use log::{error, info};
+use log::{debug, error};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
@@ -144,7 +144,7 @@ impl ExecutionPlan for ShuffleReaderExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
-        info!("ShuffleReaderExec::execute({})", task_id);
+        debug!("ShuffleReaderExec::execute({})", task_id);
 
         // TODO make the maximum size configurable, or make it depends on global memory control
         let max_request_num = 50usize;
@@ -292,7 +292,7 @@ fn send_fetch_partitions(
         .into_iter()
         .partition(check_is_local_location);
 
-    info!(
+    debug!(
         "local shuffle file counts:{}, remote shuffle file count:{}.",
         local_locations.len(),
         remote_locations.len()
diff --git a/ballista/core/src/execution_plans/shuffle_writer.rs b/ballista/core/src/execution_plans/shuffle_writer.rs
index 23b437f68..b6f2dcbe5 100644
--- a/ballista/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/core/src/execution_plans/shuffle_writer.rs
@@ -25,6 +25,7 @@ use datafusion::arrow::ipc::CompressionType;
 use datafusion::arrow::ipc::writer::StreamWriter;
 
 use std::any::Any;
+use std::fmt::Debug;
 use std::fs;
 use std::fs::File;
 use std::future::Future;
@@ -50,8 +51,8 @@ use datafusion::physical_plan::metrics::{
 };
 
 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 
 use futures::{StreamExt, TryFutureExt, TryStreamExt};
@@ -80,9 +81,23 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Plan properties
     properties: PlanProperties,
 }
 
+impl std::fmt::Display for ShuffleWriterExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let printable_plan = displayable(self.plan.as_ref())
+            .set_show_statistics(true)
+            .indent(false);
+        write!(
+            f,
+            "ShuffleWriterExec: job={} stage={} work_dir={} partitioning={:?} plan: \n {}",
+            self.job_id, self.stage_id, self.work_dir, self.shuffle_output_partitioning, printable_plan
+        )
+    }
+}
+
 pub struct WriteTracker {
     pub num_batches: usize,
     pub num_rows: usize,
diff --git a/ballista/core/src/extension.rs b/ballista/core/src/extension.rs
index 182113b8f..c6200ab9a 100644
--- a/ballista/core/src/extension.rs
+++ b/ballista/core/src/extension.rs
@@ -286,6 +286,13 @@ impl SessionConfigHelperExt for SessionConfig {
         self.options()
             .entries()
             .iter()
+            // TODO: revisit this log once this option is removed
+            //
+            // filtering this key as it's creating a lot of warning logs
+            // at the executor side.
+            .filter(|c| {
+                c.key != "datafusion.sql_parser.enable_options_value_normalization"
+            })
             .map(|datafusion::config::ConfigEntry { key, value, .. }| {
                 log::trace!("sending configuration key: `{}`, value`{:?}`", key, value);
                 KeyValuePair {
diff --git a/ballista/executor/src/execution_engine.rs b/ballista/executor/src/execution_engine.rs
index 42084267c..87cfa417e 100644
--- a/ballista/executor/src/execution_engine.rs
+++ b/ballista/executor/src/execution_engine.rs
@@ -23,7 +23,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
 use std::sync::Arc;
 
 /// Execution engine extension point
@@ -42,7 +42,7 @@ pub trait ExecutionEngine: Sync + Send {
 /// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
 /// will use the ShuffleReaderExec to read these results.
 #[async_trait]
-pub trait QueryStageExecutor: Sync + Send + Debug {
+pub trait QueryStageExecutor: Sync + Send + Debug + Display {
     async fn execute_query_stage(
         &self,
         input_partition: usize,
@@ -95,6 +95,12 @@ impl DefaultQueryStageExec {
     }
 }
 
+impl Display for DefaultQueryStageExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DefaultQueryStageExec:\n{}", self.shuffle_writer)
+    }
+}
+
 #[async_trait]
 impl QueryStageExecutor for DefaultQueryStageExec {
     async fn execute_query_stage(
diff --git a/ballista/executor/src/metrics/mod.rs b/ballista/executor/src/metrics/mod.rs
index 9a0f58fa3..10fb0ef07 100644
--- a/ballista/executor/src/metrics/mod.rs
+++ b/ballista/executor/src/metrics/mod.rs
@@ -49,7 +49,7 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
         plan: Arc<dyn QueryStageExecutor>,
     ) {
         info!(
-            "=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
             job_id, stage_id, partition, plan
        );
    }
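
The snippet below is a minimal, hypothetical sketch (not part of the patch above) of how the plan rendering used by the new Display impls behaves. It assumes a recent DataFusion release where EmptyExec::new takes only a schema and DisplayableExecutionPlan exposes set_show_statistics and indent; the render_plan helper and the stand-in plan are invented here for illustration only.

// Hypothetical sketch -- not part of the diff above.
use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{displayable, ExecutionPlan};

/// Render a physical plan the way the new `ShuffleWriterExec` Display impl does:
/// one operator per line, with statistics included.
fn render_plan(plan: &dyn ExecutionPlan) -> String {
    displayable(plan)
        .set_show_statistics(true)
        .indent(false)
        .to_string()
}

fn main() {
    // A trivial stand-in plan; any Arc<dyn ExecutionPlan> is rendered the same way.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));

    // With the Display impls added in this patch, formatting a QueryStageExecutor or
    // ShuffleWriterExec with `{}` yields this kind of human-readable plan tree instead
    // of the `{:?}` debug dump previously logged by LoggingMetricsCollector.
    println!("{}", render_plan(plan.as_ref()));
}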