diff --git a/redis/src/cluster_async/connections_container.rs b/redis/src/cluster_async/connections_container.rs index e0b2fc87d..2bfbb8b93 100644 --- a/redis/src/cluster_async/connections_container.rs +++ b/redis/src/cluster_async/connections_container.rs @@ -137,6 +137,14 @@ where } } + /// Extends the current connection map with the entries of `other_connection_map`. + pub(crate) fn extend_connection_map( + &mut self, + other_connection_map: ConnectionsMap, + ) { + self.connection_map.extend(other_connection_map.0); + } + /// Returns true if the address represents a known primary node. pub(crate) fn is_primary(&self, address: &String) -> bool { self.connection_for_address(address).is_some() @@ -841,4 +849,33 @@ mod tests { assert!(!container.is_primary(&address)); } + + #[test] + fn test_extend_connection_map() { + let mut container = create_container(); + let mut current_addresses: Vec<_> = container + .all_node_connections() + .map(|conn| conn.0) + .collect(); + + let new_node = "new_primary1".to_string(); + // Check that `new_node` does not exist in the current connection map + assert!(container.connection_for_address(&new_node).is_none()); + // Create a new connection map + let new_connection_map = DashMap::new(); + new_connection_map.insert(new_node.clone(), create_cluster_node(1, false)); + + // Extend the current connection map + container.extend_connection_map(ConnectionsMap(new_connection_map)); + + // Check that the new addresses vector contains both the new node and all previous nodes + let mut new_addresses: Vec<_> = container + .all_node_connections() + .map(|conn| conn.0) + .collect(); + current_addresses.push(new_node); + current_addresses.sort(); + new_addresses.sort(); + assert_eq!(current_addresses, new_addresses); + } } diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 53f09f08f..cf977dd2a 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -808,7 +808,7 @@ impl Future for Request { let request = 
this.request.as_mut().unwrap(); // TODO - would be nice if we didn't need to repeat this code twice, with & without retries. if request.retry >= this.retry_params.number_of_retries { - let next = if err.kind() == ErrorKind::ClusterConnectionNotFound { + let next = if err.kind() == ErrorKind::AllConnectionsUnavailable { Next::ReconnectToInitialNodes { request: None }.into() } else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect) || matches!(target, OperationTarget::NotFound) @@ -836,7 +836,7 @@ impl Future for Request { } request.retry = request.retry.saturating_add(1); - if err.kind() == ErrorKind::ClusterConnectionNotFound { + if err.kind() == ErrorKind::AllConnectionsUnavailable { return Next::ReconnectToInitialNodes { request: Some(this.request.take().unwrap()), } @@ -1132,12 +1132,7 @@ where } }; let mut write_lock = inner.conn_lock.write().await; - *write_lock = ConnectionsContainer::new( - Default::default(), - connection_map, - inner.cluster_params.read_from_replicas, - 0, - ); + write_lock.extend_connection_map(connection_map); drop(write_lock); if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries( inner.clone(), @@ -1260,7 +1255,7 @@ where } else { Err(last_err.unwrap_or_else(|| { ( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::AllConnectionsUnavailable, "Couldn't find any connection", ) .into() @@ -1656,7 +1651,7 @@ where return OperationResult::Err(( OperationTarget::FanOut, ( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::AllConnectionsUnavailable, "No connections found for multi-node operation", ) .into(), @@ -1700,7 +1695,7 @@ where ) } else { let _ = sender.send(Err(( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::ConnectionNotFoundForRoute, "Connection not found", ) .into())); @@ -1871,7 +1866,7 @@ where && !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap()) { return Err(( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::ConnectionNotFoundForRoute, "Requested connection not 
found for route", format!("{route:?}"), ) @@ -1892,7 +1887,7 @@ where return Ok((address, conn.await)); } else { return Err(( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::ConnectionNotFoundForRoute, "Requested connection not found", address, ) @@ -1938,7 +1933,7 @@ where .random_connections(1, ConnectionType::User) .next() .ok_or(RedisError::from(( - ErrorKind::ClusterConnectionNotFound, + ErrorKind::AllConnectionsUnavailable, "No random connection found", )))?; return Ok((random_address, random_conn_future.await)); diff --git a/redis/src/commands/cluster_scan.rs b/redis/src/commands/cluster_scan.rs index 83881f4a7..97f10577a 100644 --- a/redis/src/commands/cluster_scan.rs +++ b/redis/src/commands/cluster_scan.rs @@ -451,7 +451,9 @@ where // the connection to the address cant be reached from different reasons, we will check we want to check if // the problem is problem that we can recover from like failover or scale down or some network issue // that we can retry the scan command to an address that own the next slot we are at. - ErrorKind::IoError | ErrorKind::ClusterConnectionNotFound => { + ErrorKind::IoError + | ErrorKind::AllConnectionsUnavailable + | ErrorKind::ConnectionNotFoundForRoute => { let retry = retry_scan(&scan_state, &core, match_pattern, count, object_type).await?; (from_redis_value(&retry.0?)?, retry.1) diff --git a/redis/src/types.rs b/redis/src/types.rs index 3376dbe13..a024f16a7 100644 --- a/redis/src/types.rs +++ b/redis/src/types.rs @@ -133,8 +133,10 @@ pub enum ErrorKind { EmptySentinelList, /// Attempted to kill a script/function while they werent' executing NotBusy, - /// Used when a cluster connection cannot find a connection to a valid node. - ClusterConnectionNotFound, + /// Used when no valid node connections remain in the cluster connection + AllConnectionsUnavailable, + /// Used when a connection is not found for the specified route. 
+ ConnectionNotFoundForRoute, #[cfg(feature = "json")] /// Error Serializing a struct to JSON form @@ -875,7 +877,8 @@ impl RedisError { ErrorKind::NoValidReplicasFoundBySentinel => "no valid replicas found by sentinel", ErrorKind::EmptySentinelList => "empty sentinel list", ErrorKind::NotBusy => "not busy", - ErrorKind::ClusterConnectionNotFound => "connection to node in cluster not found", + ErrorKind::AllConnectionsUnavailable => "no valid connections remain in the cluster", + ErrorKind::ConnectionNotFoundForRoute => "No connection found for the requested route", #[cfg(feature = "json")] ErrorKind::Serialize => "serializing", ErrorKind::RESP3NotSupported => "resp3 is not supported by server", @@ -1046,7 +1049,8 @@ impl RedisError { ErrorKind::ParseError => RetryMethod::Reconnect, ErrorKind::AuthenticationFailed => RetryMethod::Reconnect, - ErrorKind::ClusterConnectionNotFound => RetryMethod::Reconnect, + ErrorKind::AllConnectionsUnavailable => RetryMethod::Reconnect, + ErrorKind::ConnectionNotFoundForRoute => RetryMethod::Reconnect, ErrorKind::IoError => match &self.repr { ErrorRepr::IoError(err) => match err.kind() { diff --git a/redis/tests/test_cluster_async.rs b/redis/tests/test_cluster_async.rs index 4dbc0653b..7d1249c3e 100644 --- a/redis/tests/test_cluster_async.rs +++ b/redis/tests/test_cluster_async.rs @@ -2400,7 +2400,7 @@ mod cluster_async { .block_on(cmd.query_async::<_, Vec>(&mut connection)) .unwrap_err(); assert!( - matches!(result.kind(), ErrorKind::ClusterConnectionNotFound) + matches!(result.kind(), ErrorKind::ConnectionNotFoundForRoute) || result.is_connection_dropped() ); } @@ -4031,7 +4031,7 @@ mod cluster_async { handler: _handler, .. 
} = MockEnv::with_client_builder( - ClusterClient::builder(vec![&*format!("redis://{name}")]), + ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(1), name, move |received_cmd: &[u8], _| { let slots_config_vec = vec![ @@ -4071,7 +4071,7 @@ mod cluster_async { let res_err = res.unwrap_err(); assert_eq!( res_err.kind(), - ErrorKind::ClusterConnectionNotFound, + ErrorKind::ConnectionNotFoundForRoute, "{:?}", res_err ); diff --git a/redis/tests/test_cluster_scan.rs b/redis/tests/test_cluster_scan.rs index a4bb85625..29a3c87b4 100644 --- a/redis/tests/test_cluster_scan.rs +++ b/redis/tests/test_cluster_scan.rs @@ -164,7 +164,12 @@ mod test_cluster_scan_async { #[tokio::test] // test cluster scan with node fail in the middle async fn test_async_cluster_scan_with_fail() { - let cluster = TestClusterContext::new(3, 0); + let cluster = TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); let mut connection = cluster.async_connection(None).await; // Set some keys for i in 0..1000 { @@ -224,7 +229,11 @@ mod test_cluster_scan_async { let cluster = TestClusterContext::new_with_cluster_client_builder( 6, 1, - |builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0), + |builder| { + builder + .slots_refresh_rate_limit(Duration::from_secs(0), 0) + .retries(1) + }, false, ); @@ -374,7 +383,11 @@ mod test_cluster_scan_async { let cluster = TestClusterContext::new_with_cluster_client_builder( 6, 1, - |builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0), + |builder| { + builder + .slots_refresh_rate_limit(Duration::from_secs(0), 0) + .retries(1) + }, false, ); @@ -772,7 +785,12 @@ mod test_cluster_scan_async { // Testing cluster scan when connection fails in the middle and we get an error // then cluster up again and scanning can continue without any problem async fn test_async_cluster_scan_failover() { - let mut cluster = TestClusterContext::new(3, 0); + let mut cluster = 
TestClusterContext::new_with_cluster_client_builder( + 3, + 0, + |builder| builder.retries(1), + false, + ); let mut connection = cluster.async_connection(None).await; let mut i = 0; loop {