Notify the service state through sd_notify #3903

Merged · 6 commits · Jan 29, 2025
Changes from 5 commits

11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/cli/Cargo.toml
@@ -23,6 +23,7 @@ console = "0.15.10"
dialoguer = { version = "0.11.0", features = ["fuzzy-select"] }
dotenvy = "0.15.7"
figment.workspace = true
futures-util.workspace = true
http-body-util.workspace = true
hyper.workspace = true
ipnetwork = "0.20.0"
@@ -32,6 +33,7 @@ rand.workspace = true
rand_chacha = "0.3.1"
reqwest.workspace = true
rustls.workspace = true
sd-notify = "0.4.5"
Contributor:

does this need to be gated behind a flag, or is it cross-platform enough not to be a problem?

Member Author:

It does use stuff from std::os::unix, which is 'Unix-only'. We don't claim to support Windows, so I'm happy keeping it like that

serde_json.workspace = true
serde_yaml = "0.9.34"
sqlx.workspace = true
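
On the gating question in the thread above: the PR keeps sd-notify unconditional because MAS does not target Windows. Purely as an illustration of what gating could look like (not something this change does), here is a hedged sketch using a hypothetical `notify_ready` helper, assuming the dependency were declared under `[target.'cfg(unix)'.dependencies]`:

```rust
// Hypothetical sketch, not part of this PR: with sd-notify declared as a
// Unix-only dependency, call sites stay platform-agnostic by going through
// a cfg-gated helper.
#[cfg(unix)]
fn notify_ready() {
    // Errors are ignored here; without NOTIFY_SOCKET this is a no-op anyway.
    let _ = sd_notify::notify(false, &[sd_notify::NotifyState::Ready]);
}

#[cfg(not(unix))]
fn notify_ready() {
    // No service manager integration outside Unix.
}

fn main() {
    notify_ready();
}
```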
14 changes: 7 additions & 7 deletions crates/cli/src/commands/server.rs
@@ -24,11 +24,10 @@ use tracing::{info, info_span, warn, Instrument};

use crate::{
app_state::AppState,
shutdown::ShutdownManager,
lifecycle::LifecycleManager,
util::{
database_pool_from_config, mailer_from_config, password_manager_from_config,
policy_factory_from_config, register_sighup, site_config_from_config,
templates_from_config,
policy_factory_from_config, site_config_from_config, templates_from_config,
},
};

@@ -57,7 +56,7 @@ impl Options {
#[allow(clippy::too_many_lines)]
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
let span = info_span!("cli.run.init").entered();
let shutdown = ShutdownManager::new()?;
let mut shutdown = LifecycleManager::new()?;
let config = AppConfig::extract(figment)?;

info!(version = crate::VERSION, "Starting up");
@@ -145,6 +144,7 @@ impl Options {
// Load and compile the templates
let templates =
templates_from_config(&config.templates, &site_config, &url_builder).await?;
shutdown.register_reloadable(&templates);

let http_client = mas_http::reqwest_client();

@@ -186,6 +186,9 @@ impl Options {
shutdown.task_tracker(),
shutdown.soft_shutdown_token(),
);

shutdown.register_reloadable(&activity_tracker);

let trusted_proxies = config.http.trusted_proxies.clone();

// Build a rate limiter.
@@ -197,9 +200,6 @@ impl Options {
// Explicitly drop the config to properly zeroize secret keys
drop(config);

// Listen for SIGHUP
register_sighup(&templates, &activity_tracker)?;

limiter.start();

let graphql_schema = mas_handlers::graphql_schema(
4 changes: 2 additions & 2 deletions crates/cli/src/commands/worker.rs
@@ -14,7 +14,7 @@ use mas_router::UrlBuilder;
use tracing::{info, info_span};

use crate::{
shutdown::ShutdownManager,
lifecycle::LifecycleManager,
util::{
database_pool_from_config, mailer_from_config, site_config_from_config,
templates_from_config,
@@ -26,7 +26,7 @@ pub(super) struct Options {}

impl Options {
pub async fn run(self, figment: &Figment) -> anyhow::Result<ExitCode> {
let shutdown = ShutdownManager::new()?;
let shutdown = LifecycleManager::new()?;
let span = info_span!("cli.worker.init").entered();
let config = AppConfig::extract(figment)?;

223 changes: 223 additions & 0 deletions crates/cli/src/lifecycle.rs
@@ -0,0 +1,223 @@
// Copyright 2024, 2025 New Vector Ltd.
//
// SPDX-License-Identifier: AGPL-3.0-only
// Please see LICENSE in the repository root for full details.

use std::{future::Future, process::ExitCode, time::Duration};

use futures_util::future::BoxFuture;
use mas_handlers::ActivityTracker;
use mas_templates::Templates;
use tokio::signal::unix::{Signal, SignalKind};
use tokio_util::{sync::CancellationToken, task::TaskTracker};

/// A helper to manage the lifecycle of the service, including handling graceful
/// shutdowns and configuration reloads.
///
/// It will listen for SIGTERM and SIGINT signals, and will trigger a soft
/// shutdown on the first signal, and a hard shutdown on the second signal or
/// after a timeout.
///
/// Users of this manager should use the `soft_shutdown_token` to react to a
/// soft shutdown, which should gracefully finish requests and close
/// connections, and the `hard_shutdown_token` to react to a hard shutdown,
/// which should drop all connections and abort any outstanding requests.
///
/// They should also register running tasks with the `task_tracker`, so that the
/// manager knows when the soft shutdown has completed.
///
/// It also integrates with [`sd_notify`] to notify the service manager of the
/// state of the service.
pub struct LifecycleManager {
hard_shutdown_token: CancellationToken,
soft_shutdown_token: CancellationToken,
task_tracker: TaskTracker,
sigterm: Signal,
sigint: Signal,
sighup: Signal,
timeout: Duration,
reload_handlers: Vec<Box<dyn Fn() -> BoxFuture<'static, ()>>>,
}

/// Represents a thing that can be reloaded with a SIGHUP
pub trait Reloadable: Clone + Send {
fn reload(&self) -> impl Future<Output = ()> + Send;
}

impl Reloadable for ActivityTracker {
async fn reload(&self) {
self.flush().await;
}
}

impl Reloadable for Templates {
async fn reload(&self) {
if let Err(err) = self.reload().await {
tracing::error!(
error = &err as &dyn std::error::Error,
"Failed to reload templates"
);
}
}
}

/// A wrapper around [`sd_notify::notify`] that logs any errors
fn notify(states: &[sd_notify::NotifyState]) {
if let Err(e) = sd_notify::notify(false, states) {
tracing::error!(
error = &e as &dyn std::error::Error,
"Failed to notify service manager"
Contributor (on lines +66 to +69):

is this error expected if someone is not using systemd? If so, should we soften it somewhat so it doesn't sound like a major problem for them?

Maybe it would be good to elaborate in the comment what this is doing, I'm not sure most people would understand what this is for without having seen it before. (at least, I don't think knowledge about this is that common, but especially not amongst non-systemd users)

Member Author:

If NOTIFY_SOCKET is not set, it returns Ok(()) and is basically a no-op

);
}
}
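
To make the thread above concrete: `sd_notify::notify` only writes to the socket named by the `NOTIFY_SOCKET` environment variable, which systemd sets for `Type=notify` units; when the variable is absent it returns `Ok(())` without doing anything, so the error log above should only fire when an actual notification attempt fails. A minimal standalone check (separate from this file), assuming the same sd-notify 0.4 API:

```rust
// Standalone check of the no-op behaviour discussed above: run this outside
// systemd (NOTIFY_SOCKET unset) and the call succeeds without doing anything.
fn main() {
    match std::env::var_os("NOTIFY_SOCKET") {
        None => {
            // No service manager listening: notify() short-circuits to Ok(()).
            assert!(sd_notify::notify(false, &[sd_notify::NotifyState::Ready]).is_ok());
            println!("NOTIFY_SOCKET is unset, the notification was a no-op");
        }
        Some(socket) => {
            println!("running under a service manager, socket: {socket:?}");
        }
    }
}
```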

impl LifecycleManager {
/// Create a new lifecycle manager, installing the signal handlers
///
/// # Errors
///
/// Returns an error if the signal handler could not be installed
pub fn new() -> Result<Self, std::io::Error> {
let hard_shutdown_token = CancellationToken::new();
let soft_shutdown_token = hard_shutdown_token.child_token();
let sigterm = tokio::signal::unix::signal(SignalKind::terminate())?;
let sigint = tokio::signal::unix::signal(SignalKind::interrupt())?;
let sighup = tokio::signal::unix::signal(SignalKind::hangup())?;
let timeout = Duration::from_secs(60);
let task_tracker = TaskTracker::new();

notify(&[sd_notify::NotifyState::MainPid(std::process::id())]);

Ok(Self {
hard_shutdown_token,
soft_shutdown_token,
task_tracker,
sigterm,
sigint,
sighup,
timeout,
reload_handlers: Vec::new(),
})
}

/// Add a handler to be called when the server gets a SIGHUP
pub fn register_reloadable(&mut self, reloadable: &(impl Reloadable + 'static)) {
let reloadable = reloadable.clone();
self.reload_handlers.push(Box::new(move || {
let reloadable = reloadable.clone();
Box::pin(async move { reloadable.reload().await })
}));
}

/// Get a reference to the task tracker
#[must_use]
pub fn task_tracker(&self) -> &TaskTracker {
&self.task_tracker
}

/// Get a cancellation token that can be used to react to a hard shutdown
#[must_use]
pub fn hard_shutdown_token(&self) -> CancellationToken {
self.hard_shutdown_token.clone()
}

/// Get a cancellation token that can be used to react to a soft shutdown
#[must_use]
pub fn soft_shutdown_token(&self) -> CancellationToken {
self.soft_shutdown_token.clone()
}

/// Run until we finish completely shutting down.
pub async fn run(mut self) -> ExitCode {
notify(&[sd_notify::NotifyState::Ready]);

let mut watchdog_usec = 0;
let watchdog_enabled = sd_notify::watchdog_enabled(false, &mut watchdog_usec);
let mut watchdog_interval = tokio::time::interval(Duration::from_micros(watchdog_usec / 2));

// Wait for a first shutdown signal and trigger the soft shutdown
let likely_crashed = loop {
tokio::select! {
() = self.soft_shutdown_token.cancelled() => {
tracing::warn!("Another task triggered a shutdown, it likely crashed! Shutting down");
break true;
},

_ = self.sigterm.recv() => {
tracing::info!("Shutdown signal received (SIGTERM), shutting down");
break false;
},

_ = self.sigint.recv() => {
tracing::info!("Shutdown signal received (SIGINT), shutting down");
break false;
},

_ = watchdog_interval.tick(), if watchdog_enabled => {
notify(&[
sd_notify::NotifyState::Watchdog,
]);
},

_ = self.sighup.recv() => {
tracing::info!("Reload signal received (SIGHUP), reloading");

notify(&[
sd_notify::NotifyState::Reloading,
sd_notify::NotifyState::monotonic_usec_now()
.expect("Failed to read monotonic clock")
]);

// XXX: if one handler takes a long time, it will block the
// rest of the shutdown process, which is not ideal. We
// should probably have a timeout here
futures_util::future::join_all(
self.reload_handlers
.iter()
.map(|handler| handler())
).await;

notify(&[sd_notify::NotifyState::Ready]);

tracing::info!("Reloading done");
},
}
};

notify(&[sd_notify::NotifyState::Stopping]);

self.soft_shutdown_token.cancel();
self.task_tracker.close();

// Start the timeout
let timeout = tokio::time::sleep(self.timeout);
tokio::select! {
_ = self.sigterm.recv() => {
tracing::warn!("Second shutdown signal received (SIGTERM), abort");
},
_ = self.sigint.recv() => {
tracing::warn!("Second shutdown signal received (SIGINT), abort");
},
() = timeout => {
tracing::warn!("Shutdown timeout reached, abort");
},
() = self.task_tracker.wait() => {
// This is the "happy path", we have gracefully shutdown
},
}

self.hard_shutdown_token().cancel();

// TODO: we may want to have a time out on the task tracker, in case we have
// really stuck tasks on it
self.task_tracker().wait().await;

tracing::info!("All tasks are done, exitting");

if likely_crashed {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}
}
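
For reference, here is a hedged usage sketch of `LifecycleManager` and the `Reloadable` trait defined above, using a made-up `ConfigCache` type (nothing with that name exists in this PR); the real wiring in `server.rs` registers `Templates` and the `ActivityTracker` instead:

```rust
use std::{process::ExitCode, sync::Arc};

use tokio::sync::RwLock;

// Hypothetical reloadable value, standing in for Templates / ActivityTracker.
#[derive(Clone)]
struct ConfigCache {
    inner: Arc<RwLock<String>>,
}

impl Reloadable for ConfigCache {
    async fn reload(&self) {
        // Re-read whatever backs the cache; here we just overwrite it.
        *self.inner.write().await = String::from("reloaded");
    }
}

async fn example() -> Result<ExitCode, std::io::Error> {
    // Installs the SIGTERM/SIGINT/SIGHUP handlers and sends MAINPID.
    let mut lifecycle = LifecycleManager::new()?;

    // reload() is called on every SIGHUP, between the Reloading and Ready
    // notifications to the service manager.
    let cache = ConfigCache { inner: Arc::new(RwLock::new(String::new())) };
    lifecycle.register_reloadable(&cache);

    // Long-running work goes on the task tracker and watches the soft
    // shutdown token, so the manager knows when graceful shutdown is done.
    let soft_shutdown = lifecycle.soft_shutdown_token();
    lifecycle.task_tracker().spawn(async move {
        soft_shutdown.cancelled().await;
        // ... stop accepting work, flush, close connections ...
    });

    // Sends READY=1, then drives signals, reloads, the watchdog and shutdown.
    Ok(lifecycle.run().await)
}
```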
2 changes: 1 addition & 1 deletion crates/cli/src/main.rs
@@ -18,8 +18,8 @@ use tracing_subscriber::{

mod app_state;
mod commands;
mod lifecycle;
mod server;
mod shutdown;
mod sync;
mod telemetry;
mod util;
130 changes: 0 additions & 130 deletions crates/cli/src/shutdown.rs

This file was deleted.

35 changes: 2 additions & 33 deletions crates/cli/src/util.rs
@@ -14,15 +14,15 @@ use mas_config::{
};
use mas_data_model::SiteConfig;
use mas_email::{MailTransport, Mailer};
use mas_handlers::{passwords::PasswordManager, ActivityTracker};
use mas_handlers::passwords::PasswordManager;
use mas_policy::PolicyFactory;
use mas_router::UrlBuilder;
use mas_templates::{SiteConfigExt, TemplateLoadingError, Templates};
use sqlx::{
postgres::{PgConnectOptions, PgPoolOptions},
ConnectOptions, PgConnection, PgPool,
};
use tracing::{error, info, log::LevelFilter};
use tracing::log::LevelFilter;

pub async fn password_manager_from_config(
config: &PasswordsConfig,
@@ -313,37 +313,6 @@ pub async fn database_connection_from_config(
.context("could not connect to the database")
}

/// Reload templates on SIGHUP
pub fn register_sighup(
templates: &Templates,
activity_tracker: &ActivityTracker,
) -> anyhow::Result<()> {
#[cfg(unix)]
{
let mut signal = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())?;
let templates = templates.clone();
let activity_tracker = activity_tracker.clone();

tokio::spawn(async move {
loop {
if signal.recv().await.is_none() {
// No more signals will be received, breaking
break;
};

info!("SIGHUP received, reloading templates & flushing activity tracker");

activity_tracker.flush().await;
templates.clone().reload().await.unwrap_or_else(|err| {
error!(?err, "Error while reloading templates");
});
}
});
}

Ok(())
}

#[cfg(test)]
mod tests {
use rand::SeedableRng;