diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index ef33a28cbd..eb03d9440c 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -10,9 +10,12 @@ use jsonrpsee::http_client::HttpClient as CelestiaClient; use tendermint_rpc::HttpClient as SequencerClient; use tokio_util::sync::CancellationToken; -use super::Reader; +use super::{ + Reader, + ReconstructedBlock, +}; use crate::{ - executor, + executor::StateReceiver, metrics::Metrics, }; @@ -20,7 +23,8 @@ pub(crate) struct Builder { pub(crate) celestia_block_time: Duration, pub(crate) celestia_http_endpoint: String, pub(crate) celestia_token: Option, - pub(crate) executor: executor::Handle, + pub(crate) firm_blocks: tokio::sync::mpsc::Sender>, + pub(crate) rollup_state: StateReceiver, pub(crate) sequencer_cometbft_client: SequencerClient, pub(crate) sequencer_requests_per_second: u32, pub(crate) expected_celestia_chain_id: String, @@ -36,13 +40,14 @@ impl Builder { celestia_block_time, celestia_http_endpoint, celestia_token, - executor, sequencer_cometbft_client, sequencer_requests_per_second, expected_celestia_chain_id, expected_sequencer_chain_id, shutdown, metrics, + firm_blocks, + rollup_state, } = self; let celestia_client = create_celestia_client(celestia_http_endpoint, celestia_token) @@ -51,7 +56,8 @@ impl Builder { Ok(Reader { celestia_block_time, celestia_client, - executor, + firm_blocks, + rollup_state, sequencer_cometbft_client, sequencer_requests_per_second, expected_celestia_chain_id, diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 9acfc2c50f..07b1633b96 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -58,15 +58,11 @@ use tracing::{ trace, trace_span, warn, + Instrument as _, }; use crate::{ block_cache::GetSequencerHeight, - executor::{ - FirmSendError, - FirmTrySendError, - StateIsInit, - }, metrics::Metrics, utils::flatten, }; @@ -95,10 +91,7 @@ use self::{ BlobVerifier, }, }; -use crate::{ - block_cache::BlockCache, - executor, -}; +use crate::block_cache::BlockCache; /// Sequencer Block information reconstructed from Celestia blobs. /// @@ -138,8 +131,11 @@ pub(crate) struct Reader { /// Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, - /// The channel used to send messages to the executor task. - executor: executor::Handle, + /// The channel to forward firm blocks to the executor. + firm_blocks: mpsc::Sender>, + + /// The channel to read updates of the rollup state from. + rollup_state: crate::executor::StateReceiver, /// The client to get the sequencer namespace and verify blocks. sequencer_cometbft_client: SequencerClient, @@ -162,7 +158,7 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let (executor, sequencer_chain_id) = select!( + let sequencer_chain_id = select!( () = self.shutdown.clone().cancelled_owned() => { info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(|| info!("received shutdown signal while waiting for Celestia reader task to initialize") @@ -175,16 +171,14 @@ impl Reader { } ); - RunningReader::from_parts(self, executor, sequencer_chain_id) + RunningReader::from_parts(self, sequencer_chain_id) .wrap_err("failed entering run loop")? .run_until_stopped() .await } #[instrument(skip_all, err)] - async fn initialize( - &mut self, - ) -> eyre::Result<(executor::Handle, tendermint::chain::Id)> { + async fn initialize(&mut self) -> eyre::Result { let validate_celestia_chain_id = async { let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client) .await @@ -196,14 +190,8 @@ impl Reader { `{actual_celestia_chain_id}`" ); Ok(()) - }; - - let wait_for_init_executor = async { - self.executor - .wait_for_init() - .await - .wrap_err("handle to executor failed while waiting for it being initialized") - }; + } + .in_current_span(); let get_and_validate_sequencer_chain_id = async { let actual_sequencer_chain_id = @@ -217,18 +205,18 @@ impl Reader { actual: `{actual_sequencer_chain_id}`" ); Ok(actual_sequencer_chain_id) - }; + } + .in_current_span(); try_join!( validate_celestia_chain_id, - wait_for_init_executor, get_and_validate_sequencer_chain_id ) - .map(|((), executor_init, sequencer_chain_id)| (executor_init, sequencer_chain_id)) + .map(|((), sequencer_chain_id)| sequencer_chain_id) } } -#[instrument(skip_all, err)] +#[instrument(skip_all, err, ret(Display))] async fn get_celestia_chain_id( celestia_client: &CelestiaClient, ) -> eyre::Result { @@ -263,8 +251,11 @@ struct RunningReader { // Client to fetch heights and blocks from Celestia. celestia_client: CelestiaClient, - /// The channel used to send messages to the executor task. - executor: executor::Handle, + /// The channel to forward firm blocks to the executor. + firm_blocks: mpsc::Sender>, + + /// The channel to read updates of the rollup state from. + rollup_state: crate::executor::StateReceiver, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, @@ -280,7 +271,8 @@ struct RunningReader { /// capacity again. Used as a back pressure mechanism so that this task does not fetch more /// blobs if there is no capacity in the executor to execute them against the rollup in /// time. - enqueued_block: Fuse>>, + enqueued_block: + Fuse>>>>, /// The latest observed head height of the Celestia network. Set by values read from /// the `latest_height` stream. @@ -323,7 +315,6 @@ struct RunningReader { impl RunningReader { fn from_parts( exposed_reader: Reader, - mut executor: executor::Handle, sequencer_chain_id: tendermint::chain::Id, ) -> eyre::Result { let Reader { @@ -333,21 +324,23 @@ impl RunningReader { shutdown, sequencer_requests_per_second, metrics, + firm_blocks, + rollup_state, .. } = exposed_reader; let block_cache = - BlockCache::with_next_height(executor.next_expected_firm_sequencer_height()) + BlockCache::with_next_height(rollup_state.next_expected_firm_sequencer_height()) .wrap_err("failed constructing sequential block cache")?; let latest_heights = stream_latest_heights(celestia_client.clone(), celestia_block_time); - let rollup_id = executor.rollup_id(); + let rollup_id = rollup_state.rollup_id(); let rollup_namespace = astria_core::celestia::namespace_v0_from_rollup_id(rollup_id); let sequencer_namespace = astria_core::celestia::namespace_v0_from_sha256_of_bytes(sequencer_chain_id.as_bytes()); - let celestia_next_height = executor.celestia_base_block_height(); - let celestia_reference_height = executor.celestia_base_block_height(); - let celestia_variance = executor.celestia_block_variance(); + let celestia_next_height = rollup_state.celestia_base_block_height(); + let celestia_reference_height = rollup_state.celestia_base_block_height(); + let celestia_variance = rollup_state.celestia_block_variance(); Ok(Self { block_cache, @@ -357,7 +350,8 @@ impl RunningReader { ), celestia_client, enqueued_block: Fuse::terminated(), - executor, + firm_blocks, + rollup_state, latest_heights, shutdown, reconstruction_tasks: JoinMap::new(), @@ -498,7 +492,7 @@ impl RunningReader { rollup_id: self.rollup_id, rollup_namespace: self.rollup_namespace, sequencer_namespace: self.sequencer_namespace, - executor: self.executor.clone(), + rollup_state: self.rollup_state.clone(), metrics: self.metrics, }; self.reconstruction_tasks.spawn(height, task.execute()); @@ -520,28 +514,20 @@ impl RunningReader { #[instrument(skip_all)] fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> { let celestia_height = block.celestia_height; - match self.executor.try_send_firm_block(block) { + match self.firm_blocks.try_send(block.into()) { Ok(()) => self.advance_reference_celestia_height(celestia_height), - Err(FirmTrySendError::Channel { - source, - }) => match source { - mpsc::error::TrySendError::Full(block) => { - trace!( - "executor channel is full; rescheduling block fetch until the channel \ - opens up" - ); - self.enqueued_block = - enqueue_block(self.executor.clone(), block).boxed().fuse(); - } - mpsc::error::TrySendError::Closed(_) => { - bail!("exiting because executor channel is closed"); - } - }, - Err(FirmTrySendError::NotSet) => bail!( - "exiting because executor was configured without firm commitments; this Celestia \ - reader should have never been started" - ), - } + Err(mpsc::error::TrySendError::Full(block)) => { + trace!( + "executor channel is full; rescheduling block fetch until the channel opens up" + ); + self.enqueued_block = enqueue_block(self.firm_blocks.clone(), block) + .boxed() + .fuse(); + } + Err(mpsc::error::TrySendError::Closed(_)) => { + bail!("exiting because executor channel is closed"); + } + }; Ok(()) } @@ -574,7 +560,7 @@ struct FetchConvertVerifyAndReconstruct { rollup_id: RollupId, rollup_namespace: Namespace, sequencer_namespace: Namespace, - executor: executor::Handle, + rollup_state: crate::executor::StateReceiver, metrics: &'static Metrics, } @@ -593,7 +579,7 @@ impl FetchConvertVerifyAndReconstruct { rollup_id, rollup_namespace, sequencer_namespace, - executor, + rollup_state, metrics, } = self; @@ -633,7 +619,7 @@ impl FetchConvertVerifyAndReconstruct { "decoded Sequencer header and rollup info from raw Celestia blobs", ); - let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await; + let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, rollup_state).await; metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch( verified_blobs.len_header_blobs(), @@ -671,15 +657,15 @@ impl FetchConvertVerifyAndReconstruct { #[instrument(skip_all, err)] async fn enqueue_block( - executor: executor::Handle, + firm_blocks_tx: mpsc::Sender>, block: Box, -) -> Result { +) -> Result>> { let celestia_height = block.celestia_height; - executor.send_firm_block(block).await?; + firm_blocks_tx.send(block).await?; Ok(celestia_height) } -#[instrument(skip_all, err)] +#[instrument(skip_all, err, ret(Display))] async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result { use sequencer_client::Client as _; diff --git a/crates/astria-conductor/src/celestia/verify.rs b/crates/astria-conductor/src/celestia/verify.rs index a4415468b6..d257a3b5c1 100644 --- a/crates/astria-conductor/src/celestia/verify.rs +++ b/crates/astria-conductor/src/celestia/verify.rs @@ -51,10 +51,6 @@ use super::{ block_verifier, convert::ConvertedBlobs, }; -use crate::executor::{ - self, - StateIsInit, -}; pub(super) struct VerifiedBlobs { celestia_height: u64, @@ -99,7 +95,7 @@ struct VerificationTaskKey { pub(super) async fn verify_metadata( blob_verifier: Arc, converted_blobs: ConvertedBlobs, - mut executor: executor::Handle, + rollup_state: crate::executor::StateReceiver, ) -> VerifiedBlobs { let (celestia_height, header_blobs, rollup_blobs) = converted_blobs.into_parts(); @@ -107,7 +103,7 @@ pub(super) async fn verify_metadata( let mut verified_header_blobs = HashMap::with_capacity(header_blobs.len()); let next_expected_firm_sequencer_height = - executor.next_expected_firm_sequencer_height().value(); + rollup_state.next_expected_firm_sequencer_height().value(); for (index, blob) in header_blobs.into_iter().enumerate() { if blob.height().value() < next_expected_firm_sequencer_height { diff --git a/crates/astria-conductor/src/conductor/inner.rs b/crates/astria-conductor/src/conductor/inner.rs index b6dc9d3eb9..2f0056268b 100644 --- a/crates/astria-conductor/src/conductor/inner.rs +++ b/crates/astria-conductor/src/conductor/inner.rs @@ -6,20 +6,16 @@ use std::{ use astria_eyre::eyre::{ self, eyre, - Result, + Report, WrapErr as _, }; -use itertools::Itertools as _; use pin_project_lite::pin_project; -use sequencer_client::HttpClient; use tokio::{ select, + task::JoinHandle, time::timeout, }; -use tokio_util::{ - sync::CancellationToken, - task::JoinMap, -}; +use tokio_util::sync::CancellationToken; use tracing::{ error, info, @@ -29,16 +25,14 @@ use tracing::{ }; use crate::{ - celestia, executor, - sequencer, - utils::flatten, Config, Metrics, }; /// Exit value of the inner conductor impl to signal to the outer task whether to restart or /// shutdown +#[derive(Debug)] pub(super) enum RestartOrShutdown { Restart, Shutdown, @@ -54,24 +48,18 @@ impl std::fmt::Display for RestartOrShutdown { } } -enum ExitReason { - ShutdownSignal, - TaskFailed { - name: &'static str, - error: eyre::Report, - }, -} +struct ShutdownSignalReceived; pin_project! { /// A handle returned by [`ConductorInner::spawn`]. pub(super) struct InnerHandle { shutdown_token: CancellationToken, - task: Option>>, + task: Option>>, } } impl Future for InnerHandle { - type Output = Result, tokio::task::JoinError>; + type Output = Result, tokio::task::JoinError>; fn poll( self: std::pin::Pin<&mut Self>, @@ -91,15 +79,10 @@ pub(super) struct ConductorInner { /// Token to signal to all tasks to shut down gracefully. shutdown_token: CancellationToken, - /// The different long-running tasks that make up the conductor; - tasks: JoinMap<&'static str, eyre::Result<()>>, + executor: Option>>, } impl ConductorInner { - const CELESTIA: &'static str = "celestia"; - const EXECUTOR: &'static str = "executor"; - const SEQUENCER: &'static str = "sequencer"; - /// Create a new [`ConductorInner`] from a [`Config`]. /// /// # Errors @@ -107,78 +90,21 @@ impl ConductorInner { /// actors could not be spawned (executor, sequencer reader, or data availability reader). /// This usually happens if the actors failed to connect to their respective endpoints. fn new( - cfg: Config, + config: Config, metrics: &'static Metrics, shutdown_token: CancellationToken, ) -> eyre::Result { - let mut tasks = JoinMap::new(); - - let sequencer_cometbft_client = HttpClient::new(&*cfg.sequencer_cometbft_url) - .wrap_err("failed constructing sequencer cometbft RPC client")?; - - // Spawn the executor task. - let executor_handle = { - let (executor, handle) = executor::Builder { - mode: cfg.execution_commit_level, - rollup_address: cfg.execution_rpc_url, - shutdown: shutdown_token.clone(), - metrics, - } - .build() - .wrap_err("failed constructing executor")?; - - tasks.spawn(Self::EXECUTOR, executor.run_until_stopped()); - handle - }; - - if cfg.execution_commit_level.is_with_soft() { - let sequencer_grpc_client = - sequencer::SequencerGrpcClient::new(&cfg.sequencer_grpc_url) - .wrap_err("failed constructing grpc client for Sequencer")?; - - // The `sync_start_block_height` represents the height of the next - // sequencer block that can be executed on top of the rollup state. - // This value is derived by the Executor. - let sequencer_reader = sequencer::Builder { - sequencer_grpc_client, - sequencer_cometbft_client: sequencer_cometbft_client.clone(), - sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms), - expected_sequencer_chain_id: cfg.expected_sequencer_chain_id.clone(), - shutdown: shutdown_token.clone(), - executor: executor_handle.clone(), - } - .build(); - tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); + let executor = executor::Builder { + config, + shutdown: shutdown_token.clone(), + metrics, } - - if cfg.execution_commit_level.is_with_firm() { - let celestia_token = if cfg.no_celestia_auth { - None - } else { - Some(cfg.celestia_bearer_token) - }; - - let reader = celestia::Builder { - celestia_http_endpoint: cfg.celestia_node_http_url, - celestia_token, - celestia_block_time: Duration::from_millis(cfg.celestia_block_time_ms), - executor: executor_handle.clone(), - sequencer_cometbft_client: sequencer_cometbft_client.clone(), - sequencer_requests_per_second: cfg.sequencer_requests_per_second, - expected_celestia_chain_id: cfg.expected_celestia_chain_id, - expected_sequencer_chain_id: cfg.expected_sequencer_chain_id, - shutdown: shutdown_token.clone(), - metrics, - } - .build() - .wrap_err("failed to build Celestia Reader")?; - - tasks.spawn(Self::CELESTIA, reader.run_until_stopped()); - }; + .build() + .wrap_err("failed constructing executor")?; Ok(Self { shutdown_token, - tasks, + executor: Some(tokio::spawn(executor.run_until_stopped())), }) } @@ -186,27 +112,28 @@ impl ConductorInner { /// /// # Panics /// Panics if it could not install a signal handler. - async fn run_until_stopped(mut self) -> Result { + async fn run_until_stopped(mut self) -> eyre::Result { info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running")); let exit_reason = select! { biased; () = self.shutdown_token.cancelled() => { - ExitReason::ShutdownSignal + Ok(ShutdownSignalReceived) }, - Some((name, res)) = self.tasks.join_next() => { - match flatten(res) { - Ok(()) => ExitReason::TaskFailed{name, error: eyre!("task `{name}` exited unexpectedly")}, - Err(err) => ExitReason::TaskFailed{name, error: err.wrap_err(format!("task `{name}` failed"))}, + res = self.executor.as_mut().expect("task must always be set at this point") => { + // XXX: must Option::take the JoinHandle to avoid polling it in the shutdown logic. + self.executor.take(); + match res { + Ok(Ok(())) => Err(eyre!("executor exited unexpectedly")), + Ok(Err(err)) => Err(err.wrap_err("executor exited with error")), + Err(err) => Err(Report::new(err).wrap_err("executor panicked")), } } }; - let message = "initiating shutdown"; - report_exit(&exit_reason, message); - self.shutdown(exit_reason).await + self.restart_or_shutdown(exit_reason).await } /// Creates and spawns a Conductor on the tokio runtime. @@ -232,89 +159,43 @@ impl ConductorInner { /// Waits 25 seconds for all tasks to shut down before aborting them. 25 seconds /// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds /// to abort the remaining tasks. - #[instrument(skip_all)] - async fn shutdown(mut self, exit_reason: ExitReason) -> Result { + #[instrument(skip_all, err, ret(Display))] + async fn restart_or_shutdown( + mut self, + exit_reason: eyre::Result, + ) -> eyre::Result { self.shutdown_token.cancel(); - let mut restart_or_shutdown = RestartOrShutdown::Shutdown; - - match &exit_reason { - ExitReason::ShutdownSignal => { - info!("received shutdown signal, skipping check for restart"); - } - ExitReason::TaskFailed { - name, - error, - } => { - if check_for_restart(name, error) { - restart_or_shutdown = RestartOrShutdown::Restart; + let restart_or_shutdown = match exit_reason { + Ok(ShutdownSignalReceived) => Ok(RestartOrShutdown::Shutdown), + Err(error) => { + error!(%error, "executor failed; checking error chain if conductor should be restarted"); + if check_for_restart(&error) { + Ok(RestartOrShutdown::Restart) + } else { + Err(error) } } - } - - info!("signalled all tasks to shut down; waiting for 25 seconds to exit"); - - let shutdown_loop = async { - while let Some((name, res)) = self.tasks.join_next().await { - let message = "task shut down"; - match flatten(res) { - Ok(()) => { - info!(name, message); - } - Err(error) => { - if check_for_restart(name, &error) - && !matches!(exit_reason, ExitReason::ShutdownSignal) - { - restart_or_shutdown = RestartOrShutdown::Restart; - } - error!(name, %error, message); - } - }; - } }; - if timeout(Duration::from_secs(25), shutdown_loop) - .await - .is_err() - { - let tasks = self.tasks.keys().join(", "); - warn!( - tasks = format_args!("[{tasks}]"), - "aborting all tasks that have not yet shut down", - ); - self.tasks.abort_all(); - } else { - info!("all tasks shut down regularly"); - } - info!("shutting down"); - - if let ExitReason::TaskFailed { - error, .. - } = exit_reason - { - if matches!(restart_or_shutdown, RestartOrShutdown::Shutdown) { - return Err(error); + if let Some(mut executor) = self.executor.take() { + let wait_until_timeout = Duration::from_secs(25); + if timeout(wait_until_timeout, &mut executor).await.is_err() { + warn!( + "waited `{}` for executor start to respond to shutdown signal; aborting", + humantime::format_duration(wait_until_timeout) + ); + executor.abort(); + } else { + info!("executor shut down regularly"); } } - Ok(restart_or_shutdown) - } -} -#[instrument(skip_all)] -fn report_exit(exit_reason: &ExitReason, message: &str) { - match exit_reason { - ExitReason::ShutdownSignal => info!(reason = "received shutdown signal", message), - ExitReason::TaskFailed { - name: task, - error: reason, - } => error!(%reason, %task, message), + restart_or_shutdown } } #[instrument(skip_all)] -fn check_for_restart(name: &str, err: &eyre::Report) -> bool { - if name != ConductorInner::EXECUTOR { - return false; - } +fn check_for_restart(err: &eyre::Report) -> bool { let mut current = Some(err.as_ref() as &dyn std::error::Error); while let Some(err) = current { if let Some(status) = err.downcast_ref::() { @@ -338,6 +219,6 @@ mod tests { let err = tonic_error.wrap_err("wrapper_1"); let err = err.wrap_err("wrapper_2"); let err = err.wrap_err("wrapper_3"); - assert!(super::check_for_restart("executor", &err.unwrap_err())); + assert!(super::check_for_restart(&err.unwrap_err())); } } diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index e8211b1714..cd48c4d1e0 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -95,6 +95,16 @@ pub struct Config { pub pretty_print: bool, } +impl Config { + pub(crate) fn is_with_firm(&self) -> bool { + self.execution_commit_level.is_with_firm() + } + + pub(crate) fn is_with_soft(&self) -> bool { + self.execution_commit_level.is_with_soft() + } +} + impl config::Config for Config { const PREFIX: &'static str = "ASTRIA_CONDUCTOR_"; } diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index daf53e5835..1cae5f0089 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -1,86 +1,40 @@ -use std::collections::HashMap; - use astria_eyre::eyre::{ self, WrapErr as _, }; -use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; -use super::{ - state, - Executor, - Handle, - ReconstructedBlock, - StateNotInit, -}; -use crate::{ - config::CommitLevel, - metrics::Metrics, -}; +use super::Executor; +use crate::metrics::Metrics; pub(crate) struct Builder { - pub(crate) mode: CommitLevel, - pub(crate) rollup_address: String, + pub(crate) config: crate::Config, pub(crate) shutdown: CancellationToken, pub(crate) metrics: &'static Metrics, } impl Builder { - pub(crate) fn build(self) -> eyre::Result<(Executor, Handle)> { + pub(crate) fn build(self) -> eyre::Result { let Self { - mode, - rollup_address, + config, shutdown, metrics, } = self; - let client = super::client::Client::connect_lazy(&rollup_address).wrap_err_with(|| { - format!( - "failed to construct execution client for provided rollup address \ - `{rollup_address}`" - ) - })?; - - let mut firm_block_tx = None; - let mut firm_block_rx = None; - if mode.is_with_firm() { - let (tx, rx) = mpsc::channel::>(16); - firm_block_tx = Some(tx); - firm_block_rx = Some(rx); - } - - let mut soft_block_tx = None; - let mut soft_block_rx = None; - if mode.is_with_soft() { - let (tx, rx) = super::soft_block_channel(); - soft_block_tx = Some(tx); - soft_block_rx = Some(rx); - } - - let (state_tx, state_rx) = state::channel(); + let client = + super::client::Client::connect_lazy(&config.execution_rpc_url).wrap_err_with(|| { + format!( + "failed to construct execution client for provided rollup address `{}`", + config.execution_rpc_url, + ) + })?; let executor = Executor { + config, client, - - mode, - - firm_blocks: firm_block_rx, - soft_blocks: soft_block_rx, - shutdown, - state: state_tx, - blocks_pending_finalization: HashMap::new(), - - max_spread: None, metrics, }; - let handle = Handle { - firm_blocks: firm_block_tx, - soft_blocks: soft_block_tx, - state: state_rx, - _state_init: StateNotInit, - }; - Ok((executor, handle)) + Ok(executor) } } diff --git a/crates/astria-conductor/src/executor/channel.rs b/crates/astria-conductor/src/executor/channel.rs deleted file mode 100644 index 955d62d9b3..0000000000 --- a/crates/astria-conductor/src/executor/channel.rs +++ /dev/null @@ -1,241 +0,0 @@ -//! An mpsc channel bounded by an externally driven semaphore. -//! -//! While the main purpose of this channel is to send [`sequencer_client::SequencerBlock`]s -//! from a sequencer reader to the executor, the channel is generic over the values that are -//! being sent to better test its functionality. - -use std::sync::{ - Arc, - Weak, -}; - -use tokio::sync::{ - mpsc::{ - error::SendError as TokioSendError, - unbounded_channel, - UnboundedReceiver, - UnboundedSender, - }, - AcquireError, - Semaphore, - TryAcquireError, -}; -use tracing::instrument; - -/// Creates an mpsc channel for sending soft blocks between asynchronous task. -/// -/// The initial bound of the channel is 0 and the receiver is expected to add -/// capacity to the channel. -pub(super) fn soft_block_channel() -> (Sender, Receiver) { - let cap = 0; - let sem = Arc::new(Semaphore::new(0)); - let (tx, rx) = unbounded_channel(); - let sender = Sender { - chan: tx, - sem: Arc::downgrade(&sem), - }; - let receiver = Receiver { - cap, - chan: rx, - sem, - }; - (sender, receiver) -} - -#[derive(Debug, thiserror::Error, PartialEq)] -#[error("the channel is closed")] -pub(crate) struct SendError; - -impl From for SendError { - fn from(_: AcquireError) -> Self { - Self - } -} - -impl From> for SendError { - fn from(_: TokioSendError) -> Self { - Self - } -} - -#[derive(Debug, thiserror::Error, PartialEq)] -pub(crate) enum TrySendError { - #[error("the channel is closed")] - Closed(T), - #[error("no permits available")] - NoPermits(T), -} - -impl TrySendError { - fn from_semaphore(err: &TryAcquireError, block: T) -> Self { - match err { - tokio::sync::TryAcquireError::Closed => Self::Closed(block), - tokio::sync::TryAcquireError::NoPermits => Self::NoPermits(block), - } - } -} - -impl From> for TrySendError { - fn from(err: TokioSendError) -> Self { - Self::Closed(err.0) - } -} - -#[derive(Debug, Clone)] -pub(super) struct Sender { - sem: Weak, - chan: UnboundedSender, -} - -impl Sender { - /// Sends a block, waiting until the channel has permits. - /// - /// Returns an error if the channel is closed. - #[instrument(skip_all, err)] - pub(super) async fn send(&self, block: T) -> Result<(), SendError> { - let sem = self.sem.upgrade().ok_or(SendError)?; - let permit = sem.acquire().await?; - permit.forget(); - self.chan.send(block)?; - Ok(()) - } - - /// Attempts to send a block without blocking. - /// - /// Returns an error if the channel is out of permits or if it has been closed. - pub(super) fn try_send(&self, block: T) -> Result<(), TrySendError> { - let sem = match self.sem.upgrade() { - None => return Err(TrySendError::Closed(block)), - Some(sem) => sem, - }; - let permit = match sem.try_acquire() { - Err(err) => return Err(TrySendError::from_semaphore(&err, block)), - Ok(permit) => permit, - }; - permit.forget(); - self.chan.send(block)?; - Ok(()) - } -} - -pub(super) struct Receiver { - cap: usize, - sem: Arc, - chan: UnboundedReceiver, -} - -impl Drop for Receiver { - fn drop(&mut self) { - self.sem.close(); - } -} - -impl Receiver { - /// Sets the channel's capacity to `cap`. - /// - /// `cap` will be the maximum number of blocks that can be sent - /// over the channel before new permits are added with `[SoftBlockReceiver::add_permits]`. - pub(super) fn set_capacity(&mut self, cap: usize) { - self.cap = cap; - } - - /// Adds up to `capacity` number of permits to the channel. - /// - /// `capacity` is previously set by [`SoftBlockReceiver::set_capacity`] - /// or zero by default. - pub(super) fn fill_permits(&self) { - let additional = self.cap.saturating_sub(self.sem.available_permits()); - self.sem.add_permits(additional); - } - - /// Receives a block over the channel. - #[instrument(skip_all)] - pub(super) async fn recv(&mut self) -> Option { - self.chan.recv().await - } -} - -#[cfg(test)] -mod tests { - use super::{ - soft_block_channel, - SendError, - TrySendError, - }; - - #[test] - fn fresh_channel_has_no_capacity() { - let (tx, _rx) = soft_block_channel::<()>(); - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "a fresh channel starts without permits" - ); - } - - #[test] - fn permits_are_filled_to_capacity() { - let cap = 2; - let (tx, mut rx) = soft_block_channel::<()>(); - rx.set_capacity(cap); - rx.fill_permits(); - for _ in 0..cap { - tx.try_send(()).expect("the channel should have capacity"); - } - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "a channel that has its permits used up should return with a NoPermits error until \ - refilled or closed", - ); - } - - #[test] - fn refilling_twice_has_no_effect() { - let cap = 2; - let (tx, mut rx) = soft_block_channel::<()>(); - rx.set_capacity(cap); - rx.fill_permits(); - rx.fill_permits(); - for _ in 0..cap { - tx.try_send(()).expect("the channel should have capacity"); - } - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::NoPermits(()), - "refilling twice in a row should result in the same number of permits" - ); - } - - #[test] - fn try_sending_to_dropped_receiver_returns_closed_error() { - let (tx, rx) = soft_block_channel::<()>(); - std::mem::drop(rx); - assert_eq!( - tx.try_send(()).unwrap_err(), - TrySendError::Closed(()), - "a channel with a dropped receiver is considered closed", - ); - } - - #[tokio::test] - async fn async_sending_to_dropped_receiver_returns_closed_error() { - let (tx, rx) = soft_block_channel::<()>(); - std::mem::drop(rx); - assert_eq!( - tx.send(()).await.unwrap_err(), - SendError, - "a channel with a dropped receiver is considered closed", - ); - } - - #[tokio::test] - #[should_panic(expected = "receiving with all senders dropped should return None")] - async fn receiving_without_any_remaining_receivers_returns_none() { - let (tx, mut rx) = soft_block_channel::<()>(); - std::mem::drop(tx); - rx.recv() - .await - .expect("receiving with all senders dropped should return None"); - } -} diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index fb3525ead1..36f90386a0 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -1,4 +1,7 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + time::Duration, +}; use astria_core::{ execution::v1::{ @@ -16,27 +19,33 @@ use astria_eyre::eyre::{ self, bail, ensure, + eyre, WrapErr as _, }; use bytes::Bytes; -use sequencer_client::tendermint::{ - block::Height as SequencerHeight, - Time as TendermintTime, +use sequencer_client::{ + tendermint::{ + block::Height as SequencerHeight, + Time as TendermintTime, + }, + HttpClient, }; use tokio::{ select, - sync::{ - mpsc, - watch::error::RecvError, - }, + sync::mpsc, + task::JoinError, +}; +use tokio_util::{ + sync::CancellationToken, + task::JoinMap, }; -use tokio_util::sync::CancellationToken; use tracing::{ debug, debug_span, error, info, instrument, + warn, }; use crate::{ @@ -46,10 +55,8 @@ use crate::{ }; mod builder; -pub(crate) mod channel; pub(crate) use builder::Builder; -use channel::soft_block_channel; mod client; mod state; @@ -57,180 +64,176 @@ mod state; mod tests; pub(super) use client::Client; -use state::StateReceiver; +use state::State; +pub(crate) use state::StateReceiver; use self::state::StateSender; type CelestiaHeight = u64; -#[derive(Clone, Debug)] -pub(crate) struct StateNotInit; -#[derive(Clone, Debug)] -pub(crate) struct StateIsInit; - -#[derive(Debug, thiserror::Error)] -pub(crate) enum FirmSendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - #[from] - source: mpsc::error::SendError>, - }, -} - -#[derive(Debug, thiserror::Error)] -pub(crate) enum FirmTrySendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - #[from] - source: mpsc::error::TrySendError>, - }, -} +pub(crate) struct Executor { + config: crate::Config, -#[derive(Debug, thiserror::Error)] -pub(crate) enum SoftSendError { - #[error("executor was configured without soft commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { source: Box }, -} + /// The execution client driving the rollup. + client: Client, -#[derive(Debug, thiserror::Error)] -pub(crate) enum SoftTrySendError { - #[error("executor was configured without firm commitments")] - NotSet, - #[error("failed sending blocks to executor")] - Channel { - source: Box>, - }, -} + /// Token to listen for Conductor being shut down. + shutdown: CancellationToken, -/// A handle to the executor. -/// -/// To be useful, [`Handle::wait_for_init`] must be called in -/// order to obtain a [`Handle`]. This is to ensure that the executor -/// state was primed before using its other methods. See [`State`] for more -/// information. -#[derive(Debug, Clone)] -pub(crate) struct Handle { - firm_blocks: Option>>, - soft_blocks: Option>, - state: StateReceiver, - _state_init: TStateInit, + metrics: &'static Metrics, } -impl Handle { - #[instrument(skip_all, err)] - pub(crate) async fn wait_for_init(&mut self) -> eyre::Result> { - self.state.wait_for_init().await.wrap_err( - "executor state channel terminated while waiting for the state to initialize", - )?; - let Self { - firm_blocks, - soft_blocks, - state, - .. - } = self.clone(); - Ok(Handle { - firm_blocks, - soft_blocks, - state, - _state_init: StateIsInit, - }) - } -} +impl Executor { + const CELESTIA: &'static str = "celestia"; + const SEQUENCER: &'static str = "sequencer"; -impl Handle { - #[instrument(skip_all, err)] - pub(crate) async fn send_firm_block( - self, - block: impl Into>, - ) -> Result<(), FirmSendError> { - let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - Ok(sender.send(block.into()).await?) - } + pub(crate) async fn run_until_stopped(self) -> eyre::Result<()> { + let initialized = select!( + () = self.shutdown.clone().cancelled_owned() => { + return report_exit(Ok( + "received shutdown signal while initializing task; \ + aborting intialization and exiting" + ), ""); + } + res = self.init() => { + res.wrap_err("initialization failed")? + } + ); - pub(crate) fn try_send_firm_block( - &self, - block: impl Into>, - ) -> Result<(), FirmTrySendError> { - let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - Ok(sender.try_send(block.into())?) + initialized.run().await } + /// Runs the init logic that needs to happen before [`Executor`] can enter its main loop. #[instrument(skip_all, err)] - pub(crate) async fn send_soft_block_owned( - self, - block: FilteredSequencerBlock, - ) -> Result<(), SoftSendError> { - let chan = self.soft_blocks.as_ref().ok_or(SoftSendError::NotSet)?; - chan.send(block) + async fn init(self) -> eyre::Result { + let state = self + .create_initial_node_state() .await - .map_err(|source| SoftSendError::Channel { - source: Box::new(source), - })?; - Ok(()) - } - - pub(crate) fn try_send_soft_block( - &self, - block: FilteredSequencerBlock, - ) -> Result<(), SoftTrySendError> { - let chan = self.soft_blocks.as_ref().ok_or(SoftTrySendError::NotSet)?; - chan.try_send(block) - .map_err(|source| SoftTrySendError::Channel { - source: Box::new(source), - })?; - Ok(()) - } + .wrap_err("failed setting initial rollup node state")?; - pub(crate) fn next_expected_firm_sequencer_height(&mut self) -> SequencerHeight { - self.state.next_expected_firm_sequencer_height() - } + let sequencer_cometbft_client = HttpClient::new(&*self.config.sequencer_cometbft_url) + .wrap_err("failed constructing sequencer cometbft RPC client")?; + + let reader_cancellation_token = self.shutdown.child_token(); + + let (firm_blocks_tx, firm_blocks_rx) = tokio::sync::mpsc::channel(16); + let (soft_blocks_tx, soft_blocks_rx) = + tokio::sync::mpsc::channel(state.calculate_max_spread()); + + let mut reader_tasks = JoinMap::new(); + if self.config.is_with_firm() { + let celestia_token = if self.config.no_celestia_auth { + None + } else { + Some(self.config.celestia_bearer_token.clone()) + }; + + let reader = crate::celestia::Builder { + celestia_http_endpoint: self.config.celestia_node_http_url.clone(), + celestia_token, + celestia_block_time: Duration::from_millis(self.config.celestia_block_time_ms), + firm_blocks: firm_blocks_tx, + rollup_state: state.subscribe(), + sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_requests_per_second: self.config.sequencer_requests_per_second, + expected_celestia_chain_id: self.config.expected_celestia_chain_id.clone(), + expected_sequencer_chain_id: self.config.expected_sequencer_chain_id.clone(), + shutdown: reader_cancellation_token.child_token(), + metrics: self.metrics, + } + .build() + .wrap_err("failed to build Celestia Reader")?; + reader_tasks.spawn(Self::CELESTIA, reader.run_until_stopped()); + } - pub(crate) fn next_expected_soft_sequencer_height(&mut self) -> SequencerHeight { - self.state.next_expected_soft_sequencer_height() - } + if self.config.is_with_soft() { + let sequencer_grpc_client = + crate::sequencer::SequencerGrpcClient::new(&self.config.sequencer_grpc_url) + .wrap_err("failed constructing grpc client for Sequencer")?; + + let sequencer_reader = crate::sequencer::Builder { + sequencer_grpc_client, + sequencer_cometbft_client: sequencer_cometbft_client.clone(), + sequencer_block_time: Duration::from_millis(self.config.sequencer_block_time_ms), + expected_sequencer_chain_id: self.config.expected_sequencer_chain_id.clone(), + shutdown: reader_cancellation_token.child_token(), + soft_blocks: soft_blocks_tx, + rollup_state: state.subscribe(), + } + .build(); + reader_tasks.spawn(Self::SEQUENCER, sequencer_reader.run_until_stopped()); + }; - #[instrument(skip_all)] - pub(crate) async fn next_expected_soft_height_if_changed( - &mut self, - ) -> Result { - self.state.next_expected_soft_height_if_changed().await + Ok(Initialized { + config: self.config, + client: self.client, + firm_blocks: firm_blocks_rx, + soft_blocks: soft_blocks_rx, + shutdown: self.shutdown, + state, + blocks_pending_finalization: HashMap::new(), + metrics: self.metrics, + reader_tasks, + reader_cancellation_token, + }) } - pub(crate) fn rollup_id(&mut self) -> RollupId { - self.state.rollup_id() - } + #[instrument(skip_all, err)] + async fn create_initial_node_state(&self) -> eyre::Result { + let genesis_info = { + async { + self.client + .clone() + .get_genesis_info_with_retry() + .await + .wrap_err("failed getting genesis info") + } + }; + let commitment_state = { + async { + self.client + .clone() + .get_commitment_state_with_retry() + .await + .wrap_err("failed getting commitment state") + } + }; + let (genesis_info, commitment_state) = tokio::try_join!(genesis_info, commitment_state)?; - pub(crate) fn celestia_base_block_height(&mut self) -> CelestiaHeight { - self.state.celestia_base_block_height() - } + let (state, _) = state::channel( + State::try_from_genesis_info_and_commitment_state(genesis_info, commitment_state) + .wrap_err( + "failed to construct initial state gensis and commitment info received from \ + rollup", + )?, + ); - pub(crate) fn celestia_block_variance(&mut self) -> u64 { - self.state.celestia_block_variance() + self.metrics + .absolute_set_executed_firm_block_number(state.firm_number()); + self.metrics + .absolute_set_executed_soft_block_number(state.soft_number()); + info!( + initial_state = serde_json::to_string(&*state.get()) + .expect("writing json to a string should not fail"), + "received genesis info from rollup", + ); + Ok(state) } } -pub(crate) struct Executor { +pub(crate) struct Initialized { + config: crate::Config, + /// The execution client driving the rollup. client: Client, - /// The mode under which this executor (and hence conductor) runs. - mode: CommitLevel, - /// The channel of which this executor receives blocks for executing /// firm commitments. - /// Only set if `mode` is `FirmOnly` or `SoftAndFirm`. - firm_blocks: Option>>, + firm_blocks: mpsc::Receiver>, /// The channel of which this executor receives blocks for executing /// soft commitments. - /// Only set if `mode` is `SoftOnly` or `SoftAndFirm`. - soft_blocks: Option>, + soft_blocks: mpsc::Receiver, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, @@ -244,43 +247,38 @@ pub(crate) struct Executor { /// without re-executing on top of the rollup node. blocks_pending_finalization: HashMap, - /// The maximum permitted spread between firm and soft blocks. - max_spread: Option, - metrics: &'static Metrics, + + /// The tasks reading block data off Celestia or Sequencer. + reader_tasks: JoinMap<&'static str, eyre::Result<()>>, + + /// The cancellation token specifically for signaling the `reader_tasks` to shut down. + reader_cancellation_token: CancellationToken, } -impl Executor { - pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - select!( +impl Initialized { + async fn run(mut self) -> eyre::Result<()> { + let reason = select!( + biased; + () = self.shutdown.clone().cancelled_owned() => { - return report_exit(Ok( - "received shutdown signal while initializing task; \ - aborting intialization and exiting" - ), ""); + Ok("received shutdown signal") } - res = self.init() => { - res.wrap_err("initialization failed")?; + + res = self.run_event_loop() => { + res } ); - let reason = loop { - let spread_not_too_large = !self.is_spread_too_large(); - if spread_not_too_large { - if let Some(channel) = self.soft_blocks.as_mut() { - channel.fill_permits(); - } - } + self.shutdown(reason).await + } + async fn run_event_loop(&mut self) -> eyre::Result<&'static str> { + loop { select!( biased; - () = self.shutdown.cancelled() => { - break Ok("received shutdown signal"); - } - - Some(block) = async { self.firm_blocks.as_mut().unwrap().recv().await }, - if self.firm_blocks.is_some() => + Some(block) = self.firm_blocks.recv() => { debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.sequencer_height(), @@ -292,8 +290,7 @@ impl Executor { } } - Some(block) = async { self.soft_blocks.as_mut().unwrap().recv().await }, - if self.soft_blocks.is_some() && spread_not_too_large => + Some(block) = self.soft_blocks.recv(), if !self.is_spread_too_large() => { debug_span!("conductor::Executor::run_until_stopped").in_scope(||debug!( block.height = %block.height(), @@ -304,53 +301,14 @@ impl Executor { break Err(error).wrap_err("failed executing soft block"); } } - ); - }; - - // XXX: explicitly setting the message (usually implicitly set by tracing) - let message = "shutting down"; - report_exit(reason, message) - } - /// Runs the init logic that needs to happen before [`Executor`] can enter its main loop. - #[instrument(skip_all, err)] - async fn init(&mut self) -> eyre::Result<()> { - self.set_initial_node_state() - .await - .wrap_err("failed setting initial rollup node state")?; + Some((task, res)) = self.reader_tasks.join_next() => { + break handle_task_exit(task, res); + } - let max_spread: usize = self.calculate_max_spread(); - self.max_spread.replace(max_spread); - if let Some(channel) = self.soft_blocks.as_mut() { - channel.set_capacity(max_spread); - info!( - max_spread, - "setting capacity of soft blocks channel to maximum permitted firm<>soft \ - commitment spread (this has no effect if conductor is set to perform soft-sync \ - only)" + else => break Ok("all channels are closed") ); } - - Ok(()) - } - - /// Calculates the maximum allowed spread between firm and soft commitments heights. - /// - /// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance` - /// is the `celestia_block_variance` as defined in the rollup node's genesis that this - /// executor/conductor talks to. - /// - /// The heuristic 6 is the largest number of Sequencer heights that will be found at - /// one Celestia height. - /// - /// # Panics - /// Panics if the `u32` underlying the celestia block variance tracked in the state could - /// not be converted to a `usize`. This should never happen on any reasonable architecture - /// that Conductor will run on. - fn calculate_max_spread(&self) -> usize { - usize::try_from(self.state.celestia_block_variance()) - .expect("converting a u32 to usize should work on any architecture conductor runs on") - .saturating_mul(6) } /// Returns if the spread between firm and soft commitment heights in the tracked state is too @@ -362,7 +320,7 @@ impl Executor { /// /// Panics if called before [`Executor::init`] because `max_spread` must be set. fn is_spread_too_large(&self) -> bool { - if self.firm_blocks.is_none() { + if !self.config.is_with_firm() { return false; } let (next_firm, next_soft) = { @@ -372,12 +330,7 @@ impl Executor { }; let is_too_far_ahead = usize::try_from(next_soft.saturating_sub(next_firm)) - .map(|spread| { - spread - >= self - .max_spread - .expect("executor must be initalized and this field set") - }) + .map(|spread| spread >= self.state.calculate_max_spread()) .unwrap_or(false); if is_too_far_ahead { @@ -568,43 +521,6 @@ impl Executor { Ok(executed_block) } - #[instrument(skip_all, err)] - async fn set_initial_node_state(&mut self) -> eyre::Result<()> { - let genesis_info = { - async { - self.client - .clone() - .get_genesis_info_with_retry() - .await - .wrap_err("failed getting genesis info") - } - }; - let commitment_state = { - async { - self.client - .clone() - .get_commitment_state_with_retry() - .await - .wrap_err("failed getting commitment state") - } - }; - let (genesis_info, commitment_state) = tokio::try_join!(genesis_info, commitment_state)?; - self.state - .try_init(genesis_info, commitment_state) - .wrap_err("failed initializing state tracking")?; - - self.metrics - .absolute_set_executed_firm_block_number(self.state.firm_number()); - self.metrics - .absolute_set_executed_soft_block_number(self.state.soft_number()); - info!( - initial_state = serde_json::to_string(&*self.state.get()) - .expect("writing json to a string should not fail"), - "received genesis info from rollup", - ); - Ok(()) - } - #[instrument(skip_all, err)] async fn update_commitment_state(&mut self, update: Update) -> eyre::Result<()> { use Update::{ @@ -662,13 +578,46 @@ impl Executor { should_execute_firm_block( self.state.next_expected_firm_sequencer_height().value(), self.state.next_expected_soft_sequencer_height().value(), - self.mode, + self.config.execution_commit_level, ) } + + #[instrument(skip_all, err, ret)] + async fn shutdown(mut self, reason: eyre::Result<&'static str>) -> eyre::Result<()> { + info!("signaling all reader tasks to exit"); + self.reader_cancellation_token.cancel(); + while let Some((task, exit_status)) = self.reader_tasks.join_next().await { + match crate::utils::flatten(exit_status) { + Ok(()) => info!(task, "task exited"), + Err(error) => warn!(task, %error, "task exited"), + } + } + report_exit(reason, "shutting down") + } +} + +/// Wraps a task result to explain why it exited. +/// +/// Right now only the err-branch is populated because tasks should +/// never exit. Still returns an `eyre::Result` to line up with the +/// return type of [`Executor::run_until_stopped`]. +/// +/// Executor should `break handle_task_exit` immediately after calling +/// this method. +fn handle_task_exit( + task: &'static str, + res: Result, JoinError>, +) -> eyre::Result<&'static str> { + match res { + Ok(Ok(())) => Err(eyre!("task `{task}` finished unexpectedly")), + Ok(Err(err)) => Err(err).wrap_err_with(|| format!("task `{task}` exited with error")), + Err(err) => Err(err).wrap_err_with(|| format!("task `{task}` panicked")), + } } #[instrument(skip_all)] fn report_exit(reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { + // XXX: explicitly setting the message (usually implicitly set by tracing) match reason { Ok(reason) => { info!(%reason, message); diff --git a/crates/astria-conductor/src/executor/state.rs b/crates/astria-conductor/src/executor/state.rs index 2ae4e2c4b2..1f315b078b 100644 --- a/crates/astria-conductor/src/executor/state.rs +++ b/crates/astria-conductor/src/executor/state.rs @@ -10,10 +10,6 @@ use astria_core::{ }, primitive::v1::RollupId, }; -use astria_eyre::{ - eyre, - eyre::WrapErr as _, -}; use bytes::Bytes; use sequencer_client::tendermint::block::Height as SequencerHeight; use tokio::sync::watch::{ @@ -22,8 +18,8 @@ use tokio::sync::watch::{ }; use tracing::instrument; -pub(super) fn channel() -> (StateSender, StateReceiver) { - let (tx, rx) = watch::channel(None); +pub(super) fn channel(state: State) -> (StateSender, StateReceiver) { + let (tx, rx) = watch::channel(state); let sender = StateSender { inner: tx, }; @@ -46,25 +42,14 @@ pub(super) struct InvalidState { } #[derive(Clone, Debug)] -pub(super) struct StateReceiver { - inner: watch::Receiver>, +pub(crate) struct StateReceiver { + inner: watch::Receiver, } impl StateReceiver { - #[instrument(skip_all, err)] - pub(super) async fn wait_for_init(&mut self) -> eyre::Result<()> { - self.inner - .wait_for(Option::is_some) - .await - .wrap_err("channel failed while waiting for state to become initialized")?; - Ok(()) - } - - pub(super) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { + pub(crate) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_firm_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -72,11 +57,9 @@ impl StateReceiver { ) } - pub(super) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { + pub(crate) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_soft_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -94,11 +77,11 @@ impl StateReceiver { } pub(super) struct StateSender { - inner: watch::Sender>, + inner: watch::Sender, } fn can_map_firm_to_sequencer_height( - genesis_info: GenesisInfo, + genesis_info: &GenesisInfo, commitment_state: &CommitmentState, ) -> Result<(), InvalidState> { let sequencer_genesis_height = genesis_info.sequencer_genesis_block_height(); @@ -115,7 +98,7 @@ fn can_map_firm_to_sequencer_height( } fn can_map_soft_to_sequencer_height( - genesis_info: GenesisInfo, + genesis_info: &GenesisInfo, commitment_state: &CommitmentState, ) -> Result<(), InvalidState> { let sequencer_genesis_height = genesis_info.sequencer_genesis_block_height(); @@ -132,21 +115,29 @@ fn can_map_soft_to_sequencer_height( } impl StateSender { - pub(super) fn try_init( - &mut self, - genesis_info: GenesisInfo, - commitment_state: CommitmentState, - ) -> Result<(), InvalidState> { - can_map_firm_to_sequencer_height(genesis_info, &commitment_state)?; - can_map_soft_to_sequencer_height(genesis_info, &commitment_state)?; - self.inner.send_modify(move |state| { - let old_state = state.replace(State::new(genesis_info, commitment_state)); - assert!( - old_state.is_none(), - "the state must be initialized only once", - ); - }); - Ok(()) + pub(super) fn subscribe(&self) -> StateReceiver { + StateReceiver { + inner: self.inner.subscribe(), + } + } + + /// Calculates the maximum allowed spread between firm and soft commitments heights. + /// + /// The maximum allowed spread is taken as `max_spread = variance * 6`, where `variance` + /// is the `celestia_block_variance` as defined in the rollup node's genesis that this + /// executor/conductor talks to. + /// + /// The heuristic 6 is the largest number of Sequencer heights that will be found at + /// one Celestia height. + /// + /// # Panics + /// Panics if the `u32` underlying the celestia block variance tracked in the state could + /// not be converted to a `usize`. This should never happen on any reasonable architecture + /// that Conductor will run on. + pub(super) fn calculate_max_spread(&self) -> usize { + usize::try_from(self.celestia_block_variance()) + .expect("converting a u32 to usize should work on any architecture conductor runs on") + .saturating_mul(6) } pub(super) fn try_update_commitment_state( @@ -154,26 +145,21 @@ impl StateSender { commitment_state: CommitmentState, ) -> Result<(), InvalidState> { let genesis_info = self.genesis_info(); - can_map_firm_to_sequencer_height(genesis_info, &commitment_state)?; - can_map_soft_to_sequencer_height(genesis_info, &commitment_state)?; + can_map_firm_to_sequencer_height(&genesis_info, &commitment_state)?; + can_map_soft_to_sequencer_height(&genesis_info, &commitment_state)?; self.inner.send_modify(move |state| { - state - .as_mut() - .expect("the state must be initialized") - .set_commitment_state(commitment_state); + state.set_commitment_state(commitment_state); }); Ok(()) } - pub(super) fn get(&self) -> tokio::sync::watch::Ref<'_, Option> { + pub(super) fn get(&self) -> tokio::sync::watch::Ref<'_, State> { self.inner.borrow() } pub(super) fn next_expected_firm_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_firm_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -184,8 +170,6 @@ impl StateSender { pub(super) fn next_expected_soft_sequencer_height(&self) -> SequencerHeight { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .next_expected_soft_sequencer_height() .expect( "the tracked state must never be set to a genesis/commitment state that cannot be \ @@ -198,11 +182,9 @@ macro_rules! forward_impls { ($target:ident: $([$fn:ident -> $ret:ty]),*$(,)?) => { impl $target { $( - pub(super) fn $fn(&self) -> $ret { + pub(crate) fn $fn(&self) -> $ret { self.inner .borrow() - .as_ref() - .expect("the state is initialized") .$fn() .clone() } @@ -241,11 +223,16 @@ pub(super) struct State { } impl State { - fn new(genesis_info: GenesisInfo, commitment_state: CommitmentState) -> Self { - Self { + pub(super) fn try_from_genesis_info_and_commitment_state( + genesis_info: GenesisInfo, + commitment_state: CommitmentState, + ) -> Result { + can_map_firm_to_sequencer_height(&genesis_info, &commitment_state)?; + can_map_soft_to_sequencer_height(&genesis_info, &commitment_state)?; + Ok(State { commitment_state, genesis_info, - } + }) } /// Sets the inner commitment state. @@ -390,16 +377,21 @@ mod tests { .unwrap() } - fn make_state() -> (StateSender, StateReceiver) { - let (mut tx, rx) = super::channel(); - tx.try_init(make_genesis_info(), make_commitment_state()) - .unwrap(); - (tx, rx) + fn make_state() -> State { + State::try_from_genesis_info_and_commitment_state( + make_genesis_info(), + make_commitment_state(), + ) + .unwrap() + } + + fn make_channel() -> (StateSender, StateReceiver) { + super::channel(make_state()) } #[test] fn next_firm_sequencer_height_is_correct() { - let (_, rx) = make_state(); + let (_, rx) = make_channel(); assert_eq!( SequencerHeight::from(12u32), rx.next_expected_firm_sequencer_height(), @@ -408,7 +400,7 @@ mod tests { #[test] fn next_soft_sequencer_height_is_correct() { - let (_, rx) = make_state(); + let (_, rx) = make_channel(); assert_eq!( SequencerHeight::from(13u32), rx.next_expected_soft_sequencer_height(), diff --git a/crates/astria-conductor/src/executor/tests.rs b/crates/astria-conductor/src/executor/tests.rs index e8ead9dc71..a5206cb141 100644 --- a/crates/astria-conductor/src/executor/tests.rs +++ b/crates/astria-conductor/src/executor/tests.rs @@ -13,6 +13,7 @@ use bytes::Bytes; use super::{ should_execute_firm_block, state::{ + State, StateReceiver, StateSender, }, @@ -57,9 +58,9 @@ fn make_state( base_celestia_height: 1, }) .unwrap(); - let (mut tx, rx) = super::state::channel(); - tx.try_init(genesis_info, commitment_state).unwrap(); - (tx, rx) + let state = + State::try_from_genesis_info_and_commitment_state(genesis_info, commitment_state).unwrap(); + super::state::channel(state) } #[track_caller] diff --git a/crates/astria-conductor/src/sequencer/builder.rs b/crates/astria-conductor/src/sequencer/builder.rs index c71aa0e7d8..a95b98e13a 100644 --- a/crates/astria-conductor/src/sequencer/builder.rs +++ b/crates/astria-conductor/src/sequencer/builder.rs @@ -1,31 +1,36 @@ use std::time::Duration; +use astria_core::sequencerblock::v1::block::FilteredSequencerBlock; +use tokio::sync::mpsc; use tokio_util::sync::CancellationToken; use super::SequencerGrpcClient; -use crate::executor; +use crate::executor::StateReceiver; pub(crate) struct Builder { - pub(crate) executor: executor::Handle, pub(crate) sequencer_grpc_client: SequencerGrpcClient, pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient, pub(crate) sequencer_block_time: Duration, pub(crate) expected_sequencer_chain_id: String, pub(crate) shutdown: CancellationToken, + pub(crate) rollup_state: StateReceiver, + pub(crate) soft_blocks: mpsc::Sender, } impl Builder { pub(crate) fn build(self) -> super::Reader { let Self { - executor, sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, expected_sequencer_chain_id, shutdown, + rollup_state, + soft_blocks, } = self; super::Reader { - executor, + rollup_state, + soft_blocks, sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index df9d11b5a1..00caf1a450 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -25,7 +25,10 @@ use sequencer_client::{ LatestHeightStream, StreamLatestHeight as _, }; -use tokio::select; +use tokio::{ + select, + sync::mpsc, +}; use tokio_util::sync::CancellationToken; use tracing::{ debug, @@ -40,12 +43,6 @@ use tracing::{ use crate::{ block_cache::BlockCache, - executor::{ - self, - SoftSendError, - SoftTrySendError, - StateIsInit, - }, sequencer::block_stream::BlocksFromHeightStream, }; @@ -56,16 +53,15 @@ mod reporting; pub(crate) use builder::Builder; pub(crate) use client::SequencerGrpcClient; +use crate::executor::StateReceiver; + /// [`Reader`] reads Sequencer blocks and forwards them to the [`crate::Executor`] task. /// /// The blocks are forwarded in strictly sequential order of their Sequencr heights. /// A [`Reader`] is created with [`Builder::build`] and run with [`Reader::run_until_stopped`]. pub(crate) struct Reader { - /// The handle for sending sequencer blocks as soft commits to the executor - /// and checking it for the next expected height, and rollup ID associated with - /// this instance of Conductor. - /// Must be initialized before it can be used. - executor: executor::Handle, + rollup_state: StateReceiver, + soft_blocks: mpsc::Sender, /// The gRPC client to fetch new blocks from the Sequencer network. sequencer_grpc_client: SequencerGrpcClient, @@ -87,22 +83,22 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let executor = select!( + let () = select!( () = self.shutdown.clone().cancelled_owned() => { return report_exit(Ok("received shutdown signal while waiting for Sequencer reader task to initialize"), ""); } res = self.initialize() => { - res? + res?; } ); - RunningReader::try_from_parts(self, executor) + RunningReader::try_from_parts(self) .wrap_err("failed entering run loop")? .run_until_stopped() .await } #[instrument(skip_all, err)] - async fn initialize(&mut self) -> eyre::Result> { + async fn initialize(&mut self) -> eyre::Result<()> { let actual_sequencer_chain_id = get_sequencer_chain_id(self.sequencer_cometbft_client.clone()) .await @@ -113,20 +109,13 @@ impl Reader { "expected chain id `{expected_sequencer_chain_id}` does not match actual: \ `{actual_sequencer_chain_id}`" ); - - self.executor - .wait_for_init() - .await - .wrap_err("handle to executor failed while waiting for it being initialized") + Ok(()) } } struct RunningReader { - /// The initialized handle to the executor task. - /// Used for sending sequencer blocks as soft commits to the executor - /// and checking it for the next expected height, and rollup ID associated with - /// this instance of Conductor. - executor: executor::Handle, + rollup_state: StateReceiver, + soft_blocks: mpsc::Sender, /// Caches the filtered sequencer blocks retrieved from the Sequencer. /// This cache will yield a block if it contains a block that matches the @@ -143,26 +132,26 @@ struct RunningReader { /// An enqueued block waiting for executor to free up. Set if the executor exhibits /// backpressure. - enqueued_block: Fuse>>, + enqueued_block: + Fuse>>>, /// Token to listen for Conductor being shut down. shutdown: CancellationToken, } impl RunningReader { - fn try_from_parts( - reader: Reader, - mut executor: executor::Handle, - ) -> eyre::Result { + fn try_from_parts(reader: Reader) -> eyre::Result { let Reader { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, shutdown, + rollup_state, + soft_blocks, .. } = reader; - let next_expected_height = executor.next_expected_soft_sequencer_height(); + let next_expected_height = rollup_state.next_expected_soft_sequencer_height(); let latest_height_stream = sequencer_cometbft_client.stream_latest_height(sequencer_block_time); @@ -171,14 +160,15 @@ impl RunningReader { .wrap_err("failed constructing sequential block cache")?; let blocks_from_heights = BlocksFromHeightStream::new( - executor.rollup_id(), + rollup_state.rollup_id(), next_expected_height, sequencer_grpc_client, ); let enqueued_block: Fuse>> = future::Fuse::terminated(); Ok(RunningReader { - executor, + rollup_state, + soft_blocks, block_cache, latest_height_stream, blocks_from_heights, @@ -215,7 +205,7 @@ impl RunningReader { } // Skip heights that executor has already executed (e.g. firm blocks from Celestia) - Ok(next_height) = self.executor.next_expected_soft_height_if_changed() => { + Ok(next_height) = self.rollup_state.next_expected_soft_height_if_changed() => { self.update_next_expected_height(next_height); } @@ -267,34 +257,18 @@ impl RunningReader { /// Enqueues the block is the channel to the executor is full, sending it once /// it frees up. fn send_to_executor(&mut self, block: FilteredSequencerBlock) -> eyre::Result<()> { - if let Err(err) = self.executor.try_send_soft_block(block) { + if let Err(err) = self.soft_blocks.try_send(block) { match err { - SoftTrySendError::Channel { - source, - } => match *source { - executor::channel::TrySendError::Closed(_) => { - bail!("could not send block to executor because its channel was closed"); - } - - executor::channel::TrySendError::NoPermits(block) => { - trace!( - "executor channel is full; scheduling block and stopping block fetch \ - until a slot opens up" - ); - self.enqueued_block = self - .executor - .clone() - .send_soft_block_owned(block) - .boxed() - .fuse(); - } - }, - - SoftTrySendError::NotSet => { - bail!( - "conductor was configured without soft commitments; the sequencer reader \ - task should have never been started", + mpsc::error::TrySendError::Full(block) => { + trace!( + "executor channel is full; scheduling block and stopping block fetch \ + until a slot opens up" ); + let chan = self.soft_blocks.clone(); + self.enqueued_block = async move { chan.send(block).await }.boxed().fuse(); + } + mpsc::error::TrySendError::Closed(_) => { + bail!("could not send block to executor because its channel was closed") } } } diff --git a/crates/astria-conductor/tests/blackbox/firm_only.rs b/crates/astria-conductor/tests/blackbox/firm_only.rs index e634292a1c..f08271ce43 100644 --- a/crates/astria-conductor/tests/blackbox/firm_only.rs +++ b/crates/astria-conductor/tests/blackbox/firm_only.rs @@ -125,7 +125,7 @@ async fn simple() { .await .expect( "conductor should have executed the firm block and updated the firm commitment state \ - within 1000ms", + within 2000ms", ); }