From 5649b07dc26462b3f3be6bcbc8636c9e5afbac62 Mon Sep 17 00:00:00 2001 From: YI Date: Sat, 4 Jan 2025 17:21:41 +0800 Subject: [PATCH] Store our own broadcast messages directly --- src/ckb/actor.rs | 29 ++--- src/ckb/tests/test_utils.rs | 9 +- src/fiber/channel.rs | 32 +++-- src/fiber/gossip.rs | 225 +++++++++++++++++------------------- src/fiber/network.rs | 77 +++++++++--- src/fiber/types.rs | 126 +++++++------------- src/store/tests/store.rs | 2 +- 7 files changed, 254 insertions(+), 246 deletions(-) diff --git a/src/ckb/actor.rs b/src/ckb/actor.rs index 994748f41..baf7f0ae2 100644 --- a/src/ckb/actor.rs +++ b/src/ckb/actor.rs @@ -41,18 +41,19 @@ impl TraceTxResponse { } } -#[derive(Debug, Clone)] -pub struct GetBlockTimestampRequest { - block_hash: H256, +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub enum GetBlockTimestampRequest { + BlockNumber(u64), + BlockHash(H256), } impl GetBlockTimestampRequest { pub fn from_block_hash(block_hash: H256) -> Self { - Self { block_hash } + Self::BlockHash(block_hash) } - pub fn block_hash(&self) -> H256 { - self.block_hash.clone() + pub fn from_block_number(block_number: u64) -> Self { + Self::BlockNumber(block_number) } } @@ -257,18 +258,20 @@ impl Actor for CkbChainActor { } } } - CkbChainMessage::GetBlockTimestamp( - GetBlockTimestampRequest { block_hash }, - reply_port, - ) => { + CkbChainMessage::GetBlockTimestamp(request, reply_port) => { let rpc_url = state.config.rpc_url.clone(); + tokio::task::block_in_place(move || { let ckb_client = CkbRpcClient::new(&rpc_url); - let _ = reply_port.send( - ckb_client + let timestamp = match request { + GetBlockTimestampRequest::BlockNumber(block_number) => ckb_client + .get_header_by_number(block_number.into()) + .map(|x| x.map(|x| x.inner.timestamp.into())), + GetBlockTimestampRequest::BlockHash(block_hash) => ckb_client .get_header(block_hash) .map(|x| x.map(|x| x.inner.timestamp.into())), - ); + }; + let _ = reply_port.send(timestamp); }); } } diff --git a/src/ckb/tests/test_utils.rs b/src/ckb/tests/test_utils.rs index f47a8166c..37762010a 100644 --- a/src/ckb/tests/test_utils.rs +++ b/src/ckb/tests/test_utils.rs @@ -6,7 +6,6 @@ use ckb_types::{ core::{DepType, TransactionView}, packed::{CellDep, CellOutput, OutPoint, Script, Transaction}, prelude::{Builder, Entity, IntoTransactionView, Pack, PackVec, Unpack}, - H256, }; use once_cell::sync::{Lazy, OnceCell}; use std::{collections::HashMap, sync::Arc, sync::RwLock}; @@ -16,7 +15,7 @@ use crate::{ ckb::{ config::UdtCfgInfos, contracts::{Contract, ContractsContext, ContractsInfo}, - TraceTxRequest, TraceTxResponse, + GetBlockTimestampRequest, TraceTxRequest, TraceTxResponse, }, now_timestamp_as_millis_u64, }; @@ -501,14 +500,16 @@ impl Actor for MockChainActor { // cause an infinite loop. // So here we create an static lock which is shared across all nodes, and we use this lock to // guarantee that the block timestamp is the same across all nodes. - static BLOCK_TIMESTAMP: OnceCell>> = OnceCell::new(); + static BLOCK_TIMESTAMP: OnceCell< + TokioRwLock>, + > = OnceCell::new(); BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new())); let timestamp = *BLOCK_TIMESTAMP .get() .unwrap() .write() .await - .entry(request.block_hash()) + .entry(request.clone()) .or_insert(now_timestamp_as_millis_u64()); let _ = rpc_reply_port.send(Ok(Some(timestamp))); diff --git a/src/fiber/channel.rs b/src/fiber/channel.rs index 43e1b75f6..9680cf830 100644 --- a/src/fiber/channel.rs +++ b/src/fiber/channel.rs @@ -1,6 +1,9 @@ #[cfg(debug_assertions)] use crate::fiber::network::DebugEvent; -use crate::{debug_event, fiber::serde_utils::U64Hex}; +use crate::{ + debug_event, + fiber::{serde_utils::U64Hex, types::BroadcastMessageWithTimestamp}, +}; use bitflags::bitflags; use ckb_jsonrpc_types::BlockNumber; use futures::future::OptionFuture; @@ -25,7 +28,7 @@ use crate::{ }, serde_utils::{CompactSignatureAsBytes, EntityHex, PubNonceAsBytes}, types::{ - AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessage, BroadcastMessageQuery, + AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessageQuery, BroadcastMessageQueryFlags, ChannelAnnouncement, ChannelReady, ChannelUpdate, ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage, FiberMessage, Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket, Privkey, Pubkey, @@ -1709,7 +1712,7 @@ where event: ChannelEvent, ) -> Result<(), ProcessingChannelError> { match event { - ChannelEvent::FundingTransactionConfirmed(block_number, tx_index) => { + ChannelEvent::FundingTransactionConfirmed(block_number, tx_index, timestamp) => { debug!("Funding transaction confirmed"); let flags = match state.state { ChannelState::AwaitingChannelReady(flags) => flags, @@ -1723,7 +1726,7 @@ where "Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state))); } }; - state.funding_tx_confirmed_at = Some((block_number, tx_index)); + state.funding_tx_confirmed_at = Some((block_number, tx_index, timestamp)); self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new( @@ -1766,7 +1769,7 @@ where self.network .send_message(NetworkActorMessage::new_command( NetworkActorCommand::BroadcastMessages(vec![ - BroadcastMessage::ChannelUpdate(update), + BroadcastMessageWithTimestamp::ChannelUpdate(update), ]), )) .expect(ASSUME_NETWORK_ACTOR_ALIVE); @@ -2766,7 +2769,7 @@ pub struct ChannelActorState { #[serde_as(as = "Option")] pub funding_tx: Option, - pub funding_tx_confirmed_at: Option<(BlockNumber, u32)>, + pub funding_tx_confirmed_at: Option<(BlockNumber, u32, u64)>, #[serde_as(as = "Option")] pub funding_udt_type_script: Option