From c812dd418eac871284f755c652f9e3649c3be20f Mon Sep 17 00:00:00 2001 From: Josh Wilson Date: Fri, 27 Oct 2023 21:24:00 +0200 Subject: [PATCH] feat(kad): convert kad record.value to Bytes. This should help avoid potentially costly clones --- examples/distributed-key-value-store/src/main.rs | 3 ++- examples/ipfs-kad/src/main.rs | 7 +++++-- protocols/kad/CHANGELOG.md | 6 +++++- protocols/kad/src/behaviour/test.rs | 9 +++++---- protocols/kad/src/handler.rs | 5 +++-- protocols/kad/src/protocol.rs | 12 ++++++------ protocols/kad/src/record.rs | 6 +++--- 7 files changed, 29 insertions(+), 19 deletions(-) diff --git a/examples/distributed-key-value-store/src/main.rs b/examples/distributed-key-value-store/src/main.rs index 404333f3d20..641ee39a985 100644 --- a/examples/distributed-key-value-store/src/main.rs +++ b/examples/distributed-key-value-store/src/main.rs @@ -22,6 +22,7 @@ use async_std::io; use futures::{prelude::*, select}; +use libp2p::bytes::Bytes; use libp2p::kad; use libp2p::kad::store::MemoryStore; use libp2p::kad::Mode; @@ -186,7 +187,7 @@ fn handle_input_line(kademlia: &mut kad::Behaviour, line: String) { }; let value = { match args.next() { - Some(value) => value.as_bytes().to_vec(), + Some(value) => Bytes::from(value.as_bytes().to_owned()), None => { eprintln!("Expected value"); return; diff --git a/examples/ipfs-kad/src/main.rs b/examples/ipfs-kad/src/main.rs index 95921d6fa35..4560427ed35 100644 --- a/examples/ipfs-kad/src/main.rs +++ b/examples/ipfs-kad/src/main.rs @@ -27,6 +27,7 @@ use std::time::{Duration, Instant}; use anyhow::{bail, Result}; use clap::Parser; use futures::StreamExt; +use libp2p::bytes::Bytes; use libp2p::swarm::{StreamProtocol, SwarmEvent}; use libp2p::{bytes::BufMut, identity, kad, noise, tcp, yamux, PeerId}; use tracing_subscriber::EnvFilter; @@ -91,8 +92,10 @@ async fn main() -> Result<()> { pk_record_key.put_slice("/pk/".as_bytes()); pk_record_key.put_slice(swarm.local_peer_id().to_bytes().as_slice()); - let mut pk_record = - kad::Record::new(pk_record_key, local_key.public().encode_protobuf()); + let mut pk_record = kad::Record::new( + pk_record_key, + Bytes::from(local_key.public().encode_protobuf()), + ); pk_record.publisher = Some(*swarm.local_peer_id()); pk_record.expires = Some(Instant::now().add(Duration::from_secs(60))); diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index d0ab7986aad..819c50377ef 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,10 +1,14 @@ +## 0.48.0 - Unreleased +- Set Record.value to be `Bytes` instead of `Vec` to make clones lighter + See [PR 4753](https://github.com/libp2p/rust-libp2p/pull/4753). + ## 0.47.0 - Expose a kad query facility allowing specify num_results dynamicly. See [PR 5555](https://github.com/libp2p/rust-libp2p/pull/5555). - Add `mode` getter on `Behaviour`. See [PR 5573](https://github.com/libp2p/rust-libp2p/pull/5573). - + ## 0.46.2 diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 7409168ac2a..81bd4fb3610 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -24,6 +24,7 @@ use super::*; use crate::record::{store::MemoryStore, Key}; use crate::{K_VALUE, PROTOCOL_NAME, SHA_256_MH}; +use bytes::Bytes; use futures::{executor::block_on, future::poll_fn, prelude::*}; use futures_timer::Delay; use libp2p_core::{ @@ -821,7 +822,7 @@ fn get_record() { .map(|(_addr, swarm)| swarm) .collect::>(); - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6])); swarms[2].behaviour_mut().store.put(record.clone()).unwrap(); let qid = swarms[0].behaviour_mut().get_record(record.key.clone()); @@ -874,7 +875,7 @@ fn get_record_many() { .collect::>(); let num_results = 10; - let record = Record::new(random_multihash(), vec![4, 5, 6]); + let record = Record::new(random_multihash(), Bytes::from(vec![4, 5, 6])); for swarm in swarms.iter_mut().take(num_nodes) { swarm.behaviour_mut().store.put(record.clone()).unwrap(); @@ -1173,8 +1174,8 @@ fn disjoint_query_does_not_finish_before_all_paths_did() { Multihash::<64>::wrap(SHA_256_MH, &thread_rng().gen::<[u8; 32]>()) .expect("32 array to fit into 64 byte multihash"), ); - let record_bob = Record::new(key.clone(), b"bob".to_vec()); - let record_trudy = Record::new(key.clone(), b"trudy".to_vec()); + let record_bob = Record::new(key.clone(), Bytes::from(b"bob".to_vec())); + let record_trudy = Record::new(key.clone(), Bytes::from(b"trudy".to_vec())); // Make `bob` and `trudy` aware of their version of the record searched by // `alice`. diff --git a/protocols/kad/src/handler.rs b/protocols/kad/src/handler.rs index 17c483da709..df25f43fd9b 100644 --- a/protocols/kad/src/handler.rs +++ b/protocols/kad/src/handler.rs @@ -24,6 +24,7 @@ use crate::protocol::{ }; use crate::record::{self, Record}; use crate::QueryId; +use bytes::Bytes; use either::Either; use futures::channel::oneshot; use futures::prelude::*; @@ -267,7 +268,7 @@ pub enum HandlerEvent { /// The key of the stored record. key: record::Key, /// The value of the stored record. - value: Vec, + value: Bytes, /// The user data passed to the `PutValue`. query_id: QueryId, }, @@ -405,7 +406,7 @@ pub enum HandlerIn { /// Key of the value that was put. key: record::Key, /// Value that was put. - value: Vec, + value: Bytes, /// Identifier of the request that was made by the remote. request_id: RequestId, }, diff --git a/protocols/kad/src/protocol.rs b/protocols/kad/src/protocol.rs index 9d2ef56f5d8..81358cc2caa 100644 --- a/protocols/kad/src/protocol.rs +++ b/protocols/kad/src/protocol.rs @@ -29,7 +29,7 @@ use crate::proto; use crate::record::{self, Record}; use asynchronous_codec::{Decoder, Encoder, Framed}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use futures::prelude::*; use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo}; use libp2p_core::Multiaddr; @@ -335,7 +335,7 @@ pub enum KadResponseMsg { /// The key of the record. key: record::Key, /// Value of the record. - value: Vec, + value: Bytes, }, } @@ -443,7 +443,7 @@ fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message { key: key.to_vec(), record: Some(proto::Record { key: key.to_vec(), - value, + value: value.to_vec(), ..proto::Record::default() }), ..proto::Message::default() @@ -549,7 +549,7 @@ fn proto_to_resp_msg(message: proto::Message) -> Result Result Result { let key = record::Key::from(record.key); - let value = record.value; + let value = Bytes::from(record.value); let publisher = if !record.publisher.is_empty() { PeerId::from_bytes(&record.publisher) @@ -588,7 +588,7 @@ fn record_from_proto(record: proto::Record) -> Result { fn record_to_proto(record: Record) -> proto::Record { proto::Record { key: record.key.to_vec(), - value: record.value, + value: record.value.to_vec(), publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(), ttl: record .expires diff --git a/protocols/kad/src/record.rs b/protocols/kad/src/record.rs index cb7c4b866fc..eca32f712f0 100644 --- a/protocols/kad/src/record.rs +++ b/protocols/kad/src/record.rs @@ -78,7 +78,7 @@ pub struct Record { /// Key of the record. pub key: Key, /// Value of the record. - pub value: Vec, + pub value: Bytes, /// The (original) publisher of the record. pub publisher: Option, /// The expiration time as measured by a local, monotonic clock. @@ -87,7 +87,7 @@ pub struct Record { impl Record { /// Creates a new record for insertion into the DHT. - pub fn new(key: K, value: Vec) -> Self + pub fn new(key: K, value: Bytes) -> Self where K: Into, { @@ -176,7 +176,7 @@ mod tests { fn arbitrary(g: &mut Gen) -> Record { Record { key: Key::arbitrary(g), - value: Vec::arbitrary(g), + value: Bytes::from(Vec::arbitrary(g)), publisher: if bool::arbitrary(g) { Some(PeerId::random()) } else {