Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save our own broadcast message directly and save raw gossip message from peers to memory first #434

Merged
merged 18 commits into from
Jan 7, 2025
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions src/ckb/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ impl TraceTxResponse {
}
}

#[derive(Debug, Clone)]
pub struct GetBlockTimestampRequest {
block_hash: H256,
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum GetBlockTimestampRequest {
BlockNumber(u64),
contrun marked this conversation as resolved.
Show resolved Hide resolved
BlockHash(H256),
}

impl GetBlockTimestampRequest {
pub fn from_block_hash(block_hash: H256) -> Self {
Self { block_hash }
Self::BlockHash(block_hash)
}

pub fn block_hash(&self) -> H256 {
self.block_hash.clone()
pub fn from_block_number(block_number: u64) -> Self {
Self::BlockNumber(block_number)
}
}

Expand Down Expand Up @@ -257,18 +258,20 @@ impl Actor for CkbChainActor {
}
}
}
CkbChainMessage::GetBlockTimestamp(
GetBlockTimestampRequest { block_hash },
reply_port,
) => {
CkbChainMessage::GetBlockTimestamp(request, reply_port) => {
let rpc_url = state.config.rpc_url.clone();

tokio::task::block_in_place(move || {
let ckb_client = CkbRpcClient::new(&rpc_url);
let _ = reply_port.send(
ckb_client
let timestamp = match request {
GetBlockTimestampRequest::BlockNumber(block_number) => ckb_client
.get_header_by_number(block_number.into())
.map(|x| x.map(|x| x.inner.timestamp.into())),
GetBlockTimestampRequest::BlockHash(block_hash) => ckb_client
.get_header(block_hash)
.map(|x| x.map(|x| x.inner.timestamp.into())),
);
};
let _ = reply_port.send(timestamp);
});
}
}
Expand Down
10 changes: 6 additions & 4 deletions src/ckb/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use ckb_types::{
core::{DepType, TransactionView},
packed::{CellDep, CellOutput, OutPoint, Script, Transaction},
prelude::{Builder, Entity, IntoTransactionView, Pack, PackVec, Unpack},
H256,
};
use once_cell::sync::{Lazy, OnceCell};
use std::{collections::HashMap, sync::Arc, sync::RwLock};
Expand All @@ -16,7 +15,7 @@ use crate::{
ckb::{
config::UdtCfgInfos,
contracts::{Contract, ContractsContext, ContractsInfo},
TraceTxRequest, TraceTxResponse,
GetBlockTimestampRequest, TraceTxRequest, TraceTxResponse,
},
now_timestamp_as_millis_u64,
};
Expand Down Expand Up @@ -472,6 +471,7 @@ impl Actor for MockChainActor {
}
}
TraceTx(tx, reply_port) => {
debug!("Tracing transaction: {:?}", &tx);
match state.tx_status.get(&tx.tx_hash).cloned() {
Some((tx_view, status)) => {
reply_trace_tx(Some(tx_view), status, reply_port);
Expand Down Expand Up @@ -500,14 +500,16 @@ impl Actor for MockChainActor {
// cause an infinite loop.
// So here we create an static lock which is shared across all nodes, and we use this lock to
// guarantee that the block timestamp is the same across all nodes.
static BLOCK_TIMESTAMP: OnceCell<TokioRwLock<HashMap<H256, u64>>> = OnceCell::new();
static BLOCK_TIMESTAMP: OnceCell<
TokioRwLock<HashMap<GetBlockTimestampRequest, u64>>,
> = OnceCell::new();
BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new()));
let timestamp = *BLOCK_TIMESTAMP
.get()
.unwrap()
.write()
.await
.entry(request.block_hash())
.entry(request.clone())
.or_insert(now_timestamp_as_millis_u64());

let _ = rpc_reply_port.send(Ok(Some(timestamp)));
Expand Down
36 changes: 24 additions & 12 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(debug_assertions)]
use crate::fiber::network::DebugEvent;
use crate::{debug_event, fiber::serde_utils::U64Hex};
use crate::{
debug_event,
fiber::{serde_utils::U64Hex, types::BroadcastMessageWithTimestamp},
};
use bitflags::bitflags;
use ckb_jsonrpc_types::BlockNumber;
use futures::future::OptionFuture;
Expand All @@ -25,7 +28,7 @@ use crate::{
},
serde_utils::{CompactSignatureAsBytes, EntityHex, PubNonceAsBytes},
types::{
AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessage, BroadcastMessageQuery,
AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessageQuery,
BroadcastMessageQueryFlags, ChannelAnnouncement, ChannelReady, ChannelUpdate,
ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage, FiberMessage,
Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket, Privkey, Pubkey,
Expand Down Expand Up @@ -1709,7 +1712,7 @@ where
event: ChannelEvent,
) -> Result<(), ProcessingChannelError> {
match event {
ChannelEvent::FundingTransactionConfirmed(block_number, tx_index) => {
ChannelEvent::FundingTransactionConfirmed(block_number, tx_index, timestamp) => {
debug!("Funding transaction confirmed");
let flags = match state.state {
ChannelState::AwaitingChannelReady(flags) => flags,
Expand All @@ -1723,7 +1726,7 @@ where
"Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state)));
}
};
state.funding_tx_confirmed_at = Some((block_number, tx_index));
state.funding_tx_confirmed_at = Some((block_number, tx_index, timestamp));
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new(
Expand Down Expand Up @@ -1766,7 +1769,7 @@ where
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessage::ChannelUpdate(update),
BroadcastMessageWithTimestamp::ChannelUpdate(update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -2766,7 +2769,7 @@ pub struct ChannelActorState {
#[serde_as(as = "Option<EntityHex>")]
pub funding_tx: Option<Transaction>,

pub funding_tx_confirmed_at: Option<(BlockNumber, u32)>,
pub funding_tx_confirmed_at: Option<(BlockNumber, u32, u64)>,

#[serde_as(as = "Option<EntityHex>")]
pub funding_udt_type_script: Option<Script>,
Expand Down Expand Up @@ -2932,7 +2935,7 @@ pub struct ClosedChannel {}
#[derive(Debug)]
pub enum ChannelEvent {
PeerDisconnected,
FundingTransactionConfirmed(BlockNumber, u32),
FundingTransactionConfirmed(BlockNumber, u32, u64),
CommitmentTransactionConfirmed,
ClosingTransactionConfirmed,
CheckTlcSetdown,
Expand Down Expand Up @@ -3453,9 +3456,9 @@ impl ChannelActorState {
let channel_update = self.generate_channel_update(network).await;
network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![BroadcastMessage::ChannelUpdate(
channel_update,
)]),
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
}
Expand Down Expand Up @@ -4387,6 +4390,12 @@ impl ChannelActorState {
.expect("Funding transaction outpoint is present")
}

pub fn must_get_funding_transaction_timestamp(&self) -> u64 {
self.funding_tx_confirmed_at
.expect("Funding transaction confirmed at present")
.2
}

pub fn get_local_shutdown_script(&self) -> Script {
self.local_shutdown_script.clone()
}
Expand Down Expand Up @@ -5384,8 +5393,11 @@ impl ChannelActorState {
network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessage::ChannelAnnouncement(channel_announcement),
BroadcastMessage::ChannelUpdate(channel_update),
BroadcastMessageWithTimestamp::ChannelAnnouncement(
self.must_get_funding_transaction_timestamp(),
channel_announcement,
),
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down
Loading
Loading