From f1722158a050bb6f40d451fdfff9aaf8e0961e32 Mon Sep 17 00:00:00 2001 From: YI Date: Tue, 31 Dec 2024 08:26:13 +0800 Subject: [PATCH 01/18] Remove last cursor in gossip --- src/fiber/gossip.rs | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index c2ba222d2..d4a2fbabb 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -977,7 +977,6 @@ pub struct ExtendedGossipMessageStoreState { chain_actor: ActorRef, next_id: u64, output_ports: HashMap, - last_cursor: Cursor, messages_to_be_saved: HashSet, } @@ -995,17 +994,10 @@ impl ExtendedGossipMessageStoreState { chain_actor, next_id: Default::default(), output_ports: Default::default(), - last_cursor: Default::default(), messages_to_be_saved: Default::default(), } } - fn update_last_cursor(&mut self, cursor: Cursor) { - if cursor > self.last_cursor { - self.last_cursor = cursor; - } - } - // Saving all the messages whose transitive dependencies are already available. // We will also change the relevant state (e.g. update the latest cursor). // The returned list may be sent to the subscribers. @@ -1024,7 +1016,6 @@ impl ExtendedGossipMessageStoreState { match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await { Ok(_) => { - self.update_last_cursor(message.cursor()); verified_sorted_messages.push(message); } Err(error) => { @@ -1298,8 +1289,7 @@ impl Actor for ExtendedGossipMess ExtendedGossipMessageStoreMessage::Tick => { trace!( - "Gossip store maintenance ticked: last_cursor = {:?} #subscriptions = {}, #messages_to_be_saved = {}", - state.last_cursor, + "Gossip store maintenance ticked: #subscriptions = {}, #messages_to_be_saved = {}", state.output_ports.len(), state.messages_to_be_saved.len(), ); @@ -1361,7 +1351,6 @@ pub enum ExtendedGossipMessageStoreMessage { LoadMessagesFromStore(u64, Cursor), // A tick message that is sent periodically to check if there are any messages that are saved out of order. // If there are, we will send them to the subscribers. - // This tick will also advance the last_cursor upon finishing. Tick, } From 097aed2347bc3c09d321e4fa4da4d41a737769ec Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 01:40:36 +0800 Subject: [PATCH 02/18] Fix a subtle bug because nullify messages_to_be_saved too soon --- src/fiber/gossip.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index d4a2fbabb..9b9c90fa1 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1,7 +1,6 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, - mem::take, sync::Arc, time::Duration, }; @@ -1002,11 +1001,19 @@ impl ExtendedGossipMessageStoreState { // We will also change the relevant state (e.g. update the latest cursor). // The returned list may be sent to the subscribers. async fn prune_messages_to_be_saved(&mut self) -> Vec { - let messages_to_be_saved = take(&mut self.messages_to_be_saved); - let (complete_messages, uncomplete_messages) = messages_to_be_saved - .into_iter() - .partition(|m| self.has_dependencies_available(m)); - self.messages_to_be_saved = uncomplete_messages; + // Note that we have to call has_dependencies_available before changing messages_to_be_saved, + // as the function will check the dependencies of the message in the current messages_to_be_saved. + let complete_messages = self + .messages_to_be_saved + .iter() + .filter(|m| self.has_dependencies_available(m)) + .cloned() + .collect::>(); + self.messages_to_be_saved = self + .messages_to_be_saved + .difference(&complete_messages) + .cloned() + .collect(); let mut sorted_messages = complete_messages.into_iter().collect::>(); sorted_messages.sort_unstable(); From 86609397f2036700f816f5e353aeb0dfc8d78e04 Mon Sep 17 00:00:00 2001 From: YI Date: Mon, 30 Dec 2024 11:19:08 +0800 Subject: [PATCH 03/18] Fix broadcast message order --- src/fiber/gossip.rs | 4 ++++ src/fiber/types.rs | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 9b9c90fa1..a99660958 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1017,6 +1017,10 @@ impl ExtendedGossipMessageStoreState { let mut sorted_messages = complete_messages.into_iter().collect::>(); sorted_messages.sort_unstable(); + trace!( + "Saving complete messages to the store: {:?}", + &sorted_messages + ); let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len()); for message in sorted_messages { diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 71e68590f..9632e2de7 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -2720,14 +2720,14 @@ impl Ord for BroadcastMessageID { BroadcastMessageID::NodeAnnouncement(pubkey1), BroadcastMessageID::NodeAnnouncement(pubkey2), ) => pubkey1.cmp(pubkey2), - (BroadcastMessageID::ChannelUpdate(_), _) => Ordering::Less, - (BroadcastMessageID::NodeAnnouncement(_), _) => Ordering::Greater, + (BroadcastMessageID::NodeAnnouncement(_), _) => Ordering::Less, + (BroadcastMessageID::ChannelUpdate(_), _) => Ordering::Greater, ( BroadcastMessageID::ChannelAnnouncement(_), BroadcastMessageID::NodeAnnouncement(_), - ) => Ordering::Less, + ) => Ordering::Greater, (BroadcastMessageID::ChannelAnnouncement(_), BroadcastMessageID::ChannelUpdate(_)) => { - Ordering::Greater + Ordering::Less } } } From 03e2ae69c655328e266a371f9199c7f942b4ac2b Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 03:09:20 +0800 Subject: [PATCH 04/18] Cache channel transactions and timestamps --- Cargo.lock | 31 ++++++++- Cargo.toml | 1 + src/fiber/gossip.rs | 153 +++++++++++++++++++++++++++++--------------- 3 files changed, 131 insertions(+), 54 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea31525f7..cbd506690 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -956,7 +962,7 @@ dependencies = [ "jsonrpc-core", "lazy_static", "log", - "lru", + "lru 0.7.8", "parking_lot", "reqwest", "secp256k1 0.29.1", @@ -1064,7 +1070,7 @@ dependencies = [ "ckb-types", "ckb-verification-traits", "derive_more", - "lru", + "lru 0.7.8", "tokio", ] @@ -1618,6 +1624,7 @@ dependencies = [ "jsonrpsee", "lightning-invoice", "lnd-grpc-tonic-client", + "lru 0.12.5", "molecule", "musig2", "nom", @@ -1648,6 +1655,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1903,6 +1916,11 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "heapsize" @@ -2618,6 +2636,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 962d1fd71..70f84d0d9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ indicatif = "0.16" console = "0.15.8" bincode = "1.3.3" num_enum = "0.7.3" +lru = "0.12.5" [features] default = [] diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index a99660958..16a4e27b4 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1,13 +1,19 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, + num::NonZero, sync::Arc, time::Duration, }; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{Status, TransactionView, TxStatus}; -use ckb_types::{packed::OutPoint, H256}; +use ckb_types::{ + packed::{Byte32, OutPoint}, + H256, +}; +use lru::LruCache; +use once_cell::sync::OnceCell; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -26,7 +32,7 @@ use tentacle::{ utils::{is_reachable, multiaddr_to_socketaddr}, SessionId, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Mutex}; use tracing::{debug, error, info, trace, warn}; use crate::{ @@ -1815,29 +1821,50 @@ async fn get_channel_tx( outpoint: &OutPoint, chain: &ActorRef, ) -> Result<(TransactionView, H256), Error> { - match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, + static CHANNEL_TRANSACTIONS: OnceCell>> = + OnceCell::new(); + static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; + CHANNEL_TRANSACTIONS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; + match channel_transactions.get(&outpoint.tx_hash()) { + Some(tx) => { + return Ok(tx.clone()); } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. + None => { + // TODO: we should also cache invalid transactions to avoid querying the chain actor, + // but there is a possibility that the transaction is valid while calling the chain actor fails. + match call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, + } + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. + }, + }) => { + channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); + Ok((tx, block_hash)) }, - }) => Ok((tx, block_hash)), - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), + } + } } } @@ -1846,38 +1873,60 @@ async fn get_channel_timestamp( store: &S, chain: &ActorRef, ) -> Result { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - return Ok(timestamp); - } - - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; - let timestamp: u64 = match call_t!( - chain, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); + static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); + // The cache size of 200 is enough to process two batches of 100 broadcast messages. + // So it could be quite enough for the channel announcement messages. Yet it is not + // very memory intensive. + static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; + CHANNEL_TIMESTAMPS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; + let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { + Some(timestamp) => { + return Ok(*timestamp); } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); + None => { + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + timestamp + } else { + debug!( + "Getting channel announcement message timestamp by calling chain actor: {:?}", + &outpoint + ); + let (_, block_hash) = get_channel_tx(outpoint, chain).await?; + match call_t!( + chain, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); + } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); + } + } + } } }; - + channel_timestamps.put(outpoint.tx_hash(), timestamp); Ok(timestamp) } From ee6923f3ad5794feecf87d1cb1b593bd820f1497 Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 13:19:26 +0800 Subject: [PATCH 05/18] Encapsulate chain operations with cache --- src/fiber/gossip.rs | 377 +++++++++++++++++++++++--------------------- 1 file changed, 193 insertions(+), 184 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 16a4e27b4..40350c547 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -13,7 +13,6 @@ use ckb_types::{ H256, }; use lru::LruCache; -use once_cell::sync::OnceCell; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -219,6 +218,161 @@ pub trait GossipMessageStore { fn save_node_announcement(&self, node_announcement: NodeAnnouncement); } +#[derive(Clone)] +pub struct ChainWithCache { + chain_actor: ActorRef, + channel_timestamp_cache: Arc>>, + channel_transaction_cache: Arc>>, +} + +impl ChainWithCache { + fn new(chain_actor: ActorRef) -> Self { + Self { + chain_actor, + // The cache size of 200 is enough to process two batches of 100 broadcast messages. + // So it could be quite enough for the channel announcement messages. Yet it is not + // very memory intensive. + channel_timestamp_cache: Arc::new(Mutex::new(LruCache::new( + NonZero::new(200).unwrap(), + ))), + // Normally, we will first obtain the transaction timestamp + // (the timestamp of the block that contains the transaction) to sort messages by timestamp. + // And then we will verify the transaction is a valid funding transaction for the channel. + // There shouldn't be too many channel announcement transactions in the cache. + channel_transaction_cache: Arc::new(Mutex::new(LruCache::new( + NonZero::new(100).unwrap(), + ))), + } + } + + async fn get_channel_tx(&self, outpoint: &OutPoint) -> Result<(TransactionView, H256), Error> { + let mut channel_transactions = self.channel_transaction_cache.lock().await; + match channel_transactions.get(&outpoint.tx_hash()) { + Some(tx) => { + return Ok(tx.clone()); + } + None => { + // TODO: we should also cache invalid transactions to avoid querying the chain actor, + // but there is a possibility that the transaction is valid while calling the chain actor fails. + match call_t!( + &self.chain_actor, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, + } + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. + }, + }) => { + channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); + Ok((tx, block_hash)) + }, + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), + } + } + } + } + + async fn get_channel_timestamp( + &self, + outpoint: &OutPoint, + store: &S, + ) -> Result { + let mut channel_timestamps = self.channel_timestamp_cache.lock().await; + let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { + Some(timestamp) => { + return Ok(*timestamp); + } + None => { + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + timestamp + } else { + debug!( + "Getting channel announcement message timestamp by calling chain actor: {:?}", + &outpoint + ); + let (_, block_hash) = self.get_channel_tx(outpoint).await?; + match call_t!( + &self.chain_actor, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); + } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); + } + } + } + } + }; + channel_timestamps.put(outpoint.tx_hash(), timestamp); + Ok(timestamp) + } + + async fn get_message_cursor( + &self, + message: BroadcastMessage, + store: &S, + ) -> Result { + let m = self + .get_broadcast_message_with_timestamp(message, store) + .await?; + Ok(m.cursor()) + } + + async fn get_broadcast_message_with_timestamp( + &self, + message: BroadcastMessage, + store: &S, + ) -> Result { + match message { + BroadcastMessage::ChannelAnnouncement(channel_announcement) => { + let timestamp = self + .get_channel_timestamp(&channel_announcement.channel_outpoint, store) + .await?; + Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( + timestamp, + channel_announcement, + )) + } + BroadcastMessage::ChannelUpdate(channel_update) => { + Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) + } + BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( + BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), + ), + } + } +} + // A batch of gossip messages has been added to the store since the last time // we pulled new messages/messages are pushed to us. #[derive(Clone, Debug)] @@ -362,7 +516,7 @@ struct SyncingPeerState { pub struct GossipSyncingActorState { peer_id: PeerId, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, store: ExtendedGossipMessageStore, // The problem of using the cursor from the store is that a malicious peer may only // send large cursor to us, which may cause us to miss some messages. @@ -380,14 +534,14 @@ impl GossipSyncingActorState { fn new( peer_id: PeerId, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, store: ExtendedGossipMessageStore, cursor: Cursor, ) -> Self { Self { peer_id, gossip_actor, - chain_actor, + chain, store, cursor, peer_state: Default::default(), @@ -439,7 +593,7 @@ where type Arguments = ( PeerId, ActorRef, - ActorRef, + ChainWithCache, ExtendedGossipMessageStore, Cursor, ); @@ -447,7 +601,7 @@ where async fn pre_start( &self, myself: ActorRef, - (peer_id, gossip_actor, chain_actor, store, cursor): Self::Arguments, + (peer_id, gossip_actor, chain, store, cursor): Self::Arguments, ) -> Result { myself .send_message(GossipSyncingActorMessage::NewGetRequest()) @@ -455,7 +609,7 @@ where Ok(GossipSyncingActorState::new( peer_id, gossip_actor, - chain_actor, + chain, store, cursor, )) @@ -490,12 +644,10 @@ where match messages.last() { Some(last_message) => { // We need the message timestamp to construct a valid cursor. - match get_message_cursor( - last_message.clone(), - &state.store.store, - &state.chain_actor, - ) - .await + match state + .chain + .get_message_cursor(last_message.clone(), &state.store.store) + .await { Ok(cursor) => { state.cursor = cursor; @@ -857,7 +1009,7 @@ where announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, supervisor: ActorCell, ) -> Self { let (actor, _) = Actor::spawn_linked( @@ -871,7 +1023,7 @@ where announce_private_addr, store.clone(), gossip_actor, - chain_actor, + chain, ), supervisor, ) @@ -979,7 +1131,7 @@ pub struct ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, next_id: u64, output_ports: HashMap, messages_to_be_saved: HashSet, @@ -990,13 +1142,13 @@ impl ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, ) -> Self { Self { announce_private_addr, store, gossip_actor, - chain_actor, + chain, next_id: Default::default(), output_ports: Default::default(), messages_to_be_saved: Default::default(), @@ -1030,8 +1182,7 @@ impl ExtendedGossipMessageStoreState { let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len()); for message in sorted_messages { - match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await - { + match verify_and_save_broadcast_message(&message, &self.store, &self.chain).await { Ok(_) => { verified_sorted_messages.push(message); } @@ -1082,12 +1233,11 @@ impl ExtendedGossipMessageStoreState { } } - let message = - get_broadcast_message_with_timestamp(message.clone(), &self.store, &self.chain_actor) - .await - .map_err(|error| { - GossipMessageProcessingError::ProcessingError(error.to_string()) - })?; + let message = self + .chain + .get_broadcast_message_with_timestamp(message.clone(), &self.store) + .await + .map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?; let max_acceptable_gossip_message_timestamp = max_acceptable_gossip_message_timestamp(); if message.timestamp() > max_acceptable_gossip_message_timestamp { @@ -1155,7 +1305,7 @@ impl Actor for ExtendedGossipMess bool, S, ActorRef, - ActorRef, + ChainWithCache, ); async fn pre_start( @@ -1166,7 +1316,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain_actor, + chain, ): Self::Arguments, ) -> Result { myself.send_interval(gossip_store_maintenance_interval, || { @@ -1176,7 +1326,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain_actor, + chain, )) } @@ -1386,7 +1536,7 @@ pub(crate) struct GossipActorState { num_targeted_outbound_passive_syncing_peers: usize, next_request_id: u64, myself: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, // There are some messages missing from our store, and we need to query them from peers. // These messages include channel updates and node announcements related to channel announcements, // and channel announcements related to channel updates. @@ -1514,7 +1664,7 @@ where ( peer_id.clone(), self.myself.clone(), - self.chain_actor.clone(), + self.chain.clone(), self.store.clone(), safe_cursor, ), @@ -1714,15 +1864,6 @@ fn get_dependent_message_queries( queries } -async fn get_message_cursor( - message: BroadcastMessage, - store: &S, - chain: &ActorRef, -) -> Result { - let m = get_broadcast_message_with_timestamp(message, store, chain).await?; - Ok(m.cursor()) -} - fn get_existing_broadcast_message( message: &BroadcastMessage, store: &S, @@ -1761,29 +1902,6 @@ fn get_existing_newer_broadcast_message( }) } -async fn get_broadcast_message_with_timestamp( - message: BroadcastMessage, - store: &S, - chain: &ActorRef, -) -> Result { - match message { - BroadcastMessage::ChannelAnnouncement(channel_announcement) => { - let timestamp = - get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?; - Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( - timestamp, - channel_announcement, - )) - } - BroadcastMessage::ChannelUpdate(channel_update) => { - Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) - } - BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), - ), - } -} - // Channel updates depends on channel announcements to obtain the node public keys. // If a channel update is saved before the channel announcement, we can't reliably determine if // this channel update is valid. So we need to save the channel update to lagged_messages and @@ -1795,7 +1913,7 @@ async fn get_broadcast_message_with_timestamp( async fn verify_and_save_broadcast_message( message: &BroadcastMessageWithTimestamp, store: &S, - chain: &ActorRef, + chain: &ChainWithCache, ) -> Result<(), Error> { match message { BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => { @@ -1817,119 +1935,6 @@ async fn verify_and_save_broadcast_message( Ok(()) } -async fn get_channel_tx( - outpoint: &OutPoint, - chain: &ActorRef, -) -> Result<(TransactionView, H256), Error> { - static CHANNEL_TRANSACTIONS: OnceCell>> = - OnceCell::new(); - static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; - CHANNEL_TRANSACTIONS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; - match channel_transactions.get(&outpoint.tx_hash()) { - Some(tx) => { - return Ok(tx.clone()); - } - None => { - // TODO: we should also cache invalid transactions to avoid querying the chain actor, - // but there is a possibility that the transaction is valid while calling the chain actor fails. - match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, - } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. - }, - }) => { - channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); - Ok((tx, block_hash)) - }, - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), - } - } - } -} - -async fn get_channel_timestamp( - outpoint: &OutPoint, - store: &S, - chain: &ActorRef, -) -> Result { - static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); - // The cache size of 200 is enough to process two batches of 100 broadcast messages. - // So it could be quite enough for the channel announcement messages. Yet it is not - // very memory intensive. - static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; - CHANNEL_TIMESTAMPS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; - let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { - Some(timestamp) => { - return Ok(*timestamp); - } - None => { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - timestamp - } else { - debug!( - "Getting channel announcement message timestamp by calling chain actor: {:?}", - &outpoint - ); - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; - match call_t!( - chain, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); - } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); - } - } - } - } - }; - channel_timestamps.put(outpoint.tx_hash(), timestamp); - Ok(timestamp) -} - // Verify the channel announcement message. If any error occurs, return the error. // Otherwise, return the timestamp of the channel announcement and a bool value indicating if the // the channel announcement is already saved to the store. If it is already saved, the bool value @@ -1937,7 +1942,7 @@ async fn get_channel_timestamp( async fn verify_channel_announcement( channel_announcement: &ChannelAnnouncement, store: &S, - chain: &ActorRef, + chain: &ChainWithCache, ) -> Result { if let Some((_, announcement)) = store.get_latest_channel_announcement(&channel_announcement.channel_outpoint) @@ -1994,7 +1999,10 @@ async fn verify_channel_announcement( ))); } - let (tx, _) = get_channel_tx(&channel_announcement.channel_outpoint, chain).await?; + let (tx, _) = chain + .get_channel_tx(&channel_announcement.channel_outpoint) + .await?; + let pubkey = channel_announcement.ckb_key.serialize(); let pubkey_hash = &blake2b_256(pubkey.as_slice())[0..20]; match tx.inner.outputs.first() { @@ -2147,6 +2155,7 @@ impl GossipProtocolHandle { { let (network_control_sender, network_control_receiver) = oneshot::channel(); let (store_sender, store_receiver) = oneshot::channel(); + let chain = ChainWithCache::new(chain_actor); let (actor, _handle) = ActorRuntime::spawn_linked_instant( name, @@ -2158,7 +2167,7 @@ impl GossipProtocolHandle { gossip_store_maintenance_interval, announce_private_addr, store, - chain_actor, + chain, ), supervisor, ) @@ -2202,7 +2211,7 @@ where Duration, bool, S, - ActorRef, + ChainWithCache, ); async fn pre_start( @@ -2215,7 +2224,7 @@ where store_maintenance_interval, announce_private_addr, store, - chain_actor, + chain, ): Self::Arguments, ) -> Result { let store = ExtendedGossipMessageStore::new( @@ -2223,7 +2232,7 @@ where announce_private_addr, store, myself.clone(), - chain_actor.clone(), + chain.clone(), myself.get_cell(), ) .await; @@ -2245,7 +2254,7 @@ where num_targeted_active_syncing_peers: MAX_NUM_OF_ACTIVE_SYNCING_PEERS, num_targeted_outbound_passive_syncing_peers: MIN_NUM_OF_PASSIVE_SYNCING_PEERS, myself, - chain_actor, + chain, next_request_id: Default::default(), pending_queries: Default::default(), num_finished_active_syncing_peers: Default::default(), From e2acdb5f2c6811d8f185e00825d58f4fd81f036d Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 14:52:37 +0800 Subject: [PATCH 06/18] Revert "Encapsulate chain operations with cache" This reverts commit 048d740a01bab29224bec6fddd1013507b0c3433. --- src/fiber/gossip.rs | 372 ++++++++++++++++++++++---------------------- 1 file changed, 183 insertions(+), 189 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 40350c547..331b0a86a 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -13,6 +13,7 @@ use ckb_types::{ H256, }; use lru::LruCache; +use once_cell::sync::OnceCell; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -218,161 +219,6 @@ pub trait GossipMessageStore { fn save_node_announcement(&self, node_announcement: NodeAnnouncement); } -#[derive(Clone)] -pub struct ChainWithCache { - chain_actor: ActorRef, - channel_timestamp_cache: Arc>>, - channel_transaction_cache: Arc>>, -} - -impl ChainWithCache { - fn new(chain_actor: ActorRef) -> Self { - Self { - chain_actor, - // The cache size of 200 is enough to process two batches of 100 broadcast messages. - // So it could be quite enough for the channel announcement messages. Yet it is not - // very memory intensive. - channel_timestamp_cache: Arc::new(Mutex::new(LruCache::new( - NonZero::new(200).unwrap(), - ))), - // Normally, we will first obtain the transaction timestamp - // (the timestamp of the block that contains the transaction) to sort messages by timestamp. - // And then we will verify the transaction is a valid funding transaction for the channel. - // There shouldn't be too many channel announcement transactions in the cache. - channel_transaction_cache: Arc::new(Mutex::new(LruCache::new( - NonZero::new(100).unwrap(), - ))), - } - } - - async fn get_channel_tx(&self, outpoint: &OutPoint) -> Result<(TransactionView, H256), Error> { - let mut channel_transactions = self.channel_transaction_cache.lock().await; - match channel_transactions.get(&outpoint.tx_hash()) { - Some(tx) => { - return Ok(tx.clone()); - } - None => { - // TODO: we should also cache invalid transactions to avoid querying the chain actor, - // but there is a possibility that the transaction is valid while calling the chain actor fails. - match call_t!( - &self.chain_actor, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, - } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. - }, - }) => { - channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); - Ok((tx, block_hash)) - }, - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), - } - } - } - } - - async fn get_channel_timestamp( - &self, - outpoint: &OutPoint, - store: &S, - ) -> Result { - let mut channel_timestamps = self.channel_timestamp_cache.lock().await; - let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { - Some(timestamp) => { - return Ok(*timestamp); - } - None => { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - timestamp - } else { - debug!( - "Getting channel announcement message timestamp by calling chain actor: {:?}", - &outpoint - ); - let (_, block_hash) = self.get_channel_tx(outpoint).await?; - match call_t!( - &self.chain_actor, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); - } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); - } - } - } - } - }; - channel_timestamps.put(outpoint.tx_hash(), timestamp); - Ok(timestamp) - } - - async fn get_message_cursor( - &self, - message: BroadcastMessage, - store: &S, - ) -> Result { - let m = self - .get_broadcast_message_with_timestamp(message, store) - .await?; - Ok(m.cursor()) - } - - async fn get_broadcast_message_with_timestamp( - &self, - message: BroadcastMessage, - store: &S, - ) -> Result { - match message { - BroadcastMessage::ChannelAnnouncement(channel_announcement) => { - let timestamp = self - .get_channel_timestamp(&channel_announcement.channel_outpoint, store) - .await?; - Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( - timestamp, - channel_announcement, - )) - } - BroadcastMessage::ChannelUpdate(channel_update) => { - Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) - } - BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), - ), - } - } -} - // A batch of gossip messages has been added to the store since the last time // we pulled new messages/messages are pushed to us. #[derive(Clone, Debug)] @@ -516,7 +362,7 @@ struct SyncingPeerState { pub struct GossipSyncingActorState { peer_id: PeerId, gossip_actor: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, store: ExtendedGossipMessageStore, // The problem of using the cursor from the store is that a malicious peer may only // send large cursor to us, which may cause us to miss some messages. @@ -534,14 +380,14 @@ impl GossipSyncingActorState { fn new( peer_id: PeerId, gossip_actor: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, store: ExtendedGossipMessageStore, cursor: Cursor, ) -> Self { Self { peer_id, gossip_actor, - chain, + chain_actor, store, cursor, peer_state: Default::default(), @@ -593,7 +439,7 @@ where type Arguments = ( PeerId, ActorRef, - ChainWithCache, + ActorRef, ExtendedGossipMessageStore, Cursor, ); @@ -601,7 +447,7 @@ where async fn pre_start( &self, myself: ActorRef, - (peer_id, gossip_actor, chain, store, cursor): Self::Arguments, + (peer_id, gossip_actor, chain_actor, store, cursor): Self::Arguments, ) -> Result { myself .send_message(GossipSyncingActorMessage::NewGetRequest()) @@ -609,7 +455,7 @@ where Ok(GossipSyncingActorState::new( peer_id, gossip_actor, - chain, + chain_actor, store, cursor, )) @@ -644,10 +490,12 @@ where match messages.last() { Some(last_message) => { // We need the message timestamp to construct a valid cursor. - match state - .chain - .get_message_cursor(last_message.clone(), &state.store.store) - .await + match get_message_cursor( + last_message.clone(), + &state.store.store, + &state.chain_actor, + ) + .await { Ok(cursor) => { state.cursor = cursor; @@ -1009,7 +857,7 @@ where announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, supervisor: ActorCell, ) -> Self { let (actor, _) = Actor::spawn_linked( @@ -1023,7 +871,7 @@ where announce_private_addr, store.clone(), gossip_actor, - chain, + chain_actor, ), supervisor, ) @@ -1131,7 +979,7 @@ pub struct ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, next_id: u64, output_ports: HashMap, messages_to_be_saved: HashSet, @@ -1142,13 +990,13 @@ impl ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, ) -> Self { Self { announce_private_addr, store, gossip_actor, - chain, + chain_actor, next_id: Default::default(), output_ports: Default::default(), messages_to_be_saved: Default::default(), @@ -1182,7 +1030,8 @@ impl ExtendedGossipMessageStoreState { let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len()); for message in sorted_messages { - match verify_and_save_broadcast_message(&message, &self.store, &self.chain).await { + match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await + { Ok(_) => { verified_sorted_messages.push(message); } @@ -1233,11 +1082,12 @@ impl ExtendedGossipMessageStoreState { } } - let message = self - .chain - .get_broadcast_message_with_timestamp(message.clone(), &self.store) - .await - .map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?; + let message = + get_broadcast_message_with_timestamp(message.clone(), &self.store, &self.chain_actor) + .await + .map_err(|error| { + GossipMessageProcessingError::ProcessingError(error.to_string()) + })?; let max_acceptable_gossip_message_timestamp = max_acceptable_gossip_message_timestamp(); if message.timestamp() > max_acceptable_gossip_message_timestamp { @@ -1305,7 +1155,7 @@ impl Actor for ExtendedGossipMess bool, S, ActorRef, - ChainWithCache, + ActorRef, ); async fn pre_start( @@ -1316,7 +1166,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain, + chain_actor, ): Self::Arguments, ) -> Result { myself.send_interval(gossip_store_maintenance_interval, || { @@ -1326,7 +1176,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain, + chain_actor, )) } @@ -1536,7 +1386,7 @@ pub(crate) struct GossipActorState { num_targeted_outbound_passive_syncing_peers: usize, next_request_id: u64, myself: ActorRef, - chain: ChainWithCache, + chain_actor: ActorRef, // There are some messages missing from our store, and we need to query them from peers. // These messages include channel updates and node announcements related to channel announcements, // and channel announcements related to channel updates. @@ -1664,7 +1514,7 @@ where ( peer_id.clone(), self.myself.clone(), - self.chain.clone(), + self.chain_actor.clone(), self.store.clone(), safe_cursor, ), @@ -1864,6 +1714,15 @@ fn get_dependent_message_queries( queries } +async fn get_message_cursor( + message: BroadcastMessage, + store: &S, + chain: &ActorRef, +) -> Result { + let m = get_broadcast_message_with_timestamp(message, store, chain).await?; + Ok(m.cursor()) +} + fn get_existing_broadcast_message( message: &BroadcastMessage, store: &S, @@ -1902,6 +1761,29 @@ fn get_existing_newer_broadcast_message( }) } +async fn get_broadcast_message_with_timestamp( + message: BroadcastMessage, + store: &S, + chain: &ActorRef, +) -> Result { + match message { + BroadcastMessage::ChannelAnnouncement(channel_announcement) => { + let timestamp = + get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?; + Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( + timestamp, + channel_announcement, + )) + } + BroadcastMessage::ChannelUpdate(channel_update) => { + Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) + } + BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( + BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), + ), + } +} + // Channel updates depends on channel announcements to obtain the node public keys. // If a channel update is saved before the channel announcement, we can't reliably determine if // this channel update is valid. So we need to save the channel update to lagged_messages and @@ -1913,7 +1795,7 @@ fn get_existing_newer_broadcast_message( async fn verify_and_save_broadcast_message( message: &BroadcastMessageWithTimestamp, store: &S, - chain: &ChainWithCache, + chain: &ActorRef, ) -> Result<(), Error> { match message { BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => { @@ -1935,6 +1817,119 @@ async fn verify_and_save_broadcast_message( Ok(()) } +async fn get_channel_tx( + outpoint: &OutPoint, + chain: &ActorRef, +) -> Result<(TransactionView, H256), Error> { + static CHANNEL_TRANSACTIONS: OnceCell>> = + OnceCell::new(); + static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; + CHANNEL_TRANSACTIONS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; + match channel_transactions.get(&outpoint.tx_hash()) { + Some(tx) => { + return Ok(tx.clone()); + } + None => { + // TODO: we should also cache invalid transactions to avoid querying the chain actor, + // but there is a possibility that the transaction is valid while calling the chain actor fails. + match call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, + } + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. + }, + }) => { + channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); + Ok((tx, block_hash)) + }, + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), + } + } + } +} + +async fn get_channel_timestamp( + outpoint: &OutPoint, + store: &S, + chain: &ActorRef, +) -> Result { + static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); + // The cache size of 200 is enough to process two batches of 100 broadcast messages. + // So it could be quite enough for the channel announcement messages. Yet it is not + // very memory intensive. + static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; + CHANNEL_TIMESTAMPS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; + let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { + Some(timestamp) => { + return Ok(*timestamp); + } + None => { + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + timestamp + } else { + debug!( + "Getting channel announcement message timestamp by calling chain actor: {:?}", + &outpoint + ); + let (_, block_hash) = get_channel_tx(outpoint, chain).await?; + match call_t!( + chain, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); + } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); + } + } + } + } + }; + channel_timestamps.put(outpoint.tx_hash(), timestamp); + Ok(timestamp) +} + // Verify the channel announcement message. If any error occurs, return the error. // Otherwise, return the timestamp of the channel announcement and a bool value indicating if the // the channel announcement is already saved to the store. If it is already saved, the bool value @@ -1942,7 +1937,7 @@ async fn verify_and_save_broadcast_message( async fn verify_channel_announcement( channel_announcement: &ChannelAnnouncement, store: &S, - chain: &ChainWithCache, + chain: &ActorRef, ) -> Result { if let Some((_, announcement)) = store.get_latest_channel_announcement(&channel_announcement.channel_outpoint) @@ -2155,7 +2150,6 @@ impl GossipProtocolHandle { { let (network_control_sender, network_control_receiver) = oneshot::channel(); let (store_sender, store_receiver) = oneshot::channel(); - let chain = ChainWithCache::new(chain_actor); let (actor, _handle) = ActorRuntime::spawn_linked_instant( name, @@ -2167,7 +2161,7 @@ impl GossipProtocolHandle { gossip_store_maintenance_interval, announce_private_addr, store, - chain, + chain_actor, ), supervisor, ) @@ -2211,7 +2205,7 @@ where Duration, bool, S, - ChainWithCache, + ActorRef, ); async fn pre_start( @@ -2224,7 +2218,7 @@ where store_maintenance_interval, announce_private_addr, store, - chain, + chain_actor, ): Self::Arguments, ) -> Result { let store = ExtendedGossipMessageStore::new( @@ -2232,7 +2226,7 @@ where announce_private_addr, store, myself.clone(), - chain.clone(), + chain_actor.clone(), myself.get_cell(), ) .await; @@ -2254,7 +2248,7 @@ where num_targeted_active_syncing_peers: MAX_NUM_OF_ACTIVE_SYNCING_PEERS, num_targeted_outbound_passive_syncing_peers: MIN_NUM_OF_PASSIVE_SYNCING_PEERS, myself, - chain, + chain_actor, next_request_id: Default::default(), pending_queries: Default::default(), num_finished_active_syncing_peers: Default::default(), From 50f1ccf43bae04b93ef5ed3588719a8ff50f04cf Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 14:52:45 +0800 Subject: [PATCH 07/18] Revert "Cache channel transactions and timestamps" This reverts commit 362dfe5e0f4371fe2b8a09d416bca99d566742a7. --- Cargo.lock | 31 +-------- Cargo.toml | 1 - src/fiber/gossip.rs | 153 +++++++++++++++----------------------------- 3 files changed, 54 insertions(+), 131 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cbd506690..ea31525f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,12 +47,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "allocator-api2" -version = "0.2.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" - [[package]] name = "android-tzdata" version = "0.1.1" @@ -962,7 +956,7 @@ dependencies = [ "jsonrpc-core", "lazy_static", "log", - "lru 0.7.8", + "lru", "parking_lot", "reqwest", "secp256k1 0.29.1", @@ -1070,7 +1064,7 @@ dependencies = [ "ckb-types", "ckb-verification-traits", "derive_more", - "lru 0.7.8", + "lru", "tokio", ] @@ -1624,7 +1618,6 @@ dependencies = [ "jsonrpsee", "lightning-invoice", "lnd-grpc-tonic-client", - "lru 0.12.5", "molecule", "musig2", "nom", @@ -1655,12 +1648,6 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" -[[package]] -name = "foldhash" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" - [[package]] name = "foreign-types" version = "0.3.2" @@ -1916,11 +1903,6 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" -dependencies = [ - "allocator-api2", - "equivalent", - "foldhash", -] [[package]] name = "heapsize" @@ -2636,15 +2618,6 @@ dependencies = [ "hashbrown 0.12.3", ] -[[package]] -name = "lru" -version = "0.12.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" -dependencies = [ - "hashbrown 0.15.2", -] - [[package]] name = "matchers" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 70f84d0d9..962d1fd71 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,7 +64,6 @@ indicatif = "0.16" console = "0.15.8" bincode = "1.3.3" num_enum = "0.7.3" -lru = "0.12.5" [features] default = [] diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 331b0a86a..ff3573ac3 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1,19 +1,13 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, - num::NonZero, sync::Arc, time::Duration, }; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{Status, TransactionView, TxStatus}; -use ckb_types::{ - packed::{Byte32, OutPoint}, - H256, -}; -use lru::LruCache; -use once_cell::sync::OnceCell; +use ckb_types::{packed::OutPoint, H256}; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -32,7 +26,7 @@ use tentacle::{ utils::{is_reachable, multiaddr_to_socketaddr}, SessionId, }; -use tokio::sync::{oneshot, Mutex}; +use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; use crate::{ @@ -1821,50 +1815,29 @@ async fn get_channel_tx( outpoint: &OutPoint, chain: &ActorRef, ) -> Result<(TransactionView, H256), Error> { - static CHANNEL_TRANSACTIONS: OnceCell>> = - OnceCell::new(); - static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; - CHANNEL_TRANSACTIONS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; - match channel_transactions.get(&outpoint.tx_hash()) { - Some(tx) => { - return Ok(tx.clone()); + match call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, } - None => { - // TODO: we should also cache invalid transactions to avoid querying the chain actor, - // but there is a possibility that the transaction is valid while calling the chain actor fails. - match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, - } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. - }, - }) => { - channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); - Ok((tx, block_hash)) + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. }, - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), - } - } + }) => Ok((tx, block_hash)), + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), } } @@ -1873,60 +1846,38 @@ async fn get_channel_timestamp( store: &S, chain: &ActorRef, ) -> Result { - static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); - // The cache size of 200 is enough to process two batches of 100 broadcast messages. - // So it could be quite enough for the channel announcement messages. Yet it is not - // very memory intensive. - static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; - CHANNEL_TIMESTAMPS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; - let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { - Some(timestamp) => { - return Ok(*timestamp); + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + return Ok(timestamp); + } + + let (_, block_hash) = get_channel_tx(outpoint, chain).await?; + let timestamp: u64 = match call_t!( + chain, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); } - None => { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - timestamp - } else { - debug!( - "Getting channel announcement message timestamp by calling chain actor: {:?}", - &outpoint - ); - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; - match call_t!( - chain, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); - } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); - } - } - } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); } }; - channel_timestamps.put(outpoint.tx_hash(), timestamp); + Ok(timestamp) } From b6bbf40afdfc4e18b8a5d1fd4ef256d1373e5a6d Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 16:55:28 +0800 Subject: [PATCH 08/18] Simplify prune_messages_to_be_saved --- src/fiber/gossip.rs | 20 +++++++------------- 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index ff3573ac3..a5fc7db84 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -997,23 +997,17 @@ impl ExtendedGossipMessageStoreState { } } - // Saving all the messages whose transitive dependencies are already available. - // We will also change the relevant state (e.g. update the latest cursor). - // The returned list may be sent to the subscribers. + // Obtaining all the messages whose transitive dependencies are already available, + // check their validity and then save valid messages to a list that can be sent to the subscribers. async fn prune_messages_to_be_saved(&mut self) -> Vec { // Note that we have to call has_dependencies_available before changing messages_to_be_saved, // as the function will check the dependencies of the message in the current messages_to_be_saved. - let complete_messages = self + let (complete_messages, uncomplete_messages) = self .messages_to_be_saved - .iter() - .filter(|m| self.has_dependencies_available(m)) - .cloned() - .collect::>(); - self.messages_to_be_saved = self - .messages_to_be_saved - .difference(&complete_messages) - .cloned() - .collect(); + .clone() + .into_iter() + .partition(|m| self.has_dependencies_available(m)); + self.messages_to_be_saved = uncomplete_messages; let mut sorted_messages = complete_messages.into_iter().collect::>(); sorted_messages.sort_unstable(); From 95cf1f8a4b69ce298058b4346fbf32cbb5aca92a Mon Sep 17 00:00:00 2001 From: YI Date: Thu, 2 Jan 2025 16:37:25 +0800 Subject: [PATCH 09/18] Add on chain info to messages_to_be_saved --- src/ckb/tests/test_utils.rs | 1 + src/fiber/gossip.rs | 177 +++++++++++++++++++++--------------- src/fiber/types.rs | 85 ++++++++++++++++- 3 files changed, 189 insertions(+), 74 deletions(-) diff --git a/src/ckb/tests/test_utils.rs b/src/ckb/tests/test_utils.rs index b9fcd7310..f47a8166c 100644 --- a/src/ckb/tests/test_utils.rs +++ b/src/ckb/tests/test_utils.rs @@ -472,6 +472,7 @@ impl Actor for MockChainActor { } } TraceTx(tx, reply_port) => { + debug!("Tracing transaction: {:?}", &tx); match state.tx_status.get(&tx.tx_hash).cloned() { Some((tx_view, status)) => { reply_trace_tx(Some(tx_view), status, reply_port); diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index a5fc7db84..9c90f8231 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -39,10 +39,10 @@ use super::{ network::{check_chain_hash, get_chain_hash, GossipMessageWithPeerId, GOSSIP_PROTOCOL_ID}, types::{ BroadcastMessage, BroadcastMessageID, BroadcastMessageQuery, BroadcastMessageQueryFlags, - BroadcastMessageWithTimestamp, BroadcastMessagesFilter, BroadcastMessagesFilterResult, - ChannelAnnouncement, ChannelUpdate, Cursor, GetBroadcastMessages, - GetBroadcastMessagesResult, GossipMessage, NodeAnnouncement, Pubkey, - QueryBroadcastMessages, QueryBroadcastMessagesResult, + BroadcastMessageWithOnChainInfo, BroadcastMessageWithTimestamp, BroadcastMessagesFilter, + BroadcastMessagesFilterResult, ChannelAnnouncement, ChannelOnchainInfo, ChannelUpdate, + Cursor, GetBroadcastMessages, GetBroadcastMessagesResult, GossipMessage, NodeAnnouncement, + Pubkey, QueryBroadcastMessages, QueryBroadcastMessagesResult, }, }; @@ -485,7 +485,7 @@ where Some(last_message) => { // We need the message timestamp to construct a valid cursor. match get_message_cursor( - last_message.clone(), + last_message, &state.store.store, &state.chain_actor, ) @@ -976,7 +976,7 @@ pub struct ExtendedGossipMessageStoreState { chain_actor: ActorRef, next_id: u64, output_ports: HashMap, - messages_to_be_saved: HashSet, + messages_to_be_saved: HashSet, } impl ExtendedGossipMessageStoreState { @@ -1018,10 +1018,9 @@ impl ExtendedGossipMessageStoreState { let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len()); for message in sorted_messages { - match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await - { + match verify_and_save_broadcast_message(&message, &self.store).await { Ok(_) => { - verified_sorted_messages.push(message); + verified_sorted_messages.push(message.into()); } Err(error) => { trace!( @@ -1047,10 +1046,11 @@ impl ExtendedGossipMessageStoreState { outpoint: &OutPoint, ) -> Option<(u64, ChannelAnnouncement)> { self.messages_to_be_saved.iter().find_map(|m| match m { - BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) - if &channel_announcement.channel_outpoint == outpoint => - { - Some((*timestamp, channel_announcement.clone())) + BroadcastMessageWithOnChainInfo::ChannelAnnouncement( + on_chain_info, + channel_announcement, + ) if &channel_announcement.channel_outpoint == outpoint => { + Some((on_chain_info.timestamp, channel_announcement.clone())) } _ => None, }) @@ -1070,12 +1070,9 @@ impl ExtendedGossipMessageStoreState { } } - let message = - get_broadcast_message_with_timestamp(message.clone(), &self.store, &self.chain_actor) - .await - .map_err(|error| { - GossipMessageProcessingError::ProcessingError(error.to_string()) - })?; + let message = get_broadcast_message_with_on_chain_info(message.clone(), &self.chain_actor) + .await + .map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?; let max_acceptable_gossip_message_timestamp = max_acceptable_gossip_message_timestamp(); if message.timestamp() > max_acceptable_gossip_message_timestamp { @@ -1086,7 +1083,7 @@ impl ExtendedGossipMessageStoreState { } if !self.announce_private_addr { - if let BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) = &message { + if let BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement) = &message { if !node_announcement.addresses.iter().any(|addr| { multiaddr_to_socketaddr(addr) .map(|socket_addr| is_reachable(socket_addr.ip())) @@ -1101,12 +1098,12 @@ impl ExtendedGossipMessageStoreState { trace!("New gossip message saved to memory: {:?}", message); self.messages_to_be_saved.insert(message.clone()); - Ok(message) + Ok(message.into()) } - fn has_dependencies_available(&self, message: &BroadcastMessageWithTimestamp) -> bool { + fn has_dependencies_available(&self, message: &BroadcastMessageWithOnChainInfo) -> bool { match message { - BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) => self + BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => self .get_channel_annnouncement(&channel_update.channel_outpoint) .is_some(), _ => true, @@ -1703,12 +1700,30 @@ fn get_dependent_message_queries( } async fn get_message_cursor( - message: BroadcastMessage, + message: &BroadcastMessage, store: &S, chain: &ActorRef, ) -> Result { - let m = get_broadcast_message_with_timestamp(message, store, chain).await?; - Ok(m.cursor()) + match message { + BroadcastMessage::ChannelAnnouncement(channel_announcement) => { + let timestamp = + get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?; + Ok(Cursor::new( + timestamp, + BroadcastMessageID::ChannelAnnouncement( + channel_announcement.channel_outpoint.clone(), + ), + )) + } + BroadcastMessage::ChannelUpdate(channel_update) => Ok(Cursor::new( + channel_update.timestamp, + BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone()), + )), + BroadcastMessage::NodeAnnouncement(node_announcement) => Ok(Cursor::new( + node_announcement.timestamp, + BroadcastMessageID::NodeAnnouncement(node_announcement.node_id.clone()), + )), + } } fn get_existing_broadcast_message( @@ -1749,25 +1764,24 @@ fn get_existing_newer_broadcast_message( }) } -async fn get_broadcast_message_with_timestamp( +async fn get_broadcast_message_with_on_chain_info( message: BroadcastMessage, - store: &S, chain: &ActorRef, -) -> Result { +) -> Result { match message { BroadcastMessage::ChannelAnnouncement(channel_announcement) => { - let timestamp = - get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?; - Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( - timestamp, + let on_chain_info = + get_channel_on_chain_info(&channel_announcement.channel_outpoint, chain).await?; + Ok(BroadcastMessageWithOnChainInfo::ChannelAnnouncement( + on_chain_info, channel_announcement, )) } - BroadcastMessage::ChannelUpdate(channel_update) => { - Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) - } + BroadcastMessage::ChannelUpdate(channel_update) => Ok( + BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update), + ), BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), + BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement), ), } } @@ -1781,22 +1795,27 @@ async fn get_broadcast_message_with_timestamp( // announcement is saved before the channel announcement, we need to temporarily save the channel // announcement to lagged_messages and wait for the node announcement to be saved. async fn verify_and_save_broadcast_message( - message: &BroadcastMessageWithTimestamp, + message: &BroadcastMessageWithOnChainInfo, store: &S, - chain: &ActorRef, ) -> Result<(), Error> { match message { - BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => { - if !verify_channel_announcement(channel_announcement, store, chain).await? { - store.save_channel_announcement(*timestamp, channel_announcement.clone()); + BroadcastMessageWithOnChainInfo::ChannelAnnouncement( + on_chain_info, + channel_announcement, + ) => { + if !verify_channel_announcement(channel_announcement, on_chain_info, store).await? { + store.save_channel_announcement( + on_chain_info.timestamp, + channel_announcement.clone(), + ); } } - BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) => { + BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => { if !verify_channel_update(channel_update, store)? { store.save_channel_update(channel_update.clone()); } } - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) => { + BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement) => { if !verify_node_announcement(node_announcement, store)? { store.save_node_announcement(node_announcement.clone()); } @@ -1815,7 +1834,7 @@ async fn get_channel_tx( DEFAULT_CHAIN_ACTOR_TIMEOUT, TraceTxRequest { tx_hash: outpoint.tx_hash(), - confirmations: 1, + confirmations: 2, } ) { Ok(TraceTxResponse { @@ -1844,7 +1863,26 @@ async fn get_channel_timestamp( return Ok(timestamp); } - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; + let on_chain_info = get_channel_on_chain_info(outpoint, chain).await?; + + Ok(on_chain_info.timestamp) +} + +async fn get_channel_on_chain_info( + outpoint: &OutPoint, + chain: &ActorRef, +) -> Result { + let (tx, block_hash) = get_channel_tx(outpoint, chain).await?; + let first_output = match tx.inner.outputs.first() { + None => { + return Err(Error::InvalidParameter(format!( + "On-chain transaction found but no output: {:?}", + &outpoint + ))); + } + Some(output) => output.clone(), + }; + let timestamp: u64 = match call_t!( chain, CkbChainMessage::GetBlockTimestamp, @@ -1872,7 +1910,10 @@ async fn get_channel_timestamp( } }; - Ok(timestamp) + Ok(ChannelOnchainInfo { + timestamp, + first_output, + }) } // Verify the channel announcement message. If any error occurs, return the error. @@ -1881,8 +1922,8 @@ async fn get_channel_timestamp( // is true, otherwise it is false. async fn verify_channel_announcement( channel_announcement: &ChannelAnnouncement, + on_chain_info: &ChannelOnchainInfo, store: &S, - chain: &ActorRef, ) -> Result { if let Some((_, announcement)) = store.get_latest_channel_announcement(&channel_announcement.channel_outpoint) @@ -1939,40 +1980,32 @@ async fn verify_channel_announcement( ))); } - let (tx, _) = chain - .get_channel_tx(&channel_announcement.channel_outpoint) - .await?; - let pubkey = channel_announcement.ckb_key.serialize(); let pubkey_hash = &blake2b_256(pubkey.as_slice())[0..20]; - match tx.inner.outputs.first() { - None => { - return Err(Error::InvalidParameter(format!( - "On-chain transaction found but no output: {:?}", - &channel_announcement - ))); - } - Some(output) => { - if output.lock.args.as_bytes() != pubkey_hash { - return Err(Error::InvalidParameter(format!( + + let output = &on_chain_info.first_output; + if output.lock.args.as_bytes() != pubkey_hash { + return Err(Error::InvalidParameter(format!( "On-chain transaction found but pubkey hash mismatched: on chain hash {:?}, pub key ({:?}) hash {:?}", &output.lock.args.as_bytes(), hex::encode(pubkey), &pubkey_hash ))); - } - let capacity: u128 = u64::from(output.capacity).into(); - if channel_announcement.udt_type_script.is_none() - && channel_announcement.capacity > capacity - { + } + let capacity: u128 = u64::from(output.capacity).into(); + match channel_announcement.udt_type_script { + Some(_) => { + // TODO: verify the capacity of the UDT + } + None => { + if channel_announcement.capacity > capacity { return Err(Error::InvalidParameter(format!( - "On-chain transaction found but capacity mismatched: on chain capacity {:?} smaller than annoucned channel capacity {:?}", - &output.capacity, &channel_announcement.capacity - ))); + "On-chain transaction found but capacity mismatched: on chain capacity {:?} smaller than annoucned channel capacity {:?}", + &output.capacity, &channel_announcement.capacity + ))); } - capacity } - }; + } if let Err(err) = secp256k1_instance().verify_schnorr( ckb_signature, diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 9632e2de7..737e9bd7c 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -11,6 +11,7 @@ use super::r#gen::fiber::PubNonceOpt; use super::serde_utils::{EntityHex, SliceHex}; use crate::ckb::config::{UdtArgInfo, UdtCellDep, UdtCfgInfos, UdtScript}; use crate::ckb::contracts::get_udt_whitelist; +use ckb_jsonrpc_types::CellOutput; use num_enum::IntoPrimitive; use num_enum::TryFromPrimitive; use std::convert::TryFrom; @@ -2442,6 +2443,66 @@ impl BroadcastMessage { } } +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct ChannelOnchainInfo { + pub timestamp: u64, + pub first_output: CellOutput, +} + +// Augment the broadcast message with on-chain information so that we can verify the validity of the message. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub enum BroadcastMessageWithOnChainInfo { + NodeAnnouncement(NodeAnnouncement), + ChannelAnnouncement(ChannelOnchainInfo, ChannelAnnouncement), + ChannelUpdate(ChannelUpdate), +} + +impl BroadcastMessageWithOnChainInfo { + pub fn cursor(&self) -> Cursor { + match self { + Self::NodeAnnouncement(node_announcement) => Cursor::new( + node_announcement.timestamp, + BroadcastMessageID::NodeAnnouncement(node_announcement.node_id), + ), + Self::ChannelAnnouncement(on_chain_info, channel_announcement) => Cursor::new( + on_chain_info.timestamp, + BroadcastMessageID::ChannelAnnouncement( + channel_announcement.channel_outpoint.clone(), + ), + ), + Self::ChannelUpdate(channel_update) => Cursor::new( + channel_update.timestamp, + BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone()), + ), + } + } + + pub fn timestamp(&self) -> u64 { + match self { + Self::NodeAnnouncement(node_announcement) => node_announcement.timestamp, + Self::ChannelAnnouncement(on_chain_info, _) => on_chain_info.timestamp, + Self::ChannelUpdate(channel_update) => channel_update.timestamp, + } + } + + pub fn message_id(&self) -> BroadcastMessageID { + match self { + Self::NodeAnnouncement(node_announcement) => { + BroadcastMessageID::NodeAnnouncement(node_announcement.node_id) + } + Self::ChannelAnnouncement(_, channel_announcement) => { + BroadcastMessageID::ChannelAnnouncement( + channel_announcement.channel_outpoint.clone(), + ) + } + Self::ChannelUpdate(channel_update) => { + BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone()) + } + } + } +} + +// Augment the broadcast message with timestamp so that we can easily obtain the cursor of the message. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum BroadcastMessageWithTimestamp { NodeAnnouncement(NodeAnnouncement), @@ -2520,7 +2581,7 @@ impl BroadcastMessageWithTimestamp { } } -impl Ord for BroadcastMessageWithTimestamp { +impl Ord for BroadcastMessageWithOnChainInfo { fn cmp(&self, other: &Self) -> Ordering { self.message_id() .cmp(&other.message_id()) @@ -2528,7 +2589,7 @@ impl Ord for BroadcastMessageWithTimestamp { } } -impl PartialOrd for BroadcastMessageWithTimestamp { +impl PartialOrd for BroadcastMessageWithOnChainInfo { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } @@ -2568,6 +2629,26 @@ impl From<(BroadcastMessage, u64)> for BroadcastMessageWithTimestamp { } } +impl From for BroadcastMessageWithTimestamp { + fn from(broadcast_message_with_onchain_info: BroadcastMessageWithOnChainInfo) -> Self { + match broadcast_message_with_onchain_info { + BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement) => { + BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) + } + BroadcastMessageWithOnChainInfo::ChannelAnnouncement( + channel_onchain_info, + channel_announcement, + ) => BroadcastMessageWithTimestamp::ChannelAnnouncement( + channel_onchain_info.timestamp, + channel_announcement, + ), + BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => { + BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) + } + } + } +} + impl From for molecule_gossip::BroadcastMessageUnion { fn from(fiber_broadcast_message: BroadcastMessage) -> Self { match fiber_broadcast_message { From 3b371bc443e33eb4eb3ed44d9db11db21f4215cf Mon Sep 17 00:00:00 2001 From: YI Date: Fri, 3 Jan 2025 14:27:42 +0800 Subject: [PATCH 10/18] Check message existance before insert to messages_to_be_saved --- src/fiber/gossip.rs | 11 +++++++++-- src/fiber/types.rs | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 9c90f8231..f2f70c17e 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -976,7 +976,7 @@ pub struct ExtendedGossipMessageStoreState { chain_actor: ActorRef, next_id: u64, output_ports: HashMap, - messages_to_be_saved: HashSet, + messages_to_be_saved: Vec, } impl ExtendedGossipMessageStoreState { @@ -1070,6 +1070,13 @@ impl ExtendedGossipMessageStoreState { } } + match self.messages_to_be_saved.iter().find(|m| message == *m) { + Some(existing_message) => { + return Ok(existing_message.clone().into()); + } + None => {} + } + let message = get_broadcast_message_with_on_chain_info(message.clone(), &self.chain_actor) .await .map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?; @@ -1097,7 +1104,7 @@ impl ExtendedGossipMessageStoreState { } trace!("New gossip message saved to memory: {:?}", message); - self.messages_to_be_saved.insert(message.clone()); + self.messages_to_be_saved.push(message.clone()); Ok(message.into()) } diff --git a/src/fiber/types.rs b/src/fiber/types.rs index 737e9bd7c..855e0f7a8 100644 --- a/src/fiber/types.rs +++ b/src/fiber/types.rs @@ -2502,6 +2502,26 @@ impl BroadcastMessageWithOnChainInfo { } } +impl PartialEq for BroadcastMessage { + fn eq(&self, other: &BroadcastMessageWithOnChainInfo) -> bool { + match (self, other) { + ( + BroadcastMessage::NodeAnnouncement(node_announcement), + BroadcastMessageWithOnChainInfo::NodeAnnouncement(other_node_announcement), + ) => node_announcement == other_node_announcement, + ( + BroadcastMessage::ChannelAnnouncement(channel_announcement), + BroadcastMessageWithOnChainInfo::ChannelAnnouncement(_, other_channel_announcement), + ) => channel_announcement == other_channel_announcement, + ( + BroadcastMessage::ChannelUpdate(channel_update), + BroadcastMessageWithOnChainInfo::ChannelUpdate(other_channel_update), + ) => channel_update == other_channel_update, + _ => false, + } + } +} + // Augment the broadcast message with timestamp so that we can easily obtain the cursor of the message. #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum BroadcastMessageWithTimestamp { From 5649b07dc26462b3f3be6bcbc8636c9e5afbac62 Mon Sep 17 00:00:00 2001 From: YI Date: Sat, 4 Jan 2025 17:21:41 +0800 Subject: [PATCH 11/18] Store our own broadcast messages directly --- src/ckb/actor.rs | 29 ++--- src/ckb/tests/test_utils.rs | 9 +- src/fiber/channel.rs | 32 +++-- src/fiber/gossip.rs | 225 +++++++++++++++++------------------- src/fiber/network.rs | 77 +++++++++--- src/fiber/types.rs | 126 +++++++------------- src/store/tests/store.rs | 2 +- 7 files changed, 254 insertions(+), 246 deletions(-) diff --git a/src/ckb/actor.rs b/src/ckb/actor.rs index 994748f41..baf7f0ae2 100644 --- a/src/ckb/actor.rs +++ b/src/ckb/actor.rs @@ -41,18 +41,19 @@ impl TraceTxResponse { } } -#[derive(Debug, Clone)] -pub struct GetBlockTimestampRequest { - block_hash: H256, +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum GetBlockTimestampRequest { + BlockNumber(u64), + BlockHash(H256), } impl GetBlockTimestampRequest { pub fn from_block_hash(block_hash: H256) -> Self { - Self { block_hash } + Self::BlockHash(block_hash) } - pub fn block_hash(&self) -> H256 { - self.block_hash.clone() + pub fn from_block_number(block_number: u64) -> Self { + Self::BlockNumber(block_number) } } @@ -257,18 +258,20 @@ impl Actor for CkbChainActor { } } } - CkbChainMessage::GetBlockTimestamp( - GetBlockTimestampRequest { block_hash }, - reply_port, - ) => { + CkbChainMessage::GetBlockTimestamp(request, reply_port) => { let rpc_url = state.config.rpc_url.clone(); + tokio::task::block_in_place(move || { let ckb_client = CkbRpcClient::new(&rpc_url); - let _ = reply_port.send( - ckb_client + let timestamp = match request { + GetBlockTimestampRequest::BlockNumber(block_number) => ckb_client + .get_header_by_number(block_number.into()) + .map(|x| x.map(|x| x.inner.timestamp.into())), + GetBlockTimestampRequest::BlockHash(block_hash) => ckb_client .get_header(block_hash) .map(|x| x.map(|x| x.inner.timestamp.into())), - ); + }; + let _ = reply_port.send(timestamp); }); } } diff --git a/src/ckb/tests/test_utils.rs b/src/ckb/tests/test_utils.rs index f47a8166c..37762010a 100644 --- a/src/ckb/tests/test_utils.rs +++ b/src/ckb/tests/test_utils.rs @@ -6,7 +6,6 @@ use ckb_types::{ core::{DepType, TransactionView}, packed::{CellDep, CellOutput, OutPoint, Script, Transaction}, prelude::{Builder, Entity, IntoTransactionView, Pack, PackVec, Unpack}, - H256, }; use once_cell::sync::{Lazy, OnceCell}; use std::{collections::HashMap, sync::Arc, sync::RwLock}; @@ -16,7 +15,7 @@ use crate::{ ckb::{ config::UdtCfgInfos, contracts::{Contract, ContractsContext, ContractsInfo}, - TraceTxRequest, TraceTxResponse, + GetBlockTimestampRequest, TraceTxRequest, TraceTxResponse, }, now_timestamp_as_millis_u64, }; @@ -501,14 +500,16 @@ impl Actor for MockChainActor { // cause an infinite loop. // So here we create an static lock which is shared across all nodes, and we use this lock to // guarantee that the block timestamp is the same across all nodes. - static BLOCK_TIMESTAMP: OnceCell>> = OnceCell::new(); + static BLOCK_TIMESTAMP: OnceCell< + TokioRwLock>, + > = OnceCell::new(); BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new())); let timestamp = *BLOCK_TIMESTAMP .get() .unwrap() .write() .await - .entry(request.block_hash()) + .entry(request.clone()) .or_insert(now_timestamp_as_millis_u64()); let _ = rpc_reply_port.send(Ok(Some(timestamp))); diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 43e1b75f6..9680cf830 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -1,6 +1,9 @@ #[cfg(debug_assertions)] use crate::fiber::network::DebugEvent; -use crate::{debug_event, fiber::serde_utils::U64Hex}; +use crate::{ + debug_event, + fiber::{serde_utils::U64Hex, types::BroadcastMessageWithTimestamp}, +}; use bitflags::bitflags; use ckb_jsonrpc_types::BlockNumber; use futures::future::OptionFuture; @@ -25,7 +28,7 @@ use crate::{ }, serde_utils::{CompactSignatureAsBytes, EntityHex, PubNonceAsBytes}, types::{ - AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessage, BroadcastMessageQuery, + AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessageQuery, BroadcastMessageQueryFlags, ChannelAnnouncement, ChannelReady, ChannelUpdate, ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage, FiberMessage, Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket, Privkey, Pubkey, @@ -1709,7 +1712,7 @@ where event: ChannelEvent, ) -> Result<(), ProcessingChannelError> { match event { - ChannelEvent::FundingTransactionConfirmed(block_number, tx_index) => { + ChannelEvent::FundingTransactionConfirmed(block_number, tx_index, timestamp) => { debug!("Funding transaction confirmed"); let flags = match state.state { ChannelState::AwaitingChannelReady(flags) => flags, @@ -1723,7 +1726,7 @@ where "Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state))); } }; - state.funding_tx_confirmed_at = Some((block_number, tx_index)); + state.funding_tx_confirmed_at = Some((block_number, tx_index, timestamp)); self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( @@ -1766,7 +1769,7 @@ where self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::BroadcastMessages(vec![ - BroadcastMessage::ChannelUpdate(update), + BroadcastMessageWithTimestamp::ChannelUpdate(update), ]), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); @@ -2766,7 +2769,7 @@ pub struct ChannelActorState { #[serde_as(as = "Option")] pub funding_tx: Option, - pub funding_tx_confirmed_at: Option<(BlockNumber, u32)>, + pub funding_tx_confirmed_at: Option<(BlockNumber, u32, u64)>, #[serde_as(as = "Option")] pub funding_udt_type_script: Option