Skip to content

Commit

Permalink
refactor(sequencer): split run method to eliminate long-lived spans (#…
Browse files Browse the repository at this point in the history
…1898)

## Summary
Split `run_until_stopped()` into two methods and called them both in
`spawn()`.

## Background
There are two reasons for making this change, firstly being to eliminate
the long-lived span in `run_until_stopped()`. Secondly, this is for
parity with the rest of the codebase, which separates the creation
and/or setup of the components from the running process.

## Changes
- Moved initialization logic into `initialize()`, which is instrumented.
- Called both `initialize()` and `run_until_stopped()` in new method
`spawn()`.
- Moved signal handler initialization into `spawn()` and poll for signal
receipt during initialization.
- Added doc comment for the public `spawn()` method.

## Testing
Passing all current tests, no additional testing needed.

## Changelogs
Changelogs updated

## Breaking Changelist
- Breaks public API, as the public-facing run function is now `spawn()`
instead of `run_until_stopped()`

## Related Issues
closes #1895
closes #1893
  • Loading branch information
ethanoroshiba authored Jan 17, 2025
1 parent 2899049 commit 12b3bd1
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 32 deletions.
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> ExitCode {
"initializing sequencer"
);

Sequencer::run_until_stopped(cfg, metrics)
Sequencer::spawn(cfg, metrics)
.await
.expect("failed to run sequencer");

Expand Down
124 changes: 93 additions & 31 deletions crates/astria-sequencer/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ use tokio::{
};
use tower_abci::v038::Server;
use tracing::{
debug,
error,
error_span,
info,
info_span,
instrument,
};

use crate::{
Expand All @@ -46,15 +48,82 @@ use crate::{

pub struct Sequencer;

type GRPCServerHandle = JoinHandle<Result<(), tonic::transport::Error>>;
type ABCIServerHandle = JoinHandle<()>;

struct RunningGRPCServer {
pub handle: GRPCServerHandle,
pub shutdown_tx: oneshot::Sender<()>,
}

struct RunningABCIServer {
pub handle: ABCIServerHandle,
pub shutdown_rx: oneshot::Receiver<()>,
}

impl Sequencer {
#[expect(clippy::missing_errors_doc, reason = "not a public function")]
pub async fn run_until_stopped(config: Config, metrics: &'static Metrics) -> Result<()> {
/// Builds and runs the sequencer until it is either stopped by a signal or an error occurs.
///
/// # Errors
/// Returns an error in the following cases:
/// - Database file does not exist, or cannot be loaded into storage
/// - The app fails to initialize
/// - Info service fails to initialize
/// - The server builder fails to return a server
/// - The gRPC address cannot be parsed
/// - The gRPC server fails to exit properly
pub async fn spawn(config: Config, metrics: &'static Metrics) -> Result<()> {
let mut signals = spawn_signal_handler();
let initialize_fut = Self::initialize(config, metrics);
select! {
_ = signals.stop_rx.changed() => {
info_span!("initialize").in_scope(|| info!("shutting down sequencer"));
Ok(())
}

result = initialize_fut => {
let (grpc_server, abci_server) = result?;
Self::run_until_stopped(abci_server, grpc_server, &mut signals).await
}
}
}

async fn run_until_stopped(
abci_server: RunningABCIServer,
grpc_server: RunningGRPCServer,
signals: &mut SignalReceiver,
) -> Result<()> {
select! {
_ = signals.stop_rx.changed() => {
info_span!("run_until_stopped").in_scope(|| info!("shutting down sequencer"));
}

_ = abci_server.shutdown_rx => {
info_span!("run_until_stopped").in_scope(|| error!("ABCI server task exited, this shouldn't happen"));
}
}

grpc_server
.shutdown_tx
.send(())
.map_err(|()| eyre!("failed to send shutdown signal to grpc server"))?;
grpc_server
.handle
.await
.wrap_err("grpc server task failed")?
.wrap_err("grpc server failed")?;
abci_server.handle.abort();
Ok(())
}

#[instrument(skip_all)]
async fn initialize(
config: Config,
metrics: &'static Metrics,
) -> Result<(RunningGRPCServer, RunningABCIServer)> {
cnidarium::register_metrics();
register_histogram_global("cnidarium_get_raw_duration_seconds");
register_histogram_global("cnidarium_nonverifiable_get_raw_duration_seconds");
let span = info_span!("Sequencer::run_until_stopped");

let mut signals = spawn_signal_handler();

let substore_prefixes = vec![penumbra_ibc::IBC_SUBSTORE_PREFIX];

Expand Down Expand Up @@ -88,26 +157,28 @@ impl Sequencer {
service::Info::new(storage.clone()).wrap_err("failed initializing info service")?;
let snapshot_service = service::Snapshot;

let server = Server::builder()
let abci_server = Server::builder()
.consensus(consensus_service)
.info(info_service)
.mempool(mempool_service)
.snapshot(snapshot_service)
.finish()
.ok_or_eyre("server builder didn't return server; are all fields set?")?;

let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
let (server_exit_tx, server_exit_rx) = tokio::sync::oneshot::channel();
let (grpc_shutdown_tx, grpc_shutdown_rx) = tokio::sync::oneshot::channel();
let (abci_shutdown_tx, abci_shutdown_rx) = tokio::sync::oneshot::channel();

let grpc_addr = config
.grpc_addr
.parse()
.wrap_err("failed to parse grpc_addr address")?;
let grpc_server_handle = start_grpc_server(&storage, mempool, grpc_addr, shutdown_rx);
let grpc_server_handle = start_grpc_server(&storage, mempool, grpc_addr, grpc_shutdown_rx);

span.in_scope(|| info!(config.listen_addr, "starting sequencer"));
let server_handle = tokio::spawn(async move {
match server.listen_tcp(&config.listen_addr).await {
debug!(config.listen_addr, "starting sequencer");

let listen_addr = config.listen_addr.clone();
let abci_server_handle = tokio::spawn(async move {
match abci_server.listen_tcp(listen_addr).await {
Ok(()) => {
// this shouldn't happen, as there isn't a way for the ABCI server to exit
info_span!("abci_server").in_scope(|| info!("ABCI server exited successfully"));
Expand All @@ -117,28 +188,19 @@ impl Sequencer {
.in_scope(|| error!(err = e.as_ref(), "ABCI server exited with error"));
}
}
let _ = server_exit_tx.send(());
let _ = abci_shutdown_tx.send(());
});

select! {
_ = signals.stop_rx.changed() => {
span.in_scope(|| info!("shutting down sequencer"));
}

_ = server_exit_rx => {
span.in_scope(|| error!("ABCI server task exited, this shouldn't happen"));
}
}
let grpc_server = RunningGRPCServer {
handle: grpc_server_handle,
shutdown_tx: grpc_shutdown_tx,
};
let abci_server = RunningABCIServer {
handle: abci_server_handle,
shutdown_rx: abci_shutdown_rx,
};

shutdown_tx
.send(())
.map_err(|()| eyre!("failed to send shutdown signal to grpc server"))?;
grpc_server_handle
.await
.wrap_err("grpc server task failed")?
.wrap_err("grpc server failed")?;
server_handle.abort();
Ok(())
Ok((grpc_server, abci_server))
}
}

Expand Down

0 comments on commit 12b3bd1

Please sign in to comment.