Skip to content

Commit

Permalink
allow attaching custom data
Browse files Browse the repository at this point in the history
  • Loading branch information
drHuangMHT committed Jan 31, 2025
1 parent c94f636 commit a02f088
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 99 deletions.
122 changes: 83 additions & 39 deletions misc/peer-store/src/memory_store.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::HashMap,
collections::{HashMap, VecDeque},
num::NonZeroUsize,
task::{Context, Poll},
time::{Duration, Instant},
Expand All @@ -9,37 +9,65 @@ use futures_timer::Delay;
use futures_util::FutureExt;
use libp2p_core::{Multiaddr, PeerId};
use libp2p_swarm::FromSwarm;
use record::PeerRecord;

use super::{store::Event, Store};
use super::Store;
use crate::{store::AddressSource, Behaviour};

#[derive(Debug, Clone)]
pub enum Event {
CustomDataUpdated(PeerId),
}

/// A in-memory store.
#[derive(Default)]
pub struct MemoryStore {
/// An address book of peers regardless of their status(connected or not).
address_book: HashMap<PeerId, record::PeerAddressRecord>,
pub struct MemoryStore<T> {
/// The internal store.
records: HashMap<PeerId, record::PeerRecord<T>>,
pending_events: VecDeque<Event>,
record_ttl_timer: Option<Delay>,
config: Config,
}

impl MemoryStore {
impl<T> MemoryStore<T> {
pub fn new(config: Config) -> Self {
Self {
config,
..Default::default()
records: HashMap::new(),
record_ttl_timer: None,
pending_events: VecDeque::default(),
}
}

fn check_record_ttl(&mut self) {
let now = Instant::now();
for r in &mut self.address_book.values_mut() {
r.check_ttl(now, self.config.record_ttl);
for r in &mut self.records.values_mut() {
r.check_addresses_ttl(now, self.config.record_ttl);
}
}

pub fn get_custom_data(&self, peer: &PeerId) -> Option<&T> {
self.records.get(peer).and_then(|r| r.get_custom_data())
}
pub fn take_custom_data(&mut self, peer: &PeerId) -> Option<T> {
self.records
.get_mut(peer)
.and_then(|r| r.take_custom_data())
}
pub fn insert_custom_data(&mut self, peer: &PeerId, custom_data: T) {
if let Some(r) = self.records.get_mut(peer) {
return r.insert_custom_data(custom_data);
}
let mut new_record = PeerRecord::new(self.config.record_capacity);
new_record.insert_custom_data(custom_data);
self.records.insert(*peer, new_record);
self.pending_events
.push_back(Event::CustomDataUpdated(*peer));
}
}

impl Store for MemoryStore {
type ToSwarm = ();
impl<T> Store for MemoryStore<T> {
type ToSwarm = Event;

fn update_address(
&mut self,
Expand All @@ -48,12 +76,12 @@ impl Store for MemoryStore {
source: AddressSource,
should_expire: bool,
) -> bool {
if let Some(record) = self.address_book.get_mut(peer) {
if let Some(record) = self.records.get_mut(peer) {
return record.update_address(address, source, should_expire);
}
let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity);
let mut new_record = record::PeerRecord::new(self.config.record_capacity);
new_record.update_address(address, source, should_expire);
self.address_book.insert(*peer, new_record);
self.records.insert(*peer, new_record);
true
}

Expand All @@ -64,27 +92,27 @@ impl Store for MemoryStore {
should_expire: bool,
) -> bool {
let peer = signed_record.peer_id();
if let Some(record) = self.address_book.get_mut(&peer) {
if let Some(record) = self.records.get_mut(&peer) {
return record.update_certified_address(signed_record, source, should_expire);
}
let mut new_record = record::PeerAddressRecord::new(self.config.record_capacity);
let mut new_record = record::PeerRecord::new(self.config.record_capacity);
new_record.update_certified_address(signed_record, source, should_expire);
self.address_book.insert(peer, new_record);
self.records.insert(peer, new_record);
true
}

fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) -> bool {
if let Some(record) = self.address_book.get_mut(peer) {
if let Some(record) = self.records.get_mut(peer) {
return record.remove_address(address);
}
false
}

fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option<Event> {
fn on_swarm_event(&mut self, swarm_event: &FromSwarm) -> Option<super::store::Event> {
match swarm_event {
FromSwarm::NewExternalAddrOfPeer(info) => {
if self.update_address(&info.peer_id, info.addr, AddressSource::Behaviour, true) {
return Some(Event::RecordUpdated(info.peer_id));
return Some(super::store::Event::RecordUpdated(info.peer_id));
}
None
}
Expand All @@ -100,7 +128,7 @@ impl Store for MemoryStore {
false,
);
if is_record_updated {
return Some(Event::RecordUpdated(info.peer_id));
return Some(super::store::Event::RecordUpdated(info.peer_id));
}
None
}
Expand All @@ -109,33 +137,39 @@ impl Store for MemoryStore {
}

fn addresses_of_peer(&self, peer: &PeerId) -> Option<impl Iterator<Item = &Multiaddr>> {
self.address_book.get(peer).map(|record| {
self.records.get(peer).map(|record| {
record
.records()
.addresses()
.filter(|(_, r)| !self.config.strict_mode || r.signature.is_some())
.map(|(addr, _)| addr)
})
}

fn poll(&mut self, cx: &mut Context<'_>) -> Option<()> {
fn poll(&mut self, cx: &mut Context<'_>) -> Option<Self::ToSwarm> {
if let Some(mut timer) = self.record_ttl_timer.take() {
if let Poll::Ready(()) = timer.poll_unpin(cx) {
self.check_record_ttl();
self.record_ttl_timer = Some(Delay::new(self.config.check_record_ttl_interval));
}
self.record_ttl_timer = Some(timer)
}
if let Some(ev) = self.pending_events.pop_front() {
return Some(ev);
}
None
}
}

impl Behaviour<MemoryStore> {
impl<T> Behaviour<MemoryStore<T>>
where
T: 'static,
{
/// Get all stored address records of the peer, not affected by `strict_mode`.
pub fn address_record_of_peer(
&self,
peer: &PeerId,
) -> Option<impl Iterator<Item = (&Multiaddr, &record::AddressRecord)>> {
self.store().address_book.get(peer).map(|r| r.records())
self.store().records.get(peer).map(|r| r.addresses())
}
}

Expand Down Expand Up @@ -165,23 +199,24 @@ impl Default for Config {
mod record {
use std::rc::Rc;

use libp2p_core::PeerRecord;
use lru::LruCache;

use super::*;

pub(crate) struct PeerAddressRecord {
pub(crate) struct PeerRecord<T> {
/// A LRU(Least Recently Used) cache for addresses.
/// Will delete the least-recently-used record when full.
addresses: LruCache<Multiaddr, AddressRecord>,
custom: Option<T>,
}
impl PeerAddressRecord {
impl<T> PeerRecord<T> {
pub(crate) fn new(capacity: NonZeroUsize) -> Self {
Self {
addresses: LruCache::new(capacity),
custom: None,
}
}
pub(crate) fn records(&self) -> impl Iterator<Item = (&Multiaddr, &AddressRecord)> {
pub(crate) fn addresses(&self) -> impl Iterator<Item = (&Multiaddr, &AddressRecord)> {
self.addresses.iter()
}
pub(crate) fn update_address(
Expand All @@ -202,7 +237,7 @@ mod record {
}
pub(crate) fn update_certified_address(
&mut self,
signed_record: &PeerRecord,
signed_record: &libp2p_core::PeerRecord,
source: AddressSource,
should_expire: bool,
) -> bool {
Expand All @@ -225,7 +260,7 @@ mod record {
pub(crate) fn remove_address(&mut self, address: &Multiaddr) -> bool {
self.addresses.pop(address).is_some()
}
pub(crate) fn check_ttl(&mut self, now: Instant, ttl: Duration) {
pub(crate) fn check_addresses_ttl(&mut self, now: Instant, ttl: Duration) {
let mut records_to_be_deleted = Vec::new();
for (k, record) in self.addresses.iter() {
if record.is_expired(now, ttl) {
Expand All @@ -236,6 +271,15 @@ mod record {
self.addresses.pop(&k);
}
}
pub(crate) fn get_custom_data(&self) -> Option<&T> {
self.custom.as_ref()
}
pub(crate) fn take_custom_data(&mut self) -> Option<T> {
self.custom.take()
}
pub(crate) fn insert_custom_data(&mut self, custom_data: T) {
let _ = self.custom.insert(custom_data);
}
}

pub struct AddressRecord {
Expand Down Expand Up @@ -288,7 +332,7 @@ mod test {
record_ttl: Duration::from_millis(1),
..Default::default()
};
let mut store = MemoryStore::new(config);
let mut store: MemoryStore<()> = MemoryStore::new(config);
let fake_peer = PeerId::random();
let addr_no_expire = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
let addr_should_expire = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
Expand Down Expand Up @@ -318,7 +362,7 @@ mod test {

#[test]
fn recent_use_bubble_up() {
let mut store = MemoryStore::new(Default::default());
let mut store: MemoryStore<()> = MemoryStore::new(Default::default());
let fake_peer = PeerId::random();
let addr1 = Multiaddr::from_str("/ip4/127.0.0.1").expect("parsing to succeed");
let addr2 = Multiaddr::from_str("/ip4/127.0.0.2").expect("parsing to succeed");
Expand All @@ -336,10 +380,10 @@ mod test {
);
assert!(
*store
.address_book
.records
.get(&fake_peer)
.expect("peer to be in the store")
.records()
.addresses()
.last()
.expect("addr in the record")
.0
Expand All @@ -353,10 +397,10 @@ mod test {
);
assert!(
*store
.address_book
.records
.get(&fake_peer)
.expect("peer to be in the store")
.records()
.addresses()
.last()
.expect("addr in the record")
.0
Expand All @@ -366,7 +410,7 @@ mod test {

#[test]
fn bounded_store() {
let mut store = MemoryStore::new(Default::default());
let mut store: MemoryStore<()> = MemoryStore::new(Default::default());
let fake_peer = PeerId::random();
for i in 1..10 {
let addr_string = format!("/ip4/127.0.0.{}", i);
Expand Down
Loading

0 comments on commit a02f088

Please sign in to comment.