From 3935a2f47948761bc81a930d014f2ea23c43fca4 Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Fri, 27 Oct 2023 13:33:35 +0200 Subject: [PATCH] fix(gossip): convert gossip data to Bytes. This should help avoid potentially costly clones over as it is processed and published --- protocols/gossipsub/CHANGELOG.md | 1 + protocols/gossipsub/src/behaviour.rs | 9 ++++--- protocols/gossipsub/src/behaviour/tests.rs | 26 +++++++++---------- protocols/gossipsub/src/config.rs | 3 ++- .../gossipsub/src/generated/gossipsub/pb.rs | 5 ++-- protocols/gossipsub/src/mcache.rs | 3 ++- protocols/gossipsub/src/peer_score/tests.rs | 4 ++- protocols/gossipsub/src/protocol.rs | 2 ++ protocols/gossipsub/src/rpc_proto.rs | 3 ++- protocols/gossipsub/src/transform.rs | 14 +++------- protocols/gossipsub/src/types.rs | 5 ++-- 11 files changed, 41 insertions(+), 34 deletions(-) diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index b86ec4de6d4f..3b64ee06d2c7 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,5 +1,6 @@ ## 0.46.0 - unreleased +- Convert `data` to `Bytes` type internally to avoid potentially costly `clone`s. - Remove `fast_message_id_fn` mechanism from `Config`. See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285). - Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`. diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index 2a3a13ea6e74..a292f786838c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -29,6 +29,7 @@ use std::{ time::Duration, }; +use bytes::Bytes; use futures::StreamExt; use futures_ticker::Ticker; use log::{debug, error, trace, warn}; @@ -607,7 +608,9 @@ where topic: impl Into, data: impl Into>, ) -> Result { - let data = data.into(); + // Convert the input data into Bytes + let data = Bytes::from(data.into()); + // Convert the input topic into TopicHash let topic = topic.into(); // Transform the data before building a raw_message. @@ -2734,7 +2737,7 @@ where pub(crate) fn build_raw_message( &mut self, topic: TopicHash, - data: Vec, + data: Bytes, ) -> Result { match &mut self.publish_config { PublishConfig::Signing { @@ -3672,7 +3675,7 @@ mod local_test { fn test_message() -> RawMessage { RawMessage { source: Some(PeerId::random()), - data: vec![0; 100], + data: Bytes::from(vec![0; 100]), sequence_number: None, topic: TopicHash::from_raw("test_topic"), signature: None, diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index dba5db4c01db..49d41abc2390 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -1021,7 +1021,7 @@ fn test_handle_iwant_msg_cached() { let raw_message = RawMessage { source: Some(peers[11]), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(1u64), topic: TopicHash::from_raw("topic"), signature: None, @@ -1079,7 +1079,7 @@ fn test_handle_iwant_msg_cached_shifted() { for shift in 1..10 { let raw_message = RawMessage { source: Some(peers[11]), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(shift), topic: TopicHash::from_raw("topic"), signature: None, @@ -1552,7 +1552,7 @@ fn do_forward_messages_to_explicit_peers() { let message = RawMessage { source: Some(peers[1]), - data: vec![12], + data: Bytes::from(vec![12]), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -1696,7 +1696,7 @@ fn no_gossip_gets_sent_to_explicit_peers() { let message = RawMessage { source: Some(peers[1]), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2166,7 +2166,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() { //receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2211,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() { //receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topic_hashes[0].clone(), signature: None, @@ -2575,7 +2575,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() { // Receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2652,7 +2652,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() { // Receive message let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2743,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() { //message that other peers have let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![], + data: Bytes::new(), sequence_number: Some(0), topic: topics[0].clone(), signature: None, @@ -2928,7 +2928,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message1 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4], + data: Bytes::from(vec![1, 2, 3, 4]), sequence_number: Some(1u64), topic: topics[0].clone(), signature: None, @@ -2938,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message2 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5], + data: Bytes::from(vec![1, 2, 3, 4, 5]), sequence_number: Some(2u64), topic: topics[0].clone(), signature: None, @@ -2948,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message3 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5, 6], + data: Bytes::from(vec![1, 2, 3, 4, 5, 6]), sequence_number: Some(3u64), topic: topics[0].clone(), signature: None, @@ -2958,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() { let raw_message4 = RawMessage { source: Some(PeerId::random()), - data: vec![1, 2, 3, 4, 5, 6, 7], + data: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7]), sequence_number: Some(4u64), topic: topics[0].clone(), signature: None, diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 7e79912cc4a6..47f7160031c0 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -862,6 +862,7 @@ mod test { use crate::topic::IdentityHash; use crate::types::PeerKind; use crate::Topic; + use bytes::Bytes; use libp2p_core::UpgradeInfo; use libp2p_swarm::StreamProtocol; use std::collections::hash_map::DefaultHasher; @@ -961,7 +962,7 @@ mod test { fn get_gossipsub_message() -> Message { Message { source: None, - data: vec![12, 34, 56], + data: Bytes::from(vec![12, 34, 56]), sequence_number: None, topic: Topic::::new("test").hash(), } diff --git a/protocols/gossipsub/src/generated/gossipsub/pb.rs b/protocols/gossipsub/src/generated/gossipsub/pb.rs index 9a074fd61fc4..3a68960f8693 100644 --- a/protocols/gossipsub/src/generated/gossipsub/pb.rs +++ b/protocols/gossipsub/src/generated/gossipsub/pb.rs @@ -9,6 +9,7 @@ #![cfg_attr(rustfmt, rustfmt_skip)] +use bytes::Bytes; use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result}; use quick_protobuf::sizeofs::*; use super::super::*; @@ -99,7 +100,7 @@ impl MessageWrite for SubOpts { #[derive(Debug, Default, PartialEq, Clone)] pub struct Message { pub from: Option>, - pub data: Option>, + pub data: Option, pub seqno: Option>, pub topic: String, pub signature: Option>, @@ -112,7 +113,7 @@ impl<'a> MessageRead<'a> for Message { while !r.is_eof() { match r.next_tag(bytes) { Ok(10) => msg.from = Some(r.read_bytes(bytes)?.to_owned()), - Ok(18) => msg.data = Some(r.read_bytes(bytes)?.to_owned()), + Ok(18) => msg.data = Some(Bytes::from(r.read_bytes(bytes)?.to_owned())), Ok(26) => msg.seqno = Some(r.read_bytes(bytes)?.to_owned()), Ok(34) => msg.topic = r.read_string(bytes)?.to_owned(), Ok(42) => msg.signature = Some(r.read_bytes(bytes)?.to_owned()), diff --git a/protocols/gossipsub/src/mcache.rs b/protocols/gossipsub/src/mcache.rs index e85a5bf9c6a2..bd8bc27ca58c 100644 --- a/protocols/gossipsub/src/mcache.rs +++ b/protocols/gossipsub/src/mcache.rs @@ -224,6 +224,7 @@ mod tests { use super::*; use crate::types::RawMessage; use crate::{IdentTopic as Topic, TopicHash}; + use bytes::Bytes; use libp2p_identity::PeerId; fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) { @@ -235,7 +236,7 @@ mod tests { }; let u8x: u8 = x as u8; let source = Some(PeerId::random()); - let data: Vec = vec![u8x]; + let data = Bytes::from(vec![u8x]); let sequence_number = Some(x); let m = RawMessage { diff --git a/protocols/gossipsub/src/peer_score/tests.rs b/protocols/gossipsub/src/peer_score/tests.rs index 064e277eed76..91e57442714a 100644 --- a/protocols/gossipsub/src/peer_score/tests.rs +++ b/protocols/gossipsub/src/peer_score/tests.rs @@ -18,6 +18,8 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use bytes::Bytes; + /// A collection of unit tests mostly ported from the go implementation. use super::*; @@ -36,7 +38,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool { fn make_test_message(seq: u64) -> (MessageId, RawMessage) { let raw_message = RawMessage { source: Some(PeerId::random()), - data: vec![12, 34, 56], + data: Bytes::from(vec![12, 34, 56]), sequence_number: Some(seq), topic: Topic::new("test").hash(), signature: None, diff --git a/protocols/gossipsub/src/protocol.rs b/protocols/gossipsub/src/protocol.rs index 15d2f59755a2..f2d77d9dff92 100644 --- a/protocols/gossipsub/src/protocol.rs +++ b/protocols/gossipsub/src/protocol.rs @@ -515,6 +515,7 @@ mod tests { use crate::config::Config; use crate::{Behaviour, ConfigBuilder}; use crate::{IdentTopic as Topic, Version}; + use bytes::Bytes; use libp2p_identity::Keypair; use quickcheck::*; @@ -532,6 +533,7 @@ mod tests { let data = (0..g.gen_range(10..10024u32)) .map(|_| u8::arbitrary(g)) .collect::>(); + let data = Bytes::from(data); let topic_id = TopicId::arbitrary(g).0; Message(gs.build_raw_message(topic_id, data).unwrap()) } diff --git a/protocols/gossipsub/src/rpc_proto.rs b/protocols/gossipsub/src/rpc_proto.rs index 94c7aafbc3e1..37001194d630 100644 --- a/protocols/gossipsub/src/rpc_proto.rs +++ b/protocols/gossipsub/src/rpc_proto.rs @@ -28,6 +28,7 @@ pub(crate) mod proto { mod test { use crate::rpc_proto::proto::compat; use crate::IdentTopic as Topic; + use bytes::Bytes; use libp2p_identity::PeerId; use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer}; use rand::Rng; @@ -39,7 +40,7 @@ mod test { let new_message1 = super::proto::Message { from: Some(PeerId::random().to_bytes()), - data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), + data: Some(Bytes::from(rand::thread_rng().gen::<[u8; 32]>().to_vec())), seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()), topic: topic1.clone().into_string(), signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()), diff --git a/protocols/gossipsub/src/transform.rs b/protocols/gossipsub/src/transform.rs index 6f57d9fc46b7..7934eecf30f5 100644 --- a/protocols/gossipsub/src/transform.rs +++ b/protocols/gossipsub/src/transform.rs @@ -25,6 +25,8 @@ //! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then //! calculated, allowing for applications to employ message-id functions post compression. +use bytes::Bytes; + use crate::{Message, RawMessage, TopicHash}; /// A general trait of transforming a [`RawMessage`] into a [`Message`]. The @@ -41,11 +43,7 @@ pub trait DataTransform { /// Takes the data to be published (a topic and associated data) transforms the data. The /// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers. - fn outbound_transform( - &self, - topic: &TopicHash, - data: Vec, - ) -> Result, std::io::Error>; + fn outbound_transform(&self, topic: &TopicHash, data: Bytes) -> Result; } /// The default transform, the raw data is propagated as is to the application layer gossipsub. @@ -62,11 +60,7 @@ impl DataTransform for IdentityTransform { }) } - fn outbound_transform( - &self, - _topic: &TopicHash, - data: Vec, - ) -> Result, std::io::Error> { + fn outbound_transform(&self, _topic: &TopicHash, data: Bytes) -> Result { Ok(data) } } diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 196468b8d328..875a2f4c8ecd 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -20,6 +20,7 @@ //! A collection of types using the Gossipsub system. use crate::TopicHash; +use bytes::Bytes; use libp2p_identity::PeerId; use libp2p_swarm::ConnectionId; use prometheus_client::encoding::EncodeLabelValue; @@ -99,7 +100,7 @@ pub struct RawMessage { pub source: Option, /// Content of the message. Its meaning is out of scope of this library. - pub data: Vec, + pub data: Bytes, /// A random sequence number. pub sequence_number: Option, @@ -140,7 +141,7 @@ pub struct Message { pub source: Option, /// Content of the message. - pub data: Vec, + pub data: Bytes, /// A random sequence number. pub sequence_number: Option,