diff --git a/README.md b/README.md index 7fa711a..c6d7367 100644 --- a/README.md +++ b/README.md @@ -119,7 +119,8 @@ Dns is coredns with fanout between all nodes along with serving from file. Hosts are maintained via a CNI plugin that adds/removes the ip to the hosts file. -Pods get a hostname of `..cluster.skate.` +Pods get a hostname of `..pod.cluster.skate.` +Services get `..svc.cluster.skate.` ### Ingress @@ -141,14 +142,11 @@ spec: pathType: Prefix backend: service: - name: mypod.myns.cluster.skate + name: mypod.myns # routes to mypod.myns.svc.cluster.skate port: number: 80 ``` -Service resources are ignored and it's implicit that a pod has a service with -url: `..cluster.skate` - Currently only Prefix pathType is supported. Supported annotations: @@ -332,10 +330,20 @@ sudo apt-get install -y gcc make libssl-dev pkg-config - [ ] Get pod config from store and not podman -### DNS Improvements +### Service Improvements + +#### Pre work +1. Deploy keepalived on allnodes +2. Apply static ips to pods. + +#### Deploying service + +1. Modify keepalived.conf on all nodes to have service ips +2. Assign ip to keepalive2 service. +2. Create a dns entry for ..svc.cluster.skate that points to keepalived + +Or -1. Mod coredns to fanout to all nodes and wait for all responses, and round robin the responses. -2. Make these dns records available as ..pod.cluster.skate -3. Mod ingress to apply Service resources, making them available as ..svc.cluster.skate, proxying to the - pod domains. -4. Make nginx proxy to next healthy upstream upon connection failure. \ No newline at end of file +1. Assign ip to keepalive2 service. +2. Cron that queries dns for all services every n seconds and updates keepalived.conf and reloads it. +3. 
Create a dns entry for ..svc.cluster.skate that points to keepalived diff --git a/hack/test-deployment.yaml b/hack/test-deployment.yaml index d9ad1d4..ea39a49 100644 --- a/hack/test-deployment.yaml +++ b/hack/test-deployment.yaml @@ -2,7 +2,7 @@ apiVersion: apps/v1 kind: Deployment metadata: - name: nginx-deployment + name: nginx namespace: foo spec: replicas: 3 diff --git a/hack/test-service.yaml b/hack/test-service.yaml new file mode 100644 index 0000000..a7bf914 --- /dev/null +++ b/hack/test-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + name: nginx + namespace: foo +spec: + selector: + app.kubernetes.io/name: nginx + ports: + - protocol: TCP + port: 80 + targetPort: 80 \ No newline at end of file diff --git a/images/coredns/go.mod b/images/coredns/go.mod index f373af2..4b8d8c4 100644 --- a/images/coredns/go.mod +++ b/images/coredns/go.mod @@ -145,4 +145,4 @@ require ( sigs.k8s.io/yaml v1.3.0 // indirect ) -replace github.com/networkservicemesh/fanout => github.com/skateco/fanout v0.0.0-20240821130608-7538dbcf5f9e +replace github.com/networkservicemesh/fanout => github.com/skateco/fanout v0.0.0-20240821133121-12157fa01a4d diff --git a/images/coredns/go.sum b/images/coredns/go.sum index fd1a4bb..508d0d4 100644 --- a/images/coredns/go.sum +++ b/images/coredns/go.sum @@ -547,8 +547,6 @@ github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxzi github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= -github.com/networkservicemesh/fanout v1.9.2 h1:KF2PsFJSNUTvFXc1hMdqCOQ9lRqGN4V8lVg8fwa5HhA= -github.com/networkservicemesh/fanout v1.9.2/go.mod h1:EM8dDilQja7KTATYkS6En1OIdxyy19/n0ivm+ft6tDs= github.com/nrdcg/auroradns v1.0.0/go.mod h1:6JPXKzIRzZzMqtTDgueIhTi6rFf1QvYE/HzqidhOhjw= 
github.com/nrdcg/goinwx v0.6.1/go.mod h1:XPiut7enlbEdntAqalBIqcYcTEVhpv/dKWgDCX2SwKQ= github.com/nrdcg/namesilo v0.2.1/go.mod h1:lwMvfQTyYq+BbjJd30ylEG4GPSS6PII0Tia4rRpRiyw= @@ -682,8 +680,8 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= -github.com/skateco/fanout v0.0.0-20240821130608-7538dbcf5f9e h1:wKU46K4+dPwlK+8pV7POCAbBri+VKKRyC0IhKGmSHQk= -github.com/skateco/fanout v0.0.0-20240821130608-7538dbcf5f9e/go.mod h1:EM8dDilQja7KTATYkS6En1OIdxyy19/n0ivm+ft6tDs= +github.com/skateco/fanout v0.0.0-20240821133121-12157fa01a4d h1:RynTo7/odyJf0omOAki6QoUA0izPazadCa1l91udW6E= +github.com/skateco/fanout v0.0.0-20240821133121-12157fa01a4d/go.mod h1:EM8dDilQja7KTATYkS6En1OIdxyy19/n0ivm+ft6tDs= github.com/skratchdot/open-golang v0.0.0-20160302144031-75fb7ed4208c/go.mod h1:sUM3LWHvSMaG192sy56D9F7CNvL7jUJVXoqM1QKLnog= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA= diff --git a/images/nginx-ingress/service.conf.tmpl b/images/nginx-ingress/service.conf.tmpl index eae4980..54a37fc 100644 --- a/images/nginx-ingress/service.conf.tmpl +++ b/images/nginx-ingress/service.conf.tmpl @@ -64,7 +64,7 @@ {{/inline}} {{#*inline "proxyPassLocation"}} - set $upstream http://{{backend.service.name}}.cluster.skate:{{backend.service.port.number}}; + set $upstream http://{{backend.service.name}}.svc.cluster.skate:{{backend.service.port.number}}; proxy_pass $upstream; {{/inline}} diff --git a/manifests/coredns.yaml b/manifests/coredns.yaml index 283452f..50435b0 100644 --- a/manifests/coredns.yaml +++ 
b/manifests/coredns.yaml @@ -36,7 +36,15 @@ spec: hosts /var/lib/skate/dns/addnhosts } - cluster.skate:53 { + svc.cluster.skate:53 { + + bind lo + + hosts /var/lib/skate/dns/addnhosts + + } + + pod.cluster.skate:53 { bind lo diff --git a/src/config_cmd.rs b/src/config_cmd.rs new file mode 100644 index 0000000..5ec8aa3 --- /dev/null +++ b/src/config_cmd.rs @@ -0,0 +1,37 @@ +use anyhow::anyhow; +use clap::{Args, Subcommand}; +use crate::skate::ConfigFileArgs; + +#[derive(Debug, Args)] +pub struct ConfigArgs{ + #[command(flatten)] + config: ConfigFileArgs, + #[command(subcommand)] + command: ConfigCommands, +} + +#[derive(Debug, Args)] +pub struct UseContextArgs{ + pub context: String + +} + +#[derive(Debug, Subcommand)] +pub enum ConfigCommands { + UseContext(UseContextArgs), +} + +pub fn config(args: ConfigArgs) -> Result<(), Box> { + match args.command { + ConfigCommands::UseContext(use_context_args) => { + let mut config = crate::config::Config::load(Some(args.config.skateconfig.clone())).expect("failed to load skate config"); + config.clusters.iter().any(|c| c.name == use_context_args.context) + .then(|| ()) + .ok_or(anyhow!("no context exists with the name {}", use_context_args.context))?; + config.current_context = Some(use_context_args.context.clone()); + config.persist(Some(args.config.skateconfig))?; + println!("Switched to context \"{}\"", use_context_args.context.replace("\"", "")); + } + } + Ok(()) +} \ No newline at end of file diff --git a/src/create.rs b/src/create.rs index b205e8e..1cb3abd 100644 --- a/src/create.rs +++ b/src/create.rs @@ -231,8 +231,13 @@ async fn create_node(args: CreateNodeArgs) -> Result<(), Box> { let (all_conns, _) = cluster_connections(&cluster).await; let all_conns = &all_conns.unwrap_or(SshClients { clients: vec!() }); + let skate_dirs = [ + "ingress", + "ingress/letsencrypt_storage", + "dns", + "keepalived"].map(|s| format!("/var/lib/skate/{}", s)); - _ = conn.execute("sudo mkdir -p /var/lib/skate/ingress 
/var/lib/skate/ingress/letsencrypt_storage /var/lib/skate/dns").await?; + _ = conn.execute(&format!("sudo mkdir -p {}", skate_dirs.join(" "))).await?; // _ = conn.execute("sudo podman rm -fa").await; setup_networking(&conn, &all_conns, &cluster, &node).await?; @@ -291,6 +296,12 @@ async fn install_cluster_manifests(args: &ConfigFileArgs, config: &Cluster) -> R async fn setup_networking(conn: &SshClient, all_conns: &SshClients, cluster_conf: &Cluster, node: &Node) -> Result<(), Box> { let network_backend = "netavark"; + conn.execute("sudo apt-get install -y keepalived").await?; + conn.execute(&format!("sudo bash -c -eu 'echo {}| base64 --decode > /etc/keepalived/keepalived.conf'", general_purpose::STANDARD.encode(include_str!("./resources/keepalived.conf")))).await?; + conn.execute("sudo systemctl enable keepalived").await?; + conn.execute("sudo systemctl start keepalived").await?; + + if conn.execute("test -f /etc/containers/containers.conf").await.is_err() { let cmd = "sudo cp /usr/share/containers/containers.conf /etc/containers/containers.conf"; conn.execute(cmd).await?; diff --git a/src/delete.rs b/src/delete.rs index cb0f5a9..4b2b403 100644 --- a/src/delete.rs +++ b/src/delete.rs @@ -24,6 +24,7 @@ pub enum DeleteCommands { Secret(DeleteResourceArgs), Deployment(DeleteResourceArgs), Daemonset(DeleteResourceArgs), + Service(DeleteResourceArgs), } #[derive(Debug, Args)] @@ -45,6 +46,7 @@ pub async fn delete(args: DeleteArgs) -> Result<(), Box> { DeleteCommands::Ingress(args) => delete_resource(ResourceType::Ingress, args).await?, DeleteCommands::Cronjob(args) => delete_resource(ResourceType::CronJob, args).await?, DeleteCommands::Secret(args) => delete_resource(ResourceType::Secret, args).await?, + DeleteCommands::Service(args) => delete_resource(ResourceType::Service, args).await?, } Ok(()) } diff --git a/src/executor.rs b/src/executor.rs index 239321e..6e40777 100644 --- a/src/executor.rs +++ b/src/executor.rs @@ -1,22 +1,25 @@ use std::error::Error; use 
std::fs::File; -use std::io::{Write}; -use std::process; +use std::io::{read_to_string, BufRead, Read, Seek, SeekFrom, Write}; +use std::net::{IpAddr, Ipv4Addr}; +use std::{fs, process}; use std::process::Stdio; - +use std::str::FromStr; use anyhow::anyhow; use handlebars::Handlebars; use itertools::Itertools; use k8s_openapi::api::apps::v1::{DaemonSet, Deployment}; use k8s_openapi::api::batch::v1::CronJob; -use k8s_openapi::api::core::v1::{Pod, Secret}; +use k8s_openapi::api::core::v1::{Pod, Secret, Service}; use k8s_openapi::api::networking::v1::Ingress; +use log::info; use serde_json::{json, Value}; use crate::cron::cron_to_systemd; use crate::filestore::FileStore; use crate::skate::{exec_cmd, SupportedResources}; -use crate::util::{hash_string, metadata_name}; +use crate::skatelet::dns; +use crate::util::{hash_string, lock_file, metadata_name}; pub trait Executor { fn apply(&self, manifest: &str) -> Result<(), Box>; @@ -128,10 +131,11 @@ impl DefaultExecutor { let mut file = std::fs::OpenOptions::new().write(true).create(true).truncate(true).open(&format!("/etc/systemd/system/skate-cronjob-{}.timer", &ns_name.to_string()))?; file.write_all(output.as_bytes())?; + let unit_name = format!("skate-cronjob-{}", &ns_name.to_string()); - // systemctl daemon-reload exec_cmd("systemctl", &["daemon-reload"])?; - exec_cmd("systemctl", &["enable", "--now", &format!("skate-cronjob-{}", &ns_name.to_string())])?; + exec_cmd("systemctl", &["enable", "--now", &unit_name])?; + exec_cmd("systemctl", &["reset-failed", &unit_name]); Ok(()) } @@ -287,6 +291,107 @@ impl DefaultExecutor { Ok(()) } + fn apply_service(&self, service: Service) -> Result<(), Box> { + let manifest_string = serde_yaml::to_string(&service).map_err(|e| anyhow!(e).context("failed to serialize manifest to yaml"))?; + let name = &metadata_name(&service).to_string(); + + // manifest goes into store + let yaml_path = self.store.write_file("service", name, "manifest.yaml", manifest_string.as_bytes())?; + + let hash 
= service.metadata.labels.as_ref().and_then(|m| m.get("skate.io/hash")).unwrap_or(&"".to_string()).to_string(); + + if !hash.is_empty() { + self.store.write_file("service", &name, "hash", &hash.as_bytes())?; + } + + // install systemd service and timer + let mut handlebars = Handlebars::new(); + handlebars.set_strict_mode(true); + //////////////////////////////////////////////////// + // template cron-pod.service to /var/lib/state/store/cronjob//systemd.service + //////////////////////////////////////////////////// + + handlebars.register_template_string("unit", include_str!("./resources/skate-ipvsmon.service")).map_err(|e| anyhow!(e).context("failed to load service template file"))?; + + + // cidr is 10.30.0.0/16 + // we just keep incrementing + let service_subnet_start = "10.30.0.0"; + + let ip = lock_file("/var/lib/skate/keepalived/service-ips.lock", Box::new(move || { + info!("reading ip file"); + + + let last_ip = fs::read_to_string("/var/lib/skate/keepalived/service-ips").unwrap_or_default(); + info!("converting {} to Ipv4Addr", last_ip); + let mut last_ip = Ipv4Addr::from_str(&last_ip).unwrap_or_else(|_| Ipv4Addr::from_str(service_subnet_start).unwrap()); + + info!("last ip: {}", last_ip); + + let mut octets = last_ip.octets(); + + if octets[3] == 255 { + if octets[2] == 255 { + return Err(anyhow!("no more ips available on subnet {}/16", service_subnet_start).into()); + } + octets[2] += 1; + octets[3] = 0; + } else { + octets[3] += 1; + } + + let ip = Ipv4Addr::from(octets); + + fs::write("/var/lib/skate/keepalived/service-ips", ip.to_string())?; + + Ok(ip.to_string()) + }))?; + + let json: Value = json!({ + "svc_name":name, + "ip": ip, + "yaml_path": yaml_path, + }); + + let file = std::fs::OpenOptions::new().write(true).create(true).truncate(true).open(&format!("/etc/systemd/system/skate-ipvsmon-{}.service", &name))?; + handlebars.render_to_write("unit", &json, file)?; + + handlebars.register_template_string("timer", 
include_str!("./resources/skate-ipvsmon.timer")).map_err(|e| anyhow!(e).context("failed to load timer template file"))?; + let json: Value = json!({ + "svc_name":name, + }); + let file = std::fs::OpenOptions::new().write(true).create(true).truncate(true).open(&format!("/etc/systemd/system/skate-ipvsmon-{}.timer", &name))?; + handlebars.render_to_write("timer", &json, file)?; + let unit_name = format!("skate-ipvsmon-{}", &name); + + exec_cmd("systemctl", &["daemon-reload"])?; + exec_cmd("systemctl", &["enable", "--now", &unit_name])?; + exec_cmd("systemctl", &["reset-failed", &unit_name])?; + + let domain = format!("{}.svc.cluster.skate", name); + dns::add_misc_host(ip, domain.clone(), domain)?; + + Ok(()) + } + + fn remove_service(&self, service: Service) -> Result<(), Box> { + let ns_name = metadata_name(&service); + + let _ = exec_cmd("systemctl", &["stop", &format!("skate-ipvsmon-{}", &ns_name.to_string())]); + + let _ = exec_cmd("systemctl", &["disable", &format!("skate-ipvsmon-{}", &ns_name.to_string())]); + let _ = exec_cmd("rm", &[&format!("/etc/systemd/system/skate-ipvsmon-{}.service", &ns_name.to_string())]); + let _ = exec_cmd("rm", &[&format!("/etc/systemd/system/skate-ipvsmon-{}.timer", &ns_name.to_string())]); + let _ = exec_cmd("rm", &[&format!("/var/lib/skate/keepalived/{}.conf", &ns_name.to_string())]); + let _ = exec_cmd("systemctl", &["daemon-reload"])?; + let _ = exec_cmd("systemctl", &["reset-failed"])?; + + let _ = self.store.remove_object("service", &ns_name.to_string())?; + + Ok(()) + } + + fn remove_deployment(&self, deployment: Deployment, grace_period: Option) -> Result<(), Box> { // find all pod ids for the deployment let name = deployment.metadata.name.unwrap(); @@ -369,7 +474,10 @@ impl Executor for DefaultExecutor { // just to check let object: SupportedResources = serde_yaml::from_str(manifest).expect("failed to deserialize manifest"); match object { - SupportedResources::Pod(_) | SupportedResources::Secret(_) | 
SupportedResources::Deployment(_) | SupportedResources::DaemonSet(_) => { + SupportedResources::Pod(_) + | SupportedResources::Secret(_) + | SupportedResources::Deployment(_) + | SupportedResources::DaemonSet(_) => { self.apply_play(object) } SupportedResources::Ingress(ingress) => { @@ -378,6 +486,9 @@ impl Executor for DefaultExecutor { SupportedResources::CronJob(cron) => { self.apply_cronjob(cron) } + SupportedResources::Service(service) => { + self.apply_service(service) + } } } @@ -403,6 +514,9 @@ impl Executor for DefaultExecutor { SupportedResources::Secret(secret) => { self.remove_secret(secret) } + SupportedResources::Service(service) => { + self.remove_service(service) + } } } } diff --git a/src/lib.rs b/src/lib.rs index 4fb0846..6eecf5d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ mod filestore; mod cron; mod logs; mod oci; +mod config_cmd; pub use skate::skate; pub use skatelet::skatelet; diff --git a/src/resources/keepalived-service.conf b/src/resources/keepalived-service.conf new file mode 100644 index 0000000..3044ae6 --- /dev/null +++ b/src/resources/keepalived-service.conf @@ -0,0 +1,16 @@ +{{#each manifest.spec.ports as |portspec|}} +virtual_server {{../host}} {{portspec.port}} { + delay_loop 3 + lb_algo rr + lb_kind NAT + protocol TCP + + {{#each ../target_ips as |rs|}} + real_server {{rs}} {{#if portspec.target_port}}{{portspect.target_port}}{{else}}{{portspec.port}}{{/if}} { + TCP_CHECK { + connect_timeout 10 + } + } + {{/each}} +} +{{/each}} diff --git a/src/resources/keepalived.conf b/src/resources/keepalived.conf new file mode 100644 index 0000000..587fd17 --- /dev/null +++ b/src/resources/keepalived.conf @@ -0,0 +1 @@ +include /var/lib/skate/keepalived/*.conf diff --git a/src/resources/skate-ipvsmon.service b/src/resources/skate-ipvsmon.service new file mode 100644 index 0000000..ac99282 --- /dev/null +++ b/src/resources/skate-ipvsmon.service @@ -0,0 +1,15 @@ +[Unit] +Description=Update keepalived config for service 
{{svc_name}} +Requires=network-online.target +After=network-online.target +Wants=skate-ipvsmon-{{svc_name}}.timer + +[Service] +Restart=no +ExecStart=/usr/local/bin/skatelet ipvsmon {{ip}} {{yaml_path}} --out /var/lib/skate/keepalived/{{svc_name}}.conf +User=root +Group=root +Type=oneshot + +[Install] +WantedBy=multi-user.target diff --git a/src/resources/skate-ipvsmon.timer b/src/resources/skate-ipvsmon.timer new file mode 100644 index 0000000..f554ad3 --- /dev/null +++ b/src/resources/skate-ipvsmon.timer @@ -0,0 +1,11 @@ +[Unit] +Description=Skate IPVS Manger for {{svc_name}} +Requires=network-online.target + +[Timer] +OnCalendar=*-*-* *:*:0/2 +Unit=skate-ipvsmon-{{svc_name}}.service +AccuracySec=1s + +[Install] +WantedBy=timers.target \ No newline at end of file diff --git a/src/scheduler.rs b/src/scheduler.rs index 19d6ca1..e2063c6 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -7,7 +7,7 @@ use itertools::Itertools; use k8s_openapi::api::apps::v1::{DaemonSet, Deployment}; use k8s_openapi::api::batch::v1::CronJob; -use k8s_openapi::api::core::v1::{Node as K8sNode, Pod, Secret}; +use k8s_openapi::api::core::v1::{Node as K8sNode, Pod, Secret, Service}; use k8s_openapi::api::networking::v1::Ingress; use k8s_openapi::Metadata; @@ -330,7 +330,6 @@ impl DefaultScheduler { } fn plan_cronjob(state: &ClusterState, cron: &CronJob) -> Result> { - let name = metadata_name(cron); let mut new_cron = cron.clone(); @@ -348,7 +347,6 @@ impl DefaultScheduler { match existing_cron { Some(c) => { if c.0.manifest_hash == new_hash { - actions.push(ScheduledOperation { resource: SupportedResources::CronJob(new_cron), error: None, @@ -357,7 +355,6 @@ impl DefaultScheduler { }); // nothing to do } else { - actions.push(ScheduledOperation { resource: SupportedResources::CronJob(new_cron.clone()), error: None, @@ -371,19 +368,15 @@ impl DefaultScheduler { operation: OpType::Create, node: None, }); - - } } None => { - actions.push(ScheduledOperation { resource: 
SupportedResources::CronJob(new_cron), error: None, operation: OpType::Create, node: None, }); - } } @@ -392,7 +385,6 @@ impl DefaultScheduler { // if so compare hashes, if differ then create, otherwise no change - Ok(ApplyPlan { actions, }) @@ -400,13 +392,11 @@ impl DefaultScheduler { // just apply on all nodes fn plan_secret(state: &ClusterState, secret: &Secret) -> Result> { - let mut actions = vec!(); for node in state.nodes.iter() { - actions.extend([ - ScheduledOperation{ + ScheduledOperation { resource: SupportedResources::Secret(secret.clone()), error: None, operation: OpType::Create, @@ -416,6 +406,61 @@ impl DefaultScheduler { } + Ok(ApplyPlan { + actions, + }) + } + fn plan_service(state: &ClusterState, service: &Service) -> Result> { + let name = metadata_name(service); + + let mut actions = vec!(); + + + let mut new_service = service.clone(); + + let new_hash = hash_k8s_resource(&mut new_service); + + + for node in state.nodes.iter() { + let existing_service = state.locate_service(&node.node_name, &name.name, &name.namespace); + match existing_service { + Some(c) => { + if c.0.manifest_hash == new_hash { + actions.push(ScheduledOperation { + resource: SupportedResources::Service(new_service.clone()), + error: None, + operation: OpType::Unchanged, + node: Some(node.clone()), + }); + // nothing to do + } else { + actions.push(ScheduledOperation { + resource: SupportedResources::Service(new_service.clone()), + error: None, + operation: OpType::Delete, + node: Some(node.clone()), + }); + + actions.push(ScheduledOperation { + resource: SupportedResources::Service(new_service.clone()), + error: None, + operation: OpType::Create, + node: Some(node.clone()), + }); + } + } + None => { + actions.push(ScheduledOperation { + resource: SupportedResources::Service(new_service.clone()), + error: None, + operation: OpType::Create, + node: Some(node.clone()), + }); + } + } + } + + Ok(ApplyPlan { actions, }) @@ -438,7 +483,6 @@ impl DefaultScheduler { match 
existing_ingress { Some(c) => { if c.0.manifest_hash == new_hash { - actions.push(ScheduledOperation { resource: SupportedResources::Ingress(new_ingress.clone()), error: None, @@ -447,7 +491,6 @@ impl DefaultScheduler { }); // nothing to do } else { - actions.push(ScheduledOperation { resource: SupportedResources::Ingress(new_ingress.clone()), error: None, @@ -461,23 +504,17 @@ impl DefaultScheduler { operation: OpType::Create, node: Some(node.clone()), }); - - } } None => { - actions.push(ScheduledOperation { resource: SupportedResources::Ingress(new_ingress.clone()), error: None, operation: OpType::Create, node: Some(node.clone()), }); - } } - - } Ok(ApplyPlan { @@ -493,10 +530,11 @@ impl DefaultScheduler { SupportedResources::Ingress(ingress) => Self::plan_ingress(state, ingress), SupportedResources::CronJob(cron) => Self::plan_cronjob(state, cron), SupportedResources::Secret(secret) => Self::plan_secret(state, secret), + SupportedResources::Service(service) => Self::plan_service(state, service), } } - async fn remove_existing(conns: &SshClients, resource: ScheduledOperation) -> Result<(String,String), Box> { + async fn remove_existing(conns: &SshClients, resource: ScheduledOperation) -> Result<(String, String), Box> { let conn = conns.find(&resource.node.unwrap().node_name).ok_or("failed to find connection to host")?; let manifest = serde_yaml::to_string(&resource.resource).expect("failed to serialize manifest"); diff --git a/src/skate.rs b/src/skate.rs index e11788c..519ae92 100644 --- a/src/skate.rs +++ b/src/skate.rs @@ -5,7 +5,7 @@ use async_trait::async_trait; use clap::{Args, Command, Parser, Subcommand}; use k8s_openapi::{List, Metadata, NamespaceResourceScope, Resource, ResourceScope}; use k8s_openapi::api::apps::v1::{DaemonSet, Deployment, DeploymentSpec}; -use k8s_openapi::api::core::v1::{Container, Pod, PodTemplateSpec, Secret}; +use k8s_openapi::api::core::v1::{Container, Pod, PodTemplateSpec, Secret, Service}; use serde_yaml; use 
serde::{Deserialize, Serialize}; use tokio; @@ -31,6 +31,7 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use serde_yaml::{Error as SerdeYamlError, Value}; use crate::config; use crate::config::{cache_dir, Config, Node}; +use crate::config_cmd::ConfigArgs; use crate::create::{create, CreateArgs}; use crate::delete::{delete, DeleteArgs}; use crate::get::{get, GetArgs}; @@ -59,6 +60,7 @@ enum Commands { Get(GetArgs), Describe(DescribeArgs), Logs(LogArgs), + Config(ConfigArgs) } #[derive(Debug, Clone, Args)] @@ -81,6 +83,7 @@ pub async fn skate() -> Result<(), Box> { Commands::Get(args) => get(args).await, Commands::Describe(args) => describe(args).await, Commands::Logs(args) => logs(args).await, + Commands::Config(args) => crate::config_cmd::config(args), _ => Ok(()) } } @@ -93,6 +96,7 @@ pub enum ResourceType { Ingress, CronJob, Secret, + Service } #[derive(Debug, Serialize, Deserialize, Display, Clone)] @@ -109,6 +113,8 @@ pub enum SupportedResources { CronJob(CronJob), #[strum(serialize = "Secret")] Secret(Secret), + #[strum(serialize = "Service")] + Service(Service), } @@ -121,6 +127,7 @@ impl SupportedResources { SupportedResources::Ingress(r) => metadata_name(r), SupportedResources::CronJob(r) => metadata_name(r), SupportedResources::Secret(s) => metadata_name(s), + SupportedResources::Service(s) => metadata_name(s), } } @@ -133,6 +140,7 @@ impl SupportedResources { SupportedResources::Ingress(_) => false, SupportedResources::CronJob(c) => c.clone().spec.unwrap_or_default().job_template.spec.unwrap_or_default().template.spec.unwrap_or_default().host_network.unwrap_or_default(), SupportedResources::Secret(_) => false, + SupportedResources::Service(_) => false, } } fn fixup_pod_template(template: PodTemplateSpec, ns: &str) -> Result> { @@ -352,6 +360,22 @@ impl SupportedResources { }; resource } + SupportedResources::Service(ref mut s) => { + let original_name = s.metadata.name.clone().unwrap_or("".to_string()); + if s.metadata.name.is_none() 
{ + return Err(anyhow!("metadata.name is empty").into()); + } + if s.metadata.namespace.is_none() { + return Err(anyhow!("metadata.namespace is empty").into()); + } + + let mut extra_labels = HashMap::from([]); + + s.metadata = Self::fixup_metadata(s.metadata.clone(), Some(extra_labels))?; + // set name to be name.namespace + s.metadata.name = Some(format!("{}", metadata_name(s))); + resource + } }; Ok(resource) } @@ -417,7 +441,13 @@ pub fn read_manifests(filenames: Vec) -> Result, { let secret: Secret = serde::Deserialize::deserialize(value)?; result.push(SupportedResources::Secret(secret)) - } + } else if + api_version == Service::API_VERSION && + kind == Service::KIND + { + let service: Service = serde::Deserialize::deserialize(value)?; + result.push(SupportedResources::Service(service)) + } } _ => { return Err(anyhow!(format!("kind {:?}", kind)).context("unsupported resource type").into()); diff --git a/src/skatelet/delete.rs b/src/skatelet/delete.rs index 4d509a9..c3568cc 100644 --- a/src/skatelet/delete.rs +++ b/src/skatelet/delete.rs @@ -5,13 +5,14 @@ use std::io::Read; use clap::{Args, Subcommand}; use crate::executor::{DefaultExecutor, Executor}; use crate::skate::SupportedResources; -use crate::skate::SupportedResources::{CronJob, Ingress}; +use crate::skate::SupportedResources::{CronJob, Ingress, Service}; use crate::skatelet::apply::StdinCommand; use k8s_openapi::api::batch::v1::CronJob as K8sCronJob; use k8s_openapi::api::core::v1::Secret; use k8s_openapi::api::networking::v1::Ingress as K8sIngress; +use k8s_openapi::api::core::v1::Service as K8sService; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; #[derive(Debug, Args, Clone)] @@ -31,8 +32,10 @@ pub enum DeleteResourceCommands { Secret(DeleteResourceArgs), Deployment(DeleteResourceArgs), Daemonset(DeleteResourceArgs), + Service(DeleteResourceArgs), } + #[derive(Debug, Args, Clone)] pub struct DeleteArgs { #[arg(short, long, long_help("Number of seconds to wait before hard 
killing."))] @@ -49,6 +52,7 @@ pub fn delete(args: DeleteArgs) -> Result<(), Box> { DeleteResourceCommands::Secret(resource_args) => delete_secret(args.clone(), resource_args.clone()), DeleteResourceCommands::Daemonset(resource_args) => delete_daemonset(args.clone(), resource_args.clone()), DeleteResourceCommands::Deployment(resource_args) => delete_deployment(args.clone(), resource_args.clone()), + DeleteResourceCommands::Service(resource_args) => delete_service(args.clone(), resource_args.clone()) } } @@ -70,6 +74,23 @@ pub fn delete_ingress(delete_args: DeleteArgs, resource_args: DeleteResourceArgs }), delete_args.termination_grace_period) } +pub fn delete_service(delete_args: DeleteArgs, resource_args: DeleteResourceArgs) -> Result<(), Box> { + let executor = DefaultExecutor::new(); + let mut meta = ObjectMeta::default(); + meta.name = Some(resource_args.name.clone()); + meta.namespace = Some(resource_args.namespace.clone()); + meta.labels = Some(BTreeMap::from([ + ("skate.io/name".to_string(), resource_args.name), + ("skate.io/namespace".to_string(), resource_args.namespace), + ])); + + executor.manifest_delete(Service(K8sService { + metadata: meta, + spec: None, + status: None, + }), delete_args.termination_grace_period) +} + pub fn delete_cronjob(delete_args: DeleteArgs, resource_args: DeleteResourceArgs) -> Result<(), Box> { let executor = DefaultExecutor::new(); let mut meta = ObjectMeta::default(); diff --git a/src/skatelet/dns.rs b/src/skatelet/dns.rs index f3390cb..69ad14f 100644 --- a/src/skatelet/dns.rs +++ b/src/skatelet/dns.rs @@ -1,5 +1,5 @@ use std::error::Error; -use std::{fs, process}; +use std::{fs, panic, process}; use std::fs::{File, OpenOptions}; use std::io::{BufRead, BufReader, BufWriter}; use std::path::Path; @@ -7,12 +7,13 @@ use anyhow::anyhow; use clap::{Args, Subcommand}; use fs2::FileExt; use log::{debug, info, warn, LevelFilter}; -use crate::util::{spawn_orphan_process, NamespacedName}; +use crate::util::{lock_file, 
spawn_orphan_process, NamespacedName}; use std::io::prelude::*; use std::process::Stdio; use serde_json::Value; use syslog::{BasicLogger, Facility, Formatter3164}; use crate::skate::exec_cmd; +use crate::skatelet::skatelet::log_panic; #[derive(Debug, Subcommand)] pub enum Command { @@ -27,6 +28,9 @@ pub struct DnsArgs { } pub fn dns(args: DnsArgs) -> Result<(), Box> { + panic::set_hook(Box::new(move |info| { + log_panic(info) + })); match args.command { Command::Add(add_args) => add(add_args.container_id, add_args.ip), Command::Remove(remove_args) => remove(remove_args.container_id), @@ -39,18 +43,7 @@ fn conf_path_str() -> String { } fn lock(cb: Box Result>>) -> Result> { - let lock_path = Path::new(&conf_path_str()).join("lock"); - let lock_file = File::create(lock_path.clone()).map_err(|e| anyhow!("failed to create/open lock file: {}", e))?; - info!("waiting for lock on {}", lock_path.display()); - lock_file.lock_exclusive()?; - info!("locked {}", lock_path.display()); - - let result = cb(); - - lock_file.unlock()?; - info!("unlocked {}", lock_path.display()); - - result + lock_file(&format!("{}/lock", conf_path_str()), cb) } fn ensure_skatelet_dns_conf_dir() { @@ -88,6 +81,34 @@ fn retry(retries: u32, f: impl Fn() -> Result)>) -> } } +pub fn add_misc_host(ip: String, domain: String, tag: String) -> Result<(), Box> { + ensure_skatelet_dns_conf_dir(); + let log_tag = "add_misc_host"; + + info!("{} dns add for {} {} # {}", log_tag, domain, ip, tag); + + let addnhosts_path = Path::new(&conf_path_str()).join("addnhosts"); + + lock(Box::new(move || { + + // scope to make sure files closed after + { + debug!("{} updating hosts file", log_tag); + // create or open + let mut addhosts_file = OpenOptions::new() + .create(true) + .write(true) + .append(true) + .open(addnhosts_path).map_err(|e| anyhow!("failed to open addnhosts file: {}", e))?; + + // write with comment for now + writeln!(addhosts_file, "{} {} # {}", ip, domain, tag).map_err(|e| anyhow!("failed to write 
host to file: {}", e))?; + } + + Ok(()) + })) +} + pub fn add(container_id: String, supplied_ip: Option) -> Result<(), Box> { ensure_skatelet_dns_conf_dir(); let log_tag = format!("{}::add", container_id); @@ -144,14 +165,14 @@ pub fn add(container_id: String, supplied_ip: Option) -> Result<(), Box< parent_resource = "deployment"; } else { info!("not a daemonset or deployment, skipping"); - return Ok(()) + return Ok(()); } let parent_identifer_label = format!("skate.io/{}", parent_resource); let app = labels.get(&parent_identifer_label).unwrap().as_str().unwrap(); - let domain = format!("{}.{}.cluster.skate", app, ns); + let domain = format!("{}.{}.pod.cluster.skate", app, ns); let addnhosts_path = Path::new(&conf_path_str()).join("addnhosts"); let container_id_cpy = container_id.clone(); @@ -301,9 +322,9 @@ pub struct RemoveArgs { container_id: String, } -pub fn remove(container_id: String) -> Result<(), Box> { - let log_tag = format!("{}::remove", container_id); - info!("{} removing dns entry for {}", log_tag, container_id); +pub fn remove(tag: String) -> Result<(), Box> { + let log_tag = format!("{}::remove", tag); + info!("{} removing dns entry for {}", log_tag, tag); ensure_skatelet_dns_conf_dir(); let addnhosts_path = Path::new(&conf_path_str()).join("addnhosts"); let newaddnhosts_path = Path::new(&conf_path_str()).join("addnhosts-new"); @@ -333,7 +354,7 @@ pub fn remove(container_id: String) -> Result<(), Box> { for (_index, line) in reader.lines().enumerate() { let line = line?; - if !line.ends_with(&container_id) { + if !line.ends_with(&tag) { writeln!(writer, "{}", line)?; } } diff --git a/src/skatelet/ipvsmon.rs b/src/skatelet/ipvsmon.rs new file mode 100644 index 0000000..579edfb --- /dev/null +++ b/src/skatelet/ipvsmon.rs @@ -0,0 +1,88 @@ +use crate::skate::exec_cmd; +use crate::util::{spawn_orphan_process, NamespacedName}; +use anyhow::anyhow; +use clap::{Args, Subcommand}; +use fs2::FileExt; +use handlebars::Handlebars; +use 
k8s_openapi::api::core::v1::Service; +use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; +use log::{debug, info, warn, LevelFilter}; +use serde_json::{json, Value}; +use std::error::Error; +use std::fs::{File, OpenOptions}; +use std::io::prelude::*; +use std::io::{BufRead, BufReader, BufWriter}; +use std::net::{SocketAddr, ToSocketAddrs}; +use std::path::Path; +use std::process::Stdio; +use std::{fs, process}; +use std::hash::{DefaultHasher, Hash, Hasher}; +use itertools::Itertools; +use syslog::{BasicLogger, Facility, Formatter3164}; + +#[derive(Debug, Args)] +pub struct IpvsmonArgs { + #[arg(long, long_help = "Name of the file to write keepalived config to.")] + out: String, + host: String, + file: String, +} + +pub fn ipvsmon(args: IpvsmonArgs) -> Result<(), Box> { + // args.service_name is fqn like foo.bar + let mut manifest: Service = serde_yaml::from_str(&fs::read_to_string(args.file)?)?; + let spec = manifest.spec.clone().unwrap_or_default(); + let name = spec.selector.unwrap_or_default().get("app.kubernetes.io/name").unwrap_or(&"default".to_string()).clone(); + if name == "" { + return Err(anyhow!("service selector app.kubernetes.io/name is required").into()); + } + let ns = manifest.metadata.namespace.unwrap_or("default".to_string()); + let fqn = NamespacedName { name, namespace: ns.clone() }; + manifest.metadata.namespace = Some(ns); + + + let domain = format!("{}.pod.cluster.skate:80", fqn); + // get all pod ips from dns .cluster.skate + info!("looking up ips for {}", &domain); + let mut addrs: Vec<_> = domain.to_socket_addrs().unwrap_or_default() + .map(|addr| addr.ip().to_string()).sorted().collect(); + + + let mut hasher = DefaultHasher::new(); + addrs.hash(&mut hasher); + let new_hash = format!("{:x}", hasher.finish()); + let hash_file_name = format!("/run/skatelet-ipvsmon-{}.hash", fqn); + + let old_hash = fs::read_to_string(&hash_file_name).unwrap_or_default(); + + // hashes match and output file exists + if old_hash == new_hash && 
Path::new(&args.out).exists() { + info!("ips haven't changed: {:?}", &addrs); + return Ok(()); + } + info!("ips changed, rewriting keepalived config for {} -> {:?}", &args.host, &addrs); + + fs::write(&hash_file_name, new_hash)?; + + + // rewrite keepalived include file + let mut handlebars = Handlebars::new(); + handlebars.set_strict_mode(true); + + handlebars.register_template_string("keepalived", include_str!("../resources/keepalived-service.conf")).map_err(|e| anyhow!(e).context("failed to load keepalived file"))?; + + // write config + { + let file = OpenOptions::new().write(true).create(true).truncate(true).open(args.out)?; + handlebars.render_to_write("keepalived", &json!({ + "host": args.host, + "manifest": manifest, + "target_ips": addrs, + }), file)?; + } + + + // reload keepalived + let _ = exec_cmd("systemctl", &["reload", "keepalived"])?; + Ok(()) +} \ No newline at end of file diff --git a/src/skatelet/mod.rs b/src/skatelet/mod.rs index 38d82f6..9ec2ddc 100644 --- a/src/skatelet/mod.rs +++ b/src/skatelet/mod.rs @@ -7,6 +7,7 @@ mod template; mod delete; pub(crate) mod dns; mod oci; +mod ipvsmon; pub use skatelet::skatelet; pub use system::SystemInfo; diff --git a/src/skatelet/oci.rs b/src/skatelet/oci.rs index d8ed2ef..2403636 100644 --- a/src/skatelet/oci.rs +++ b/src/skatelet/oci.rs @@ -1,9 +1,11 @@ use std::error::Error; +use std::panic; use std::process::{exit, Command, Stdio}; use clap::{Args, Subcommand}; use log::{error, info}; use strum_macros::EnumString; use crate::skatelet::dns; +use crate::skatelet::skatelet::log_panic; use crate::util::spawn_orphan_process; #[derive(EnumString, Debug, Subcommand)] @@ -18,6 +20,10 @@ pub struct OciArgs { } pub(crate) fn oci(args: OciArgs) -> Result<(), Box> { + + panic::set_hook(Box::new(move |info| { + log_panic(info) + })); let result = match args.commands { Commands::Poststart => post_start(), Commands::Poststop => post_stop(), diff --git a/src/skatelet/skatelet.rs b/src/skatelet/skatelet.rs index 
203cf55..666048d 100644 --- a/src/skatelet/skatelet.rs +++ b/src/skatelet/skatelet.rs @@ -1,5 +1,6 @@ use std::error::Error; use std::{panic, process, thread}; +use std::panic::PanicInfo; use clap::{Parser, Subcommand}; use log::{error, LevelFilter}; use strum::AsStaticRef; @@ -10,6 +11,7 @@ use crate::skatelet::apply::{ApplyArgs}; use crate::skatelet::cni::cni; use crate::skatelet::delete::{delete, DeleteArgs}; use crate::skatelet::dns::{dns, DnsArgs}; +use crate::skatelet::ipvsmon::{ipvsmon, IpvsmonArgs}; use crate::skatelet::oci::{oci, OciArgs}; use crate::skatelet::system::{system, SystemArgs}; use crate::skatelet::template::{template, TemplateArgs}; @@ -33,6 +35,39 @@ enum Commands { Dns(DnsArgs), Cni, Oci(OciArgs), + Ipvsmon(IpvsmonArgs) +} + +pub fn log_panic(info: &PanicInfo) { + + let thread = thread::current(); + let thread = thread.name().unwrap_or(""); + + let msg = match info.payload().downcast_ref::<&'static str>() { + Some(s) => *s, + None => match info.payload().downcast_ref::() { + Some(s) => &**s, + None => "Box", + }, + }; + + match info.location() { + Some(location) => { + error!( + target: "panic", "thread '{}' panicked at '{}': {}:{}", + thread, + msg, + location.file(), + location.line(), + ); + } + None => error!( + target: "panic", + "thread '{}' panicked at '{}'", + thread, + msg, + ), + } } pub async fn skatelet() -> Result<(), Box> { @@ -53,37 +88,6 @@ pub async fn skatelet() -> Result<(), Box> { log::set_boxed_logger(Box::new(BasicLogger::new(logger))) .map(|()| log::set_max_level(LevelFilter::Debug))?; - panic::set_hook(Box::new(move |info| { - - let thread = thread::current(); - let thread = thread.name().unwrap_or(""); - - let msg = match info.payload().downcast_ref::<&'static str>() { - Some(s) => *s, - None => match info.payload().downcast_ref::() { - Some(s) => &**s, - None => "Box", - }, - }; - - match info.location() { - Some(location) => { - error!( - target: "panic", "thread '{}' panicked at '{}': {}:{}", - thread, - msg, - 
location.file(), - location.line(), - ); - } - None => error!( - target: "panic", - "thread '{}' panicked at '{}'", - thread, - msg, - ), - } - })); let result = match args.command { Commands::Apply(args) => apply::apply(args), @@ -96,6 +100,7 @@ pub async fn skatelet() -> Result<(), Box> { }, Commands::Dns(args) => dns(args), Commands::Oci(args) => oci(args), + Commands::Ipvsmon(args) => ipvsmon(args), // _ => Ok(()) }; match result { diff --git a/src/skatelet/system.rs b/src/skatelet/system.rs index 65e41a7..e319582 100644 --- a/src/skatelet/system.rs +++ b/src/skatelet/system.rs @@ -60,6 +60,7 @@ pub struct SystemInfo { pub ingresses: Option>, pub cronjobs: Option>, pub secrets: Option>, + pub services: Option>, pub cpu_freq_mhz: u64, pub cpu_usage: f32, pub cpu_brand: String, @@ -140,6 +141,7 @@ async fn info() -> Result<(), Box> { // list ingresses let ingresses = store.list_objects("ingress")?; let cronjobs = store.list_objects("cronjob")?; + let services = store.list_objects("service")?; let secrets = exec_cmd("podman", &["secret", "ls", "--noheading"]).unwrap_or_else(|e| { @@ -235,10 +237,14 @@ async fn info() -> Result<(), Box> { true => None, false => Some(cronjobs), }, - secrets: match secrets.is_empty() { + secrets: match secret_info.is_empty() { true => None, false => Some(secret_info), }, + services: match services.is_empty() { + true => None, + false => Some(services), + }, hostname: sysinfo::System::host_name().unwrap_or("".to_string()), internal_ip_address: internal_ip_addr, }; diff --git a/src/ssh.rs b/src/ssh.rs index ad2a896..9fc1be0 100644 --- a/src/ssh.rs +++ b/src/ssh.rs @@ -204,7 +204,7 @@ echo ovs=$(cat /tmp/ovs-$$); _ => { let message = match result.stderr.len() { 0 => result.stdout, - _ => result.stderr + _ => result.stderr, }; Err(anyhow!("failed to apply resource: exit code {}, {}", result.exit_status, message).into()) } diff --git a/src/state/state.rs b/src/state/state.rs index 9866401..dd18adb 100644 --- a/src/state/state.rs +++ 
b/src/state/state.rs @@ -179,6 +179,7 @@ impl ClusterState { SupportedResources::Ingress(_) => Ok(ReconciledResult { removed: 1, added: 1, updated: 0 }), // TODO SupportedResources::CronJob(_) => Ok(ReconciledResult { removed: 1, added: 1, updated: 0 }), // TODO SupportedResources::Secret(_) => Ok(ReconciledResult { removed: 1, added: 1, updated: 0 }), // TODO + SupportedResources::Service(_) => Ok(ReconciledResult { removed: 1, added: 1, updated: 0 }), // TODO _ => todo!("reconcile not supported") } } @@ -315,6 +316,21 @@ impl ClusterState { res } + pub fn locate_service(&self, node:&str, name: &str, namespace: &str) -> Option<(ObjectListItem, &NodeState)> { + let res = self.nodes.iter().find(|n| n.node_name == node).and_then(|n| { + n.host_info.as_ref().and_then(|h| { + h.system_info.clone().and_then(|i| { + i.services.and_then(|p| { + p.clone().into_iter().find(|p| { + p.name.name == name && p.name.namespace == namespace + }).map(|p| (p, n)) + }) + }) + }) + }); + res + } + pub fn locate_deployment(&self, name: &str, namespace: &str) -> Vec<(PodmanPodInfo, &NodeState)> { let name = name.strip_prefix(format!("{}.", namespace).as_str()).unwrap_or(name); diff --git a/src/util.rs b/src/util.rs index 14aa6b7..16c64bc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,12 +1,18 @@ use std::collections::hash_map::DefaultHasher; +use std::error::Error; use std::ffi::OsStr; use std::fmt::{Display, Formatter}; +use std::fs::File; use std::hash::{Hash, Hasher}; +use std::path::Path; +use anyhow::anyhow; use chrono::{DateTime, Local}; use deunicode::deunicode_char; +use fs2::FileExt; use itertools::Itertools; use k8s_openapi::{Metadata, NamespaceResourceScope}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use log::info; use serde::{Deserialize, Deserializer, Serialize}; pub const CHECKBOX_EMOJI: char = '✔'; @@ -214,4 +220,15 @@ 
.stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .spawn(); -} \ No newline at end of file +} +pub fn lock_file(file: &str, cb: Box Result>>) -> Result> { + let lock_path = Path::new(file); + let lock_file = File::create(lock_path.clone()).map_err(|e| anyhow!("failed to create/open lock file: {}", e))?; + info!("waiting for lock on {}", lock_path.display()); + lock_file.lock_exclusive()?; + info!("locked {}", lock_path.display()); + let result = cb(); + lock_file.unlock()?; + info!("unlocked {}", lock_path.display()); + result +}