diff --git a/Cargo.lock b/Cargo.lock index 4049c90f9..0628d80fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -47,6 +47,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "allocator-api2" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" + [[package]] name = "android-tzdata" version = "0.1.1" @@ -956,7 +962,7 @@ dependencies = [ "jsonrpc-core", "lazy_static", "log", - "lru", + "lru 0.7.8", "parking_lot", "reqwest", "secp256k1 0.29.1", @@ -1064,7 +1070,7 @@ dependencies = [ "ckb-types", "ckb-verification-traits", "derive_more", - "lru", + "lru 0.7.8", "tokio", ] @@ -1618,6 +1624,7 @@ dependencies = [ "jsonrpsee", "lightning-invoice", "lnd-grpc-tonic-client", + "lru 0.12.5", "molecule", "musig2", "nom", @@ -1647,6 +1654,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1902,6 +1915,11 @@ name = "hashbrown" version = "0.15.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "heapsize" @@ -2617,6 +2635,15 @@ dependencies = [ "hashbrown 0.12.3", ] +[[package]] +name = "lru" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38" +dependencies = [ + "hashbrown 0.15.2", +] + [[package]] name = "matchers" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 4b6d37c80..d520738ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -63,6 +63,7 @@ tokio = { version = "1", features = [ indicatif = "0.16" console = "0.15.8" bincode = "1.3.3" +lru = "0.12.5" [features] default = [] diff --git a/src/fiber/gossip.rs b/src/fiber/gossip.rs index 790ff91ef..35601a966 100644 --- a/src/fiber/gossip.rs +++ b/src/fiber/gossip.rs @@ -1,13 +1,19 @@ use std::{ collections::{HashMap, HashSet}, marker::PhantomData, + num::NonZero, sync::Arc, time::Duration, }; use ckb_hash::blake2b_256; use ckb_jsonrpc_types::{Status, TransactionView, TxStatus}; -use ckb_types::{packed::OutPoint, H256}; +use ckb_types::{ + packed::{Byte32, OutPoint}, + H256, +}; +use lru::LruCache; +use once_cell::sync::OnceCell; use ractor::{ async_trait as rasync_trait, call, call_t, concurrency::{timeout, JoinHandle}, @@ -26,7 +32,7 @@ use tentacle::{ utils::{is_reachable, multiaddr_to_socketaddr}, SessionId, }; -use tokio::sync::oneshot; +use tokio::sync::{oneshot, Mutex}; use tracing::{debug, error, info, trace, warn}; use crate::{ @@ -1849,29 +1855,50 @@ async fn get_channel_tx( outpoint: &OutPoint, chain: &ActorRef, ) -> Result<(TransactionView, H256), Error> { - match call_t!( - chain, - CkbChainMessage::TraceTx, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - TraceTxRequest { - tx_hash: outpoint.tx_hash(), - confirmations: 1, + static CHANNEL_TRANSACTIONS: OnceCell>> = + OnceCell::new(); + static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20; + CHANNEL_TRANSACTIONS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await; + match channel_transactions.get(&outpoint.tx_hash()) { + Some(tx) => { + return Ok(tx.clone()); } - ) { - Ok(TraceTxResponse { - tx: Some(tx), - status: - TxStatus { - status: Status::Committed, - block_hash: Some(block_hash), - .. + None => { + // TODO: we should also cache invalid transactions to avoid querying the chain actor, + // but there is a possibility that the transaction is valid while calling the chain actor fails. + match call_t!( + chain, + CkbChainMessage::TraceTx, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + TraceTxRequest { + tx_hash: outpoint.tx_hash(), + confirmations: 1, + } + ) { + Ok(TraceTxResponse { + tx: Some(tx), + status: + TxStatus { + status: Status::Committed, + block_hash: Some(block_hash), + .. + }, + }) => { + channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone())); + Ok((tx, block_hash)) }, - }) => Ok((tx, block_hash)), - err => Err(Error::InvalidParameter(format!( - "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", - &outpoint.tx_hash(), - err - ))), + err => Err(Error::InvalidParameter(format!( + "Channel announcement transaction {:?} not found or not confirmed, result is: {:?}", + &outpoint.tx_hash(), + err + ))), + } + } } } @@ -1880,38 +1907,60 @@ async fn get_channel_timestamp( store: &S, chain: &ActorRef, ) -> Result { - if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { - return Ok(timestamp); - } - - let (_, block_hash) = get_channel_tx(outpoint, chain).await?; - let timestamp: u64 = match call_t!( - chain, - CkbChainMessage::GetBlockTimestamp, - DEFAULT_CHAIN_ACTOR_TIMEOUT, - GetBlockTimestampRequest::from_block_hash(block_hash.clone()) - ) { - Ok(Ok(Some(timestamp))) => timestamp, - Ok(Ok(None)) => { - return Err(Error::InternalError(anyhow::anyhow!( - "Unable to find block {:?} for channel outpoint {:?}", - &block_hash, - &outpoint - ))); - } - Ok(Err(err)) => { - return Err(Error::CkbRpcError(err)); + static CHANNEL_TIMESTAMPS: OnceCell>> = OnceCell::new(); + // The cache size of 200 is enough to process two batches of 100 broadcast messages. + // So it could be quite enough for the channel announcement messages. Yet it is not + // very memory intensive. + static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200; + CHANNEL_TIMESTAMPS.get_or_init(|| { + Mutex::new(LruCache::new( + NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(), + )) + }); + let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await; + let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) { + Some(timestamp) => { + return Ok(*timestamp); } - Err(err) => { - return Err(Error::InternalError(anyhow::Error::new(err).context( - format!( - "Error while trying to obtain block {:?} for channel outpoint {:?}", - block_hash, &outpoint - ), - ))); + None => { + if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) { + timestamp + } else { + debug!( + "Getting channel announcement message timestamp by calling chain actor: {:?}", + &outpoint + ); + let (_, block_hash) = get_channel_tx(outpoint, chain).await?; + match call_t!( + chain, + CkbChainMessage::GetBlockTimestamp, + DEFAULT_CHAIN_ACTOR_TIMEOUT, + GetBlockTimestampRequest::from_block_hash(block_hash.clone()) + ) { + Ok(Ok(Some(timestamp))) => timestamp, + Ok(Ok(None)) => { + return Err(Error::InternalError(anyhow::anyhow!( + "Unable to find block {:?} for channel outpoint {:?}", + &block_hash, + &outpoint + ))); + } + Ok(Err(err)) => { + return Err(Error::CkbRpcError(err)); + } + Err(err) => { + return Err(Error::InternalError(anyhow::Error::new(err).context( + format!( + "Error while trying to obtain block {:?} for channel outpoint {:?}", + block_hash, &outpoint + ), + ))); + } + } + } } }; - + channel_timestamps.put(outpoint.tx_hash(), timestamp); Ok(timestamp) }