Skip to content

Commit

Permalink
Add notify shutdown to control plane services (#261)
Browse files Browse the repository at this point in the history
* Add notify shutdown to control plane services

* refactor
  • Loading branch information
hanneary authored Dec 20, 2024
1 parent d4a079c commit b856aea
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 139 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion control-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ openssl = { version = "0.10.48", features = ["vendored"] }
base64 = "0.13.0"
storage-client-interface = "0.3.0"
log = { version = "0.4.19", features = ["max_level_debug"] }
rand_chacha = "0.3.1"

[dev-dependencies]
tokio-test = "0.4.2"
Expand All @@ -44,4 +45,4 @@ default = []
network_egress = ["shared/network_egress"]
enclave = ["dep:tokio-vsock", "shared/enclave"]
not_enclave = ["network_egress"]
release_logging = ["log/release_max_level_info"]
release_logging = ["log/release_max_level_info"]
5 changes: 3 additions & 2 deletions control-plane/src/dnsproxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::error::{Result, ServerError};
use rand::seq::SliceRandom;
use rand::thread_rng;
use rand::SeedableRng;
use rand_chacha::ChaCha12Rng;
use shared::server::egress::check_dns_allowed_for_domain;
use shared::server::egress::{cache_ip_for_allowlist, EgressDestinations};
use shared::server::CID::Parent;
Expand Down Expand Up @@ -55,7 +56,7 @@ impl DnsProxy {
let mut server = get_vsock_server(DNS_PROXY_VSOCK_PORT, Parent).await?;

let allowed_domains = shared::server::egress::get_egress_allow_list_from_env();
let mut rng = thread_rng();
let mut rng = ChaCha12Rng::from_entropy();
loop {
let domains = allowed_domains.clone();
match server.accept().await {
Expand Down
107 changes: 77 additions & 30 deletions control-plane/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::ServerError;
use axum::http::HeaderValue;
use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use shared::notify_shutdown::Service;
use shared::server::{
error::ServerResult,
health::{ControlPlaneState, DataPlaneState, HealthCheck, HealthCheckLog, HealthCheckVersion},
Expand All @@ -12,15 +13,12 @@ use shared::server::{
use shared::ENCLAVE_HEALTH_CHECK_PORT;
use std::net::SocketAddr;
use std::sync::OnceLock;
use tokio::sync::mpsc::Receiver;

pub static IS_DRAINING: OnceLock<bool> = OnceLock::new();

pub const CONTROL_PLANE_HEALTH_CHECK_PORT: u16 = 3032;

pub struct HealthCheckServer {
tcp_server: TcpServer,
}

#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct CombinedHealthCheckLog {
Expand All @@ -30,6 +28,7 @@ struct CombinedHealthCheckLog {

pub async fn run_ecs_health_check_service(
is_draining: bool,
control_plane_state: ControlPlaneState,
) -> std::result::Result<Response<Body>, ServerError> {
if is_draining {
let combined_log = CombinedHealthCheckLog {
Expand All @@ -48,15 +47,15 @@ pub async fn run_ecs_health_check_service(
.body(Body::from(combined_log_json))?);
};

let control_plane = ControlPlaneState::Ok;
let data_plane = health_check_data_plane().await.unwrap_or_else(|e| {
DataPlaneState::Error(format!("Failed to contact data-plane for healthcheck: {e}")).into()
});

let status_to_return = std::cmp::max(control_plane.status_code(), data_plane.status_code());
let status_to_return =
std::cmp::max(control_plane_state.status_code(), data_plane.status_code());

let combined_log = CombinedHealthCheckLog {
control_plane,
control_plane: control_plane_state,
data_plane,
};
let combined_log_json = serde_json::to_string(&combined_log).unwrap();
Expand Down Expand Up @@ -101,37 +100,56 @@ async fn health_check_data_plane() -> Result<HealthCheckVersion, ServerError> {
Ok(hc)
}

/// Serves control-plane health checks over TCP.
///
/// Tracks critical control-plane services that have shut down, as reported
/// through `shutdown_receiver`, so each health-check response can reflect
/// the control plane's true state.
pub struct HealthCheckServer {
    // Services observed to have exited; accumulated across polls so a
    // crashed service keeps the health check unhealthy permanently.
    exited_services: Vec<Service>,
    // Channel on which other control-plane services announce their shutdown.
    shutdown_receiver: Receiver<Service>,
}

impl HealthCheckServer {
pub async fn new() -> ServerResult<Self> {
let tcp_server = TcpServer::bind(SocketAddr::from((
[0, 0, 0, 0],
CONTROL_PLANE_HEALTH_CHECK_PORT,
)))
.await?;
Ok(HealthCheckServer { tcp_server })
pub fn new(shutdown_receiver: Receiver<Service>) -> Self {
Self {
shutdown_receiver,
exited_services: Vec::new(),
}
}

pub async fn start(&mut self) -> ServerResult<()> {
log::info!(
"Control plane health-check server running on port {CONTROL_PLANE_HEALTH_CHECK_PORT}"
);

let mut tcp_server = TcpServer::bind(SocketAddr::from((
[0, 0, 0, 0],
CONTROL_PLANE_HEALTH_CHECK_PORT,
)))
.await?;

loop {
let stream = self.tcp_server.accept().await?;
let service = hyper::service::service_fn(move |request: Request<Body>| async move {
match request
.headers()
.get("User-Agent")
.map(|value| value.as_bytes())
{
Some(b"ECS-HealthCheck") => {
let is_draining = IS_DRAINING.get().is_some();
run_ecs_health_check_service(is_draining).await
let stream = tcp_server.accept().await?;

let cp_state = self.get_control_plane_state();

let service = hyper::service::service_fn({
let cp_state = cp_state.clone();
move |request: Request<Body>| {
let cp_state = cp_state.clone();
async move {
match request
.headers()
.get("User-Agent")
.map(|value| value.as_bytes())
{
Some(b"ECS-HealthCheck") => {
let cp_state = cp_state.clone();
let is_draining = IS_DRAINING.get().is_some();
run_ecs_health_check_service(is_draining, cp_state).await
}
_ => Response::builder()
.status(400)
.body(Body::from("Unsupported health check type!"))
.map_err(ServerError::from),
}
}
_ => Response::builder()
.status(400)
.body(Body::from("Unsupported health check type!"))
.map_err(ServerError::from),
}
});
if let Err(error) = hyper::server::conn::Http::new()
Expand All @@ -143,6 +161,31 @@ impl HealthCheckServer {
}
}
}

/// Computes the control plane's current health state.
///
/// Drains every pending shutdown notification from the channel (the
/// original implementation only pulled one message per poll, so multiple
/// services exiting between health checks could go temporarily unreported)
/// and records each exited service. Returns `ControlPlaneState::Ok` when no
/// critical service has exited, otherwise an `Error` listing all exited
/// services. Exits are remembered permanently: once a service is recorded,
/// the state stays `Error` on every subsequent call.
fn get_control_plane_state(&mut self) -> ControlPlaneState {
    // Drain all queued notifications; `try_recv` never blocks, and a closed
    // or empty channel simply ends the loop.
    while let Ok(exited_service) = self.shutdown_receiver.try_recv() {
        self.exited_services.push(exited_service);
    }

    if self.exited_services.is_empty() {
        ControlPlaneState::Ok
    } else {
        ControlPlaneState::Error(format!(
            "Critical Control Plane services have exited: {}",
            self.serialize_exited_services()
        ))
    }
}

/// Renders the recorded exited services as a comma-separated list,
/// e.g. `"DnsProxy, StatsProxy"`, for inclusion in error messages.
fn serialize_exited_services(&self) -> String {
    let mut listing = String::new();
    for (position, service) in self.exited_services.iter().enumerate() {
        if position > 0 {
            listing.push_str(", ");
        }
        listing.push_str(&service.to_string());
    }
    listing
}
}

#[cfg(test)]
Expand All @@ -158,7 +201,9 @@ mod health_check_tests {
#[tokio::test]
async fn test_enclave_health_check_service() {
// the data-plane status should error, as its not running
let response = run_ecs_health_check_service(false).await.unwrap();
let response = run_ecs_health_check_service(false, ControlPlaneState::Ok)
.await
.unwrap();
assert_eq!(response.status(), 500);
println!("deep response: {response:?}");
let health_check_log = response_to_health_check_log(response).await;
Expand All @@ -175,7 +220,9 @@ mod health_check_tests {
async fn test_enclave_health_check_service_with_draining_set_to_true() {
// the data-plane status should error, as its not running
IS_DRAINING.set(true).unwrap();
let response = run_ecs_health_check_service(true).await.unwrap();
let response = run_ecs_health_check_service(true, ControlPlaneState::Ok)
.await
.unwrap();
assert_eq!(response.status(), 500);
println!("deep response: {response:?}");
let health_check_log = response_to_health_check_log(response).await;
Expand Down
Loading

0 comments on commit b856aea

Please sign in to comment.