diff --git a/changelog.d/+mirrord-policy-rejection.fixed.md b/changelog.d/+mirrord-policy-rejection.fixed.md new file mode 100644 index 00000000000..ad7415c5411 --- /dev/null +++ b/changelog.d/+mirrord-policy-rejection.fixed.md @@ -0,0 +1 @@ +Fixed a bug where port mirroring block (due to active mirrord policies) would terminate the mirrord session. diff --git a/changelog.d/2868.changed.md b/changelog.d/2868.changed.md new file mode 100644 index 00000000000..f24bdc1461c --- /dev/null +++ b/changelog.d/2868.changed.md @@ -0,0 +1 @@ +Updated how intproxy is outputing logfile when using container mode, now logs will be written on host machine. diff --git a/mirrord/cli/Cargo.toml b/mirrord/cli/Cargo.toml index 420ec6e97f1..aea0b77d7cc 100644 --- a/mirrord/cli/Cargo.toml +++ b/mirrord/cli/Cargo.toml @@ -63,11 +63,12 @@ tempfile.workspace = true rcgen.workspace = true rustls-pemfile.workspace = true tokio-rustls.workspace = true -tokio-stream = { workspace = true, features = ["net"] } +tokio-stream = { workspace = true, features = ["io-util", "net"] } regex.workspace = true mid = "3.0.0" rand.workspace = true + [target.'cfg(target_os = "macos")'.dependencies] mirrord-sip = { path = "../sip" } diff --git a/mirrord/cli/src/config.rs b/mirrord/cli/src/config.rs index 1570cfbca35..367504ddbf4 100644 --- a/mirrord/cli/src/config.rs +++ b/mirrord/cli/src/config.rs @@ -834,6 +834,13 @@ pub struct RuntimeArgs { /// Supported command for using mirrord with container runtimes. #[derive(Subcommand, Debug, Clone)] pub(super) enum ContainerRuntimeCommand { + /// Execute a ` create` command with mirrord loaded. (not supported with ) + #[command(hide = true)] + Create { + /// Arguments that will be propogated to underlying ` create` command. + #[arg(allow_hyphen_values = true, trailing_var_arg = true)] + runtime_args: Vec, + }, /// Execute a ` run` command with mirrord loaded. Run { /// Arguments that will be propogated to underlying ` run` command. @@ -843,14 +850,17 @@ pub(super) enum ContainerRuntimeCommand { } impl ContainerRuntimeCommand { - pub fn run>(runtime_args: impl IntoIterator) -> Self { - ContainerRuntimeCommand::Run { + pub fn create>(runtime_args: impl IntoIterator) -> Self { + ContainerRuntimeCommand::Create { runtime_args: runtime_args.into_iter().map(T::into).collect(), } } pub fn has_publish(&self) -> bool { - let ContainerRuntimeCommand::Run { runtime_args } = self; + let runtime_args = match self { + ContainerRuntimeCommand::Run { runtime_args } => runtime_args, + _ => return false, + }; let mut hit_trailing_token = false; @@ -860,6 +870,15 @@ impl ContainerRuntimeCommand { !hit_trailing_token && matches!(runtime_arg.as_str(), "-p" | "--publish") }) } + + pub fn into_parts(self) -> (Vec, Vec) { + match self { + ContainerRuntimeCommand::Create { runtime_args } => { + (vec!["create".to_owned()], runtime_args) + } + ContainerRuntimeCommand::Run { runtime_args } => (vec!["run".to_owned()], runtime_args), + } + } } #[derive(Args, Debug)] @@ -947,7 +966,9 @@ mod tests { assert_eq!(runtime_args.runtime, ContainerRuntime::Podman); - let ContainerRuntimeCommand::Run { runtime_args } = runtime_args.command; + let ContainerRuntimeCommand::Run { runtime_args } = runtime_args.command else { + panic!("expected run command"); + }; assert_eq!(runtime_args, vec!["-it", "--rm", "debian"]); } @@ -965,7 +986,9 @@ mod tests { assert_eq!(runtime_args.runtime, ContainerRuntime::Podman); - let ContainerRuntimeCommand::Run { runtime_args } = runtime_args.command; + let ContainerRuntimeCommand::Run { runtime_args } = runtime_args.command else { + panic!("expected run command"); + }; assert_eq!(runtime_args, vec!["-it", "--rm", "debian"]); } diff --git a/mirrord/cli/src/container.rs b/mirrord/cli/src/container.rs index afffad9831d..aa396a0162f 100644 --- a/mirrord/cli/src/container.rs +++ b/mirrord/cli/src/container.rs @@ -2,7 +2,6 @@ use std::{ collections::HashMap, io::Write, net::SocketAddr, - ops::Not, path::{Path, PathBuf}, process::Stdio, time::Duration, @@ -15,7 +14,6 @@ use mirrord_config::{ external_proxy::{MIRRORD_EXTERNAL_TLS_CERTIFICATE_ENV, MIRRORD_EXTERNAL_TLS_KEY_ENV}, internal_proxy::{ MIRRORD_INTPROXY_CLIENT_TLS_CERTIFICATE_ENV, MIRRORD_INTPROXY_CLIENT_TLS_KEY_ENV, - MIRRORD_INTPROXY_CONTAINER_MODE_ENV, }, LayerConfig, MIRRORD_CONFIG_FILE_ENV, }; @@ -28,20 +26,22 @@ use tokio::{ use tracing::Level; use crate::{ - config::{ContainerRuntime, ContainerRuntimeCommand, ExecParams, RuntimeArgs}, + config::{ContainerRuntime, ExecParams, RuntimeArgs}, connection::AGENT_CONNECT_INFO_ENV_KEY, - container::command_builder::RuntimeCommandBuilder, + container::{command_builder::RuntimeCommandBuilder, sidecar::Sidecar}, error::{CliError, CliResult, ContainerError}, execution::{ MirrordExecution, LINUX_INJECTION_ENV_VAR, MIRRORD_CONNECT_TCP_ENV, MIRRORD_EXECUTION_KIND_ENV, }, + logging::pipe_intproxy_sidecar_logs, util::MIRRORD_CONSOLE_ADDR_ENV, }; static CONTAINER_EXECUTION_KIND: ExecutionKind = ExecutionKind::Container; mod command_builder; +mod sidecar; /// Format [`Command`] to look like the executated command (currently without env because we don't /// use it in these scenarios) @@ -65,10 +65,8 @@ async fn exec_and_get_first_line(command: &mut Command) -> Result .spawn() .map_err(ContainerError::UnableToExecuteCommand)?; - tracing::warn!(?child, "spawned watch for child"); - let stdout = child.stdout.take().expect("stdout should be piped"); - let stderr = child.stderr.take().expect("stdout should be piped"); + let stderr = child.stderr.take().expect("stderr should be piped"); let result = tokio::time::timeout(Duration::from_secs(30), async { BufReader::new(stdout) @@ -151,115 +149,6 @@ fn create_self_signed_certificate( Ok((certificate, private_key)) } -/// Create a "sidecar" container that is running `mirrord intproxy` that connects to `mirrord -/// extproxy` running on user machine to be used by execution container (via mounting on same -/// network) -#[tracing::instrument(level = Level::TRACE, ret)] -async fn create_sidecar_intproxy( - config: &LayerConfig, - base_command: &RuntimeCommandBuilder, - connection_info: Vec<(&str, &str)>, -) -> Result<(String, SocketAddr), ContainerError> { - let mut sidecar_command = base_command.clone(); - - sidecar_command.add_env(MIRRORD_INTPROXY_CONTAINER_MODE_ENV, "true"); - sidecar_command.add_envs(connection_info); - - let cleanup = config.container.cli_prevent_cleanup.not().then_some("--rm"); - - let sidecar_container_command = ContainerRuntimeCommand::run( - config - .container - .cli_extra_args - .iter() - .map(String::as_str) - .chain(cleanup) - .chain(["-d", &config.container.cli_image, "mirrord", "intproxy"]), - ); - - let (runtime_binary, sidecar_args) = sidecar_command - .with_command(sidecar_container_command) - .into_command_args(); - - let mut sidecar_container_spawn = Command::new(&runtime_binary); - - sidecar_container_spawn.args(sidecar_args); - - let sidecar_container_id = exec_and_get_first_line(&mut sidecar_container_spawn) - .await? - .ok_or_else(|| { - ContainerError::UnsuccesfulCommandOutput( - format_command(&sidecar_container_spawn), - "stdout and stderr were empty".to_owned(), - ) - })?; - - // For Docker runtime sometimes the sidecar doesn't start so we double check. - // See [#2927](https://github.com/metalbear-co/mirrord/issues/2927) - if matches!(base_command.runtime(), ContainerRuntime::Docker) { - let mut container_inspect_command = Command::new(&runtime_binary); - container_inspect_command - .args(["inspect", &sidecar_container_id]) - .stdout(Stdio::piped()); - - let container_inspect_output = container_inspect_command.output().await.map_err(|err| { - ContainerError::UnsuccesfulCommandOutput( - format_command(&container_inspect_command), - err.to_string(), - ) - })?; - - let (container_inspection,) = - serde_json::from_slice::<(serde_json::Value,)>(&container_inspect_output.stdout) - .unwrap_or_default(); - - let container_status = container_inspection - .get("State") - .and_then(|inspect| inspect.get("Status")); - - if container_status - .map(|status| status == "created") - .unwrap_or(false) - { - let mut container_start_command = Command::new(&runtime_binary); - - container_start_command - .args(["start", &sidecar_container_id]) - .stdin(Stdio::null()) - .stdout(Stdio::null()) - .stderr(Stdio::null()); - - let _ = container_start_command.status().await.map_err(|err| { - ContainerError::UnsuccesfulCommandOutput( - format_command(&container_start_command), - err.to_string(), - ) - })?; - } - } - - // After spawning sidecar with -d flag it prints container_id, now we need the address of - // intproxy running in sidecar to be used by mirrord-layer in execution container - let intproxy_address: SocketAddr = { - let mut attach_command = Command::new(&runtime_binary); - attach_command.args(["logs", "-f", &sidecar_container_id]); - - match exec_and_get_first_line(&mut attach_command).await? { - Some(line) => line - .parse() - .map_err(ContainerError::UnableParseProxySocketAddr)?, - None => { - return Err(ContainerError::UnsuccesfulCommandOutput( - format_command(&attach_command), - "stdout and stderr were empty".into(), - )) - } - } - }; - - Ok((sidecar_container_id, intproxy_address)) -} - type TlsGuard = (NamedTempFile, NamedTempFile); fn prepare_tls_certs_for_container( @@ -315,34 +204,15 @@ fn prepare_tls_certs_for_container( Ok((internal_proxy_tls_guards, external_proxy_tls_guards)) } -/// Main entry point for the `mirrord container` command. -/// This spawns: "agent" - "external proxy" - "intproxy sidecar" - "execution container" -pub(crate) async fn container_command( - runtime_args: RuntimeArgs, - exec_params: ExecParams, +/// Load [`LayerConfig`] from env and create [`AnalyticsReporter`] whilst reporting any warnings. +fn create_config_and_analytics( + progress: &mut P, watch: drain::Watch, -) -> CliResult { - let mut progress = ProgressTracker::from_env("mirrord container"); - - if runtime_args.command.has_publish() { - progress.warning("mirrord container may have problems with \"-p\" directly container in command, please add to \"contanier.cli_extra_args\" in config if you are planning to publish ports"); - } - - progress.warning("mirrord container is currently an unstable feature"); - - for (name, value) in exec_params.as_env_vars()? { - std::env::set_var(name, value); - } - - std::env::set_var( - MIRRORD_EXECUTION_KIND_ENV, - (CONTAINER_EXECUTION_KIND as u32).to_string(), - ); - - let (mut config, mut context) = LayerConfig::from_env_with_warnings()?; +) -> CliResult<(LayerConfig, AnalyticsReporter)> { + let (config, mut context) = LayerConfig::from_env_with_warnings()?; // Initialize only error analytics, extproxy will be the full AnalyticsReporter. - let mut analytics = + let analytics = AnalyticsReporter::only_error(config.telemetry, CONTAINER_EXECUTION_KIND, watch); config.verify(&mut context)?; @@ -350,16 +220,22 @@ pub(crate) async fn container_command( progress.warning(warning); } - let (_internal_proxy_tls_guards, _external_proxy_tls_guards) = - prepare_tls_certs_for_container(&mut config)?; - - let composed_config_file = create_composed_config(&config)?; - std::env::set_var(MIRRORD_CONFIG_FILE_ENV, composed_config_file.path()); + Ok((config, analytics)) +} +/// Create [`RuntimeCommandBuilder`] with the corresponding [`Sidecar`] connected to +/// [`MirrordExecution`] as extproxy. +async fn create_runtime_command_with_sidecar( + analytics: &mut AnalyticsReporter, + progress: &mut P, + config: &LayerConfig, + composed_config_path: &Path, + runtime: ContainerRuntime, +) -> CliResult<(RuntimeCommandBuilder, Sidecar, MirrordExecution)> { let mut sub_progress = progress.subtask("preparing to launch process"); let execution_info = - MirrordExecution::start_external(&config, &mut sub_progress, &mut analytics).await?; + MirrordExecution::start_external(config, &mut sub_progress, analytics).await?; let mut connection_info = Vec::new(); let mut execution_info_env_without_connection_info = Vec::new(); @@ -374,7 +250,7 @@ pub(crate) async fn container_command( sub_progress.success(None); - let mut runtime_command = RuntimeCommandBuilder::new(runtime_args.runtime); + let mut runtime_command = RuntimeCommandBuilder::new(runtime); if let Ok(console_addr) = std::env::var(MIRRORD_CONSOLE_ADDR_ENV) { if console_addr @@ -398,8 +274,7 @@ pub(crate) async fn container_command( ); runtime_command.add_env(MIRRORD_CONFIG_FILE_ENV, "/tmp/mirrord-config.json"); - runtime_command - .add_volume::(composed_config_file.path(), "/tmp/mirrord-config.json"); + runtime_command.add_volume::(composed_config_path, "/tmp/mirrord-config.json"); let mut load_env_and_mount_pem = |env: &str, path: &Path| { let container_path = format!("/tmp/{}.pem", env.to_lowercase()); @@ -426,11 +301,57 @@ pub(crate) async fn container_command( runtime_command.add_envs(execution_info_env_without_connection_info); - let (sidecar_container_id, sidecar_intproxy_address) = - create_sidecar_intproxy(&config, &runtime_command, connection_info).await?; + let sidecar = Sidecar::create_intproxy(config, &runtime_command, connection_info).await?; + + runtime_command.add_network(sidecar.as_network()); + runtime_command.add_volumes_from(&sidecar.container_id); + + Ok((runtime_command, sidecar, execution_info)) +} + +/// Main entry point for the `mirrord container` command. +/// This spawns: "agent" - "external proxy" - "intproxy sidecar" - "execution container" +pub(crate) async fn container_command( + runtime_args: RuntimeArgs, + exec_params: ExecParams, + watch: drain::Watch, +) -> CliResult { + let mut progress = ProgressTracker::from_env("mirrord container"); + + if runtime_args.command.has_publish() { + progress.warning("mirrord container may have problems with \"-p\" when used as part of container run command, please add the publish arguments to \"contanier.cli_extra_args\" in config if you are planning to publish ports"); + } + + progress.warning("mirrord container is currently an unstable feature"); + + for (name, value) in exec_params.as_env_vars()? { + std::env::set_var(name, value); + } + + std::env::set_var( + MIRRORD_EXECUTION_KIND_ENV, + (CONTAINER_EXECUTION_KIND as u32).to_string(), + ); - runtime_command.add_network(format!("container:{sidecar_container_id}")); - runtime_command.add_volumes_from(sidecar_container_id); + let (mut config, mut analytics) = create_config_and_analytics(&mut progress, watch)?; + + let (_internal_proxy_tls_guards, _external_proxy_tls_guards) = + prepare_tls_certs_for_container(&mut config)?; + + let composed_config_file = create_composed_config(&config)?; + std::env::set_var(MIRRORD_CONFIG_FILE_ENV, composed_config_file.path()); + + let (mut runtime_command, sidecar, _execution_info) = create_runtime_command_with_sidecar( + &mut analytics, + &mut progress, + &config, + composed_config_file.path(), + runtime_args.runtime, + ) + .await?; + + let (sidecar_intproxy_address, sidecar_intproxy_logs) = sidecar.start().await?; + tokio::spawn(pipe_intproxy_sidecar_logs(&config, sidecar_intproxy_logs).await?); runtime_command.add_env(LINUX_INJECTION_ENV_VAR, config.container.cli_image_lib_path); runtime_command.add_env( @@ -505,15 +426,8 @@ pub(crate) async fn container_ext_command( std::env::set_var("MIRRORD_IMPERSONATED_TARGET", target.clone()); env.insert("MIRRORD_IMPERSONATED_TARGET".into(), target.to_string()); } - let (mut config, mut context) = LayerConfig::from_env_with_warnings()?; - - // Initialize only error analytics, extproxy will be the full AnalyticsReporter. - let mut analytics = AnalyticsReporter::only_error(config.telemetry, Default::default(), watch); - config.verify(&mut context)?; - for warning in context.get_warnings() { - progress.warning(warning); - } + let (mut config, mut analytics) = create_config_and_analytics(&mut progress, watch)?; let (_internal_proxy_tls_guards, _external_proxy_tls_guards) = prepare_tls_certs_for_container(&mut config)?; @@ -521,86 +435,22 @@ pub(crate) async fn container_ext_command( let composed_config_file = create_composed_config(&config)?; std::env::set_var(MIRRORD_CONFIG_FILE_ENV, composed_config_file.path()); - let mut sub_progress = progress.subtask("preparing to launch process"); - - let execution_info = - MirrordExecution::start_external(&config, &mut sub_progress, &mut analytics).await?; - - let mut connection_info = Vec::new(); - let mut execution_info_env_without_connection_info = Vec::new(); - - for (key, value) in &execution_info.environment { - if key == MIRRORD_CONNECT_TCP_ENV || key == AGENT_CONNECT_INFO_ENV_KEY { - connection_info.push((key.as_str(), value.as_str())); - } else { - execution_info_env_without_connection_info.push((key.as_str(), value.as_str())) - } - } - - sub_progress.success(None); - let container_runtime = std::env::var("MIRRORD_CONTAINER_USE_RUNTIME") .ok() .and_then(|value| ContainerRuntime::from_str(&value, true).ok()) .unwrap_or(ContainerRuntime::Docker); - let mut runtime_command = RuntimeCommandBuilder::new(container_runtime); - - if let Ok(console_addr) = std::env::var(MIRRORD_CONSOLE_ADDR_ENV) { - if console_addr - .parse() - .map(|addr: SocketAddr| !addr.ip().is_loopback()) - .unwrap_or_default() - { - runtime_command.add_env(MIRRORD_CONSOLE_ADDR_ENV, console_addr); - } else { - tracing::warn!( - ?console_addr, - "{MIRRORD_CONSOLE_ADDR_ENV} needs to be a non loopback address when used with containers" - ); - } - } - - runtime_command.add_env(MIRRORD_PROGRESS_ENV, "off"); - runtime_command.add_env( - MIRRORD_EXECUTION_KIND_ENV, - (CONTAINER_EXECUTION_KIND as u32).to_string(), - ); - - runtime_command.add_env(MIRRORD_CONFIG_FILE_ENV, "/tmp/mirrord-config.json"); - runtime_command - .add_volume::(composed_config_file.path(), "/tmp/mirrord-config.json"); - - let mut load_env_and_mount_pem = |env: &str, path: &Path| { - let container_path = format!("/tmp/{}.pem", env.to_lowercase()); - - runtime_command.add_env(env, &container_path); - runtime_command.add_volume::(path, container_path); - }; - - if let Some(path) = config.internal_proxy.client_tls_certificate.as_ref() { - load_env_and_mount_pem(MIRRORD_INTPROXY_CLIENT_TLS_CERTIFICATE_ENV, path) - } - - if let Some(path) = config.internal_proxy.client_tls_key.as_ref() { - load_env_and_mount_pem(MIRRORD_INTPROXY_CLIENT_TLS_KEY_ENV, path) - } - - if let Some(path) = config.external_proxy.tls_certificate.as_ref() { - load_env_and_mount_pem(MIRRORD_EXTERNAL_TLS_CERTIFICATE_ENV, path) - } - - if let Some(path) = config.external_proxy.tls_key.as_ref() { - load_env_and_mount_pem(MIRRORD_EXTERNAL_TLS_KEY_ENV, path) - } - - runtime_command.add_envs(execution_info_env_without_connection_info); - - let (sidecar_container_id, sidecar_intproxy_address) = - create_sidecar_intproxy(&config, &runtime_command, connection_info).await?; - - runtime_command.add_network(format!("container:{sidecar_container_id}")); - runtime_command.add_volumes_from(sidecar_container_id); + let (mut runtime_command, sidecar, execution_info) = create_runtime_command_with_sidecar( + &mut analytics, + &mut progress, + &config, + composed_config_file.path(), + container_runtime, + ) + .await?; + + let (sidecar_intproxy_address, sidecar_intproxy_logs) = sidecar.start().await?; + tokio::spawn(pipe_intproxy_sidecar_logs(&config, sidecar_intproxy_logs).await?); runtime_command.add_env(LINUX_INJECTION_ENV_VAR, config.container.cli_image_lib_path); runtime_command.add_env( diff --git a/mirrord/cli/src/container/command_builder.rs b/mirrord/cli/src/container/command_builder.rs index 20e0fac883b..c5bf3a6ae39 100644 --- a/mirrord/cli/src/container/command_builder.rs +++ b/mirrord/cli/src/container/command_builder.rs @@ -17,10 +17,6 @@ pub struct RuntimeCommandBuilder { } impl RuntimeCommandBuilder { - pub(super) fn runtime(&self) -> &ContainerRuntime { - &self.runtime - } - fn push_arg(&mut self, value: V) where V: Into, @@ -152,13 +148,12 @@ impl RuntimeCommandBuilder { step, } = self; - let (runtime_command, runtime_args) = match step.command { - ContainerRuntimeCommand::Run { runtime_args } => ("run".to_owned(), runtime_args), - }; + let (runtime_command, runtime_args) = step.command.into_parts(); ( runtime.to_string(), - std::iter::once(runtime_command) + runtime_command + .into_iter() .chain(extra_args) .chain(runtime_args), ) diff --git a/mirrord/cli/src/container/sidecar.rs b/mirrord/cli/src/container/sidecar.rs new file mode 100644 index 00000000000..28641070058 --- /dev/null +++ b/mirrord/cli/src/container/sidecar.rs @@ -0,0 +1,126 @@ +use std::{net::SocketAddr, ops::Not, process::Stdio, time::Duration}; + +use mirrord_config::{internal_proxy::MIRRORD_INTPROXY_CONTAINER_MODE_ENV, LayerConfig}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + process::{ChildStderr, ChildStdout, Command}, +}; +use tokio_stream::{wrappers::LinesStream, StreamExt}; +use tracing::Level; + +use crate::{ + config::ContainerRuntimeCommand, + container::{command_builder::RuntimeCommandBuilder, exec_and_get_first_line, format_command}, + error::ContainerError, +}; + +#[derive(Debug)] +pub(crate) struct Sidecar { + pub container_id: String, + pub runtime_binary: String, +} + +impl Sidecar { + /// Create a "sidecar" container that is running `mirrord intproxy` that connects to `mirrord + /// extproxy` running on user machine to be used by execution container (via mounting on same + /// network) + #[tracing::instrument(level = Level::TRACE)] + pub async fn create_intproxy( + config: &LayerConfig, + base_command: &RuntimeCommandBuilder, + connection_info: Vec<(&str, &str)>, + ) -> Result { + let mut sidecar_command = base_command.clone(); + + sidecar_command.add_env(MIRRORD_INTPROXY_CONTAINER_MODE_ENV, "true"); + sidecar_command.add_envs(connection_info); + + let cleanup = config.container.cli_prevent_cleanup.not().then_some("--rm"); + + let sidecar_container_command = ContainerRuntimeCommand::create( + config + .container + .cli_extra_args + .iter() + .map(String::as_str) + .chain(cleanup) + .chain([&config.container.cli_image, "mirrord", "intproxy"]), + ); + + let (runtime_binary, sidecar_args) = sidecar_command + .with_command(sidecar_container_command) + .into_command_args(); + + let mut sidecar_container_spawn = Command::new(&runtime_binary); + sidecar_container_spawn.args(sidecar_args); + + let container_id = exec_and_get_first_line(&mut sidecar_container_spawn) + .await? + .ok_or_else(|| { + ContainerError::UnsuccesfulCommandOutput( + format_command(&sidecar_container_spawn), + "stdout and stderr were empty".to_owned(), + ) + })?; + + Ok(Sidecar { + container_id, + runtime_binary, + }) + } + + pub fn as_network(&self) -> String { + let Sidecar { container_id, .. } = self; + format!("container:{container_id}") + } + + #[tracing::instrument(level = Level::TRACE)] + pub async fn start(&self) -> Result<(SocketAddr, SidecarLogs), ContainerError> { + let mut command = Command::new(&self.runtime_binary); + command.args(["start", "--attach", &self.container_id]); + + let mut child = command + .stdin(Stdio::null()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(ContainerError::UnableToExecuteCommand)?; + + let mut stdout = + BufReader::new(child.stdout.take().expect("stdout should be piped")).lines(); + let stderr = BufReader::new(child.stderr.take().expect("stderr should be piped")).lines(); + + let first_line = tokio::time::timeout(Duration::from_secs(30), async { + stdout.next_line().await.map_err(|error| { + ContainerError::UnableReadCommandStdout(format_command(&command), error) + }) + }) + .await + .map_err(|_| { + ContainerError::UnsuccesfulCommandOutput( + format_command(&command), + "timeout reached for reading first line".into(), + ) + })?? + .ok_or_else(|| { + ContainerError::UnsuccesfulCommandOutput( + format_command(&command), + "unexpected EOF".into(), + ) + })?; + + let internal_proxy_addr: SocketAddr = first_line + .parse() + .map_err(ContainerError::UnableParseProxySocketAddr)?; + + Ok(( + internal_proxy_addr, + LinesStream::new(stdout).merge(LinesStream::new(stderr)), + )) + } +} + +type SidecarLogs = tokio_stream::adapters::Merge< + LinesStream>, + LinesStream>, +>; diff --git a/mirrord/cli/src/external_proxy.rs b/mirrord/cli/src/external_proxy.rs index b728416b83a..a6ab4bb8674 100644 --- a/mirrord/cli/src/external_proxy.rs +++ b/mirrord/cli/src/external_proxy.rs @@ -20,7 +20,7 @@ //! ``` use std::{ - fs::{File, OpenOptions}, + fs::File, io, io::BufReader, net::{Ipv4Addr, SocketAddr}, @@ -41,13 +41,13 @@ use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::server::TlsStream; use tokio_util::{either::Either, sync::CancellationToken}; use tracing::Level; -use tracing_subscriber::EnvFilter; use crate::{ connection::AGENT_CONNECT_INFO_ENV_KEY, error::{CliResult, ExternalProxyError}, execution::MIRRORD_EXECUTION_KIND_ENV, internal_proxy::connect_and_ping, + logging::init_extproxy_tracing_registry, util::{create_listen_socket, detach_io}, }; @@ -62,28 +62,9 @@ fn print_addr(listener: &TcpListener) -> io::Result<()> { pub async fn proxy(listen_port: u16, watch: drain::Watch) -> CliResult<()> { let config = LayerConfig::from_env()?; + init_extproxy_tracing_registry(&config)?; tracing::info!(?config, "external_proxy starting"); - if let Some(log_destination) = config.external_proxy.log_destination.as_ref() { - let output_file = OpenOptions::new() - .create(true) - .append(true) - .open(log_destination) - .map_err(|e| ExternalProxyError::OpenLogFile(log_destination.clone(), e))?; - - let tracing_registry = tracing_subscriber::fmt() - .with_writer(output_file) - .with_ansi(false); - - if let Some(log_level) = config.external_proxy.log_level.as_ref() { - tracing_registry - .with_env_filter(EnvFilter::builder().parse_lossy(log_level)) - .init(); - } else { - tracing_registry.init(); - } - } - let agent_connect_info = std::env::var(AGENT_CONNECT_INFO_ENV_KEY) .ok() .map(|var| { diff --git a/mirrord/cli/src/internal_proxy.rs b/mirrord/cli/src/internal_proxy.rs index 1c8763a0d92..b1b9a20b72a 100644 --- a/mirrord/cli/src/internal_proxy.rs +++ b/mirrord/cli/src/internal_proxy.rs @@ -11,12 +11,9 @@ //! or let the [`OperatorApi`](mirrord_operator::client::OperatorApi) handle the connection. use std::{ - env, - fs::OpenOptions, - io, + env, io, net::{Ipv4Addr, SocketAddr}, - path::PathBuf, - time::{Duration, SystemTime}, + time::Duration, }; use mirrord_analytics::{AnalyticsReporter, CollectAnalytics, Reporter}; @@ -28,15 +25,14 @@ use mirrord_intproxy::{ }; use mirrord_protocol::{ClientMessage, DaemonMessage, LogLevel, LogMessage}; use nix::sys::resource::{setrlimit, Resource}; -use rand::{distributions::Alphanumeric, Rng}; use tokio::net::TcpListener; use tracing::{warn, Level}; -use tracing_subscriber::EnvFilter; use crate::{ connection::AGENT_CONNECT_INFO_ENV_KEY, error::{CliResult, InternalProxyError}, execution::MIRRORD_EXECUTION_KIND_ENV, + logging::init_intproxy_tracing_registry, util::{create_listen_socket, detach_io}, }; @@ -56,44 +52,9 @@ pub(crate) async fn proxy( ) -> CliResult<(), InternalProxyError> { let config = LayerConfig::from_env()?; + init_intproxy_tracing_registry(&config)?; tracing::info!(?config, "internal_proxy starting"); - // Setting up default logging for intproxy. - let log_destination = config - .internal_proxy - .log_destination - .as_ref() - .map(PathBuf::from) - .unwrap_or_else(|| { - let random_name: String = rand::thread_rng() - .sample_iter(&Alphanumeric) - .take(7) - .map(char::from) - .collect(); - let timestamp = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs(); - - PathBuf::from(format!( - "/tmp/mirrord-intproxy-{timestamp}-{random_name}.log" - )) - }); - - let output_file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_destination) - .map_err(|fail| { - InternalProxyError::OpenLogFile(log_destination.to_string_lossy().to_string(), fail) - })?; - - let log_level = config.internal_proxy.log_level.as_deref().unwrap_or("info"); - - tracing_subscriber::fmt() - .with_writer(output_file) - .with_ansi(false) - .with_env_filter(EnvFilter::builder().parse_lossy(log_level)) - .pretty() - .init(); - // According to https://wilsonmar.github.io/maximum-limits/ this is the limit on macOS // so we assume Linux can be higher and set to that. if let Err(error) = setrlimit(Resource::RLIMIT_NOFILE, 12288, 12288) { diff --git a/mirrord/cli/src/logging.rs b/mirrord/cli/src/logging.rs new file mode 100644 index 00000000000..8b9fba787ba --- /dev/null +++ b/mirrord/cli/src/logging.rs @@ -0,0 +1,192 @@ +use std::{ + fs::OpenOptions, + future::Future, + path::{Path, PathBuf}, + time::SystemTime, +}; + +use futures::StreamExt; +use mirrord_config::LayerConfig; +use rand::{distributions::Alphanumeric, Rng}; +use tokio::io::AsyncWriteExt; +use tokio_stream::Stream; +use tracing_subscriber::{prelude::*, EnvFilter}; + +use crate::{ + config::Commands, + error::{CliError, ExternalProxyError, InternalProxyError}, +}; + +// only ls and ext commands need the errors in json format +// error logs are disabled for extensions +fn init_ext_error_handler(commands: &Commands) -> bool { + match commands { + Commands::ListTargets(_) | Commands::ExtensionExec(_) => { + let _ = miette::set_hook(Box::new(|_| Box::new(miette::JSONReportHandler::new()))); + + true + } + _ => false, + } +} + +pub async fn init_tracing_registry( + command: &Commands, + watch: drain::Watch, +) -> Result<(), CliError> { + if let Ok(console_addr) = std::env::var("MIRRORD_CONSOLE_ADDR") { + mirrord_console::init_async_logger(&console_addr, watch.clone(), 124).await?; + + return Ok(()); + } + + if matches!( + command, + Commands::InternalProxy { .. } | Commands::ExternalProxy { .. } + ) { + return Ok(()); + } + + // There are situations where even if running "ext" commands that shouldn't log, we want those + // to log to be able to debug issues. + let force_log = std::env::var("MIRRORD_FORCE_LOG") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(false); + + if force_log || init_ext_error_handler(command) { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_writer(std::io::stderr)) + .with(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + } + + Ok(()) +} + +fn default_logfile_path(prefix: &str) -> PathBuf { + let random_name: String = rand::thread_rng() + .sample_iter(&Alphanumeric) + .take(7) + .map(char::from) + .collect(); + let timestamp = SystemTime::UNIX_EPOCH.elapsed().unwrap().as_secs(); + + PathBuf::from(format!("/tmp/{prefix}-{timestamp}-{random_name}.log")) +} + +fn init_proxy_tracing_registry( + log_destination: &Path, + log_level: Option<&str>, +) -> std::io::Result<()> { + if std::env::var("MIRRORD_CONSOLE_ADDR").is_ok() { + return Ok(()); + } + + let output_file = OpenOptions::new() + .create(true) + .append(true) + .open(log_destination)?; + + let env_filter = log_level + .map(|log_level| EnvFilter::builder().parse_lossy(log_level)) + .unwrap_or_else(EnvFilter::from_default_env); + + tracing_subscriber::fmt() + .with_writer(output_file) + .with_ansi(false) + .with_env_filter(env_filter) + .pretty() + .init(); + + Ok(()) +} + +pub fn init_intproxy_tracing_registry(config: &LayerConfig) -> Result<(), InternalProxyError> { + if !config.internal_proxy.container_mode { + // Setting up default logging for intproxy. + let log_destination = config + .internal_proxy + .log_destination + .as_ref() + .map(PathBuf::from) + .unwrap_or_else(|| default_logfile_path("mirrord-intproxy")); + + init_proxy_tracing_registry(&log_destination, config.internal_proxy.log_level.as_deref()) + .map_err(|fail| { + InternalProxyError::OpenLogFile(log_destination.to_string_lossy().to_string(), fail) + }) + } else { + let env_filter = config + .internal_proxy + .log_level + .as_ref() + .map(|log_level| EnvFilter::builder().parse_lossy(log_level)) + .unwrap_or_else(EnvFilter::from_default_env); + + tracing_subscriber::fmt() + .with_writer(std::io::stderr) + .with_ansi(false) + .with_env_filter(env_filter) + .pretty() + .init(); + + Ok(()) + } +} + +pub fn init_extproxy_tracing_registry(config: &LayerConfig) -> Result<(), ExternalProxyError> { + // Setting up default logging for extproxy. + let log_destination = config + .external_proxy + .log_destination + .as_ref() + .map(PathBuf::from) + .unwrap_or_else(|| default_logfile_path("mirrord-extproxy")); + + init_proxy_tracing_registry(&log_destination, config.external_proxy.log_level.as_deref()) + .map_err(|fail| { + ExternalProxyError::OpenLogFile(log_destination.to_string_lossy().to_string(), fail) + }) +} + +pub async fn pipe_intproxy_sidecar_logs<'s, S>( + config: &LayerConfig, + stream: S, +) -> Result + 's, InternalProxyError> +where + S: Stream> + 's, +{ + let log_destination = config + .internal_proxy + .log_destination + .as_ref() + .map(PathBuf::from) + .unwrap_or_else(|| default_logfile_path("mirrord-intproxy")); + + let mut output_file = tokio::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&log_destination) + .await + .map_err(|fail| { + InternalProxyError::OpenLogFile(log_destination.to_string_lossy().to_string(), fail) + })?; + + Ok(async move { + let mut stream = std::pin::pin!(stream); + + while let Some(line) = stream.next().await { + let result: std::io::Result<_> = try { + output_file.write_all(line?.as_bytes()).await?; + output_file.write_u8(b'\n').await?; + + output_file.flush().await?; + }; + + if let Err(error) = result { + tracing::error!(?error, "unable to pipe logs from intproxy"); + } + } + }) +} diff --git a/mirrord/cli/src/main.rs b/mirrord/cli/src/main.rs index 27923cab4fd..dde03d6a73e 100644 --- a/mirrord/cli/src/main.rs +++ b/mirrord/cli/src/main.rs @@ -18,7 +18,6 @@ use execution::MirrordExecution; use extension::extension_exec; use extract::extract_library; use kube::Client; -use miette::JSONReportHandler; use mirrord_analytics::{ AnalyticsError, AnalyticsReporter, CollectAnalytics, ExecutionKind, NullReporter, Reporter, }; @@ -45,7 +44,6 @@ use regex::Regex; use semver::{Version, VersionReq}; use serde_json::json; use tracing::{error, info, warn}; -use tracing_subscriber::{fmt, prelude::*, registry, EnvFilter}; use which::which; mod config; @@ -58,6 +56,7 @@ mod extension; mod external_proxy; mod extract; mod internal_proxy; +mod logging; mod operator; pub mod port_forward; mod teams; @@ -650,21 +649,8 @@ fn main() -> miette::Result<()> { let (signal, watch) = drain::channel(); - // There are situations where even if running "ext" commands that shouldn't log, we want those - // to log to be able to debug issues. - let force_log = std::env::var("MIRRORD_FORCE_LOG") - .map(|s| s.parse().unwrap_or(false)) - .unwrap_or(false); - let res: CliResult<(), CliError> = rt.block_on(async move { - if let Ok(console_addr) = std::env::var("MIRRORD_CONSOLE_ADDR") { - mirrord_console::init_async_logger(&console_addr, watch.clone(), 124).await?; - } else if force_log || !init_ext_error_handler(&cli.commands) { - registry() - .with(fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .init(); - } + logging::init_tracing_registry(&cli.commands, watch.clone()).await?; match cli.commands { Commands::Exec(args) => exec(&args, watch).await?, @@ -720,19 +706,6 @@ fn main() -> miette::Result<()> { res.map_err(Into::into) } -// only ls and ext commands need the errors in json format -// error logs are disabled for extensions -fn init_ext_error_handler(commands: &Commands) -> bool { - match commands { - Commands::ListTargets(_) | Commands::ExtensionExec(_) => { - let _ = miette::set_hook(Box::new(|_| Box::new(JSONReportHandler::new()))); - true - } - Commands::InternalProxy { .. } | Commands::ExternalProxy { .. } => true, - _ => false, - } -} - async fn prompt_outdated_version(progress: &ProgressTracker) { let mut progress = progress.subtask("version check"); let check_version: bool = std::env::var("MIRRORD_CHECK_VERSION") diff --git a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs index 8439e79d8e1..7731707d8c8 100644 --- a/mirrord/intproxy/src/proxies/incoming/subscriptions.rs +++ b/mirrord/intproxy/src/proxies/incoming/subscriptions.rs @@ -242,6 +242,7 @@ impl SubscriptionsManager { Ok(subscription.confirm()) } + Err(ResponseError::PortAlreadyStolen(port)) => { let Some(subscription) = self.subscriptions.remove(&port) else { return Ok(vec![]); @@ -255,23 +256,30 @@ impl SubscriptionsManager { } } } + Err( - ref response_err @ ResponseError::Forbidden { - blocked_action: BlockedAction::Steal(ref steal_type), - .. + ref response_error @ ResponseError::Forbidden { + ref blocked_action, .. }, ) => { - tracing::warn!("Port subscribe blocked by policy: {response_err}"); - let Some(subscription) = self.subscriptions.remove(&steal_type.get_port()) else { + tracing::warn!(%response_error, "Port subscribe blocked by policy"); + + let port = match blocked_action { + BlockedAction::Steal(steal_type) => steal_type.get_port(), + BlockedAction::Mirror(port) => *port, + }; + let Some(subscription) = self.subscriptions.remove(&port) else { return Ok(vec![]); }; + subscription - .reject(response_err.clone()) - .map_err(|sub|{ - tracing::error!("Subscription {sub:?} was confirmed before, then requested again and blocked by a policy."); - IncomingProxyError::SubscriptionFailed(response_err.clone()) + .reject(response_error.clone()) + .map_err(|subscription|{ + tracing::error!(?subscription, "Subscription was confirmed before, then requested again and blocked by a policy."); + IncomingProxyError::SubscriptionFailed(response_error.clone()) }) } + Err(err) => Err(IncomingProxyError::SubscriptionFailed(err)), } }