diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 35601a966..c9f598e3d 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -13,7 +13,6 @@ use ckb_types::{ H256, }; use lru::LruCache; -use once_cell::sync::OnceCell; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -219,6 +218,161 @@ pub trait GossipMessageStore { fn save_node_announcement(&self, node_announcement: NodeAnnouncement); } +#[derive(Clone)] +pub struct ChainWithCache { + chain_actor: ActorRef, + channel_timestamp_cache: Arc>>, + channel_transaction_cache: Arc>>, +} + +impl ChainWithCache { + fn new(chain_actor: ActorRef) -> Self { + Self { + chain_actor, + // The cache size of 200 is enough to process two batches of 100 broadcast messages. + // So it could be quite enough for the channel announcement messages. Yet it is not + // very memory intensive. + channel_timestamp_cache: Arc::new(Mutex::new(LruCache::new( + NonZero::new(200).unwrap(), + ))), + // Normally, we will first obtain the transaction timestamp + // (the timestamp of the block that contains the transaction) to sort messages by timestamp. + // And then we will verify the transaction is a valid funding transaction for the channel. + // There shouldn't be too many channel announcement transactions in the cache. + channel_transaction_cache: Arc::new(Mutex::new(LruCache::new( + NonZero::new(100).unwrap(), + ))), + } + } + + async fn get_channel_tx(&self, outpoint: &OutPoint) -> Result<(TransactionView, H256), Error> { + let mut channel_transactions = self.channel_transaction_cache.lock().await; + match channel_transactions.get(&outpoint.tx_hash()) { + Some(tx) => { + return Ok(tx.clone()); + } + None => { + // TODO: we should also cache invalid transactions to avoid querying the chain actor, + // but there is a possibility that the transaction is valid while calling the chain actor fails. + match call_t!( + &self.chain_actor, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, + } + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. + }, + }) => { + channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); + Ok((tx, block_hash)) + }, + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), + } + } + } + } + + async fn get_channel_timestamp( + &self, + outpoint: &OutPoint, + store: &S, + ) -> Result { + let mut channel_timestamps = self.channel_timestamp_cache.lock().await; + let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { + Some(timestamp) => { + return Ok(*timestamp); + } + None => { + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + timestamp + } else { + debug!( + "Getting channel announcement message timestamp by calling chain actor: {:?}", + &outpoint + ); + let (_, block_hash) = self.get_channel_tx(outpoint).await?; + match call_t!( + &self.chain_actor, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); + } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); + } + } + } + } + }; + channel_timestamps.put(outpoint.tx_hash(), timestamp); + Ok(timestamp) + } + + async fn get_message_cursor( + &self, + message: BroadcastMessage, + store: &S, + ) -> Result { + let m = self + .get_broadcast_message_with_timestamp(message, store) + .await?; + Ok(m.cursor()) + } + + async fn get_broadcast_message_with_timestamp( + &self, + message: BroadcastMessage, + store: &S, + ) -> Result { + match message { + BroadcastMessage::ChannelAnnouncement(channel_announcement) => { + let timestamp = self + .get_channel_timestamp(&channel_announcement.channel_outpoint, store) + .await?; + Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( + timestamp, + channel_announcement, + )) + } + BroadcastMessage::ChannelUpdate(channel_update) => { + Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) + } + BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( + BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), + ), + } + } +} + // A batch of gossip messages has been added to the store since the last time // we pulled new messages/messages are pushed to us. #[derive(Clone, Debug)] @@ -362,7 +516,7 @@ struct SyncingPeerState { pub struct GossipSyncingActorState { peer_id: PeerId, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, store: ExtendedGossipMessageStore, // The problem of using the cursor from the store is that a malicious peer may only // send large cursor to us, which may cause us to miss some messages. @@ -380,14 +534,14 @@ impl GossipSyncingActorState { fn new( peer_id: PeerId, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, store: ExtendedGossipMessageStore, cursor: Cursor, ) -> Self { Self { peer_id, gossip_actor, - chain_actor, + chain, store, cursor, peer_state: Default::default(), @@ -439,7 +593,7 @@ where type Arguments = ( PeerId, ActorRef, - ActorRef, + ChainWithCache, ExtendedGossipMessageStore, Cursor, ); @@ -447,7 +601,7 @@ where async fn pre_start( &self, myself: ActorRef, - (peer_id, gossip_actor, chain_actor, store, cursor): Self::Arguments, + (peer_id, gossip_actor, chain, store, cursor): Self::Arguments, ) -> Result { myself .send_message(GossipSyncingActorMessage::NewGetRequest()) @@ -455,7 +609,7 @@ where Ok(GossipSyncingActorState::new( peer_id, gossip_actor, - chain_actor, + chain, store, cursor, )) @@ -494,12 +648,10 @@ where match messages.last() { Some(last_message) => { // We need the message timestamp to construct a valid cursor. - match get_message_cursor( - last_message.clone(), - &state.store.store, - &state.chain_actor, - ) - .await + match state + .chain + .get_message_cursor(last_message.clone(), &state.store.store) + .await { Ok(cursor) => { state.cursor = cursor; @@ -867,7 +1019,7 @@ where announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, supervisor: ActorCell, ) -> Self { let (actor, _) = Actor::spawn_linked( @@ -881,7 +1033,7 @@ where announce_private_addr, store.clone(), gossip_actor, - chain_actor, + chain, ), supervisor, ) @@ -993,7 +1145,7 @@ pub struct ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, next_id: u64, output_ports: HashMap, messages_to_be_saved: HashSet, @@ -1004,13 +1156,13 @@ impl ExtendedGossipMessageStoreState { announce_private_addr: bool, store: S, gossip_actor: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, ) -> Self { Self { announce_private_addr, store, gossip_actor, - chain_actor, + chain, next_id: Default::default(), output_ports: Default::default(), messages_to_be_saved: Default::default(), @@ -1044,8 +1196,7 @@ impl ExtendedGossipMessageStoreState { let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len()); for message in sorted_messages { - match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await - { + match verify_and_save_broadcast_message(&message, &self.store, &self.chain).await { Ok(_) => { verified_sorted_messages.push(message); } @@ -1095,12 +1246,11 @@ impl ExtendedGossipMessageStoreState { } } - let message = - get_broadcast_message_with_timestamp(message.clone(), &self.store, &self.chain_actor) - .await - .map_err(|error| { - GossipMessageProcessingError::ProcessingError(error.to_string()) - })?; + let message = self + .chain + .get_broadcast_message_with_timestamp(message.clone(), &self.store) + .await + .map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?; let max_acceptable_gossip_message_timestamp = max_acceptable_gossip_message_timestamp(); if message.timestamp() > max_acceptable_gossip_message_timestamp { @@ -1168,7 +1318,7 @@ impl Actor for ExtendedGossipMess bool, S, ActorRef, - ActorRef, + ChainWithCache, ); async fn pre_start( @@ -1179,7 +1329,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain_actor, + chain, ): Self::Arguments, ) -> Result { myself.send_interval(gossip_store_maintenance_interval, || { @@ -1189,7 +1339,7 @@ impl Actor for ExtendedGossipMess announce_private_addr, store, gossip_actor, - chain_actor, + chain, )) } @@ -1407,7 +1557,7 @@ pub(crate) struct GossipActorState { num_targeted_outbound_passive_syncing_peers: usize, next_request_id: u64, myself: ActorRef, - chain_actor: ActorRef, + chain: ChainWithCache, // There are some messages missing from our store, and we need to query them from peers. // These messages include channel updates and node announcements related to channel announcements, // and channel announcements related to channel updates. @@ -1539,7 +1689,7 @@ where ( peer_id.clone(), self.myself.clone(), - self.chain_actor.clone(), + self.chain.clone(), self.store.clone(), safe_cursor, ), @@ -1748,15 +1898,6 @@ fn get_dependent_message_queries( queries } -async fn get_message_cursor( - message: BroadcastMessage, - store: &S, - chain: &ActorRef, -) -> Result { - let m = get_broadcast_message_with_timestamp(message, store, chain).await?; - Ok(m.cursor()) -} - fn get_existing_broadcast_message( message: &BroadcastMessage, store: &S, @@ -1795,29 +1936,6 @@ fn get_existing_newer_broadcast_message( }) } -async fn get_broadcast_message_with_timestamp( - message: BroadcastMessage, - store: &S, - chain: &ActorRef, -) -> Result { - match message { - BroadcastMessage::ChannelAnnouncement(channel_announcement) => { - let timestamp = - get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?; - Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement( - timestamp, - channel_announcement, - )) - } - BroadcastMessage::ChannelUpdate(channel_update) => { - Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update)) - } - BroadcastMessage::NodeAnnouncement(node_announcement) => Ok( - BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement), - ), - } -} - // Channel updates depends on channel announcements to obtain the node public keys. // If a channel update is saved before the channel announcement, we can't reliably determine if // this channel update is valid. So we need to save the channel update to lagged_messages and @@ -1829,7 +1947,7 @@ async fn get_broadcast_message_with_timestamp( async fn verify_and_save_broadcast_message( message: &BroadcastMessageWithTimestamp, store: &S, - chain: &ActorRef, + chain: &ChainWithCache, ) -> Result<(), Error> { match message { BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => { @@ -1851,119 +1969,6 @@ async fn verify_and_save_broadcast_message( Ok(()) } -async fn get_channel_tx( - outpoint: &OutPoint, - chain: &ActorRef, -) -> Result<(TransactionView, H256), Error> { - static CHANNEL_TRANSACTIONS: OnceCell>> = - OnceCell::new(); - static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; - CHANNEL_TRANSACTIONS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; - match channel_transactions.get(&outpoint.tx_hash()) { - Some(tx) => { - return Ok(tx.clone()); - } - None => { - // TODO: we should also cache invalid transactions to avoid querying the chain actor, - // but there is a possibility that the transaction is valid while calling the chain actor fails. - match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, - } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. - }, - }) => { - channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); - Ok((tx, block_hash)) - }, - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), - } - } - } -} - -async fn get_channel_timestamp( - outpoint: &OutPoint, - store: &S, - chain: &ActorRef, -) -> Result { - static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); - // The cache size of 200 is enough to process two batches of 100 broadcast messages. - // So it could be quite enough for the channel announcement messages. Yet it is not - // very memory intensive. - static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; - CHANNEL_TIMESTAMPS.get_or_init(|| { - Mutex::new(LruCache::new( - NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), - )) - }); - let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; - let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { - Some(timestamp) => { - return Ok(*timestamp); - } - None => { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - timestamp - } else { - debug!( - "Getting channel announcement message timestamp by calling chain actor: {:?}", - &outpoint - ); - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; - match call_t!( - chain, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); - } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); - } - } - } - } - }; - channel_timestamps.put(outpoint.tx_hash(), timestamp); - Ok(timestamp) -} - // Verify the channel announcement message. If any error occurs, return the error. // Otherwise, return the timestamp of the channel announcement and a bool value indicating if the // the channel announcement is already saved to the store. If it is already saved, the bool value @@ -1971,7 +1976,7 @@ async fn get_channel_timestamp( async fn verify_channel_announcement( channel_announcement: &ChannelAnnouncement, store: &S, - chain: &ActorRef, + chain: &ChainWithCache, ) -> Result { debug!( "Verifying channel announcement message: {:?}", @@ -2037,7 +2042,9 @@ async fn verify_channel_announcement( &channel_announcement ); - let (tx, _) = get_channel_tx(&channel_announcement.channel_outpoint, chain).await?; + let (tx, _) = chain + .get_channel_tx(&channel_announcement.channel_outpoint) + .await?; debug!("Channel announcement transaction found: {:?}", &tx); @@ -2210,6 +2217,7 @@ impl GossipProtocolHandle { { let (network_control_sender, network_control_receiver) = oneshot::channel(); let (store_sender, store_receiver) = oneshot::channel(); + let chain = ChainWithCache::new(chain_actor); let (actor, _handle) = ActorRuntime::spawn_linked_instant( name, @@ -2221,7 +2229,7 @@ impl GossipProtocolHandle { gossip_store_maintenance_interval, announce_private_addr, store, - chain_actor, + chain, ), supervisor, ) @@ -2265,7 +2273,7 @@ where Duration, bool, S, - ActorRef, + ChainWithCache, ); async fn pre_start( @@ -2278,7 +2286,7 @@ where store_maintenance_interval, announce_private_addr, store, - chain_actor, + chain, ): Self::Arguments, ) -> Result { let store = ExtendedGossipMessageStore::new( @@ -2286,7 +2294,7 @@ where announce_private_addr, store, myself.clone(), - chain_actor.clone(), + chain.clone(), myself.get_cell(), ) .await; @@ -2308,7 +2316,7 @@ where num_targeted_active_syncing_peers: MAX_NUM_OF_ACTIVE_SYNCING_PEERS, num_targeted_outbound_passive_syncing_peers: MIN_NUM_OF_PASSIVE_SYNCING_PEERS, myself, - chain_actor, + chain, next_request_id: Default::default(), pending_queries: Default::default(), num_finished_active_syncing_peers: Default::default(),