diff --git a/ballista/scheduler/src/bin/main.rs b/ballista/scheduler/src/bin/main.rs index 7d8b4b1b0..80c4a0a59 100644 --- a/ballista/scheduler/src/bin/main.rs +++ b/ballista/scheduler/src/bin/main.rs @@ -27,7 +27,7 @@ use ballista_core::config::LogRotationPolicy; use ballista_core::print_version; use ballista_scheduler::cluster::BallistaCluster; use ballista_scheduler::config::{ - ClusterStorageConfig, SchedulerConfig, TaskDistribution, TaskDistributionPolicy, + SchedulerConfig, TaskDistribution, TaskDistributionPolicy, }; use ballista_scheduler::scheduler_process::start_server; use tracing_subscriber::EnvFilter; @@ -115,8 +115,6 @@ async fn inner() -> Result<()> { let addr = format!("{}:{}", opt.bind_host, opt.bind_port); let addr = addr.parse()?; - let cluster_storage_config = ClusterStorageConfig::Memory; - let task_distribution = match opt.task_distribution { TaskDistribution::Bias => TaskDistributionPolicy::Bias, TaskDistribution::RoundRobin => TaskDistributionPolicy::RoundRobin, @@ -142,7 +140,6 @@ async fn inner() -> Result<()> { finished_job_state_clean_up_interval_seconds: opt .finished_job_state_clean_up_interval_seconds, advertise_flight_sql_endpoint: opt.advertise_flight_sql_endpoint, - cluster_storage: cluster_storage_config, job_resubmit_interval_ms: (opt.job_resubmit_interval_ms > 0) .then_some(opt.job_resubmit_interval_ms), executor_termination_grace_period: opt.executor_termination_grace_period, diff --git a/ballista/scheduler/src/cluster/mod.rs b/ballista/scheduler/src/cluster/mod.rs index eda49806c..e60831a0a 100644 --- a/ballista/scheduler/src/cluster/mod.rs +++ b/ballista/scheduler/src/cluster/mod.rs @@ -42,7 +42,7 @@ use log::{debug, info, warn}; use crate::cluster::memory::{InMemoryClusterState, InMemoryJobState}; -use crate::config::{ClusterStorageConfig, SchedulerConfig, TaskDistributionPolicy}; +use crate::config::{SchedulerConfig, TaskDistributionPolicy}; use crate::scheduler_server::SessionBuilder; use crate::state::execution_graph::{create_task_info, ExecutionGraph, TaskDescription}; use crate::state::task_manager::JobInfoCache; @@ -105,12 +105,10 @@ impl BallistaCluster { pub async fn new_from_config(config: &SchedulerConfig) -> Result { let scheduler = config.scheduler_name(); - match &config.cluster_storage { - ClusterStorageConfig::Memory => Ok(BallistaCluster::new_memory( - scheduler, - default_session_builder, - )), - } + Ok(BallistaCluster::new_memory( + scheduler, + default_session_builder, + )) } pub fn cluster_state(&self) -> Arc { diff --git a/ballista/scheduler/src/config.rs b/ballista/scheduler/src/config.rs index ce542e519..665e7ad33 100644 --- a/ballista/scheduler/src/config.rs +++ b/ballista/scheduler/src/config.rs @@ -47,8 +47,7 @@ pub struct SchedulerConfig { /// If provided, submitted jobs which do not have tasks scheduled will be resubmitted after `job_resubmit_interval_ms` /// milliseconds pub job_resubmit_interval_ms: Option, - /// Configuration for ballista cluster storage - pub cluster_storage: ClusterStorageConfig, + /// Time in seconds to allow executor for graceful shutdown. Once an executor signals it has entered Terminating status /// the scheduler should only consider the executor dead after this time interval has elapsed pub executor_termination_grace_period: u64, @@ -76,7 +75,6 @@ impl Default for SchedulerConfig { finished_job_data_clean_up_interval_seconds: 300, finished_job_state_clean_up_interval_seconds: 3600, advertise_flight_sql_endpoint: None, - cluster_storage: ClusterStorageConfig::Memory, job_resubmit_interval_ms: None, executor_termination_grace_period: 0, scheduler_event_expected_processing_duration: 0, @@ -151,11 +149,6 @@ impl SchedulerConfig { self } - pub fn with_cluster_storage(mut self, config: ClusterStorageConfig) -> Self { - self.cluster_storage = config; - self - } - pub fn with_job_resubmit_interval_ms(mut self, interval_ms: u64) -> Self { self.job_resubmit_interval_ms = Some(interval_ms); self @@ -177,11 +170,6 @@ impl SchedulerConfig { } } -#[derive(Clone, Debug)] -pub enum ClusterStorageConfig { - Memory, -} - /// Policy of distributing tasks to available executor slots /// /// It needs to be visible to code generated by configure_me diff --git a/ballista/scheduler/src/state/execution_graph.rs b/ballista/scheduler/src/state/execution_graph.rs index 333545d35..e9ff2b62c 100644 --- a/ballista/scheduler/src/state/execution_graph.rs +++ b/ballista/scheduler/src/state/execution_graph.rs @@ -24,25 +24,20 @@ use std::time::{SystemTime, UNIX_EPOCH}; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::{accept, ExecutionPlan, ExecutionPlanVisitor}; -use datafusion::prelude::SessionContext; -use datafusion_proto::logical_plan::AsLogicalPlan; use log::{error, info, warn}; use ballista_core::error::{BallistaError, Result}; use ballista_core::execution_plans::{ShuffleWriterExec, UnresolvedShuffleExec}; use ballista_core::serde::protobuf::failed_task::FailedReason; use ballista_core::serde::protobuf::job_status::Status; -use ballista_core::serde::protobuf::{ - self, execution_graph_stage::StageType, FailedTask, JobStatus, ResultLost, - RunningJob, SuccessfulJob, TaskStatus, -}; use ballista_core::serde::protobuf::{job_status, FailedJob, ShuffleWritePartition}; use ballista_core::serde::protobuf::{task_status, RunningTask}; +use ballista_core::serde::protobuf::{ + FailedTask, JobStatus, ResultLost, RunningJob, SuccessfulJob, TaskStatus, +}; use ballista_core::serde::scheduler::{ ExecutorMetadata, PartitionId, PartitionLocation, PartitionStats, }; -use ballista_core::serde::BallistaCodec; -use datafusion_proto::physical_plan::AsExecutionPlan; use crate::display::print_stage_metrics; use crate::planner::DistributedPlanner; @@ -50,8 +45,7 @@ use crate::scheduler_server::event::QueryStageSchedulerEvent; use crate::scheduler_server::timestamp_millis; use crate::state::execution_graph::execution_stage::RunningStage; pub(crate) use crate::state::execution_graph::execution_stage::{ - ExecutionStage, FailedStage, ResolvedStage, StageOutput, SuccessfulStage, TaskInfo, - UnresolvedStage, + ExecutionStage, ResolvedStage, StageOutput, TaskInfo, UnresolvedStage, }; use crate::state::task_manager::UpdatedStages; @@ -1317,163 +1311,6 @@ impl ExecutionGraph { fn clear_stage_failure(&mut self, stage_id: usize) { self.failed_stage_attempts.remove(&stage_id); } - - pub(crate) async fn decode_execution_graph< - T: 'static + AsLogicalPlan, - U: 'static + AsExecutionPlan, - >( - proto: protobuf::ExecutionGraph, - codec: &BallistaCodec, - session_ctx: &SessionContext, - ) -> Result { - let mut stages: HashMap = HashMap::new(); - for graph_stage in proto.stages { - let stage_type = graph_stage.stage_type.expect("Unexpected empty stage"); - - let execution_stage = match stage_type { - StageType::UnresolvedStage(stage) => { - let stage: UnresolvedStage = - UnresolvedStage::decode(stage, codec, session_ctx)?; - (stage.stage_id, ExecutionStage::UnResolved(stage)) - } - StageType::ResolvedStage(stage) => { - let stage: ResolvedStage = - ResolvedStage::decode(stage, codec, session_ctx)?; - (stage.stage_id, ExecutionStage::Resolved(stage)) - } - StageType::SuccessfulStage(stage) => { - let stage: SuccessfulStage = - SuccessfulStage::decode(stage, codec, session_ctx)?; - (stage.stage_id, ExecutionStage::Successful(stage)) - } - StageType::FailedStage(stage) => { - let stage: FailedStage = - FailedStage::decode(stage, codec, session_ctx)?; - (stage.stage_id, ExecutionStage::Failed(stage)) - } - }; - - stages.insert(execution_stage.0, execution_stage.1); - } - - let output_locations: Vec = proto - .output_locations - .into_iter() - .map(|loc| loc.try_into()) - .collect::>>()?; - - let failed_stage_attempts = proto - .failed_attempts - .into_iter() - .map(|attempt| { - ( - attempt.stage_id as usize, - HashSet::from_iter( - attempt - .stage_attempt_num - .into_iter() - .map(|num| num as usize), - ), - ) - }) - .collect(); - - Ok(ExecutionGraph { - scheduler_id: (!proto.scheduler_id.is_empty()).then_some(proto.scheduler_id), - job_id: proto.job_id, - job_name: proto.job_name, - session_id: proto.session_id, - status: proto.status.ok_or_else(|| { - BallistaError::Internal( - "Invalid Execution Graph: missing job status".to_owned(), - ) - })?, - queued_at: proto.queued_at, - start_time: proto.start_time, - end_time: proto.end_time, - stages, - output_partitions: proto.output_partitions as usize, - output_locations, - task_id_gen: proto.task_id_gen as usize, - failed_stage_attempts, - }) - } - - /// Running stages will not be persisted so that will not be encoded. - /// Running stages will be convert back to the resolved stages to be encoded and persisted - pub(crate) fn encode_execution_graph< - T: 'static + AsLogicalPlan, - U: 'static + AsExecutionPlan, - >( - graph: ExecutionGraph, - codec: &BallistaCodec, - ) -> Result { - let job_id = graph.job_id().to_owned(); - - let stages = graph - .stages - .into_values() - .map(|stage| { - let stage_type = match stage { - ExecutionStage::UnResolved(stage) => { - StageType::UnresolvedStage(UnresolvedStage::encode(stage, codec)?) - } - ExecutionStage::Resolved(stage) => { - StageType::ResolvedStage(ResolvedStage::encode(stage, codec)?) - } - ExecutionStage::Running(stage) => StageType::ResolvedStage( - ResolvedStage::encode(stage.to_resolved(), codec)?, - ), - ExecutionStage::Successful(stage) => StageType::SuccessfulStage( - SuccessfulStage::encode(job_id.clone(), stage, codec)?, - ), - ExecutionStage::Failed(stage) => StageType::FailedStage( - FailedStage::encode(job_id.clone(), stage, codec)?, - ), - }; - Ok(protobuf::ExecutionGraphStage { - stage_type: Some(stage_type), - }) - }) - .collect::>>()?; - - let output_locations: Vec = graph - .output_locations - .into_iter() - .map(|loc| loc.try_into()) - .collect::>>()?; - - let failed_attempts: Vec = graph - .failed_stage_attempts - .into_iter() - .map(|(stage_id, attempts)| { - let stage_attempt_num = attempts - .into_iter() - .map(|num| num as u32) - .collect::>(); - protobuf::StageAttempts { - stage_id: stage_id as u32, - stage_attempt_num, - } - }) - .collect::>(); - - Ok(protobuf::ExecutionGraph { - job_id: graph.job_id, - job_name: graph.job_name, - session_id: graph.session_id, - status: Some(graph.status), - queued_at: graph.queued_at, - start_time: graph.start_time, - end_time: graph.end_time, - stages, - output_partitions: graph.output_partitions as u64, - output_locations, - scheduler_id: graph.scheduler_id.unwrap_or_default(), - task_id_gen: graph.task_id_gen as u32, - failed_attempts, - }) - } } impl Debug for ExecutionGraph { diff --git a/ballista/scheduler/src/state/execution_graph/execution_stage.rs b/ballista/scheduler/src/state/execution_graph/execution_stage.rs index 65a2d0125..b609dbee3 100644 --- a/ballista/scheduler/src/state/execution_graph/execution_stage.rs +++ b/ballista/scheduler/src/state/execution_graph/execution_stage.rs @@ -18,7 +18,6 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryInto; use std::fmt::{Debug, Formatter}; -use std::iter::FromIterator; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; @@ -27,23 +26,18 @@ use datafusion::physical_optimizer::PhysicalOptimizerRule; use datafusion::physical_plan::display::DisplayableExecutionPlan; use datafusion::physical_plan::metrics::{MetricValue, MetricsSet}; use datafusion::physical_plan::{ExecutionPlan, Metric}; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion_proto::logical_plan::AsLogicalPlan; +use datafusion::prelude::SessionConfig; use log::{debug, warn}; +use crate::display::DisplayableBallistaExecutionPlan; use ballista_core::error::{BallistaError, Result}; use ballista_core::execution_plans::ShuffleWriterExec; use ballista_core::serde::protobuf::failed_task::FailedReason; +use ballista_core::serde::protobuf::{task_status, RunningTask}; use ballista_core::serde::protobuf::{ - self, task_info, FailedTask, GraphStageInput, OperatorMetricsSet, ResultLost, - SuccessfulTask, TaskStatus, + FailedTask, OperatorMetricsSet, ResultLost, SuccessfulTask, TaskStatus, }; -use ballista_core::serde::protobuf::{task_status, RunningTask}; use ballista_core::serde::scheduler::PartitionLocation; -use ballista_core::serde::BallistaCodec; -use datafusion_proto::physical_plan::AsExecutionPlan; - -use crate::display::DisplayableBallistaExecutionPlan; /// A stage in the ExecutionGraph, /// represents a set of tasks (one per each `partition`) which can be executed concurrently. @@ -368,54 +362,6 @@ impl UnresolvedStage { self.last_attempt_failure_reasons.clone(), )) } - - pub(super) fn decode( - stage: protobuf::UnResolvedStage, - codec: &BallistaCodec, - session_ctx: &SessionContext, - ) -> Result { - let plan_proto = U::try_decode(&stage.plan)?; - let plan = plan_proto.try_into_physical_plan( - session_ctx, - session_ctx.runtime_env().as_ref(), - codec.physical_extension_codec(), - )?; - - let inputs = decode_inputs(stage.inputs)?; - - Ok(UnresolvedStage { - stage_id: stage.stage_id as usize, - stage_attempt_num: stage.stage_attempt_num as usize, - output_links: stage.output_links.into_iter().map(|l| l as usize).collect(), - plan, - inputs, - last_attempt_failure_reasons: HashSet::from_iter( - stage.last_attempt_failure_reasons, - ), - }) - } - - pub(super) fn encode( - stage: UnresolvedStage, - codec: &BallistaCodec, - ) -> Result { - let mut plan: Vec = vec![]; - U::try_from_physical_plan(stage.plan, codec.physical_extension_codec()) - .and_then(|proto| proto.try_encode(&mut plan))?; - - let inputs = encode_inputs(stage.inputs)?; - - Ok(protobuf::UnResolvedStage { - stage_id: stage.stage_id as u32, - stage_attempt_num: stage.stage_attempt_num as u32, - output_links: stage.output_links.into_iter().map(|l| l as u32).collect(), - inputs, - plan, - last_attempt_failure_reasons: Vec::from_iter( - stage.last_attempt_failure_reasons, - ), - }) - } } impl Debug for UnresolvedStage { @@ -482,56 +428,6 @@ impl ResolvedStage { ); Ok(unresolved) } - - pub(super) fn decode( - stage: protobuf::ResolvedStage, - codec: &BallistaCodec, - session_ctx: &SessionContext, - ) -> Result { - let plan_proto = U::try_decode(&stage.plan)?; - let plan = plan_proto.try_into_physical_plan( - session_ctx, - session_ctx.runtime_env().as_ref(), - codec.physical_extension_codec(), - )?; - - let inputs = decode_inputs(stage.inputs)?; - - Ok(ResolvedStage { - stage_id: stage.stage_id as usize, - stage_attempt_num: stage.stage_attempt_num as usize, - partitions: stage.partitions as usize, - output_links: stage.output_links.into_iter().map(|l| l as usize).collect(), - inputs, - plan, - last_attempt_failure_reasons: HashSet::from_iter( - stage.last_attempt_failure_reasons, - ), - }) - } - - pub(super) fn encode( - stage: ResolvedStage, - codec: &BallistaCodec, - ) -> Result { - let mut plan: Vec = vec![]; - U::try_from_physical_plan(stage.plan, codec.physical_extension_codec()) - .and_then(|proto| proto.try_encode(&mut plan))?; - - let inputs = encode_inputs(stage.inputs)?; - - Ok(protobuf::ResolvedStage { - stage_id: stage.stage_id as u32, - stage_attempt_num: stage.stage_attempt_num as u32, - partitions: stage.partitions as u32, - output_links: stage.output_links.into_iter().map(|l| l as u32).collect(), - inputs, - plan, - last_attempt_failure_reasons: Vec::from_iter( - stage.last_attempt_failure_reasons, - ), - }) - } } impl Debug for ResolvedStage { @@ -611,18 +507,6 @@ impl RunningStage { } } - /// Change to the resolved state and bump the stage attempt number - pub(super) fn to_resolved(&self) -> ResolvedStage { - ResolvedStage::new( - self.stage_id, - self.stage_attempt_num + 1, - self.plan.clone(), - self.output_links.clone(), - self.inputs.clone(), - HashSet::new(), - ) - } - /// Change to the unresolved state and bump the stage attempt number pub(super) fn to_unresolved( &self, @@ -952,80 +836,6 @@ impl SuccessfulStage { } reset } - - pub(super) fn decode( - stage: protobuf::SuccessfulStage, - codec: &BallistaCodec, - session_ctx: &SessionContext, - ) -> Result { - let plan_proto = U::try_decode(&stage.plan)?; - let plan = plan_proto.try_into_physical_plan( - session_ctx, - session_ctx.runtime_env().as_ref(), - codec.physical_extension_codec(), - )?; - - let inputs = decode_inputs(stage.inputs)?; - assert_eq!( - stage.task_infos.len(), - stage.partitions as usize, - "protobuf::SuccessfulStage task_infos len not equal to partitions." - ); - let task_infos = stage.task_infos.into_iter().map(decode_taskinfo).collect(); - let stage_metrics = stage - .stage_metrics - .into_iter() - .map(|m| m.try_into()) - .collect::>>()?; - - Ok(SuccessfulStage { - stage_id: stage.stage_id as usize, - stage_attempt_num: stage.stage_attempt_num as usize, - partitions: stage.partitions as usize, - output_links: stage.output_links.into_iter().map(|l| l as usize).collect(), - inputs, - plan, - task_infos, - stage_metrics, - }) - } - - pub(super) fn encode( - _job_id: String, - stage: SuccessfulStage, - codec: &BallistaCodec, - ) -> Result { - let stage_id = stage.stage_id; - - let mut plan: Vec = vec![]; - U::try_from_physical_plan(stage.plan, codec.physical_extension_codec()) - .and_then(|proto| proto.try_encode(&mut plan))?; - - let inputs = encode_inputs(stage.inputs)?; - let task_infos = stage - .task_infos - .into_iter() - .enumerate() - .map(|(partition, task_info)| encode_taskinfo(task_info, partition)) - .collect(); - - let stage_metrics = stage - .stage_metrics - .into_iter() - .map(|m| m.try_into()) - .collect::>>()?; - - Ok(protobuf::SuccessfulStage { - stage_id: stage_id as u32, - stage_attempt_num: stage.stage_attempt_num as u32, - partitions: stage.partitions as u32, - output_links: stage.output_links.into_iter().map(|l| l as u32).collect(), - inputs, - plan, - task_infos, - stage_metrics, - }) - } } impl Debug for SuccessfulStage { @@ -1071,85 +881,6 @@ impl FailedStage { pub(super) fn available_tasks(&self) -> usize { self.task_infos.iter().filter(|s| s.is_none()).count() } - - pub(super) fn decode( - stage: protobuf::FailedStage, - codec: &BallistaCodec, - session_ctx: &SessionContext, - ) -> Result { - let plan_proto = U::try_decode(&stage.plan)?; - let plan = plan_proto.try_into_physical_plan( - session_ctx, - session_ctx.runtime_env().as_ref(), - codec.physical_extension_codec(), - )?; - - let mut task_infos: Vec> = vec![None; stage.partitions as usize]; - for info in stage.task_infos { - task_infos[info.partition_id as usize] = Some(decode_taskinfo(info.clone())); - } - - let stage_metrics = if stage.stage_metrics.is_empty() { - None - } else { - let ms = stage - .stage_metrics - .into_iter() - .map(|m| m.try_into()) - .collect::>>()?; - Some(ms) - }; - - Ok(FailedStage { - stage_id: stage.stage_id as usize, - stage_attempt_num: stage.stage_attempt_num as usize, - partitions: stage.partitions as usize, - output_links: stage.output_links.into_iter().map(|l| l as usize).collect(), - plan, - task_infos, - stage_metrics, - error_message: stage.error_message, - }) - } - - pub(super) fn encode( - _job_id: String, - stage: FailedStage, - codec: &BallistaCodec, - ) -> Result { - let stage_id = stage.stage_id; - - let mut plan: Vec = vec![]; - U::try_from_physical_plan(stage.plan, codec.physical_extension_codec()) - .and_then(|proto| proto.try_encode(&mut plan))?; - - let task_infos: Vec = stage - .task_infos - .into_iter() - .enumerate() - .filter_map(|(partition, task_info)| { - task_info.map(|info| encode_taskinfo(info, partition)) - }) - .collect(); - - let stage_metrics = stage - .stage_metrics - .unwrap_or_default() - .into_iter() - .map(|m| m.try_into()) - .collect::>>()?; - - Ok(protobuf::FailedStage { - stage_id: stage_id as u32, - stage_attempt_num: stage.stage_attempt_num as u32, - partitions: stage.partitions as u32, - output_links: stage.output_links.into_iter().map(|l| l as u32).collect(), - plan, - task_infos, - stage_metrics, - error_message: stage.error_message, - }) - } } impl Debug for FailedStage { @@ -1220,106 +951,3 @@ impl StageOutput { self.complete } } - -fn decode_inputs( - stage_inputs: Vec, -) -> Result> { - let mut inputs: HashMap = HashMap::new(); - for input in stage_inputs { - let stage_id = input.stage_id as usize; - - let outputs = input - .partition_locations - .into_iter() - .map(|loc| { - let partition = loc.partition as usize; - let locations = loc - .partition_location - .into_iter() - .map(|l| l.try_into()) - .collect::>>()?; - Ok((partition, locations)) - }) - .collect::>>>()?; - - inputs.insert( - stage_id, - StageOutput { - partition_locations: outputs, - complete: input.complete, - }, - ); - } - Ok(inputs) -} - -fn encode_inputs( - stage_inputs: HashMap, -) -> Result> { - let mut inputs: Vec = vec![]; - for (stage_id, output) in stage_inputs.into_iter() { - inputs.push(protobuf::GraphStageInput { - stage_id: stage_id as u32, - partition_locations: output - .partition_locations - .into_iter() - .map(|(partition, locations)| { - Ok(protobuf::TaskInputPartitions { - partition: partition as u32, - partition_location: locations - .into_iter() - .map(|l| l.try_into()) - .collect::>>()?, - }) - }) - .collect::>>()?, - complete: output.complete, - }); - } - Ok(inputs) -} - -fn decode_taskinfo(task_info: protobuf::TaskInfo) -> TaskInfo { - let task_info_status = match task_info.status { - Some(task_info::Status::Running(running)) => { - task_status::Status::Running(running) - } - Some(task_info::Status::Failed(failed)) => task_status::Status::Failed(failed), - Some(task_info::Status::Successful(success)) => { - task_status::Status::Successful(success) - } - _ => panic!( - "protobuf::TaskInfo status for task {} should not be none", - task_info.task_id - ), - }; - TaskInfo { - task_id: task_info.task_id as usize, - scheduled_time: task_info.scheduled_time as u128, - launch_time: task_info.launch_time as u128, - start_exec_time: task_info.start_exec_time as u128, - end_exec_time: task_info.end_exec_time as u128, - finish_time: task_info.finish_time as u128, - task_status: task_info_status, - } -} - -fn encode_taskinfo(task_info: TaskInfo, partition_id: usize) -> protobuf::TaskInfo { - let task_info_status = match task_info.task_status { - task_status::Status::Running(running) => task_info::Status::Running(running), - task_status::Status::Failed(failed) => task_info::Status::Failed(failed), - task_status::Status::Successful(success) => { - task_info::Status::Successful(success) - } - }; - protobuf::TaskInfo { - task_id: task_info.task_id as u32, - partition_id: partition_id as u32, - scheduled_time: task_info.scheduled_time as u64, - launch_time: task_info.launch_time as u64, - start_exec_time: task_info.start_exec_time as u64, - end_exec_time: task_info.end_exec_time as u64, - finish_time: task_info.finish_time as u64, - status: Some(task_info_status), - } -}