Skip to content

Commit

Permalink
Check message existance before insert to messages_to_be_saved
Browse files Browse the repository at this point in the history
  • Loading branch information
contrun committed Jan 3, 2025
1 parent dba9190 commit d58843a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
15 changes: 13 additions & 2 deletions src/fiber/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ pub struct ExtendedGossipMessageStoreState<S> {
chain_actor: ActorRef<CkbChainMessage>,
next_id: u64,
output_ports: HashMap<u64, BroadcastMessageOutput>,
messages_to_be_saved: HashSet<BroadcastMessageWithOnChainInfo>,
messages_to_be_saved: Vec<BroadcastMessageWithOnChainInfo>,
}

impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
Expand Down Expand Up @@ -1083,6 +1083,13 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}
}

match self.messages_to_be_saved.iter().find(|m| message == *m) {
Some(existing_message) => {
return Ok(existing_message.clone().into());
}
None => {}
}

let message = get_broadcast_message_with_on_chain_info(message.clone(), &self.chain_actor)
.await
.map_err(|error| GossipMessageProcessingError::ProcessingError(error.to_string()))?;
Expand Down Expand Up @@ -1110,11 +1117,15 @@ impl<S: GossipMessageStore> ExtendedGossipMessageStoreState<S> {
}

trace!("New gossip message saved to memory: {:?}", message);
self.messages_to_be_saved.insert(message.clone());
self.messages_to_be_saved.push(message.clone());
Ok(message.into())
}

fn has_dependencies_available(&self, message: &BroadcastMessageWithOnChainInfo) -> bool {
trace!(
"Checking if the dependencies of message {:?} are available",
message
);
match message {
BroadcastMessageWithOnChainInfo::ChannelUpdate(channel_update) => self
.get_channel_annnouncement(&channel_update.channel_outpoint)
Expand Down
20 changes: 20 additions & 0 deletions src/fiber/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2502,6 +2502,26 @@ impl BroadcastMessageWithOnChainInfo {
}
}

impl PartialEq<BroadcastMessageWithOnChainInfo> for BroadcastMessage {
fn eq(&self, other: &BroadcastMessageWithOnChainInfo) -> bool {
match (self, other) {
(
BroadcastMessage::NodeAnnouncement(node_announcement),
BroadcastMessageWithOnChainInfo::NodeAnnouncement(other_node_announcement),
) => node_announcement == other_node_announcement,
(
BroadcastMessage::ChannelAnnouncement(channel_announcement),
BroadcastMessageWithOnChainInfo::ChannelAnnouncement(_, other_channel_announcement),
) => channel_announcement == other_channel_announcement,
(
BroadcastMessage::ChannelUpdate(channel_update),
BroadcastMessageWithOnChainInfo::ChannelUpdate(other_channel_update),
) => channel_update == other_channel_update,
_ => false,
}
}
}

// Augment the broadcast message with timestamp so that we can easily obtain the cursor of the message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum BroadcastMessageWithTimestamp {
Expand Down

0 comments on commit d58843a

Please sign in to comment.