Skip to content

Commit

Permalink
add notify_shutdown to cp services
Browse files Browse the repository at this point in the history
  • Loading branch information
hanneary committed Dec 18, 2024
1 parent 533b292 commit bb5e4c9
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 27 deletions.
84 changes: 64 additions & 20 deletions control-plane/src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::error::ServerError;
use axum::http::HeaderValue;
use hyper::{Body, Request, Response};
use serde::{Deserialize, Serialize};
use shared::notify_shutdown::Service;
use shared::server::{
error::ServerResult,
health::{ControlPlaneState, DataPlaneState, HealthCheck, HealthCheckLog, HealthCheckVersion},
Expand All @@ -12,6 +13,7 @@ use shared::server::{
use shared::ENCLAVE_HEALTH_CHECK_PORT;
use std::net::SocketAddr;
use std::sync::OnceLock;
use tokio::sync::mpsc::Receiver;

pub static IS_DRAINING: OnceLock<bool> = OnceLock::new();

Expand All @@ -26,6 +28,7 @@ struct CombinedHealthCheckLog {

pub async fn run_ecs_health_check_service(
is_draining: bool,
control_plane_state: ControlPlaneState,
) -> std::result::Result<Response<Body>, ServerError> {
if is_draining {
let combined_log = CombinedHealthCheckLog {
Expand All @@ -44,15 +47,15 @@ pub async fn run_ecs_health_check_service(
.body(Body::from(combined_log_json))?);
};

let control_plane = ControlPlaneState::Ok;
let data_plane = health_check_data_plane().await.unwrap_or_else(|e| {
DataPlaneState::Error(format!("Failed to contact data-plane for healthcheck: {e}")).into()
});

let status_to_return = std::cmp::max(control_plane.status_code(), data_plane.status_code());
let status_to_return =
std::cmp::max(control_plane_state.status_code(), data_plane.status_code());

let combined_log = CombinedHealthCheckLog {
control_plane,
control_plane: control_plane_state,
data_plane,
};
let combined_log_json = serde_json::to_string(&combined_log).unwrap();
Expand Down Expand Up @@ -97,10 +100,20 @@ async fn health_check_data_plane() -> Result<HealthCheckVersion, ServerError> {
Ok(hc)
}

pub struct HealthCheckServer;
pub struct HealthCheckServer {
shutdown_receiver: Receiver<Service>,
exited_services: Vec<Service>,
}

impl HealthCheckServer {
pub async fn start() -> ServerResult<()> {
pub fn new(shutdown_receiver: Receiver<Service>) -> Self {
Self {
shutdown_receiver,
exited_services: Vec::new(),
}
}

pub async fn start(&mut self) -> ServerResult<()> {
log::info!(
"Control plane health-check server running on port {CONTROL_PLANE_HEALTH_CHECK_PORT}"
);
Expand All @@ -113,20 +126,43 @@ impl HealthCheckServer {

loop {
let stream = tcp_server.accept().await?;
let service = hyper::service::service_fn(move |request: Request<Body>| async move {
match request
.headers()
.get("User-Agent")
.map(|value| value.as_bytes())
{
Some(b"ECS-HealthCheck") => {
let is_draining = IS_DRAINING.get().is_some();
run_ecs_health_check_service(is_draining).await

let cp_state = if let Ok(exited_service) = self.shutdown_receiver.try_recv() {
self.exited_services.push(exited_service);
ControlPlaneState::Error(format!(
"Critical in-Enclave services have exited: {}",
self.serialize_exited_services()
))
} else if !self.exited_services.is_empty() {
ControlPlaneState::Error(format!(
"Critical in-Enclave services have exited: {}",
self.serialize_exited_services()
))
} else {
ControlPlaneState::Ok
};

let service = hyper::service::service_fn({
let cp_state = cp_state.clone();
move |request: Request<Body>| {
let cp_state = cp_state.clone();
async move {
match request
.headers()
.get("User-Agent")
.map(|value| value.as_bytes())
{
Some(b"ECS-HealthCheck") => {
let cp_state = cp_state.clone();
let is_draining = IS_DRAINING.get().is_some();
run_ecs_health_check_service(is_draining, cp_state).await
}
_ => Response::builder()
.status(400)
.body(Body::from("Unsupported health check type!"))
.map_err(ServerError::from),
}
}
_ => Response::builder()
.status(400)
.body(Body::from("Unsupported health check type!"))
.map_err(ServerError::from),
}
});
if let Err(error) = hyper::server::conn::Http::new()
Expand All @@ -138,6 +174,14 @@ impl HealthCheckServer {
}
}
}

fn serialize_exited_services(&self) -> String {
self.exited_services
.iter()
.map(|service| service.to_string())
.collect::<Vec<_>>()
.join(", ")
}
}

#[cfg(test)]
Expand All @@ -153,7 +197,7 @@ mod health_check_tests {
#[tokio::test]
async fn test_enclave_health_check_service() {
// the data-plane status should error, as its not running
let response = run_ecs_health_check_service(false).await.unwrap();
let response = run_ecs_health_check_service(false, ControlPlaneState::Ok).await.unwrap();
assert_eq!(response.status(), 500);
println!("deep response: {response:?}");
let health_check_log = response_to_health_check_log(response).await;
Expand All @@ -170,7 +214,7 @@ mod health_check_tests {
async fn test_enclave_health_check_service_with_draining_set_to_true() {
// the data-plane status should error, as its not running
IS_DRAINING.set(true).unwrap();
let response = run_ecs_health_check_service(true).await.unwrap();
let response = run_ecs_health_check_service(true, ControlPlaneState::Ok).await.unwrap();
assert_eq!(response.status(), 500);
println!("deep response: {response:?}");
let health_check_log = response_to_health_check_log(response).await;
Expand Down
41 changes: 35 additions & 6 deletions control-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use control_plane::orchestration::Orchestration;
use control_plane::stats_client::StatsClient;
use control_plane::stats_proxy::StatsProxy;
use control_plane::{config_server, tls_proxy};
use shared::notify_shutdown::{NotifyShutdown, Service};
use shared::{print_version, utils::pipe_streams, ENCLAVE_CONNECT_PORT};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use storage_client_interface::s3;
Expand All @@ -13,6 +14,7 @@ use tokio::time::{sleep, Duration};

use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::sync::mpsc::channel;

use control_plane::{
configuration::{self, Environment},
Expand Down Expand Up @@ -63,17 +65,37 @@ async fn main() -> Result<()> {

let config_server = config_server::ConfigServer::new(cert_provisioner_client, acme_s3_client);

let (shutdown_sender, shutdown_receiver) = channel(1);

let mut health_check_server = HealthCheckServer::new(shutdown_receiver);

listen_for_shutdown_signal();

tokio::spawn(e3_proxy.listen());
tokio::spawn(
e3_proxy
.listen()
.notify_shutdown(Service::E3Proxy, shutdown_sender.clone()),
);

tokio::spawn(StatsProxy::listen());

tokio::spawn(HealthCheckServer::start());
tokio::spawn(async move {
if let Err(e) = health_check_server.start().await {
log::error!("Error starting health check server - {e:?}");
}
});

tokio::spawn(provisioner_proxy.listen());
tokio::spawn(
provisioner_proxy
.listen()
.notify_shutdown(Service::ProvisionerProxy, shutdown_sender.clone()),
);

tokio::spawn(acme_proxy.listen());
tokio::spawn(
acme_proxy
.listen()
.notify_shutdown(Service::AcmeProxy, shutdown_sender.clone()),
);

tokio::spawn(async move {
if let Err(e) = config_server.listen().await {
Expand All @@ -88,8 +110,15 @@ async fn main() -> Result<()> {

let dns_proxy_server = control_plane::dnsproxy::DnsProxy::new(parsed_ip);

tokio::spawn(dns_proxy_server.listen());
tokio::spawn(control_plane::egressproxy::EgressProxy::listen());
tokio::spawn(
dns_proxy_server
.listen()
.notify_shutdown(Service::DnsProxy, shutdown_sender.clone()),
);
tokio::spawn(
control_plane::egressproxy::EgressProxy::listen()
.notify_shutdown(Service::EgressProxy, shutdown_sender.clone()),
);
}

tokio::spawn(Orchestration::start_enclave());
Expand Down
6 changes: 6 additions & 0 deletions shared/src/notify_shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ pub enum Service {
ClockSync,
DnsProxy,
EgressProxy,
E3Proxy,
ProvisionerProxy,
AcmeProxy,
}

impl std::fmt::Display for Service {
Expand All @@ -20,6 +23,9 @@ impl std::fmt::Display for Service {
Self::ClockSync => "clock-sync",
Self::DnsProxy => "dns-proxy",
Self::EgressProxy => "egress-proxy",
Self::E3Proxy => "e3-proxy",
Self::ProvisionerProxy => "provisioner-proxy",
Self::AcmeProxy => "acme-proxy",
};
f.write_str(service_label)
}
Expand Down
4 changes: 3 additions & 1 deletion shared/src/server/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,17 +76,19 @@ pub trait HealthCheck {
fn status_code(&self) -> u16;
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum ControlPlaneState {
Draining,
Ok,
Error(String),
}

impl HealthCheck for ControlPlaneState {
fn status_code(&self) -> u16 {
match self {
ControlPlaneState::Ok => 200,
ControlPlaneState::Draining => 500,
ControlPlaneState::Error(_) => 500,
}
}
}
Expand Down

0 comments on commit bb5e4c9

Please sign in to comment.