diff --git a/Cargo.lock b/Cargo.lock index c7214ff..ac031be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -207,6 +207,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + [[package]] name = "krun" version = "0.1.0" @@ -218,6 +224,8 @@ dependencies = [ "log", "nix", "rustix", + "serde", + "serde_json", "utils", ] @@ -235,6 +243,19 @@ dependencies = [ "utils", ] +[[package]] +name = "krun-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "bpaf", + "env_logger", + "log", + "serde", + "serde_json", + "utils", +] + [[package]] name = "krun-sys" version = "1.9.0" @@ -372,6 +393,43 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "serde" +version = "1.0.203" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7253ab4de971e72fb7be983802300c30b5a7f0c2e56fab8abfc6a214307c0094" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.203" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "500cbc0ebeb6f46627f50f3f5811ccf6bf00643be300b4c3eabc0ef55dc5b5ba" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" +dependencies = [ + "itoa", + "ryu", + "serde", +] + [[package]] name = "shlex" version = "1.3.0" @@ -416,6 +474,12 @@ checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" [[package]] name = "utils" version = "0.0.0" +dependencies = [ + "anyhow", + "rustix", + "serde", + "serde_json", +] [[package]] name = "windows-sys" diff --git a/Cargo.toml b/Cargo.toml index ada9b01..fdeef66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,5 +20,7 @@ krun-sys = { path = "crates/krun-sys", default-features = false } log = { version = "0.4.21", default-features = false } nix = { version = "0.28.0", default-features = false } rustix = { version = "0.38.34", default-features = false } +serde = { version = "1.0.203", default-features = false } +serde_json = { version = "1.0.117", default-features = false } tempfile = { version = "3.10.1", default-features = false } utils = { path = "crates/utils", default-features = false } diff --git a/crates/krun-server/Cargo.toml b/crates/krun-server/Cargo.toml new file mode 100644 index 0000000..53eb5e4 --- /dev/null +++ b/crates/krun-server/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "krun-server" +version = { workspace = true } +authors = { workspace = true } +edition = { workspace = true } +rust-version = { workspace = true } +description = { workspace = true } +repository = { workspace = true } +license = { workspace = true } + +[dependencies] +anyhow = { workspace = true, features = ["std"] } +bpaf = { workspace = true, features = [] } +env_logger = { workspace = true, features = ["auto-color", "humantime", "unstable-kv"] } +log = { workspace = true, features = ["kv"] } +nix = { workspace = true, features = ["socket"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +utils = { workspace = true, features = [] } + +[features] +default = [] diff --git a/crates/krun-server/src/bin/krun-server.rs b/crates/krun-server/src/bin/krun-server.rs new file mode 100644 index 0000000..f8192c8 --- /dev/null +++ b/crates/krun-server/src/bin/krun-server.rs @@ -0,0 +1,33 @@ +use std::net::TcpListener; +use std::os::fd::AsRawFd; +use std::process::Command; + +use anyhow::Result; +use krun_server::cli_options::options; +use krun_server::server::start_server; +use nix::sys::socket::{shutdown, Shutdown}; + +fn main() -> Result<()> { + env_logger::init(); + + let options = options().run(); + + let listener = TcpListener::bind(format!("127.0.0.1:{}", &options.server_port))?; + let listener_fd = listener.as_raw_fd(); + + let server_thread = start_server(listener); + + let status = Command::new(&options.command) + .args(options.command_args) + .status(); + if let Err(err) = status { + println!("Error executing command {}: {}", options.command, err); + } + + shutdown(listener_fd, Shutdown::Both)?; + if let Err(err) = server_thread.join() { + println!("Error waiting for server thread termination: {err:?}"); + } + + Ok(()) +} diff --git a/crates/krun-server/src/cli_options.rs b/crates/krun-server/src/cli_options.rs new file mode 100644 index 0000000..c0980cf --- /dev/null +++ b/crates/krun-server/src/cli_options.rs @@ -0,0 +1,40 @@ +use bpaf::{any, construct, env, positional, OptionParser, Parser}; + +#[derive(Clone, Debug)] +pub struct Options { + pub server_port: u32, + pub command: String, + pub command_args: Vec, +} + +pub fn options() -> OptionParser { + let server_port = env("KRUN_SERVER_PORT") + .short('p') + .help("TCP port to listen for command launch requests") + .argument("SERVER_PORT") + .fallback(3333) + .display_fallback(); + let command = positional("COMMAND"); + let command_args = any::("COMMAND_ARGS", |arg| { + (!["--help", "-h"].contains(&&*arg)).then_some(arg) + }) + .many(); + + construct!(Options { + server_port, + // positionals + command, + command_args, + }) + .to_options() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn check_options() { + options().check_invariants(false) + } +} diff --git a/crates/krun-server/src/lib.rs b/crates/krun-server/src/lib.rs new file mode 100644 index 0000000..b8fd505 --- /dev/null +++ b/crates/krun-server/src/lib.rs @@ -0,0 +1,2 @@ +pub mod cli_options; +pub mod server; diff --git a/crates/krun-server/src/server.rs b/crates/krun-server/src/server.rs new file mode 100644 index 0000000..3ef0d3f --- /dev/null +++ b/crates/krun-server/src/server.rs @@ -0,0 +1,87 @@ +use std::{ + collections::HashMap, + env, + fs::File, + io::{BufRead, BufReader, Write}, + net::{TcpListener, TcpStream}, + path::Path, + process::Command, + thread::{self, JoinHandle}, + time::{SystemTime, UNIX_EPOCH}, +}; + +use anyhow::{anyhow, Result}; +use log::debug; +use utils::launch::Launch; + +pub fn start_server(listener: TcpListener) -> JoinHandle<()> { + thread::spawn(move || { + if let Err(err) = work(listener) { + debug!("krun server thread is terminating: {err:?}") + } + }) +} + +fn work(listener: TcpListener) -> Result<()> { + for stream in listener.incoming() { + let stream = stream?; + + if let Err(e) = handle_connection(stream) { + println!("Error processing client request: {e}"); + } + } + + Ok(()) +} + +fn read_request(mut stream: &TcpStream) -> Result { + let mut buf_reader = BufReader::new(&mut stream); + let mut buf = String::new(); + loop { + if buf_reader.read_line(&mut buf)? == 0 { + return Err(anyhow!("Unexpected EOF")); + } + if buf.contains("EOM") { + let launch: Launch = serde_json::from_str(&buf[..buf.len() - 5])?; + return Ok(launch); + } + } +} + +fn handle_connection(mut stream: TcpStream) -> Result<()> { + let mut envs: HashMap = env::vars().collect(); + + let launch = read_request(&stream)?; + envs.extend(launch.envs); + + let (stdout, stderr) = { + let base = if envs.contains_key("XDG_RUNTIME_DIR") { + Path::new(&envs["XDG_RUNTIME_DIR"]) + } else { + Path::new("/tmp") + }; + let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis(); + let path_stdout = base.join(format!("krun-{}-{}.stdout", launch.command, ts)); + let path_stderr = base.join(format!("krun-{}-{}.stderr", launch.command, ts)); + ( + File::create_new(path_stdout)?, + File::create_new(path_stderr)?, + ) + }; + + let err = Command::new(&launch.command) + .args(launch.args) + .envs(envs) + .stdout(stdout) + .stderr(stderr) + .spawn(); + if let Err(err) = err { + let error = format!("Error executing command {}: {}", launch.command, err); + let _ = stream.write_all(error.as_bytes()); + } else { + let _ = stream.write_all(b"OK"); + } + let _ = stream.flush(); + + Ok(()) +} diff --git a/crates/krun/Cargo.toml b/crates/krun/Cargo.toml index 6ba9aef..dec684e 100644 --- a/crates/krun/Cargo.toml +++ b/crates/krun/Cargo.toml @@ -16,6 +16,8 @@ krun-sys = { workspace = true, features = [] } log = { workspace = true, features = ["kv"] } nix = { workspace = true, features = ["user"] } rustix = { workspace = true, features = ["process", "std", "use-libc-auxv"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } utils = { workspace = true, features = [] } [features] diff --git a/crates/krun/src/bin/krun.rs b/crates/krun/src/bin/krun.rs index c7f12d0..21edd59 100644 --- a/crates/krun/src/bin/krun.rs +++ b/crates/krun/src/bin/krun.rs @@ -12,6 +12,7 @@ use anyhow::{anyhow, Context, Result}; use krun::{ cli_options::options, cpu::{get_fallback_cores, get_performance_cores}, + lock::{lock_or_connect, LockResult}, net::{connect_to_passt, start_passt}, types::{MiB, NetMode}, }; @@ -27,7 +28,7 @@ use rustix::{ io::Errno, process::{geteuid, getgid, getrlimit, getuid, sched_setaffinity, setrlimit, CpuSet, Resource}, }; -use utils::env::find_in_path; +use utils::env::find_krun_exec; fn main() -> Result<()> { env_logger::init(); @@ -39,6 +40,15 @@ fn main() -> Result<()> { let options = options().fallback_to_usage().run(); + let _lock_file = match lock_or_connect(&options)? { + LockResult::LaunchRequested => { + // There was a krun instance already running and we've requested it + // to launch the command successfully, so all the work is done. + return Ok(()); + }, + LockResult::LockAcquired(lock_file) => lock_file, + }; + { // Set the log level to "off". // @@ -136,7 +146,9 @@ fn main() -> Result<()> { .context("Failed to connect to `passt`")? .into() } else { - start_passt().context("Failed to start `passt`")?.into() + start_passt(options.server_port) + .context("Failed to start `passt`")? + .into() }; // SAFETY: `passt_fd` is an `OwnedFd` and consumed to prevent closing on drop. // See https://doc.rust-lang.org/std/io/index.html#io-safety @@ -196,28 +208,8 @@ fn main() -> Result<()> { } } - let krun_guest_path = - find_in_path("krun-guest").context("Failed to check existence of `krun-guest`")?; - let krun_guest_path = if let Some(krun_guest_path) = krun_guest_path { - krun_guest_path - } else { - let krun_path = env::current_exe().and_then(|p| p.canonicalize()); - let krun_path = krun_path.context("Failed to get path of current running executable")?; - krun_path.with_file_name(format!( - "{}-guest", - krun_path - .file_name() - .expect("krun_path should end with a file name") - .to_str() - .context("Failed to process `krun` file name as it contains invalid UTF-8")? - )) - }; - let krun_guest_path = CString::new( - krun_guest_path - .to_str() - .context("Failed to process `krun-guest` path as it contains invalid UTF-8")?, - ) - .context("Failed to process `krun-guest` path as it contains NUL character")?; + let krun_guest_path = find_krun_exec("krun-guest")?; + let krun_server_path = find_krun_exec("krun-server")?; let mut krun_guest_args: Vec = vec![ CString::new(username).expect("username should not contain NUL character"), @@ -226,6 +218,8 @@ fn main() -> Result<()> { CString::new(format!("{}", getgid().as_raw())) .expect("gid should not contain NUL character"), ]; + + krun_guest_args.push(krun_server_path); krun_guest_args.push( CString::new(options.command) .context("Failed to process command as it contains NUL character")?, @@ -236,6 +230,7 @@ fn main() -> Result<()> { .context("Failed to process command arg as it contains NUL character")?; krun_guest_args.push(s); } + let krun_guest_args: Vec<*const c_char> = { const KRUN_GUEST_ARGS_FIXED: usize = 4; // SAFETY: All pointers must be stored in the same allocation. @@ -317,6 +312,8 @@ fn main() -> Result<()> { env.push(s); } + env.push(CString::new(format!("KRUN_SERVER_PORT={}", options.server_port)).unwrap()); + debug!(env:?; "env vars"); let env: Vec<*const c_char> = { diff --git a/crates/krun/src/cli_options.rs b/crates/krun/src/cli_options.rs index 943cffa..f38738a 100644 --- a/crates/krun/src/cli_options.rs +++ b/crates/krun/src/cli_options.rs @@ -12,6 +12,7 @@ pub struct Options { pub mem: Option, pub net: NetMode, pub passt_socket: Option, + pub server_port: u32, pub command: String, pub command_args: Vec, } @@ -86,6 +87,12 @@ pub fn options() -> OptionParser { .help("Instead of starting passt, connect to passt socket at PATH") .argument("PATH") .optional(); + let server_port = long("server-port") + .short('p') + .help("Set the port to be used in server mode") + .argument("SERVER_PORT") + .fallback(3334) + .display_fallback(); let command = positional("COMMAND").help("the command you want to execute in the vm"); let command_args = any::("COMMAND_ARGS", |arg| { (!["--help", "-h"].contains(&&*arg)).then_some(arg) @@ -99,6 +106,7 @@ pub fn options() -> OptionParser { mem, net, passt_socket, + server_port, // positionals command, command_args, diff --git a/crates/krun/src/lib.rs b/crates/krun/src/lib.rs index 1ae1296..2f46467 100644 --- a/crates/krun/src/lib.rs +++ b/crates/krun/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli_options; pub mod cpu; +pub mod lock; pub mod net; pub mod types; diff --git a/crates/krun/src/lock.rs b/crates/krun/src/lock.rs new file mode 100644 index 0000000..7f0682f --- /dev/null +++ b/crates/krun/src/lock.rs @@ -0,0 +1,132 @@ +use std::{ + collections::HashMap, + env, + fs::File, + io::{BufRead, BufReader, Read, Write}, + net::TcpStream, + path::Path, +}; + +use anyhow::{anyhow, Context, Result}; +use rustix::{ + fs::{flock, FlockOperation}, + path::Arg, +}; +use serde_json; +use utils::launch::Launch; + +use crate::cli_options::Options; + +pub enum LockResult { + LaunchRequested, + LockAcquired(File), +} + +pub fn lock_or_connect(options: &Options) -> Result { + let running_server_port = env::var("KRUN_SERVER_PORT").ok(); + if let Some(port) = running_server_port { + let port: u32 = port.parse()?; + request_launch(port, &options.command, &options.command_args, &options.env)?; + return Ok(LockResult::LaunchRequested); + } + + let (lock_file, running_server_port) = lock_file(options.server_port)?; + match lock_file { + Some(lock_file) => Ok(LockResult::LockAcquired(lock_file)), + None => { + if let Some(port) = running_server_port { + let mut tries = 0; + while let Err(e) = + request_launch(port, &options.command, &options.command_args, &options.env) + { + if tries == 3 { + return Err(e); + } else { + tries += 1; + } + } + Ok(LockResult::LaunchRequested) + } else { + Err(anyhow!( + "krun is already running but couldn't find its server port, bailing out" + )) + } + }, + } +} + +fn lock_file(server_port: u32) -> Result<(Option, Option)> { + let run_path = env::var("XDG_RUNTIME_DIR") + .context("Failed to read XDG_RUNTIME_DIR environment variable")?; + let lock_path = Path::new(&run_path).join("krun.lock"); + + let mut lock_file = if !lock_path.exists() { + let lock_file = File::create(lock_path).context("Can't create lock file")?; + flock(&lock_file, FlockOperation::NonBlockingLockExclusive) + .context("Can't acquire an exclusive lock on new lock file")?; + lock_file + } else { + let mut lock_file = File::options() + .write(true) + .read(true) + .open(lock_path) + .context("Can't create lock file")?; + let ret = flock(&lock_file, FlockOperation::NonBlockingLockExclusive); + if ret.is_err() { + let mut data: Vec = Vec::with_capacity(5); + lock_file.read_to_end(&mut data)?; + let port = match data.to_string_lossy().parse::() { + Ok(port) => { + if port > 1024 { + Some(port) + } else { + None + } + }, + Err(_) => None, + }; + return Ok((None, port)); + } + lock_file + }; + + lock_file.set_len(0)?; + lock_file.write_all(format!("{}", server_port).as_bytes())?; + Ok((Some(lock_file), None)) +} + +fn request_launch( + port: u32, + command: &String, + args: &[String], + envs: &Vec<(String, Option)>, +) -> Result<()> { + let mut stream = TcpStream::connect(format!("127.0.0.1:{}", port))?; + + let mut envs_map: HashMap = HashMap::new(); + for (k, v) in envs { + if let Some(v) = v { + envs_map.insert(k.to_string(), v.to_string()); + } + } + + let launch = Launch { + command: command.to_string(), + args: args.to_vec(), + envs: envs_map, + }; + + stream.write_all(serde_json::to_string(&launch)?.as_bytes())?; + stream.write_all(b"\nEOM\n")?; + stream.flush()?; + + let mut buf_reader = BufReader::new(&mut stream); + let mut resp = String::new(); + buf_reader.read_line(&mut resp)?; + + if resp == "OK" { + Ok(()) + } else { + Err(anyhow!("Error requesting launch to server: {resp}")) + } +} diff --git a/crates/krun/src/net.rs b/crates/krun/src/net.rs index 46b9f81..4ab1453 100644 --- a/crates/krun/src/net.rs +++ b/crates/krun/src/net.rs @@ -19,7 +19,7 @@ where Ok(UnixStream::connect(passt_socket_path)?) } -pub fn start_passt() -> Result { +pub fn start_passt(server_port: u32) -> Result { // SAFETY: The child process should not inherit the file descriptor of `parent_socket`. // There is no documented guarantee of this, but the implementation as of writing atomically // sets `SOCK_CLOEXEC`. @@ -43,7 +43,9 @@ pub fn start_passt() -> Result { // owned by the child process. // See https://doc.rust-lang.org/std/io/index.html#io-safety let child = Command::new("passt") - .args(["-q", "-f", "--fd"]) + .args(["-q", "-f", "-t"]) + .arg(format!("{server_port}:{server_port}")) + .arg("--fd") .arg(format!("{}", child_fd.into_raw_fd())) .spawn(); if let Err(err) = child { diff --git a/crates/utils/Cargo.toml b/crates/utils/Cargo.toml index ba20df6..017c807 100644 --- a/crates/utils/Cargo.toml +++ b/crates/utils/Cargo.toml @@ -7,5 +7,11 @@ repository = { workspace = true } license = { workspace = true } publish = false +[dependencies] +anyhow = { workspace = true, features = ["std"] } +rustix = { workspace = true, features = ["fs"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" + [features] default = [] diff --git a/crates/utils/src/env.rs b/crates/utils/src/env.rs index 8497cd6..1c734a7 100644 --- a/crates/utils/src/env.rs +++ b/crates/utils/src/env.rs @@ -1,9 +1,13 @@ use std::{ - env, fs, io, + env, + ffi::CString, + fs, io, os::unix::fs::PermissionsExt as _, path::{Path, PathBuf}, }; +use anyhow::{Context, Result}; + pub fn find_in_path

