Skip to content

Commit

Permalink
remove external message support (#4034)
Browse files Browse the repository at this point in the history
  • Loading branch information
rob-maron authored Jan 16, 2025
1 parent 94ee43b commit 46e1c0a
Show file tree
Hide file tree
Showing 5 changed files with 4 additions and 163 deletions.
46 changes: 3 additions & 43 deletions crates/hotshot/src/types/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,11 @@ use hotshot_types::{
consensus::Consensus,
data::{Leaf2, QuorumProposal2},
error::HotShotError,
message::{Message, MessageKind, Proposal, RecipientList},
message::Proposal,
request_response::ProposalRequestPayload,
traits::{
consensus_api::ConsensusApi,
election::Membership,
network::{BroadcastDelay, ConnectedNetwork, Topic},
node_implementation::NodeType,
signature_key::SignatureKey,
consensus_api::ConsensusApi, election::Membership, network::ConnectedNetwork,
node_implementation::NodeType, signature_key::SignatureKey,
},
vote::HasViewNumber,
};
Expand Down Expand Up @@ -94,43 +91,6 @@ impl<TYPES: NodeType, I: NodeImplementation<TYPES> + 'static, V: Versions>
self.output_event_stream.1.activate_cloned()
}

/// Message other participants with a serialized message from the application
/// Receivers of this message will get an `Event::ExternalMessageReceived` via
/// the event stream.
///
/// # Errors
/// Errors if serializing the request fails, or the request fails to be sent
pub async fn send_external_message(
&self,
msg: Vec<u8>,
recipients: RecipientList<TYPES::SignatureKey>,
) -> Result<()> {
let message = Message {
sender: self.public_key().clone(),
kind: MessageKind::External(msg),
};
let serialized_message = self.hotshot.upgrade_lock.serialize(&message).await?;

match recipients {
RecipientList::Broadcast => {
self.network
.broadcast_message(serialized_message, Topic::Global, BroadcastDelay::None)
.await?;
}
RecipientList::Direct(recipient) => {
self.network
.direct_message(serialized_message, recipient)
.await?;
}
RecipientList::Many(recipients) => {
self.network
.da_broadcast_message(serialized_message, recipients, BroadcastDelay::None)
.await?;
}
}
Ok(())
}

/// Request a proposal from the all other nodes. Will block until some node
/// returns a valid proposal with the requested commitment. If nobody has the
/// proposal this will block forever
Expand Down
18 changes: 1 addition & 17 deletions crates/task-impls/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use hotshot_task::task::TaskState;
use hotshot_types::{
consensus::OuterConsensus,
data::{VidDisperse, VidDisperseShare, VidDisperseShare2},
event::{Event, EventType, HotShotAction},
event::{Event, HotShotAction},
message::{
convert_proposal, DaConsensusMessage, DataMessage, GeneralConsensusMessage, Message,
MessageKind, Proposal, SequencingMessage, UpgradeLock,
Expand Down Expand Up @@ -226,22 +226,6 @@ impl<TYPES: NodeType> NetworkMessageTaskState<TYPES> {
}
}
},

// Handle external messages
MessageKind::External(data) => {
if sender == self.public_key {
return;
}
// Send the external message to the external event stream so it can be processed
broadcast_event(
Event {
view_number: TYPES::View::new(1),
event: EventType::ExternalMessageReceived { sender, data },
},
&self.external_event_stream,
)
.await;
}
}
}
}
Expand Down
92 changes: 0 additions & 92 deletions crates/testing/tests/tests_1/network_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,98 +110,6 @@ async fn test_network_task() {
));
}

#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
async fn test_network_external_mnessages() {
use hotshot::types::EventType;
use hotshot_testing::helpers::build_system_handle_from_launcher;
use hotshot_types::message::RecipientList;

hotshot::helpers::initialize_logging();

let builder: TestDescription<TestTypes, MemoryImpl, TestVersions> =
TestDescription::default_multiple_rounds();

let launcher = builder.gen_launcher(0);

let mut handles = vec![];
let mut event_streams = vec![];
for i in 0..launcher.metadata.num_nodes_with_stake {
let handle = build_system_handle_from_launcher::<TestTypes, MemoryImpl, TestVersions>(
i.try_into().unwrap(),
&launcher,
)
.await
.0;
event_streams.push(handle.event_stream_known_impl());
handles.push(handle);
}

// Send a message from 1 -> 2
handles[1]
.send_external_message(vec![1, 2], RecipientList::Direct(handles[2].public_key()))
.await
.unwrap();
let event = tokio::time::timeout(Duration::from_millis(100), event_streams[2].recv())
.await
.unwrap()
.unwrap()
.event;

// check that 2 received the message
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[1].public_key() && data == vec![1, 2]
));

// Send a message from 2 -> 1
handles[2]
.send_external_message(vec![2, 1], RecipientList::Direct(handles[1].public_key()))
.await
.unwrap();
let event = tokio::time::timeout(Duration::from_millis(100), event_streams[1].recv())
.await
.unwrap()
.unwrap()
.event;

// check that 1 received the message
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[2].public_key() && data == vec![2,1]
));

// Check broadcast works
handles[0]
.send_external_message(vec![0, 0, 0], RecipientList::Broadcast)
.await
.unwrap();
// All other nodes get the broadcast
for stream in event_streams.iter_mut().skip(1) {
let event = tokio::time::timeout(Duration::from_millis(100), stream.recv())
.await
.unwrap()
.unwrap()
.event;
assert!(matches!(
event,
EventType::ExternalMessageReceived {
sender,
data,
} if sender == handles[0].public_key() && data == vec![0,0,0]
));
}
// No event on 0 even after short sleep
tokio::time::sleep(Duration::from_millis(2)).await;
assert!(event_streams[0].is_empty());
}

#[cfg(test)]
#[tokio::test(flavor = "multi_thread")]
async fn test_network_storage_fail() {
Expand Down
8 changes: 0 additions & 8 deletions crates/types/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,6 @@ pub enum EventType<TYPES: NodeType> {
/// Public key of the leader submitting the proposal
sender: TYPES::SignatureKey,
},

/// A message destined for external listeners was received
ExternalMessageReceived {
/// Public Key of the message sender
sender: TYPES::SignatureKey,
/// Serialized data of the message
data: Vec<u8>,
},
}
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
/// A list of actions that we track for nodes
Expand Down
3 changes: 0 additions & 3 deletions crates/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,6 @@ pub enum MessageKind<TYPES: NodeType> {
Consensus(SequencingMessage<TYPES>),
/// Messages relating to sharing data between nodes
Data(DataMessage<TYPES>),
/// A (still serialized) message to be passed through to external listeners
External(Vec<u8>),
}

/// List of keys to send a message to, or broadcast to all known keys
Expand Down Expand Up @@ -162,7 +160,6 @@ impl<TYPES: NodeType> ViewMessage<TYPES> for MessageKind<TYPES> {
ResponseMessage::Found(m) => m.view_number(),
ResponseMessage::NotFound | ResponseMessage::Denied => TYPES::View::new(1),
},
MessageKind::External(_) => TYPES::View::new(1),
}
}
}
Expand Down

0 comments on commit 46e1c0a

Please sign in to comment.