Skip to content

Commit

Permalink
refactor(conductor): make firm, soft readers subtasks
Browse files Browse the repository at this point in the history
making the celestia (firm) and astria (soft)
readers subtasks of the executor task is a more
faithful representation of their dependencies:

executor can run with either or both present.
But the reader tasks cannot run without the
executor task present. To fully initialize they
also depend on data from the executor, and they
could be implemented by streams instead of
free standing tasks.

spin up readers only after commitment, genesis states are received

this allows removing a lot of complexity:

1. the readers need not explicitly wait for the state
to be initialized but receive an already initialized
watch channel.
2. there is no need for a bespoke channel to dynamically
set permits - a normal mpsc channel can be used with
its capacity initialized after receiving the genesis info.

executor::Initialized::run delegates to
executor::Initialized::run_event_loop to separate
the shutdown token from the other arms of the
select macro - this way, an else => {} arm can be
introduced that shuts down executor as a fallback
  • Loading branch information
SuperFluffy committed Jan 24, 2025
1 parent a83f3ee commit 453f66c
Show file tree
Hide file tree
Showing 13 changed files with 463 additions and 950 deletions.
16 changes: 11 additions & 5 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@ use jsonrpsee::http_client::HttpClient as CelestiaClient;
use tendermint_rpc::HttpClient as SequencerClient;
use tokio_util::sync::CancellationToken;

use super::Reader;
use super::{
Reader,
ReconstructedBlock,
};
use crate::{
executor,
executor::StateReceiver,
metrics::Metrics,
};

