Move shutdown watchdog to an inner scope, implement for subspace-bootstrap-node

teor2345 committed Oct 29, 2024
1 parent 2eca57d commit b64b252
Showing 8 changed files with 154 additions and 28 deletions.
10 changes: 9 additions & 1 deletion crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
@@ -7,7 +7,7 @@ use crate::commands::cluster::cache::{cache, CacheArgs};
 use crate::commands::cluster::controller::{controller, ControllerArgs};
 use crate::commands::cluster::farmer::{farmer, FarmerArgs};
 use crate::commands::cluster::plotter::{plotter, PlotterArgs};
-use crate::utils::shutdown_signal;
+use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
 use anyhow::anyhow;
 use async_nats::ServerAddr;
 use backoff::ExponentialBackoff;
@@ -22,6 +22,7 @@ use subspace_farmer::cluster::nats_client::NatsClient;
 use subspace_farmer::utils::AsyncJoinOnDrop;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_proof_of_space::Table;
+use tokio::runtime::Handle;
 
 /// Arguments for cluster
 #[derive(Debug, Parser)]
@@ -87,6 +88,13 @@ where
     PosTable: Table,
 {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let ClusterArgs {
         shared_args,
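The pattern added here is scopeguard's defer! macro, which runs its body when the enclosing scope exits, whether normally or by unwinding. A minimal sketch of the wiring, with an illustrative spawn_watchdog and timeout standing in for the real crate::utils::spawn_shutdown_watchdog:

use std::process::exit;
use std::thread;
use std::time::Duration;

use tokio::runtime::Handle;

// Illustrative stand-in for the real watchdog helper.
fn spawn_watchdog(_runtime_handle: Handle) {
    thread::spawn(|| {
        // If the runtime shuts down cleanly, the process exits before this fires.
        thread::sleep(Duration::from_secs(60));
        eprintln!("shutdown timed out, forcing an exit");
        exit(1);
    });
}

async fn command() {
    // The guard runs when command()'s scope exits, as the command finishes
    // or its future is dropped on shutdown.
    scopeguard::defer! {
        spawn_watchdog(Handle::current());
    };
    // ... command body ...
}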
10 changes: 9 additions & 1 deletion crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -1,6 +1,6 @@
 use crate::commands::shared::network::{configure_network, NetworkArgs};
 use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority};
-use crate::utils::shutdown_signal;
+use crate::utils::{shutdown_signal, spawn_shutdown_watchdog};
 use anyhow::anyhow;
 use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
 use backoff::ExponentialBackoff;
@@ -54,6 +54,7 @@ use subspace_kzg::Kzg;
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_networking::utils::piece_provider::PieceProvider;
 use subspace_proof_of_space::Table;
+use tokio::runtime::Handle;
 use tokio::sync::{Barrier, Semaphore};
 use tracing::{error, info, info_span, warn, Instrument};
 
@@ -303,6 +304,13 @@ where
     PosTable: Table,
 {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let FarmingArgs {
         node_rpc_url,
11 changes: 0 additions & 11 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
@@ -9,14 +9,12 @@
 mod commands;
 mod utils;
 
-use crate::utils::spawn_shutdown_watchdog;
 use clap::Parser;
 use std::path::PathBuf;
 use std::process::exit;
 use std::{fs, panic};
 use subspace_farmer::single_disk_farm::{ScrubTarget, SingleDiskFarm};
 use subspace_proof_of_space::chia::ChiaTable;
-use tokio::runtime::Handle;
 use tracing::info;
 use tracing_subscriber::filter::LevelFilter;
 use tracing_subscriber::prelude::*;
@@ -108,18 +106,9 @@ async fn main() -> anyhow::Result<()> {
 
     match command {
         Command::Farm(farming_args) => {
-            // The async runtime can wait forever for tasks to yield or finish.
-            // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
-            // or when the user sends a second Ctrl-C.
-            scopeguard::defer! {
-                spawn_shutdown_watchdog(Handle::current());
-            };
             commands::farm::farm::<PosTable>(farming_args).await?;
         }
         Command::Cluster(cluster_args) => {
-            scopeguard::defer! {
-                spawn_shutdown_watchdog(Handle::current());
-            };
             commands::cluster::cluster::<PosTable>(cluster_args).await?;
         }
         Command::Benchmark(benchmark_args) => {
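Moving the defer! from main() into each command function changes when the watchdog starts relative to other drops. scopeguard guards are ordinary local values, so they run in reverse declaration order when a scope unwinds; a guard declared at the top of a function therefore runs after every later local has been dropped, which is the concern behind the new TODO. A minimal sketch of that ordering:

fn scope() {
    scopeguard::defer! {
        // Declared first, dropped last: a hanging drop below would delay this.
        println!("watchdog spawned");
    };
    let _resource = scopeguard::guard((), |()| {
        println!("resource dropped"); // runs before the guard above
    });
    println!("work done");
}
// Prints: work done, resource dropped, watchdog spawned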
10 changes: 9 additions & 1 deletion crates/subspace-gateway/src/commands/run.rs
@@ -6,7 +6,7 @@ mod rpc;
 
 use crate::commands::run::network::{configure_network, NetworkArgs};
 use crate::commands::run::rpc::{launch_rpc_server, RpcOptions, RPC_DEFAULT_PORT};
-use crate::commands::shutdown_signal;
+use crate::commands::{shutdown_signal, spawn_shutdown_watchdog};
 use crate::piece_getter::DsnPieceGetter;
 use crate::piece_validator::SegmentCommitmentPieceValidator;
 use anyhow::anyhow;
@@ -20,6 +20,7 @@ use subspace_data_retrieval::object_fetcher::ObjectFetcher;
 use subspace_erasure_coding::ErasureCoding;
 use subspace_gateway_rpc::{SubspaceGatewayRpc, SubspaceGatewayRpcConfig};
 use subspace_kzg::Kzg;
+use tokio::runtime::Handle;
 use tracing::info;
 
 /// The default size limit, based on the maximum block size in some domains.
@@ -58,6 +59,13 @@ pub(crate) struct GatewayOptions {
 /// Default run command for gateway
 pub async fn run(run_options: RunOptions) -> anyhow::Result<()> {
     let signal = shutdown_signal();
+    // The async runtime can wait forever for tasks to yield or finish.
+    // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
+    // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
+    scopeguard::defer! {
+        spawn_shutdown_watchdog(Handle::current());
+    };
 
     let RunOptions {
         gateway:
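In each of these binaries the signal future is created up front and only awaited later, where run() races it against the main workload. A generic sketch of that pattern (the names are illustrative, not the gateway's actual futures):

use futures::{select, FutureExt};

async fn run_until_shutdown() {
    let signal = shutdown_signal().fuse();
    let work = do_work().fuse();
    futures::pin_mut!(signal, work);

    select! {
        _ = signal => { /* user requested shutdown */ },
        _ = work => { /* workload finished on its own */ },
    }
}

async fn shutdown_signal() { /* wait for Ctrl-C or SIGTERM */ }
async fn do_work() { /* main workload */ }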
11 changes: 1 addition & 10 deletions crates/subspace-gateway/src/main.rs
@@ -5,11 +5,8 @@ mod node_client;
 mod piece_getter;
 mod piece_validator;
 
-use crate::commands::{
-    init_logger, raise_fd_limit, set_exit_on_panic, spawn_shutdown_watchdog, Command,
-};
+use crate::commands::{init_logger, raise_fd_limit, set_exit_on_panic, Command};
 use clap::Parser;
-use tokio::runtime::Handle;
 
 #[global_allocator]
 static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
@@ -24,12 +21,6 @@ async fn main() -> anyhow::Result<()> {
 
     match command {
         Command::Run(run_options) => {
-            // The async runtime can wait forever for tasks to yield or finish.
-            // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
-            // or when the user sends a second Ctrl-C.
-            scopeguard::defer! {
-                spawn_shutdown_watchdog(Handle::current());
-            };
             commands::run::run(run_options).await?;
         }
     }
3 changes: 3 additions & 0 deletions crates/subspace-networking/Cargo.toml
@@ -79,3 +79,6 @@ features = [
 rand = "0.8.5"
 # TODO: Replace with upstream once https://github.com/libp2p/rust-libp2p/issues/5626 is resolved
 libp2p-swarm-test = { version = "0.4.0", git = "https://github.com/autonomys/rust-libp2p", rev = "ae7527453146df24aff6afed5f5b9efdffbc15b8" }
+
+[lints.rust]
+unexpected_cfgs = { level = "allow", check-cfg = ['cfg(tokio_unstable)', 'cfg(tokio_taskdump)'] }
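The allow is needed because tokio_unstable and tokio_taskdump are not Cargo features but custom --cfg flags, which the unexpected_cfgs lint would otherwise report. They are typically enabled through RUSTFLAGS or a Cargo config; an illustrative snippet, not part of this commit:

# .cargo/config.toml
[build]
rustflags = ["--cfg", "tokio_unstable", "--cfg", "tokio_taskdump"]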
126 changes: 122 additions & 4 deletions crates/subspace-networking/src/bin/subspace-bootstrap-node/main.rs
@@ -12,20 +12,30 @@ use serde::{Deserialize, Serialize};
 use std::error::Error;
 use std::fmt::{Display, Formatter};
 use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
-use std::panic;
 use std::process::exit;
 use std::sync::Arc;
 use std::time::Duration;
+use std::{panic, process, thread};
 use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
 use subspace_networking::libp2p::multiaddr::Protocol;
 use subspace_networking::{peer_id, Config, KademliaMode};
-use tracing::{debug, info, Level};
+use tokio::runtime::{Handle, Runtime};
+use tokio::signal;
+use tracing::{debug, error, info, Level};
 use tracing_subscriber::fmt::Subscriber;
 use tracing_subscriber::util::SubscriberInitExt;
 use tracing_subscriber::EnvFilter;
 
 /// Size of the LRU cache for peers.
 pub const KNOWN_PEERS_CACHE_SIZE: u32 = 10000;
+
+/// The amount of time we wait for tasks to finish when shutting down.
+pub const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60);
+
+/// When shutting down, the amount of extra time we wait for async task dumps to complete, or the
+/// user to trace the process, before exiting.
+pub const TRACE_TIMEOUT: Duration = Duration::from_secs(15);
 
 #[derive(Debug, Parser)]
 #[clap(about, version)]
 enum Command {
@@ -128,6 +138,109 @@ fn init_logging() {
     builder.init()
 }
 
+#[cfg(unix)]
+pub(crate) async fn shutdown_signal() {
+    use futures::FutureExt;
+    use std::pin::pin;
+
+    futures::future::select(
+        pin!(signal::unix::signal(signal::unix::SignalKind::interrupt())
+            .expect("Setting signal handlers must never fail")
+            .recv()
+            .map(|_| {
+                tracing::info!("Received SIGINT, shutting down bootstrap node...");
+            }),),
+        pin!(signal::unix::signal(signal::unix::SignalKind::terminate())
+            .expect("Setting signal handlers must never fail")
+            .recv()
+            .map(|_| {
+                tracing::info!("Received SIGTERM, shutting down bootstrap node...");
+            }),),
+    )
+    .await;
+}
+
+#[cfg(not(unix))]
+pub(crate) async fn shutdown_signal() {
+    signal::ctrl_c()
+        .await
+        .expect("Setting signal handlers must never fail");
+
+    tracing::info!("Received Ctrl+C, shutting down bootstrap node...");
+}
+
+/// Spawns a thread which forces a shutdown after [`SHUTDOWN_TIMEOUT`], if an async task is
+/// blocking. If a second Ctrl-C is received, the thread will force a shutdown immediately.
+///
+/// If compiled with `--cfg tokio_unstable,tokio_taskdump`, logs backtraces of the async tasks
+/// blocking shutdown on `runtime_handle`.
+///
+/// When `tokio::main()` returns, the runtime will be dropped. A dropped runtime can wait forever
+/// for all async tasks to reach an await point, or all blocking tasks to finish. If the runtime is
+/// dropped before the timeout, the underlying `main()` function will return, and the `exit()` in
+/// this spawned thread will never be called.
+#[cfg_attr(
+    not(all(tokio_unstable, tokio_taskdump)),
+    expect(unused_variables, reason = "handle only used in some configs")
+)]
+pub fn spawn_shutdown_watchdog(runtime_handle: Handle) {
+    // TODO: replace tokio::main with runtime::Builder, and call Runtime::shutdown_timeout()
+    // instead of sleep() and exit()
+
+    thread::spawn(move || {
+        // Shut down immediately if we get a second Ctrl-C.
+        //
+        // A tokio runtime that's shutting down will cancel pending futures, so we need to
+        // wait for ctrl_c() on a separate runtime.
+        thread::spawn(|| {
+            debug!("waiting for a second shutdown signal");
+            Runtime::new()
+                .expect("creating a runtime to wait for shutdown signal failed")
+                .block_on(async {
+                    let _ = shutdown_signal().await;
+                    info!("second shutdown signal received, shutting down immediately");
+                    exit(1);
+                });
+        });
+
+        debug!(?SHUTDOWN_TIMEOUT, "waiting for tokio runtime to shut down");
+        thread::sleep(SHUTDOWN_TIMEOUT);
+
+        // Force a shutdown if a task is blocking.
+        error!(?SHUTDOWN_TIMEOUT, "shutdown timed out, forcing an exit");
+        info!(
+            "run `flamegraph --pid {}` or similar to generate a stack dump",
+            process::id()
+        );
+
+        // Log all the async tasks and spawn_blocking() tasks that are still running.
+        //
+        // A tokio runtime that's shutting down will cancel a dump at its first await
+        // point, so we need to call dump() on a separate runtime.
+        #[cfg(all(tokio_unstable, tokio_taskdump))]
+        thread::spawn(move || {
+            use tracing::warn;
+
+            error!(
+                ?SHUTDOWN_TIMEOUT,
+                "shutdown timed out, trying to dump blocking tasks"
+            );
+            Runtime::new()
+                .expect("creating a runtime to dump blocking tasks failed")
+                .block_on(async move {
+                    for (task_number, task) in
                        runtime_handle.dump().await.tasks().iter().enumerate()
                    {
+                        let trace = task.trace();
+                        warn!(task_number, %trace, "blocking task backtrace");
+                    }
+                });
+        });
+
+        // Give the log messages time to flush, and any dumps time to finish.
+        thread::sleep(TRACE_TIMEOUT);
+        exit(1);
+    });
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     set_exit_on_panic();
@@ -183,6 +296,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
                     dsn_metrics_registry,
                 )
             };
+
+            // These tasks can hang on shutdown or when dropped. But there are no error returns
+            // here, so the only way we exit is when a task finishes. That means we can just launch
+            // the shutdown watchdog at the end of the block.
             let (node, mut node_runner) =
                 subspace_networking::construct(config).expect("Networking stack creation failed.");
 
@@ -210,12 +327,13 @@
                 .transpose()?;
             if let Some(prometheus_task) = prometheus_task {
                 select! {
-                    _ = node_runner.run().fuse() => {},
-                    _ = prometheus_task.fuse() => {},
+                    _ = node_runner.run().fuse() => {},
+                    _ = prometheus_task.fuse() => {},
                 }
             } else {
                 node_runner.run().await
             }
+            spawn_shutdown_watchdog(Handle::current());
         }
         Command::GenerateKeypair { json } => {
             let output = KeypairOutput::new(Keypair::generate());
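The task-dump branch relies on tokio's unstable Handle::dump() API, which is only available with both cfgs above and only supported on Linux. A minimal sketch of that API, adapted from tokio's documentation, with an illustrative timeout; the commit instead calls dump() on a freshly created runtime, because a runtime that is already shutting down cancels the dump at its first await point:

use std::time::Duration;
use tokio::runtime::Handle;
use tokio::time::timeout;

async fn dump_running_tasks(handle: Handle) {
    // dump() can itself stall on an uncooperative runtime, so bound it.
    if let Ok(dump) = timeout(Duration::from_secs(5), handle.dump()).await {
        for (task_number, task) in dump.tasks().iter().enumerate() {
            println!("task {task_number}:\n{}", task.trace());
        }
    }
}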
1 change: 1 addition & 0 deletions crates/subspace-node/src/commands/run.rs
@@ -78,6 +78,7 @@ pub async fn run(run_options: RunOptions) -> Result<(), Error> {
     // The async runtime can wait forever for tasks to yield or finish.
     // This watchdog runs on shutdown, and makes sure the process exits within a timeout,
     // or when the user sends a second Ctrl-C.
+    // TODO: make sure this runs before anything else is dropped, because drops can hang.
     scopeguard::defer! {
         spawn_shutdown_watchdog(Handle::current());
     };
