From 74a5c20378ca0482d8967490ed3e3704befe225b Mon Sep 17 00:00:00 2001 From: Liam Date: Thu, 5 Dec 2024 17:58:07 +0000 Subject: [PATCH] wip: resolve conflicts, port retries in env client to use new tunable retries --- data-plane/src/env/client.rs | 248 ------------------ data-plane/src/env/mod.rs | 319 ++++++++++++++++-------- data-plane/src/health/mod.rs | 7 - data-plane/src/main.rs | 6 +- data-plane/src/server/server.rs | 2 +- data-plane/src/server/tls/tls_server.rs | 8 +- 6 files changed, 224 insertions(+), 366 deletions(-) delete mode 100644 data-plane/src/env/client.rs diff --git a/data-plane/src/env/client.rs b/data-plane/src/env/client.rs deleted file mode 100644 index 41d6f02f..00000000 --- a/data-plane/src/env/client.rs +++ /dev/null @@ -1,248 +0,0 @@ -use crate::{ - cert_provisioner_client::CertProvisionerClient, - config_client::ConfigClient, - e3client::{CryptoRequest, CryptoResponse, E3Api, E3Client}, - error::Error, - EnclaveContext, -}; -use serde_json::json; -use shared::server::config_server::requests::{GetSecretsResponseDataPlane, Secret}; -use std::{fs::File, future::Future, io::Write}; -use std::{fs::OpenOptions, marker::PhantomData}; - -use super::EnvError; - -/// Empty trait to enforce correctness in the Environment Loader — an Environment Loader can only wrap a known phase -/// which will expose the appropriate loader functions. -pub trait EnvLoadingPhaseMarker {} - -/// Phase for loading the Enclaves environment variables, decrypting them, and exposing them in the customer-env file. -pub struct NeedEnv; -impl EnvLoadingPhaseMarker for NeedEnv {} - -impl NeedEnv { - async fn decrypt_secrets( - loader: &EnvironmentLoader, - secrets: &Vec, - ) -> Result, Error> { - let (encrypted_env, plaintext_env): (_, Vec) = secrets - .clone() - .into_iter() - .partition(|env| env.secret.starts_with("ev:")); - - let mut plaintext_env = plaintext_env; - - if encrypted_env.is_empty() { - return Ok(plaintext_env); - } - - let e3_response: CryptoResponse = loader - .e3_client - .decrypt(CryptoRequest { - data: json!(encrypted_env.clone()), - }) - .await?; - let mut decrypted_env: Vec = serde_json::from_value(e3_response.data)?; - decrypted_env.append(&mut plaintext_env); - Ok(decrypted_env) - } - - fn write_env_file(secrets: Vec) -> Result<(), EnvError> { - let mut file = File::create("/etc/customer-env")?; - - let env_string = secrets - .iter() - .map(|env| format!("export {}={} ", env.name, env.secret)) - .collect::>() - .join(""); - - file.write_all(env_string.as_bytes())?; - Ok(()) - } - - async fn get_env( - loader: &EnvironmentLoader, - ) -> Result { - let cert_token = with_retries(|| async { - loader - .config_client - .get_cert_token() - .await - .map_err(crate::error::Error::from) - }) - .await?; - - let token = cert_token.token(); - let secrets_response = with_retries(|| async { - loader - .cert_provisioner_client - .get_secrets(token.clone()) - .await - .map_err(crate::error::Error::from) - }) - .await?; - - Ok(secrets_response) - } -} - -/// Phase for marking the environment as ready and unblocking the customer process' start up script. -pub struct Finalize; -impl EnvLoadingPhaseMarker for Finalize {} - -pub struct EnvironmentLoader

