Skip to content

Commit

Permalink
Allow packet clearing to be done through event relayer (#535)
Browse files Browse the repository at this point in the history
* Simplify trait bounds for QueryCosmosPacketAcknowledgements

* Remove height returned from PacketCommitmentsQuerier

* Remove height argument from query_send_packets_from_sequences

* Remove height argument in ack clearing

* Remove height argument in PacketRelayer

* Simplify trait bounds

* Implement BlockEventsQuerier

* Implement RelayWithPolledEvents

* Add BlockEventsQuerier middleware

* Use RelayWithPolledEvents to implement AutoRelayer

* Implement AutoRelayStartingCurrentHeight

* Implement HasBiRelayAt for CosmosRelayDriver

* Initial clearing test is working

* Full IBC transfer is working

* Clean up constraints

* Try running only clearing test

* Abort background relayer on drop

* Separate out IBC transfer and packet clearing tests

* Keep drop handle in local variable
  • Loading branch information
soareschen authored Feb 14, 2025
1 parent 063c43f commit 9865ee3
Show file tree
Hide file tree
Showing 41 changed files with 713 additions and 198 deletions.
71 changes: 71 additions & 0 deletions crates/chain/chain-components/src/impls/queries/block_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use alloc::vec::Vec;
use core::marker::PhantomData;
use core::time::Duration;

use cgp::prelude::*;
use hermes_chain_type_components::traits::types::event::HasEventType;
use hermes_chain_type_components::traits::types::height::HasHeightType;
use hermes_runtime_components::traits::runtime::HasRuntime;
use hermes_runtime_components::traits::sleep::CanSleep;

use crate::traits::queries::block_events::{BlockEventsQuerier, BlockEventsQuerierComponent};
use crate::traits::queries::chain_status::CanQueryChainHeight;

pub struct WaitBlockHeightAndQueryEvents<InQuerier>(pub PhantomData<InQuerier>);

#[cgp_provider(BlockEventsQuerierComponent)]
impl<Chain, InQuerier> BlockEventsQuerier<Chain> for WaitBlockHeightAndQueryEvents<InQuerier>
where
Chain: HasRuntime + HasEventType + CanQueryChainHeight,
InQuerier: BlockEventsQuerier<Chain>,
Chain::Runtime: CanSleep,
{
async fn query_block_events(
chain: &Chain,
height: &Chain::Height,
) -> Result<Vec<Chain::Event>, Chain::Error> {
let runtime = chain.runtime();

loop {
let current_height = chain.query_chain_height().await?;
if &current_height >= height {
break;
} else {
runtime.sleep(Duration::from_millis(200)).await;
}
}

InQuerier::query_block_events(chain, height).await
}
}

pub struct RetryQueryBlockEvents<const MAX_RETRIES: usize, InQuerier>(pub PhantomData<InQuerier>);

#[cgp_provider(BlockEventsQuerierComponent)]
impl<Chain, InQuerier, const MAX_RETRIES: usize> BlockEventsQuerier<Chain>
for RetryQueryBlockEvents<MAX_RETRIES, InQuerier>
where
Chain: HasRuntime + HasHeightType + HasEventType + HasAsyncErrorType,
InQuerier: BlockEventsQuerier<Chain>,
Chain::Runtime: CanSleep,
{
async fn query_block_events(
chain: &Chain,
height: &Chain::Height,
) -> Result<Vec<Chain::Event>, Chain::Error> {
let runtime = chain.runtime();
let mut sleep_time = Duration::from_millis(500);

for _ in 0..MAX_RETRIES {
let res = InQuerier::query_block_events(chain, height).await;
if let Ok(events) = res {
return Ok(events);
}

runtime.sleep(sleep_time).await;
sleep_time *= 2;
}

InQuerier::query_block_events(chain, height).await
}
}
1 change: 1 addition & 0 deletions crates/chain/chain-components/src/impls/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod block_events;
pub mod consensus_state_height;
pub mod consensus_state_heights;
pub mod query_and_convert_client_state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,6 @@ where
counterparty_channel_id: &Self::ChannelId,
counterparty_port_id: &Self::PortId,
sequences: &[Counterparty::Sequence],
// The height is given to query the packets from a specific height.
// This height should be the same as the query height from the
// `CanQueryPacketAcknowledgements` made on the same chain.
height: &Self::Height,
) -> Result<
Vec<(
Counterparty::OutgoingPacket,
Expand Down Expand Up @@ -75,7 +71,6 @@ where
counterparty_channel_id: &Self::ChannelId,
counterparty_port_id: &Self::PortId,
sequence: &Counterparty::Sequence,
height: &Self::Height,
) -> Result<
(
Counterparty::OutgoingPacket,
Expand Down
17 changes: 17 additions & 0 deletions crates/chain/chain-components/src/traits/queries/block_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use alloc::vec::Vec;

use cgp::prelude::*;
use hermes_chain_type_components::traits::types::event::HasEventType;
use hermes_chain_type_components::traits::types::height::HasHeightType;

#[cgp_component {
provider: BlockEventsQuerier,
context: Chain,
}]
#[async_trait]
pub trait CanQueryBlockEvents: HasHeightType + HasEventType + HasAsyncErrorType {
async fn query_block_events(
&self,
height: &Self::Height,
) -> Result<Vec<Self::Event>, Self::Error>;
}
1 change: 1 addition & 0 deletions crates/chain/chain-components/src/traits/queries/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod ack_packets;
pub mod block;
pub mod block_events;
pub mod chain_status;
pub mod channel_end;
pub mod client_state;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use alloc::vec::Vec;

