Skip to content

Commit

Permalink
Cache channel transactions and timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 1, 2025
1 parent ea5c1fe commit 9d49421
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 54 deletions.
31 changes: 29 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ tokio = { version = "1", features = [
indicatif = "0.16"
console = "0.15.8"
bincode = "1.3.3"
lru = "0.12.5"

[features]
default = []
Expand Down
153 changes: 101 additions & 52 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,19 @@
use std::{
collections::{HashMap, HashSet},
marker::PhantomData,
num::NonZero,
sync::Arc,
time::Duration,
};

use ckb_hash::blake2b_256;
use ckb_jsonrpc_types::{Status, TransactionView, TxStatus};
use ckb_types::{packed::OutPoint, H256};
use ckb_types::{
packed::{Byte32, OutPoint},
H256,
};
use lru::LruCache;
use once_cell::sync::OnceCell;
use ractor::{
async_trait as rasync_trait, call, call_t,
concurrency::{timeout, JoinHandle},
Expand All @@ -26,7 +32,7 @@ use tentacle::{
utils::{is_reachable, multiaddr_to_socketaddr},
SessionId,
};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, Mutex};
use tracing::{debug, error, info, trace, warn};

use crate::{
Expand Down Expand Up @@ -1849,29 +1855,50 @@ async fn get_channel_tx(
outpoint: &OutPoint,
chain: &ActorRef<CkbChainMessage>,
) -> Result<(TransactionView, H256), Error> {
match call_t!(
chain,
CkbChainMessage::TraceTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
TraceTxRequest {
tx_hash: outpoint.tx_hash(),
confirmations: 1,
static CHANNEL_TRANSACTIONS: OnceCell<Mutex<LruCache<Byte32, (TransactionView, H256)>>> =
OnceCell::new();
static CHANNEL_TRANSACTION_CACHE_SIZE: usize = 20;
CHANNEL_TRANSACTIONS.get_or_init(|| {
Mutex::new(LruCache::new(
NonZero::new(CHANNEL_TRANSACTION_CACHE_SIZE).unwrap(),
))
});
let mut channel_transactions = CHANNEL_TRANSACTIONS.get().unwrap().lock().await;
match channel_transactions.get(&outpoint.tx_hash()) {
Some(tx) => {
return Ok(tx.clone());
}
) {
Ok(TraceTxResponse {
tx: Some(tx),
status:
TxStatus {
status: Status::Committed,
block_hash: Some(block_hash),
..
None => {
// TODO: we should also cache invalid transactions to avoid querying the chain actor,
// but there is a possibility that the transaction is valid while calling the chain actor fails.
match call_t!(
chain,
CkbChainMessage::TraceTx,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
TraceTxRequest {
tx_hash: outpoint.tx_hash(),
confirmations: 1,
}
) {
Ok(TraceTxResponse {
tx: Some(tx),
status:
TxStatus {
status: Status::Committed,
block_hash: Some(block_hash),
..
},
}) => {
channel_transactions.put(outpoint.tx_hash(), (tx.clone(), block_hash.clone()));
Ok((tx, block_hash))
},
}) => Ok((tx, block_hash)),
err => Err(Error::InvalidParameter(format!(
"Channel announcement transaction {:?} not found or not confirmed, result is: {:?}",
&outpoint.tx_hash(),
err
))),
err => Err(Error::InvalidParameter(format!(
"Channel announcement transaction {:?} not found or not confirmed, result is: {:?}",
&outpoint.tx_hash(),
err
))),
}
}
}
}

Expand All @@ -1880,38 +1907,60 @@ async fn get_channel_timestamp<S: GossipMessageStore>(
store: &S,
chain: &ActorRef<CkbChainMessage>,
) -> Result<u64, Error> {
if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) {
return Ok(timestamp);
}

let (_, block_hash) = get_channel_tx(outpoint, chain).await?;
let timestamp: u64 = match call_t!(
chain,
CkbChainMessage::GetBlockTimestamp,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
GetBlockTimestampRequest::from_block_hash(block_hash.clone())
) {
Ok(Ok(Some(timestamp))) => timestamp,
Ok(Ok(None)) => {
return Err(Error::InternalError(anyhow::anyhow!(
"Unable to find block {:?} for channel outpoint {:?}",
&block_hash,
&outpoint
)));
}
Ok(Err(err)) => {
return Err(Error::CkbRpcError(err));
static CHANNEL_TIMESTAMPS: OnceCell<Mutex<LruCache<Byte32, u64>>> = OnceCell::new();
// The cache size of 200 is enough to process two batches of 100 broadcast messages.
// So it could be quite enough for the channel announcement messages. Yet it is not
// very memory intensive.
static CHANNEL_TIMESTAMP_CACHE_SIZE: usize = 200;
CHANNEL_TIMESTAMPS.get_or_init(|| {
Mutex::new(LruCache::new(
NonZero::new(CHANNEL_TIMESTAMP_CACHE_SIZE).unwrap(),
))
});
let mut channel_timestamps = CHANNEL_TIMESTAMPS.get().unwrap().lock().await;
let timestamp = match channel_timestamps.get(&outpoint.tx_hash()) {
Some(timestamp) => {
return Ok(*timestamp);
}
Err(err) => {
return Err(Error::InternalError(anyhow::Error::new(err).context(
format!(
"Error while trying to obtain block {:?} for channel outpoint {:?}",
block_hash, &outpoint
),
)));
None => {
if let Some((timestamp, _)) = store.get_latest_channel_announcement(&outpoint) {
timestamp
} else {
debug!(
"Getting channel announcement message timestamp by calling chain actor: {:?}",
&outpoint
);
let (_, block_hash) = get_channel_tx(outpoint, chain).await?;
match call_t!(
chain,
CkbChainMessage::GetBlockTimestamp,
DEFAULT_CHAIN_ACTOR_TIMEOUT,
GetBlockTimestampRequest::from_block_hash(block_hash.clone())
) {
Ok(Ok(Some(timestamp))) => timestamp,
Ok(Ok(None)) => {
return Err(Error::InternalError(anyhow::anyhow!(
"Unable to find block {:?} for channel outpoint {:?}",
&block_hash,
&outpoint
)));
}
Ok(Err(err)) => {
return Err(Error::CkbRpcError(err));
}
Err(err) => {
return Err(Error::InternalError(anyhow::Error::new(err).context(
format!(
"Error while trying to obtain block {:?} for channel outpoint {:?}",
block_hash, &outpoint
),
)));
}
}
}
}
};

channel_timestamps.put(outpoint.tx_hash(), timestamp);
Ok(timestamp)
}

Expand Down

0 comments on commit 9d49421

Please sign in to comment.