Skip to content

Commit

Permalink
BACKPORT-CONFLICT
Browse files Browse the repository at this point in the history
  • Loading branch information
lexnv authored and github-actions[bot] committed Jan 15, 2025
1 parent 6ae6bbc commit 0f93f13
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 9 deletions.
6 changes: 6 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -815,8 +815,13 @@ linked-hash-map = { version = "0.5.4" }
linked_hash_set = { version = "0.1.4" }
linregress = { version = "0.5.1" }
lite-json = { version = "0.2.0", default-features = false }
<<<<<<< HEAD
litep2p = { version = "0.6.2" }
log = { version = "0.4.21", default-features = false }
=======
litep2p = { version = "0.9.0", features = ["websocket"] }
log = { version = "0.4.22", default-features = false }
>>>>>>> 77c78e1 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
macro_magic = { version = "0.5.1" }
maplit = { version = "1.0.2" }
memmap2 = { version = "0.9.3" }
Expand Down
16 changes: 16 additions & 0 deletions prdoc/pr_7099.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
title: Provide partial results to speedup GetRecord queries

doc:
- audience: Node Dev
description: |
This PR provides the partial results of the GetRecord kademlia query.

This significantly improves the authority discovery records, from ~37 minutes to ~2/3 minutes.
In contrast, libp2p discovers authority records in around ~10 minutes.

The authority discovery was slow because litep2p provided the records only after the Kademlia query was completed. A normal Kademlia query completes in around 40 seconds to a few minutes.
In this PR, partial records are provided as soon as they are discovered from the network.

crates:
- name: sc-network
bump: patch
35 changes: 29 additions & 6 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
<<<<<<< HEAD
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
=======
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, PeerRecord, QueryId,
Quorum, Record, RecordKey,
>>>>>>> 77c78e1 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
},
ping::{Config as PingConfig, PingEvent},
},
Expand Down Expand Up @@ -119,13 +125,19 @@ pub enum DiscoveryEvent {
address: Multiaddr,
},

/// Record was found from the DHT.
/// `GetRecord` query succeeded.
GetRecordSuccess {
/// Query ID.
query_id: QueryId,
},

/// Records.
records: RecordsType,
/// Record was found from the DHT.
GetRecordPartialResult {
/// Query ID.
query_id: QueryId,

/// Record.
record: PeerRecord,
},

/// Record was successfully stored on the DHT.
Expand Down Expand Up @@ -496,13 +508,24 @@ impl Stream for Discovery {
peers: peers.into_iter().collect(),
}))
},
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id, records })) => {
Poll::Ready(Some(KademliaEvent::GetRecordSuccess { query_id })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` succeeded for {query_id:?}: {records:?}",
"`GET_RECORD` succeeded for {query_id:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id, records }));
return Poll::Ready(Some(DiscoveryEvent::GetRecordSuccess { query_id }));
},
Poll::Ready(Some(KademliaEvent::GetRecordPartialResult { query_id, record })) => {
log::trace!(
target: LOG_TARGET,
"`GET_RECORD` intermediary succeeded for {query_id:?}: {record:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetRecordPartialResult {
query_id,
record,
}));
},
Poll::Ready(Some(KademliaEvent::PutRecordSucess { query_id, key: _ })) =>
return Poll::Ready(Some(DiscoveryEvent::PutRecordSuccess { query_id })),
Expand Down
46 changes: 43 additions & 3 deletions substrate/client/network/src/litep2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use litep2p::{
protocol::{
libp2p::{
bitswap::Config as BitswapConfig,
kademlia::{QueryId, Record, RecordsType},
kademlia::{QueryId, Record},
},
request_response::ConfigBuilder as RequestResponseConfigBuilder,
},
Expand Down Expand Up @@ -804,18 +804,56 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.peerstore_handle.add_known_peer(peer.into());
}
}
<<<<<<< HEAD
Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => {
match self.pending_get_values.remove(&query_id) {
None => log::warn!(
target: LOG_TARGET,
"`GET_VALUE` succeeded for a non-existent query",
),
Some((key, started)) => {
=======
Some(DiscoveryEvent::GetRecordPartialResult { query_id, record }) => {
if !self.pending_queries.contains_key(&query_id) {
log::error!(
target: LOG_TARGET,
"Missing/invalid pending query for `GET_VALUE` partial result: {query_id:?}"
);

continue
}

let peer_id: sc_network_types::PeerId = record.peer.into();
let record = PeerRecord {
record: P2PRecord {
key: record.record.key.to_vec().into(),
value: record.record.value,
publisher: record.record.publisher.map(|peer_id| {
let peer_id: sc_network_types::PeerId = peer_id.into();
peer_id.into()
}),
expires: record.record.expires,
},
peer: Some(peer_id.into()),
};

self.event_streams.send(
Event::Dht(
DhtEvent::ValueFound(
record.into()
)
)
);
}
Some(DiscoveryEvent::GetRecordSuccess { query_id }) => {
match self.pending_queries.remove(&query_id) {
Some(KadQuery::GetValue(key, started)) => {
>>>>>>> 77c78e1 (litep2p: Provide partial results to speedup GetRecord queries (#7099))
log::trace!(
target: LOG_TARGET,
"`GET_VALUE` for {:?} ({query_id:?}) succeeded",
key,
"`GET_VALUE` for {key:?} ({query_id:?}) succeeded",
);
<<<<<<< HEAD

let value_found = match records {
RecordsType::LocalStore(record) => vec![
Expand All @@ -829,6 +867,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkBackend<B, H> for Litep2pNetworkBac
self.event_streams.send(Event::Dht(
DhtEvent::ValueFound(value_found)
));
=======
>>>>>>> 77c78e1 (litep2p: Provide partial results to speedup GetRecord queries (#7099))

if let Some(ref metrics) = self.metrics {
metrics
Expand Down

0 comments on commit 0f93f13

Please sign in to comment.