use cgp::prelude::*;
use hermes_chain_type_components::traits::types::height::HasHeightType;
use hermes_chain_type_components::traits::types::ibc::channel_id::HasChannelIdType;
use hermes_chain_type_components::traits::types::ibc::port_id::HasPortIdType;
use hermes_chain_type_components::traits::types::ibc::sequence::HasSequenceType;
Expand All @@ -12,7 +11,7 @@ use hermes_chain_type_components::traits::types::ibc::sequence::HasSequenceType;
}]
#[async_trait]
pub trait CanQueryPacketAcknowledgements<Counterparty>:
HasHeightType + HasChannelIdType<Counterparty> + HasPortIdType<Counterparty> + HasAsyncErrorType
HasChannelIdType<Counterparty> + HasPortIdType<Counterparty> + HasAsyncErrorType
where
Counterparty: HasSequenceType<Self>,
{
Expand All @@ -25,5 +24,5 @@ where
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
sequences: &[Counterparty::Sequence],
) -> Result<Option<(Vec<Counterparty::Sequence>, Self::Height)>, Self::Error>;
) -> Result<Option<Vec<Counterparty::Sequence>>, Self::Error>;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use alloc::vec::Vec;

use cgp::prelude::*;
use hermes_chain_type_components::traits::types::height::HasHeightType;
use hermes_chain_type_components::traits::types::ibc::channel_id::HasChannelIdType;
use hermes_chain_type_components::traits::types::ibc::port_id::HasPortIdType;
use hermes_chain_type_components::traits::types::ibc::sequence::HasSequenceType;
Expand All @@ -12,8 +11,7 @@ use hermes_chain_type_components::traits::types::ibc::sequence::HasSequenceType;
}]
#[async_trait]
pub trait CanQueryPacketCommitments<Counterparty>:
HasHeightType
+ HasChannelIdType<Counterparty>
HasChannelIdType<Counterparty>
+ HasPortIdType<Counterparty>
+ HasSequenceType<Counterparty>
+ HasAsyncErrorType
Expand All @@ -26,5 +24,5 @@ pub trait CanQueryPacketCommitments<Counterparty>:
&self,
channel_id: &Self::ChannelId,
port_id: &Self::PortId,
) -> Result<(Vec<Self::Sequence>, Self::Height), Self::Error>;
) -> Result<Vec<Self::Sequence>, Self::Error>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ pub trait CanQuerySendPackets<Counterparty>:
counterparty_channel_id: &ChannelIdOf<Counterparty, Self>,
counterparty_port_id: &PortIdOf<Counterparty, Self>,
sequences: &[Self::Sequence],
// The height is given to query the packets from a specific height.
// This height should be the same as the query height from the
// `CanQueryPacketCommitments` made on the same chain.
height: &Self::Height,
) -> Result<Vec<Self::OutgoingPacket>, Self::Error>;
}

