Skip to content

Commit

Permalink
fix(gossip): convert gossip data to Bytes.
Browse files Browse the repository at this point in the history
This should help avoid potentially costly clones over  as it is
processed and published
  • Loading branch information
joshuef committed Oct 27, 2023
1 parent 459c9d4 commit 3935a2f
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 34 deletions.
1 change: 1 addition & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 0.46.0 - unreleased

- Convert `data` to `Bytes` type internally to avoid potentially costly `clone`s.
- Remove `fast_message_id_fn` mechanism from `Config`.
See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285).
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
Expand Down
9 changes: 6 additions & 3 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use futures::StreamExt;
use futures_ticker::Ticker;
use log::{debug, error, trace, warn};
Expand Down Expand Up @@ -607,7 +608,9 @@ where
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
) -> Result<MessageId, PublishError> {
let data = data.into();
// Convert the input data into Bytes
let data = Bytes::from(data.into());
// Convert the input topic into TopicHash
let topic = topic.into();

// Transform the data before building a raw_message.
Expand Down Expand Up @@ -2734,7 +2737,7 @@ where
pub(crate) fn build_raw_message(
&mut self,
topic: TopicHash,
data: Vec<u8>,
data: Bytes,
) -> Result<RawMessage, PublishError> {
match &mut self.publish_config {
PublishConfig::Signing {
Expand Down Expand Up @@ -3672,7 +3675,7 @@ mod local_test {
fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
data: Bytes::from(vec![0; 100]),
sequence_number: None,
topic: TopicHash::from_raw("test_topic"),
signature: None,
Expand Down
26 changes: 13 additions & 13 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ fn test_handle_iwant_msg_cached() {

let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1079,7 +1079,7 @@ fn test_handle_iwant_msg_cached_shifted() {
for shift in 1..10 {
let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(shift),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1552,7 +1552,7 @@ fn do_forward_messages_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![12],
data: Bytes::from(vec![12]),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -1696,7 +1696,7 @@ fn no_gossip_gets_sent_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2166,7 +2166,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2211,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2575,7 +2575,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2652,7 +2652,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2743,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() {
//message that other peers have
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2928,7 +2928,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message1 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2938,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message2 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5],
data: Bytes::from(vec![1, 2, 3, 4, 5]),
sequence_number: Some(2u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2948,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message3 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6]),
sequence_number: Some(3u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2958,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message4 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6, 7],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7]),
sequence_number: Some(4u64),
topic: topics[0].clone(),
signature: None,
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ mod test {
use crate::topic::IdentityHash;
use crate::types::PeerKind;
use crate::Topic;
use bytes::Bytes;
use libp2p_core::UpgradeInfo;
use libp2p_swarm::StreamProtocol;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -961,7 +962,7 @@ mod test {
fn get_gossipsub_message() -> Message {
Message {
source: None,
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: None,
topic: Topic::<IdentityHash>::new("test").hash(),
}
Expand Down
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/generated/gossipsub/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![cfg_attr(rustfmt, rustfmt_skip)]


use bytes::Bytes;
use quick_protobuf::{MessageInfo, MessageRead, MessageWrite, BytesReader, Writer, WriterBackend, Result};
use quick_protobuf::sizeofs::*;
use super::super::*;
Expand Down Expand Up @@ -99,7 +100,7 @@ impl MessageWrite for SubOpts {
#[derive(Debug, Default, PartialEq, Clone)]
pub struct Message {
pub from: Option<Vec<u8>>,
pub data: Option<Vec<u8>>,
pub data: Option<Bytes>,
pub seqno: Option<Vec<u8>>,
pub topic: String,
pub signature: Option<Vec<u8>>,
Expand All @@ -112,7 +113,7 @@ impl<'a> MessageRead<'a> for Message {
while !r.is_eof() {
match r.next_tag(bytes) {
Ok(10) => msg.from = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.data = Some(r.read_bytes(bytes)?.to_owned()),
Ok(18) => msg.data = Some(Bytes::from(r.read_bytes(bytes)?.to_owned())),
Ok(26) => msg.seqno = Some(r.read_bytes(bytes)?.to_owned()),
Ok(34) => msg.topic = r.read_string(bytes)?.to_owned(),
Ok(42) => msg.signature = Some(r.read_bytes(bytes)?.to_owned()),
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/mcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ mod tests {
use super::*;
use crate::types::RawMessage;
use crate::{IdentTopic as Topic, TopicHash};
use bytes::Bytes;
use libp2p_identity::PeerId;

fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) {
Expand All @@ -235,7 +236,7 @@ mod tests {
};
let u8x: u8 = x as u8;
let source = Some(PeerId::random());
let data: Vec<u8> = vec![u8x];
let data = Bytes::from(vec![u8x]);
let sequence_number = Some(x);

let m = RawMessage {
Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/peer_score/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;

/// A collection of unit tests mostly ported from the go implementation.
use super::*;

Expand All @@ -36,7 +38,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool {
fn make_test_message(seq: u64) -> (MessageId, RawMessage) {
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: Some(seq),
topic: Topic::new("test").hash(),
signature: None,
Expand Down
2 changes: 2 additions & 0 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,7 @@ mod tests {
use crate::config::Config;
use crate::{Behaviour, ConfigBuilder};
use crate::{IdentTopic as Topic, Version};
use bytes::Bytes;
use libp2p_identity::Keypair;
use quickcheck::*;

Expand All @@ -532,6 +533,7 @@ mod tests {
let data = (0..g.gen_range(10..10024u32))
.map(|_| u8::arbitrary(g))
.collect::<Vec<_>>();
let data = Bytes::from(data);
let topic_id = TopicId::arbitrary(g).0;
Message(gs.build_raw_message(topic_id, data).unwrap())
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/rpc_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) mod proto {
mod test {
use crate::rpc_proto::proto::compat;
use crate::IdentTopic as Topic;
use bytes::Bytes;
use libp2p_identity::PeerId;
use quick_protobuf::{BytesReader, MessageRead, MessageWrite, Writer};
use rand::Rng;
Expand All @@ -39,7 +40,7 @@ mod test {

let new_message1 = super::proto::Message {
from: Some(PeerId::random().to_bytes()),
data: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
data: Some(Bytes::from(rand::thread_rng().gen::<[u8; 32]>().to_vec())),
seqno: Some(rand::thread_rng().gen::<[u8; 8]>().to_vec()),
topic: topic1.clone().into_string(),
signature: Some(rand::thread_rng().gen::<[u8; 32]>().to_vec()),
Expand Down
14 changes: 4 additions & 10 deletions protocols/gossipsub/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
//! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then
//! calculated, allowing for applications to employ message-id functions post compression.
use bytes::Bytes;

use crate::{Message, RawMessage, TopicHash};

/// A general trait of transforming a [`RawMessage`] into a [`Message`]. The
Expand All @@ -41,11 +43,7 @@ pub trait DataTransform {

/// Takes the data to be published (a topic and associated data) transforms the data. The
/// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers.
fn outbound_transform(
&self,
topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error>;
fn outbound_transform(&self, topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error>;
}

/// The default transform, the raw data is propagated as is to the application layer gossipsub.
Expand All @@ -62,11 +60,7 @@ impl DataTransform for IdentityTransform {
})
}

fn outbound_transform(
&self,
_topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error> {
fn outbound_transform(&self, _topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error> {
Ok(data)
}
}
5 changes: 3 additions & 2 deletions protocols/gossipsub/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

//! A collection of types using the Gossipsub system.
use crate::TopicHash;
use bytes::Bytes;
use libp2p_identity::PeerId;
use libp2p_swarm::ConnectionId;
use prometheus_client::encoding::EncodeLabelValue;
Expand Down Expand Up @@ -99,7 +100,7 @@ pub struct RawMessage {
pub source: Option<PeerId>,

/// Content of the message. Its meaning is out of scope of this library.
pub data: Vec<u8>,
pub data: Bytes,

/// A random sequence number.
pub sequence_number: Option<u64>,
Expand Down Expand Up @@ -140,7 +141,7 @@ pub struct Message {
pub source: Option<PeerId>,

/// Content of the message.
pub data: Vec<u8>,
pub data: Bytes,

/// A random sequence number.
pub sequence_number: Option<u64>,
Expand Down

0 comments on commit 3935a2f

Please sign in to comment.