diff --git a/rucat_state_monitor/src/resource_manager/k8s_client.rs b/rucat_state_monitor/src/resource_manager/k8s_client.rs index f30fcab..f8985f7 100644 --- a/rucat_state_monitor/src/resource_manager/k8s_client.rs +++ b/rucat_state_monitor/src/resource_manager/k8s_client.rs @@ -3,10 +3,14 @@ use ::std::{borrow::Cow, collections::BTreeMap}; use ::k8s_openapi::api::core::v1::{Pod, Service}; use ::kube::{api::PostParams, Api, Client}; use ::rucat_common::{ - anyhow::anyhow, engine::{ + anyhow::anyhow, + engine::{ get_spark_app_id, get_spark_driver_name, get_spark_service_name, EngineConfigs, EngineId, EngineState, - }, error::{Result, RucatError}, serde_json::{self, json}, tracing::{debug, warn} + }, + error::{Result, RucatError}, + serde_json::{self, json}, + tracing::{debug, warn}, }; use super::{ResourceManager, ResourceState}; @@ -79,51 +83,65 @@ impl K8sClient { const SPARK_SERVICE_SELECTOR: &str = "rucat-engine-selector"; // convert engine configurations to Spark submit format - fn to_spark_submit_format(id: &EngineId, user_configs: &EngineConfigs) -> Result>> { + fn to_spark_submit_format( + id: &EngineId, + user_configs: &EngineConfigs, + ) -> Result>> { // Preset configurations for Spark on Kubernetes. // Users are not allowed to set these configurations. // make the map ordered for easier testing let preset_configs = BTreeMap::from([ (Cow::Borrowed("spark.app.id"), get_spark_app_id(id)), - (Cow::Borrowed("spark.driver.extraJavaOptions"), Cow::Borrowed("-Divy.cache.dir=/tmp -Divy.home=/tmp")), - (Cow::Borrowed("spark.driver.host"), get_spark_service_name(id)), - (Cow::Borrowed("spark.kubernetes.container.image"), Cow::Borrowed("apache/spark:3.5.3")), - (Cow::Borrowed("spark.kubernetes.driver.pod.name"), get_spark_driver_name(id)), - (Cow::Borrowed("spark.kubernetes.executor.podNamePrefix"), get_spark_app_id(id)), + ( + Cow::Borrowed("spark.driver.extraJavaOptions"), + Cow::Borrowed("-Divy.cache.dir=/tmp -Divy.home=/tmp"), + ), + ( + Cow::Borrowed("spark.driver.host"), + get_spark_service_name(id), + ), + ( + Cow::Borrowed("spark.kubernetes.container.image"), + Cow::Borrowed("apache/spark:3.5.3"), + ), + ( + Cow::Borrowed("spark.kubernetes.driver.pod.name"), + get_spark_driver_name(id), + ), + ( + Cow::Borrowed("spark.kubernetes.executor.podNamePrefix"), + get_spark_app_id(id), + ), ]); - match preset_configs.keys() - .filter(|k| user_configs.contains_key(*k)) - .next() { - Some(key) => { - Err(RucatError::not_allowed(anyhow!( - "The config {} is not allowed as it is reserved.", - key - ))) - } - None => { - Ok([ - Cow::Borrowed("--master"), - Cow::Borrowed("k8s://https://kubernetes:443"), - Cow::Borrowed("--deploy-mode"), - Cow::Borrowed("client"), - Cow::Borrowed("--packages"), - Cow::Borrowed("org.apache.spark:spark-connect_2.12:3.5.3"), - ] + match preset_configs + .keys() + .find(|k| user_configs.contains_key(*k)) + { + Some(key) => Err(RucatError::not_allowed(anyhow!( + "The config {} is not allowed as it is reserved.", + key + ))), + None => Ok([ + Cow::Borrowed("--master"), + Cow::Borrowed("k8s://https://kubernetes:443"), + Cow::Borrowed("--deploy-mode"), + Cow::Borrowed("client"), + Cow::Borrowed("--packages"), + Cow::Borrowed("org.apache.spark:spark-connect_2.12:3.5.3"), + ] + .iter() + .cloned() + .chain( + preset_configs .iter() - .cloned() - .chain( - preset_configs - .iter() - .chain( - user_configs - .iter() - ) - .flat_map(|(k, v)| [Cow::Borrowed("--conf"), Cow::Owned(format!("{}={}", k, v))]) - ) - .collect()) - } - } + .chain(user_configs.iter()) + .flat_map(|(k, v)| { + [Cow::Borrowed("--conf"), Cow::Owned(format!("{}={}", k, v))] + }), + ) + .collect()), + } } pub async fn new() -> Result { @@ -287,7 +305,6 @@ impl ResourceManager for K8sClient { } } - #[cfg(test)] mod tests { use super::*;