Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gossipsub): use Bytes to cut down on allocations #4751

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#![doc = include_str!("../README.md")]

use futures::stream::StreamExt;
use libp2p::bytes::Bytes;
use libp2p::{gossipsub, mdns, noise, swarm::NetworkBehaviour, swarm::SwarmEvent, tcp, yamux};
use std::collections::hash_map::DefaultHasher;
use std::error::Error;
Expand Down Expand Up @@ -93,7 +94,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(Some(line)) = stdin.next_line() => {
if let Err(e) = swarm
.behaviour_mut().gossipsub
.publish(topic.clone(), line.as_bytes()) {
.publish(topic.clone(), Bytes::copy_from_slice(line.as_bytes())) {
println!("Publish error: {e:?}");
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
if let Err(e) = swarm
.behaviour_mut()
.gossipsub
.publish(gossipsub_topic.clone(), line.as_bytes())
.publish(gossipsub_topic.clone(), line.as_bytes().to_vec())
{
println!("Publish error: {e:?}");
}
Expand Down
2 changes: 2 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## 0.46.0 - unreleased

- Optimize memory usage through use of Bytes in various places. (`publish` require's `data: impl Into<Bytes>` (as opposed to `data: impl Into<Vec<u8>>`, `outbound_transform` now takes `data` as `Bytes`.
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
See [PR 4751](https://github.com/libp2p/rust-libp2p/pull/4751).
- Remove `fast_message_id_fn` mechanism from `Config`.
See [PR 4285](https://github.com/libp2p/rust-libp2p/pull/4285).
- Remove deprecated `gossipsub::Config::idle_timeout` in favor of `SwarmBuilder::idle_connection_timeout`.
Expand Down
10 changes: 6 additions & 4 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::{
time::Duration,
};

use bytes::Bytes;
use futures::StreamExt;
use futures_ticker::Ticker;
use log::{debug, error, trace, warn};
Expand Down Expand Up @@ -605,9 +606,10 @@ where
pub fn publish(
&mut self,
topic: impl Into<TopicHash>,
data: impl Into<Vec<u8>>,
data: impl Into<Bytes>,
) -> Result<MessageId, PublishError> {
let data = data.into();
// Convert the input topic into TopicHash
let topic = topic.into();

// Transform the data before building a raw_message.
Expand Down Expand Up @@ -2734,7 +2736,7 @@ where
pub(crate) fn build_raw_message(
&mut self,
topic: TopicHash,
data: Vec<u8>,
data: Bytes,
) -> Result<RawMessage, PublishError> {
match &mut self.publish_config {
PublishConfig::Signing {
Expand All @@ -2748,7 +2750,7 @@ where
let signature = {
let message = proto::Message {
from: Some(author.to_bytes()),
data: Some(data.clone()),
data: Some(data.to_vec()),
seqno: Some(sequence_number.to_be_bytes().to_vec()),
topic: topic.clone().into_string(),
signature: None,
Expand Down Expand Up @@ -3672,7 +3674,7 @@ mod local_test {
fn test_message() -> RawMessage {
RawMessage {
source: Some(PeerId::random()),
data: vec![0; 100],
data: Bytes::from(vec![0; 100]),
sequence_number: None,
topic: TopicHash::from_raw("test_topic"),
signature: None,
Expand Down
28 changes: 14 additions & 14 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn proto_to_message(rpc: &proto::RPC) -> Rpc {
for message in rpc.publish.into_iter() {
messages.push(RawMessage {
source: message.from.map(|x| PeerId::from_bytes(&x).unwrap()),
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: message.seqno.map(|x| BigEndian::read_u64(&x)), // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand Down Expand Up @@ -1021,7 +1021,7 @@ fn test_handle_iwant_msg_cached() {

let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1079,7 +1079,7 @@ fn test_handle_iwant_msg_cached_shifted() {
for shift in 1..10 {
let raw_message = RawMessage {
source: Some(peers[11]),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(shift),
topic: TopicHash::from_raw("topic"),
signature: None,
Expand Down Expand Up @@ -1552,7 +1552,7 @@ fn do_forward_messages_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![12],
data: Bytes::from(vec![12]),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -1696,7 +1696,7 @@ fn no_gossip_gets_sent_to_explicit_peers() {

let message = RawMessage {
source: Some(peers[1]),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2166,7 +2166,7 @@ fn test_gossip_to_at_least_gossip_lazy_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2211,7 +2211,7 @@ fn test_gossip_to_at_most_gossip_factor_peers() {
//receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topic_hashes[0].clone(),
signature: None,
Expand Down Expand Up @@ -2575,7 +2575,7 @@ fn test_do_not_gossip_to_peers_below_gossip_threshold() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2652,7 +2652,7 @@ fn test_iwant_msg_from_peer_below_gossip_threshold_gets_ignored() {
// Receive message
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2743,7 +2743,7 @@ fn test_ihave_msg_from_peer_below_gossip_threshold_gets_ignored() {
//message that other peers have
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![],
data: Bytes::new(),
sequence_number: Some(0),
topic: topics[0].clone(),
signature: None,
Expand Down Expand Up @@ -2928,7 +2928,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message1 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4],
data: Bytes::from(vec![1, 2, 3, 4]),
sequence_number: Some(1u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2938,7 +2938,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message2 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5],
data: Bytes::from(vec![1, 2, 3, 4, 5]),
sequence_number: Some(2u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2948,7 +2948,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message3 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6]),
sequence_number: Some(3u64),
topic: topics[0].clone(),
signature: None,
Expand All @@ -2958,7 +2958,7 @@ fn test_ignore_rpc_from_peers_below_graylist_threshold() {

let raw_message4 = RawMessage {
source: Some(PeerId::random()),
data: vec![1, 2, 3, 4, 5, 6, 7],
data: Bytes::from(vec![1, 2, 3, 4, 5, 6, 7]),
sequence_number: Some(4u64),
topic: topics[0].clone(),
signature: None,
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -862,6 +862,7 @@ mod test {
use crate::topic::IdentityHash;
use crate::types::PeerKind;
use crate::Topic;
use bytes::Bytes;
use libp2p_core::UpgradeInfo;
use libp2p_swarm::StreamProtocol;
use std::collections::hash_map::DefaultHasher;
Expand Down Expand Up @@ -961,7 +962,7 @@ mod test {
fn get_gossipsub_message() -> Message {
Message {
source: None,
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: None,
topic: Topic::<IdentityHash>::new("test").hash(),
}
Expand Down
3 changes: 2 additions & 1 deletion protocols/gossipsub/src/mcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ mod tests {
use super::*;
use crate::types::RawMessage;
use crate::{IdentTopic as Topic, TopicHash};
use bytes::Bytes;
use libp2p_identity::PeerId;

fn gen_testm(x: u64, topic: TopicHash) -> (MessageId, RawMessage) {
Expand All @@ -235,7 +236,7 @@ mod tests {
};
let u8x: u8 = x as u8;
let source = Some(PeerId::random());
let data: Vec<u8> = vec![u8x];
let data = Bytes::from(vec![u8x]);
let sequence_number = Some(x);

let m = RawMessage {
Expand Down
4 changes: 3 additions & 1 deletion protocols/gossipsub/src/peer_score/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use bytes::Bytes;

/// A collection of unit tests mostly ported from the go implementation.
use super::*;

Expand All @@ -36,7 +38,7 @@ fn within_variance(value: f64, expected: f64, variance: f64) -> bool {
fn make_test_message(seq: u64) -> (MessageId, RawMessage) {
let raw_message = RawMessage {
source: Some(PeerId::random()),
data: vec![12, 34, 56],
data: Bytes::from(vec![12, 34, 56]),
sequence_number: Some(seq),
topic: Topic::new("test").hash(),
signature: None,
Expand Down
17 changes: 9 additions & 8 deletions protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::types::{
use crate::ValidationError;
use asynchronous_codec::{Decoder, Encoder, Framed};
use byteorder::{BigEndian, ByteOrder};
use bytes::BytesMut;
use bytes::{Bytes, BytesMut};
use futures::future;
use futures::prelude::*;
use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
Expand Down Expand Up @@ -294,7 +294,7 @@ impl Decoder for GossipsubCodec {
if let Some(validation_error) = invalid_kind.take() {
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: None, // don't inform the application
Expand All @@ -314,7 +314,7 @@ impl Decoder for GossipsubCodec {
// and source)
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: None, // don't inform the application
Expand All @@ -339,7 +339,7 @@ impl Decoder for GossipsubCodec {
);
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -358,7 +358,7 @@ impl Decoder for GossipsubCodec {
debug!("Sequence number not present but expected");
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number: None, // don't inform the application
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -384,7 +384,7 @@ impl Decoder for GossipsubCodec {
debug!("Message source has an invalid PeerId");
let message = RawMessage {
source: None, // don't bother inform the application
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number,
topic: TopicHash::from_raw(message.topic),
signature: message.signature, // don't inform the application
Expand All @@ -408,7 +408,7 @@ impl Decoder for GossipsubCodec {
// This message has passed all validation, add it to the validated messages.
messages.push(RawMessage {
source,
data: message.data.unwrap_or_default(),
data: Bytes::from(message.data.unwrap_or_default()),
sequence_number,
topic: TopicHash::from_raw(message.topic),
signature: message.signature,
Expand Down Expand Up @@ -515,6 +515,7 @@ mod tests {
use crate::config::Config;
use crate::{Behaviour, ConfigBuilder};
use crate::{IdentTopic as Topic, Version};
use bytes::Bytes;
use libp2p_identity::Keypair;
use quickcheck::*;

Expand All @@ -531,7 +532,7 @@ mod tests {
Behaviour::new(crate::MessageAuthenticity::Signed(keypair.0), config).unwrap();
let data = (0..g.gen_range(10..10024u32))
.map(|_| u8::arbitrary(g))
.collect::<Vec<_>>();
.collect::<Bytes>();
let topic_id = TopicId::arbitrary(g).0;
Message(gs.build_raw_message(topic_id, data).unwrap())
}
Expand Down
14 changes: 4 additions & 10 deletions protocols/gossipsub/src/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
//! algorithms that can be topic-specific. Once the raw data is transformed the message-id is then
//! calculated, allowing for applications to employ message-id functions post compression.

use bytes::Bytes;

use crate::{Message, RawMessage, TopicHash};

/// A general trait of transforming a [`RawMessage`] into a [`Message`]. The
Expand All @@ -41,11 +43,7 @@ pub trait DataTransform {

/// Takes the data to be published (a topic and associated data) transforms the data. The
/// transformed data will then be used to create a [`crate::RawMessage`] to be sent to peers.
fn outbound_transform(
thomaseizinger marked this conversation as resolved.
Show resolved Hide resolved
&self,
topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error>;
fn outbound_transform(&self, topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error>;
}

/// The default transform, the raw data is propagated as is to the application layer gossipsub.
Expand All @@ -62,11 +60,7 @@ impl DataTransform for IdentityTransform {
})
}

fn outbound_transform(
&self,
_topic: &TopicHash,
data: Vec<u8>,
) -> Result<Vec<u8>, std::io::Error> {
fn outbound_transform(&self, _topic: &TopicHash, data: Bytes) -> Result<Bytes, std::io::Error> {
Ok(data)
}
}
Loading
Loading