Expand All @@ -63,6 +59,5 @@ pub trait CanQuerySendPacket<Counterparty>:
counterparty_channel_id: &ChannelIdOf<Counterparty, Self>,
counterparty_port_id: &PortIdOf<Counterparty, Self>,
sequence: &Self::Sequence,
height: &Self::Height,
) -> Result<Self::OutgoingPacket, Self::Error>;
}
13 changes: 3 additions & 10 deletions crates/cli/cli/src/commands/query/packet/commitments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ use cgp::prelude::*;
use hermes_cli_components::traits::build::CanLoadBuilder;
use hermes_cli_components::traits::command::CommandRunnerComponent;
use hermes_cli_framework::command::CommandRunner;
use hermes_cli_framework::output::{json, Output};
use hermes_cli_framework::output::Output;
use hermes_cosmos_relayer::contexts::chain::CosmosChain;
use hermes_relayer_components::chain::traits::queries::packet_commitments::CanQueryPacketCommitments;
use ibc::core::host::types::identifiers::{ChainId, ChannelId, PortId};

use crate::commands::query::packet::util::PacketSequences;
use crate::contexts::app::HermesApp;
use crate::Result;

Expand Down Expand Up @@ -48,20 +47,14 @@ impl CommandRunner<HermesApp> for QueryPacketCommitments {

let chain = builder.build_chain(&self.chain_id).await?;

let (sequences, height) =
let sequences =
<CosmosChain as CanQueryPacketCommitments<CosmosChain>>::query_packet_commitments(
&chain,
&self.channel_id,
&self.port_id,
)
.await?;

let packet_sequences = PacketSequences::new(height, sequences);

if json() {
Ok(Output::success(packet_sequences))
} else {
Ok(Output::success(packet_sequences.collated()))
}
Ok(Output::success(sequences))
}
}
8 changes: 4 additions & 4 deletions crates/cli/cli/src/commands/query/packet/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl QueryPendingPackets {
let counterparty_chain = builder.build_chain(&counterparty_chain_id.clone()).await?;

// Retrieve source Chain summary
let (commitment_sequences, _) =
let commitment_sequences =
<CosmosChain as CanQueryPacketCommitments<CosmosChain>>::query_packet_commitments(
&chain,
&channel_id,
Expand Down Expand Up @@ -210,7 +210,7 @@ impl QueryPendingPackets {
)
.await?;

let unreceived_acknowledgement_sequences = if let Some((acks_on_counterparty, _)) =
let unreceived_acknowledgement_sequences = if let Some(acks_on_counterparty) =
acks_and_height_on_counterparty
{
<CosmosChain as CanQueryUnreceivedAcksSequences<CosmosChain>>::query_unreceived_acknowledgments_sequences(
Expand All @@ -230,7 +230,7 @@ impl QueryPendingPackets {
};

// Retrieve destination chain summary
let (commitment_sequences, _) =
let commitment_sequences =
<CosmosChain as CanQueryPacketCommitments<CosmosChain>>::query_packet_commitments(
&counterparty_chain,
counterparty_channel_id,
Expand All @@ -255,7 +255,7 @@ impl QueryPendingPackets {
)
.await?;

let unreceived_acknowledgement_sequences = if let Some((acks_on_counterparty, _)) =
let unreceived_acknowledgement_sequences = if let Some(acks_on_counterparty) =
acks_and_height_on_counterparty
{
<CosmosChain as CanQueryUnreceivedAcksSequences<CosmosChain>>::query_unreceived_acknowledgments_sequences(
Expand Down
27 changes: 7 additions & 20 deletions crates/cli/cli/src/commands/query/packet/pending_acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use hermes_chain_components::traits::queries::unreceived_acks_sequences::CanQuer
use hermes_cli_components::traits::build::CanLoadBuilder;
use hermes_cli_components::traits::command::CommandRunnerComponent;
use hermes_cli_framework::command::CommandRunner;
use hermes_cli_framework::output::{json, Output};
use hermes_cli_framework::output::Output;
use hermes_cosmos_chain_components::traits::abci_query::CanQueryAbci;
use hermes_cosmos_chain_components::types::tendermint::TendermintClientState;
use hermes_cosmos_relayer::contexts::build::CosmosBuilder;
Expand All @@ -16,14 +16,12 @@ use hermes_protobuf_encoding_components::types::any::Any;
use hermes_relayer_components::chain::traits::queries::chain_status::CanQueryChainHeight;
use ibc::clients::tendermint::types::TENDERMINT_CLIENT_STATE_TYPE_URL;
use ibc::core::channel::types::channel::{ChannelEnd, State};
use ibc::core::client::types::Height;
use ibc::core::connection::types::ConnectionEnd;
use ibc::core::host::types::identifiers::{ChainId, ChannelId, PortId, Sequence};
use ibc::cosmos_host::IBC_QUERY_PATH;
use ibc::primitives::proto::Protobuf;
use oneline_eyre::eyre::eyre;

use crate::commands::query::packet::util::PacketSequences;
use crate::contexts::app::HermesApp;
use crate::Result;

Expand Down Expand Up @@ -58,7 +56,7 @@ pub struct QueryPendingAcks {
}

impl QueryPendingAcks {
async fn execute(&self, builder: &CosmosBuilder) -> Result<Option<(Vec<Sequence>, Height)>> {
async fn execute(&self, builder: &CosmosBuilder) -> Result<Option<Vec<Sequence>>> {
let port_id = self.port_id.clone();
let channel_id = self.channel_id.clone();
let chain = builder.build_chain(&self.chain_id).await?;
Expand Down Expand Up @@ -124,7 +122,7 @@ impl QueryPendingAcks {
let counterparty_chain_id = client_state.chain_id();
let counterparty_chain = builder.build_chain(&counterparty_chain_id.clone()).await?;

let (commitment_sequences, _) =
let commitment_sequences =
<CosmosChain as CanQueryPacketCommitments<CosmosChain>>::query_packet_commitments(
&chain,
&channel_id,
Expand All @@ -142,19 +140,16 @@ impl QueryPendingAcks {
)
.await?;

let unreceived_acknowledgement_sequences_and_height = if let Some((
acks_on_counterparty,
height,
)) =
let unreceived_acknowledgement_sequences_and_height = if let Some(acks_on_counterparty) =
acks_and_height_on_counterparty
{
Some((<CosmosChain as CanQueryUnreceivedAcksSequences<CosmosChain>>::query_unreceived_acknowledgments_sequences(
Some(<CosmosChain as CanQueryUnreceivedAcksSequences<CosmosChain>>::query_unreceived_acknowledgments_sequences(
&chain,
&channel_id,
&port_id,
&acks_on_counterparty,
)
.await?, height))
.await?)
} else {
None
};
Expand All @@ -171,15 +166,7 @@ impl CommandRunner<HermesApp> for QueryPendingAcks {
match self.execute(&builder).await {
Err(e) => Ok(Output::error(e)),
Ok(None) => Ok(Output::success_msg("No unreceived acknowledgements")),
Ok(Some((sequences, height))) => {
let packet_sequences = PacketSequences::new(height, sequences);

if json() {
Ok(Output::success(packet_sequences))
} else {
Ok(Output::success(packet_sequences.collated()))
}
}
Ok(Some(sequences)) => Ok(Output::success(sequences)),
}
}
}
38 changes: 0 additions & 38 deletions crates/cli/cli/src/commands/query/packet/util/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use core::fmt;

use ibc::core::client::types::Height;
use ibc::core::host::types::identifiers::Sequence;
use serde::Serialize;

Expand Down Expand Up @@ -51,40 +50,3 @@ impl CollatedPendingPackets {
}
}
}

#[derive(Serialize, Debug)]
pub struct PacketSequences {
pub height: Height,
pub sequences: Vec<u64>,
}

impl PacketSequences {
pub fn new(height: Height, sequences: Vec<Sequence>) -> Self {
Self {
height,
sequences: sequences.into_iter().map(u64::from).collect(),
}
}

pub fn collated(self) -> CollatedPacketSequences {
CollatedPacketSequences {
height: self.height,
sequences: self.sequences.into_iter().collated().collect(),
}
}
}

#[derive(Serialize)]
pub struct CollatedPacketSequences {
pub height: Height,
pub sequences: Vec<Collated<u64>>,
}

impl fmt::Debug for CollatedPacketSequences {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PacketSequences")
.field("height", &self.height)
.field("sequences", &self.sequences)
.finish()
}
}
Loading

0 comments on commit 9865ee3

Please sign in to comment.