{ - phase: PhantomData

, - cert_provisioner_client: CertProvisionerClient, - config_client: ConfigClient, - e3_client: E3Client, -} - -#[cfg(feature = "tls_termination")] -mod tls_enabled { - use super::*; - use openssl::{ - pkey::{PKey, Private}, - x509::X509, - }; - - /// Phase for loading the intermediate CA details from the provisioner - pub struct NeedCert; - impl EnvLoadingPhaseMarker for NeedCert {} - - impl EnvironmentLoader { - /// Load the environment variables from the provisioner and transition to the next appropriate loading state - `NeedCert` - pub async fn load_env_vars(self) -> Result, Error> { - let secrets_response = NeedEnv::get_env(&self).await?; - - EnclaveContext::set(secrets_response.context.clone().into()); - - let customer_env = NeedEnv::decrypt_secrets(&self, &secrets_response.secrets).await?; - - NeedEnv::write_env_file(customer_env)?; - - Ok(EnvironmentLoader { - phase: PhantomData, - cert_provisioner_client: self.cert_provisioner_client, - config_client: self.config_client, - e3_client: self.e3_client, - }) - } - } - - impl EnvironmentLoader { - /// Load the intermediate CA details from the provisioner, returning them alongside the loader in the `Finalize` state. - pub async fn load_cert( - self, - ) -> Result<(EnvironmentLoader, X509, PKey), Error> { - let cert_token = - with_retries(|| async { self.config_client.get_cert_token().await }).await?; - - let token = cert_token.token(); - let cert_response = with_retries(|| async { - self.cert_provisioner_client - .get_cert(token.clone()) - .await - .map_err(|err| Error::CertServer(err.to_string())) - }) - .await?; - - let inter_ca_cert = parse_cert(cert_response.cert())?; - let inter_ca_key_pair = parse_key(cert_response.key_pair())?; - Ok(( - EnvironmentLoader { - phase: PhantomData, - cert_provisioner_client: self.cert_provisioner_client, - config_client: self.config_client, - e3_client: self.e3_client, - }, - inter_ca_cert, - inter_ca_key_pair, - )) - } - } - - fn parse_cert(raw_cert: String) -> Result { - let decoded_cert = - base64::decode(raw_cert).map_err(|err| Error::Crypto(err.to_string()))?; - X509::from_pem(&decoded_cert).map_err(|err| Error::Crypto(err.to_string())) - } - - fn parse_key(raw_key: String) -> Result, Error> { - let decoded_key = base64::decode(raw_key).map_err(|err| Error::Crypto(err.to_string()))?; - PKey::private_key_from_pem(&decoded_key).map_err(|err| Error::Crypto(err.to_string())) - } -} -#[cfg(feature = "tls_termination")] -pub use tls_enabled::*; - -#[cfg(not(feature = "tls_termination"))] -mod tls_disabled { - use super::*; - - impl EnvironmentLoader { - /// Load the environment variables from the provisioner and transition to the next appropriate loading state - `Finalize` - pub async fn load_env_vars(self) -> Result, Error> { - let secrets_response = NeedEnv::get_env(&self).await?; - - EnclaveContext::set(secrets_response.context.clone().into()); - - let customer_env = NeedEnv::decrypt_secrets(&self, &secrets_response.secrets).await?; - - NeedEnv::write_env_file(customer_env)?; - - Ok(EnvironmentLoader { - phase: PhantomData, - cert_provisioner_client: self.cert_provisioner_client, - config_client: self.config_client, - e3_client: self.e3_client, - }) - } - } -} -#[cfg(not(feature = "tls_termination"))] -pub use tls_disabled::*; - -impl EnvironmentLoader { - pub fn finalize_env(self) -> Result<(), Error> { - write_startup_complete_env_vars()?; - Ok(()) - } -} - -/// Get an environment variable loader in the default state - `NeedEnv` -/// This is the only way an EnvironmentLoader should be built. This constraint is enforced by leaving the attributes private. -pub fn init_environment_loader() -> EnvironmentLoader { - EnvironmentLoader { - phase: PhantomData, - cert_provisioner_client: Default::default(), - config_client: Default::default(), - e3_client: Default::default(), - } -} - -async fn with_retries(func: F) -> Result -where - F: Fn() -> Fut, - Fut: Future>, -{ - let mut attempts = 0; - loop { - attempts += 1; - match func().await { - Ok(response) => return Ok(response), - Err(e) if attempts < 3 => { - log::error!("Request failed during environment init flow - {e:?}"); - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - } - Err(e) => return Err(e), - } - } -} - -pub fn write_startup_complete_env_vars() -> Result<(), Error> { - let mut file = OpenOptions::new().append(true).open("/etc/customer-env")?; - - write!(file, "export EV_INITIALIZED=true")?; - - Ok(()) -} diff --git a/data-plane/src/env/mod.rs b/data-plane/src/env/mod.rs index 1c54e5cb..c1a1b951 100644 --- a/data-plane/src/env/mod.rs +++ b/data-plane/src/env/mod.rs @@ -1,20 +1,18 @@ -use crate::cert_provisioner_client::CertProvisionerClient; -use crate::config_client::ConfigClient; use crate::{base_tls_client::ClientError, ContextError}; +use crate::{ + cert_provisioner_client::CertProvisionerClient, + config_client::ConfigClient, + e3client::{CryptoRequest, CryptoResponse, E3Api, E3Client}, + error::Error, + EnclaveContext, +}; use hyper::header::InvalidHeaderValue; use serde_json::json; -use shared::server::config_server::requests::Secret; -use std::future::Future; -use std::{ - fs::{File, OpenOptions}, - io::Write, -}; +use shared::server::config_server::requests::{GetSecretsResponseDataPlane, Secret}; +use std::{fs::File, future::Future, io::Write}; +use std::{fs::OpenOptions, marker::PhantomData}; use thiserror::Error; -pub mod client; - -use crate::e3client::{CryptoRequest, CryptoResponse, E3Api, E3Client}; - #[derive(Debug, Error)] pub enum EnvError { #[error("{0}")] @@ -33,26 +31,19 @@ pub enum EnvError { ContextError(#[from] ContextError), } -#[derive(Clone)] -pub struct Environment { - pub cert_provisioner_client: CertProvisionerClient, - pub config_client: ConfigClient, - pub e3_client: E3Client, -} +/// Empty trait to enforce correctness in the Environment Loader — an Environment Loader can only wrap a known phase +/// which will expose the appropriate loader functions. +pub trait EnvLoadingPhaseMarker {} -impl Environment { - pub fn new() -> Environment { - let cert_provisioner_client = CertProvisionerClient::new(); - let e3_client = E3Client::new(); - let config_client = ConfigClient::new(); - Environment { - cert_provisioner_client, - e3_client, - config_client, - } - } +/// Phase for loading the Enclaves environment variables, decrypting them, and exposing them in the customer-env file. +pub struct NeedEnv; +impl EnvLoadingPhaseMarker for NeedEnv {} - pub async fn init(self, secrets: Vec) -> Result<(), EnvError> { +impl NeedEnv { + async fn decrypt_secrets( + loader: &EnvironmentLoader, + secrets: &Vec, + ) -> Result, Error> { let (encrypted_env, plaintext_env): (_, Vec) = secrets .clone() .into_iter() @@ -60,108 +51,237 @@ impl Environment { let mut plaintext_env = plaintext_env; - if !encrypted_env.is_empty() { - let e3_response: CryptoResponse = self - .e3_client - .decrypt(CryptoRequest { - data: json!(encrypted_env.clone()), - }) - .await?; - let mut decrypted_env: Vec = serde_json::from_value(e3_response.data)?; - decrypted_env.append(&mut plaintext_env); - self.write_env_file(decrypted_env.clone())?; - } else { - self.write_env_file(plaintext_env.clone())?; + if encrypted_env.is_empty() { + return Ok(plaintext_env); } - Ok(()) + let e3_response: CryptoResponse = loader + .e3_client + .decrypt(CryptoRequest { + data: json!(encrypted_env.clone()), + }) + .await?; + let mut decrypted_env: Vec = serde_json::from_value(e3_response.data)?; + decrypted_env.append(&mut plaintext_env); + Ok(decrypted_env) } - #[cfg(not(feature = "tls_termination"))] - async fn with_retries( - backoff: u64, - n_attempts: u32, - upper_bound: u64, - func: F, - ) -> Result - where - F: Fn() -> Fut, - Fut: Future>, - { - use rand::{thread_rng, Rng}; - - let mut attempts = 0; - loop { - let computed_backoff = - (2_u64.pow(attempts) * backoff) + thread_rng().gen_range(50..150); - attempts += 1; - match func().await { - Ok(response) => return Ok(response), - Err(e) if attempts < n_attempts => { - log::error!("Request failed during environment init flow - {e:?}"); - let limited_backoff = std::cmp::min(upper_bound, computed_backoff); - tokio::time::sleep(tokio::time::Duration::from_millis(limited_backoff)).await; - } - Err(e) => return Err(e), - } - } - } + fn write_env_file(secrets: Vec) -> Result<(), EnvError> { + let mut file = File::create("/etc/customer-env")?; + + let env_string = secrets + .iter() + .map(|env| format!("export {}={} ", env.name, env.secret)) + .collect::>() + .join(""); - pub async fn initialize_environment_variables(self) -> crate::error::Result<()> { - use crate::EnclaveContext; + file.write_all(env_string.as_bytes())?; + Ok(()) + } + async fn get_env( + loader: &EnvironmentLoader, + ) -> Result { let half_min = 1_000 * 30; - log::info!("Initializing env without TLS termination, sending request to control plane for cert provisioner token."); - let token = Self::with_retries(500, 10, half_min, || async { - self.config_client + let cert_token = with_retries(500, 10, half_min, || async { + loader + .config_client .get_cert_token() .await .map_err(crate::error::Error::from) }) - .await? - .token(); + .await?; - let secrets_response = Self::with_retries(500, 10, half_min, || async { - self.cert_provisioner_client + let token = cert_token.token(); + let secrets_response = with_retries(500, 10, half_min, || async { + loader + .cert_provisioner_client .get_secrets(token.clone()) .await .map_err(crate::error::Error::from) }) .await?; - EnclaveContext::set(secrets_response.context.clone().into()); + Ok(secrets_response) + } +} - self.init(secrets_response.clone().secrets).await?; +/// Phase for marking the environment as ready and unblocking the customer process' start up script. +pub struct Finalize; +impl EnvLoadingPhaseMarker for Finalize {} - Ok(()) +pub struct EnvironmentLoader

