feat: improve executor loggers
milenkovicm committed Feb 19, 2025
1 parent faa05af commit 3735991
Showing 5 changed files with 36 additions and 8 deletions.
6 changes: 3 additions & 3 deletions ballista/core/src/execution_plans/shuffle_reader.rs
```diff
@@ -48,7 +48,7 @@ use crate::error::BallistaError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use itertools::Itertools;
-use log::{error, info};
+use log::{debug, error};
 use rand::prelude::SliceRandom;
 use rand::thread_rng;
 use tokio::sync::{mpsc, Semaphore};
@@ -144,7 +144,7 @@ impl ExecutionPlan for ShuffleReaderExec {
         context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
         let task_id = context.task_id().unwrap_or_else(|| partition.to_string());
-        info!("ShuffleReaderExec::execute({})", task_id);
+        debug!("ShuffleReaderExec::execute({})", task_id);

         // TODO make the maximum size configurable, or make it depends on global memory control
         let max_request_num = 50usize;
@@ -292,7 +292,7 @@ fn send_fetch_partitions(
         .into_iter()
         .partition(check_is_local_location);

-    info!(
+    debug!(
         "local shuffle file counts:{}, remote shuffle file count:{}.",
         local_locations.len(),
         remote_locations.len()
```
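The three hunks above demote per-task shuffle-read logging from `info!` to `debug!`, keeping executor logs quiet at the default level. A minimal sketch of the effect, assuming the `log` facade with an `env_logger` backend (the binary below is illustrative, not part of this commit):

```rust
use log::{debug, info};

fn main() {
    // RUST_LOG=info                -> only the info line prints
    // RUST_LOG=ballista_core=debug -> shuffle traces become visible again
    env_logger::init();

    info!("executor started");
    debug!("ShuffleReaderExec::execute({})", "task-0"); // suppressed unless debug is enabled
}
```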
19 changes: 17 additions & 2 deletions ballista/core/src/execution_plans/shuffle_writer.rs
```diff
@@ -25,6 +25,7 @@ use datafusion::arrow::ipc::CompressionType;

 use datafusion::arrow::ipc::writer::StreamWriter;
 use std::any::Any;
+use std::fmt::Debug;
 use std::fs;
 use std::fs::File;
 use std::future::Future;
@@ -50,8 +51,8 @@ use datafusion::physical_plan::metrics::{
 };

 use datafusion::physical_plan::{
-    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
-    SendableRecordBatchStream, Statistics,
+    displayable, DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning,
+    PlanProperties, SendableRecordBatchStream, Statistics,
 };
 use futures::{StreamExt, TryFutureExt, TryStreamExt};

@@ -80,9 +81,23 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Plan properties
+    properties: PlanProperties,
 }

+impl std::fmt::Display for ShuffleWriterExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let printable_plan = displayable(self.plan.as_ref())
+            .set_show_statistics(true)
+            .indent(false);
+        write!(
+            f,
+            "ShuffleWriterExec: job={} stage={} work_dir={} partitioning={:?} plan: \n {}",
+            self.job_id, self.stage_id, self.work_dir, self.shuffle_output_partitioning, printable_plan
+        )
+    }
+}
+
 pub struct WriteTracker {
     pub num_batches: usize,
     pub num_rows: usize,
```
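The new `Display` impl builds on DataFusion's public `displayable` helper, which renders a physical plan as an indented operator tree. A hedged sketch of the same rendering in isolation (the `render_plan` helper is hypothetical, and `set_show_statistics` assumes a DataFusion version that exposes it):

```rust
use std::sync::Arc;

use datafusion::physical_plan::{displayable, ExecutionPlan};

// Hypothetical helper mirroring the rendering choices in the new Display impl.
fn render_plan(plan: &Arc<dyn ExecutionPlan>) -> String {
    displayable(plan.as_ref())
        .set_show_statistics(true) // append per-node statistics
        .indent(false)             // non-verbose tree, one operator per line
        .to_string()
}
```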
7 changes: 7 additions & 0 deletions ballista/core/src/extension.rs
```diff
@@ -286,6 +286,13 @@ impl SessionConfigHelperExt for SessionConfig {
         self.options()
             .entries()
             .iter()
+            // TODO: revisit this once this option is removed
+            //
+            // filtering this key as it's creating a lot of warning logs
+            // at the executor side.
+            .filter(|c| {
+                c.key != "datafusion.sql_parser.enable_options_value_normalization"
+            })
             .map(|datafusion::config::ConfigEntry { key, value, .. }| {
                 log::trace!("sending configuration key: `{}`, value`{:?}`", key, value);
                 KeyValuePair {
```
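This change drops a single noisy key before session options are serialized into `KeyValuePair`s for the executor. The same filter-then-map shape in miniature (plain tuples stand in for DataFusion's `ConfigEntry`; only the key string is taken from the diff):

```rust
fn main() {
    // (key, value) pairs standing in for ConfigEntry { key, value, .. }
    let entries = vec![
        ("datafusion.execution.batch_size", Some("8192".to_owned())),
        (
            "datafusion.sql_parser.enable_options_value_normalization",
            Some("false".to_owned()),
        ),
    ];

    let shipped: Vec<String> = entries
        .into_iter()
        // skip the key that was flooding executor logs with warnings
        .filter(|(key, _)| {
            *key != "datafusion.sql_parser.enable_options_value_normalization"
        })
        .map(|(key, value)| format!("{key}={value:?}"))
        .collect();

    assert_eq!(shipped.len(), 1);
}
```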
10 changes: 8 additions & 2 deletions ballista/executor/src/execution_engine.rs
```diff
@@ -23,7 +23,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::metrics::MetricsSet;
 use datafusion::physical_plan::ExecutionPlan;
-use std::fmt::Debug;
+use std::fmt::{Debug, Display};
 use std::sync::Arc;

 /// Execution engine extension point
@@ -42,7 +42,7 @@ pub trait ExecutionEngine: Sync + Send {
 /// partition is re-partitioned and streamed to disk in Arrow IPC format. Future stages of the query
 /// will use the ShuffleReaderExec to read these results.
 #[async_trait]
-pub trait QueryStageExecutor: Sync + Send + Debug {
+pub trait QueryStageExecutor: Sync + Send + Debug + Display {
     async fn execute_query_stage(
         &self,
         input_partition: usize,
@@ -95,6 +95,12 @@ impl DefaultQueryStageExec {
     }
 }

+impl Display for DefaultQueryStageExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DefaultQueryStageExec:\n{}", self.shuffle_writer)
+    }
+}
+
 #[async_trait]
 impl QueryStageExecutor for DefaultQueryStageExec {
     async fn execute_query_stage(
```
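Adding `Display` as a supertrait is what lets callers format a boxed `QueryStageExecutor` with `{}` instead of `{:?}`, which the metrics change below relies on. A self-contained sketch of the mechanic, using toy types in place of the Ballista ones:

```rust
use std::fmt::{self, Debug, Display};

// Toy stand-ins for QueryStageExecutor / DefaultQueryStageExec.
trait Stage: Debug + Display {}

#[derive(Debug)]
struct DemoStage;

impl Display for DemoStage {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "DemoStage:\nShuffleWriterExec: job=demo stage=1")
    }
}

impl Stage for DemoStage {}

fn main() {
    let stage: Box<dyn Stage> = Box::new(DemoStage);
    // `{}` compiles only because Display is among the trait's supertraits.
    println!("{stage}");
}
```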
2 changes: 1 addition & 1 deletion ballista/executor/src/metrics/mod.rs
```diff
@@ -49,7 +49,7 @@ impl ExecutorMetricsCollector for LoggingMetricsCollector {
         plan: Arc<dyn QueryStageExecutor>,
     ) {
         info!(
-            "=== [{}/{}/{}] Physical plan with metrics ===\n{:?}\n",
+            "=== [{}/{}/{}] Physical plan with metrics ===\n{}\n",
             job_id, stage_id, partition, plan
         );
     }
```
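With the format string switched from `{:?}` to `{}`, the collector now logs the human-readable plan tree produced by the `Display` impls above. An illustrative (hypothetical, not captured from a real run) log entry:

```
=== [job-1/2/0] Physical plan with metrics ===
DefaultQueryStageExec:
ShuffleWriterExec: job=job-1 stage=2 work_dir=/tmp/ballista partitioning=Some(Hash([Column { name: "a", index: 0 }], 4)) plan:
 AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[count(*)], metrics=[output_rows=1024]
```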
