Skip to content

Commit

Permalink
fix(gossip): convert gossip data to Bytes.
Browse files Browse the repository at this point in the history
This should help avoid potentially costly clones over  as it is
processed and published
  • Loading branch information
joshuef committed Oct 28, 2023
1 parent 459c9d4 commit 25893b2
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 45 deletions.
2 changes: 1 addition & 1 deletion examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(Some(line)) = stdin.next_line() => {
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(topic.clone(), line.as_bytes()) {
.publish(topic.clone(), line.as_bytes().to_vec()) {
println!("Publish error: {e:?}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if let Err(e) = swarm
.behaviour_mut()
.gossipsub
.publish(gossipsub_topic.clone(), line.as_bytes())
.publish(gossipsub_topic.clone(), line.as_bytes().to_vec())
{
println!("Publish error: {e:?}");
}
Expand Down
1 change: 1 addition & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
## 0.46.0 - unreleased

- Convert `publish` to require `data: impl Into<Bytes>`.
- Remove `fast_message_id_fn` mechanism from `Config`.
See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285).
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
Expand Down
12 changes: 7 additions & 5 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use futures::StreamExt;
use futures_ticker::Ticker;
use log::{debug, error, trace, warn};
Expand Down Expand Up @@ -605,9 +606,10 @@ where
pub fn publish(
&mut self,
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) -> Result<MessageId, PublishError> {
let data = data.into();
let data = Bytes::from(data.into());
// Convert the input topic into TopicHash
let topic = topic.into();

// Transform the data before building a raw_message.
Expand Down Expand Up @@ -2734,7 +2736,7 @@ where
pub(crate) fn build_raw_message(
&mut self,
topic: TopicHash,
data: Vec<u8>,
data: Bytes,
) -> Result<RawMessage, PublishError> {
match &mut self.publish_config {
PublishConfig::Signing {
Expand All @@ -2748,7 +2750,7 @@ where
let signature = {
let message = proto::Message {
from: Some(author.to_bytes()),
data: Some(data.clone()),
data: Some(data.to_vec()),
seqno: Some(sequence_number.to_be_bytes().to_vec()),
topic: topic.clone().into_string(),
signature: None,
Expand Down Expand Up @@ -3672,7 +3674,7 @@ mod local_test {
fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
data: Bytes::from(vec![0; 100]),
sequence_number: None,
topic: TopicHash::from_raw("test_topic"),
signature: None,
Expand Down
28 changes: 14 additions & 14 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
for message in rpc.publish.into_iter() {
messages.push(RawMessage {
source: message.from.map(|x| PeerId::from_bytes(&x).unwrap()),
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand Down Expand Up @@ -1021,7 +1021,7 @@ fn test_handle_iwant_msg_cached() {

let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1079,7 +1079,7 @@ fn test_handle_iwant_msg_cached_shifted() {
for shift in 1..10 {
let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(shift),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1552,7 +1552,7 @@ fn do_forward_messages_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![12],
data: Bytes::from(vec![12]),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -1696,7 +1696,7 @@ fn no_gossip_gets_sent_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2166,7 +2166,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2211,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2575,7 +2575,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2652,7 +2652,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2743,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() {
//message that other peers have
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2928,7 +2928,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message1 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2938,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message2 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5],
data: Bytes::from(vec![1, 2, 3, 4, 5]),
sequence_number: Some(2u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2948,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message3 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6]),
sequence_number: Some(3u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2958,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message4 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6, 7],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7]),
sequence_number: Some(4u64),
topic: topics[0].clone(),
signature: None,
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ mod test {
use crate::topic::IdentityHash;
use crate::types::PeerKind;
use crate::Topic;
use bytes::Bytes;
use libp2p_core::UpgradeInfo;
use libp2p_swarm::StreamProtocol;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -961,7 +962,7 @@ mod test {
fn get_gossipsub_message() -> Message {
Message {
source: None,
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: None,
topic: Topic::<IdentityHash>::new("test").hash(),
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/mcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ mod tests {
use super::*;
use crate::types::RawMessage;
use crate::{IdentTopic as Topic, TopicHash};
use bytes::Bytes;
use libp2p_identity::PeerId;

fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) {
Expand All @@ -235,7 +236,7 @@ mod tests {
};
let u8x: u8 = x as u8;
let source = Some(PeerId::random());
let data: Vec<u8> = vec![u8x];
let data = Bytes::from(vec![u8x]);
let sequence_number = Some(x);

let m = RawMessage {
Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/peer_score/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;

/// A collection of unit tests mostly ported from the go implementation.
use super::*;

Expand All @@ -36,7 +38,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool {
fn make_test_message(seq: u64) -> (MessageId, RawMessage) {
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: Some(seq),
topic: Topic::new("test").hash(),
signature: None,
Expand Down
16 changes: 9 additions & 7 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::types::{
use crate::ValidationError;
use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures::future;
use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Decoder for GossipsubCodec {
if let Some(validation_error) = invalid_kind.take() {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: None, // don't inform the application
Expand All @@ -314,7 +314,7 @@ impl Decoder for GossipsubCodec {
// and source)
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: None, // don't inform the application
Expand All @@ -339,7 +339,7 @@ impl Decoder for GossipsubCodec {
);
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -358,7 +358,7 @@ impl Decoder for GossipsubCodec {
debug!("Sequence number not present but expected");
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -384,7 +384,7 @@ impl Decoder for GossipsubCodec {
debug!("Message source has an invalid PeerId");
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number,
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -408,7 +408,7 @@ impl Decoder for GossipsubCodec {
// This message has passed all validation, add it to the validated messages.
messages.push(RawMessage {
source,
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number,
topic: TopicHash::from_raw(message.topic),
signature: message.signature,
Expand Down Expand Up @@ -515,6 +515,7 @@ mod tests {
use crate::config::Config;
use crate::{Behaviour, ConfigBuilder};
use crate::{IdentTopic as Topic, Version};
use bytes::Bytes;
use libp2p_identity::Keypair;
use quickcheck::*;

Expand All @@ -532,6 +533,7 @@ mod tests {
let data = (0..g.gen_range(10..10024u32))
.map(|_| u8::arbitrary(g))
.collect::<Vec<_>>();
let data = Bytes::from(data);
let topic_id = TopicId::arbitrary(g).0;
Message(gs.build_raw_message(topic_id, data).unwrap())
}
Expand Down
14 changes: 4 additions & 10 deletions protocols/gossipsub/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
//! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then
//! calculated, allowing for applications to employ message-id functions post compression.
use bytes::Bytes;

use crate::{Message, RawMessage, TopicHash};

/// A general trait of transforming a [`RawMessage`] into a [`Message`]. The
Expand All @@ -41,11 +43,7 @@ pub trait DataTransform {

/// Takes the data to be published (a topic and associated data) transforms the data. The
/// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers.
fn outbound_transform(
&self,
topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error>;
fn outbound_transform(&self, topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error>;
}

/// The default transform, the raw data is propagated as is to the application layer gossipsub.
Expand All @@ -62,11 +60,7 @@ impl DataTransform for IdentityTransform {
})
}

fn outbound_transform(
&self,
_topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error> {
fn outbound_transform(&self, _topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error> {
Ok(data)
}
}
Loading

0 comments on commit 25893b2

Please sign in to comment.