{ + phase: PhantomData

, + cert_provisioner_client: CertProvisionerClient, + config_client: ConfigClient, + e3_client: E3Client, +} + +#[cfg(feature = "tls_termination")] +mod tls_enabled { + use super::*; + use openssl::{ + pkey::{PKey, Private}, + x509::X509, + }; + + /// Phase for loading the intermediate CA details from the provisioner + pub struct NeedCert; + impl EnvLoadingPhaseMarker for NeedCert {} + + impl EnvironmentLoader { + /// Load the environment variables from the provisioner and transition to the next appropriate loading state - `NeedCert` + pub async fn load_env_vars(self) -> Result, Error> { + let secrets_response = NeedEnv::get_env(&self).await?; + + EnclaveContext::set(secrets_response.context.clone().into()); + + let customer_env = NeedEnv::decrypt_secrets(&self, &secrets_response.secrets).await?; + + NeedEnv::write_env_file(customer_env)?; + + Ok(EnvironmentLoader { + phase: PhantomData, + cert_provisioner_client: self.cert_provisioner_client, + config_client: self.config_client, + e3_client: self.e3_client, + }) + } } - fn write_env_file(self, secrets: Vec) -> Result<(), EnvError> { - let mut file = File::create("/etc/customer-env")?; + impl EnvironmentLoader { + /// Load the intermediate CA details from the provisioner, returning them alongside the loader in the `Finalize` state. + pub async fn load_cert( + self, + ) -> Result<(EnvironmentLoader, X509, PKey), Error> { + let half_min = 1_000 * 30; + let cert_token = with_retries(100, 8, half_min, || async { + self.config_client.get_cert_token().await + }) + .await?; - let env_string = secrets - .iter() - .map(|env| format!("export {}={} ", env.name, env.secret)) - .collect::>() - .join(""); + let token = cert_token.token(); + let cert_response = with_retries(100, 8, half_min, || async { + self.cert_provisioner_client + .get_cert(token.clone()) + .await + .map_err(|err| Error::CertServer(err.to_string())) + }) + .await?; - file.write_all(env_string.as_bytes())?; - Ok(()) + let inter_ca_cert = parse_cert(cert_response.cert())?; + let inter_ca_key_pair = parse_key(cert_response.key_pair())?; + Ok(( + EnvironmentLoader { + phase: PhantomData, + cert_provisioner_client: self.cert_provisioner_client, + config_client: self.config_client, + e3_client: self.e3_client, + }, + inter_ca_cert, + inter_ca_key_pair, + )) + } } - pub fn write_startup_complete_env_vars() -> Result<(), EnvError> { - let mut file = OpenOptions::new().append(true).open("/etc/customer-env")?; + fn parse_cert(raw_cert: String) -> Result { + let decoded_cert = + base64::decode(raw_cert).map_err(|err| Error::Crypto(err.to_string()))?; + X509::from_pem(&decoded_cert).map_err(|err| Error::Crypto(err.to_string())) + } + + fn parse_key(raw_key: String) -> Result, Error> { + let decoded_key = base64::decode(raw_key).map_err(|err| Error::Crypto(err.to_string()))?; + PKey::private_key_from_pem(&decoded_key).map_err(|err| Error::Crypto(err.to_string())) + } +} +#[cfg(feature = "tls_termination")] +pub use tls_enabled::*; + +#[cfg(not(feature = "tls_termination"))] +mod tls_disabled { + use super::*; + + impl EnvironmentLoader { + /// Load the environment variables from the provisioner and transition to the next appropriate loading state - `Finalize` + pub async fn load_env_vars(self) -> Result, Error> { + let secrets_response = NeedEnv::get_env(&self).await?; + + EnclaveContext::set(secrets_response.context.clone().into()); + + let customer_env = NeedEnv::decrypt_secrets(&self, &secrets_response.secrets).await?; - write!(file, "export EV_INITIALIZED=true")?; + NeedEnv::write_env_file(customer_env)?; + Ok(EnvironmentLoader { + phase: PhantomData, + cert_provisioner_client: self.cert_provisioner_client, + config_client: self.config_client, + e3_client: self.e3_client, + }) + } + } +} +#[cfg(not(feature = "tls_termination"))] +pub use tls_disabled::*; + +impl EnvironmentLoader { + pub fn finalize_env(self) -> Result<(), Error> { + write_startup_complete_env_vars()?; Ok(()) } } +/// Get an environment variable loader in the default state - `NeedEnv` +/// This is the only way an EnvironmentLoader should be built. This constraint is enforced by leaving the attributes private. +pub fn init_environment_loader() -> EnvironmentLoader { + EnvironmentLoader { + phase: PhantomData, + cert_provisioner_client: Default::default(), + config_client: Default::default(), + e3_client: Default::default(), + } +} + +async fn with_retries( + backoff: u64, + n_attempts: u32, + upper_bound: u64, + func: F, +) -> Result +where + F: Fn() -> Fut, + Fut: Future>, +{ + use rand::{thread_rng, Rng}; + + let mut attempts = 0; + loop { + let computed_backoff = (2_u64.pow(attempts) * backoff) + thread_rng().gen_range(50..150); + attempts += 1; + match func().await { + Ok(response) => return Ok(response), + Err(e) if attempts < n_attempts => { + log::error!("Request failed during environment init flow - {e:?}"); + let limited_backoff = std::cmp::min(upper_bound, computed_backoff); + tokio::time::sleep(tokio::time::Duration::from_millis(limited_backoff)).await; + } + Err(e) => return Err(e), + } + } +} + +pub fn write_startup_complete_env_vars() -> Result<(), Error> { + let mut file = OpenOptions::new().append(true).open("/etc/customer-env")?; + + write!(file, "export EV_INITIALIZED=true")?; + + Ok(()) +} + #[cfg(test)] mod test { #[tokio::test] - #[cfg(not(feature = "tls_termination"))] async fn with_retries_redrives_requests_as_expected() { let responses = vec![ Ok(()), @@ -177,14 +297,13 @@ mod test { value }; - let result = super::Environment::with_retries(100, 3, 1_000, fallable_func).await; + let result = super::with_retries(100, 3, 1_000, fallable_func).await; assert!(result.is_ok()); let responses_lock = ctr_clone.lock().unwrap(); assert!(responses_lock.is_empty()); } #[tokio::test] - #[cfg(not(feature = "tls_termination"))] async fn with_retries_redrives_requests_and_bubbles_errors() { let responses = vec![ Ok(()), @@ -201,7 +320,7 @@ mod test { value }; - let result = super::Environment::with_retries(100, 3, 1_000, fallable_func).await; + let result = super::with_retries(100, 3, 1_000, fallable_func).await; assert!(result.is_err()); let responses_lock = ctr_clone.lock().unwrap(); assert_eq!(responses_lock.len(), 1); diff --git a/data-plane/src/health/mod.rs b/data-plane/src/health/mod.rs index b9386824..3ca8de7a 100644 --- a/data-plane/src/health/mod.rs +++ b/data-plane/src/health/mod.rs @@ -12,13 +12,6 @@ use shared::server::TcpServer; #[cfg(feature = "enclave")] use shared::server::VsockServer; use shared::server::CID::Enclave; -use shared::server::get_vsock_server; -use shared::server::health::{DataPlaneDiagnostic, DataPlaneState, UserProcessHealth}; -#[cfg(not(feature = "enclave"))] -use shared::server::TcpServer; -#[cfg(feature = "enclave")] -use shared::server::VsockServer; -use shared::server::CID::Enclave; use shared::{server::Listener, ENCLAVE_HEALTH_CHECK_PORT}; use tokio::sync::mpsc::{Sender, UnboundedSender}; diff --git a/data-plane/src/main.rs b/data-plane/src/main.rs index c63687b4..09b6eb56 100644 --- a/data-plane/src/main.rs +++ b/data-plane/src/main.rs @@ -1,7 +1,7 @@ #[cfg(feature = "network_egress")] use data_plane::dns::{egressproxy::EgressProxy, enclavedns::EnclaveDnsProxy}; use data_plane::{ - crypto::api::CryptoApi, env::client::{EnvironmentLoader, init_environment_loader}, health::build_health_check_server, + crypto::api::CryptoApi, env::{EnvironmentLoader, init_environment_loader}, health::build_health_check_server, stats::StatsProxy, stats_client::StatsClient, time::ClockSync, FeatureContext, }; #[cfg(not(feature = "tls_termination"))] @@ -139,7 +139,7 @@ async fn start(data_plane_port: u16, shutdown_notifier: Sender) { } #[cfg(feature = "tls_termination")] -use data_plane::env::client::NeedCert; +use data_plane::env::NeedCert; #[cfg(feature = "tls_termination")] async fn start_data_plane( data_plane_port: u16, @@ -158,7 +158,7 @@ async fn start_data_plane( } #[cfg(not(feature = "tls_termination"))] -use data_plane::env::client::Finalize; +use data_plane::env::Finalize; #[cfg(not(feature = "tls_termination"))] use shared::{server::proxy_protocol::ProxiedConnection, utils::pipe_streams}; #[cfg(not(feature = "tls_termination"))] diff --git a/data-plane/src/server/server.rs b/data-plane/src/server/server.rs index cda438f4..59e00bec 100644 --- a/data-plane/src/server/server.rs +++ b/data-plane/src/server/server.rs @@ -4,7 +4,7 @@ use super::http::{request_to_bytes, response_to_bytes}; use super::tls::TlsServerBuilder; use crate::e3client::E3Client; -use crate::env::client::{EnvironmentLoader, NeedCert}; +use crate::env::{EnvironmentLoader, NeedCert}; use crate::server::http::{build_internal_error_response, parse}; use crate::{EnclaveContext, FeatureContext}; diff --git a/data-plane/src/server/tls/tls_server.rs b/data-plane/src/server/tls/tls_server.rs index ab25d07e..a5f8f690 100644 --- a/data-plane/src/server/tls/tls_server.rs +++ b/data-plane/src/server/tls/tls_server.rs @@ -5,14 +5,9 @@ use once_cell::sync::OnceCell; #[cfg(feature = "enclave")] use tokio_rustls::rustls::sign::CertifiedKey; -use openssl::pkey::PKey; -use openssl::pkey::Private; -use openssl::x509::X509; use shared::server::proxy_protocol::ProxiedConnection; use shared::server::Listener; use std::sync::Arc; -use std::thread; -use std::time::Duration; use tokio_rustls::rustls::server::WantsServerCert; use tokio_rustls::rustls::ConfigBuilder; use tokio_rustls::rustls::ServerConfig; @@ -24,8 +19,7 @@ use tokio_rustls::TlsAcceptor; #[cfg(feature = "enclave")] use crate::acme; -use crate::env::client::EnvironmentLoader; -use crate::env::client::NeedCert; +use crate::env::{EnvironmentLoader, NeedCert}; use crate::server::error::ServerResult; use crate::server::error::TlsError; use crate::server::tls::cert_resolver::AttestableCertResolver;