Skip to content

Commit

Permalink
Store our own broadcast messages directly
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 6, 2025
1 parent 3b371bc commit 5649b07
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 246 deletions.
29 changes: 16 additions & 13 deletions src/ckb/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,19 @@ impl TraceTxResponse {
}
}

#[derive(Debug, Clone)]
pub struct GetBlockTimestampRequest {
block_hash: H256,
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub enum GetBlockTimestampRequest {
BlockNumber(u64),
BlockHash(H256),
}

impl GetBlockTimestampRequest {
pub fn from_block_hash(block_hash: H256) -> Self {
Self { block_hash }
Self::BlockHash(block_hash)
}

pub fn block_hash(&self) -> H256 {
self.block_hash.clone()
pub fn from_block_number(block_number: u64) -> Self {
Self::BlockNumber(block_number)
}
}

Expand Down Expand Up @@ -257,18 +258,20 @@ impl Actor for CkbChainActor {
}
}
}
CkbChainMessage::GetBlockTimestamp(
GetBlockTimestampRequest { block_hash },
reply_port,
) => {
CkbChainMessage::GetBlockTimestamp(request, reply_port) => {
let rpc_url = state.config.rpc_url.clone();

tokio::task::block_in_place(move || {
let ckb_client = CkbRpcClient::new(&rpc_url);
let _ = reply_port.send(
ckb_client
let timestamp = match request {
GetBlockTimestampRequest::BlockNumber(block_number) => ckb_client
.get_header_by_number(block_number.into())
.map(|x| x.map(|x| x.inner.timestamp.into())),
GetBlockTimestampRequest::BlockHash(block_hash) => ckb_client
.get_header(block_hash)
.map(|x| x.map(|x| x.inner.timestamp.into())),
);
};
let _ = reply_port.send(timestamp);
});
}
}
Expand Down
9 changes: 5 additions & 4 deletions src/ckb/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use ckb_types::{
core::{DepType, TransactionView},
packed::{CellDep, CellOutput, OutPoint, Script, Transaction},
prelude::{Builder, Entity, IntoTransactionView, Pack, PackVec, Unpack},
H256,
};
use once_cell::sync::{Lazy, OnceCell};
use std::{collections::HashMap, sync::Arc, sync::RwLock};
Expand All @@ -16,7 +15,7 @@ use crate::{
ckb::{
config::UdtCfgInfos,
contracts::{Contract, ContractsContext, ContractsInfo},
TraceTxRequest, TraceTxResponse,
GetBlockTimestampRequest, TraceTxRequest, TraceTxResponse,
},
now_timestamp_as_millis_u64,
};
Expand Down Expand Up @@ -501,14 +500,16 @@ impl Actor for MockChainActor {
// cause an infinite loop.
// So here we create an static lock which is shared across all nodes, and we use this lock to
// guarantee that the block timestamp is the same across all nodes.
static BLOCK_TIMESTAMP: OnceCell<TokioRwLock<HashMap<H256, u64>>> = OnceCell::new();
static BLOCK_TIMESTAMP: OnceCell<
TokioRwLock<HashMap<GetBlockTimestampRequest, u64>>,
> = OnceCell::new();
BLOCK_TIMESTAMP.get_or_init(|| TokioRwLock::new(HashMap::new()));
let timestamp = *BLOCK_TIMESTAMP
.get()
.unwrap()
.write()
.await
.entry(request.block_hash())
.entry(request.clone())
.or_insert(now_timestamp_as_millis_u64());

let _ = rpc_reply_port.send(Ok(Some(timestamp)));
Expand Down
32 changes: 20 additions & 12 deletions src/fiber/channel.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(debug_assertions)]
use crate::fiber::network::DebugEvent;
use crate::{debug_event, fiber::serde_utils::U64Hex};
use crate::{
debug_event,
fiber::{serde_utils::U64Hex, types::BroadcastMessageWithTimestamp},
};
use bitflags::bitflags;
use ckb_jsonrpc_types::BlockNumber;
use futures::future::OptionFuture;
Expand All @@ -25,7 +28,7 @@ use crate::{
},
serde_utils::{CompactSignatureAsBytes, EntityHex, PubNonceAsBytes},
types::{
AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessage, BroadcastMessageQuery,
AcceptChannel, AddTlc, AnnouncementSignatures, BroadcastMessageQuery,
BroadcastMessageQueryFlags, ChannelAnnouncement, ChannelReady, ChannelUpdate,
ClosingSigned, CommitmentSigned, EcdsaSignature, FiberChannelMessage, FiberMessage,
Hash256, OpenChannel, PaymentOnionPacket, PeeledPaymentOnionPacket, Privkey, Pubkey,
Expand Down Expand Up @@ -1709,7 +1712,7 @@ where
event: ChannelEvent,
) -> Result<(), ProcessingChannelError> {
match event {
ChannelEvent::FundingTransactionConfirmed(block_number, tx_index) => {
ChannelEvent::FundingTransactionConfirmed(block_number, tx_index, timestamp) => {
debug!("Funding transaction confirmed");
let flags = match state.state {
ChannelState::AwaitingChannelReady(flags) => flags,
Expand All @@ -1723,7 +1726,7 @@ where
"Expecting funding transaction confirmed event in state AwaitingChannelReady or after TX_SIGNATURES_SENT, but got state {:?}", &state.state)));
}
};
state.funding_tx_confirmed_at = Some((block_number, tx_index));
state.funding_tx_confirmed_at = Some((block_number, tx_index, timestamp));
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::SendFiberMessage(FiberMessageWithPeerId::new(
Expand Down Expand Up @@ -1766,7 +1769,7 @@ where
self.network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessage::ChannelUpdate(update),
BroadcastMessageWithTimestamp::ChannelUpdate(update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down Expand Up @@ -2766,7 +2769,7 @@ pub struct ChannelActorState {
#[serde_as(as = "Option<EntityHex>")]
pub funding_tx: Option<Transaction>,

pub funding_tx_confirmed_at: Option<(BlockNumber, u32)>,
pub funding_tx_confirmed_at: Option<(BlockNumber, u32, u64)>,

#[serde_as(as = "Option<EntityHex>")]
pub funding_udt_type_script: Option<Script>,
Expand Down Expand Up @@ -2932,7 +2935,7 @@ pub struct ClosedChannel {}
#[derive(Debug)]
pub enum ChannelEvent {
PeerDisconnected,
FundingTransactionConfirmed(BlockNumber, u32),
FundingTransactionConfirmed(BlockNumber, u32, u64),
CommitmentTransactionConfirmed,
ClosingTransactionConfirmed,
CheckTlcSetdown,
Expand Down Expand Up @@ -3453,9 +3456,9 @@ impl ChannelActorState {
let channel_update = self.generate_channel_update(network).await;
network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![BroadcastMessage::ChannelUpdate(
channel_update,
)]),
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
}
Expand Down Expand Up @@ -5384,8 +5387,13 @@ impl ChannelActorState {
network
.send_message(NetworkActorMessage::new_command(
NetworkActorCommand::BroadcastMessages(vec![
BroadcastMessage::ChannelAnnouncement(channel_announcement),
BroadcastMessage::ChannelUpdate(channel_update),
BroadcastMessageWithTimestamp::ChannelAnnouncement(
self.funding_tx_confirmed_at
.expect("funding tx confirmed")
.2,
channel_announcement,
),
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update),
]),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
Expand Down
Loading

0 comments on commit 5649b07

Please sign in to comment.