(program: P) -> io::Result> where P: AsRef, @@ -37,3 +41,29 @@ where Ok(None) } + +pub fn find_krun_exec(name: &str) -> Result { + let krun_path = find_in_path(name).context("Failed to check existence of {name}")?; + let krun_path = if let Some(krun_path) = krun_path { + krun_path + } else { + let krun_path = env::current_exe().and_then(|p| p.canonicalize()); + let krun_path = krun_path.context("Failed to get path of current running executable")?; + krun_path.with_file_name(format!( + "{}-guest", + krun_path + .file_name() + .expect("krun_path should end with a file name") + .to_str() + .context("Failed to process `krun` file name as it contains invalid UTF-8")? + )) + }; + let krun_path = CString::new( + krun_path + .to_str() + .context("Failed to process {name} path as it contains invalid UTF-8")?, + ) + .context("Failed to process {name} path as it contains NUL character")?; + + Ok(krun_path) +} diff --git a/crates/utils/src/launch.rs b/crates/utils/src/launch.rs new file mode 100644 index 0000000..fd66f74 --- /dev/null +++ b/crates/utils/src/launch.rs @@ -0,0 +1,10 @@ +use std::collections::HashMap; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Eq, PartialEq, Debug, Deserialize, Serialize)] +pub struct Launch { + pub command: String, + pub args: Vec, + pub envs: HashMap, +} diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 3d7924f..079241a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1 +1,2 @@ pub mod env; +pub mod launch;