From 397b12bca04932c9747ebf270a48f0588c0f0b69 Mon Sep 17 00:00:00 2001 From: Teoh Han Hui Date: Sun, 9 Jun 2024 17:11:57 +0800 Subject: [PATCH] Make krun-server wait for child processes Continue to accept and handle incoming connections, until the server is idle and all child processes have exited. --- Cargo.lock | 17 ----- crates/krun-guest/src/net.rs | 2 +- crates/krun-guest/src/pulse.rs | 3 +- crates/krun-server/Cargo.toml | 1 - crates/krun-server/src/bin/krun-server.rs | 90 +++++++++++++++++++---- crates/krun-server/src/server.rs | 77 ++++++++++++------- rustfmt.toml | 2 +- 7 files changed, 132 insertions(+), 60 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5783f5d..57aafbb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -66,12 +66,6 @@ version = "1.0.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "25bdb32cbbdce2b519a9cd7df3a678443100e265d5e25ca763b7572a5104f5f3" -[[package]] -name = "autocfg" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" - [[package]] name = "bindgen" version = "0.69.4" @@ -257,7 +251,6 @@ dependencies = [ "bpaf", "env_logger", "log", - "nix", "serde", "serde_json", "utils", @@ -306,15 +299,6 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c8640c5d730cb13ebd907d8d04b52f55ac9a2eec55b440c8892f40d56c76c1d" -[[package]] -name = "memoffset" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "488016bfae457b036d996092f6cb448677611ce4449e970ceaf42695203f218a" -dependencies = [ - "autocfg", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -331,7 +315,6 @@ dependencies = [ "cfg-if", "cfg_aliases", "libc", - "memoffset", ] [[package]] diff --git a/crates/krun-guest/src/net.rs b/crates/krun-guest/src/net.rs index e4cabd0..629ccd0 100644 --- a/crates/krun-guest/src/net.rs +++ b/crates/krun-guest/src/net.rs @@ -1,5 +1,5 @@ use std::fs; -use std::os::unix::process::ExitStatusExt; +use std::os::unix::process::ExitStatusExt as _; use std::process::Command; use anyhow::{anyhow, Context, Result}; diff --git a/crates/krun-guest/src/pulse.rs b/crates/krun-guest/src/pulse.rs index ed091e7..cf2b97f 100644 --- a/crates/krun-guest/src/pulse.rs +++ b/crates/krun-guest/src/pulse.rs @@ -1,8 +1,7 @@ use std::collections::HashMap; -use std::env; -use std::fs; use std::path::Path; use std::process::{Command, Stdio}; +use std::{env, fs}; use anyhow::{Context, Result}; use utils::env::find_in_path; diff --git a/crates/krun-server/Cargo.toml b/crates/krun-server/Cargo.toml index cf8b3fc..95fb36b 100644 --- a/crates/krun-server/Cargo.toml +++ b/crates/krun-server/Cargo.toml @@ -13,7 +13,6 @@ anyhow = { workspace = true, features = ["std"] } bpaf = { workspace = true, features = [] } env_logger = { workspace = true, features = ["auto-color", "humantime", "unstable-kv"] } log = { workspace = true, features = ["kv"] } -nix = { workspace = true, features = ["socket"] } serde = { workspace = true, features = [] } serde_json = { workspace = true, features = ["std"] } utils = { workspace = true, features = [] } diff --git a/crates/krun-server/src/bin/krun-server.rs b/crates/krun-server/src/bin/krun-server.rs index a61d34a..dfe39d4 100644 --- a/crates/krun-server/src/bin/krun-server.rs +++ b/crates/krun-server/src/bin/krun-server.rs @@ -1,12 +1,13 @@ use std::net::TcpListener; -use std::os::fd::AsRawFd; -use std::panic; +use std::os::unix::process::ExitStatusExt as _; use std::process::Command; +use std::sync::mpsc::{channel, TryRecvError}; +use std::sync::{Arc, Condvar, Mutex}; -use anyhow::{Context, Result}; +use anyhow::Result; use krun_server::cli_options::options; use krun_server::server::start_server; -use nix::sys::socket::{shutdown, Shutdown}; +use log::debug; fn main() -> Result<()> { env_logger::init(); @@ -14,19 +15,82 @@ fn main() -> Result<()> { let options = options().run(); let listener = TcpListener::bind(format!("0.0.0.0:{}", options.server_port))?; - let listener_fd = listener.as_raw_fd(); + let (child_tx, child_rx) = channel(); + let idle_pair = Arc::new((Mutex::new(true), Condvar::new())); - let server_thread = start_server(listener); + start_server(listener, child_tx, Arc::clone(&idle_pair)); - Command::new(&options.command) + match Command::new(&options.command) .args(options.command_args) .status() - .with_context(|| format!("Failed to execute command {:?}", options.command))?; - - shutdown(listener_fd, Shutdown::Both)?; - if let Err(err) = server_thread.join() { - panic::resume_unwind(err); + { + Ok(status) => { + if !status.success() { + if let Some(code) = status.code() { + eprintln!( + "{:?} process exited with status code: {code}", + options.command + ); + } else { + eprintln!( + "{:?} process terminated by signal: {}", + options.command, + status + .signal() + .expect("either one of status code or signal should be set") + ); + } + } + }, + Err(err) => { + eprintln!( + "Failed to execute {:?} as child process: {err}", + options.command + ); + }, } - Ok(()) + println!("Waiting for other commands launched through this krun server to exit..."); + println!("Press Ctrl+C to force quit"); + + let (idle_lock, idle_cvar) = &*idle_pair; + + loop { + let mut child = { + let _guard = idle_cvar + .wait_while(idle_lock.lock().unwrap(), |idle| !*idle) + .unwrap(); + match child_rx.try_recv() { + Ok(child) => child, + Err(TryRecvError::Empty) => { + // Server is idle and no more child processes. We're done. + return Ok(()); + }, + Err(TryRecvError::Disconnected) => { + panic!("child_tx in server thread should not be gone"); + }, + } + }; + let child_pid = child.id(); + match child.wait() { + Ok(status) => { + debug!("child process {child_pid} exited"); + if !status.success() { + if let Some(code) = status.code() { + eprintln!("child process {child_pid} exited with status code: {code}"); + } else { + eprintln!( + "child process {child_pid} terminated by signal: {}", + status + .signal() + .expect("either one of status code or signal should be set") + ); + } + } + }, + Err(err) => { + eprintln!("Failed to wait for child process {child_pid} to exit: {err}"); + }, + } + } } diff --git a/crates/krun-server/src/server.rs b/crates/krun-server/src/server.rs index 7839003..88d1425 100644 --- a/crates/krun-server/src/server.rs +++ b/crates/krun-server/src/server.rs @@ -1,32 +1,57 @@ use std::collections::HashMap; -use std::env; use std::io::{BufRead, BufReader, Write}; use std::net::{TcpListener, TcpStream}; -use std::process::{Command, Stdio}; -use std::thread::{self, JoinHandle}; +use std::process::{Child, Command, Stdio}; +use std::sync::mpsc::Sender; +use std::sync::{Arc, Condvar, Mutex}; +use std::{env, thread}; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use log::debug; -use utils::{launch::Launch, stdio::make_stdout_stderr}; +use utils::launch::Launch; +use utils::stdio::make_stdout_stderr; -pub fn start_server(listener: TcpListener) -> JoinHandle<()> { +pub fn start_server( + listener: TcpListener, + child_tx: Sender, + idle_pair: Arc<(Mutex, Condvar)>, +) { thread::spawn(move || { - if let Err(err) = work(listener) { - debug!(err:?; "server thread is terminating") - } - }) -} + let (idle_lock, idle_cvar) = &*idle_pair; + for stream in listener.incoming() { + { + let mut idle = idle_lock.lock().unwrap(); + *idle = false; + idle_cvar.notify_one(); + } + let stream = match stream { + Ok(stream) => stream, + Err(err) => { + eprintln!("Failed to accept incoming connection: {err}"); + let mut idle = idle_lock.lock().unwrap(); + *idle = true; + idle_cvar.notify_one(); + continue; + }, + }; -fn work(listener: TcpListener) -> Result<()> { - for stream in listener.incoming() { - let stream = stream?; - - if let Err(err) = handle_connection(stream) { - println!("Error processing client request: {err:?}"); + match handle_connection(stream) { + Ok(child) => { + child_tx + .send(child) + .expect("child_rx in main thread should not be gone"); + }, + Err(err) => { + eprintln!("Failed to process client request: {err:?}"); + }, + } + let mut idle = idle_lock.lock().unwrap(); + *idle = true; + idle_cvar.notify_one(); } - } - Ok(()) + unreachable!("server accept loop should not exit"); + }); } fn read_request(mut stream: &TcpStream) -> Result { @@ -43,7 +68,7 @@ fn read_request(mut stream: &TcpStream) -> Result { } } -fn handle_connection(mut stream: TcpStream) -> Result<()> { +fn handle_connection(mut stream: TcpStream) -> Result { let mut envs: HashMap = env::vars().collect(); let Launch { @@ -51,24 +76,26 @@ fn handle_connection(mut stream: TcpStream) -> Result<()> { command_args, env, } = read_request(&stream)?; + debug!(command:?, command_args:?, env:?; "received launch request"); envs.extend(env); let (stdout, stderr) = make_stdout_stderr(&command, &envs)?; - let err = Command::new(&command) + let spawn_result = Command::new(&command) .args(command_args) .envs(envs) .stdin(Stdio::null()) .stdout(stdout) .stderr(stderr) - .spawn(); - if let Err(err) = err { - let msg = format!("Failed to execute command {command:?}: {err}"); + .spawn() + .with_context(|| format!("Failed to execute {command:?} as child process")); + if let Err(err) = &spawn_result { + let msg = format!("{err:?}"); stream.write_all(msg.as_bytes()).ok(); } else { stream.write_all(b"OK").ok(); } stream.flush().ok(); - Ok(()) + spawn_result } diff --git a/rustfmt.toml b/rustfmt.toml index 1b26771..abb3c84 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,6 +1,6 @@ edition = "2021" -# empty_item_single_line = false +# empty_item_single_line = true # error_on_line_overflow = true # format_code_in_doc_comments = true # format_strings = true