Skip to content

Commit

Permalink
Breaking change: reconnect_to_initial_nodes will be called only when …
Browse files Browse the repository at this point in the history
…AllConnectionsUnavailable (#183)

* Fixed reconnect_to_initial_nodes not to replace the current connection map but to extend it

* Breaking changes: renamed ClusterConnectionNotFound to AllConnectionsUnavailable, added new error ConnectionNotFoundForRoute
  • Loading branch information
barshaul authored Aug 15, 2024
1 parent efd61ff commit 2d7200f
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 26 deletions.
37 changes: 37 additions & 0 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ where
}
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
other_connection_map: ConnectionsMap<Connection>,
) {
self.connection_map.extend(other_connection_map.0);
}

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
Expand Down Expand Up @@ -841,4 +849,33 @@ mod tests {

assert!(!container.is_primary(&address));
}

#[test]
fn test_extend_connection_map() {
let mut container = create_container();
let mut current_addresses: Vec<_> = container
.all_node_connections()
.map(|conn| conn.0)
.collect();

let new_node = "new_primary1".to_string();
// Check that `new_node` not exists in the current
assert!(container.connection_for_address(&new_node).is_none());
// Create new connection map
let new_connection_map = DashMap::new();
new_connection_map.insert(new_node.clone(), create_cluster_node(1, false));

// Extend the current connection map
container.extend_connection_map(ConnectionsMap(new_connection_map));

// Check that the new addresses vector contains both the new node and all previous nodes
let mut new_addresses: Vec<_> = container
.all_node_connections()
.map(|conn| conn.0)
.collect();
current_addresses.push(new_node);
current_addresses.sort();
new_addresses.sort();
assert_eq!(current_addresses, new_addresses);
}
}
23 changes: 9 additions & 14 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ impl<C> Future for Request<C> {
let request = this.request.as_mut().unwrap();
// TODO - would be nice if we didn't need to repeat this code twice, with & without retries.
if request.retry >= this.retry_params.number_of_retries {
let next = if err.kind() == ErrorKind::ClusterConnectionNotFound {
let next = if err.kind() == ErrorKind::AllConnectionsUnavailable {
Next::ReconnectToInitialNodes { request: None }.into()
} else if matches!(err.retry_method(), crate::types::RetryMethod::MovedRedirect)
|| matches!(target, OperationTarget::NotFound)
Expand Down Expand Up @@ -836,7 +836,7 @@ impl<C> Future for Request<C> {
}
request.retry = request.retry.saturating_add(1);

if err.kind() == ErrorKind::ClusterConnectionNotFound {
if err.kind() == ErrorKind::AllConnectionsUnavailable {
return Next::ReconnectToInitialNodes {
request: Some(this.request.take().unwrap()),
}
Expand Down Expand Up @@ -1132,12 +1132,7 @@ where
}
};
let mut write_lock = inner.conn_lock.write().await;
*write_lock = ConnectionsContainer::new(
Default::default(),
connection_map,
inner.cluster_params.read_from_replicas,
0,
);
write_lock.extend_connection_map(connection_map);
drop(write_lock);
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
Expand Down Expand Up @@ -1260,7 +1255,7 @@ where
} else {
Err(last_err.unwrap_or_else(|| {
(
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"Couldn't find any connection",
)
.into()
Expand Down Expand Up @@ -1656,7 +1651,7 @@ where
return OperationResult::Err((
OperationTarget::FanOut,
(
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"No connections found for multi-node operation",
)
.into(),
Expand Down Expand Up @@ -1700,7 +1695,7 @@ where
)
} else {
let _ = sender.send(Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Connection not found",
)
.into()));
Expand Down Expand Up @@ -1871,7 +1866,7 @@ where
&& !RoutingInfo::is_key_routing_command(&routable_cmd.unwrap())
{
return Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Requested connection not found for route",
format!("{route:?}"),
)
Expand All @@ -1892,7 +1887,7 @@ where
return Ok((address, conn.await));
} else {
return Err((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"Requested connection not found",
address,
)
Expand Down Expand Up @@ -1938,7 +1933,7 @@ where
.random_connections(1, ConnectionType::User)
.next()
.ok_or(RedisError::from((
ErrorKind::ClusterConnectionNotFound,
ErrorKind::AllConnectionsUnavailable,
"No random connection found",
)))?;
return Ok((random_address, random_conn_future.await));
Expand Down
4 changes: 3 additions & 1 deletion redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ where
// the connection to the address cant be reached from different reasons, we will check we want to check if
// the problem is problem that we can recover from like failover or scale down or some network issue
// that we can retry the scan command to an address that own the next slot we are at.
ErrorKind::IoError | ErrorKind::ClusterConnectionNotFound => {
ErrorKind::IoError
| ErrorKind::AllConnectionsUnavailable
| ErrorKind::ConnectionNotFoundForRoute => {
let retry =
retry_scan(&scan_state, &core, match_pattern, count, object_type).await?;
(from_redis_value(&retry.0?)?, retry.1)
Expand Down
12 changes: 8 additions & 4 deletions redis/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,10 @@ pub enum ErrorKind {
EmptySentinelList,
/// Attempted to kill a script/function while they werent' executing
NotBusy,
/// Used when a cluster connection cannot find a connection to a valid node.
ClusterConnectionNotFound,
/// Used when no valid node connections remain in the cluster connection
AllConnectionsUnavailable,
/// Used when a connection is not found for the specified route.
ConnectionNotFoundForRoute,

#[cfg(feature = "json")]
/// Error Serializing a struct to JSON form
Expand Down Expand Up @@ -875,7 +877,8 @@ impl RedisError {
ErrorKind::NoValidReplicasFoundBySentinel => "no valid replicas found by sentinel",
ErrorKind::EmptySentinelList => "empty sentinel list",
ErrorKind::NotBusy => "not busy",
ErrorKind::ClusterConnectionNotFound => "connection to node in cluster not found",
ErrorKind::AllConnectionsUnavailable => "no valid connections remain in the cluster",
ErrorKind::ConnectionNotFoundForRoute => "No connection found for the requested route",
#[cfg(feature = "json")]
ErrorKind::Serialize => "serializing",
ErrorKind::RESP3NotSupported => "resp3 is not supported by server",
Expand Down Expand Up @@ -1046,7 +1049,8 @@ impl RedisError {

ErrorKind::ParseError => RetryMethod::Reconnect,
ErrorKind::AuthenticationFailed => RetryMethod::Reconnect,
ErrorKind::ClusterConnectionNotFound => RetryMethod::Reconnect,
ErrorKind::AllConnectionsUnavailable => RetryMethod::Reconnect,
ErrorKind::ConnectionNotFoundForRoute => RetryMethod::Reconnect,

ErrorKind::IoError => match &self.repr {
ErrorRepr::IoError(err) => match err.kind() {
Expand Down
6 changes: 3 additions & 3 deletions redis/tests/test_cluster_async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2400,7 +2400,7 @@ mod cluster_async {
.block_on(cmd.query_async::<_, Vec<String>>(&mut connection))
.unwrap_err();
assert!(
matches!(result.kind(), ErrorKind::ClusterConnectionNotFound)
matches!(result.kind(), ErrorKind::ConnectionNotFoundForRoute)
|| result.is_connection_dropped()
);
}
Expand Down Expand Up @@ -4031,7 +4031,7 @@ mod cluster_async {
handler: _handler,
..
} = MockEnv::with_client_builder(
ClusterClient::builder(vec![&*format!("redis://{name}")]),
ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(1),
name,
move |received_cmd: &[u8], _| {
let slots_config_vec = vec![
Expand Down Expand Up @@ -4071,7 +4071,7 @@ mod cluster_async {
let res_err = res.unwrap_err();
assert_eq!(
res_err.kind(),
ErrorKind::ClusterConnectionNotFound,
ErrorKind::ConnectionNotFoundForRoute,
"{:?}",
res_err
);
Expand Down
26 changes: 22 additions & 4 deletions redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,12 @@ mod test_cluster_scan_async {

#[tokio::test] // test cluster scan with node fail in the middle
async fn test_async_cluster_scan_with_fail() {
let cluster = TestClusterContext::new(3, 0);
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);
let mut connection = cluster.async_connection(None).await;
// Set some keys
for i in 0..1000 {
Expand Down Expand Up @@ -224,7 +229,11 @@ mod test_cluster_scan_async {
let cluster = TestClusterContext::new_with_cluster_client_builder(
6,
1,
|builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0),
|builder| {
builder
.slots_refresh_rate_limit(Duration::from_secs(0), 0)
.retries(1)
},
false,
);

Expand Down Expand Up @@ -374,7 +383,11 @@ mod test_cluster_scan_async {
let cluster = TestClusterContext::new_with_cluster_client_builder(
6,
1,
|builder| builder.slots_refresh_rate_limit(Duration::from_secs(0), 0),
|builder| {
builder
.slots_refresh_rate_limit(Duration::from_secs(0), 0)
.retries(1)
},
false,
);

Expand Down Expand Up @@ -772,7 +785,12 @@ mod test_cluster_scan_async {
// Testing cluster scan when connection fails in the middle and we get an error
// then cluster up again and scanning can continue without any problem
async fn test_async_cluster_scan_failover() {
let mut cluster = TestClusterContext::new(3, 0);
let mut cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);
let mut connection = cluster.async_connection(None).await;
let mut i = 0;
loop {
Expand Down

0 comments on commit 2d7200f

Please sign in to comment.