Skip to content

Commit

Permalink
Add on chain info to messages_to_be_saved
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 3, 2025
1 parent b6bbf40 commit 95cf1f8
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 74 deletions.
1 change: 1 addition & 0 deletions src/ckb/tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,7 @@ impl Actor for MockChainActor {
}
}
TraceTx(tx, reply_port) => {
debug!("Tracing transaction: {:?}", &tx);
match state.tx_status.get(&tx.tx_hash).cloned() {
Some((tx_view, status)) => {
reply_trace_tx(Some(tx_view), status, reply_port);
Expand Down
177 changes: 105 additions & 72 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ use super::{
network::{check_chain_hash, get_chain_hash, GossipMessageWithPeerId, GOSSIP_PROTOCOL_ID},
types::{
BroadcastMessage, BroadcastMessageID, BroadcastMessageQuery, BroadcastMessageQueryFlags,
BroadcastMessageWithTimestamp, BroadcastMessagesFilter, BroadcastMessagesFilterResult,
ChannelAnnouncement, ChannelUpdate, Cursor, GetBroadcastMessages,
GetBroadcastMessagesResult, GossipMessage, NodeAnnouncement, Pubkey,
QueryBroadcastMessages, QueryBroadcastMessagesResult,
BroadcastMessageWithOnChainInfo, BroadcastMessageWithTimestamp, BroadcastMessagesFilter,
BroadcastMessagesFilterResult, ChannelAnnouncement, ChannelOnchainInfo, ChannelUpdate,
Cursor, GetBroadcastMessages, GetBroadcastMessagesResult, GossipMessage, NodeAnnouncement,
Pubkey, QueryBroadcastMessages, QueryBroadcastMessagesResult,
},
};

Expand Down Expand Up @@ -485,7 +485,7 @@ where
Some(last_message) => {
// We need the message timestamp to construct a valid cursor.
match get_message_cursor(
last_message.clone(),
last_message,
&state.store.store,
&state.chain_actor,
)
Expand Down Expand Up @@ -976,7 +976,7 @@ pub struct ExtendedGossipMessageStoreState<S> {
chain_actor: ActorRef<CkbChainMessage>,
next_id: u64,
output_ports: HashMap<u64, BroadcastMessageOutput>,
messages_to_be_saved: HashSet<BroadcastMessageWithTimestamp>,
messages_to_be_saved: HashSet<BroadcastMessageWithOnChainInfo>,
}

impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
Expand Down Expand Up @@ -1018,10 +1018,9 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {

let mut verified_sorted_messages = Vec::with_capacity(sorted_messages.len());
for message in sorted_messages {
match verify_and_save_broadcast_message(&message, &self.store, &self.chain_actor).await
{
match verify_and_save_broadcast_message(&message, &self.store).await {
Ok(_) => {
verified_sorted_messages.push(message);
verified_sorted_messages.push(message.into());
}
Err(error) => {
trace!(
Expand All @@ -1047,10 +1046,11 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
outpoint: &OutPoint,
) -> Option<(u64, ChannelAnnouncement)> {
self.messages_to_be_saved.iter().find_map(|m| match m {
BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement)
if &channel_announcement.channel_outpoint == outpoint =>
{
Some((*timestamp, channel_announcement.clone()))
BroadcastMessageWithOnChainInfo::ChannelAnnouncement(
on_chain_info,
channel_announcement,
) if &channel_announcement.channel_outpoint == outpoint => {
Some((on_chain_info.timestamp, channel_announcement.clone()))
}
_ => None,
})
Expand All @@ -1070,12 +1070,9 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}
}

let message =
get_broadcast_message_with_timestamp(message.clone(), &self.store, &self.chain_actor)
.await
.map_err(|error| {
GossipMessageProcessingError::ProcessingError(error.to_string())
})?;
let message = get_broadcast_message_with_on_chain_info(message.clone(), &self.chain_actor)
.await
.map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?;

let max_acceptable_gossip_message_timestamp = max_acceptable_gossip_message_timestamp();
if message.timestamp() > max_acceptable_gossip_message_timestamp {
Expand All @@ -1086,7 +1083,7 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}

if !self.announce_private_addr {
if let BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) = &message {
if let BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement) = &message {
if !node_announcement.addresses.iter().any(|addr| {
multiaddr_to_socketaddr(addr)
.map(|socket_addr| is_reachable(socket_addr.ip()))
Expand All @@ -1101,12 +1098,12 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {

trace!("New gossip message saved to memory: {:?}", message);
self.messages_to_be_saved.insert(message.clone());
Ok(message)
Ok(message.into())
}

fn has_dependencies_available(&self, message: &BroadcastMessageWithTimestamp) -> bool {
fn has_dependencies_available(&self, message: &BroadcastMessageWithOnChainInfo) -> bool {
match message {
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) => self
BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => self
.get_channel_annnouncement(&channel_update.channel_outpoint)
.is_some(),
_ => true,
Expand Down Expand Up @@ -1703,12 +1700,30 @@ fn get_dependent_message_queries<S: GossipMessageStore>(
}

async fn get_message_cursor<S: GossipMessageStore>(
message: BroadcastMessage,
message: &BroadcastMessage,
store: &S,
chain: &ActorRef<CkbChainMessage>,
) -> Result<Cursor, Error> {
let m = get_broadcast_message_with_timestamp(message, store, chain).await?;
Ok(m.cursor())
match message {
BroadcastMessage::ChannelAnnouncement(channel_announcement) => {
let timestamp =
get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?;
Ok(Cursor::new(
timestamp,
BroadcastMessageID::ChannelAnnouncement(
channel_announcement.channel_outpoint.clone(),
),
))
}
BroadcastMessage::ChannelUpdate(channel_update) => Ok(Cursor::new(
channel_update.timestamp,
BroadcastMessageID::ChannelUpdate(channel_update.channel_outpoint.clone()),
)),
BroadcastMessage::NodeAnnouncement(node_announcement) => Ok(Cursor::new(
node_announcement.timestamp,
BroadcastMessageID::NodeAnnouncement(node_announcement.node_id.clone()),
)),
}
}

fn get_existing_broadcast_message<S: GossipMessageStore>(
Expand Down Expand Up @@ -1749,25 +1764,24 @@ fn get_existing_newer_broadcast_message<S: GossipMessageStore>(
})
}

async fn get_broadcast_message_with_timestamp<S: GossipMessageStore>(
async fn get_broadcast_message_with_on_chain_info(
message: BroadcastMessage,
store: &S,
chain: &ActorRef<CkbChainMessage>,
) -> Result<BroadcastMessageWithTimestamp, Error> {
) -> Result<BroadcastMessageWithOnChainInfo, Error> {
match message {
BroadcastMessage::ChannelAnnouncement(channel_announcement) => {
let timestamp =
get_channel_timestamp(&channel_announcement.channel_outpoint, store, chain).await?;
Ok(BroadcastMessageWithTimestamp::ChannelAnnouncement(
timestamp,
let on_chain_info =
get_channel_on_chain_info(&channel_announcement.channel_outpoint, chain).await?;
Ok(BroadcastMessageWithOnChainInfo::ChannelAnnouncement(
on_chain_info,
channel_announcement,
))
}
BroadcastMessage::ChannelUpdate(channel_update) => {
Ok(BroadcastMessageWithTimestamp::ChannelUpdate(channel_update))
}
BroadcastMessage::ChannelUpdate(channel_update) => Ok(
BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update),
),
BroadcastMessage::NodeAnnouncement(node_announcement) => Ok(
BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement),
BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement),
),
}
}
Expand All @@ -1781,22 +1795,27 @@ async fn get_broadcast_message_with_timestamp<S: GossipMessageStore>(
// announcement is saved before the channel announcement, we need to temporarily save the channel
// announcement to lagged_messages and wait for the node announcement to be saved.
async fn verify_and_save_broadcast_message<S: GossipMessageStore>(
message: &BroadcastMessageWithTimestamp,
message: &BroadcastMessageWithOnChainInfo,
store: &S,
chain: &ActorRef<CkbChainMessage>,
) -> Result<(), Error> {
match message {
BroadcastMessageWithTimestamp::ChannelAnnouncement(timestamp, channel_announcement) => {
if !verify_channel_announcement(channel_announcement, store, chain).await? {
store.save_channel_announcement(*timestamp, channel_announcement.clone());
BroadcastMessageWithOnChainInfo::ChannelAnnouncement(
on_chain_info,
channel_announcement,
) => {
if !verify_channel_announcement(channel_announcement, on_chain_info, store).await? {
store.save_channel_announcement(
on_chain_info.timestamp,
channel_announcement.clone(),
);
}
}
BroadcastMessageWithTimestamp::ChannelUpdate(channel_update) => {
BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => {
if !verify_channel_update(channel_update, store)? {
store.save_channel_update(channel_update.clone());
}
}
BroadcastMessageWithTimestamp::NodeAnnouncement(node_announcement) => {
BroadcastMessageWithOnChainInfo::NodeAnnouncement(node_announcement) => {
if !verify_node_announcement(node_announcement, store)? {
store.save_node_announcement(node_announcement.clone());
}
Expand All @@ -1815,7 +1834,7 @@ async fn get_channel_tx(
DEFAULT_CHAIN_ACTOR_TIMEOUT,
TraceTxRequest {
tx_hash: outpoint.tx_hash(),
confirmations: 1,
confirmations: 2,
}
) {
Ok(TraceTxResponse {
Expand Down Expand Up @@ -1844,7 +1863,26 @@ async fn get_channel_timestamp<S: GossipMessageStore>(
return Ok(timestamp);
}

let (_, block_hash) = get_channel_tx(outpoint, chain).await?;
let on_chain_info = get_channel_on_chain_info(outpoint, chain).await?;

Ok(on_chain_info.timestamp)
}

async fn get_channel_on_chain_info(
outpoint: &OutPoint,
chain: &ActorRef<CkbChainMessage>,
) -> Result<ChannelOnchainInfo, Error> {
let (tx, block_hash) = get_channel_tx(outpoint, chain).await?;
let first_output = match tx.inner.outputs.first() {
None => {
return Err(Error::InvalidParameter(format!(
"On-chain transaction found but no output: {:?}",
&outpoint
)));
}
Some(output) => output.clone(),
};

let timestamp: u64 = match call_t!(
chain,
CkbChainMessage::GetBlockTimestamp,
Expand Down Expand Up @@ -1872,7 +1910,10 @@ async fn get_channel_timestamp<S: GossipMessageStore>(
}
};

Ok(timestamp)
Ok(ChannelOnchainInfo {
timestamp,
first_output,
})
}

// Verify the channel announcement message. If any error occurs, return the error.
Expand All @@ -1881,8 +1922,8 @@ async fn get_channel_timestamp<S: GossipMessageStore>(
// is true, otherwise it is false.
async fn verify_channel_announcement<S: GossipMessageStore>(
channel_announcement: &ChannelAnnouncement,
on_chain_info: &ChannelOnchainInfo,
store: &S,
chain: &ActorRef<CkbChainMessage>,
) -> Result<bool, Error> {
if let Some((_, announcement)) =
store.get_latest_channel_announcement(&channel_announcement.channel_outpoint)
Expand Down Expand Up @@ -1939,40 +1980,32 @@ async fn verify_channel_announcement<S: GossipMessageStore>(
)));
}

let (tx, _) = chain
.get_channel_tx(&channel_announcement.channel_outpoint)
.await?;

let pubkey = channel_announcement.ckb_key.serialize();
let pubkey_hash = &blake2b_256(pubkey.as_slice())[0..20];
match tx.inner.outputs.first() {
None => {
return Err(Error::InvalidParameter(format!(
"On-chain transaction found but no output: {:?}",
&channel_announcement
)));
}
Some(output) => {
if output.lock.args.as_bytes() != pubkey_hash {
return Err(Error::InvalidParameter(format!(

let output = &on_chain_info.first_output;
if output.lock.args.as_bytes() != pubkey_hash {
return Err(Error::InvalidParameter(format!(
"On-chain transaction found but pubkey hash mismatched: on chain hash {:?}, pub key ({:?}) hash {:?}",
&output.lock.args.as_bytes(),
hex::encode(pubkey),
&pubkey_hash
)));
}
let capacity: u128 = u64::from(output.capacity).into();
if channel_announcement.udt_type_script.is_none()
&& channel_announcement.capacity > capacity
{
}
let capacity: u128 = u64::from(output.capacity).into();
match channel_announcement.udt_type_script {
Some(_) => {
// TODO: verify the capacity of the UDT
}
None => {
if channel_announcement.capacity > capacity {
return Err(Error::InvalidParameter(format!(
"On-chain transaction found but capacity mismatched: on chain capacity {:?} smaller than annoucned channel capacity {:?}",
&output.capacity, &channel_announcement.capacity
)));
"On-chain transaction found but capacity mismatched: on chain capacity {:?} smaller than annoucned channel capacity {:?}",
&output.capacity, &channel_announcement.capacity
)));
}
capacity
}
};
}

if let Err(err) = secp256k1_instance().verify_schnorr(
ckb_signature,
Expand Down
Loading

0 comments on commit 95cf1f8

Please sign in to comment.