diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 2094425d7..df38a189d 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -98,11 +98,16 @@ pub async fn poll_loop let permit = available_task_slots.clone().acquire_owned().await.unwrap(); + let start_exec_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + match run_received_task( executor.clone(), permit, - task_status_sender, - task, + task_status_sender.clone(), + task.clone(), &codec, &dedicated_executor, ) @@ -110,7 +115,46 @@ pub async fn poll_loop { Ok(_) => {} Err(e) => { - warn!("Failed to run task: {:?}", e); + // + // notifying scheduler about task failure + // as scheduler expects notification. + // + + let partition_id = PartitionId { + job_id: task.job_id.clone(), + stage_id: task.stage_id as usize, + partition_id: task.partition_id as usize, + }; + + warn!( + "Executor failed to run task: {:?}, error: {:?}", + partition_id, e + ); + + let end_exec_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() + as u64; + + let task_execution_times = TaskExecutionTimes { + launch_time: task.launch_time, + start_exec_time, + end_exec_time, + }; + + // TODO: MM should we re-try message? + if let Err(error) = task_status_sender.send(as_task_status( + Err(e), + executor.metadata.id.clone(), + task.task_id as usize, + task.task_attempt_num as usize, + partition_id, + None, + task_execution_times, + )) { + warn!("failed to send task status: {:?}", error); + }; } } }