pub(crate) struct Builder {
pub(crate) celestia_block_time: Duration,
pub(crate) celestia_http_endpoint: String,
pub(crate) celestia_token: Option<String>,
pub(crate) executor: executor::Handle,
pub(crate) firm_blocks: tokio::sync::mpsc::Sender<Box<ReconstructedBlock>>,
pub(crate) rollup_state: StateReceiver,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
Expand All @@ -36,13 +40,14 @@ impl Builder {
celestia_block_time,
celestia_http_endpoint,
celestia_token,
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
firm_blocks,
rollup_state,
} = self;

let celestia_client = create_celestia_client(celestia_http_endpoint, celestia_token)
Expand All @@ -51,7 +56,8 @@ impl Builder {
Ok(Reader {
celestia_block_time,
celestia_client,
executor,
firm_blocks,
rollup_state,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
Expand Down
120 changes: 53 additions & 67 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,11 @@ use tracing::{
trace,
trace_span,
warn,
Instrument as _,
};

use crate::{
block_cache::GetSequencerHeight,
executor::{
FirmSendError,
FirmTrySendError,
StateIsInit,
},
metrics::Metrics,
utils::flatten,
};
Expand Down Expand Up @@ -95,10 +91,7 @@ use self::{
BlobVerifier,
},
};
use crate::{
block_cache::BlockCache,
executor,
};
use crate::block_cache::BlockCache;

/// Sequencer Block information reconstructed from Celestia blobs.
///
Expand Down Expand Up @@ -138,8 +131,11 @@ pub(crate) struct Reader {
/// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// The client to get the sequencer namespace and verify blocks.
sequencer_cometbft_client: SequencerClient,
Expand All @@ -162,7 +158,7 @@ pub(crate) struct Reader {

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
let sequencer_chain_id = select!(
() = self.shutdown.clone().cancelled_owned() => {
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
Expand All @@ -175,16 +171,14 @@ impl Reader {
}
);

RunningReader::from_parts(self, executor, sequencer_chain_id)
RunningReader::from_parts(self, sequencer_chain_id)
.wrap_err("failed entering run loop")?
.run_until_stopped()
.await
}

#[instrument(skip_all, err)]
async fn initialize(
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
async fn initialize(&mut self) -> eyre::Result<tendermint::chain::Id> {
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
Expand All @@ -196,14 +190,8 @@ impl Reader {
`{actual_celestia_chain_id}`"
);
Ok(())
};

let wait_for_init_executor = async {
self.executor
.wait_for_init()
.await
.wrap_err("handle to executor failed while waiting for it being initialized")
};
}
.in_current_span();

let get_and_validate_sequencer_chain_id = async {
let actual_sequencer_chain_id =
Expand All @@ -217,18 +205,18 @@ impl Reader {
actual: `{actual_sequencer_chain_id}`"
);
Ok(actual_sequencer_chain_id)
};
}
.in_current_span();

try_join!(
validate_celestia_chain_id,
wait_for_init_executor,
get_and_validate_sequencer_chain_id
)
.map(|((), executor_init, sequencer_chain_id)| (executor_init, sequencer_chain_id))
.map(|((), sequencer_chain_id)| sequencer_chain_id)
}
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_celestia_chain_id(
celestia_client: &CelestiaClient,
) -> eyre::Result<celestia_tendermint::chain::Id> {
Expand Down Expand Up @@ -263,8 +251,11 @@ struct RunningReader {
// Client to fetch heights and blocks from Celestia.
celestia_client: CelestiaClient,

/// The channel used to send messages to the executor task.
executor: executor::Handle<StateIsInit>,
/// The channel to forward firm blocks to the executor.
firm_blocks: mpsc::Sender<Box<ReconstructedBlock>>,

/// The channel to read updates of the rollup state from.
rollup_state: crate::executor::StateReceiver,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,
Expand All @@ -280,7 +271,8 @@ struct RunningReader {
/// capacity again. Used as a back pressure mechanism so that this task does not fetch more
/// blobs if there is no capacity in the executor to execute them against the rollup in
/// time.
enqueued_block: Fuse<BoxFuture<'static, Result<u64, FirmSendError>>>,
enqueued_block:
Fuse<BoxFuture<'static, Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>>>>,

/// The latest observed head height of the Celestia network. Set by values read from
/// the `latest_height` stream.
Expand Down Expand Up @@ -323,7 +315,6 @@ struct RunningReader {
impl RunningReader {
fn from_parts(
exposed_reader: Reader,
mut executor: executor::Handle<StateIsInit>,
sequencer_chain_id: tendermint::chain::Id,
) -> eyre::Result<Self> {
let Reader {
Expand All @@ -333,21 +324,23 @@ impl RunningReader {
shutdown,
sequencer_requests_per_second,
metrics,
firm_blocks,
rollup_state,
..
} = exposed_reader;
let block_cache =
BlockCache::with_next_height(executor.next_expected_firm_sequencer_height())
BlockCache::with_next_height(rollup_state.next_expected_firm_sequencer_height())
.wrap_err("failed constructing sequential block cache")?;

let latest_heights = stream_latest_heights(celestia_client.clone(), celestia_block_time);
let rollup_id = executor.rollup_id();
let rollup_id = rollup_state.rollup_id();
let rollup_namespace = astria_core::celestia::namespace_v0_from_rollup_id(rollup_id);
let sequencer_namespace =
astria_core::celestia::namespace_v0_from_sha256_of_bytes(sequencer_chain_id.as_bytes());

let celestia_next_height = executor.celestia_base_block_height();
let celestia_reference_height = executor.celestia_base_block_height();
let celestia_variance = executor.celestia_block_variance();
let celestia_next_height = rollup_state.celestia_base_block_height();
let celestia_reference_height = rollup_state.celestia_base_block_height();
let celestia_variance = rollup_state.celestia_block_variance();

Ok(Self {
block_cache,
Expand All @@ -357,7 +350,8 @@ impl RunningReader {
),
celestia_client,
enqueued_block: Fuse::terminated(),
executor,
firm_blocks,
rollup_state,
latest_heights,
shutdown,
reconstruction_tasks: JoinMap::new(),
Expand Down Expand Up @@ -498,7 +492,7 @@ impl RunningReader {
rollup_id: self.rollup_id,
rollup_namespace: self.rollup_namespace,
sequencer_namespace: self.sequencer_namespace,
executor: self.executor.clone(),
rollup_state: self.rollup_state.clone(),
metrics: self.metrics,
};
self.reconstruction_tasks.spawn(height, task.execute());
Expand All @@ -520,28 +514,20 @@ impl RunningReader {
#[instrument(skip_all)]
fn forward_block_to_executor(&mut self, block: ReconstructedBlock) -> eyre::Result<()> {
let celestia_height = block.celestia_height;
match self.executor.try_send_firm_block(block) {
match self.firm_blocks.try_send(block.into()) {
Ok(()) => self.advance_reference_celestia_height(celestia_height),
Err(FirmTrySendError::Channel {
source,
}) => match source {
mpsc::error::TrySendError::Full(block) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel \
opens up"
);
self.enqueued_block =
enqueue_block(self.executor.clone(), block).boxed().fuse();
}
mpsc::error::TrySendError::Closed(_) => {
bail!("exiting because executor channel is closed");
}
},
Err(FirmTrySendError::NotSet) => bail!(
"exiting because executor was configured without firm commitments; this Celestia \
reader should have never been started"
),
}
Err(mpsc::error::TrySendError::Full(block)) => {
trace!(
"executor channel is full; rescheduling block fetch until the channel opens up"
);
self.enqueued_block = enqueue_block(self.firm_blocks.clone(), block)
.boxed()
.fuse();
}
Err(mpsc::error::TrySendError::Closed(_)) => {
bail!("exiting because executor channel is closed");
}
};
Ok(())
}

Expand Down Expand Up @@ -574,7 +560,7 @@ struct FetchConvertVerifyAndReconstruct {
rollup_id: RollupId,
rollup_namespace: Namespace,
sequencer_namespace: Namespace,
executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
metrics: &'static Metrics,
}

Expand All @@ -593,7 +579,7 @@ impl FetchConvertVerifyAndReconstruct {
rollup_id,
rollup_namespace,
sequencer_namespace,
executor,
rollup_state,
metrics,
} = self;

Expand Down Expand Up @@ -633,7 +619,7 @@ impl FetchConvertVerifyAndReconstruct {
"decoded Sequencer header and rollup info from raw Celestia blobs",
);

let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, executor).await;
let verified_blobs = verify_metadata(blob_verifier, decoded_blobs, rollup_state).await;

metrics.record_sequencer_blocks_metadata_verified_per_celestia_fetch(
verified_blobs.len_header_blobs(),
Expand Down Expand Up @@ -671,15 +657,15 @@ impl FetchConvertVerifyAndReconstruct {

#[instrument(skip_all, err)]
async fn enqueue_block(
executor: executor::Handle<StateIsInit>,
firm_blocks_tx: mpsc::Sender<Box<ReconstructedBlock>>,
block: Box<ReconstructedBlock>,
) -> Result<u64, FirmSendError> {
) -> Result<u64, mpsc::error::SendError<Box<ReconstructedBlock>>> {
let celestia_height = block.celestia_height;
executor.send_firm_block(block).await?;
firm_blocks_tx.send(block).await?;
Ok(celestia_height)
}

#[instrument(skip_all, err)]
#[instrument(skip_all, err, ret(Display))]
async fn get_sequencer_chain_id(client: SequencerClient) -> eyre::Result<tendermint::chain::Id> {
use sequencer_client::Client as _;

Expand Down
8 changes: 2 additions & 6 deletions crates/astria-conductor/src/celestia/verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ use super::{
block_verifier,
convert::ConvertedBlobs,
};
use crate::executor::{
self,
StateIsInit,
};

pub(super) struct VerifiedBlobs {
celestia_height: u64,
Expand Down Expand Up @@ -99,15 +95,15 @@ struct VerificationTaskKey {
pub(super) async fn verify_metadata(
blob_verifier: Arc<BlobVerifier>,
converted_blobs: ConvertedBlobs,
mut executor: executor::Handle<StateIsInit>,
rollup_state: crate::executor::StateReceiver,
) -> VerifiedBlobs {
let (celestia_height, header_blobs, rollup_blobs) = converted_blobs.into_parts();

let mut verification_tasks = JoinMap::new();
let mut verified_header_blobs = HashMap::with_capacity(header_blobs.len());

let next_expected_firm_sequencer_height =
executor.next_expected_firm_sequencer_height().value();
rollup_state.next_expected_firm_sequencer_height().value();

for (index, blob) in header_blobs.into_iter().enumerate() {
if blob.height().value() < next_expected_firm_sequencer_height {
Expand Down
Loading

0 comments on commit 453f66c

Please sign in to comment.