From f3ea74f814fd4cc76f5b41a419ed7112086ac03e Mon Sep 17 00:00:00 2001 From: Sergio Lopez Date: Sat, 1 Jun 2024 00:58:09 +0200 Subject: [PATCH] Introduce a client-server model With the introduction of 47fdd45a (Default guest memory to 80% of total RAM on host), we're targeting a use case in which we expect to be running a single, large VM. To accommodate this use case, we're switching here to a client/server model. The first time krun is launched, in addition to running the command, it starts a small server in the VM listening on a TCP port. On the host side, it also creates a lock file, writes the listening port number to it, and calls flock() on it to prevent other instances from running in parallel. When it's executed again, krun detects there's another instance running, and contacts the server in the VM to request the launch of the command. The stdout and stderr of the command are redirected to $XDG_RUNTIME_DIR/krun-$COMMAND-$TIMESTAMP.[stdout|stderr] Signed-off-by: Sergio Lopez --- Cargo.lock | 64 +++++++++++ Cargo.toml | 2 + crates/krun-server/Cargo.toml | 22 ++++ crates/krun-server/src/bin/krun-server.rs | 33 ++++++ crates/krun-server/src/cli_options.rs | 38 +++++++ crates/krun-server/src/lib.rs | 2 + crates/krun-server/src/server.rs | 87 ++++++++++++++ crates/krun/Cargo.toml | 2 + crates/krun/src/bin/krun.rs | 45 ++++---- crates/krun/src/cli_options.rs | 8 ++ crates/krun/src/lib.rs | 1 + crates/krun/src/lock.rs | 132 ++++++++++++++++++++++ crates/krun/src/net.rs | 6 +- crates/utils/Cargo.toml | 6 + crates/utils/src/env.rs | 32 +++++- crates/utils/src/launch.rs | 10 ++ crates/utils/src/lib.rs | 1 + 17 files changed, 464 insertions(+), 27 deletions(-) create mode 100644 crates/krun-server/Cargo.toml create mode 100644 crates/krun-server/src/bin/krun-server.rs create mode 100644 crates/krun-server/src/cli_options.rs create mode 100644 crates/krun-server/src/lib.rs create mode 100644 crates/krun-server/src/server.rs create mode 100644 crates/krun/src/lock.rs create mode 100644 crates/utils/src/launch.rs diff --git a/Cargo.lock b/Cargo.lock index c7214ff..ac031be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -207,6 +207,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "krun" version = "0.1.0" @@ -218,6 +224,8 @@ dependencies = [ "log", "nix", "rustix", + "serde", + "serde_json", "utils", ] @@ -235,6 +243,19 @@ dependencies = [ "utils", ] +[[package]] +name = "krun-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bpaf", + "env_logger", + "log", + "serde", + "serde_json", + "utils", +] + [[package]] name = "krun-sys" version = "1.9.0" @@ -372,6 +393,43 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.203" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.203" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "shlex" version = "1.3.0" @@ -416,6 +474,12 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "utils" version = "0.0.0" +dependencies = [ + "anyhow", + "rustix", + "serde", + "serde_json", +] [[package]] name = "windows-sys" diff --git a/Cargo.toml b/Cargo.toml index ada9b01..fdeef66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,5 +20,7 @@ krun-sys = { path = "crates/krun-sys", default-features = false } log = { version = "0.4.21", default-features = false } nix = { version = "0.28.0", default-features = false } rustix = { version = "0.38.34", default-features = false } +serde = { version = "1.0.203", default-features = false } +serde_json = { version = "1.0.117", default-features = false } tempfile = { version = "3.10.1", default-features = false } utils = { path = "crates/utils", default-features = false } diff --git a/crates/krun-server/Cargo.toml b/crates/krun-server/Cargo.toml new file mode 100644 index 0000000..53eb5e4 --- /dev/null +++ b/crates/krun-server/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "krun-server" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +description = { workspace = true } +repository = { workspace = true } +license = { workspace = true } + +[dependencies] +anyhow = { workspace = true, features = ["std"] } +bpaf = { workspace = true, features = [] } +env_logger = { workspace = true, features = ["auto-color", "humantime", "unstable-kv"] } +log = { workspace = true, features = ["kv"] } +nix = { workspace = true, features = ["socket"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +utils = { workspace = true, features = [] } + +[features] +default = [] diff --git a/crates/krun-server/src/bin/krun-server.rs b/crates/krun-server/src/bin/krun-server.rs new file mode 100644 index 0000000..258f928 --- /dev/null +++ b/crates/krun-server/src/bin/krun-server.rs @@ -0,0 +1,33 @@ +use std::net::TcpListener; +use std::os::fd::AsRawFd; +use std::process::Command; + +use anyhow::Result; +use krun_server::cli_options::options; +use krun_server::server::start_server; +use nix::sys::socket::{shutdown, Shutdown}; + +fn main() -> Result<()> { + env_logger::init(); + + let options = options().run(); + + let listener = TcpListener::bind(format!("127.0.0.1:{}", &options.server_port))?; + let listener_fd = listener.as_raw_fd(); + + let server_thread = start_server(listener); + + let err = Command::new(&options.command) + .args(options.command_args) + .status(); + if let Err(err) = err { + println!("Error executing command {}: {}", options.command, err); + } + + shutdown(listener_fd, Shutdown::Both)?; + if let Err(err) = server_thread.join() { + println!("Error waiting for server thread termination: {err:?}"); + } + + Ok(()) +} diff --git a/crates/krun-server/src/cli_options.rs b/crates/krun-server/src/cli_options.rs new file mode 100644 index 0000000..161c7c3 --- /dev/null +++ b/crates/krun-server/src/cli_options.rs @@ -0,0 +1,38 @@ +use bpaf::{any, construct, env, positional, OptionParser, Parser}; + +#[derive(Clone, Debug)] +pub struct Options { + pub server_port: u32, + pub command: String, + pub command_args: Vec, +} + +pub fn options() -> OptionParser { + let server_port = env("KRUN_SERVER_PORT") + .help("TCP port to listen for command launch requests") + .argument::("KRUN_SERVER_PORT") + .fallback(3333); + let command = positional("COMMAND"); + let command_args = any::("COMMAND_ARGS", |arg| { + (!["--help", "-h"].contains(&&*arg)).then_some(arg) + }) + .many(); + + construct!(Options { + server_port, + // positionals + command, + command_args, + }) + .to_options() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_options() { + options().check_invariants(false) + } +} diff --git a/crates/krun-server/src/lib.rs b/crates/krun-server/src/lib.rs new file mode 100644 index 0000000..b8fd505 --- /dev/null +++ b/crates/krun-server/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli_options; +pub mod server; diff --git a/crates/krun-server/src/server.rs b/crates/krun-server/src/server.rs new file mode 100644 index 0000000..3ef0d3f --- /dev/null +++ b/crates/krun-server/src/server.rs @@ -0,0 +1,87 @@ +use std::{ + collections::HashMap, + env, + fs::File, + io::{BufRead, BufReader, Write}, + net::{TcpListener, TcpStream}, + path::Path, + process::Command, + thread::{self, JoinHandle}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{anyhow, Result}; +use log::debug; +use utils::launch::Launch; + +pub fn start_server(listener: TcpListener) -> JoinHandle<()> { + thread::spawn(move || { + if let Err(err) = work(listener) { + debug!("krun server thread is terminating: {err:?}") + } + }) +} + +fn work(listener: TcpListener) -> Result<()> { + for stream in listener.incoming() { + let stream = stream?; + + if let Err(e) = handle_connection(stream) { + println!("Error processing client request: {e}"); + } + } + + Ok(()) +} + +fn read_request(mut stream: &TcpStream) -> Result { + let mut buf_reader = BufReader::new(&mut stream); + let mut buf = String::new(); + loop { + if buf_reader.read_line(&mut buf)? == 0 { + return Err(anyhow!("Unexpected EOF")); + } + if buf.contains("EOM") { + let launch: Launch = serde_json::from_str(&buf[..buf.len() - 5])?; + return Ok(launch); + } + } +} + +fn handle_connection(mut stream: TcpStream) -> Result<()> { + let mut envs: HashMap = env::vars().collect(); + + let launch = read_request(&stream)?; + envs.extend(launch.envs); + + let (stdout, stderr) = { + let base = if envs.contains_key("XDG_RUNTIME_DIR") { + Path::new(&envs["XDG_RUNTIME_DIR"]) + } else { + Path::new("/tmp") + }; + let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); + let path_stdout = base.join(format!("krun-{}-{}.stdout", launch.command, ts)); + let path_stderr = base.join(format!("krun-{}-{}.stderr", launch.command, ts)); + ( + File::create_new(path_stdout)?, + File::create_new(path_stderr)?, + ) + }; + + let err = Command::new(&launch.command) + .args(launch.args) + .envs(envs) + .stdout(stdout) + .stderr(stderr) + .spawn(); + if let Err(err) = err { + let error = format!("Error executing command {}: {}", launch.command, err); + let _ = stream.write_all(error.as_bytes()); + } else { + let _ = stream.write_all(b"OK"); + } + let _ = stream.flush(); + + Ok(()) +} diff --git a/crates/krun/Cargo.toml b/crates/krun/Cargo.toml index 6ba9aef..dec684e 100644 --- a/crates/krun/Cargo.toml +++ b/crates/krun/Cargo.toml @@ -16,6 +16,8 @@ krun-sys = { workspace = true, features = [] } log = { workspace = true, features = ["kv"] } nix = { workspace = true, features = ["user"] } rustix = { workspace = true, features = ["process", "std", "use-libc-auxv"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } utils = { workspace = true, features = [] } [features] diff --git a/crates/krun/src/bin/krun.rs b/crates/krun/src/bin/krun.rs index c7f12d0..21edd59 100644 --- a/crates/krun/src/bin/krun.rs +++ b/crates/krun/src/bin/krun.rs @@ -12,6 +12,7 @@ use anyhow::{anyhow, Context, Result}; use krun::{ cli_options::options, cpu::{get_fallback_cores, get_performance_cores}, + lock::{lock_or_connect, LockResult}, net::{connect_to_passt, start_passt}, types::{MiB, NetMode}, }; @@ -27,7 +28,7 @@ use rustix::{ io::Errno, process::{geteuid, getgid, getrlimit, getuid, sched_setaffinity, setrlimit, CpuSet, Resource}, }; -use utils::env::find_in_path; +use utils::env::find_krun_exec; fn main() -> Result<()> { env_logger::init(); @@ -39,6 +40,15 @@ fn main() -> Result<()> { let options = options().fallback_to_usage().run(); + let _lock_file = match lock_or_connect(&options)? { + LockResult::LaunchRequested => { + // There was a krun instance already running and we've requested it + // to launch the command successfully, so all the work is done. + return Ok(()); + }, + LockResult::LockAcquired(lock_file) => lock_file, + }; + { // Set the log level to "off". // @@ -136,7 +146,9 @@ fn main() -> Result<()> { .context("Failed to connect to `passt`")? .into() } else { - start_passt().context("Failed to start `passt`")?.into() + start_passt(options.server_port) + .context("Failed to start `passt`")? + .into() }; // SAFETY: `passt_fd` is an `OwnedFd` and consumed to prevent closing on drop. // See https://doc.rust-lang.org/std/io/index.html#io-safety @@ -196,28 +208,8 @@ fn main() -> Result<()> { } } - let krun_guest_path = - find_in_path("krun-guest").context("Failed to check existence of `krun-guest`")?; - let krun_guest_path = if let Some(krun_guest_path) = krun_guest_path { - krun_guest_path - } else { - let krun_path = env::current_exe().and_then(|p| p.canonicalize()); - let krun_path = krun_path.context("Failed to get path of current running executable")?; - krun_path.with_file_name(format!( - "{}-guest", - krun_path - .file_name() - .expect("krun_path should end with a file name") - .to_str() - .context("Failed to process `krun` file name as it contains invalid UTF-8")? - )) - }; - let krun_guest_path = CString::new( - krun_guest_path - .to_str() - .context("Failed to process `krun-guest` path as it contains invalid UTF-8")?, - ) - .context("Failed to process `krun-guest` path as it contains NUL character")?; + let krun_guest_path = find_krun_exec("krun-guest")?; + let krun_server_path = find_krun_exec("krun-server")?; let mut krun_guest_args: Vec = vec![ CString::new(username).expect("username should not contain NUL character"), @@ -226,6 +218,8 @@ fn main() -> Result<()> { CString::new(format!("{}", getgid().as_raw())) .expect("gid should not contain NUL character"), ]; + + krun_guest_args.push(krun_server_path); krun_guest_args.push( CString::new(options.command) .context("Failed to process command as it contains NUL character")?, @@ -236,6 +230,7 @@ fn main() -> Result<()> { .context("Failed to process command arg as it contains NUL character")?; krun_guest_args.push(s); } + let krun_guest_args: Vec<*const c_char> = { const KRUN_GUEST_ARGS_FIXED: usize = 4; // SAFETY: All pointers must be stored in the same allocation. @@ -317,6 +312,8 @@ fn main() -> Result<()> { env.push(s); } + env.push(CString::new(format!("KRUN_SERVER_PORT={}", options.server_port)).unwrap()); + debug!(env:?; "env vars"); let env: Vec<*const c_char> = { diff --git a/crates/krun/src/cli_options.rs b/crates/krun/src/cli_options.rs index 943cffa..f38738a 100644 --- a/crates/krun/src/cli_options.rs +++ b/crates/krun/src/cli_options.rs @@ -12,6 +12,7 @@ pub struct Options { pub mem: Option, pub net: NetMode, pub passt_socket: Option, + pub server_port: u32, pub command: String, pub command_args: Vec, } @@ -86,6 +87,12 @@ pub fn options() -> OptionParser { .help("Instead of starting passt, connect to passt socket at PATH") .argument("PATH") .optional(); + let server_port = long("server-port") + .short('p') + .help("Set the port to be used in server mode") + .argument("SERVER_PORT") + .fallback(3334) + .display_fallback(); let command = positional("COMMAND").help("the command you want to execute in the vm"); let command_args = any::("COMMAND_ARGS", |arg| { (!["--help", "-h"].contains(&&*arg)).then_some(arg) @@ -99,6 +106,7 @@ pub fn options() -> OptionParser { mem, net, passt_socket, + server_port, // positionals command, command_args, diff --git a/crates/krun/src/lib.rs b/crates/krun/src/lib.rs index 1ae1296..2f46467 100644 --- a/crates/krun/src/lib.rs +++ b/crates/krun/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli_options; pub mod cpu; +pub mod lock; pub mod net; pub mod types; diff --git a/crates/krun/src/lock.rs b/crates/krun/src/lock.rs new file mode 100644 index 0000000..7f0682f --- /dev/null +++ b/crates/krun/src/lock.rs @@ -0,0 +1,132 @@ +use std::{ + collections::HashMap, + env, + fs::File, + io::{BufRead, BufReader, Read, Write}, + net::TcpStream, + path::Path, +}; + +use anyhow::{anyhow, Context, Result}; +use rustix::{ + fs::{flock, FlockOperation}, + path::Arg, +}; +use serde_json; +use utils::launch::Launch; + +use crate::cli_options::Options; + +pub enum LockResult { + LaunchRequested, + LockAcquired(File), +} + +pub fn lock_or_connect(options: &Options) -> Result { + let running_server_port = env::var("KRUN_SERVER_PORT").ok(); + if let Some(port) = running_server_port { + let port: u32 = port.parse()?; + request_launch(port, &options.command, &options.command_args, &options.env)?; + return Ok(LockResult::LaunchRequested); + } + + let (lock_file, running_server_port) = lock_file(options.server_port)?; + match lock_file { + Some(lock_file) => Ok(LockResult::LockAcquired(lock_file)), + None => { + if let Some(port) = running_server_port { + let mut tries = 0; + while let Err(e) = + request_launch(port, &options.command, &options.command_args, &options.env) + { + if tries == 3 { + return Err(e); + } else { + tries += 1; + } + } + Ok(LockResult::LaunchRequested) + } else { + Err(anyhow!( + "krun is already running but couldn't find its server port, bailing out" + )) + } + }, + } +} + +fn lock_file(server_port: u32) -> Result<(Option, Option)> { + let run_path = env::var("XDG_RUNTIME_DIR") + .context("Failed to read XDG_RUNTIME_DIR environment variable")?; + let lock_path = Path::new(&run_path).join("krun.lock"); + + let mut lock_file = if !lock_path.exists() { + let lock_file = File::create(lock_path).context("Can't create lock file")?; + flock(&lock_file, FlockOperation::NonBlockingLockExclusive) + .context("Can't acquire an exclusive lock on new lock file")?; + lock_file + } else { + let mut lock_file = File::options() + .write(true) + .read(true) + .open(lock_path) + .context("Can't create lock file")?; + let ret = flock(&lock_file, FlockOperation::NonBlockingLockExclusive); + if ret.is_err() { + let mut data: Vec = Vec::with_capacity(5); + lock_file.read_to_end(&mut data)?; + let port = match data.to_string_lossy().parse::() { + Ok(port) => { + if port > 1024 { + Some(port) + } else { + None + } + }, + Err(_) => None, + }; + return Ok((None, port)); + } + lock_file + }; + + lock_file.set_len(0)?; + lock_file.write_all(format!("{}", server_port).as_bytes())?; + Ok((Some(lock_file), None)) +} + +fn request_launch( + port: u32, + command: &String, + args: &[String], + envs: &Vec<(String, Option)>, +) -> Result<()> { + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; + + let mut envs_map: HashMap = HashMap::new(); + for (k, v) in envs { + if let Some(v) = v { + envs_map.insert(k.to_string(), v.to_string()); + } + } + + let launch = Launch { + command: command.to_string(), + args: args.to_vec(), + envs: envs_map, + }; + + stream.write_all(serde_json::to_string(&launch)?.as_bytes())?; + stream.write_all(b"\nEOM\n")?; + stream.flush()?; + + let mut buf_reader = BufReader::new(&mut stream); + let mut resp = String::new(); + buf_reader.read_line(&mut resp)?; + + if resp == "OK" { + Ok(()) + } else { + Err(anyhow!("Error requesting launch to server: {resp}")) + } +} diff --git a/crates/krun/src/net.rs b/crates/krun/src/net.rs index 46b9f81..4ab1453 100644 --- a/crates/krun/src/net.rs +++ b/crates/krun/src/net.rs @@ -19,7 +19,7 @@ where Ok(UnixStream::connect(passt_socket_path)?) } -pub fn start_passt() -> Result { +pub fn start_passt(server_port: u32) -> Result { // SAFETY: The child process should not inherit the file descriptor of `parent_socket`. // There is no documented guarantee of this, but the implementation as of writing atomically // sets `SOCK_CLOEXEC`. @@ -43,7 +43,9 @@ pub fn start_passt() -> Result { // owned by the child process. // See https://doc.rust-lang.org/std/io/index.html#io-safety let child = Command::new("passt") - .args(["-q", "-f", "--fd"]) + .args(["-q", "-f", "-t"]) + .arg(format!("{server_port}:{server_port}")) + .arg("--fd") .arg(format!("{}", child_fd.into_raw_fd())) .spawn(); if let Err(err) = child { diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index ba20df6..017c807 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -7,5 +7,11 @@ repository = { workspace = true } license = { workspace = true } publish = false +[dependencies] +anyhow = { workspace = true, features = ["std"] } +rustix = { workspace = true, features = ["fs"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + [features] default = [] diff --git a/crates/utils/src/env.rs b/crates/utils/src/env.rs index 8497cd6..1c734a7 100644 --- a/crates/utils/src/env.rs +++ b/crates/utils/src/env.rs @@ -1,9 +1,13 @@ use std::{ - env, fs, io, + env, + ffi::CString, + fs, io, os::unix::fs::PermissionsExt as _, path::{Path, PathBuf}, }; +use anyhow::{Context, Result}; + pub fn find_in_path

(program: P) -> io::Result> where P: AsRef, @@ -37,3 +41,29 @@ where Ok(None) } + +pub fn find_krun_exec(name: &str) -> Result { + let krun_path = find_in_path(name).context("Failed to check existence of {name}")?; + let krun_path = if let Some(krun_path) = krun_path { + krun_path + } else { + let krun_path = env::current_exe().and_then(|p| p.canonicalize()); + let krun_path = krun_path.context("Failed to get path of current running executable")?; + krun_path.with_file_name(format!( + "{}-guest", + krun_path + .file_name() + .expect("krun_path should end with a file name") + .to_str() + .context("Failed to process `krun` file name as it contains invalid UTF-8")? + )) + }; + let krun_path = CString::new( + krun_path + .to_str() + .context("Failed to process {name} path as it contains invalid UTF-8")?, + ) + .context("Failed to process {name} path as it contains NUL character")?; + + Ok(krun_path) +} diff --git a/crates/utils/src/launch.rs b/crates/utils/src/launch.rs new file mode 100644 index 0000000..c31b95d --- /dev/null +++ b/crates/utils/src/launch.rs @@ -0,0 +1,10 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize)] +pub struct Launch { + pub command: String, + pub args: Vec, + pub envs: HashMap, +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 3d7924f..079241a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1 +1,2 @@ pub mod env; +pub mod launch;