diff --git a/Cargo.lock b/Cargo.lock index f92dee00be..87774d75b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -813,6 +813,12 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fixedstr" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4e4dfef7b590ab7d11e531d602fdfb6a3413b09924db1428902bbc4410a9a8" + [[package]] name = "fnv" version = "1.0.7" @@ -2134,6 +2140,7 @@ dependencies = [ "either", "enum-map", "eyre", + "fixedstr", "form_urlencoded", "futures", "hyper", @@ -2188,6 +2195,7 @@ dependencies = [ "tryhard", "url", "uuid", + "xxhash-rust", ] [[package]] @@ -3642,6 +3650,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "xxhash-rust" +version = "0.8.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927da81e25be1e1a2901d59b81b37dd2efd1fc9c9345a55007f09bf5a2d3ee03" + [[package]] name = "yansi" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 646aee018a..432794f980 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -71,6 +71,11 @@ name = "compression" harness = false test = false +[[bench]] +name = "cluster_map" +harness = false +test = false + [dependencies] # Local quilkin-macros = { version = "0.8.0-dev", path = "./macros" } @@ -88,6 +93,7 @@ dashmap = { version = "5.5.3", features = ["serde"] } either = "1.9.0" enum-map = "2.6.3" eyre = "0.6.8" +fixedstr = { version = "0.5", features = ["flex-str"] } futures.workspace = true hyper = { version = "0.14.27", features = ["http2"] } hyper-rustls = { version = "0.24.1", features = ["http2", "webpki-roots"] } @@ -153,6 +159,7 @@ rand = "0.8.5" regex = "1.9.6" tracing-test = "0.2.4" tempfile = "3.8.0" +xxhash-rust = { version = "0.8", features = ["xxh3"] } [build-dependencies] tonic-build = { version = "0.10.2", default_features = false, features = [ diff --git a/about.toml b/about.toml index 71fd3ee5e8..0cb819e010 100644 --- a/about.toml +++ b/about.toml @@ -17,6 +17,7 @@ accepted = [ "Apache-2.0", "BSD-2-Clause", + "BSD-2-Clause", "BSD-3-Clause", "CC0-1.0", "ISC", @@ -26,7 +27,10 @@ accepted = [ "Unicode-DFS-2016", "Zlib", ] -workarounds = ["ring"] +workarounds = ["ring", "xxhash-rust"] [ring] accepted = ["ISC", "OpenSSL"] + +[xxhash-rust] +accepted = ["BSL-1.0"] diff --git a/benches/cluster_map.rs b/benches/cluster_map.rs new file mode 100644 index 0000000000..d45f5e3180 --- /dev/null +++ b/benches/cluster_map.rs @@ -0,0 +1,360 @@ +use std::{ + collections::BTreeSet, + hash::{Hash as _, Hasher as _}, + net::{Ipv4Addr, Ipv6Addr}, +}; + +use divan::Bencher; +use quilkin::net::{cluster::ClusterMap, endpoint::Locality, Endpoint, EndpointAddress}; +use rand::Rng; +use xxhash_rust::xxh3::Xxh3 as Hasher; + +const LOCALITIES: &[&str] = &[ + "us:east1:b", + "us:east1:c", + "us:east1:d", + "us:east4:c", + "us:east4:b", + "us:east4:a", + "us:central1:c", + "us:central1:a", + "us:central1:f", + "us:central1:b", + "us:west1:b", + "us:west1:c", + "us:west1:a", + "europe:west4:a", + "europe:west4:b", + "europe:west4:c", + "europe:west1:b", + "europe:west1:d", + "europe:west1:c", + "europe:west3:c", + "europe:west3:a", + "europe:west3:b", + "europe:west2:c", + "europe:west2:b", + "europe:west2:a", + "asia:east1:b", + "asia:east1:a", + "asia:east1:c", + "asia:southeast1:b", + "asia:southeast1:a", + "asia:southeast1:c", + "asia:northeast1:b", + "asia:northeast1:c", + "asia:northeast1:a", + "asia:south1:c", + "asia:south1:b", + "asia:south1:a", + "australia:southeast1:b", + "australia:southeast1:c", + "australia:southeast1:a", + "southamerica:east1:b", + "southamerica:east1:c", + "southamerica:east1:a", + "asia:east2:a", + "asia:east2:b", + "asia:east2:c", + "asia:northeast2:a", + "asia:northeast2:b", + "asia:northeast2:c", + "asia:northeast3:a", + "asia:northeast3:b", + "asia:northeast3:c", + "asia:south2:a", + "asia:south2:b", + "asia:south2:c", + "asia:southeast2:a", + "asia:southeast2:b", + "asia:southeast2:c", + "australia:southeast2:a", + "australia:southeast2:b", + "australia:southeast2:c", + "europe:central2:a", + "europe:central2:b", + "europe:central2:c", + "europe:north1:a", + "europe:north1:b", + "europe:north1:c", + "europe:southwest1:a", + "europe:southwest1:b", + "europe:southwest1:c", + "europe:west10:a", + "europe:west10:b", + "europe:west10:c", + "europe:west12:a", + "europe:west12:b", + "europe:west12:c", + "europe:west6:a", + "europe:west6:b", + "europe:west6:c", + "europe:west8:a", + "europe:west8:b", + "europe:west8:c", + "europe:west9:a", + "europe:west9:b", + "europe:west9:c", + "me:central1:a", + "me:central1:b", + "me:central1:c", + "me:central2:a", + "me:central2:b", + "me:central2:c", + "me:west1:a", + "me:west1:b", + "me:west1:c", + "northamerica:northeast1:a", + "northamerica:northeast1:b", + "northamerica:northeast1:c", + "northamerica:northeast2:a", + "northamerica:northeast2:b", + "northamerica:northeast2:c", + "southamerica:west1:a", + "southamerica:west1:b", + "southamerica:west1:c", + "us:east5:a", + "us:east5:b", + "us:east5:c", + "us:south1:a", + "us:south1:b", + "us:south1:c", + "us:west2:a", + "us:west2:b", + "us:west2:c", + "us:west3:a", + "us:west3:b", + "us:west3:c", + "us:west4:a", + "us:west4:b", + "us:west4:c", +]; + +fn gen_endpoints(rng: &mut rand::rngs::SmallRng, hasher: &mut Hasher) -> BTreeSet { + let num_endpoints = rng.gen_range(100..10_000); + hasher.write_u16(num_endpoints); + + let mut endpoints = BTreeSet::new(); + + for i in 0..num_endpoints { + let ep_addr = match i % 3 { + 0 => (Ipv4Addr::new(100, 200, (i >> 8) as _, (i & 0xff) as _), i).into(), + 1 => EndpointAddress { + host: quilkin::net::endpoint::AddressKind::Name(format!("benchmark-{i}")), + port: i, + }, + 2 => (Ipv6Addr::new(100, 200, i, 0, 0, 1, 2, 3), i).into(), + _ => unreachable!(), + }; + + endpoints.insert(Endpoint::new(ep_addr)); + } + + for ep in &endpoints { + ep.address.hash(hasher); + } + + endpoints +} + +#[allow(dead_code)] +struct GenCluster { + cm: ClusterMap, + hash: u64, + total_endpoints: usize, + sets: std::collections::BTreeMap, BTreeSet>, +} + +#[inline] +fn write_locality(hasher: &mut Hasher, loc: &Option) { + if let Some(key) = loc { + key.hash(hasher); + } else { + hasher.write("None".as_bytes()); + } +} + +fn gen_cluster_map() -> GenCluster { + use rand::prelude::*; + + let mut rng = rand::rngs::SmallRng::seed_from_u64(S); + + let mut hasher = Hasher::with_seed(S); + let mut total_endpoints = 0; + + let num_locals = rng.gen_range(10..LOCALITIES.len()); + + // Select how many localities we want, note we add 1 since we always have a default cluster + hasher.write_usize(num_locals + 1); + + let cm = ClusterMap::default(); + + for locality in LOCALITIES.choose_multiple(&mut rng, num_locals) { + let locality = locality.parse().unwrap(); + cm.insert(Some(locality), Default::default()); + } + + // Now actually insert the endpoints, now that the order of keys is established, + // annoying, but note we split out iteration versus insertion, otherwise we deadlock + let keys: Vec<_> = cm.iter().map(|kv| kv.key().clone()).collect(); + let mut sets = std::collections::BTreeMap::new(); + + for key in keys { + write_locality(&mut hasher, &key); + + let ep = gen_endpoints(&mut rng, &mut hasher); + total_endpoints += ep.len(); + cm.insert(key.clone(), ep.clone()); + sets.insert(key, ep); + } + + GenCluster { + cm, + hash: hasher.finish(), + total_endpoints, + sets, + } +} + +#[divan::bench_group(sample_count = 10)] +mod serde { + use super::*; + use prost_types::Any; + use quilkin::net::cluster::proto::Cluster; + + fn serialize_to_protobuf(cm: &ClusterMap) -> Vec { + let mut resources = Vec::new(); + let resource_type = quilkin::net::xds::ResourceType::Cluster; + + for cluster in cm.iter() { + resources.push( + resource_type + .encode_to_any(&Cluster::try_from((cluster.key(), cluster.value())).unwrap()) + .unwrap(), + ); + } + + resources + } + + fn deserialize_from_protobuf(pv: Vec) -> ClusterMap { + let cm = ClusterMap::default(); + + for any in pv { + let c = quilkin::net::xds::Resource::try_from(any).unwrap(); + + let quilkin::net::xds::Resource::Cluster(cluster) = c else { + unreachable!() + }; + cm.merge( + cluster.locality.map(From::from), + cluster + .endpoints + .into_iter() + .map(TryFrom::try_from) + .collect::>() + .unwrap(), + ); + } + + cm + } + + fn serialize_to_json(cm: &ClusterMap) -> serde_json::Value { + serde_json::to_value(cm).unwrap() + } + + fn deserialize_from_json(json: serde_json::Value) -> ClusterMap { + serde_json::from_value(json.clone()).unwrap() + } + + #[divan::bench(consts = SEEDS)] + fn serialize_proto(b: Bencher) { + let gc = gen_cluster_map::(); + b.counter(gc.total_endpoints) + .bench(|| divan::black_box(serialize_to_protobuf(&gc.cm))); + } + + #[divan::bench(consts = SEEDS)] + fn serialize_json(b: Bencher) { + let gc = gen_cluster_map::(); + b.counter(gc.total_endpoints) + .bench(|| divan::black_box(serialize_to_json(&gc.cm))); + } + + #[divan::bench(consts = SEEDS)] + fn deserialize_json(b: Bencher) { + let gc = gen_cluster_map::(); + let json = serialize_to_json(&gc.cm); + + b.with_inputs(|| json.clone()) + .counter(gc.total_endpoints) + .bench_values(|json| divan::black_box(deserialize_from_json(json))); + } + + #[divan::bench(consts = SEEDS)] + fn deserialize_proto(b: Bencher) { + let gc = gen_cluster_map::(); + let pv = serialize_to_protobuf(&gc.cm); + + b.with_inputs(|| pv.clone()) + .counter(gc.total_endpoints) + .bench_values(|pv| divan::black_box(deserialize_from_protobuf(pv))); + } +} + +const SEEDS: &[u64] = &[100, 200, 300, 400, 500]; + +#[divan::bench_group(sample_count = 10)] +mod ops { + use super::*; + + fn compute_hash(gc: &GenCluster) -> usize { + let mut total_endpoints = 0; + + for kv in gc.cm.iter() { + for _ep in kv.value() { + total_endpoints += 1; + } + } + + assert_eq!(total_endpoints, gc.total_endpoints); + total_endpoints + } + + #[allow(clippy::eq_op)] + fn is_equal(gc: &GenCluster) -> usize { + assert_eq!(gc.cm, gc.cm); + gc.total_endpoints + } + + #[divan::bench(consts = SEEDS)] + fn iterate(b: Bencher) { + let cm = gen_cluster_map::(); + + b.counter(cm.total_endpoints) + .bench_local(|| divan::black_box(compute_hash::(&cm))); + + drop(cm); + } + + #[divan::bench(consts = SEEDS)] + fn iterate_par(b: Bencher) { + let cm = gen_cluster_map::(); + + b.counter(cm.total_endpoints) + .bench(|| divan::black_box(compute_hash::(&cm))) + } + + #[divan::bench(consts = SEEDS)] + fn partial_eq(b: Bencher) { + let cm = gen_cluster_map::(); + + b.counter(cm.total_endpoints) + .bench(|| divan::black_box(is_equal(&cm))) + } +} + +fn main() { + divan::main(); +} diff --git a/deny.toml b/deny.toml index 1eb2f9b9a7..6904a61954 100644 --- a/deny.toml +++ b/deny.toml @@ -37,6 +37,7 @@ exceptions = [ # Each entry is the crate and version constraint, and its specific allow # list { name = "webpki-roots", version = "0.25.0", allow = ["MPL-2.0"] }, + { name = "xxhash-rust", version = "0.8", allow = ["BSL-1.0"] }, ] [[licenses.clarify]] diff --git a/macros/src/include.rs b/macros/src/include.rs index 3528d469cf..660118397d 100644 --- a/macros/src/include.rs +++ b/macros/src/include.rs @@ -49,7 +49,7 @@ impl ToTokens for IncludeProto { let module = id.split('.').rev().fold::, _>(items, |acc, module| { let module = syn::Ident::new(module, Span::mixed_site()); let result: syn::ItemMod = syn::parse_quote! { - #[allow(warnings, clippy::all)] pub(crate) mod #module { #(#acc)* } + #[allow(warnings, clippy::all)] pub mod #module { #(#acc)* } }; vec![syn::Item::Mod(result)] diff --git a/src/cli/agent.rs b/src/cli/agent.rs index fe448450f7..582a29b473 100644 --- a/src/cli/agent.rs +++ b/src/cli/agent.rs @@ -79,12 +79,13 @@ impl Agent { mode: Admin, mut shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) - .then(|| crate::net::endpoint::Locality { - region: self.region.clone().unwrap_or_default(), - zone: self.zone.clone().unwrap_or_default(), - sub_zone: self.sub_zone.clone().unwrap_or_default(), - }); + let locality = self.region.as_ref().map(|region| { + crate::net::endpoint::Locality::new( + region, + self.zone.as_deref().unwrap_or_default(), + self.sub_zone.as_deref().unwrap_or_default(), + ) + }); let runtime_config = mode.unwrap_agent(); diff --git a/src/cli/manage.rs b/src/cli/manage.rs index f968feb039..f98d971efa 100644 --- a/src/cli/manage.rs +++ b/src/cli/manage.rs @@ -60,12 +60,13 @@ impl Manage { mode: Admin, mut shutdown_rx: crate::ShutdownRx, ) -> crate::Result<()> { - let locality = (self.region.is_some() || self.zone.is_some() || self.sub_zone.is_some()) - .then(|| crate::net::endpoint::Locality { - region: self.region.clone().unwrap_or_default(), - zone: self.zone.clone().unwrap_or_default(), - sub_zone: self.sub_zone.clone().unwrap_or_default(), - }); + let locality = self.region.as_ref().map(|region| { + crate::net::endpoint::Locality::new( + region, + self.zone.as_deref().unwrap_or_default(), + self.sub_zone.as_deref().unwrap_or_default(), + ) + }); if let Some(locality) = &locality { config diff --git a/src/net.rs b/src/net.rs index 5c03f49473..ec16c47527 100644 --- a/src/net.rs +++ b/src/net.rs @@ -17,7 +17,7 @@ pub mod cluster; pub mod endpoint; pub(crate) mod maxmind_db; -pub(crate) mod xds; +pub mod xds; use std::{ io, diff --git a/src/net/cluster.rs b/src/net/cluster.rs index ed4df5818a..8f2756c5b0 100644 --- a/src/net/cluster.rs +++ b/src/net/cluster.rs @@ -15,7 +15,8 @@ */ use std::{ - collections::BTreeSet, + collections::{hash_map::RandomState, BTreeSet}, + fmt, sync::atomic::{AtomicU64, AtomicUsize, Ordering::Relaxed}, }; @@ -28,7 +29,7 @@ use crate::net::endpoint::{Endpoint, Locality}; const SUBSYSTEM: &str = "cluster"; crate::include_proto!("quilkin.config.v1alpha1"); -pub(crate) use self::quilkin::config::v1alpha1 as proto; +pub use self::quilkin::config::v1alpha1 as proto; pub(crate) fn active_clusters() -> &'static prometheus::IntGauge { static ACTIVE_CLUSTERS: Lazy = Lazy::new(|| { @@ -60,8 +61,6 @@ pub(crate) fn active_endpoints() -> &'static prometheus::IntGauge { &ACTIVE_ENDPOINTS } -use std::fmt; - #[derive(Copy, Clone, PartialEq, Eq)] pub struct EndpointSetVersion(u64); @@ -178,21 +177,45 @@ impl EndpointSet { } /// Represents a full snapshot of all clusters. -#[derive(Default, Debug)] -pub struct ClusterMap { - map: DashMap, EndpointSet>, +pub struct ClusterMap { + map: DashMap, EndpointSet, S>, num_endpoints: AtomicUsize, version: AtomicU64, } -type DashMapRef<'inner> = dashmap::mapref::one::Ref<'inner, Option, EndpointSet>; +type DashMapRef<'inner, S> = dashmap::mapref::one::Ref<'inner, Option, EndpointSet, S>; +type DashMapRefMut<'inner, S> = + dashmap::mapref::one::RefMut<'inner, Option, EndpointSet, S>; + +impl ClusterMap { + pub fn new() -> Self { + Self::default() + } -impl ClusterMap { pub fn new_default(cluster: BTreeSet) -> Self { let this = Self::default(); this.insert_default(cluster); this } +} + +impl ClusterMap { + #[inline] + pub fn version(&self) -> u64 { + self.version.load(Relaxed) + } +} + +impl ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ + pub fn benchmarking(capacity: usize, hasher: S) -> Self { + Self { + map: DashMap::with_capacity_and_hasher(capacity, hasher), + ..Self::default() + } + } #[inline] pub fn insert( @@ -241,16 +264,22 @@ impl ClusterMap { self.map.is_empty() } - #[inline] - pub fn get(&self, key: &Option) -> Option { + pub fn get(&self, key: &Option) -> Option> { self.map.get(key) } - #[inline] - pub fn get_default(&self) -> Option { + pub fn get_mut(&self, key: &Option) -> Option> { + self.map.get_mut(key) + } + + pub fn get_default(&self) -> Option> { self.get(&None) } + pub fn get_default_mut(&self) -> Option> { + self.get_mut(&None) + } + #[inline] pub fn insert_default(&self, endpoints: BTreeSet) { self.insert(None, endpoints); @@ -296,6 +325,18 @@ impl ClusterMap { false } + #[inline] + pub fn iter(&self) -> dashmap::iter::Iter, EndpointSet, S> { + self.map.iter() + } + + pub fn entry( + &self, + key: Option, + ) -> dashmap::mapref::entry::Entry, EndpointSet, S> { + self.map.entry(key) + } + #[inline] pub fn replace(&self, locality: Option, endpoint: Endpoint) -> Option { if let Some(mut set) = self.map.get_mut(&locality) { @@ -314,11 +355,6 @@ impl ClusterMap { } } - #[inline] - pub fn iter(&self) -> dashmap::iter::Iter, EndpointSet> { - self.map.iter() - } - #[inline] pub fn endpoints(&self) -> Vec { let mut endpoints = Vec::with_capacity(self.num_of_endpoints()); @@ -365,11 +401,6 @@ impl ClusterMap { self.num_of_endpoints() != 0 } - #[inline] - pub fn version(&self) -> u64 { - self.version.load(Relaxed) - } - #[inline] pub fn update_unlocated_endpoints(&self, locality: Locality) { if let Some((_, set)) = self.map.remove(&None) { @@ -392,7 +423,7 @@ impl ClusterMap { } } -impl crate::config::watch::Watchable for ClusterMap { +impl crate::config::watch::Watchable for ClusterMap { #[inline] fn mark(&self) -> crate::config::watch::Marker { crate::config::watch::Marker::Version(self.version()) @@ -408,6 +439,31 @@ impl crate::config::watch::Watchable for ClusterMap { } } +impl fmt::Debug for ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("ClusterMap") + .field("map", &self.map) + .field("version", &self.version) + .finish_non_exhaustive() + } +} + +impl Default for ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ + fn default() -> Self { + Self { + map: , EndpointSet, S>>::default(), + version: <_>::default(), + num_endpoints: <_>::default(), + } + } +} + impl Clone for ClusterMap { fn clone(&self) -> Self { let map = self.map.clone(); @@ -416,7 +472,10 @@ impl Clone for ClusterMap { } #[cfg(test)] -impl PartialEq for ClusterMap { +impl PartialEq for ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ fn eq(&self, rhs: &Self) -> bool { for a in self.iter() { match rhs @@ -510,7 +569,10 @@ impl Serialize for ClusterMap { } } -impl From for ClusterMap { +impl From for ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ fn from(cmd: ClusterMapDeser) -> Self { let map = DashMap::from_iter(cmd.endpoints.into_iter().map( |EndpointWithLocality { @@ -523,8 +585,11 @@ impl From for ClusterMap { } } -impl From, EndpointSet>> for ClusterMap { - fn from(map: DashMap, EndpointSet>) -> Self { +impl From, EndpointSet, S>> for ClusterMap +where + S: Default + std::hash::BuildHasher + Clone, +{ + fn from(map: DashMap, EndpointSet, S>) -> Self { let num_endpoints = AtomicUsize::new(map.iter().map(|kv| kv.value().len()).sum()); Self { map, @@ -570,11 +635,11 @@ mod tests { #[test] fn merge() { - let nl1 = Locality::region("nl-1"); - let de1 = Locality::region("de-1"); + let nl1 = Locality::with_region("nl-1"); + let de1 = Locality::with_region("de-1"); let mut endpoint = Endpoint::new((Ipv4Addr::LOCALHOST, 7777).into()); - let cluster1 = ClusterMap::default(); + let cluster1 = ClusterMap::new(); cluster1.insert(Some(nl1.clone()), [endpoint.clone()].into()); cluster1.insert(Some(de1.clone()), [endpoint.clone()].into()); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 06106bb7c6..c82acdd9f0 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -22,7 +22,11 @@ pub mod metadata; use serde::{Deserialize, Serialize}; -pub use self::{address::EndpointAddress, locality::Locality, metadata::DynamicMetadata}; +pub use self::{ + address::{AddressKind, EndpointAddress}, + locality::Locality, + metadata::DynamicMetadata, +}; pub type EndpointMetadata = metadata::MetadataView; diff --git a/src/net/endpoint/locality.rs b/src/net/endpoint/locality.rs index 839e4566b2..eadbcac38b 100644 --- a/src/net/endpoint/locality.rs +++ b/src/net/endpoint/locality.rs @@ -15,83 +15,89 @@ */ use serde::{Deserialize, Serialize}; +use std::num::NonZeroUsize; + +type FixedBuffer = fixedstr::Flexstr<64>; +const SEP: char = ':'; /// The location of an [`Endpoint`][super::Endpoint]. -#[derive( - Clone, - Default, - Debug, - Hash, - Eq, - PartialEq, - Deserialize, - Serialize, - schemars::JsonSchema, - PartialOrd, - Ord, -)] +#[derive(Clone, Default, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] pub struct Locality { - /// The geographic region. - #[serde(default)] - pub region: String, - /// The zone within the `region`, if applicable. - #[serde(default)] - pub zone: String, - /// The subzone within the `zone`, if applicable. - #[serde(default)] - pub sub_zone: String, + /// Internal buffer with the full string + buffer: FixedBuffer, + /// End offset of the geographic region portion + region: usize, + /// End offset of the zone within the region, if applicable + zone: Option, } impl Locality { - pub fn new( - region: impl Into, - zone: impl Into, - sub_zone: impl Into, - ) -> Self { + pub fn new(region: impl AsRef, zone: impl AsRef, sub_zone: impl AsRef) -> Self { + let mut buffer = FixedBuffer::new(); + buffer.push_str(region.as_ref()); + let region = buffer.len(); + + let zone = zone.as_ref(); + let zone = if !zone.is_empty() { + buffer.push_char(SEP); + buffer.push_str(zone.as_ref()); + let zone_offset = buffer.len(); + + let sub = sub_zone.as_ref(); + if !sub.is_empty() { + buffer.push_char(SEP); + buffer.push_str(sub.as_ref()); + } + + NonZeroUsize::new(zone_offset) + } else { + None + }; + Self { - region: region.into(), - zone: zone.into(), - sub_zone: sub_zone.into(), + buffer, + region, + zone, } } - pub fn region(region: impl Into) -> Self { + #[cfg(test)] + pub fn with_region(region: impl AsRef) -> Self { + let region = region.as_ref(); Self { - region: region.into(), - ..Self::default() + buffer: region.into(), + region: region.len(), + zone: None, } } - pub fn zone(mut self, zone: impl Into) -> Self { - self.zone = zone.into(); - self - } - + #[inline] pub fn colon_separated_string(&self) -> String { - let mut string = String::from(&*self.region); - - if !self.zone.is_empty() { - string += ":"; - string += &*self.zone; - } + self.buffer.as_str().to_owned() + } - if !self.sub_zone.is_empty() { - string += ":"; - string += &*self.sub_zone; - } + #[inline] + pub fn region(&self) -> &str { + &self.buffer[..self.region] + } - string + #[inline] + pub fn zone(&self) -> Option<&str> { + self.zone.map(|z| &self.buffer[self.region + 1..z.get()]) } - pub fn sub_zone(mut self, sub_zone: impl Into) -> Self { - self.sub_zone = sub_zone.into(); - self + #[inline] + pub fn sub_zone(&self) -> Option<&str> { + self.zone.and_then(|z| { + let o = z.get() + 1; + (o < self.buffer.len()).then(|| &self.buffer[o..]) + }) } } impl std::fmt::Display for Locality { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.colon_separated_string().fmt(f) + f.write_str(self.buffer.as_str()) } } @@ -99,44 +105,178 @@ impl std::str::FromStr for Locality { type Err = eyre::Error; fn from_str(input: &str) -> Result { - let vec: Vec<_> = input.split(':').collect(); - - Ok(match vec.len() { - 1 => Self { - region: vec[0].into(), - ..<_>::default() - }, - 2 => Self { - region: vec[0].into(), - zone: vec[1].into(), - ..<_>::default() - }, - 3 => Self { - region: vec[0].into(), - zone: vec[1].into(), - sub_zone: vec[2].into(), - }, - _ => return Err(eyre::eyre!("invalid locality identifier")), + if input.is_empty() { + return Err(eyre::eyre!("region not specified")); + } + + let mut iter = input.split(':'); + + let Some(region) = iter.next().filter(|r| !r.is_empty()) else { + return Err(eyre::eyre!("region not specified")); + }; + + let region = region.len(); + + let zone = iter.next().and_then(|z| { + (!z.is_empty()) + .then_some(region + 1 + z.len()) + .and_then(NonZeroUsize::new) + }); + let _subzone = iter.next(); + + if let Some(invalid) = iter.next() { + return Err(eyre::eyre!("locality identifier '{input}' had more than 3 components, '{invalid}' is not a region, zone, or subzone")); + } + + Ok(Self { + buffer: input.into(), + region, + zone, }) } } impl From for Locality { + #[inline] fn from(value: crate::net::cluster::proto::Locality) -> Self { - Self { - region: value.region, - zone: value.zone, - sub_zone: value.sub_zone, - } + Self::new(value.region, value.zone, value.sub_zone) } } impl From for crate::net::cluster::proto::Locality { + #[inline] fn from(value: Locality) -> Self { Self { - region: value.region, - zone: value.zone, - sub_zone: value.sub_zone, + region: value.region().to_owned(), + zone: value.zone().unwrap_or_default().to_owned(), + sub_zone: value.sub_zone().unwrap_or_default().to_owned(), } } } + +impl Serialize for Locality { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.buffer) + } +} + +impl<'de> Deserialize<'de> for Locality { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + struct LocalityVisitor; + + impl<'de> serde::de::Visitor<'de> for LocalityVisitor { + type Value = Locality; + + fn expecting(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str("a Locality identifier") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + v.parse().map_err(|err| E::custom(err)) + } + + fn visit_borrowed_str(self, v: &'de str) -> Result + where + E: serde::de::Error, + { + self.visit_str(v) + } + + fn visit_string(self, v: String) -> Result + where + E: serde::de::Error, + { + self.visit_str(&v) + } + } + + deserializer.deserialize_any(LocalityVisitor) + } +} + +impl schemars::JsonSchema for Locality { + fn is_referenceable() -> bool { + String::is_referenceable() + } + + fn schema_name() -> String { + String::schema_name() + } + + fn schema_id() -> std::borrow::Cow<'static, str> { + String::schema_id() + } + + fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + String::json_schema(gen) + } + + fn _schemars_private_non_optional_json_schema( + gen: &mut schemars::gen::SchemaGenerator, + ) -> schemars::schema::Schema { + String::_schemars_private_non_optional_json_schema(gen) + } + + fn _schemars_private_is_option() -> bool { + String::_schemars_private_is_option() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn locality() { + let components = [ + ("region", None, None), + ("region1", Some("zone"), None), + ("region2", Some("zone1"), Some("subzone")), + ]; + + for comp in components { + let string = { + let mut s = String::new(); + s.push_str(comp.0); + + if let Some(z) = comp.1 { + s.push(SEP); + s.push_str(z); + + if let Some(sz) = comp.2 { + s.push(SEP); + s.push_str(sz); + } + } + + s + }; + + let parsed: Locality = string.parse().unwrap(); + assert_eq!(parsed.to_string(), string); + + assert_eq!(comp.0, parsed.region()); + assert_eq!(comp.1, parsed.zone()); + assert_eq!(comp.2, parsed.sub_zone()); + } + } + + #[test] + fn parse_fails_invalid() { + assert!("".parse::().is_err()); + assert!(":".parse::().is_err()); + assert!("::".parse::().is_err()); + assert!("region:zone:subzone:invalid".parse::().is_err()); + + assert!("region::".parse::().unwrap().zone().is_none()); + } +}