From 61c9d7a99fd576bdad68c9d9029bb9d953f46dcc Mon Sep 17 00:00:00 2001 From: James Munns Date: Mon, 15 Jul 2024 19:17:47 +0200 Subject: [PATCH] Implement hot/graceful reload as well as daemonization --- source/river/assets/test-config.kdl | 16 ++++++++ source/river/src/config/cli.rs | 16 ++++++++ source/river/src/config/internal.rs | 46 ++++++++++++++++++--- source/river/src/config/kdl/mod.rs | 61 +++++++++++++++++++++++++--- source/river/src/config/kdl/utils.rs | 14 +++++++ source/river/src/config/mod.rs | 32 +++++++++++++++ 6 files changed, 174 insertions(+), 11 deletions(-) diff --git a/source/river/assets/test-config.kdl b/source/river/assets/test-config.kdl index 30f9ecb..a3f85d5 100644 --- a/source/river/assets/test-config.kdl +++ b/source/river/assets/test-config.kdl @@ -1,6 +1,22 @@ // System configuration items - applies to the entire application system { threads-per-service 8 + // Should the server daemonize and run in the background? + // + // NOTE: If this is "true", then "pid-file" must be set + daemonize false + + // Path to the pidfile used when daemonizing + // + // NOTE: This has issues if you use relative paths. See issue TODO + pid-file "/tmp/river.pidfile" + + // Path to upgrade socket + // + // NOTE: `upgrade` is NOT exposed in the config file, it MUST be set on the CLI + // NOTE: This has issues if you use relative paths. See issue TODO + // NOTE: The upgrade command is only supported on Linux + upgrade-socket "/tmp/river-upgrade.sock" } // Services are the main abstraction of River diff --git a/source/river/src/config/cli.rs b/source/river/src/config/cli.rs index 1c1011b..0ee3f5e 100644 --- a/source/river/src/config/cli.rs +++ b/source/river/src/config/cli.rs @@ -21,4 +21,20 @@ pub struct Cli { /// Number of threads used in the worker pool for EACH service #[arg(long)] pub threads_per_service: Option, + + /// Should the server be daemonized after starting? + #[arg(long)] + pub daemonize: bool, + + /// Should the server take over an existing server? + #[arg(long)] + pub upgrade: bool, + + /// Path to upgrade socket + #[arg(long)] + pub upgrade_socket: Option, + + /// Path to the pidfile, used for upgrade + #[arg(long)] + pub pidfile: Option, } diff --git a/source/river/src/config/internal.rs b/source/river/src/config/internal.rs index c7d43cc..bc793cf 100644 --- a/source/river/src/config/internal.rs +++ b/source/river/src/config/internal.rs @@ -20,6 +20,10 @@ use crate::proxy::request_selector::{null_selector, RequestSelector}; pub struct Config { pub validate_configs: bool, pub threads_per_service: usize, + pub daemonize: bool, + pub pid_file: Option, + pub upgrade_socket: Option, + pub upgrade: bool, pub basic_proxies: Vec, pub file_servers: Vec, } @@ -29,8 +33,8 @@ impl Config { pub fn pingora_opt(&self) -> PingoraOpt { // TODO PingoraOpt { - upgrade: false, - daemon: false, + upgrade: self.upgrade, + daemon: self.daemonize, nocapture: false, test: self.validate_configs, conf: None, @@ -40,13 +44,25 @@ impl Config { /// Get the [`ServerConf`][PingoraServerConf] field for Pingora pub fn pingora_server_conf(&self) -> PingoraServerConf { PingoraServerConf { - daemon: false, + daemon: self.daemonize, error_log: None, // TODO: These are bad assumptions - non-developers will not have "target" // files, and we shouldn't necessarily use utf-8 strings with fixed separators // here. - pid_file: String::from("./target/pidfile"), - upgrade_sock: String::from("./target/upgrade"), + pid_file: self + .pid_file + .as_ref() + .cloned() + .unwrap_or_else(|| PathBuf::from("river.pidfile")) + .to_string_lossy() + .into(), + upgrade_sock: self + .upgrade_socket + .as_ref() + .cloned() + .unwrap_or_else(|| PathBuf::from("river-upgrade.sock")) + .to_string_lossy() + .into(), user: None, group: None, threads: self.threads_per_service, @@ -58,6 +74,22 @@ impl Config { pub fn validate(&self) { // TODO: validation logic + if self.daemonize { + assert!( + self.pid_file.is_some(), + "Daemonize commanded but no pid file set!" + ); + } + if self.upgrade { + assert!( + cfg!(target_os = "linux"), + "Upgrade is only supported on linux!" + ); + assert!( + self.upgrade_socket.is_some(), + "Upgrade commanded but no upgrade socket set!" + ); + } } } @@ -162,6 +194,10 @@ impl Default for Config { threads_per_service: 8, basic_proxies: vec![], file_servers: vec![], + daemonize: false, + pid_file: None, + upgrade: false, + upgrade_socket: None, } } } diff --git a/source/river/src/config/kdl/mod.rs b/source/river/src/config/kdl/mod.rs index ae74b5d..e1aa016 100644 --- a/source/river/src/config/kdl/mod.rs +++ b/source/river/src/config/kdl/mod.rs @@ -27,11 +27,19 @@ impl TryFrom for Config { type Error = miette::Error; fn try_from(value: KdlDocument) -> Result { - let threads_per_service = extract_threads_per_service(&value)?; + let SystemData { + threads_per_service, + daemonize, + upgrade_socket, + pid_file, + } = extract_system_data(&value)?; let (basic_proxies, file_servers) = extract_services(&value)?; Ok(Config { threads_per_service, + daemonize, + upgrade_socket, + pid_file, basic_proxies, file_servers, ..Config::default() @@ -39,6 +47,13 @@ impl TryFrom for Config { } } +struct SystemData { + threads_per_service: usize, + daemonize: bool, + upgrade_socket: Option, + pid_file: Option, +} + /// Extract all services from the top level document fn extract_services( doc: &KdlDocument, @@ -435,11 +450,45 @@ fn extract_listener( } // system { threads-per-service N } -fn extract_threads_per_service(doc: &KdlDocument) -> miette::Result { - let Some(tps) = - utils::optional_child_doc(doc, doc, "system").and_then(|n| n.get("threads-per-service")) - else { - // Not present, go ahead and return the default +fn extract_system_data(doc: &KdlDocument) -> miette::Result { + // Get the top level system doc + let sys = utils::required_child_doc(doc, doc, "system")?; + let tps = extract_threads_per_service(doc, sys)?; + + let daemonize = if let Some(n) = sys.get("daemonize") { + utils::extract_one_bool_arg(doc, n, "daemonize", n.entries())? + } else { + false + }; + + let upgrade_socket = if let Some(n) = sys.get("upgrade-socket") { + let x = utils::extract_one_str_arg(doc, n, "upgrade-socket", n.entries(), |s| { + Some(PathBuf::from(s)) + })?; + Some(x) + } else { + None + }; + + let pid_file = if let Some(n) = sys.get("pid-file") { + let x = utils::extract_one_str_arg(doc, n, "pid-file", n.entries(), |s| { + Some(PathBuf::from(s)) + })?; + Some(x) + } else { + None + }; + + Ok(SystemData { + threads_per_service: tps, + daemonize, + upgrade_socket, + pid_file, + }) +} + +fn extract_threads_per_service(doc: &KdlDocument, sys: &KdlDocument) -> miette::Result { + let Some(tps) = sys.get("threads-per-service") else { return Ok(8); }; diff --git a/source/river/src/config/kdl/utils.rs b/source/river/src/config/kdl/utils.rs index ce4f54f..824c769 100644 --- a/source/river/src/config/kdl/utils.rs +++ b/source/river/src/config/kdl/utils.rs @@ -109,6 +109,20 @@ pub(crate) fn extract_one_str_arg Option>( .or_bail(format!("Incorrect argument for '{name}'"), doc, node.span()) } +/// Extract a single un-named bool argument, like `daemonize true` +pub(crate) fn extract_one_bool_arg( + doc: &KdlDocument, + node: &KdlNode, + name: &str, + args: &[KdlEntry], +) -> miette::Result { + match args { + [one] => one.value().as_bool(), + _ => None, + } + .or_bail(format!("Incorrect argument for '{name}'"), doc, node.span()) +} + /// Like `extract_one_str_arg`, but with bonus "str:str" key/val pairs /// /// `selection "Ketama" key="UriPath"` diff --git a/source/river/src/config/mod.rs b/source/river/src/config/mod.rs index 972260a..426903d 100644 --- a/source/river/src/config/mod.rs +++ b/source/river/src/config/mod.rs @@ -68,6 +68,9 @@ pub fn render_config() -> internal::Config { apply_cli(&mut config, &c); tracing::info!(?config, "Full configuration",); + tracing::info!("Validating..."); + config.validate(); + tracing::info!("Validation complete"); config } @@ -77,9 +80,38 @@ fn apply_cli(conf: &mut internal::Config, cli: &Cli) { threads_per_service, config_toml: _, config_kdl: _, + daemonize, + upgrade, + pidfile, + upgrade_socket, } = cli; conf.validate_configs |= validate_configs; + conf.daemonize |= daemonize; + conf.upgrade |= upgrade; + + if let Some(pidfile) = pidfile { + if let Some(current_pidfile) = conf.pid_file.as_ref() { + if pidfile != current_pidfile { + panic!( + "Mismatched commanded PID files. CLI: {pidfile:?}, Config: {current_pidfile:?}" + ); + } + } + conf.pid_file = Some(pidfile.into()); + } + + if let Some(upgrade_socket) = upgrade_socket { + if let Some(current_upgrade_socket) = conf.upgrade_socket.as_ref() { + if upgrade_socket != current_upgrade_socket { + panic!( + "Mismatched commanded upgrade sockets. CLI: {upgrade_socket:?}, Config: {current_upgrade_socket:?}" + ); + } + } + conf.upgrade_socket = Some(upgrade_socket.into()); + } + if let Some(tps) = threads_per_service { conf.threads_per_service = *tps; }