Skip to content

Commit

Permalink
Introduce a client-server model
Browse files Browse the repository at this point in the history
With the introduction of 47fdd45 (Default guest memory to 80% of total
RAM on host), we're targeting a use case in which we expect to be
running a single, large VM.

To accommodate this use case, we're switching here to a client/server
model. The first time krun is launched, in addition to running the
command, it starts a small server in the VM listening on a TCP port. On
the host side, it also creates a lock file, writes the listening port
number to it, and calls flock() on it to prevent other instances from
running in parallel.

When it's executed again, krun detects there's another instance running,
and contacts the server in the VM to request the launch of the command.
The stdout and stderr of the command are redirected to
$XDG_RUNTIME_DIR/krun-$COMMAND-$TIMESTAMP.[stdout|stderr]

Signed-off-by: Sergio Lopez <[email protected]>
  • Loading branch information
slp committed Jun 4, 2024
1 parent b0133b9 commit f3ea74f
Show file tree
Hide file tree
Showing 17 changed files with 464 additions and 27 deletions.
64 changes: 64 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,7 @@ krun-sys = { path = "crates/krun-sys", default-features = false }
log = { version = "0.4.21", default-features = false }
nix = { version = "0.28.0", default-features = false }
rustix = { version = "0.38.34", default-features = false }
serde = { version = "1.0.203", default-features = false }
serde_json = { version = "1.0.117", default-features = false }
tempfile = { version = "3.10.1", default-features = false }
utils = { path = "crates/utils", default-features = false }
22 changes: 22 additions & 0 deletions crates/krun-server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "krun-server"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }
rust-version = { workspace = true }
description = { workspace = true }
repository = { workspace = true }
license = { workspace = true }

[dependencies]
anyhow = { workspace = true, features = ["std"] }
bpaf = { workspace = true, features = [] }
env_logger = { workspace = true, features = ["auto-color", "humantime", "unstable-kv"] }
log = { workspace = true, features = ["kv"] }
nix = { workspace = true, features = ["socket"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
utils = { workspace = true, features = [] }

[features]
default = []
33 changes: 33 additions & 0 deletions crates/krun-server/src/bin/krun-server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use std::net::TcpListener;
use std::os::fd::AsRawFd;
use std::process::Command;

use anyhow::Result;
use krun_server::cli_options::options;
use krun_server::server::start_server;
use nix::sys::socket::{shutdown, Shutdown};

fn main() -> Result<()> {
env_logger::init();

let options = options().run();

let listener = TcpListener::bind(format!("127.0.0.1:{}", &options.server_port))?;
let listener_fd = listener.as_raw_fd();

let server_thread = start_server(listener);

let err = Command::new(&options.command)
.args(options.command_args)
.status();
if let Err(err) = err {
println!("Error executing command {}: {}", options.command, err);
}

shutdown(listener_fd, Shutdown::Both)?;
if let Err(err) = server_thread.join() {
println!("Error waiting for server thread termination: {err:?}");
}

Ok(())
}
38 changes: 38 additions & 0 deletions crates/krun-server/src/cli_options.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use bpaf::{any, construct, env, positional, OptionParser, Parser};

#[derive(Clone, Debug)]
pub struct Options {
pub server_port: u32,
pub command: String,
pub command_args: Vec<String>,
}

pub fn options() -> OptionParser<Options> {
let server_port = env("KRUN_SERVER_PORT")
.help("TCP port to listen for command launch requests")
.argument::<u32>("KRUN_SERVER_PORT")
.fallback(3333);
let command = positional("COMMAND");
let command_args = any::<String, _, _>("COMMAND_ARGS", |arg| {
(!["--help", "-h"].contains(&&*arg)).then_some(arg)
})
.many();

construct!(Options {
server_port,
// positionals
command,
command_args,
})
.to_options()
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn check_options() {
options().check_invariants(false)
}
}
2 changes: 2 additions & 0 deletions crates/krun-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod cli_options;
pub mod server;
87 changes: 87 additions & 0 deletions crates/krun-server/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{
collections::HashMap,
env,
fs::File,
io::{BufRead, BufReader, Write},
net::{TcpListener, TcpStream},
path::Path,
process::Command,
thread::{self, JoinHandle},
time::{SystemTime, UNIX_EPOCH},
};

use anyhow::{anyhow, Result};
use log::debug;
use utils::launch::Launch;

pub fn start_server(listener: TcpListener) -> JoinHandle<()> {
thread::spawn(move || {
if let Err(err) = work(listener) {
debug!("krun server thread is terminating: {err:?}")
}
})
}

fn work(listener: TcpListener) -> Result<()> {
for stream in listener.incoming() {
let stream = stream?;

if let Err(e) = handle_connection(stream) {
println!("Error processing client request: {e}");
}
}

Ok(())
}

fn read_request(mut stream: &TcpStream) -> Result<Launch> {
let mut buf_reader = BufReader::new(&mut stream);
let mut buf = String::new();
loop {
if buf_reader.read_line(&mut buf)? == 0 {
return Err(anyhow!("Unexpected EOF"));
}
if buf.contains("EOM") {
let launch: Launch = serde_json::from_str(&buf[..buf.len() - 5])?;
return Ok(launch);
}
}
}

fn handle_connection(mut stream: TcpStream) -> Result<()> {
let mut envs: HashMap<String, String> = env::vars().collect();

let launch = read_request(&stream)?;
envs.extend(launch.envs);

let (stdout, stderr) = {
let base = if envs.contains_key("XDG_RUNTIME_DIR") {
Path::new(&envs["XDG_RUNTIME_DIR"])
} else {
Path::new("/tmp")
};
let ts = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
let path_stdout = base.join(format!("krun-{}-{}.stdout", launch.command, ts));
let path_stderr = base.join(format!("krun-{}-{}.stderr", launch.command, ts));
(
File::create_new(path_stdout)?,
File::create_new(path_stderr)?,
)
};

let err = Command::new(&launch.command)
.args(launch.args)
.envs(envs)
.stdout(stdout)
.stderr(stderr)
.spawn();
if let Err(err) = err {
let error = format!("Error executing command {}: {}", launch.command, err);
let _ = stream.write_all(error.as_bytes());
} else {
let _ = stream.write_all(b"OK");
}
let _ = stream.flush();

Ok(())
}
2 changes: 2 additions & 0 deletions crates/krun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ krun-sys = { workspace = true, features = [] }
log = { workspace = true, features = ["kv"] }
nix = { workspace = true, features = ["user"] }
rustix = { workspace = true, features = ["process", "std", "use-libc-auxv"] }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
utils = { workspace = true, features = [] }

[features]
Expand Down
Loading

0 comments on commit f3ea74f

Please sign in to comment.