From 453f66cf53c2f020d884cebaa9d997273a57c569 Mon Sep 17 00:00:00 2001 From: Richard Janis Goldschmidt Date: Tue, 21 Jan 2025 10:59:10 +0100 Subject: [PATCH] refactor(conductor): make firm, soft readers subtasks making the celestia (firm) and astria (soft) readers subtasks of the executor task is a more faithful representation of their dependencies: executor can run with either or both present. But the reader tasks cannot run without the executor task present. To fully initialize they also depend on data from the executor, and they could be implemented by streams instead of free standing tasks. spin up readers only after commitment, genesis states are received this allows removing a lot of complexity: 1. the readers need not explicitly wait for the state to be initialized but receive an already initialized watch channel. 2. there is no need for a bespoke channel to dynamically set permits - a normal mpsc channel can be used with its capacity initialized after receiving the genesis info. executor::Initialized::run delegates to executor::Initialized::run_event_loop to separate the shutdown token from the other arms of the select macro - this way, an else => {} arm can be introduced that shuts down executor as a fallback --- .../astria-conductor/src/celestia/builder.rs | 16 +- crates/astria-conductor/src/celestia/mod.rs | 120 ++--- .../astria-conductor/src/celestia/verify.rs | 8 +- .../astria-conductor/src/conductor/inner.rs | 223 ++------ crates/astria-conductor/src/config.rs | 10 + .../astria-conductor/src/executor/builder.rs | 74 +-- .../astria-conductor/src/executor/channel.rs | 241 --------- crates/astria-conductor/src/executor/mod.rs | 481 ++++++++---------- crates/astria-conductor/src/executor/state.rs | 122 +++-- crates/astria-conductor/src/executor/tests.rs | 7 +- .../astria-conductor/src/sequencer/builder.rs | 13 +- crates/astria-conductor/src/sequencer/mod.rs | 96 ++-- .../tests/blackbox/firm_only.rs | 2 +- 13 files changed, 463 insertions(+), 950 deletions(-) delete mode 100644 crates/astria-conductor/src/executor/channel.rs diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index ef33a28cbd..eb03d9440c 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -10,9 +10,12 @@ use jsonrpsee::http_client::HttpClient as CelestiaClient; use tendermint_rpc::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; -use super::Reader; +use super::{ + Reader, + ReconstructedBlock, +}; use crate::{ - executor, + executor::StateReceiver, metrics::Metrics, }; @@ -20,7 +23,8 @@ pub(crate) struct Builder { pub(crate) celestia_block_time: Duration, pub(crate) celestia_http_endpoint: String, pub(crate) celestia_token: Option, - pub(crate) executor: executor::Handle, + pub(crate) firm_blocks: tokio::sync::mpsc::Sender>, + pub(crate) rollup_state: StateReceiver, pub(crate) sequencer_cometbft_client: SequencerClient, pub(crate) sequencer_requests_per_second: u32, pub(crate) expected_celestia_chain_id: String, @@ -36,13 +40,14 @@ impl Builder { celestia_block_time, celestia_http_endpoint, celestia_token, - executor, sequencer_cometbft_client, sequencer_requests_per_second, expected_celestia_chain_id, expected_sequencer_chain_id, shutdown, metrics, + firm_blocks, + rollup_state, } = self; let celestia_client = create_celestia_client(celestia_http_endpoint, celestia_token) @@ -51,7 +56,8 @@ impl Builder { Ok(Reader { celestia_block_time, celestia_client, - executor, + firm_blocks, + rollup_state, sequencer_cometbft_client, sequencer_requests_per_second, expected_celestia_chain_id, diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 9acfc2c50f..07b1633b96 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -58,15 +58,11 @@ use tracing::{ trace, trace_span, warn, + Instrument as _, }; use crate::{ block_cache::GetSequencerHeight, - executor::{ - FirmSendError, - FirmTrySendError, - StateIsInit, - }, metrics::Metrics, utils::flatten, }; @@ -95,10 +91,7 @@ use self::{ BlobVerifier, }, }; -use crate::{ - block_cache::BlockCache, - executor, -}; +use crate::block_cache::BlockCache; /// Sequencer Block information reconstructed from Celestia blobs. /// @@ -138,8 +131,11 @@ pub(crate) struct Reader { /// Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, - /// The channel used to send messages to the executor task. - executor: executor::Handle, + /// The channel to forward firm blocks to the executor. + firm_blocks: mpsc::Sender>, + + /// The channel to read updates of the rollup state from. + rollup_state: crate::executor::StateReceiver, /// The client to get the sequencer namespace and verify blocks. sequencer_cometbft_client: SequencerClient, @@ -162,7 +158,7 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let (executor, sequencer_chain_id) = select!( + let sequencer_chain_id = select!( () = self.shutdown.clone().cancelled_owned() => { info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(|| info!("received shutdown signal while waiting for Celestia reader task to initialize") @@ -175,16 +171,14 @@ impl Reader { } ); - RunningReader::from_parts(self, executor, sequencer_chain_id) + RunningReader::from_parts(self, sequencer_chain_id) .wrap_err("failed entering run loop")? .run_until_stopped() .await } #[instrument(skip_all, err)] - async fn initialize( - &mut self, - ) -> eyre::Result<(executor::Handle, tendermint::chain::Id)> { + async fn initialize(&mut self) -> eyre::Result { let validate_celestia_chain_id = async { let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client) .await @@ -196,14 +190,8 @@ impl Reader { `{actual_celestia_chain_id}`" ); Ok(()) - }; - - let wait_for_init_executor = async { - self.executor - .wait_for_init() - .await - .wrap_err("handle to executor failed while waiting for it being initialized") - }; + } + .in_current_span(); let get_and_validate_sequencer_chain_id = async { let actual_sequencer_chain_id = @@ -217,18 +205,18 @@ impl Reader { actual: `{actual_sequencer_chain_id}`" ); Ok(actual_sequencer_chain_id) - }; + } + .in_current_span(); try_join!( validate_celestia_chain_id, - wait_for_init_executor, get_and_validate_sequencer_chain_id ) - .map(|((), executor_init, sequencer_chain_id)| (executor_init, sequencer_chain_id)) + .map(|((), sequencer_chain_id)| sequencer_chain_id) } } -#[instrument(skip_all, err)] +#[instrument(skip_all, err, ret(Display))] async fn get_celestia_chain_id( celestia_client: &CelestiaClient, ) -> eyre::Result { @@ -263,8 +251,11 @@ struct RunningReader { // Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, - /// The channel used to send messages to the executor task. - executor: executor::Handle, + /// The channel to forward firm blocks to the executor. + firm_blocks: mpsc::Sender>, + + /// The channel to read updates of the rollup state from. + rollup_state: crate::executor::StateReceiver, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, @@ -280,7 +271,8 @@ struct RunningReader { /// capacity again. Used as a back pressure mechanism so that this task does not fetch more /// blobs if there is no capacity in the executor to execute them against the rollup in /// time. - enqueued_block: Fuse>>, + enqueued_block: + Fuse>>>>, /// The latest observed head height of the Celestia network. Set by values read from /// the `latest_height` stream. @@ -323,7 +315,6 @@ struct RunningReader { impl RunningReader { fn from_parts( exposed_reader: Reader, - mut executor: executor::Handle, sequencer_chain_id: tendermint::chain::Id, ) -> eyre::Result { let Reader { @@ -333,21 +324,23 @@ impl RunningReader { shutdown, sequencer_requests_per_second, metrics, + firm_blocks, + rollup_state, .. } = exposed_reader; let block_cache = - BlockCache::with_next_height(executor.next_expected_firm_sequencer_height()) + BlockCache::with_next_height(rollup_state.next_expected_firm_sequencer_height()) .wrap_err("failed constructing sequential block cache")?; let latest_heights = stream_latest_heights(celestia_client.clone(), celestia_block_time); - let rollup_id = executor.rollup_id(); + let rollup_id = rollup_state.rollup_id(); let rollup_namespace = astria_core::celestia::namespace_v0_from_rollup_id(rollup_id); let sequencer_namespace = astria_core::celestia::namespace_v0_from_sha256_of_bytes(sequencer_chain_id.as_bytes()); - let celestia_next_height = executor.celestia_base_block_height(); - let celestia_reference_height = executor.celestia_base_block_height(); - let celestia_variance = executor.celestia_block_variance(); + let celestia_next_height = rollup_state.celestia_base_block_height(); + let celestia_reference_height = rollup_state.celestia_base_block_height(); + let celestia_variance = rollup_state.celestia_block_variance(); Ok(Self { block_cache, @@ -357,7 +350,8 @@ impl RunningReader { ), celestia_client, enqueued_block: Fuse::terminated(), - executor, + firm_blocks, + rollup_state, latest_heights, shutdown, reconstruction_tasks: JoinMap::new(), @@ -498,7 +492,7 @@ impl RunningReader { rollup_id: self.rollup_id, rollup_namespace: self.rollup_namespace, sequencer_namespace: self.sequencer_namespace, - executor: self.executor.clone(), + rollup_state: self.rollup_state.clone(), metrics: self.metrics, }; self.reconstruction_tasks.spawn(height, task.execute()); @@ -520,28 +514,20 @@ impl RunningReader { #[instrument(skip_all)] fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; - match self.executor.try_send_firm_block(block) { + match self.firm_blocks.try_send(block.into()) { Ok(()) => self.advance_reference_celestia_height(celestia_height), - Err(FirmTrySendError::Channel { - source, - }) => match source { - mpsc::error::TrySendError::Full(block) => { - trace!( - "executor channel is full; rescheduling block fetch until the channel \ - opens up" - ); - self.enqueued_block = - enqueue_block(self.executor.clone(), block).boxed().fuse(); - } - mpsc::error::TrySendError::Closed(_) => { - bail!("exiting because executor channel is closed"); - } - }, - Err(FirmTrySendError::NotSet) => bail!( - "exiting because executor was configured without firm commitments; this Celestia \ - reader should have never been started" - ), - } + Err(mpsc::error::TrySendError::Full(block)) => { + trace!( + "executor channel is full; rescheduling block fetch until the channel opens up" + ); + self.enqueued_block = enqueue_block(self.firm_blocks.clone(), block) + .boxed() + .fuse(); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + bail!("exiting because executor channel is closed"); + } + }; Ok(()) } @@ -574,7 +560,7 @@ struct FetchConvertVerifyAndReconstruct { rollup_id: RollupId, rollup_namespace: Namespace, sequencer_namespace: Namespace, - executor: executor::Handle, + rollup_state: crate::executor::StateReceiver, metrics: &'static Metrics, } @@ -593,7 +579,7 @@ impl FetchConvertVerifyAndReconstruct { rollup_id, rollup_namespace, sequencer_namespace, - executor, + rollup_state, metrics, } = self; @@ -633,7 +619,7 @@ impl FetchConvertVerifyAndReconstruct { "decoded Sequencer header and rollup info from raw Celestia blobs", ); - let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await; + let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, rollup_state).await; metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch( verified_blobs.len_header_blobs(), @@ -671,15 +657,15 @@ impl FetchConvertVerifyAndReconstruct { #[instrument(skip_all, err)] async fn enqueue_block( - executor: executor::Handle, + firm_blocks_tx: mpsc::Sender>, block: Box, -) -> Result { +) -> Result>> { let celestia_height = block.celestia_height; - executor.send_firm_block(block).await?; + firm_blocks_tx.send(block).await?; Ok(celestia_height) } -#[instrument(skip_all, err)] +#[instrument(skip_all, err, ret(Display))] async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result { use sequencer_client::Client as _; diff --git a/crates/astria-conductor/src/celestia/verify.rs b/crates/astria-conductor/src/celestia/verify.rs index a4415468b6..d257a3b5c1 100644 --- a/crates/astria-conductor/src/celestia/verify.rs +++ b/crates/astria-conductor/src/celestia/verify.rs @@ -51,10 +51,6 @@ use super::{ block_verifier, convert::ConvertedBlobs, }; -use crate::executor::{ - self, - StateIsInit, -}; pub(super) struct VerifiedBlobs { celestia_height: u64, @@ -99,7 +95,7 @@ struct VerificationTaskKey { pub(super) async fn verify_metadata( blob_verifier: Arc, converted_blobs: ConvertedBlobs, - mut executor: executor::Handle, + rollup_state: crate::executor::StateReceiver, ) -> VerifiedBlobs { let (celestia_height, header_blobs, rollup_blobs) = converted_blobs.into_parts(); @@ -107,7 +103,7 @@ pub(super) async fn verify_metadata( let mut verified_header_blobs = HashMap::with_capacity(header_blobs.len()); let next_expected_firm_sequencer_height = - executor.next_expected_firm_sequencer_height().value(); + rollup_state.next_expected_firm_sequencer_height().value(); for (index, blob) in header_blobs.into_iter().enumerate() { if blob.height().value() < next_expected_firm_sequencer_height { diff --git a/crates/astria-conductor/src/conductor/inner.rs b/crates/astria-conductor/src/conductor/inner.rs index b6dc9d3eb9..2f0056268b 100644 --- a/crates/astria-conductor/src/conductor/inner.rs +++ b/crates/astria-conductor/src/conductor/inner.rs @@ -6,20 +6,16 @@ use std::{ use astria_eyre::eyre::{ self, eyre, - Result, + Report, WrapErr as _, }; -use itertools::Itertools as _; use pin_project_lite::pin_project; -use sequencer_client::HttpClient; use tokio::{ select, + task::JoinHandle, time::timeout, }; -use tokio_util::{ - sync::CancellationToken, - task::JoinMap, -}; +use tokio_util::sync::CancellationToken; use tracing::{ error, info, @@ -29,16 +25,14 @@ use tracing::{ }; use crate::{ - celestia, executor, - sequencer, - utils::flatten, Config, Metrics, }; /// Exit value of the inner conductor impl to signal to the outer task whether to restart or /// shutdown +#[derive(Debug)] pub(super) enum RestartOrShutdown { Restart, Shutdown, @@ -54,24 +48,18 @@ impl std::fmt::Display for RestartOrShutdown { } } -enum ExitReason { - ShutdownSignal, - TaskFailed { - name: &'static str, - error: eyre::Report, - }, -} +struct ShutdownSignalReceived; pin_project! { /// A handle returned by [`ConductorInner::spawn`]. pub(super) struct InnerHandle { shutdown_token: CancellationToken, - task: Option>>, + task: Option>>, } } impl Future for InnerHandle { - type Output = Result, tokio::task::JoinError>; + type Output = Result, tokio::task::JoinError>; fn poll( self: std::pin::Pin<&mut Self>, @@ -91,15 +79,10 @@ pub(super) struct ConductorInner { /// Token to signal to all tasks to shut down gracefully. shutdown_token: CancellationToken, - /// The different long-running tasks that make up the conductor; - tasks: JoinMap<&'static str, eyre::Result<()>>, + executor: Option>>, } impl ConductorInner { - const CELESTIA: &'static str = "celestia"; - const EXECUTOR: &'static str = "executor"; - const SEQUENCER: &'static str = "sequencer"; - /// Create a new [`ConductorInner`] from a [`Config`]. /// /// # Errors @@ -107,78 +90,21 @@ impl ConductorInner { /// actors could not be spawned (executor, sequencer reader, or data availability reader). /// This usually happens if the actors failed to connect to their respective endpoints. fn new( - cfg: Config, + config: Config, metrics: &'static Metrics, shutdown_token: CancellationToken, ) -> eyre::Result { - let mut tasks = JoinMap::new(); - - let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url) - .wrap_err("failed constructing sequencer cometbft RPC client")?; - - // Spawn the executor task. - let executor_handle = { - let (executor, handle) = executor::Builder { - mode: cfg.execution_commit_level, - rollup_address: cfg.execution_rpc_url, - shutdown: shutdown_token.clone(), - metrics, - } - .build() - .wrap_err("failed constructing executor")?; - - tasks.spawn(Self::EXECUTOR, executor.run_until_stopped()); - handle - }; - - if cfg.execution_commit_level.is_with_soft() { - let sequencer_grpc_client = - sequencer::SequencerGrpcClient::new(&cfg.sequencer_grpc_url) - .wrap_err("failed constructing grpc client for Sequencer")?; - - // The `sync_start_block_height` represents the height of the next - // sequencer block that can be executed on top of the rollup state. - // This value is derived by the Executor. - let sequencer_reader = sequencer::Builder { - sequencer_grpc_client, - sequencer_cometbft_client: sequencer_cometbft_client.clone(), - sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms), - expected_sequencer_chain_id: cfg.expected_sequencer_chain_id.clone(), - shutdown: shutdown_token.clone(), - executor: executor_handle.clone(), - } - .build(); - tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); + let executor = executor::Builder { + config, + shutdown: shutdown_token.clone(), + metrics, } - - if cfg.execution_commit_level.is_with_firm() { - let celestia_token = if cfg.no_celestia_auth { - None - } else { - Some(cfg.celestia_bearer_token) - }; - - let reader = celestia::Builder { - celestia_http_endpoint: cfg.celestia_node_http_url, - celestia_token, - celestia_block_time: Duration::from_millis(cfg.celestia_block_time_ms), - executor: executor_handle.clone(), - sequencer_cometbft_client: sequencer_cometbft_client.clone(), - sequencer_requests_per_second: cfg.sequencer_requests_per_second, - expected_celestia_chain_id: cfg.expected_celestia_chain_id, - expected_sequencer_chain_id: cfg.expected_sequencer_chain_id, - shutdown: shutdown_token.clone(), - metrics, - } - .build() - .wrap_err("failed to build Celestia Reader")?; - - tasks.spawn(Self::CELESTIA, reader.run_until_stopped()); - }; + .build() + .wrap_err("failed constructing executor")?; Ok(Self { shutdown_token, - tasks, + executor: Some(tokio::spawn(executor.run_until_stopped())), }) } @@ -186,27 +112,28 @@ impl ConductorInner { /// /// # Panics /// Panics if it could not install a signal handler. - async fn run_until_stopped(mut self) -> Result { + async fn run_until_stopped(mut self) -> eyre::Result { info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running")); let exit_reason = select! { biased; () = self.shutdown_token.cancelled() => { - ExitReason::ShutdownSignal + Ok(ShutdownSignalReceived) }, - Some((name, res)) = self.tasks.join_next() => { - match flatten(res) { - Ok(()) => ExitReason::TaskFailed{name, error: eyre!("task `{name}` exited unexpectedly")}, - Err(err) => ExitReason::TaskFailed{name, error: err.wrap_err(format!("task `{name}` failed"))}, + res = self.executor.as_mut().expect("task must always be set at this point") => { + // XXX: must Option::take the JoinHandle to avoid polling it in the shutdown logic. + self.executor.take(); + match res { + Ok(Ok(())) => Err(eyre!("executor exited unexpectedly")), + Ok(Err(err)) => Err(err.wrap_err("executor exited with error")), + Err(err) => Err(Report::new(err).wrap_err("executor panicked")), } } }; - let message = "initiating shutdown"; - report_exit(&exit_reason, message); - self.shutdown(exit_reason).await + self.restart_or_shutdown(exit_reason).await } /// Creates and spawns a Conductor on the tokio runtime. @@ -232,89 +159,43 @@ impl ConductorInner { /// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds /// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds /// to abort the remaining tasks. - #[instrument(skip_all)] - async fn shutdown(mut self, exit_reason: ExitReason) -> Result { + #[instrument(skip_all, err, ret(Display))] + async fn restart_or_shutdown( + mut self, + exit_reason: eyre::Result, + ) -> eyre::Result { self.shutdown_token.cancel(); - let mut restart_or_shutdown = RestartOrShutdown::Shutdown; - - match &exit_reason { - ExitReason::ShutdownSignal => { - info!("received shutdown signal, skipping check for restart"); - } - ExitReason::TaskFailed { - name, - error, - } => { - if check_for_restart(name, error) { - restart_or_shutdown = RestartOrShutdown::Restart; + let restart_or_shutdown = match exit_reason { + Ok(ShutdownSignalReceived) => Ok(RestartOrShutdown::Shutdown), + Err(error) => { + error!(%error, "executor failed; checking error chain if conductor should be restarted"); + if check_for_restart(&error) { + Ok(RestartOrShutdown::Restart) + } else { + Err(error) } } - } - - info!("signalled all tasks to shut down; waiting for 25 seconds to exit"); - - let shutdown_loop = async { - while let Some((name, res)) = self.tasks.join_next().await { - let message = "task shut down"; - match flatten(res) { - Ok(()) => { - info!(name, message); - } - Err(error) => { - if check_for_restart(name, &error) - && !matches!(exit_reason, ExitReason::ShutdownSignal) - { - restart_or_shutdown = RestartOrShutdown::Restart; - } - error!(name, %error, message); - } - }; - } }; - if timeout(Duration::from_secs(25), shutdown_loop) - .await - .is_err() - { - let tasks = self.tasks.keys().join(", "); - warn!( - tasks = format_args!("[{tasks}]"), - "aborting all tasks that have not yet shut down", - ); - self.tasks.abort_all(); - } else { - info!("all tasks shut down regularly"); - } - info!("shutting down"); - - if let ExitReason::TaskFailed { - error, .. - } = exit_reason - { - if matches!(restart_or_shutdown, RestartOrShutdown::Shutdown) { - return Err(error); + if let Some(mut executor) = self.executor.take() { + let wait_until_timeout = Duration::from_secs(25); + if timeout(wait_until_timeout, &mut executor).await.is_err() { + warn!( + "waited `{}` for executor start to respond to shutdown signal; aborting", + humantime::format_duration(wait_until_timeout) + ); + executor.abort(); + } else { + info!("executor shut down regularly"); } } - Ok(restart_or_shutdown) - } -} -#[instrument(skip_all)] -fn report_exit(exit_reason: &ExitReason, message: &str) { - match exit_reason { - ExitReason::ShutdownSignal => info!(reason = "received shutdown signal", message), - ExitReason::TaskFailed { - name: task, - error: reason, - } => error!(%reason, %task, message), + restart_or_shutdown } } #[instrument(skip_all)] -fn check_for_restart(name: &str, err: &eyre::Report) -> bool { - if name != ConductorInner::EXECUTOR { - return false; - } +fn check_for_restart(err: &eyre::Report) -> bool { let mut current = Some(err.as_ref() as &dyn std::error::Error); while let Some(err) = current { if let Some(status) = err.downcast_ref::() { @@ -338,6 +219,6 @@ mod tests { let err = tonic_error.wrap_err("wrapper_1"); let err = err.wrap_err("wrapper_2"); let err = err.wrap_err("wrapper_3"); - assert!(super::check_for_restart("executor", &err.unwrap_err())); + assert!(super::check_for_restart(&err.unwrap_err())); } } diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index e8211b1714..cd48c4d1e0 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -95,6 +95,16 @@ pub struct Config { pub pretty_print: bool, } +impl Config { + pub(crate) fn is_with_firm(&self) -> bool { + self.execution_commit_level.is_with_firm() + } + + pub(crate) fn is_with_soft(&self) -> bool { + self.execution_commit_level.is_with_soft() + } +} + impl config::Config for Config { const PREFIX: &'static str = "ASTRIA_CONDUCTOR_"; } diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index daf53e5835..1cae5f0089 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -1,86 +1,40 @@ -use std::collections::HashMap; - use astria_eyre::eyre::{ self, WrapErr as _, }; -use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use super::{ - state, - Executor, - Handle, - ReconstructedBlock, - StateNotInit, -}; -use crate::{ - config::CommitLevel, - metrics::Metrics, -}; +use super::Executor; +use crate::metrics::Metrics; pub(crate) struct Builder { - pub(crate) mode: CommitLevel, - pub(crate) rollup_address: String, + pub(crate) config: crate::Config, pub(crate) shutdown: CancellationToken, pub(crate) metrics: &'static Metrics, } impl Builder { - pub(crate) fn build(self) -> eyre::Result<(Executor, Handle)> { + pub(crate) fn build(self) -> eyre::Result { let Self { - mode, - rollup_address, + config, shutdown, metrics, } = self; - let client = super::client::Client::connect_lazy(&rollup_address).wrap_err_with(|| { - format!( - "failed to construct execution client for provided rollup address \ - `{rollup_address}`" - ) - })?; - - let mut firm_block_tx = None; - let mut firm_block_rx = None; - if mode.is_with_firm() { - let (tx, rx) = mpsc::channel::>(16); - firm_block_tx = Some(tx); - firm_block_rx = Some(rx); - } - - let mut soft_block_tx = None; - let mut soft_block_rx = None; - if mode.is_with_soft() { - let (tx, rx) = super::soft_block_channel(); - soft_block_tx = Some(tx); - soft_block_rx = Some(rx); - } - - let (state_tx, state_rx) = state::channel(); + let client = + super::client::Client::connect_lazy(&config.execution_rpc_url).wrap_err_with(|| { + format!( + "failed to construct execution client for provided rollup address `{}`", + config.execution_rpc_url, + ) + })?; let executor = Executor { + config, client, - - mode, - - firm_blocks: firm_block_rx, - soft_blocks: soft_block_rx, - shutdown, - state: state_tx, - blocks_pending_finalization: HashMap::new(), - - max_spread: None, metrics, }; - let handle = Handle { - firm_blocks: firm_block_tx, - soft_blocks: soft_block_tx, - state: state_rx, - _state_init: StateNotInit, - }; - Ok((executor, handle)) + Ok(executor) } } diff --git a/crates/astria-conductor/src/executor/channel.rs b/crates/astria-conductor/src/executor/channel.rs deleted file mode 100644 index 955d62d9b3..0000000000 --- a/crates/astria-conductor/src/executor/channel.rs +++ /dev/null @@ -1,241 +0,0 @@ -//! An mpsc channel bounded by an externally driven semaphore. -//! -//! While the main purpose of this channel is to send [`sequencer_client::SequencerBlock`]s -//! from a sequencer reader to the executor, the channel is generic over the values that are -//! being sent to better test its functionality. - -use std::sync::{ - Arc, - Weak, -}; - -use tokio::sync::{ - mpsc::{ - error::SendError as TokioSendError, - unbounded_channel, - UnboundedReceiver, - UnboundedSender, - }, - AcquireError, - Semaphore, - TryAcquireError, -}; -use tracing::instrument; - -/// Creates an mpsc channel for sending soft blocks between asynchronous task. -/// -/// The initial bound of the channel is 0 and the receiver is expected to add -/// capacity to the channel. -pub(super) fn soft_block_channel() -> (Sender, Receiver) { - let cap = 0; - let sem = Arc::new(Semaphore::new(0)); - let (tx, rx) = unbounded_channel(); - let sender = Sender { - chan: tx, - sem: Arc::downgrade(&sem), - }; - let receiver = Receiver { - cap, - chan: rx, - sem, - }; - (sender, receiver) -} - -#[derive(Debug, thiserror::Error, PartialEq)] -#[error("the channel is closed")] -pub(crate) struct SendError; - -impl From for SendError { - fn from(_: AcquireError) -> Self { - Self - } -} - -impl From> for SendError { - fn from(_: TokioSendError) -> Self { - Self - } -} - -#[derive(Debug, thiserror::Error, PartialEq)] -pub(crate) enum TrySendError { - #[error("the channel is closed")] - Closed(T), - #[error("no permits available")] - NoPermits(T), -} - -impl TrySendError { - fn from_semaphore(err: &TryAcquireError, block: T) -> Self { - match err { - tokio::sync::TryAcquireError::Closed => Self::Closed(block), - tokio::sync::TryAcquireError::NoPermits => Self::NoPermits(block), - } - } -} - -impl From> for TrySendError { - fn from(err: TokioSendError) -> Self { - Self::Closed(err.0) - } -} - -#[derive(Debug, Clone)] -pub(super) struct Sender { - sem: Weak, - chan: UnboundedSender, -} - -impl Sender { - /// Sends a block, waiting until the channel has permits. - /// - /// Returns an error if the channel is closed. - #[instrument(skip_all, err)] - pub(super) async fn send(&self, block: T) -> Result<(), SendError> { - let sem = self.sem.upgrade().ok_or(SendError)?; - let permit = sem.acquire().await?; - permit.forget(); - self.chan.send(block)?; - Ok(()) - } - - /// Attempts to send a block without blocking. - /// - /// Returns an error if the channel is out of permits or if it has been closed. - pub(super) fn try_send(&self, block: T) -> Result<(), TrySendError> { - let sem = match self.sem.upgrade() { - None => return Err(TrySendError::Closed(block)), - Some(sem) => sem, - }; - let permit = match sem.try_acquire() { - Err(err) => return Err(TrySendError::from_semaphore(&err, block)), - Ok(permit) => permit, - }; - permit.forget(); - self.chan.send(block)?; - Ok(()) - } -} - -pub(super) struct Receiver { - cap: usize, - sem: Arc, - chan: UnboundedReceiver, -} - -impl Drop for Receiver { - fn drop(&mut self) { - self.sem.close(); - } -} - -impl Receiver { - /// Sets the channel's capacity to `cap`. - /// - /// `cap` will be the maximum number of blocks that can be sent - /// over the channel before new permits are added with `[SoftBlockReceiver::add_permits]`. - pub(super) fn set_capacity(&mut self, cap: usize) { - self.cap = cap; - } - - /// Adds up to `capacity` number of permits to the channel. - /// - /// `capacity` is previously set by [`SoftBlockReceiver::set_capacity`] - /// or zero by default. - pub(super) fn fill_permits(&self) { - let additional = self.cap.saturating_sub(self.sem.available_permits()); - self.sem.add_permits(additional); - } - - /// Receives a block over the channel. - #[instrument(skip_all)] - pub(super) async fn recv(&mut self) -> Option { - self.chan.recv().await - } -} - -#[cfg(test)] -mod tests { - use super::{ - soft_block_channel, - SendError, - TrySendError, - }; - - #[test] - fn fresh_channel_has_no_capacity() { - let (tx, _rx) = soft_block_channel::<()>(); - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "a fresh channel starts without permits" - ); - } - - #[test] - fn permits_are_filled_to_capacity() { - let cap = 2; - let (tx, mut rx) = soft_block_channel::<()>(); - rx.set_capacity(cap); - rx.fill_permits(); - for _ in 0..cap { - tx.try_send(()).expect("the channel should have capacity"); - } - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "a channel that has its permits used up should return with a NoPermits error until \ - refilled or closed", - ); - } - - #[test] - fn refilling_twice_has_no_effect() { - let cap = 2; - let (tx, mut rx) = soft_block_channel::<()>(); - rx.set_capacity(cap); - rx.fill_permits(); - rx.fill_permits(); - for _ in 0..cap { - tx.try_send(()).expect("the channel should have capacity"); - } - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "refilling twice in a row should result in the same number of permits" - ); - } - - #[test] - fn try_sending_to_dropped_receiver_returns_closed_error() { - let (tx, rx) = soft_block_channel::<()>(); - std::mem::drop(rx); - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::Closed(()), - "a channel with a dropped receiver is considered closed", - ); - } - - #[tokio::test] - async fn async_sending_to_dropped_receiver_returns_closed_error() { - let (tx, rx) = soft_block_channel::<()>(); - std::mem::drop(rx); - assert_eq!( - tx.send(()).await.unwrap_err(), - SendError, - "a channel with a dropped receiver is considered closed", - ); - } - - #[tokio::test] - #[should_panic(expected = "receiving with all senders dropped should return None")] - async fn receiving_without_any_remaining_receivers_returns_none() { - let (tx, mut rx) = soft_block_channel::<()>(); - std::mem::drop(tx); - rx.recv() - .await - .expect("receiving with all senders dropped should return None"); - } -} diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index fb3525ead1..36f90386a0 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -1,4 +1,7 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + time::Duration, +}; use astria_core::{ execution::v1::{ @@ -16,27 +19,33 @@ use astria_eyre::eyre::{ self, bail, ensure, + eyre, WrapErr as _, }; use bytes::Bytes; -use sequencer_client::tendermint::{ - block::Height as SequencerHeight, - Time as TendermintTime, +use sequencer_client::{ + tendermint::{ + block::Height as SequencerHeight, + Time as TendermintTime, + }, + HttpClient, }; use tokio::{ select, - sync::{ - mpsc, - watch::error::RecvError, - }, + sync::mpsc, + task::JoinError, +}; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, }; -use tokio_util::sync::CancellationToken; use tracing::{ debug, debug_span, error, info, instrument, + warn, }; use crate::{ @@ -46,10 +55,8 @@ use crate::{ }; mod builder; -pub(crate) mod channel; pub(crate) use builder::Builder; -use channel::soft_block_channel; mod client; mod state; @@ -57,180 +64,176 @@ mod state; mod tests; pub(super) use client::Client; -use state::StateReceiver; +use state::State; +pub(crate) use state::StateReceiver; use self::state::StateSender; type CelestiaHeight = u64; -#[derive(Clone, Debug)] -pub(crate) struct StateNotInit; -#[derive(Clone, Debug)] -pub(crate) struct StateIsInit; - -#[derive(Debug, thiserror::Error)] -pub(crate) enum FirmSendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - #[from] - source: mpsc::error::SendError>, - }, -} - -#[derive(Debug, thiserror::Error)] -pub(crate) enum FirmTrySendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - #[from] - source: mpsc::error::TrySendError>, - }, -} +pub(crate) struct Executor { + config: crate::Config, -#[derive(Debug, thiserror::Error)] -pub(crate) enum SoftSendError { - #[error("executor was configured without soft commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { source: Box }, -} + /// The execution client driving the rollup. + client: Client, -#[derive(Debug, thiserror::Error)] -pub(crate) enum SoftTrySendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - source: Box>, - }, -} + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, -/// A handle to the executor. -/// -/// To be useful, [`Handle::wait_for_init`] must be called in -/// order to obtain a [`Handle`]. This is to ensure that the executor -/// state was primed before using its other methods. See [`State`] for more -/// information. -#[derive(Debug, Clone)] -pub(crate) struct Handle { - firm_blocks: Option>>, - soft_blocks: Option>, - state: StateReceiver, - _state_init: TStateInit, + metrics: &'static Metrics, } -impl Handle { - #[instrument(skip_all, err)] - pub(crate) async fn wait_for_init(&mut self) -> eyre::Result> { - self.state.wait_for_init().await.wrap_err( - "executor state channel terminated while waiting for the state to initialize", - )?; - let Self { - firm_blocks, - soft_blocks, - state, - .. - } = self.clone(); - Ok(Handle { - firm_blocks, - soft_blocks, - state, - _state_init: StateIsInit, - }) - } -} +impl Executor { + const CELESTIA: &'static str = "celestia"; + const SEQUENCER: &'static str = "sequencer"; -impl Handle { - #[instrument(skip_all, err)] - pub(crate) async fn send_firm_block( - self, - block: impl Into>, - ) -> Result<(), FirmSendError> { - let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - Ok(sender.send(block.into()).await?) - } + pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> { + let initialized = select!( + () = self.shutdown.clone().cancelled_owned() => { + return report_exit(Ok( + "received shutdown signal while initializing task; \ + aborting intialization and exiting" + ), ""); + } + res = self.init() => { + res.wrap_err("initialization failed")? + } + ); - pub(crate) fn try_send_firm_block( - &self, - block: impl Into>, - ) -> Result<(), FirmTrySendError> { - let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - Ok(sender.try_send(block.into())?) + initialized.run().await } + /// Runs the init logic that needs to happen before [`Executor`] can enter its main loop. #[instrument(skip_all, err)] - pub(crate) async fn send_soft_block_owned( - self, - block: FilteredSequencerBlock, - ) -> Result<(), SoftSendError> { - let chan = self.soft_blocks.as_ref().ok_or(SoftSendError::NotSet)?; - chan.send(block) + async fn init(self) -> eyre::Result { + let state = self + .create_initial_node_state() .await - .map_err(|source| SoftSendError::Channel { - source: Box::new(source), - })?; - Ok(()) - } - - pub(crate) fn try_send_soft_block( - &self, - block: FilteredSequencerBlock, - ) -> Result<(), SoftTrySendError> { - let chan = self.soft_blocks.as_ref().ok_or(SoftTrySendError::NotSet)?; - chan.try_send(block) - .map_err(|source| SoftTrySendError::Channel { - source: Box::new(source), - })?; - Ok(()) - } + .wrap_err("failed setting initial rollup node state")?; - pub(crate) fn next_expected_firm_sequencer_height(&mut self) -> SequencerHeight { - self.state.next_expected_firm_sequencer_height() - } + let sequencer_cometbft_client = HttpClient::new(&*self.config.sequencer_cometbft_url) + .wrap_err("failed constructing sequencer cometbft RPC client")?; + + let reader_cancellation_token = self.shutdown.child_token(); + + let (firm_blocks_tx, firm_blocks_rx) = tokio::sync::mpsc::channel(16); + let (soft_blocks_tx, soft_blocks_rx) = + tokio::sync::mpsc::channel(state.calculate_max_spread()); + + let mut reader_tasks = JoinMap::new(); + if self.config.is_with_firm() { + let celestia_token = if self.config.no_celestia_auth { + None + } else { + Some(self.config.celestia_bearer_token.clone()) + }; + + let reader = crate::celestia::Builder { + celestia_http_endpoint: self.config.celestia_node_http_url.clone(), + celestia_token, + celestia_block_time: Duration::from_millis(self.config.celestia_block_time_ms), + firm_blocks: firm_blocks_tx, + rollup_state: state.subscribe(), + sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_requests_per_second: self.config.sequencer_requests_per_second, + expected_celestia_chain_id: self.config.expected_celestia_chain_id.clone(), + expected_sequencer_chain_id: self.config.expected_sequencer_chain_id.clone(), + shutdown: reader_cancellation_token.child_token(), + metrics: self.metrics, + } + .build() + .wrap_err("failed to build Celestia Reader")?; + reader_tasks.spawn(Self::CELESTIA, reader.run_until_stopped()); + } - pub(crate) fn next_expected_soft_sequencer_height(&mut self) -> SequencerHeight { - self.state.next_expected_soft_sequencer_height() - } + if self.config.is_with_soft() { + let sequencer_grpc_client = + crate::sequencer::SequencerGrpcClient::new(&self.config.sequencer_grpc_url) + .wrap_err("failed constructing grpc client for Sequencer")?; + + let sequencer_reader = crate::sequencer::Builder { + sequencer_grpc_client, + sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_block_time: Duration::from_millis(self.config.sequencer_block_time_ms), + expected_sequencer_chain_id: self.config.expected_sequencer_chain_id.clone(), + shutdown: reader_cancellation_token.child_token(), + soft_blocks: soft_blocks_tx, + rollup_state: state.subscribe(), + } + .build(); + reader_tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); + }; - #[instrument(skip_all)] - pub(crate) async fn next_expected_soft_height_if_changed( - &mut self, - ) -> Result { - self.state.next_expected_soft_height_if_changed().await + Ok(Initialized { + config: self.config, + client: self.client, + firm_blocks: firm_blocks_rx, + soft_blocks: soft_blocks_rx, + shutdown: self.shutdown, + state, + blocks_pending_finalization: HashMap::new(), + metrics: self.metrics, + reader_tasks, + reader_cancellation_token, + }) } - pub(crate) fn rollup_id(&mut self) -> RollupId { - self.state.rollup_id() - } + #[instrument(skip_all, err)] + async fn create_initial_node_state(&self) -> eyre::Result { + let genesis_info = { + async { + self.client + .clone() + .get_genesis_info_with_retry() + .await + .wrap_err("failed getting genesis info") + } + }; + let commitment_state = { + async { + self.client + .clone() + .get_commitment_state_with_retry() + .await + .wrap_err("failed getting commitment state") + } + }; + let (genesis_info, commitment_state) = tokio::try_join!(genesis_info, commitment_state)?; - pub(crate) fn celestia_base_block_height(&mut self) -> CelestiaHeight { - self.state.celestia_base_block_height() - } + let (state, _) = state::channel( + State::try_from_genesis_info_and_commitment_state(genesis_info, commitment_state) + .wrap_err( + "failed to construct initial state gensis and commitment info received from \ + rollup", + )?, + ); - pub(crate) fn celestia_block_variance(&mut self) -> u64 { - self.state.celestia_block_variance() + self.metrics + .absolute_set_executed_firm_block_number(state.firm_number()); + self.metrics + .absolute_set_executed_soft_block_number(state.soft_number()); + info!( + initial_state = serde_json::to_string(&*state.get()) + .expect("writing json to a string should not fail"), + "received genesis info from rollup", + ); + Ok(state) } } -pub(crate) struct Executor { +pub(crate) struct Initialized { + config: crate::Config, + /// The execution client driving the rollup. client: Client, - /// The mode under which this executor (and hence conductor) runs. - mode: CommitLevel, - /// The channel of which this executor receives blocks for executing /// firm commitments. - /// Only set if `mode` is `FirmOnly` or `SoftAndFirm`. - firm_blocks: Option>>, + firm_blocks: mpsc::Receiver>, /// The channel of which this executor receives blocks for executing /// soft commitments. - /// Only set if `mode` is `SoftOnly` or `SoftAndFirm`. - soft_blocks: Option>, + soft_blocks: mpsc::Receiver, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, @@ -244,43 +247,38 @@ pub(crate) struct Executor { /// without re-executing on top of the rollup node. blocks_pending_finalization: HashMap, - /// The maximum permitted spread between firm and soft blocks. - max_spread: Option, - metrics: &'static Metrics, + + /// The tasks reading block data off Celestia or Sequencer. + reader_tasks: JoinMap<&'static str, eyre::Result<()>>, + + /// The cancellation token specifically for signaling the `reader_tasks` to shut down. + reader_cancellation_token: CancellationToken, } -impl Executor { - pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - select!( +impl Initialized { + async fn run(mut self) -> eyre::Result<()> { + let reason = select!( + biased; + () = self.shutdown.clone().cancelled_owned() => { - return report_exit(Ok( - "received shutdown signal while initializing task; \ - aborting intialization and exiting" - ), ""); + Ok("received shutdown signal") } - res = self.init() => { - res.wrap_err("initialization failed")?; + + res = self.run_event_loop() => { + res } ); - let reason = loop { - let spread_not_too_large = !self.is_spread_too_large(); - if spread_not_too_large { - if let Some(channel) = self.soft_blocks.as_mut() { - channel.fill_permits(); - } - } + self.shutdown(reason).await + } + async fn run_event_loop(&mut self) -> eyre::Result<&'static str> { + loop { select!( biased; - () = self.shutdown.cancelled() => { - break Ok("received shutdown signal"); - } - - Some(block) = async { self.firm_blocks.as_mut().unwrap().recv().await }, - if self.firm_blocks.is_some() => + Some(block) = self.firm_blocks.recv() => { debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.sequencer_height(), @@ -292,8 +290,7 @@ impl Executor { } } - Some(block) = async { self.soft_blocks.as_mut().unwrap().recv().await }, - if self.soft_blocks.is_some() && spread_not_too_large => + Some(block) = self.soft_blocks.recv(), if !self.is_spread_too_large() => { debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.height(), @@ -304,53 +301,14 @@ impl Executor { break Err(error).wrap_err("failed executing soft block"); } } - ); - }; - - // XXX: explicitly setting the message (usually implicitly set by tracing) - let message = "shutting down"; - report_exit(reason, message) - } - /// Runs the init logic that needs to happen before [`Executor`] can enter its main loop. - #[instrument(skip_all, err)] - async fn init(&mut self) -> eyre::Result<()> { - self.set_initial_node_state() - .await - .wrap_err("failed setting initial rollup node state")?; + Some((task, res)) = self.reader_tasks.join_next() => { + break handle_task_exit(task, res); + } - let max_spread: usize = self.calculate_max_spread(); - self.max_spread.replace(max_spread); - if let Some(channel) = self.soft_blocks.as_mut() { - channel.set_capacity(max_spread); - info!( - max_spread, - "setting capacity of soft blocks channel to maximum permitted firm<>soft \ - commitment spread (this has no effect if conductor is set to perform soft-sync \ - only)" + else => break Ok("all channels are closed") ); } - - Ok(()) - } - - /// Calculates the maximum allowed spread between firm and soft commitments heights. - /// - /// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance` - /// is the `celestia_block_variance` as defined in the rollup node's genesis that this - /// executor/conductor talks to. - /// - /// The heuristic 6 is the largest number of Sequencer heights that will be found at - /// one Celestia height. - /// - /// # Panics - /// Panics if the `u32` underlying the celestia block variance tracked in the state could - /// not be converted to a `usize`. This should never happen on any reasonable architecture - /// that Conductor will run on. - fn calculate_max_spread(&self) -> usize { - usize::try_from(self.state.celestia_block_variance()) - .expect("converting a u32 to usize should work on any architecture conductor runs on") - .saturating_mul(6) } /// Returns if the spread between firm and soft commitment heights in the tracked state is too @@ -362,7 +320,7 @@ impl Executor { /// /// Panics if called before [`Executor::init`] because `max_spread` must be set. fn is_spread_too_large(&self) -> bool { - if self.firm_blocks.is_none() { + if !self.config.is_with_firm() { return false; } let (next_firm, next_soft) = { @@ -372,12 +330,7 @@ impl Executor { }; let is_too_far_ahead = usize::try_from(next_soft.saturating_sub(next_firm)) - .map(|spread| { - spread - >= self - .max_spread - .expect("executor must be initalized and this field set") - }) + .map(|spread| spread >= self.state.calculate_max_spread()) .unwrap_or(false); if is_too_far_ahead { @@ -568,43 +521,6 @@ impl Executor { Ok(executed_block) } - #[instrument(skip_all, err)] - async fn set_initial_node_state(&mut self) -> eyre::Result<()> { - let genesis_info = { - async { - self.client - .clone() - .get_genesis_info_with_retry() - .await - .wrap_err("failed getting genesis info") - } - }; - let commitment_state = { - async { - self.client - .clone() - .get_commitment_state_with_retry() - .await - .wrap_err("failed getting commitment state") - } - }; - let (genesis_info, commitment_state) = tokio::try_join!(genesis_info, commitment_state)?; - self.state - .try_init(genesis_info, commitment_state) - .wrap_err("failed initializing state tracking")?; - - self.metrics - .absolute_set_executed_firm_block_number(self.state.firm_number()); - self.metrics - .absolute_set_executed_soft_block_number(self.state.soft_number()); - info!( - initial_state = serde_json::to_string(&*self.state.get()) - .expect("writing json to a string should not fail"), - "received genesis info from rollup", - ); - Ok(()) - } - #[instrument(skip_all, err)] async fn update_commitment_state(&mut self, update: Update) -> eyre::Result<()> { use Update::{ @@ -662,13 +578,46 @@ impl Executor { should_execute_firm_block( self.state.next_expected_firm_sequencer_height().value(), self.state.next_expected_soft_sequencer_height().value(), - self.mode, + self.config.execution_commit_level, ) } + + #[instrument(skip_all, err, ret)] + async fn shutdown(mut self, reason: eyre::Result<&'static str>) -> eyre::Result<()> { + info!("signaling all reader tasks to exit"); + self.reader_cancellation_token.cancel(); + while let Some((task, exit_status)) = self.reader_tasks.join_next().await { + match crate::utils::flatten(exit_status) { + Ok(()) => info!(task, "task exited"), + Err(error) => warn!(task, %error, "task exited"), + } + } + report_exit(reason, "shutting down") + } +} + +/// Wraps a task result to explain why it exited. +/// +/// Right now only the err-branch is populated because tasks should +/// never exit. Still returns an `eyre::Result` to line up with the +/// return type of [`Executor::run_until_stopped`]. +/// +/// Executor should `break handle_task_exit` immediately after calling +/// this method. +fn handle_task_exit( + task: &'static str, + res: Result, JoinError>, +) -> eyre::Result<&'static str> { + match res { + Ok(Ok(())) => Err(eyre!("task `{task}` finished unexpectedly")), + Ok(Err(err)) => Err(err).wrap_err_with(|| format!("task `{task}` exited with error")), + Err(err) => Err(err).wrap_err_with(|| format!("task `{task}` panicked")), + } } #[instrument(skip_all)] fn report_exit(reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { + // XXX: explicitly setting the message (usually implicitly set by tracing) match reason { Ok(reason) => { info!(%reason, message); diff --git a/crates/astria-conductor/src/executor/state.rs b/crates/astria-conductor/src/executor/state.rs index 2ae4e2c4b2..1f315b078b 100644 --- a/crates/astria-conductor/src/executor/state.rs +++ b/crates/astria-conductor/src/executor/state.rs @@ -10,10 +10,6 @@ use astria_core::{ }, primitive::v1::RollupId, }; -use astria_eyre::{ - eyre, - eyre::WrapErr as _, -}; use bytes::Bytes; use sequencer_client::tendermint::block::Height as SequencerHeight; use tokio::sync::watch::{ @@ -22,8 +18,8 @@ use tokio::sync::watch::{ }; use tracing::instrument; -pub(super) fn channel() -> (StateSender, StateReceiver) { - let (tx, rx) = watch::channel(None); +pub(super) fn channel(state: State) -> (StateSender, StateReceiver) { + let (tx, rx) = watch::channel(state); let sender = StateSender { inner: tx, }; @@ -46,25 +42,14 @@ pub(super) struct InvalidState { } #[derive(Clone, Debug)] -pub(super) struct StateReceiver { - inner: watch::Receiver>, +pub(crate) struct StateReceiver { + inner: watch::Receiver, } impl StateReceiver { - #[instrument(skip_all, err)] - pub(super) async fn wait_for_init(&mut self) -> eyre::Result<()> { - self.inner - .wait_for(Option::is_some) - .await - .wrap_err("channel failed while waiting for state to become initialized")?; - Ok(()) - } - - pub(super) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { + pub(crate) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_firm_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -72,11 +57,9 @@ impl StateReceiver { ) } - pub(super) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { + pub(crate) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_soft_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -94,11 +77,11 @@ impl StateReceiver { } pub(super) struct StateSender { - inner: watch::Sender>, + inner: watch::Sender, } fn can_map_firm_to_sequencer_height( - genesis_info: GenesisInfo, + genesis_info: &GenesisInfo, commitment_state: &CommitmentState, ) -> Result<(), InvalidState> { let sequencer_genesis_height = genesis_info.sequencer_genesis_block_height(); @@ -115,7 +98,7 @@ fn can_map_firm_to_sequencer_height( } fn can_map_soft_to_sequencer_height( - genesis_info: GenesisInfo, + genesis_info: &GenesisInfo, commitment_state: &CommitmentState, ) -> Result<(), InvalidState> { let sequencer_genesis_height = genesis_info.sequencer_genesis_block_height(); @@ -132,21 +115,29 @@ fn can_map_soft_to_sequencer_height( } impl StateSender { - pub(super) fn try_init( - &mut self, - genesis_info: GenesisInfo, - commitment_state: CommitmentState, - ) -> Result<(), InvalidState> { - can_map_firm_to_sequencer_height(genesis_info, &commitment_state)?; - can_map_soft_to_sequencer_height(genesis_info, &commitment_state)?; - self.inner.send_modify(move |state| { - let old_state = state.replace(State::new(genesis_info, commitment_state)); - assert!( - old_state.is_none(), - "the state must be initialized only once", - ); - }); - Ok(()) + pub(super) fn subscribe(&self) -> StateReceiver { + StateReceiver { + inner: self.inner.subscribe(), + } + } + + /// Calculates the maximum allowed spread between firm and soft commitments heights. + /// + /// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance` + /// is the `celestia_block_variance` as defined in the rollup node's genesis that this + /// executor/conductor talks to. + /// + /// The heuristic 6 is the largest number of Sequencer heights that will be found at + /// one Celestia height. + /// + /// # Panics + /// Panics if the `u32` underlying the celestia block variance tracked in the state could + /// not be converted to a `usize`. This should never happen on any reasonable architecture + /// that Conductor will run on. + pub(super) fn calculate_max_spread(&self) -> usize { + usize::try_from(self.celestia_block_variance()) + .expect("converting a u32 to usize should work on any architecture conductor runs on") + .saturating_mul(6) } pub(super) fn try_update_commitment_state( @@ -154,26 +145,21 @@ impl StateSender { commitment_state: CommitmentState, ) -> Result<(), InvalidState> { let genesis_info = self.genesis_info(); - can_map_firm_to_sequencer_height(genesis_info, &commitment_state)?; - can_map_soft_to_sequencer_height(genesis_info, &commitment_state)?; + can_map_firm_to_sequencer_height(&genesis_info, &commitment_state)?; + can_map_soft_to_sequencer_height(&genesis_info, &commitment_state)?; self.inner.send_modify(move |state| { - state - .as_mut() - .expect("the state must be initialized") - .set_commitment_state(commitment_state); + state.set_commitment_state(commitment_state); }); Ok(()) } - pub(super) fn get(&self) -> tokio::sync::watch::Ref<'_, Option> { + pub(super) fn get(&self) -> tokio::sync::watch::Ref<'_, State> { self.inner.borrow() } pub(super) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_firm_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -184,8 +170,6 @@ impl StateSender { pub(super) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_soft_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -198,11 +182,9 @@ macro_rules! forward_impls { ($target:ident: $([$fn:ident -> $ret:ty]),*$(,)?) => { impl $target { $( - pub(super) fn $fn(&self) -> $ret { + pub(crate) fn $fn(&self) -> $ret { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .$fn() .clone() } @@ -241,11 +223,16 @@ pub(super) struct State { } impl State { - fn new(genesis_info: GenesisInfo, commitment_state: CommitmentState) -> Self { - Self { + pub(super) fn try_from_genesis_info_and_commitment_state( + genesis_info: GenesisInfo, + commitment_state: CommitmentState, + ) -> Result { + can_map_firm_to_sequencer_height(&genesis_info, &commitment_state)?; + can_map_soft_to_sequencer_height(&genesis_info, &commitment_state)?; + Ok(State { commitment_state, genesis_info, - } + }) } /// Sets the inner commitment state. @@ -390,16 +377,21 @@ mod tests { .unwrap() } - fn make_state() -> (StateSender, StateReceiver) { - let (mut tx, rx) = super::channel(); - tx.try_init(make_genesis_info(), make_commitment_state()) - .unwrap(); - (tx, rx) + fn make_state() -> State { + State::try_from_genesis_info_and_commitment_state( + make_genesis_info(), + make_commitment_state(), + ) + .unwrap() + } + + fn make_channel() -> (StateSender, StateReceiver) { + super::channel(make_state()) } #[test] fn next_firm_sequencer_height_is_correct() { - let (_, rx) = make_state(); + let (_, rx) = make_channel(); assert_eq!( SequencerHeight::from(12u32), rx.next_expected_firm_sequencer_height(), @@ -408,7 +400,7 @@ mod tests { #[test] fn next_soft_sequencer_height_is_correct() { - let (_, rx) = make_state(); + let (_, rx) = make_channel(); assert_eq!( SequencerHeight::from(13u32), rx.next_expected_soft_sequencer_height(), diff --git a/crates/astria-conductor/src/executor/tests.rs b/crates/astria-conductor/src/executor/tests.rs index e8ead9dc71..a5206cb141 100644 --- a/crates/astria-conductor/src/executor/tests.rs +++ b/crates/astria-conductor/src/executor/tests.rs @@ -13,6 +13,7 @@ use bytes::Bytes; use super::{ should_execute_firm_block, state::{ + State, StateReceiver, StateSender, }, @@ -57,9 +58,9 @@ fn make_state( base_celestia_height: 1, }) .unwrap(); - let (mut tx, rx) = super::state::channel(); - tx.try_init(genesis_info, commitment_state).unwrap(); - (tx, rx) + let state = + State::try_from_genesis_info_and_commitment_state(genesis_info, commitment_state).unwrap(); + super::state::channel(state) } #[track_caller] diff --git a/crates/astria-conductor/src/sequencer/builder.rs b/crates/astria-conductor/src/sequencer/builder.rs index c71aa0e7d8..a95b98e13a 100644 --- a/crates/astria-conductor/src/sequencer/builder.rs +++ b/crates/astria-conductor/src/sequencer/builder.rs @@ -1,31 +1,36 @@ use std::time::Duration; +use astria_core::sequencerblock::v1::block::FilteredSequencerBlock; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use super::SequencerGrpcClient; -use crate::executor; +use crate::executor::StateReceiver; pub(crate) struct Builder { - pub(crate) executor: executor::Handle, pub(crate) sequencer_grpc_client: SequencerGrpcClient, pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient, pub(crate) sequencer_block_time: Duration, pub(crate) expected_sequencer_chain_id: String, pub(crate) shutdown: CancellationToken, + pub(crate) rollup_state: StateReceiver, + pub(crate) soft_blocks: mpsc::Sender, } impl Builder { pub(crate) fn build(self) -> super::Reader { let Self { - executor, sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, expected_sequencer_chain_id, shutdown, + rollup_state, + soft_blocks, } = self; super::Reader { - executor, + rollup_state, + soft_blocks, sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index df9d11b5a1..00caf1a450 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -25,7 +25,10 @@ use sequencer_client::{ LatestHeightStream, StreamLatestHeight as _, }; -use tokio::select; +use tokio::{ + select, + sync::mpsc, +}; use tokio_util::sync::CancellationToken; use tracing::{ debug, @@ -40,12 +43,6 @@ use tracing::{ use crate::{ block_cache::BlockCache, - executor::{ - self, - SoftSendError, - SoftTrySendError, - StateIsInit, - }, sequencer::block_stream::BlocksFromHeightStream, }; @@ -56,16 +53,15 @@ mod reporting; pub(crate) use builder::Builder; pub(crate) use client::SequencerGrpcClient; +use crate::executor::StateReceiver; + /// [`Reader`] reads Sequencer blocks and forwards them to the [`crate::Executor`] task. /// /// The blocks are forwarded in strictly sequential order of their Sequencr heights. /// A [`Reader`] is created with [`Builder::build`] and run with [`Reader::run_until_stopped`]. pub(crate) struct Reader { - /// The handle for sending sequencer blocks as soft commits to the executor - /// and checking it for the next expected height, and rollup ID associated with - /// this instance of Conductor. - /// Must be initialized before it can be used. - executor: executor::Handle, + rollup_state: StateReceiver, + soft_blocks: mpsc::Sender, /// The gRPC client to fetch new blocks from the Sequencer network. sequencer_grpc_client: SequencerGrpcClient, @@ -87,22 +83,22 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let executor = select!( + let () = select!( () = self.shutdown.clone().cancelled_owned() => { return report_exit(Ok("received shutdown signal while waiting for Sequencer reader task to initialize"), ""); } res = self.initialize() => { - res? + res?; } ); - RunningReader::try_from_parts(self, executor) + RunningReader::try_from_parts(self) .wrap_err("failed entering run loop")? .run_until_stopped() .await } #[instrument(skip_all, err)] - async fn initialize(&mut self) -> eyre::Result> { + async fn initialize(&mut self) -> eyre::Result<()> { let actual_sequencer_chain_id = get_sequencer_chain_id(self.sequencer_cometbft_client.clone()) .await @@ -113,20 +109,13 @@ impl Reader { "expected chain id `{expected_sequencer_chain_id}` does not match actual: \ `{actual_sequencer_chain_id}`" ); - - self.executor - .wait_for_init() - .await - .wrap_err("handle to executor failed while waiting for it being initialized") + Ok(()) } } struct RunningReader { - /// The initialized handle to the executor task. - /// Used for sending sequencer blocks as soft commits to the executor - /// and checking it for the next expected height, and rollup ID associated with - /// this instance of Conductor. - executor: executor::Handle, + rollup_state: StateReceiver, + soft_blocks: mpsc::Sender, /// Caches the filtered sequencer blocks retrieved from the Sequencer. /// This cache will yield a block if it contains a block that matches the @@ -143,26 +132,26 @@ struct RunningReader { /// An enqueued block waiting for executor to free up. Set if the executor exhibits /// backpressure. - enqueued_block: Fuse>>, + enqueued_block: + Fuse>>>, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, } impl RunningReader { - fn try_from_parts( - reader: Reader, - mut executor: executor::Handle, - ) -> eyre::Result { + fn try_from_parts(reader: Reader) -> eyre::Result { let Reader { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, shutdown, + rollup_state, + soft_blocks, .. } = reader; - let next_expected_height = executor.next_expected_soft_sequencer_height(); + let next_expected_height = rollup_state.next_expected_soft_sequencer_height(); let latest_height_stream = sequencer_cometbft_client.stream_latest_height(sequencer_block_time); @@ -171,14 +160,15 @@ impl RunningReader { .wrap_err("failed constructing sequential block cache")?; let blocks_from_heights = BlocksFromHeightStream::new( - executor.rollup_id(), + rollup_state.rollup_id(), next_expected_height, sequencer_grpc_client, ); let enqueued_block: Fuse>> = future::Fuse::terminated(); Ok(RunningReader { - executor, + rollup_state, + soft_blocks, block_cache, latest_height_stream, blocks_from_heights, @@ -215,7 +205,7 @@ impl RunningReader { } // Skip heights that executor has already executed (e.g. firm blocks from Celestia) - Ok(next_height) = self.executor.next_expected_soft_height_if_changed() => { + Ok(next_height) = self.rollup_state.next_expected_soft_height_if_changed() => { self.update_next_expected_height(next_height); } @@ -267,34 +257,18 @@ impl RunningReader { /// Enqueues the block is the channel to the executor is full, sending it once /// it frees up. fn send_to_executor(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> { - if let Err(err) = self.executor.try_send_soft_block(block) { + if let Err(err) = self.soft_blocks.try_send(block) { match err { - SoftTrySendError::Channel { - source, - } => match *source { - executor::channel::TrySendError::Closed(_) => { - bail!("could not send block to executor because its channel was closed"); - } - - executor::channel::TrySendError::NoPermits(block) => { - trace!( - "executor channel is full; scheduling block and stopping block fetch \ - until a slot opens up" - ); - self.enqueued_block = self - .executor - .clone() - .send_soft_block_owned(block) - .boxed() - .fuse(); - } - }, - - SoftTrySendError::NotSet => { - bail!( - "conductor was configured without soft commitments; the sequencer reader \ - task should have never been started", + mpsc::error::TrySendError::Full(block) => { + trace!( + "executor channel is full; scheduling block and stopping block fetch \ + until a slot opens up" ); + let chan = self.soft_blocks.clone(); + self.enqueued_block = async move { chan.send(block).await }.boxed().fuse(); + } + mpsc::error::TrySendError::Closed(_) => { + bail!("could not send block to executor because its channel was closed") } } } diff --git a/crates/astria-conductor/tests/blackbox/firm_only.rs b/crates/astria-conductor/tests/blackbox/firm_only.rs index e634292a1c..f08271ce43 100644 --- a/crates/astria-conductor/tests/blackbox/firm_only.rs +++ b/crates/astria-conductor/tests/blackbox/firm_only.rs @@ -125,7 +125,7 @@ async fn simple() { .await .expect( "conductor should have executed the firm block and updated the firm commitment state \ - within 1000ms", + within 2000ms", ); }