diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 4388d2e707..b80c8417d8 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -68,12 +68,14 @@ jobs: if: needs.run_checker.outputs.run_lint_rust == 'true' steps: - uses: actions/checkout@v4 + - name: Install just + uses: taiki-e/install-action@just - uses: dtolnay/rust-toolchain@master with: toolchain: nightly-2024-09-15 components: rustfmt - name: run rustfmt - run: cargo +nightly-2024-09-15 fmt --all -- --check + run: just lint rust-fmt toml: runs-on: ubuntu-22.04 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0ebe04560b..833358ab33 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -217,6 +217,8 @@ jobs: - uses: actions/checkout@v4 with: submodules: 'true' + - name: Install just + uses: taiki-e/install-action@just - uses: dtolnay/rust-toolchain@master with: toolchain: ${{ env.RUSTUP_TOOLCHAIN }} @@ -230,15 +232,10 @@ jobs: repo-token: ${{ secrets.GITHUB_TOKEN }} - name: run pedantic clippy on workspace crates run: | - cargo clippy --all-targets --all-features \ - -- --warn clippy::pedantic --warn clippy::arithmetic-side-effects \ - --warn clippy::allow_attributes --warn clippy::allow_attributes_without_reason \ - --deny warnings + just lint rust-clippy - name: run pedantic clippy on tools/protobuf-compiler run: | - cargo clippy --manifest-path tools/protobuf-compiler/Cargo.toml \ - --all-targets --all-features \ - -- --warn clippy::pedantic --deny warnings + just lint rust-clippy-tools custom-lints: runs-on: buildjet-8vcpu-ubuntu-2204 @@ -248,6 +245,8 @@ jobs: - uses: actions/checkout@v4 with: submodules: 'true' + - name: Install just + uses: taiki-e/install-action@just - uses: dtolnay/rust-toolchain@master with: # This has to match `rust-toolchain` in the rust-toolchain file of the dylint lints @@ -266,9 +265,7 @@ jobs: run: | : # list all lint packages here to have clippy explicitly test them : # uses the same nightly installed above to work around the entry in rust-toolchain.toml - cargo +nightly-2024-09-05 clippy --all-targets --all-features \ - -p tracing_debug_field \ - -- --warn clippy::pedantic --deny warnings + just lint rust-clippy-custom - name: run dylint clippy on workspace crates env: # set the dylint driver path to the target/ directory so that it's hopefully cached by rust-cache @@ -276,7 +273,7 @@ jobs: DYLINT_RUSTFLAGS: "-D warnings" run: | mkdir -p "$DYLINT_DRIVER_PATH" - cargo dylint --all --workspace + just lint rust-dylint test: if: ${{ always() && !cancelled() }} diff --git a/crates/astria-cli/src/bridge/mod.rs b/crates/astria-cli/src/bridge/mod.rs index cfa4aaaa81..fa65c0bd5a 100644 --- a/crates/astria-cli/src/bridge/mod.rs +++ b/crates/astria-cli/src/bridge/mod.rs @@ -36,8 +36,9 @@ impl Command { #[derive(Debug, Subcommand)] enum SubCommand { - /// Commands for interacting with Sequencer accounts + /// Collect withdrawals actions CollectWithdrawals(collect::Command), + /// Submit collected withdrawal actions SubmitWithdrawals(submit::Command), } diff --git a/crates/astria-cli/src/bridge/submit.rs b/crates/astria-cli/src/bridge/submit.rs index f7753b5bca..e387fce2b6 100644 --- a/crates/astria-cli/src/bridge/submit.rs +++ b/crates/astria-cli/src/bridge/submit.rs @@ -30,14 +30,19 @@ use tracing::{ #[derive(clap::Args, Debug)] pub(crate) struct Command { + /// Path to the file containing the actions to submit #[arg(long, short)] input: PathBuf, + /// Path to the file containing the signing key #[arg(long)] signing_key: PathBuf, + /// The address prefix for the sequencer account #[arg(long, default_value = "astria")] sequencer_address_prefix: String, + /// The chain ID of the sequencer #[arg(long)] sequencer_chain_id: String, + /// The URL of the sequencer rpc #[arg(long)] sequencer_url: String, } diff --git a/crates/astria-cli/src/sequencer/bridge_lock.rs b/crates/astria-cli/src/sequencer/bridge_lock.rs index a629192752..7049d9f11b 100644 --- a/crates/astria-cli/src/sequencer/bridge_lock.rs +++ b/crates/astria-cli/src/sequencer/bridge_lock.rs @@ -22,16 +22,18 @@ pub(super) struct Command { /// The amount being locked #[arg(long)] amount: u128, + /// The address on the destination chain #[arg(long)] destination_chain_address: String, /// The prefix to construct a bech32m address given the private key. #[arg(long, default_value = "astria")] prefix: String, + /// The private key of the account locking the funds + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/ics20_withdrawal.rs b/crates/astria-cli/src/sequencer/ics20_withdrawal.rs index 68ec8c1d72..81086f3887 100644 --- a/crates/astria-cli/src/sequencer/ics20_withdrawal.rs +++ b/crates/astria-cli/src/sequencer/ics20_withdrawal.rs @@ -60,11 +60,12 @@ pub(super) struct Command { /// The prefix to construct a bech32m address given the private key #[arg(long, default_value = "astria")] prefix: String, + /// The private key of the account withdrawing the funds + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/init_bridge_account.rs b/crates/astria-cli/src/sequencer/init_bridge_account.rs index 41085bc53d..34a351ca06 100644 --- a/crates/astria-cli/src/sequencer/init_bridge_account.rs +++ b/crates/astria-cli/src/sequencer/init_bridge_account.rs @@ -15,11 +15,12 @@ pub(super) struct Command { /// The bech32m prefix that will be used for constructing addresses using the private key #[arg(long, default_value = "astria")] prefix: String, + /// The private key of the account initializing the bridge account + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/sudo/fee_asset.rs b/crates/astria-cli/src/sequencer/sudo/fee_asset.rs index dd9a021f19..a04ff1d90a 100644 --- a/crates/astria-cli/src/sequencer/sudo/fee_asset.rs +++ b/crates/astria-cli/src/sequencer/sudo/fee_asset.rs @@ -91,11 +91,12 @@ struct ArgsInner { /// The bech32m prefix that will be used for constructing addresses using the private key #[arg(long, default_value = "astria")] prefix: String, + /// The private key of the sudo account authorizing change + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/sudo/ibc_relayer.rs b/crates/astria-cli/src/sequencer/sudo/ibc_relayer.rs index 263bb22597..8c546701e0 100644 --- a/crates/astria-cli/src/sequencer/sudo/ibc_relayer.rs +++ b/crates/astria-cli/src/sequencer/sudo/ibc_relayer.rs @@ -88,11 +88,12 @@ struct ArgsInner { /// The prefix to construct a bech32m address given the private key. #[arg(long, default_value = "astria")] prefix: String, + /// The private key of the account authorizing the change + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/sudo/sudo_address_change.rs b/crates/astria-cli/src/sequencer/sudo/sudo_address_change.rs index 147dc0afc2..d37104ba06 100644 --- a/crates/astria-cli/src/sequencer/sudo/sudo_address_change.rs +++ b/crates/astria-cli/src/sequencer/sudo/sudo_address_change.rs @@ -17,11 +17,12 @@ pub(super) struct Command { /// The bech32m prefix that will be used for constructing addresses using the private key #[arg(long, default_value = "astria")] prefix: String, + /// The private key of account authorizing the change + #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] // TODO: https://github.com/astriaorg/astria/issues/594 // Don't use a plain text private, prefer wrapper like from // the secrecy crate with specialized `Debug` and `Drop` implementations // that overwrite the key on drop and don't reveal it when printing. - #[arg(long, env = "SEQUENCER_PRIVATE_KEY")] private_key: String, /// The url of the Sequencer node #[arg(long, env = "SEQUENCER_URL")] diff --git a/crates/astria-cli/src/sequencer/threshold/dkg.rs b/crates/astria-cli/src/sequencer/threshold/dkg.rs index 8ea7295c1d..8469bbfded 100644 --- a/crates/astria-cli/src/sequencer/threshold/dkg.rs +++ b/crates/astria-cli/src/sequencer/threshold/dkg.rs @@ -48,6 +48,7 @@ pub(super) struct Command { #[arg(long)] public_key_package_path: String, + /// The address prefix for the generated address. #[arg(long, default_value = "astria")] prefix: String, } diff --git a/crates/astria-cli/src/sequencer/transfer.rs b/crates/astria-cli/src/sequencer/transfer.rs index b80c46812f..69a5036f88 100644 --- a/crates/astria-cli/src/sequencer/transfer.rs +++ b/crates/astria-cli/src/sequencer/transfer.rs @@ -17,9 +17,9 @@ use crate::utils::submit_transaction; #[derive(clap::Args, Debug)] pub(super) struct Command { - // The address of the Sequencer account to send amount to + /// The address of the Sequencer account to send amount to to_address: Address, - // The amount being sent + /// The amount being sent #[arg(long)] amount: u128, /// The bech32m prefix that will be used for constructing addresses using the private key diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index e0e1bfa0f7..1bd56d00d2 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -150,7 +150,7 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result { - let ((), executor, sequencer_chain_id) = select!( + let (executor, sequencer_chain_id) = select!( () = self.shutdown.clone().cancelled_owned() => { info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(|| info!("received shutdown signal while waiting for Celestia reader task to initialize") @@ -169,11 +169,10 @@ impl Reader { .await } - // TODO(https://github.com/astriaorg/astria/issues/1879): refactor to not return an empty tuple #[instrument(skip_all, err)] async fn initialize( &mut self, - ) -> eyre::Result<((), executor::Handle, tendermint::chain::Id)> { + ) -> eyre::Result<(executor::Handle, tendermint::chain::Id)> { let executor = self .executor .wait_for_init() @@ -212,7 +211,7 @@ impl Reader { get_and_validate_sequencer_chain_id )?; - Ok(((), executor, sequencer_chain_id)) + Ok((executor, sequencer_chain_id)) } } @@ -511,18 +510,20 @@ impl RunningReader { match self.executor.try_send_firm_block(block) { Ok(()) => self.advance_reference_celestia_height(celestia_height), Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Full(block), - }) => { - trace!( - "executor channel is full; rescheduling block fetch until the channel opens up" - ); - self.enqueued_block = enqueue_block(self.executor.clone(), block).boxed().fuse(); - } - - Err(FirmTrySendError::Channel { - source: mpsc::error::TrySendError::Closed(_), - }) => bail!("exiting because executor channel is closed"), - + source, + }) => match source { + mpsc::error::TrySendError::Full(block) => { + trace!( + "executor channel is full; rescheduling block fetch until the channel \ + opens up" + ); + self.enqueued_block = + enqueue_block(self.executor.clone(), block).boxed().fuse(); + } + mpsc::error::TrySendError::Closed(_) => { + bail!("exiting because executor channel is closed"); + } + }, Err(FirmTrySendError::NotSet) => bail!( "exiting because executor was configured without firm commitments; this Celestia \ reader should have never been started" @@ -658,7 +659,7 @@ impl FetchConvertVerifyAndReconstruct { #[instrument(skip_all, err)] async fn enqueue_block( executor: executor::Handle, - block: ReconstructedBlock, + block: Box, ) -> Result { let celestia_height = block.celestia_height; executor.send_firm_block(block).await?; diff --git a/crates/astria-conductor/src/executor/builder.rs b/crates/astria-conductor/src/executor/builder.rs index 62559f30b9..daf53e5835 100644 --- a/crates/astria-conductor/src/executor/builder.rs +++ b/crates/astria-conductor/src/executor/builder.rs @@ -11,6 +11,7 @@ use super::{ state, Executor, Handle, + ReconstructedBlock, StateNotInit, }; use crate::{ @@ -44,7 +45,7 @@ impl Builder { let mut firm_block_tx = None; let mut firm_block_rx = None; if mode.is_with_firm() { - let (tx, rx) = mpsc::channel(16); + let (tx, rx) = mpsc::channel::>(16); firm_block_tx = Some(tx); firm_block_rx = Some(rx); } diff --git a/crates/astria-conductor/src/executor/mod.rs b/crates/astria-conductor/src/executor/mod.rs index 7acad76aad..cac3b5d984 100644 --- a/crates/astria-conductor/src/executor/mod.rs +++ b/crates/astria-conductor/src/executor/mod.rs @@ -150,7 +150,7 @@ pub(crate) enum FirmSendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::SendError, + source: mpsc::error::SendError>, }, } @@ -161,7 +161,7 @@ pub(crate) enum FirmTrySendError { #[error("failed sending blocks to executor")] Channel { #[from] - source: mpsc::error::TrySendError, + source: mpsc::error::TrySendError>, }, } @@ -191,7 +191,7 @@ pub(crate) enum SoftTrySendError { /// information. #[derive(Debug, Clone)] pub(crate) struct Handle { - firm_blocks: Option>, + firm_blocks: Option>>, soft_blocks: Option>, state: StateReceiver, _state_init: TStateInit, @@ -222,20 +222,18 @@ impl Handle { #[instrument(skip_all, err)] pub(crate) async fn send_firm_block( self, - block: ReconstructedBlock, + block: impl Into>, ) -> Result<(), FirmSendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmSendError::NotSet)?; - sender.send(block).await?; - Ok(()) + Ok(sender.send(block.into()).await?) } pub(crate) fn try_send_firm_block( &self, - block: ReconstructedBlock, + block: impl Into>, ) -> Result<(), FirmTrySendError> { let sender = self.firm_blocks.as_ref().ok_or(FirmTrySendError::NotSet)?; - sender.try_send(block)?; - Ok(()) + Ok(sender.try_send(block.into())?) } #[instrument(skip_all, err)] @@ -310,7 +308,7 @@ pub(crate) struct Executor { /// The channel of which this executor receives blocks for executing /// firm commitments. /// Only set if `mode` is `FirmOnly` or `SoftAndFirm`. - firm_blocks: Option>, + firm_blocks: Option>>, /// The channel of which this executor receives blocks for executing /// soft commitments. @@ -584,7 +582,7 @@ impl Executor { ))] async fn execute_firm( &mut self, - block: ReconstructedBlock, + block: Box, ) -> eyre::Result> { if self.is_firm_block_height_exceded(&block) { return Ok(Some(StopHeightExceded::celestia( @@ -595,7 +593,7 @@ impl Executor { } let celestia_height = block.celestia_height; - let executable_block = ExecutableBlock::from_reconstructed(block); + let executable_block = ExecutableBlock::from_reconstructed(*block); let expected_height = self.state.next_expected_firm_sequencer_height(); let block_height = executable_block.height; let rollup_start_block_height = self.state.rollup_start_block_height(); diff --git a/crates/astria-sequencer-relayer/src/relayer/mod.rs b/crates/astria-sequencer-relayer/src/relayer/mod.rs index e4e58e0cb7..6f7afb299c 100644 --- a/crates/astria-sequencer-relayer/src/relayer/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/mod.rs @@ -83,6 +83,9 @@ use crate::{ IncludeRollup, }; +type ForwardFut<'a> = + Fuse>>>>; + pub(crate) struct Relayer { /// A token to notify relayer that it should shut down. #[expect( @@ -172,7 +175,7 @@ impl Relayer { // future to forward a sequencer block to the celestia-submission-task. // gets set in the select-loop if the task is at capacity. let mut forward_once_free: Fuse< - BoxFuture>>, + BoxFuture>>>, > = Fuse::terminated(); self.state.set_ready(); @@ -282,9 +285,7 @@ impl Relayer { block: SequencerBlock, block_stream: &mut read::BlockStream, submitter: write::BlobSubmitterHandle, - forward: &mut Fuse< - BoxFuture>>, - >, + forward: &mut ForwardFut, ) -> eyre::Result<()> { assert!( forward.is_terminated(), @@ -292,7 +293,7 @@ impl Relayer { congested and this future is in-flight", ); - if let Err(error) = submitter.try_send(block) { + if let Err(error) = submitter.try_send(Box::new(block)) { debug!( // Just print the error directly: TrySendError has no cause chain. %error, diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index bed9880a20..aebb0debd4 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -86,7 +86,7 @@ struct StartedSubmissionAndFee { #[derive(Clone)] pub(super) struct BlobSubmitterHandle { - tx: mpsc::Sender, + tx: mpsc::Sender>, } impl BlobSubmitterHandle { @@ -95,8 +95,8 @@ impl BlobSubmitterHandle { /// This is a thin wrapper around [`mpsc::Sender::try_send`]. pub(super) fn try_send( &self, - block: SequencerBlock, - ) -> Result<(), TrySendError> { + block: Box, + ) -> Result<(), TrySendError>> { self.tx.try_send(block) } @@ -105,8 +105,8 @@ impl BlobSubmitterHandle { /// This is a thin wrapper around [`mpsc::Sender::send`]. pub(super) async fn send( &self, - block: SequencerBlock, - ) -> Result<(), SendError> { + block: Box, + ) -> Result<(), SendError>> { self.tx.send(block).await } } @@ -116,7 +116,7 @@ pub(super) struct BlobSubmitter { client_builder: CelestiaClientBuilder, /// The channel over which sequencer blocks are received. - blocks: mpsc::Receiver, + blocks: mpsc::Receiver>, /// The accumulator of all data that will be submitted to Celestia on the next submission. next_submission: NextSubmission, @@ -253,7 +253,7 @@ impl BlobSubmitter { sequencer_height = %block.height(), "skipping sequencer block as already included in previous submission" )); - } else if let Err(error) = self.add_sequencer_block_to_next_submission(block) { + } else if let Err(error) = self.add_sequencer_block_to_next_submission(*block) { break Err(error).wrap_err( "critically failed adding Sequencer block to next submission" ); diff --git a/crates/astria-sequencer/src/accounts/component.rs b/crates/astria-sequencer/src/accounts/component.rs index c9d515ec45..b8869d4a7c 100644 --- a/crates/astria-sequencer/src/accounts/component.rs +++ b/crates/astria-sequencer/src/accounts/component.rs @@ -25,7 +25,7 @@ pub(crate) struct AccountsComponent; impl Component for AccountsComponent { type AppState = GenesisAppState; - #[instrument(name = "AccountsComponent::init_chain", skip_all)] + #[instrument(name = "AccountsComponent::init_chain", skip_all, err)] async fn init_chain(mut state: S, app_state: &Self::AppState) -> Result<()> where S: accounts::StateWriteExt + assets::StateReadExt, diff --git a/crates/astria-sequencer/src/accounts/query.rs b/crates/astria-sequencer/src/accounts/query.rs index 00c23a4095..af7d320228 100644 --- a/crates/astria-sequencer/src/accounts/query.rs +++ b/crates/astria-sequencer/src/accounts/query.rs @@ -28,7 +28,10 @@ use tendermint::{ }, block::Height, }; -use tracing::instrument; +use tracing::{ + instrument, + Level, +}; use crate::{ accounts::StateReadExt as _, @@ -36,6 +39,7 @@ use crate::{ assets::StateReadExt as _, }; +#[instrument(skip_all, fields(%asset), err(level = Level::DEBUG))] async fn ibc_to_trace( state: S, asset: &asset::IbcPrefixed, @@ -47,7 +51,7 @@ async fn ibc_to_trace( .ok_or_eyre("asset not found when user has balance of it; this is a bug") } -#[instrument(skip_all, fields(%address))] +#[instrument(skip_all, fields(%address), err(level = Level::DEBUG))] async fn get_trace_prefixed_account_balances( state: &S, address: &Address, @@ -70,6 +74,7 @@ async fn get_trace_prefixed_account_balances( /// Returns a list of [`AssetBalance`]s for the provided address. `AssetBalance`s are sorted /// alphabetically by [`asset::Denom`]. +#[instrument(skip_all)] pub(crate) async fn balance_request( storage: Storage, request: request::Query, @@ -112,6 +117,7 @@ pub(crate) async fn balance_request( } } +#[instrument(skip_all)] pub(crate) async fn nonce_request( storage: Storage, request: request::Query, @@ -150,6 +156,7 @@ pub(crate) async fn nonce_request( } } +#[instrument(skip_all, fields(%height), err(level = Level::DEBUG))] async fn get_snapshot_and_height(storage: &Storage, height: Height) -> Result<(Snapshot, Height)> { let snapshot = match height.value() { 0 => storage.latest_snapshot(), @@ -173,6 +180,7 @@ async fn get_snapshot_and_height(storage: &Storage, height: Height) -> Result<(S Ok((snapshot, height)) } +#[instrument(skip_all)] async fn preprocess_request( storage: &Storage, request: &request::Query, diff --git a/crates/astria-sequencer/src/accounts/state_ext.rs b/crates/astria-sequencer/src/accounts/state_ext.rs index ff466469db..2cae372809 100644 --- a/crates/astria-sequencer/src/accounts/state_ext.rs +++ b/crates/astria-sequencer/src/accounts/state_ext.rs @@ -25,7 +25,10 @@ use cnidarium::{ }; use futures::Stream; use pin_project_lite::pin_project; -use tracing::instrument; +use tracing::{ + instrument, + Level, +}; use super::storage::{ self, @@ -141,7 +144,7 @@ pub(crate) trait StateReadExt: StateRead + crate::assets::StateReadExt { } } - #[instrument(skip_all, fields(address = %address.display_address(), %asset), err)] + #[instrument(skip_all, fields(address = %address.display_address(), %asset), err(level = Level::WARN))] async fn get_account_balance<'a, TAddress, TAsset>( &self, address: &TAddress, @@ -165,7 +168,7 @@ pub(crate) trait StateReadExt: StateRead + crate::assets::StateReadExt { .wrap_err("invalid balance bytes") } - #[instrument(skip_all)] + #[instrument(skip_all, err)] async fn get_account_nonce(&self, address: &T) -> Result { let bytes = self .get_raw(&keys::nonce(address)) @@ -186,7 +189,7 @@ impl StateReadExt for T {} #[async_trait] pub(crate) trait StateWriteExt: StateWrite { - #[instrument(skip_all, fields(address = %address.display_address(), %asset, balance), err)] + #[instrument(skip_all, fields(address = %address.display_address(), %asset, balance), err(level = Level::WARN))] fn put_account_balance<'a, TAddress, TAsset>( &mut self, address: &TAddress, @@ -205,7 +208,7 @@ pub(crate) trait StateWriteExt: StateWrite { Ok(()) } - #[instrument(skip_all)] + #[instrument(skip_all, fields(address = %address.display_address(), nonce), err(level = Level::WARN))] fn put_account_nonce(&mut self, address: &T, nonce: u32) -> Result<()> { let bytes = StoredValue::from(storage::Nonce::from(nonce)) .serialize() @@ -214,7 +217,7 @@ pub(crate) trait StateWriteExt: StateWrite { Ok(()) } - #[instrument(skip_all, fields(address = %address.display_address(), %asset, amount), err)] + #[instrument(skip_all, fields(address = %address.display_address(), %asset, amount), err(level = Level::WARN))] async fn increase_balance<'a, TAddress, TAsset>( &mut self, address: &TAddress, @@ -241,7 +244,7 @@ pub(crate) trait StateWriteExt: StateWrite { Ok(()) } - #[instrument(skip_all, fields(address = %address.display_address(), %asset, amount))] + #[instrument(skip_all, fields(address = %address.display_address(), %asset, amount), err(level = Level::DEBUG))] async fn decrease_balance<'a, TAddress, TAsset>( &mut self, address: &TAddress, diff --git a/crates/astria-sequencer/src/action_handler/impls/bridge_lock.rs b/crates/astria-sequencer/src/action_handler/impls/bridge_lock.rs index 3be717fc61..38c61520c6 100644 --- a/crates/astria-sequencer/src/action_handler/impls/bridge_lock.rs +++ b/crates/astria-sequencer/src/action_handler/impls/bridge_lock.rs @@ -13,6 +13,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::{ @@ -36,6 +40,7 @@ impl ActionHandler for BridgeLock { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/bridge_sudo_change.rs b/crates/astria-sequencer/src/action_handler/impls/bridge_sudo_change.rs index a69775cf5f..97f7dbaa57 100644 --- a/crates/astria-sequencer/src/action_handler/impls/bridge_sudo_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/bridge_sudo_change.rs @@ -7,6 +7,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -24,6 +28,7 @@ impl ActionHandler for BridgeSudoChange { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/bridge_unlock.rs b/crates/astria-sequencer/src/action_handler/impls/bridge_unlock.rs index d2362a5742..3789fa5bde 100644 --- a/crates/astria-sequencer/src/action_handler/impls/bridge_unlock.rs +++ b/crates/astria-sequencer/src/action_handler/impls/bridge_unlock.rs @@ -10,6 +10,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::{ @@ -28,6 +32,7 @@ use crate::{ #[async_trait] impl ActionHandler for BridgeUnlock { // TODO(https://github.com/astriaorg/astria/issues/1430): move checks to the `BridgeUnlock` parsing. + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_stateless(&self) -> Result<()> { ensure!(self.amount > 0, "amount must be greater than zero",); ensure!(self.memo.len() <= 64, "memo must not be more than 64 bytes"); @@ -46,6 +51,7 @@ impl ActionHandler for BridgeUnlock { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/fee_asset_change.rs b/crates/astria-sequencer/src/action_handler/impls/fee_asset_change.rs index c92161e751..fabbe3247c 100644 --- a/crates/astria-sequencer/src/action_handler/impls/fee_asset_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/fee_asset_change.rs @@ -8,6 +8,10 @@ use async_trait::async_trait; use cnidarium::StateWrite; use futures::StreamExt as _; use tokio::pin; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -25,6 +29,7 @@ impl ActionHandler for FeeAssetChange { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> eyre::Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/fee_change.rs b/crates/astria-sequencer/src/action_handler/impls/fee_change.rs index f066ded9ea..3b399ddc40 100644 --- a/crates/astria-sequencer/src/action_handler/impls/fee_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/fee_change.rs @@ -6,6 +6,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -22,6 +26,7 @@ impl ActionHandler for FeeChange { /// check that the signer of the transaction is the current sudo address, /// as only that address can change the fee + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> eyre::Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/ibc_relayer_change.rs b/crates/astria-sequencer/src/action_handler/impls/ibc_relayer_change.rs index e5de94afbe..9ddce3ea30 100644 --- a/crates/astria-sequencer/src/action_handler/impls/ibc_relayer_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/ibc_relayer_change.rs @@ -6,6 +6,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -23,6 +27,7 @@ impl ActionHandler for IbcRelayerChange { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/ibc_sudo_change.rs b/crates/astria-sequencer/src/action_handler/impls/ibc_sudo_change.rs index f82af648fc..487f71b4e6 100644 --- a/crates/astria-sequencer/src/action_handler/impls/ibc_sudo_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/ibc_sudo_change.rs @@ -6,6 +6,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -21,6 +25,7 @@ impl ActionHandler for IbcSudoChange { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/ics20_withdrawal.rs b/crates/astria-sequencer/src/action_handler/impls/ics20_withdrawal.rs index 35d1523ece..cf29705315 100644 --- a/crates/astria-sequencer/src/action_handler/impls/ics20_withdrawal.rs +++ b/crates/astria-sequencer/src/action_handler/impls/ics20_withdrawal.rs @@ -26,7 +26,6 @@ use cnidarium::{ StateRead, StateWrite, }; -use ibc_proto::ibc::apps::transfer::v2::FungibleTokenPacketData; use ibc_types::core::channel::{ ChannelId, PortId, @@ -37,6 +36,11 @@ use penumbra_ibc::component::packet::{ SendPacketWrite as _, Unchecked, }; +use penumbra_proto::core::component::ibc::v1::FungibleTokenPacketData; +use tracing::{ + instrument, + Level, +}; use crate::{ accounts::{ @@ -60,6 +64,7 @@ use crate::{ #[async_trait] impl ActionHandler for action::Ics20Withdrawal { // TODO(https://github.com/astriaorg/astria/issues/1430): move checks to the `Ics20Withdrawal` parsing. + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_stateless(&self) -> Result<()> { ensure!(self.timeout_time() != 0, "timeout time must be non-zero",); ensure!(self.amount() > 0, "amount must be greater than zero",); @@ -95,6 +100,7 @@ impl ActionHandler for action::Ics20Withdrawal { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/init_bridge_account.rs b/crates/astria-sequencer/src/action_handler/impls/init_bridge_account.rs index 85e05de042..fdea8cab4f 100644 --- a/crates/astria-sequencer/src/action_handler/impls/init_bridge_account.rs +++ b/crates/astria-sequencer/src/action_handler/impls/init_bridge_account.rs @@ -9,6 +9,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -26,6 +30,7 @@ impl ActionHandler for InitBridgeAccount { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/rollup_data_submission.rs b/crates/astria-sequencer/src/action_handler/impls/rollup_data_submission.rs index 3881e221bf..23be358129 100644 --- a/crates/astria-sequencer/src/action_handler/impls/rollup_data_submission.rs +++ b/crates/astria-sequencer/src/action_handler/impls/rollup_data_submission.rs @@ -5,11 +5,16 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::action_handler::ActionHandler; #[async_trait] impl ActionHandler for RollupDataSubmission { + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_stateless(&self) -> Result<()> { // TODO: do we want to place a maximum on the size of the data? // https://github.com/astriaorg/astria/issues/222 diff --git a/crates/astria-sequencer/src/action_handler/impls/sudo_address_change.rs b/crates/astria-sequencer/src/action_handler/impls/sudo_address_change.rs index 7c7cb2e5e2..eb3f4f771a 100644 --- a/crates/astria-sequencer/src/action_handler/impls/sudo_address_change.rs +++ b/crates/astria-sequencer/src/action_handler/impls/sudo_address_change.rs @@ -6,6 +6,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -25,6 +29,7 @@ impl ActionHandler for SudoAddressChange { /// check that the signer of the transaction is the current sudo address, /// as only that address can change the sudo address + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/transaction.rs b/crates/astria-sequencer/src/action_handler/impls/transaction.rs index a5b66bf4fd..8605906ad4 100644 --- a/crates/astria-sequencer/src/action_handler/impls/transaction.rs +++ b/crates/astria-sequencer/src/action_handler/impls/transaction.rs @@ -18,6 +18,10 @@ use astria_eyre::{ }, }; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ accounts::{ @@ -74,6 +78,7 @@ impl std::error::Error for InvalidNonce {} #[async_trait::async_trait] impl ActionHandler for Transaction { + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_stateless(&self) -> Result<()> { ensure!(!self.actions().is_empty(), "must have at least one action"); @@ -149,7 +154,7 @@ impl ActionHandler for Transaction { // FIXME (https://github.com/astriaorg/astria/issues/1584): because most lines come from delegating (and error wrapping) to the // individual actions. This could be tidied up by implementing `ActionHandler for Action` // and letting it delegate. - #[expect(clippy::too_many_lines, reason = "should be refactored")] + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { // Add the current signed transaction into the ephemeral state in case // downstream actions require access to it. diff --git a/crates/astria-sequencer/src/action_handler/impls/transfer.rs b/crates/astria-sequencer/src/action_handler/impls/transfer.rs index 27d955bca3..75735e8df4 100644 --- a/crates/astria-sequencer/src/action_handler/impls/transfer.rs +++ b/crates/astria-sequencer/src/action_handler/impls/transfer.rs @@ -6,6 +6,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::{ @@ -23,6 +27,7 @@ impl ActionHandler for Transfer { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/action_handler/impls/validator_update.rs b/crates/astria-sequencer/src/action_handler/impls/validator_update.rs index 4bd0d72c3c..5ad1ceab06 100644 --- a/crates/astria-sequencer/src/action_handler/impls/validator_update.rs +++ b/crates/astria-sequencer/src/action_handler/impls/validator_update.rs @@ -7,6 +7,10 @@ use astria_eyre::eyre::{ }; use async_trait::async_trait; use cnidarium::StateWrite; +use tracing::{ + instrument, + Level, +}; use crate::{ action_handler::ActionHandler, @@ -23,6 +27,7 @@ impl ActionHandler for ValidatorUpdate { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn check_and_execute(&self, mut state: S) -> Result<()> { let from = state .get_transaction_context() diff --git a/crates/astria-sequencer/src/address/state_ext.rs b/crates/astria-sequencer/src/address/state_ext.rs index 0f5cd5bf30..b217e80a56 100644 --- a/crates/astria-sequencer/src/address/state_ext.rs +++ b/crates/astria-sequencer/src/address/state_ext.rs @@ -16,16 +16,23 @@ use cnidarium::{ StateRead, StateWrite, }; -use tracing::instrument; +use tracing::{ + instrument, + Level, +}; use super::storage::{ self, keys, }; -use crate::storage::StoredValue; +use crate::{ + accounts::AddressBytes as _, + storage::StoredValue, +}; #[async_trait] pub(crate) trait StateReadExt: StateRead { + #[instrument(skip_all, fields(address = %address.display_address()), err(level = Level::DEBUG))] async fn ensure_base_prefix(&self, address: &Address) -> Result<()> { let prefix = self .get_base_prefix() @@ -39,6 +46,7 @@ pub(crate) trait StateReadExt: StateRead { Ok(()) } + #[instrument(skip_all, err(level = Level::DEBUG))] async fn try_base_prefixed(&self, slice: &[u8]) -> Result
{ let prefix = self .get_base_prefix() @@ -51,7 +59,7 @@ pub(crate) trait StateReadExt: StateRead { .wrap_err("failed to construct address from byte slice and state-provided base prefix") } - #[instrument(skip_all, err)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_base_prefix(&self) -> Result { let Some(bytes) = self .get_raw(keys::BASE_PREFIX) @@ -66,7 +74,7 @@ pub(crate) trait StateReadExt: StateRead { .context("invalid base prefix bytes") } - #[instrument(skip_all, err)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_ibc_compat_prefix(&self) -> Result { let Some(bytes) = self .get_raw(keys::IBC_COMPAT_PREFIX) diff --git a/crates/astria-sequencer/src/app/benchmarks.rs b/crates/astria-sequencer/src/app/benchmarks.rs index 390f650de1..d11723c18b 100644 --- a/crates/astria-sequencer/src/app/benchmarks.rs +++ b/crates/astria-sequencer/src/app/benchmarks.rs @@ -93,7 +93,7 @@ impl Fixture { } #[divan::bench(max_time = MAX_TIME)] -fn execute_transactions_prepare_proposal(bencher: divan::Bencher) { +fn prepare_proposal_tx_execution(bencher: divan::Bencher) { let runtime = tokio::runtime::Builder::new_multi_thread() .enable_all() .build() @@ -105,7 +105,7 @@ fn execute_transactions_prepare_proposal(bencher: divan::Bencher) { let (_tx_bytes, included_txs) = runtime.block_on(async { fixture .app - .execute_transactions_prepare_proposal(constraints) + .prepare_proposal_tx_execution(constraints) .await .unwrap() }); diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 78386c6f9f..9a5064ff60 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -22,6 +22,7 @@ use std::{ use astria_core::{ generated::astria::protocol::transaction::v1 as raw, + primitive::v1::TRANSACTION_ID_LEN, protocol::{ abci::AbciErrorCode, genesis::v1::GenesisAppState, @@ -79,6 +80,7 @@ use tracing::{ debug, info, instrument, + Level, }; pub(crate) use self::state_ext::{ @@ -235,6 +237,7 @@ pub(crate) struct App { } impl App { + #[instrument(name = "App::new", skip_all, err)] pub(crate) async fn new( snapshot: Snapshot, mempool: Mempool, @@ -268,7 +271,7 @@ impl App { }) } - #[instrument(name = "App:init_chain", skip_all)] + #[instrument(name = "App:init_chain", skip_all, err)] pub(crate) async fn init_chain( &mut self, storage: Storage, @@ -355,7 +358,7 @@ impl App { /// It puts this special "commitment" as the first transaction in a block. /// When other validators receive the block, they know the first transaction is /// supposed to be the commitment, and verifies that is it correct. - #[instrument(name = "App::prepare_proposal", skip_all)] + #[instrument(name = "App::prepare_proposal", skip_all, err(level = Level::WARN))] pub(crate) async fn prepare_proposal( &mut self, prepare_proposal: abci::request::PrepareProposal, @@ -384,7 +387,7 @@ impl App { // ignore the txs passed by cometbft in favour of our app-side mempool let (included_tx_bytes, signed_txs_included) = self - .execute_transactions_prepare_proposal(&mut block_size_constraints) + .prepare_proposal_tx_execution(&mut block_size_constraints) .await .wrap_err("failed to execute transactions")?; self.metrics @@ -405,7 +408,7 @@ impl App { /// Generates a commitment to the `sequence::Actions` in the block's transactions /// and ensures it matches the commitment created by the proposer, which /// should be the first transaction in the block. - #[instrument(name = "App::process_proposal", skip_all)] + #[instrument(name = "App::process_proposal", skip_all, err(level = Level::WARN))] pub(crate) async fn process_proposal( &mut self, process_proposal: abci::request::ProcessProposal, @@ -500,7 +503,7 @@ impl App { .collect::>(); let tx_results = self - .execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) + .process_proposal_tx_execution(signed_txs.clone(), &mut block_size_constraints) .await .wrap_err("failed to execute transactions")?; @@ -565,20 +568,24 @@ impl App { /// /// As a result, all transactions in a sequencer block are guaranteed to execute /// successfully. - #[instrument(name = "App::execute_transactions_prepare_proposal", skip_all)] - async fn execute_transactions_prepare_proposal( + #[instrument(name = "App::prepare_proposal_tx_execution", skip_all, err(level = Level::DEBUG))] + async fn prepare_proposal_tx_execution( &mut self, block_size_constraints: &mut BlockSizeConstraints, ) -> Result<(Vec, Vec)> { let mempool_len = self.mempool.len().await; debug!(mempool_len, "executing transactions from mempool"); - let mut validated_txs: Vec = Vec::new(); - let mut included_signed_txs = Vec::new(); - let mut failed_tx_count: usize = 0; - let mut execution_results = Vec::new(); - let mut excluded_txs: usize = 0; - let mut current_tx_group = Group::BundleableGeneral; + let mut proposal_info = Proposal::Prepare { + validated_txs: Vec::new(), + included_signed_txs: Vec::new(), + failed_tx_count: 0, + execution_results: Vec::new(), + excluded_txs: 0, + current_tx_group: Group::BundleableGeneral, + mempool: self.mempool.clone(), + metrics: self.metrics, + }; // get copy of transactions to execute from mempool let pending_txs = self @@ -590,122 +597,33 @@ impl App { let mut unused_count = pending_txs.len(); for (tx_hash, tx) in pending_txs { unused_count = unused_count.saturating_sub(1); - let tx_hash_base64 = telemetry::display::base64(&tx_hash).to_string(); - let bytes = tx.to_raw().encode_to_vec(); - let tx_len = bytes.len(); - info!(transaction_hash = %tx_hash_base64, "executing transaction"); - - // don't include tx if it would make the cometBFT block too large - if !block_size_constraints.cometbft_has_space(tx_len) { - self.metrics - .increment_prepare_proposal_excluded_transactions_cometbft_space(); - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_len, - "excluding remaining transactions: max cometBFT data limit reached" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // break from loop, as the block is full - break; - } - - // check if tx's sequence data will fit into sequence block - let tx_sequence_data_bytes = tx - .unsigned_transaction() - .actions() - .iter() - .filter_map(Action::as_rollup_data_submission) - .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); - - if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { - self.metrics - .increment_prepare_proposal_excluded_transactions_sequencer_space(); - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_sequence_data_bytes, - "excluding transaction: max block sequenced data limit reached" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // continue as there might be non-sequence txs that can fit - continue; - } - - // ensure transaction's group is less than or equal to current action group - let tx_group = tx.group(); - if tx_group > current_tx_group { - debug!( - transaction_hash = %tx_hash_base64, - block_size_constraints = %json(&block_size_constraints), - "excluding transaction: group is higher priority than previously included transactions" - ); - excluded_txs = excluded_txs.saturating_add(1); - - // note: we don't remove the tx from mempool as it may be valid in the future - continue; - } - // execute tx and store in `execution_results` list on success - match self.execute_transaction(tx.clone()).await { - Ok(events) => { - execution_results.push(ExecTxResult { - events, - ..Default::default() - }); - block_size_constraints - .sequencer_checked_add(tx_sequence_data_bytes) - .wrap_err("error growing sequencer block size")?; - block_size_constraints - .cometbft_checked_add(tx_len) - .wrap_err("error growing cometBFT block size")?; - validated_txs.push(bytes.into()); - included_signed_txs.push((*tx).clone()); - } - Err(e) => { - self.metrics - .increment_prepare_proposal_excluded_transactions_failed_execution(); - debug!( - transaction_hash = %tx_hash_base64, - error = AsRef::::as_ref(&e), - "failed to execute transaction, not including in block" - ); - - if e.downcast_ref::().is_some() { - // we don't remove the tx from mempool if it failed to execute - // due to an invalid nonce, as it may be valid in the future. - // if it's invalid due to the nonce being too low, it'll be - // removed from the mempool in `update_mempool_after_finalization`. - // - // this is important for possible out-of-order transaction - // groups fed into prepare_proposal. a transaction with a higher - // nonce might be in a higher priority group than a transaction - // from the same account wiht a lower nonce. this higher nonce - // could execute in the next block fine. - } else { - failed_tx_count = failed_tx_count.saturating_add(1); - - // remove the failing transaction from the mempool - // - // this will remove any transactions from the same sender - // as well, as the dependent nonces will not be able - // to execute - self.mempool - .remove_tx_invalid( - tx, - RemovalReason::FailedPrepareProposal(e.to_string()), - ) - .await; - } - } + if BreakOrContinue::Break + == proposal_checks_and_tx_execution( + self, + tx, + Some(tx_hash), + block_size_constraints, + &mut proposal_info, + ) + .await? + { + break; } - - // update current action group to tx's action group - current_tx_group = tx_group; } + let Proposal::Prepare { + validated_txs, + included_signed_txs, + failed_tx_count, + execution_results, + excluded_txs, + .. + } = proposal_info + else { + bail!("expected `Proposal::Prepare`, received `Proposal::Process`") + }; + if failed_tx_count > 0 { info!( failed_tx_count = failed_tx_count, @@ -745,77 +663,33 @@ impl App { /// /// As a result, all transactions in a sequencer block are guaranteed to execute /// successfully. - #[instrument(name = "App::execute_transactions_process_proposal", skip_all)] - async fn execute_transactions_process_proposal( + #[instrument(name = "App::process_proposal_tx_execution", skip_all, err(level = Level::DEBUG))] + async fn process_proposal_tx_execution( &mut self, txs: Vec, block_size_constraints: &mut BlockSizeConstraints, ) -> Result> { - let mut execution_results = Vec::new(); - let mut current_tx_group = Group::BundleableGeneral; + let mut proposal_info = Proposal::Process { + execution_results: vec![], + current_tx_group: Group::BundleableGeneral, + }; for tx in txs { - let bytes = tx.to_raw().encode_to_vec(); - let tx_hash = Sha256::digest(&bytes); - let tx_len = bytes.len(); - - // check if tx's sequence data will fit into sequence block - let tx_sequence_data_bytes = tx - .unsigned_transaction() - .actions() - .iter() - .filter_map(Action::as_rollup_data_submission) - .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); - - if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - block_size_constraints = %json(&block_size_constraints), - tx_data_bytes = tx_sequence_data_bytes, - "transaction error: max block sequenced data limit passed" - ); - bail!("max block sequenced data limit passed"); - } - - // ensure transaction's group is less than or equal to current action group - let tx_group = tx.group(); - if tx_group > current_tx_group { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - "transaction error: block has incorrect transaction group ordering" - ); - bail!("transactions have incorrect transaction group ordering"); - } - - // execute tx and store in `execution_results` list on success - match self.execute_transaction(Arc::new(tx.clone())).await { - Ok(events) => { - execution_results.push(ExecTxResult { - events, - ..Default::default() - }); - block_size_constraints - .sequencer_checked_add(tx_sequence_data_bytes) - .wrap_err("error growing sequencer block size")?; - block_size_constraints - .cometbft_checked_add(tx_len) - .wrap_err("error growing cometBFT block size")?; - } - Err(e) => { - debug!( - transaction_hash = %telemetry::display::base64(&tx_hash), - error = AsRef::::as_ref(&e), - "transaction error: failed to execute transaction" - ); - return Err(e.wrap_err("transaction failed to execute")); - } + let tx = Arc::new(tx); + if BreakOrContinue::Break + == proposal_checks_and_tx_execution( + self, + tx, + None, + block_size_constraints, + &mut proposal_info, + ) + .await? + { + break; } - - // update current action group to tx's action group - current_tx_group = tx_group; } - - Ok(execution_results) + Ok(proposal_info.execution_results()) } /// sets up the state for execution of the block's transactions. @@ -823,7 +697,7 @@ impl App { /// /// this *must* be called anytime before a block's txs are executed, whether it's /// during the proposal phase, or finalize_block phase. - #[instrument(name = "App::pre_execute_transactions", skip_all, err)] + #[instrument(name = "App::pre_execute_transactions", skip_all, err(level = Level::WARN))] async fn pre_execute_transactions(&mut self, block_data: BlockData) -> Result<()> { let chain_id = self .state @@ -878,7 +752,7 @@ impl App { /// `SequencerBlock`. /// /// this must be called after a block's transactions are executed. - #[instrument(name = "App::post_execute_transactions", skip_all)] + #[instrument(name = "App::post_execute_transactions", skip_all, err(level = Level::WARN))] async fn post_execute_transactions( &mut self, block_hash: Hash, @@ -962,7 +836,7 @@ impl App { /// /// This is called by cometbft after the block has already been /// committed by the network's consensus. - #[instrument(name = "App::finalize_block", skip_all)] + #[instrument(name = "App::finalize_block", skip_all, err)] pub(crate) async fn finalize_block( &mut self, finalize_block: abci::request::FinalizeBlock, @@ -1076,7 +950,7 @@ impl App { Ok(finalize_block) } - #[instrument(skip_all, err)] + #[instrument(skip_all, err(level = Level::WARN))] async fn prepare_commit(&mut self, storage: Storage) -> Result { // extract the state we've built up to so we can prepare it as a `StagedWriteBatch`. let dummy_state = StateDelta::new(storage.latest_snapshot()); @@ -1113,7 +987,7 @@ impl App { Ok(app_hash) } - #[instrument(name = "App::begin_block", skip_all)] + #[instrument(name = "App::begin_block", skip_all, err(level = Level::WARN))] async fn begin_block( &mut self, begin_block: &abci::request::BeginBlock, @@ -1149,7 +1023,7 @@ impl App { } /// Executes a signed transaction. - #[instrument(name = "App::execute_transaction", skip_all)] + #[instrument(name = "App::execute_transaction", skip_all, err(level = Level::DEBUG))] async fn execute_transaction(&mut self, signed_tx: Arc) -> Result> { signed_tx .check_stateless() @@ -1186,7 +1060,7 @@ impl App { Ok(events) } - #[instrument(name = "App::end_block", skip_all)] + #[instrument(name = "App::end_block", skip_all, err(level = Level::WARN))] async fn end_block( &mut self, height: u64, @@ -1333,3 +1207,248 @@ struct PostTransactionExecutionResult { validator_updates: Vec, consensus_param_updates: Option, } + +#[derive(PartialEq)] +enum BreakOrContinue { + Break, + Continue, +} + +enum Proposal { + Prepare { + validated_txs: Vec, + included_signed_txs: Vec, + failed_tx_count: usize, + execution_results: Vec, + excluded_txs: usize, + current_tx_group: Group, + mempool: Mempool, + metrics: &'static Metrics, + }, + Process { + execution_results: Vec, + current_tx_group: Group, + }, +} + +impl Proposal { + fn current_tx_group(&self) -> Group { + match self { + Proposal::Prepare { + current_tx_group, .. + } + | Proposal::Process { + current_tx_group, .. + } => *current_tx_group, + } + } + + fn set_current_tx_group(&mut self, group: Group) { + match self { + Proposal::Prepare { + current_tx_group, .. + } + | Proposal::Process { + current_tx_group, .. + } => *current_tx_group = group, + } + } + + fn execution_results_mut(&mut self) -> &mut Vec { + match self { + Proposal::Prepare { + execution_results, .. + } + | Proposal::Process { + execution_results, .. + } => execution_results, + } + } + + fn execution_results(self) -> Vec { + match self { + Proposal::Prepare { + execution_results, .. + } + | Proposal::Process { + execution_results, .. + } => execution_results, + } + } +} + +#[instrument(skip_all)] +async fn proposal_checks_and_tx_execution( + app: &mut App, + tx: Arc, + // `prepare_proposal_tx_execution` already has the tx hash, so we pass it in here + tx_hash: Option<[u8; TRANSACTION_ID_LEN]>, + block_size_constraints: &mut BlockSizeConstraints, + proposal_info: &mut Proposal, +) -> Result { + let tx_bytes = tx.to_raw().encode_to_vec(); + let tx_hash_base_64 = + telemetry::display::base64(tx_hash.unwrap_or_else(|| Sha256::digest(&tx_bytes).into())) + .to_string(); + let tx_len = tx_bytes.len(); + + info!(transaction_hash = %tx_hash_base_64, "executing transaction"); + + // check CometBFT size constraints for `prepare_proposal` + if let Proposal::Prepare { + metrics, + excluded_txs, + .. + } = proposal_info + { + if !block_size_constraints.cometbft_has_space(tx_len) { + metrics.increment_prepare_proposal_excluded_transactions_cometbft_space(); + debug!( + transaction_hash = %tx_hash_base_64, + block_size_constraints = %json(&block_size_constraints), + tx_data_bytes = tx_len, + "excluding remaining transactions: max cometBFT data limit reached" + ); + *excluded_txs = excluded_txs.saturating_add(1); + + // break from calling loop, as the block is full + return Ok(BreakOrContinue::Break); + } + } + + let debug_msg = match proposal_info { + Proposal::Prepare { + .. + } => "excluding transaction", + Proposal::Process { + .. + } => "transaction error", + }; + + // check sequencer size constraints + let tx_sequence_data_bytes = tx + .unsigned_transaction() + .actions() + .iter() + .filter_map(Action::as_rollup_data_submission) + .fold(0usize, |acc, seq| acc.saturating_add(seq.data.len())); + if !block_size_constraints.sequencer_has_space(tx_sequence_data_bytes) { + debug!( + transaction_hash = %tx_hash_base_64, + block_size_constraints = %json(&block_size_constraints), + tx_data_bytes = tx_sequence_data_bytes, + "{debug_msg}: max block sequenced data limit reached" + ); + match proposal_info { + Proposal::Prepare { + metrics, + excluded_txs, + .. + } => { + metrics.increment_prepare_proposal_excluded_transactions_sequencer_space(); + *excluded_txs = excluded_txs.saturating_add(1); + + // continue as there might be non-sequence txs that can fit + return Ok(BreakOrContinue::Continue); + } + Proposal::Process { + .. + } => bail!("max block sequenced data limit passed"), + }; + } + + // ensure transaction's group is less than or equal to current action group + let tx_group = tx.group(); + if tx_group > proposal_info.current_tx_group() { + debug!( + transaction_hash = %tx_hash_base_64, + "{debug_msg}: group is higher priority than previously included transactions" + ); + match proposal_info { + Proposal::Prepare { + excluded_txs, .. + } => { + *excluded_txs = excluded_txs.saturating_add(1); + return Ok(BreakOrContinue::Continue); + } + Proposal::Process { + .. + } => { + bail!("transactions have incorrect transaction group ordering"); + } + }; + } + + let execution_results = proposal_info.execution_results_mut(); + match app.execute_transaction(tx.clone()).await { + Ok(events) => { + execution_results.push(ExecTxResult { + events, + ..Default::default() + }); + block_size_constraints + .sequencer_checked_add(tx_sequence_data_bytes) + .wrap_err("error growing sequencer block size")?; + block_size_constraints + .cometbft_checked_add(tx_len) + .wrap_err("error growing cometBFT block size")?; + if let Proposal::Prepare { + validated_txs, + included_signed_txs, + .. + } = proposal_info + { + validated_txs.push(tx_bytes.into()); + included_signed_txs.push((*tx).clone()); + } + } + Err(e) => { + debug!( + transaction_hash = %tx_hash_base_64, + error = AsRef::::as_ref(&e), + "{debug_msg}: failed to execute transaction" + ); + match proposal_info { + Proposal::Prepare { + metrics, + failed_tx_count, + mempool, + .. + } => { + metrics.increment_prepare_proposal_excluded_transactions_failed_execution(); + if e.downcast_ref::().is_some() { + // we don't remove the tx from mempool if it failed to execute + // due to an invalid nonce, as it may be valid in the future. + // if it's invalid due to the nonce being too low, it'll be + // removed from the mempool in `update_mempool_after_finalization`. + // + // this is important for possible out-of-order transaction + // groups fed into prepare_proposal. a transaction with a higher + // nonce might be in a higher priority group than a transaction + // from the same account wiht a lower nonce. this higher nonce + // could execute in the next block fine. + } else { + *failed_tx_count = failed_tx_count.saturating_add(1); + + // remove the failing transaction from the mempool + // + // this will remove any transactions from the same sender + // as well, as the dependent nonces will not be able + // to execute + mempool + .remove_tx_invalid( + tx, + RemovalReason::FailedPrepareProposal(e.to_string()), + ) + .await; + } + } + Proposal::Process { + .. + } => return Err(e.wrap_err("transaction failed to execute")), + } + } + }; + proposal_info.set_current_tx_group(tx_group); + Ok(BreakOrContinue::Continue) +} diff --git a/crates/astria-sequencer/src/app/state_ext.rs b/crates/astria-sequencer/src/app/state_ext.rs index b9a2e3b78e..d1dfb5e07a 100644 --- a/crates/astria-sequencer/src/app/state_ext.rs +++ b/crates/astria-sequencer/src/app/state_ext.rs @@ -12,7 +12,10 @@ use cnidarium::{ StateWrite, }; use tendermint::Time; -use tracing::instrument; +use tracing::{ + instrument, + Level, +}; use super::storage::{ self, @@ -22,7 +25,7 @@ use crate::storage::StoredValue; #[async_trait] pub(crate) trait StateReadExt: StateRead { - #[instrument(skip_all)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_chain_id(&self) -> Result { let Some(bytes) = self .get_raw(keys::CHAIN_ID) @@ -37,7 +40,7 @@ pub(crate) trait StateReadExt: StateRead { .wrap_err("invalid chain id bytes") } - #[instrument(skip_all)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_revision_number(&self) -> Result { let Some(bytes) = self .get_raw(keys::REVISION_NUMBER) @@ -52,7 +55,7 @@ pub(crate) trait StateReadExt: StateRead { .wrap_err("invalid revision number bytes") } - #[instrument(skip_all)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_block_height(&self) -> Result { let Some(bytes) = self .get_raw(keys::BLOCK_HEIGHT) @@ -67,7 +70,7 @@ pub(crate) trait StateReadExt: StateRead { .context("invalid block height bytes") } - #[instrument(skip_all)] + #[instrument(skip_all, err(level = Level::WARN))] async fn get_block_timestamp(&self) -> Result