diff --git a/Cargo.lock b/Cargo.lock
index ce94dfeaa..a6a8c9b7b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -652,6 +652,20 @@ version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7"
 
+[[package]]
+name = "dashmap"
+version = "6.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown 0.14.3",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "derivative"
 version = "2.2.0"
@@ -1637,6 +1651,7 @@ dependencies = [
  "combine",
  "crc16",
  "criterion",
+ "dashmap",
  "derivative",
  "dispose",
  "fast-math",
diff --git a/redis/Cargo.toml b/redis/Cargo.toml
index 0d836a698..fd79ff079 100644
--- a/redis/Cargo.toml
+++ b/redis/Cargo.toml
@@ -63,6 +63,9 @@ crc16 = { version = "0.4", optional = true }
 rand = { version = "0.8", optional = true }
 derivative = { version = "2.2.0", optional = true }
 
+# Only needed for async cluster
+dashmap = { version = "6.0", optional = true }
+
 # Only needed for async_std support
 async-std = { version = "1.8.0", optional = true }
 async-trait = { version = "0.1.24", optional = true }
@@ -124,7 +127,7 @@ tokio-native-tls-comp = ["tokio-comp", "tls-native-tls", "tokio-native-tls"]
 tokio-rustls-comp = ["tokio-comp", "tls-rustls", "tokio-rustls"]
 connection-manager = ["futures", "aio", "tokio-retry"]
 streams = []
-cluster-async = ["cluster", "futures", "futures-util"]
+cluster-async = ["cluster", "futures", "futures-util", "dashmap"]
 keep-alive = ["socket2"]
 sentinel = ["rand"]
 tcp_nodelay = []
diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs
index 2a438aa77..e0b2fc87d 100644
--- a/redis/src/cluster_async/connections_container.rs
+++ b/redis/src/cluster_async/connections_container.rs
@@ -1,13 +1,11 @@
 use crate::cluster_async::ConnectionFuture;
-use arcstr::ArcStr;
-use futures::FutureExt;
-use rand::seq::IteratorRandom;
-use std::collections::HashMap;
-use std::net::IpAddr;
-
 use crate::cluster_routing::{Route, SlotAddr};
 use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
 use crate::cluster_topology::TopologyHash;
+use dashmap::DashMap;
+use futures::FutureExt;
+use rand::seq::IteratorRandom;
+use std::net::IpAddr;
 
 /// A struct that encapsulates a network connection along with its associated IP address.
 #[derive(Clone, Eq, PartialEq, Debug)]
@@ -86,11 +84,12 @@ pub(crate) enum ConnectionType {
     PreferManagement,
 }
 
-pub(crate) struct ConnectionsMap<Connection>(pub(crate) HashMap<ArcStr, ClusterNode<Connection>>);
+pub(crate) struct ConnectionsMap<Connection>(pub(crate) DashMap<String, ClusterNode<Connection>>);
 
 impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        for (address, node) in self.0.iter() {
+        for item in self.0.iter() {
+            let (address, node) = (item.key(), item.value());
             match node.user_connection.ip {
                 Some(ip) => writeln!(f, "{address} - {ip}")?,
                 None => writeln!(f, "{address}")?,
@@ -101,7 +100,7 @@ impl<Connection> std::fmt::Display for ConnectionsMap<Connection> {
 }
 
 pub(crate) struct ConnectionsContainer<Connection> {
-    connection_map: HashMap<ArcStr, ClusterNode<Connection>>,
+    connection_map: DashMap<String, ClusterNode<Connection>>,
     pub(crate) slot_map: SlotMap,
     read_from_replica_strategy: ReadFromReplicaStrategy,
     topology_hash: TopologyHash,
@@ -118,7 +117,7 @@ impl<Connection> Default for ConnectionsContainer<Connection> {
     }
 }
 
-pub(crate) type ConnectionAndAddress<Connection> = (ArcStr, Connection);
+pub(crate) type ConnectionAndAddress<Connection> = (String, Connection);
 
 impl<Connection> ConnectionsContainer<Connection>
 where
@@ -139,7 +138,7 @@ where
     }
 
     /// Returns true if the address represents a known primary node.
-    pub(crate) fn is_primary(&self, address: &ArcStr) -> bool {
+    pub(crate) fn is_primary(&self, address: &String) -> bool {
         self.connection_for_address(address).is_some()
             && self
                 .slot_map
@@ -213,9 +212,10 @@ where
     pub(crate) fn all_node_connections(
         &self,
     ) -> impl Iterator<Item = ConnectionAndAddress<Connection>> + '_ {
-        self.connection_map
-            .iter()
-            .map(move |(address, node)| (address.clone(), node.user_connection.conn.clone()))
+        self.connection_map.iter().map(move |item| {
+            let (node, address) = (item.key(), item.value());
+            (node.clone(), address.user_connection.conn.clone())
+        })
     }
 
     pub(crate) fn all_primary_connections(
@@ -228,16 +228,19 @@ where
     }
 
     pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {
-        self.connection_map.get(address).cloned()
+        self.connection_map
+            .get(address)
+            .map(|item| item.value().clone())
     }
 
     pub(crate) fn connection_for_address(
         &self,
         address: &str,
     ) -> Option<ConnectionAndAddress<Connection>> {
-        self.connection_map
-            .get_key_value(address)
-            .map(|(address, conn)| (address.clone(), conn.user_connection.conn.clone()))
+        self.connection_map.get(address).map(|item| {
+            let (address, conn) = (item.key(), item.value());
+            (address.clone(), conn.user_connection.conn.clone())
+        })
     }
 
     pub(crate) fn random_connections(
@@ -249,24 +252,27 @@ where
             .iter()
             .choose_multiple(&mut rand::thread_rng(), amount)
             .into_iter()
-            .map(move |(address, node)| {
+            .map(move |item| {
+                let (address, node) = (item.key(), item.value());
                 let conn = node.get_connection(&conn_type);
                 (address.clone(), conn)
             })
     }
 
     pub(crate) fn replace_or_add_connection_for_address(
-        &mut self,
-        address: impl Into<ArcStr>,
+        &self,
+        address: impl Into<String>,
         node: ClusterNode<Connection>,
-    ) -> ArcStr {
+    ) -> String {
         let address = address.into();
        self.connection_map.insert(address.clone(), node);
        address
     }
 
-    pub(crate) fn remove_node(&mut self, address: &ArcStr) -> Option<ClusterNode<Connection>> {
-        self.connection_map.remove(address)
+    pub(crate) fn remove_node(&self, address: &String) -> Option<ClusterNode<Connection>> {
+        self.connection_map
+            .remove(address)
+            .map(|(_key, value)| value)
     }
 
     pub(crate) fn len(&self) -> usize {
@@ -302,13 +308,13 @@ mod tests {
             }
         }
     }
-    fn remove_nodes(container: &mut ConnectionsContainer<usize>, addresss: &[&str]) {
-        for address in addresss {
+    fn remove_nodes(container: &ConnectionsContainer<usize>, addresses: &[&str]) {
+        for address in addresses {
             container.remove_node(&(*address).into());
         }
     }
 
-    fn remove_all_connections(container: &mut ConnectionsContainer<usize>) {
+    fn remove_all_connections(container: &ConnectionsContainer<usize>) {
         remove_nodes(
             container,
             &[
@@ -366,7 +372,7 @@ mod tests {
             ],
             ReadFromReplicaStrategy::AlwaysFromPrimary, // this argument shouldn't matter, since we overload the RFR strategy.
         );
-        let mut connection_map = HashMap::new();
+        let connection_map = DashMap::new();
         connection_map.insert(
             "primary1".into(),
             create_cluster_node(1, use_management_connections),
@@ -514,7 +520,7 @@ mod tests {
 
     #[test]
     fn get_replica_connection_for_replica_route_if_some_but_not_all_replicas_were_removed() {
-        let mut container = create_container();
+        let container = create_container();
         container.remove_node(&"replica3-2".into());
 
         assert_eq!(
@@ -540,8 +546,8 @@ mod tests {
 
     #[test]
     fn get_primary_connection_for_replica_route_if_all_replicas_were_removed() {
-        let mut container = create_container();
-        remove_nodes(&mut container, &["replica2-1", "replica3-1", "replica3-2"]);
+        let container = create_container();
+        remove_nodes(&container, &["replica2-1", "replica3-1", "replica3-2"]);
 
         assert_eq!(
             2,
@@ -593,7 +599,7 @@ mod tests {
 
     #[test]
     fn get_connection_by_address_returns_none_if_connection_was_removed() {
-        let mut container = create_container();
+        let container = create_container();
         container.remove_node(&"primary1".into());
 
         assert!(container.connection_for_address("primary1").is_none());
@@ -601,7 +607,7 @@ mod tests {
 
     #[test]
     fn get_connection_by_address_returns_added_connection() {
-        let mut container = create_container();
+        let container = create_container();
         let address = container.replace_or_add_connection_for_address(
             "foobar",
             ClusterNode::new_only_with_user_conn(4),
@@ -630,8 +636,8 @@ mod tests {
 
     #[test]
     fn get_random_connections_returns_none_if_all_connections_were_removed() {
-        let mut container = create_container();
-        remove_all_connections(&mut container);
+        let container = create_container();
+        remove_all_connections(&container);
 
         assert_eq!(
             0,
@@ -643,8 +649,8 @@ mod tests {
 
     #[test]
     fn get_random_connections_returns_added_connection() {
-        let mut container = create_container();
-        remove_all_connections(&mut container);
+        let container = create_container();
+        remove_all_connections(&container);
         let address = container.replace_or_add_connection_for_address(
             "foobar",
             ClusterNode::new_only_with_user_conn(4),
@@ -694,7 +700,7 @@ mod tests {
 
     #[test]
     fn get_all_user_connections_returns_added_connection() {
-        let mut container = create_container();
+        let container = create_container();
         container.replace_or_add_connection_for_address(
             "foobar",
             ClusterNode::new_only_with_user_conn(4),
@@ -711,7 +717,7 @@ mod tests {
 
     #[test]
     fn get_all_user_connections_does_not_return_removed_connection() {
-        let mut container = create_container();
+        let container = create_container();
         container.remove_node(&"primary1".into());
 
         let mut connections: Vec<_> = container
@@ -738,7 +744,7 @@ mod tests {
 
     #[test]
     fn get_all_primaries_does_not_return_removed_connection() {
-        let mut container = create_container();
+        let container = create_container();
         container.remove_node(&"primary1".into());
 
         let mut connections: Vec<_> = container
@@ -752,7 +758,7 @@ mod tests {
 
     #[test]
     fn len_is_adjusted_on_removals_and_additions() {
-        let mut container = create_container();
+        let container = create_container();
 
         assert_eq!(container.len(), 6);
 
@@ -769,7 +775,7 @@ mod tests {
     #[test]
     fn len_is_not_adjusted_on_removals_of_nonexisting_connections_or_additions_of_existing_connections(
     ) {
-        let mut container = create_container();
+        let container = create_container();
 
         assert_eq!(container.len(), 6);
 
@@ -785,7 +791,7 @@ mod tests {
 
     #[test]
     fn remove_node_returns_connection_if_it_exists() {
-        let mut container = create_container();
+        let container = create_container();
 
         let connection = container.remove_node(&"primary1".into());
         assert_eq!(connection, Some(ClusterNode::new_only_with_user_conn(1)));
@@ -796,7 +802,7 @@ mod tests {
 
     #[test]
     fn test_is_empty() {
-        let mut container = create_container();
+        let container = create_container();
 
         assert!(!container.is_empty());
         container.remove_node(&"primary1".into());
@@ -829,7 +835,7 @@ mod tests {
 
     #[test]
     fn is_primary_returns_false_for_removed_node() {
-        let mut container = create_container();
+        let container = create_container();
         let address = "primary1".into();
         container.remove_node(&address);
diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs
index 0a4d56e99..55e1aeff8 100644
--- a/redis/src/cluster_async/mod.rs
+++ b/redis/src/cluster_async/mod.rs
@@ -39,6 +39,7 @@ use crate::{
 };
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
 use async_std::task::{spawn, JoinHandle};
+use dashmap::DashMap;
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
 use futures::executor::block_on;
 use std::{
@@ -81,7 +82,6 @@ use std::time::Duration;
 
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
 use crate::aio::{async_std::AsyncStd, RedisRuntime};
-use arcstr::ArcStr;
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
 use backoff_std_async::future::retry;
 #[cfg(all(not(feature = "tokio-comp"), feature = "async-std-comp"))]
@@ -381,7 +381,7 @@ pub(crate) struct InnerCore<C> {
     slot_refresh_state: SlotRefreshState,
     initial_nodes: Vec<ConnectionInfo>,
     push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
-    subscriptions_by_address: RwLock<HashMap<ArcStr, PubSubSubscriptionInfo>>,
+    subscriptions_by_address: RwLock<HashMap<String, PubSubSubscriptionInfo>>,
     unassigned_subscriptions: RwLock<PubSubSubscriptionInfo>,
 }
 
@@ -516,7 +516,7 @@ pub(crate) enum InternalSingleNodeRouting<C> {
     SpecificNode(Route),
     ByAddress(String),
     Connection {
-        address: ArcStr,
+        address: String,
         conn: ConnectionFuture<C>,
     },
     Redirect {
@@ -613,14 +613,14 @@ pub(crate) enum Response {
 }
 
 pub(crate) enum OperationTarget {
-    Node { address: ArcStr },
+    Node { address: String },
     FanOut,
     NotFound,
 }
 type OperationResult = Result<Response, (OperationTarget, RedisError)>;
 
-impl From<ArcStr> for OperationTarget {
-    fn from(address: ArcStr) -> Self {
+impl From<String> for OperationTarget {
+    fn from(address: String) -> Self {
         OperationTarget::Node { address }
     }
 }
@@ -761,12 +761,12 @@ enum Next<C> {
     },
     RetryBusyLoadingError {
         request: PendingRequest<C>,
-        address: ArcStr,
+        address: String,
     },
     Reconnect {
         // if not set, then a reconnect should happen without sending a request afterwards
         request: Option<PendingRequest<C>>,
-        target: ArcStr,
+        target: String,
     },
     RefreshSlots {
         // if not set, then a slot refresh should happen without sending a request afterwards
@@ -943,7 +943,7 @@ impl<C> Request<C> {
 }
 
 enum ConnectionCheck<C> {
-    Found((ArcStr, ConnectionFuture<C>)),
+    Found((String, ConnectionFuture<C>)),
     OnlyAddress(String),
     RandomConnection,
 }
@@ -1090,13 +1090,13 @@ where
             .buffer_unordered(initial_nodes.len())
             .fold(
                 (
-                    ConnectionsMap(HashMap::with_capacity(initial_nodes.len())),
+                    ConnectionsMap(DashMap::with_capacity(initial_nodes.len())),
                     None,
                 ),
-                |mut connections: (ConnectionMap<C>, Option<String>), addr_conn_res| async move {
+                |connections: (ConnectionMap<C>, Option<String>), addr_conn_res| async move {
                     match addr_conn_res {
                         Ok((addr, node)) => {
-                            connections.0 .0.insert(addr.into(), node);
+                            connections.0 .0.insert(addr, node);
                             (connections.0, None)
                         }
                         Err(e) => (connections.0, Some(e.to_string())),
@@ -1152,18 +1152,18 @@ where
 
     async fn refresh_connections(
         inner: Arc<InnerCore<C>>,
-        addresses: Vec<ArcStr>,
+        addresses: Vec<String>,
         conn_type: RefreshConnectionType,
     ) {
         info!("Started refreshing connections to {:?}", addresses);
-        let mut connections_container = inner.conn_lock.write().await;
+        let connections_container = inner.conn_lock.read().await;
         let cluster_params = &inner.cluster_params;
         let subscriptions_by_address = &inner.subscriptions_by_address;
         let push_sender = &inner.push_sender;
 
         stream::iter(addresses.into_iter())
             .fold(
-                &mut *connections_container,
+                &*connections_container,
                 |connections_container, address| async move {
                     let node_option = connections_container.remove_node(&address);
 
@@ -1200,7 +1200,7 @@ where
     }
 
     async fn aggregate_results(
-        receivers: Vec<(Option<ArcStr>, oneshot::Receiver<RedisResult<Response>>)>,
+        receivers: Vec<(Option<String>, oneshot::Receiver<RedisResult<Response>>)>,
         routing: &MultipleNodeRoutingInfo,
         response_policy: Option<ResponsePolicy>,
     ) -> RedisResult<Value> {
@@ -1404,7 +1404,7 @@ where
             return;
         }
 
-        let mut addrs_to_refresh: HashSet<ArcStr> = HashSet::new();
+        let mut addrs_to_refresh: HashSet<String> = HashSet::new();
         let mut subs_by_address_guard = inner.subscriptions_by_address.write().await;
         let mut unassigned_subs_guard = inner.unassigned_subscriptions.write().await;
         let conns_read_guard = inner.conn_lock.read().await;
@@ -1476,12 +1476,12 @@ where
         drop(subs_by_address_guard);
 
         if !addrs_to_refresh.is_empty() {
-            let mut conns_write_guard = inner.conn_lock.write().await;
+            let conns_read_guard = inner.conn_lock.read().await;
             // have to remove or otherwise the refresh_connection wont trigger node recreation
             for addr_to_refresh in addrs_to_refresh.iter() {
-                conns_write_guard.remove_node(addr_to_refresh);
+                conns_read_guard.remove_node(addr_to_refresh);
             }
-            drop(conns_write_guard);
+            drop(conns_read_guard);
             // immediately trigger connection reestablishment
             Self::refresh_connections(
                 inner.clone(),
@@ -1609,12 +1609,11 @@ where
             .await;
         let new_connections: ConnectionMap<C> = stream::iter(addresses_and_connections_iter)
             .fold(
-                ConnectionsMap(HashMap::with_capacity(nodes_len)),
-                |mut connections, (addr, node)| async {
+                ConnectionsMap(DashMap::with_capacity(nodes_len)),
+                |connections, (addr, node)| async {
                     let mut cluster_params = inner.cluster_params.clone();
                     let subs_guard = inner.subscriptions_by_address.read().await;
-                    cluster_params.pubsub_subscriptions =
-                        subs_guard.get(&ArcStr::from(addr.as_str())).cloned();
+                    cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned();
                     drop(subs_guard);
                     let node = get_or_create_conn(
                         addr,
@@ -1673,7 +1672,7 @@ where
             Item = Option<(Arc<Cmd>, ConnectionAndAddress<ConnectionFuture<C>>)>,
         >,
     ) -> (
-        Vec<(Option<ArcStr>, Receiver<RedisResult<Response>>)>,
+        Vec<(Option<String>, Receiver<RedisResult<Response>>)>,
         Vec<Option<PendingRequest<C>>>,
     ) {
         iterator
@@ -1785,7 +1784,7 @@ where
         pipeline: Arc<crate::Pipeline>,
         offset: usize,
         count: usize,
-        conn: impl Future<Output = RedisResult<(ArcStr, C)>>,
+        conn: impl Future<Output = RedisResult<(String, C)>>,
     ) -> OperationResult {
         trace!("try_pipeline_request");
         let (address, mut conn) = conn.await.map_err(|err| (OperationTarget::NotFound, err))?;
@@ -1833,7 +1832,7 @@ where
         routing: InternalSingleNodeRouting<C>,
         core: Core<C>,
         cmd: Option<Arc<Cmd>>,
-    ) -> RedisResult<(ArcStr, C)> {
+    ) -> RedisResult<(String, C)> {
         let read_guard = core.conn_lock.read().await;
 
         let mut asking = false;
@@ -1923,7 +1922,7 @@ where
         {
             Ok(node) => {
                 let connection_clone = node.user_connection.conn.clone().await;
-                let mut connections = core.conn_lock.write().await;
+                let connections = core.conn_lock.read().await;
                 let address = connections.replace_or_add_connection_for_address(addr, node);
                 drop(connections);
                 (address, connection_clone)
@@ -1985,7 +1984,7 @@ where
     async fn handle_loading_error(
         core: Core<C>,
         info: RequestInfo<C>,
-        address: ArcStr,
+        address: String,
         retry: u32,
     ) -> OperationResult {
         let is_primary = core.conn_lock.read().await.is_primary(&address);
@@ -1993,7 +1992,7 @@ where
         if !is_primary {
             // If the connection is a replica, remove the connection and retry.
             // The connection will be established again on the next call to refresh slots once the replica is no longer in loading state.
-            core.conn_lock.write().await.remove_node(&address);
+            core.conn_lock.read().await.remove_node(&address);
         } else {
             // If the connection is primary, just sleep and retry
             let sleep_duration = core.cluster_params.retry_params.wait_time_for_retry(retry);
@@ -2137,7 +2136,7 @@ where
 enum PollFlushAction {
     None,
     RebuildSlots,
-    Reconnect(Vec<ArcStr>),
+    Reconnect(Vec<String>),
     ReconnectFromInitialConnections,
 }
 
@@ -2269,7 +2268,7 @@ async fn calculate_topology_from_random_nodes<'a, C>(
         crate::cluster_slotmap::SlotMap,
         crate::cluster_topology::TopologyHash,
     )>,
-    Vec<ArcStr>,
+    Vec<String>,
 )
 where
     C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
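
The core of the patch is swapping the HashMap behind a `&mut self` / write-lock discipline for a DashMap, which allows insertion and removal through a shared reference; callers that previously needed the outer RwLock's write guard can now work with a read guard. The standalone sketch below is not part of the patch: `Node` is a hypothetical stand-in for `ClusterNode<Connection>`, used only to illustrate the dashmap access patterns the new code relies on (`iter()` and `get()` return guard objects exposing `key()`/`value()`, and `remove()` returns the owned `(key, value)` pair).

use dashmap::DashMap;

// Hypothetical stand-in for ClusterNode<Connection>, for illustration only.
#[derive(Clone, Debug, PartialEq)]
struct Node {
    conn: usize,
}

fn main() {
    // Insertion and removal go through a shared reference (&self), which is what
    // lets the container methods in the patch drop their `&mut self` receivers.
    let map: DashMap<String, Node> = DashMap::new();
    map.insert("primary1".to_string(), Node { conn: 1 });
    map.insert("replica1-1".to_string(), Node { conn: 2 });

    // `get` returns a guard rather than a plain reference; `key()`/`value()`
    // mirror the lookups in `connection_for_address`.
    if let Some(entry) = map.get("primary1") {
        println!("{} -> {:?}", entry.key(), entry.value());
    }

    // `iter` yields guards instead of `(key, value)` tuples, hence the
    // `let (address, node) = (item.key(), item.value());` destructuring in the patch.
    for item in map.iter() {
        let (address, node) = (item.key(), item.value());
        println!("{address}: {node:?}");
    }

    // `remove` returns the owned `(key, value)` pair, so `remove_node` maps it
    // down to just the value to keep its `Option<ClusterNode<_>>` return type.
    let removed = map.remove("replica1-1").map(|(_key, value)| value);
    assert_eq!(removed, Some(Node { conn: 2 }));
}

One trade-off worth keeping in mind: dashmap shards its keys across several internal locks, so a guard returned by `get` or `iter` still blocks writers to the same shard while it is held, which is why the patch keeps the guards short-lived and clones values out of them.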