diff --git a/Cargo.lock b/Cargo.lock index 31df58e8ec4..989259ddb7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2680,6 +2680,7 @@ dependencies = [ "libp2p-metrics", "libp2p-mplex", "libp2p-noise", + "libp2p-peer-store", "libp2p-ping", "libp2p-plaintext", "libp2p-pnet", @@ -3106,6 +3107,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "libp2p-peer-store" +version = "0.1.0" +dependencies = [ + "futures-timer", + "futures-util", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "lru", +] + [[package]] name = "libp2p-perf" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index c77768db311..11a209ab4c5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ members = [ "misc/memory-connection-limits", "misc/metrics", "misc/multistream-select", + "misc/peer-store", "misc/quick-protobuf-codec", "misc/quickcheck-ext", "misc/rw-stream-sink", @@ -89,6 +90,7 @@ libp2p-memory-connection-limits = { version = "0.3.1", path = "misc/memory-conne libp2p-metrics = { version = "0.15.0", path = "misc/metrics" } libp2p-mplex = { version = "0.42.0", path = "muxers/mplex" } libp2p-noise = { version = "0.45.1", path = "transports/noise" } +libp2p-peer-store = { version = "0.1.0", path = "misc/peer-store" } libp2p-perf = { version = "0.4.0", path = "protocols/perf" } libp2p-ping = { version = "0.45.1", path = "protocols/ping" } libp2p-plaintext = { version = "0.42.0", path = "transports/plaintext" } @@ -100,7 +102,7 @@ libp2p-request-response = { version = "0.28.0", path = "protocols/request-respon libp2p-server = { version = "0.12.8", path = "misc/server" } libp2p-stream = { version = "0.2.0-alpha.1", path = "protocols/stream" } libp2p-swarm = { version = "0.45.2", path = "swarm" } -libp2p-swarm-derive = { version = "=0.35.0", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. +libp2p-swarm-derive = { version = "=0.35.0", path = "swarm-derive" } # `libp2p-swarm-derive` may not be compatible with different `libp2p-swarm` non-breaking releases. E.g. `libp2p-swarm` might introduce a new enum variant `FromSwarm` (which is `#[non-exhaustive]`) in a non-breaking release. Older versions of `libp2p-swarm-derive` would not forward this enum variant within the `NetworkBehaviour` hierarchy. Thus the version pinning is required. libp2p-swarm-test = { version = "0.5.0", path = "swarm-test" } libp2p-tcp = { version = "0.42.0", path = "transports/tcp" } libp2p-tls = { version = "0.5.0", path = "transports/tls" } diff --git a/libp2p/CHANGELOG.md b/libp2p/CHANGELOG.md index 8b7bf0ff55f..5579af66b5a 100644 --- a/libp2p/CHANGELOG.md +++ b/libp2p/CHANGELOG.md @@ -1,3 +1,8 @@ +## 0.55.0(unreleased) + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). + ## 0.54.2 - Add `with_connection_timeout` on `SwarmBuilder` to allow configuration of the connection_timeout parameter. diff --git a/libp2p/Cargo.toml b/libp2p/Cargo.toml index 3d44e0bc43c..ac05152f6e4 100644 --- a/libp2p/Cargo.toml +++ b/libp2p/Cargo.toml @@ -68,6 +68,7 @@ mdns = ["dep:libp2p-mdns"] memory-connection-limits = ["dep:libp2p-memory-connection-limits"] metrics = ["dep:libp2p-metrics"] noise = ["dep:libp2p-noise"] +# peer-store = ["dep:libp2p-peer-store"] ping = ["dep:libp2p-ping", "libp2p-metrics?/ping"] plaintext = ["dep:libp2p-plaintext"] pnet = ["dep:libp2p-pnet"] @@ -110,6 +111,7 @@ libp2p-identity = { workspace = true, features = ["rand"] } libp2p-kad = { workspace = true, optional = true } libp2p-metrics = { workspace = true, optional = true } libp2p-noise = { workspace = true, optional = true } +libp2p-peer-store = { workspace = true } libp2p-ping = { workspace = true, optional = true } libp2p-plaintext = { workspace = true, optional = true } libp2p-pnet = { workspace = true, optional = true } diff --git a/libp2p/src/lib.rs b/libp2p/src/lib.rs index 47e1142d0e9..13193b48eb6 100644 --- a/libp2p/src/lib.rs +++ b/libp2p/src/lib.rs @@ -81,6 +81,8 @@ pub use libp2p_metrics as metrics; #[cfg(feature = "noise")] #[doc(inline)] pub use libp2p_noise as noise; +#[doc(inline)] +pub use libp2p_peer_store as peer_store; #[cfg(feature = "ping")] #[doc(inline)] pub use libp2p_ping as ping; diff --git a/misc/peer-store/CHANGELOG.md b/misc/peer-store/CHANGELOG.md new file mode 100644 index 00000000000..8353d77b5cf --- /dev/null +++ b/misc/peer-store/CHANGELOG.md @@ -0,0 +1,4 @@ +## 0.1.0 + +- Introduce `libp2p-peer-store`. + See [PR 5724](https://github.com/libp2p/rust-libp2p/pull/5724). diff --git a/misc/peer-store/Cargo.toml b/misc/peer-store/Cargo.toml new file mode 100644 index 00000000000..94b4ce4ffd5 --- /dev/null +++ b/misc/peer-store/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "libp2p-peer-store" +edition = "2021" +version = "0.1.0" +authors = ["drHuangMHT "] +license = "MIT" +repository = "https://github.com/libp2p/rust-libp2p" +rust-version.workspace = true + +[dependencies] +libp2p-core = { workspace = true } +libp2p-swarm = { workspace = true } +lru = "*" +futures-timer = "*" +futures-util = "*" + +[dev-dependencies] +libp2p-identity = { workspace = true, features = ["rand"] } + +[lints] +workspace = true diff --git a/misc/peer-store/src/behaviour.rs b/misc/peer-store/src/behaviour.rs new file mode 100644 index 00000000000..107fbd252fc --- /dev/null +++ b/misc/peer-store/src/behaviour.rs @@ -0,0 +1,179 @@ +use std::{collections::VecDeque, task::Poll}; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::{dummy, NetworkBehaviour}; + +use crate::store::{AddressSource, Store}; + +/// Events generated by [`Behaviour`] and emitted back to the [`libp2p_swarm::Swarm`]. +pub enum Event { + /// The peer's record has been updated. + /// Manually updating a record will always emit this event + /// even if it provides no new information. + RecordUpdated { + peer: PeerId, + }, + Store(S::ToSwarm), +} + +pub struct Behaviour { + store: S, + /// Pending Events to be emitted back to the [`libp2p_swarm::Swarm`]. + pending_events: VecDeque>, +} + +impl<'a, S> Behaviour +where + S: Store + 'static, +{ + pub fn new(store: S) -> Self { + Self { + store, + pending_events: VecDeque::new(), + } + } + + /// Try to get all observed address of the given peer. + /// Returns `None` when the peer is not in the store. + pub fn address_of_peer<'b>( + &'a self, + peer: &'b PeerId, + ) -> Option + use<'a, 'b, S>> { + self.store.addresses_of_peer(peer) + } + + /// Manually update a record. + /// This will always emit an `Event::RecordUpdated`. + pub fn update_address(&mut self, peer: &PeerId, address: &Multiaddr) { + self.store + .update_address(peer, address, AddressSource::Manual, false); + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + + /// Should be called when other protocol emits a [`PeerRecord`](libp2p_core::PeerRecord). + /// This will always emit an `Event::RecordUpdated`. + pub fn on_signed_peer_record( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + ) { + self.store + .update_certified_address(signed_record, source, false); + self.pending_events.push_back(Event::RecordUpdated { + peer: signed_record.peer_id(), + }); + } + + /// Get a immutable reference to the internal store. + pub fn store(&self) -> &S { + &self.store + } + + /// Get a mutable reference to the internal store. + pub fn store_mut(&mut self) -> &mut S { + &mut self.store + } + + fn on_address_update( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) { + if self + .store + .update_address(peer, address, source, should_expire) + { + self.pending_events + .push_back(Event::RecordUpdated { peer: *peer }); + } + } + fn handle_store_event(&mut self, event: super::store::Event) { + use super::store::Event::*; + match event { + RecordUpdated(peer) => self.pending_events.push_back(Event::RecordUpdated { peer }), + } + } +} + +impl NetworkBehaviour for Behaviour +where + S: Store + 'static, + ::ToSwarm: Send + Sync, +{ + type ConnectionHandler = dummy::ConnectionHandler; + + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + peer: libp2p_core::PeerId, + _local_addr: &libp2p_core::Multiaddr, + remote_addr: &libp2p_core::Multiaddr, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.on_address_update(&peer, remote_addr, AddressSource::DirectConnection, false); + Ok(dummy::ConnectionHandler) + } + + fn handle_pending_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + maybe_peer: Option, + _addresses: &[Multiaddr], + _effective_role: libp2p_core::Endpoint, + ) -> Result, libp2p_swarm::ConnectionDenied> { + if maybe_peer.is_none() { + return Ok(Vec::new()); + } + let peer = maybe_peer.expect("already handled"); + Ok(self + .store + .addresses_of_peer(&peer) + .map(|i| i.cloned().collect()) + .unwrap_or_default()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: libp2p_swarm::ConnectionId, + peer: libp2p_core::PeerId, + addr: &libp2p_core::Multiaddr, + _role_override: libp2p_core::Endpoint, + _port_use: libp2p_core::transport::PortUse, + ) -> Result, libp2p_swarm::ConnectionDenied> { + self.on_address_update(&peer, addr, AddressSource::DirectConnection, false); + Ok(dummy::ConnectionHandler) + } + + fn on_swarm_event(&mut self, event: libp2p_swarm::FromSwarm) { + if let Some(ev) = self.store.on_swarm_event(&event) { + self.handle_store_event(ev); + }; + } + + fn on_connection_handler_event( + &mut self, + _peer_id: libp2p_core::PeerId, + _connection_id: libp2p_swarm::ConnectionId, + _event: libp2p_swarm::THandlerOutEvent, + ) { + unreachable!("No event will be produced by a dummy handler.") + } + + fn poll( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll>> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(libp2p_swarm::ToSwarm::GenerateEvent(ev)); + } + if let Some(ev) = self.store.poll(cx) { + self.pending_events.push_back(Event::Store(ev)); + }; + Poll::Pending + } +} diff --git a/misc/peer-store/src/lib.rs b/misc/peer-store/src/lib.rs new file mode 100644 index 00000000000..819fe41cd2f --- /dev/null +++ b/misc/peer-store/src/lib.rs @@ -0,0 +1,6 @@ +mod behaviour; +pub mod memory_store; +mod store; + +pub use behaviour::{Behaviour, Event}; +pub use store::Store; diff --git a/misc/peer-store/src/memory_store.rs b/misc/peer-store/src/memory_store.rs new file mode 100644 index 00000000000..9d8c27be6c0 --- /dev/null +++ b/misc/peer-store/src/memory_store.rs @@ -0,0 +1,435 @@ +use std::{ + collections::{HashMap, VecDeque}, + num::NonZeroUsize, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use futures_timer::Delay; +use futures_util::FutureExt; +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; +use record::PeerRecord; + +use super::Store; +use crate::{store::AddressSource, Behaviour}; + +#[derive(Debug, Clone)] +pub enum Event { + CustomDataUpdated(PeerId), +} + +/// A in-memory store. +#[derive(Default)] +pub struct MemoryStore { + /// The internal store. + records: HashMap>, + pending_events: VecDeque, + record_ttl_timer: Option, + config: Config, +} + +impl MemoryStore { + pub fn new(config: Config) -> Self { + Self { + config, + records: HashMap::new(), + record_ttl_timer: None, + pending_events: VecDeque::default(), + } + } + + fn check_record_ttl(&mut self) { + let now = Instant::now(); + for r in &mut self.records.values_mut() { + r.check_addresses_ttl(now, self.config.record_ttl); + } + } + + pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> { + self.records.get(peer).and_then(|r| r.get_custom_data()) + } + pub fn take_custom_data(&mut self, peer: &PeerId) -> Option { + self.records + .get_mut(peer) + .and_then(|r| r.take_custom_data()) + } + pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) { + if let Some(r) = self.records.get_mut(peer) { + return r.insert_custom_data(custom_data); + } + let mut new_record = PeerRecord::new(self.config.record_capacity); + new_record.insert_custom_data(custom_data); + self.records.insert(*peer, new_record); + self.pending_events + .push_back(Event::CustomDataUpdated(*peer)); + } +} + +impl Store for MemoryStore { + type ToSwarm = Event; + + fn update_address( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool { + if let Some(record) = self.records.get_mut(peer) { + return record.update_address(address, source, should_expire); + } + let mut new_record = record::PeerRecord::new(self.config.record_capacity); + new_record.update_address(address, source, should_expire); + self.records.insert(*peer, new_record); + true + } + + fn update_certified_address( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool { + let peer = signed_record.peer_id(); + if let Some(record) = self.records.get_mut(&peer) { + return record.update_certified_address(signed_record, source, should_expire); + } + let mut new_record = record::PeerRecord::new(self.config.record_capacity); + new_record.update_certified_address(signed_record, source, should_expire); + self.records.insert(peer, new_record); + true + } + + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool { + if let Some(record) = self.records.get_mut(peer) { + return record.remove_address(address); + } + false + } + + fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option { + match swarm_event { + FromSwarm::NewExternalAddrOfPeer(info) => { + if self.update_address(&info.peer_id, info.addr, AddressSource::Behaviour, true) { + return Some(super::store::Event::RecordUpdated(info.peer_id)); + } + None + } + FromSwarm::ConnectionEstablished(info) => { + let mut is_record_updated = false; + for failed_addr in info.failed_addresses { + is_record_updated |= self.remove_address(&info.peer_id, failed_addr); + } + is_record_updated |= self.update_address( + &info.peer_id, + info.endpoint.get_remote_address(), + AddressSource::DirectConnection, + false, + ); + if is_record_updated { + return Some(super::store::Event::RecordUpdated(info.peer_id)); + } + None + } + _ => None, + } + } + + fn addresses_of_peer(&self, peer: &PeerId) -> Option> { + self.records.get(peer).map(|record| { + record + .addresses() + .filter(|(_, r)| !self.config.strict_mode || r.signature.is_some()) + .map(|(addr, _)| addr) + }) + } + + fn poll(&mut self, cx: &mut Context<'_>) -> Option { + if let Some(mut timer) = self.record_ttl_timer.take() { + if let Poll::Ready(()) = timer.poll_unpin(cx) { + self.check_record_ttl(); + self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval)); + } + self.record_ttl_timer = Some(timer) + } + if let Some(ev) = self.pending_events.pop_front() { + return Some(ev); + } + None + } +} + +impl Behaviour> +where + T: 'static, +{ + /// Get all stored address records of the peer, not affected by `strict_mode`. + pub fn address_record_of_peer( + &self, + peer: &PeerId, + ) -> Option> { + self.store().records.get(peer).map(|r| r.addresses()) + } +} + +pub struct Config { + /// TTL for a record. + record_ttl: Duration, + /// The capacaity of a record store. + /// The least used record will be discarded when the store is full. + record_capacity: NonZeroUsize, + /// The interval for garbage collecting records. + check_record_ttl_interval: Duration, + /// Only provide signed addresses to the behaviour when set to true. + strict_mode: bool, +} + +impl Default for Config { + fn default() -> Self { + Self { + record_ttl: Duration::from_secs(600), + record_capacity: NonZeroUsize::try_from(8).expect("8 > 0"), + check_record_ttl_interval: Duration::from_secs(5), + strict_mode: false, + } + } +} + +mod record { + use std::rc::Rc; + + use lru::LruCache; + + use super::*; + + pub(crate) struct PeerRecord { + /// A LRU(Least Recently Used) cache for addresses. + /// Will delete the least-recently-used record when full. + addresses: LruCache, + custom: Option, + } + impl PeerRecord { + pub(crate) fn new(capacity: NonZeroUsize) -> Self { + Self { + addresses: LruCache::new(capacity), + custom: None, + } + } + pub(crate) fn addresses(&self) -> impl Iterator { + self.addresses.iter() + } + pub(crate) fn update_address( + &mut self, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool { + if let Some(record) = self.addresses.get_mut(address) { + record.update_last_seen(); + return false; + } + // new record won't call `Instant::now()` twice + self.addresses.get_or_insert(address.clone(), || { + AddressRecord::new(source, should_expire, None) + }); + true + } + pub(crate) fn update_certified_address( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool { + let mut is_updated = false; + let signed_record = Rc::new(signed_record.clone()); + for address in signed_record.addresses() { + // promote the address or update with the latest signature. + if let Some(r) = self.addresses.get_mut(address) { + r.signature = Some(signed_record.clone()); + continue; + } + // the address is not present. this defers cloning. + self.addresses.get_or_insert(address.clone(), || { + AddressRecord::new(source, should_expire, Some(signed_record.clone())) + }); + is_updated = true; + } + is_updated + } + pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool { + self.addresses.pop(address).is_some() + } + pub(crate) fn check_addresses_ttl(&mut self, now: Instant, ttl: Duration) { + let mut records_to_be_deleted = Vec::new(); + for (k, record) in self.addresses.iter() { + if record.is_expired(now, ttl) { + records_to_be_deleted.push(k.clone()); + } + } + for k in records_to_be_deleted { + self.addresses.pop(&k); + } + } + pub(crate) fn get_custom_data(&self) -> Option<&T> { + self.custom.as_ref() + } + pub(crate) fn take_custom_data(&mut self) -> Option { + self.custom.take() + } + pub(crate) fn insert_custom_data(&mut self, custom_data: T) { + let _ = self.custom.insert(custom_data); + } + } + + pub struct AddressRecord { + /// The time when the address is last seen. + pub last_seen: Instant, + /// How the address is discovered. + pub source: AddressSource, + /// Whether the address will expire. + pub should_expire: bool, + /// Reference to the `PeerRecord` that contains this address. + /// The inner `PeerRecord` will be dropped automatically + /// when there is no living reference to it. + pub signature: Option>, + } + impl AddressRecord { + pub(crate) fn new( + source: AddressSource, + should_expire: bool, + signed: Option>, + ) -> Self { + Self { + last_seen: Instant::now(), + source, + should_expire, + signature: signed, + } + } + pub(crate) fn update_last_seen(&mut self) { + self.last_seen = Instant::now(); + } + pub(crate) fn is_expired(&self, now: Instant, ttl: Duration) -> bool { + self.should_expire && now.duration_since(self.last_seen) > ttl + } + } +} + +#[cfg(test)] +mod test { + use std::{num::NonZeroUsize, str::FromStr, thread, time::Duration}; + + use libp2p_core::{Multiaddr, PeerId}; + + use super::{Config, MemoryStore}; + use crate::Store; + + #[test] + fn record_expire() { + let config = Config { + record_capacity: NonZeroUsize::try_from(4).expect("4 > 0"), + record_ttl: Duration::from_millis(1), + ..Default::default() + }; + let mut store: MemoryStore<()> = MemoryStore::new(config); + let fake_peer = PeerId::random(); + let addr_no_expire = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + let addr_should_expire = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + store.update_address( + &fake_peer, + &addr_no_expire, + crate::store::AddressSource::Manual, + false, + ); + store.update_address( + &fake_peer, + &addr_should_expire, + crate::store::AddressSource::Manual, + true, + ); + thread::sleep(Duration::from_millis(2)); + store.check_record_ttl(); + assert!(!store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .any(|r| *r == addr_should_expire)); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .any(|r| *r == addr_no_expire)); + } + + #[test] + fn recent_use_bubble_up() { + let mut store: MemoryStore<()> = MemoryStore::new(Default::default()); + let fake_peer = PeerId::random(); + let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + store.update_address( + &fake_peer, + &addr1, + crate::store::AddressSource::Manual, + false, + ); + store.update_address( + &fake_peer, + &addr2, + crate::store::AddressSource::Manual, + false, + ); + assert!( + *store + .records + .get(&fake_peer) + .expect("peer to be in the store") + .addresses() + .last() + .expect("addr in the record") + .0 + == addr1 + ); + store.update_address( + &fake_peer, + &addr1, + crate::store::AddressSource::Manual, + false, + ); + assert!( + *store + .records + .get(&fake_peer) + .expect("peer to be in the store") + .addresses() + .last() + .expect("addr in the record") + .0 + == addr2 + ); + } + + #[test] + fn bounded_store() { + let mut store: MemoryStore<()> = MemoryStore::new(Default::default()); + let fake_peer = PeerId::random(); + for i in 1..10 { + let addr_string = format!("/ip4/127.0.0.{}", i); + store.update_address( + &fake_peer, + &Multiaddr::from_str(&addr_string).expect("parsing to succeed"), + crate::store::AddressSource::Manual, + false, + ); + } + let first_record = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed"); + assert!(!store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .any(|addr| *addr == first_record)); + let second_record = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed"); + assert!(store + .addresses_of_peer(&fake_peer) + .expect("peer to be in the store") + .any(|addr| *addr == second_record)); + } +} diff --git a/misc/peer-store/src/store.rs b/misc/peer-store/src/store.rs new file mode 100644 index 00000000000..39ee2095e7c --- /dev/null +++ b/misc/peer-store/src/store.rs @@ -0,0 +1,61 @@ +use std::task::Context; + +use libp2p_core::{Multiaddr, PeerId}; +use libp2p_swarm::FromSwarm; + +/// A store that +/// - contains all observed addresses of peers; +pub trait Store { + /// Event generated by the store and emitted to [`Swarm`](libp2p_swarm::Swarm). + /// The behaviour cannot handle this event. + type ToSwarm; + + /// Update an address record. + /// Returns `true` when the address is new. + fn update_address( + &mut self, + peer: &PeerId, + address: &Multiaddr, + source: AddressSource, + should_expire: bool, + ) -> bool; + + /// Update an address record. + /// Returns `true` when the address is new. + fn update_certified_address( + &mut self, + signed_record: &libp2p_core::PeerRecord, + source: AddressSource, + should_expire: bool, + ) -> bool; + + /// Remove an address record. + /// Returns `true` when the address is removed. + fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool; + + /// How this store handles events from the swarm. + fn on_swarm_event(&mut self, event: &FromSwarm) -> Option; + + /// Get all stored addresses of the peer. + fn addresses_of_peer(&self, peer: &PeerId) -> Option>; + + /// Trigger grabage collection for records. + fn poll(&mut self, cx: &mut Context<'_>) -> Option; +} + +/// Event that will be handled by the behaviour. +/// `Store::ToSwarm` should be a superset of this event. +pub enum Event { + RecordUpdated(PeerId), +} + +/// How the address is discovered. +#[derive(Debug, Clone, Copy)] +pub enum AddressSource { + /// The address is discovered from a behaviour(e.g. kadelima, identify). + Behaviour, + /// We have direct connection to the address. + DirectConnection, + /// The address is manually added. + Manual, +}