Skip to content

Commit

Permalink
remove fragment_message function,
Browse files Browse the repository at this point in the history
it doesn't fragment Publish/Forward messages as it's not on the spec data fragmentation, and separating
ControlAction and Subscription message optimization should only be added with proofs
(calculation/benchmarks) it improves performance. Removing it will allow us to simplify the code by
only converting to proto::RPC on the connection handler.
  • Loading branch information
jxs committed Nov 9, 2023
1 parent b007fad commit c3c5e19
Showing 1 changed file with 5 additions and 228 deletions.
233 changes: 5 additions & 228 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2819,139 +2819,14 @@ where
// error and drop the message (all individual messages should be small enough to fit in the
// max_transmit_size)

let messages = self.fragment_message(message)?;

for message in messages {
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(message),
handler: NotifyHandler::Any,
})
}
self.events.push_back(ToSwarm::NotifyHandler {
peer_id,
event: HandlerIn::Message(message),
handler: NotifyHandler::Any,
});
Ok(())
}

// If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC
// messages to be sent.
fn fragment_message(&self, rpc: proto::RPC) -> Result<Vec<proto::RPC>, PublishError> {
if rpc.get_size() < self.config.max_transmit_size() {
return Ok(vec![rpc]);
}

let new_rpc = proto::RPC {
subscriptions: Vec::new(),
publish: Vec::new(),
control: None,
};

let mut rpc_list = vec![new_rpc.clone()];

// Gets an RPC if the object size will fit, otherwise create a new RPC. The last element
// will be the RPC to add an object.
macro_rules! create_or_add_rpc {
($object_size: ident ) => {
let list_index = rpc_list.len() - 1; // the list is never empty

// create a new RPC if the new object plus 5% of its size (for length prefix
// buffers) exceeds the max transmit size.
if rpc_list[list_index].get_size() + (($object_size as f64) * 1.05) as usize
> self.config.max_transmit_size()
&& rpc_list[list_index] != new_rpc
{
// create a new rpc and use this as the current
rpc_list.push(new_rpc.clone());
}
};
}

macro_rules! add_item {
($object: ident, $type: ident ) => {
let object_size = $object.get_size();

if object_size + 2 > self.config.max_transmit_size() {
// This should not be possible. All received and published messages have already
// been vetted to fit within the size.
tracing::error!("Individual message too large to fragment");
return Err(PublishError::MessageTooLarge);
}

create_or_add_rpc!(object_size);
rpc_list
.last_mut()
.expect("Must have at least one element")
.$type
.push($object.clone());
};
}

// Add messages until the limit
for message in &rpc.publish {
add_item!(message, publish);
}
for subscription in &rpc.subscriptions {
add_item!(subscription, subscriptions);
}

// handle the control messages. If all are within the max_transmit_size, send them without
// fragmenting, otherwise, fragment the control messages
let empty_control = proto::ControlMessage::default();
if let Some(control) = rpc.control.as_ref() {
if control.get_size() + 2 > self.config.max_transmit_size() {
// fragment the RPC
for ihave in &control.ihave {
let len = ihave.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.ihave
.push(ihave.clone());
}
for iwant in &control.iwant {
let len = iwant.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.iwant
.push(iwant.clone());
}
for graft in &control.graft {
let len = graft.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.graft
.push(graft.clone());
}
for prune in &control.prune {
let len = prune.get_size();
create_or_add_rpc!(len);
rpc_list
.last_mut()
.expect("Always an element")
.control
.get_or_insert_with(|| empty_control.clone())
.prune
.push(prune.clone());
}
} else {
let len = control.get_size();
create_or_add_rpc!(len);
rpc_list.last_mut().expect("Always an element").control = Some(control.clone());
}
}

Ok(rpc_list)
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
Expand Down Expand Up @@ -3605,7 +3480,6 @@ impl fmt::Debug for PublishConfig {
mod local_test {
use super::*;
use crate::IdentTopic;
use asynchronous_codec::Encoder;
use quickcheck::*;

fn test_message() -> RawMessage {
Expand Down Expand Up @@ -3663,101 +3537,4 @@ mod local_test {
}
}
}

#[test]
/// Tests RPC message fragmentation
fn test_message_fragmentation_deterministic() {
let max_transmit_size = 500;
let config = crate::config::ConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap();

// Message under the limit should be fine.
let mut rpc_proto = RpcOut::Publish(test_message()).into_protobuf();
let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap();
assert_eq!(
fragmented_messages,
vec![rpc_proto.clone()],
"Messages under the limit shouldn't be fragmented"
);

// Messages over the limit should be split

let mut messages = vec![];
while rpc_proto.get_size() < max_transmit_size {
messages.push(test_message());
rpc_proto = RpcOut::Forward(messages.clone()).into_protobuf();
}

let fragmented_messages = gs
.fragment_message(rpc_proto)
.expect("Should be able to fragment the messages");

assert!(
fragmented_messages.len() > 1,
"the message should be fragmented"
);

// all fragmented messages should be under the limit
for message in fragmented_messages {
assert!(
message.get_size() < max_transmit_size,
"all messages should be less than the transmission size"
);
}
}

#[test]
fn test_message_fragmentation() {
fn prop(rpc: RpcOut) {
let max_transmit_size = 500;
let config = crate::config::ConfigBuilder::default()
.max_transmit_size(max_transmit_size)
.validation_mode(ValidationMode::Permissive)
.build()
.unwrap();
let gs: Behaviour = Behaviour::new(MessageAuthenticity::RandomAuthor, config).unwrap();

let mut length_codec = unsigned_varint::codec::UviBytes::default();
length_codec.set_max_len(max_transmit_size);
let mut codec =
crate::protocol::GossipsubCodec::new(length_codec, ValidationMode::Permissive);

let rpc_proto = rpc.into_protobuf();
let fragmented_messages = gs
.fragment_message(rpc_proto.clone())
.expect("Messages must be valid");

if rpc_proto.get_size() < max_transmit_size {
assert_eq!(
fragmented_messages.len(),
1,
"the message should not be fragmented"
);
} else {
assert!(
fragmented_messages.len() > 1,
"the message should be fragmented"
);
}

// all fragmented messages should be under the limit
for message in fragmented_messages {
assert!(
message.get_size() < max_transmit_size,
"all messages should be less than the transmission size: list size {} max size{}", message.get_size(), max_transmit_size
);

// ensure they can all be encoded
let mut buf = bytes::BytesMut::with_capacity(message.get_size());
codec.encode(message, &mut buf).unwrap()
}
}
QuickCheck::new()
.max_tests(100)
.quickcheck(prop as fn(_) -> _)
}
}

0 comments on commit c3c5e19

Please sign in to comment.