Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(composer): propagate errors #1838

Merged
merged 12 commits into from
Jan 23, 2025
1 change: 1 addition & 0 deletions crates/astria-composer/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- Bump penumbra dependencies [#1740](https://github.com/astriaorg/astria/pull/1740).
- Propagate errors [#1838](https://github.com/astriaorg/astria/pull/1838).

## [1.0.0-rc.2] - 2024-10-23

Expand Down
61 changes: 43 additions & 18 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::HashMap,
net::SocketAddr,
sync::Arc,
time::Duration,
};

Expand All @@ -12,11 +13,16 @@ use astria_eyre::eyre::{
use itertools::Itertools as _;
use tokio::{
io,
join,
signal::unix::{
signal,
SignalKind,
},
sync::watch,
sync::{
watch,
Mutex,
OnceCell,
},
task::{
JoinError,
JoinHandle,
Expand Down Expand Up @@ -226,7 +232,7 @@ impl Composer {
pub async fn run_until_stopped(self) -> eyre::Result<()> {
let Self {
api_server,
mut composer_status_sender,
composer_status_sender,
executor,
executor_handle,
mut geth_collector_tasks,
Expand All @@ -239,6 +245,8 @@ impl Composer {
fee_asset,
} = self;

let mut exit_err: OnceCell<eyre::Report> = OnceCell::new();

// we need the API server to shut down last, since it is used by k8s
// to report the liveness of the service
let api_server_shutdown_token = CancellationToken::new();
Expand All @@ -259,12 +267,19 @@ impl Composer {
let mut executor_task = tokio::spawn(executor.run_until_stopped());

// wait for collectors and executor to come online
wait_for_collectors(&geth_collector_statuses, &mut composer_status_sender)
.await
.wrap_err("geth collectors failed to become ready")?;
wait_for_executor(executor_status, &mut composer_status_sender)
.await
.wrap_err("executor failed to become ready")?;
let composer_status_sender = Arc::new(Mutex::new(composer_status_sender));
ethanoroshiba marked this conversation as resolved.
Show resolved Hide resolved
let collectors_startup_fut =
wait_for_collectors(&geth_collector_statuses, composer_status_sender.clone());
let executor_startup_fut = wait_for_executor(executor_status, composer_status_sender);

match join!(collectors_startup_fut, executor_startup_fut) {
(Ok(()), Ok(())) => {}
(Err(e), Ok(())) => error!(%e, "geth collectors failed to become ready"),
(Ok(()), Err(e)) => error!(%e, "executor failed to become ready"),
(Err(collector_err), Err(executor_err)) => {
error!(%collector_err, %executor_err, "geth collectors and executor failed to become ready");
}
};

// run the grpc server
let mut grpc_server_handle = tokio::spawn(async move {
Expand Down Expand Up @@ -293,7 +308,7 @@ impl Composer {
};
},
o = &mut api_task => {
report_exit("api server unexpectedly ended", o);
report_exit("api server unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -304,7 +319,7 @@ impl Composer {
};
},
o = &mut executor_task => {
report_exit("executor unexpectedly ended", o);
report_exit("executor unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -315,7 +330,7 @@ impl Composer {
};
},
o = &mut grpc_server_handle => {
report_exit("grpc server unexpectedly ended", o);
report_exit("grpc server unexpectedly ended", o, &exit_err);
break ShutdownInfo {
api_server_shutdown_token,
composer_shutdown_token: shutdown_token,
Expand All @@ -326,7 +341,7 @@ impl Composer {
};
},
Some((rollup, collector_exit)) = geth_collector_tasks.join_next() => {
report_exit("collector", collector_exit);
report_exit("collector", collector_exit, &exit_err);
if let Some(url) = rollups.get(&rollup) {
let collector = geth::Builder {
chain_name: rollup.clone(),
Expand All @@ -348,7 +363,11 @@ impl Composer {
});
};

shutdown_info.run().await
let shutdown_res = shutdown_info.run().await;
if let Some(exit_err) = exit_err.take() {
return Err(exit_err);
}
shutdown_res
}
}

Expand Down Expand Up @@ -467,14 +486,14 @@ fn spawn_geth_collectors(
#[instrument(skip_all, err)]
async fn wait_for_executor(
mut executor_status: watch::Receiver<executor::Status>,
composer_status_sender: &mut watch::Sender<composer::Status>,
composer_status_sender: Arc<Mutex<watch::Sender<composer::Status>>>,
) -> eyre::Result<()> {
executor_status
.wait_for(executor::Status::is_connected)
.await
.wrap_err("executor failed while waiting for it to become ready")?;

composer_status_sender.send_modify(|status| {
composer_status_sender.lock().await.send_modify(|status| {
status.set_executor_connected(true);
});

Expand All @@ -485,7 +504,7 @@ async fn wait_for_executor(
#[instrument(skip_all, err)]
async fn wait_for_collectors(
collector_statuses: &HashMap<String, watch::Receiver<collectors::geth::Status>>,
composer_status_sender: &mut watch::Sender<composer::Status>,
composer_status_sender: Arc<Mutex<watch::Sender<composer::Status>>>,
) -> eyre::Result<()> {
use futures::{
future::FutureExt as _,
Expand Down Expand Up @@ -525,21 +544,27 @@ async fn wait_for_collectors(
}
}

composer_status_sender.send_modify(|status| {
composer_status_sender.lock().await.send_modify(|status| {
status.set_all_collectors_connected(true);
});

Ok(())
}

fn report_exit(task_name: &str, outcome: Result<eyre::Result<()>, JoinError>) {
/// Logs how a joined task finished and records a failure in `exit_err`.
///
/// * `task_name` - label attached to the emitted log event.
/// * `outcome` - result of joining the task: the outer `Result` carries a
///   `JoinError` if the task did not run to completion, the inner one carries
///   the task's own error.
/// * `exit_err` - cell that receives the failure as an `eyre::Report`.
///
/// A successful exit is logged at info level and leaves `exit_err` untouched.
/// Either kind of failure is logged at error level and then offered to
/// `exit_err`; the result of `set` is deliberately discarded, so a failed
/// `set` is not reported here.
fn report_exit(
    task_name: &str,
    outcome: Result<eyre::Result<()>, JoinError>,
    exit_err: &OnceCell<eyre::Report>,
) {
    // Reduce the nested result to the report that should be stored, bailing
    // out early when there is nothing to store.
    let failure: eyre::Report = match outcome {
        Ok(Ok(())) => {
            info!(task = task_name, "task exited successfully");
            return;
        }
        Ok(Err(error)) => {
            error!(%error, task = task_name, "task returned with error");
            error
        }
        Err(error) => {
            error!(%error, task = task_name, "task failed to complete");
            error.into()
        }
    };

    let _ = exit_err.set(failure);
}
Expand Down
Loading
Loading