Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix rebase
Browse files Browse the repository at this point in the history
XAMPPRocky committed Feb 26, 2024
1 parent a96c071 commit cea5c98
Showing 2 changed files with 65 additions and 27 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 51 additions & 27 deletions src/net/cluster.rs
Original file line number Diff line number Diff line change
@@ -61,8 +61,6 @@ pub(crate) fn active_endpoints() -> &'static prometheus::IntGauge {
&ACTIVE_ENDPOINTS
}

use std::fmt;

#[derive(Copy, Clone, PartialEq, Eq)]
pub struct EndpointSetVersion(u64);

@@ -179,32 +177,44 @@ impl EndpointSet {
}

/// Represents a full snapshot of all clusters.
#[derive(Default, Debug)]
pub struct ClusterMap<S = RandomState> {
map: DashMap<Option<Locality>, EndpointSet, S>,
num_endpoints: AtomicUsize,
version: AtomicU64,
}

type DashMapRef<'inner, S> =
dashmap::mapref::one::Ref<'inner, Option<Locality>, BTreeSet<Endpoint>, S>;
type DashMapRef<'inner, S> = dashmap::mapref::one::Ref<'inner, Option<Locality>, EndpointSet, S>;
type DashMapRefMut<'inner, S> =
dashmap::mapref::one::RefMut<'inner, Option<Locality>, BTreeSet<Endpoint>, S>;
dashmap::mapref::one::RefMut<'inner, Option<Locality>, EndpointSet, S>;

impl ClusterMap<RandomState> {
pub fn new() -> Self {
Self::default()
}

pub fn new_default(cluster: BTreeSet<Endpoint>) -> Self {
let this = Self::default();
this.insert_default(cluster);
this
}
}

impl<S> ClusterMap<S> {
#[inline]
pub fn version(&self) -> u64 {
self.version.load(Relaxed)
}
}

impl<S> ClusterMap<S>
where
S: Default + std::hash::BuildHasher + Clone,
{
pub fn benchmarking(capacity: usize, hasher: S) -> Self {
Self(DashMap::with_capacity_and_hasher(capacity, hasher))
Self {
map: DashMap::with_capacity_and_hasher(capacity, hasher),
..Self::default()
}
}

#[inline]
@@ -255,11 +265,11 @@ where
}

pub fn get(&self, key: &Option<Locality>) -> Option<DashMapRef<S>> {
self.0.get(key)
self.map.get(key)
}

pub fn get_mut(&self, key: &Option<Locality>) -> Option<DashMapRefMut<S>> {
self.0.get_mut(key)
self.map.get_mut(key)
}

pub fn get_default(&self) -> Option<DashMapRef<S>> {
@@ -323,12 +333,8 @@ where
pub fn entry(
&self,
key: Option<Locality>,
) -> dashmap::mapref::entry::Entry<Option<Locality>, BTreeSet<Endpoint>, S> {
self.0.entry(key)
}

pub fn default_entry(&self) -> DashMapRefMut<S> {
self.entry(None).or_default()
) -> dashmap::mapref::entry::Entry<Option<Locality>, EndpointSet, S> {
self.map.entry(key)
}

#[inline]
@@ -395,11 +401,6 @@ where
self.num_of_endpoints() != 0
}

#[inline]
pub fn version(&self) -> u64 {
self.version.load(Relaxed)
}

#[inline]
pub fn update_unlocated_endpoints(&self, locality: Locality) {
if let Some((_, set)) = self.map.remove(&None) {
@@ -422,7 +423,7 @@ where
}
}

impl crate::config::watch::Watchable for ClusterMap {
impl<S> crate::config::watch::Watchable for ClusterMap<S> {
#[inline]
fn mark(&self) -> crate::config::watch::Marker {
crate::config::watch::Marker::Version(self.version())
@@ -444,11 +445,25 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ClusterMap")
.field("map", &self.0)
.field("map", &self.map)
.field("version", &self.version)
.finish_non_exhaustive()
}
}

impl<S> Default for ClusterMap<S>
where
S: Default + std::hash::BuildHasher + Clone,
{
fn default() -> Self {
Self {
map: <DashMap<Option<Locality>, EndpointSet, S>>::default(),
version: <_>::default(),
num_endpoints: <_>::default(),
}
}
}

impl Clone for ClusterMap {
fn clone(&self) -> Self {
let map = self.map.clone();
@@ -457,7 +472,10 @@ impl Clone for ClusterMap {
}

#[cfg(test)]
impl PartialEq for ClusterMap {
impl<S> PartialEq for ClusterMap<S>
where
S: Default + std::hash::BuildHasher + Clone,
{
fn eq(&self, rhs: &Self) -> bool {
for a in self.iter() {
match rhs
@@ -551,7 +569,10 @@ impl Serialize for ClusterMap {
}
}

impl From<ClusterMapDeser> for ClusterMap {
impl<S> From<ClusterMapDeser> for ClusterMap<S>
where
S: Default + std::hash::BuildHasher + Clone,
{
fn from(cmd: ClusterMapDeser) -> Self {
let map = DashMap::from_iter(cmd.endpoints.into_iter().map(
|EndpointWithLocality {
@@ -564,8 +585,11 @@ impl From<ClusterMapDeser> for ClusterMap {
}
}

impl From<DashMap<Option<Locality>, EndpointSet>> for ClusterMap {
fn from(map: DashMap<Option<Locality>, EndpointSet>) -> Self {
impl<S> From<DashMap<Option<Locality>, EndpointSet, S>> for ClusterMap<S>
where
S: Default + std::hash::BuildHasher + Clone,
{
fn from(map: DashMap<Option<Locality>, EndpointSet, S>) -> Self {
let num_endpoints = AtomicUsize::new(map.iter().map(|kv| kv.value().len()).sum());
Self {
map,
@@ -615,7 +639,7 @@ mod tests {
let de1 = Locality::with_region("de-1");

let mut endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into());
let cluster1 = ClusterMap::default();
let cluster1 = ClusterMap::new();

cluster1.insert(Some(nl1.clone()), [endpoint.clone()].into());
cluster1.insert(Some(de1.clone()), [endpoint.clone()].into());

0 comments on commit cea5c98

Please sign in to comment.