Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
Signed-off-by: remzi <[email protected]>
  • Loading branch information
HaoYang670 committed Dec 20, 2024
1 parent af1ce38 commit 4954ee1
Showing 1 changed file with 57 additions and 40 deletions.
97 changes: 57 additions & 40 deletions rucat_state_monitor/src/resource_manager/k8s_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ use ::std::{borrow::Cow, collections::BTreeMap};
use ::k8s_openapi::api::core::v1::{Pod, Service};
use ::kube::{api::PostParams, Api, Client};
use ::rucat_common::{
anyhow::anyhow, engine::{
anyhow::anyhow,
engine::{
get_spark_app_id, get_spark_driver_name, get_spark_service_name, EngineConfigs, EngineId,
EngineState,
}, error::{Result, RucatError}, serde_json::{self, json}, tracing::{debug, warn}
},
error::{Result, RucatError},
serde_json::{self, json},
tracing::{debug, warn},
};

use super::{ResourceManager, ResourceState};
Expand Down Expand Up @@ -79,51 +83,65 @@ impl K8sClient {
const SPARK_SERVICE_SELECTOR: &str = "rucat-engine-selector";

// convert engine configurations to Spark submit format
fn to_spark_submit_format(id: &EngineId, user_configs: &EngineConfigs) -> Result<Vec<Cow<'static, str>>> {
fn to_spark_submit_format(
id: &EngineId,
user_configs: &EngineConfigs,
) -> Result<Vec<Cow<'static, str>>> {
// Preset configurations for Spark on Kubernetes.
// Users are not allowed to set these configurations.
// make the map ordered for easier testing
let preset_configs = BTreeMap::from([
(Cow::Borrowed("spark.app.id"), get_spark_app_id(id)),
(Cow::Borrowed("spark.driver.extraJavaOptions"), Cow::Borrowed("-Divy.cache.dir=/tmp -Divy.home=/tmp")),
(Cow::Borrowed("spark.driver.host"), get_spark_service_name(id)),
(Cow::Borrowed("spark.kubernetes.container.image"), Cow::Borrowed("apache/spark:3.5.3")),
(Cow::Borrowed("spark.kubernetes.driver.pod.name"), get_spark_driver_name(id)),
(Cow::Borrowed("spark.kubernetes.executor.podNamePrefix"), get_spark_app_id(id)),
(
Cow::Borrowed("spark.driver.extraJavaOptions"),
Cow::Borrowed("-Divy.cache.dir=/tmp -Divy.home=/tmp"),
),
(
Cow::Borrowed("spark.driver.host"),
get_spark_service_name(id),
),
(
Cow::Borrowed("spark.kubernetes.container.image"),
Cow::Borrowed("apache/spark:3.5.3"),
),
(
Cow::Borrowed("spark.kubernetes.driver.pod.name"),
get_spark_driver_name(id),
),
(
Cow::Borrowed("spark.kubernetes.executor.podNamePrefix"),
get_spark_app_id(id),
),
]);

match preset_configs.keys()
.filter(|k| user_configs.contains_key(*k))
.next() {
Some(key) => {
Err(RucatError::not_allowed(anyhow!(
"The config {} is not allowed as it is reserved.",
key
)))
}
None => {
Ok([
Cow::Borrowed("--master"),
Cow::Borrowed("k8s://https://kubernetes:443"),
Cow::Borrowed("--deploy-mode"),
Cow::Borrowed("client"),
Cow::Borrowed("--packages"),
Cow::Borrowed("org.apache.spark:spark-connect_2.12:3.5.3"),
]
match preset_configs
.keys()
.find(|k| user_configs.contains_key(*k))
{
Some(key) => Err(RucatError::not_allowed(anyhow!(
"The config {} is not allowed as it is reserved.",
key
))),
None => Ok([
Cow::Borrowed("--master"),
Cow::Borrowed("k8s://https://kubernetes:443"),
Cow::Borrowed("--deploy-mode"),
Cow::Borrowed("client"),
Cow::Borrowed("--packages"),
Cow::Borrowed("org.apache.spark:spark-connect_2.12:3.5.3"),
]
.iter()
.cloned()
.chain(
preset_configs
.iter()
.cloned()
.chain(
preset_configs
.iter()
.chain(
user_configs
.iter()
)
.flat_map(|(k, v)| [Cow::Borrowed("--conf"), Cow::Owned(format!("{}={}", k, v))])
)
.collect())
}
}
.chain(user_configs.iter())
.flat_map(|(k, v)| {
[Cow::Borrowed("--conf"), Cow::Owned(format!("{}={}", k, v))]
}),
)
.collect()),
}
}

pub async fn new() -> Result<Self> {
Expand Down Expand Up @@ -287,7 +305,6 @@ impl ResourceManager for K8sClient {
}
}


#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 4954ee1

Please sign in to comment.