From da80ad44109e611cdf568cd811dffe89a3275d18 Mon Sep 17 00:00:00 2001 From: avifenesh Date: Tue, 25 Jun 2024 14:12:56 +0000 Subject: [PATCH] changed logic to use internal existing function --- redis/src/cluster_async/mod.rs | 46 ++++++++----------- redis/src/commands/cluster_scan.rs | 67 +++++++++++---------------- redis/tests/test_cluster_scan.rs | 74 ++++++++++++++++++++++++++---- 3 files changed, 113 insertions(+), 74 deletions(-) diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 112d97fd36..d0947cf151 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -310,7 +310,7 @@ pub(crate) struct InnerCore { pub(crate) conn_lock: RwLock>, cluster_params: ClusterParams, pending_requests: Mutex>>, - pub(crate) slot_refresh_in_progress: AtomicBool, + slot_refresh_in_progress: AtomicBool, initial_nodes: Vec, push_sender: Option>, subscriptions_by_address: RwLock>, @@ -383,20 +383,6 @@ where .slot_map .get_slots_of_node(node_address) } - - // Route command to the given address - pub(crate) async fn route_command_inner( - core: Arc>, - cmd: Cmd, - address: &str, - ) -> RedisResult { - let mut node_conn = ClusterConnInner::get_connection( - InternalSingleNodeRouting::ByAddress(address.to_string()), - core, - ) - .await?; - node_conn.1.req_packed_command(&cmd).await - } } pub(crate) struct ClusterConnInner { @@ -416,7 +402,7 @@ impl Dispose for ClusterConnInner { } #[derive(Clone)] -enum InternalRoutingInfo { +pub(crate) enum InternalRoutingInfo { SingleNode(InternalSingleNodeRouting), MultiNode((MultipleNodeRoutingInfo, Option)), } @@ -441,7 +427,7 @@ impl From> for InternalRoutingInfo { } #[derive(Clone)] -enum InternalSingleNodeRouting { +pub(crate) enum InternalSingleNodeRouting { Random, SpecificNode(Route), ByAddress(String), @@ -536,13 +522,13 @@ fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> { return Box::pin(async_std::task::sleep(duration)); } -enum Response { +pub(crate) enum Response { 
Single(Value), ClusterScanResult(ScanStateRC, Vec), Multiple(Vec), } -enum OperationTarget { +pub(crate) enum OperationTarget { Node { address: ArcStr }, FanOut, NotFound, @@ -754,6 +740,7 @@ impl Future for Request { return Next::Done.into(); } OperationTarget::NotFound => { + println!("Request error `{}` not found", err); // TODO - this is essentially a repeat of the retirable error. probably can remove duplication. let mut request = this.request.take().unwrap(); request.info.reset_redirect(); @@ -1040,7 +1027,7 @@ where } } - pub(crate) async fn refresh_connections( + async fn refresh_connections( inner: Arc>, addresses: Vec, conn_type: RefreshConnectionType, @@ -1165,7 +1152,7 @@ where } // Query a node to discover slot-> master mappings with retries - pub(crate) async fn refresh_slots_and_subscriptions_with_retries( + async fn refresh_slots_and_subscriptions_with_retries( inner: Arc>, ) -> RedisResult<()> { if inner @@ -1196,6 +1183,15 @@ where res } + pub(crate) async fn check_topology_and_refresh_if_diff(inner: Arc>) -> bool { + if Self::check_for_topology_diff(inner.clone()).await { + let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone()).await; + true + } else { + false + } + } + async fn periodic_topology_check( inner: Arc>, interval_duration: Duration, @@ -1207,9 +1203,7 @@ where } let _ = boxed_sleep(interval_duration).await; - if Self::check_for_topology_diff(inner.clone()).await { - let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone()).await; - } else { + if !Self::check_topology_and_refresh_if_diff(inner.clone()).await { Self::refresh_pubsub_subscriptions(inner.clone()).await; } } @@ -1337,7 +1331,7 @@ where false } - pub(crate) async fn refresh_slots( + async fn refresh_slots( inner: Arc>, curr_retry: usize, ) -> Result<(), BackoffError> { @@ -1552,7 +1546,7 @@ where .map_err(|err| (OperationTarget::FanOut, err)) } - async fn try_cmd_request( + pub(crate) async fn try_cmd_request( cmd: Arc, routing: 
InternalRoutingInfo, core: Core, diff --git a/redis/src/commands/cluster_scan.rs b/redis/src/commands/cluster_scan.rs index 7689e20a26..6b1a18ec83 100644 --- a/redis/src/commands/cluster_scan.rs +++ b/redis/src/commands/cluster_scan.rs @@ -1,5 +1,7 @@ use crate::aio::ConnectionLike; -use crate::cluster_async::{ClusterConnInner, Connect, Core, InnerCore}; +use crate::cluster_async::{ + ClusterConnInner, Connect, Core, InternalRoutingInfo, InternalSingleNodeRouting, Response, +}; use crate::cluster_routing::SlotAddr; use crate::cluster_topology::SLOT_SIZE; use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, Value}; @@ -138,17 +140,14 @@ pub(crate) trait ClusterInScan { /// Retrieves the slots assigned to a given address in the cluster. async fn get_slots_of_address(&self, address: &str) -> Vec; - /// Refreshes the slot mapping of the cluster. - async fn refresh_slots(&self) -> RedisResult<()>; - /// Routes a Redis command to a specific address in the cluster. - async fn route_command(&self, cmd: &Cmd, address: &str) -> RedisResult; + async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult; /// Check if all slots are covered by the cluster async fn are_all_slots_covered(&self) -> bool; /// Check if the topology of the cluster has changed and refresh the slots if needed - async fn refresh_if_topology_changed(&self) -> RedisResult<()>; + async fn refresh_if_topology_changed(&self); } /// Represents the state of a scan operation in a Redis cluster. 
@@ -366,23 +365,27 @@ where async fn get_slots_of_address(&self, address: &str) -> Vec { self.as_ref().get_slots_of_address(address).await } - // Refresh the topology of the cluster - async fn refresh_slots(&self) -> RedisResult<()> { - ClusterConnInner::refresh_slots_and_subscriptions_with_retries(self.to_owned()).await - } - async fn route_command(&self, cmd: &Cmd, address: &str) -> RedisResult { + async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult { + let routing = InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress( + address.to_string(), + )); let core = self.to_owned(); - InnerCore::route_command_inner(core, cmd.clone(), address).await + let response = ClusterConnInner::::try_cmd_request(Arc::new(cmd), routing, core) + .await + .map_err(|err| err.1)?; + match response { + Response::Single(value) => Ok(value), + _ => Err(RedisError::from(( + ErrorKind::ResponseError, + "Expected single response, got unexpected response", + ))), + } } async fn are_all_slots_covered(&self) -> bool { ClusterConnInner::::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map) } - async fn refresh_if_topology_changed(&self) -> RedisResult<()> { - if ClusterConnInner::check_for_topology_diff(self.to_owned()).await { - self.refresh_slots().await - } else { - Ok(()) - } + async fn refresh_if_topology_changed(&self) { + ClusterConnInner::check_topology_and_refresh_if_diff(self.to_owned()).await; } } @@ -440,19 +443,6 @@ where retry_scan(&scan_state, &core, match_pattern, count, object_type).await?; (from_redis_value(&retry.0?)?, retry.1) } - ErrorKind::BusyLoadingError | ErrorKind::TryAgain => ( - from_redis_value( - &send_scan( - &scan_state, - &core, - match_pattern.clone(), - count, - object_type.clone(), - ) - .await?, - )?, - scan_state.clone(), - ), _ => return Err(err), }, }; @@ -502,7 +492,8 @@ where if let Some(object_type) = object_type { scan_command.arg("TYPE").arg(object_type.to_string()); } - core.route_command(&scan_command, 
&scan_state.address_in_scan) + + core.route_command(scan_command, &scan_state.address_in_scan) .await } @@ -521,8 +512,7 @@ async fn retry_scan( where C: ClusterInScan, { - core.refresh_slots().await?; - + core.refresh_if_topology_changed().await; if !core.are_all_slots_covered().await { return Err(RedisError::from(( ErrorKind::NotAllSlotsCovered, @@ -604,8 +594,8 @@ mod tests { struct MockConnection; #[async_trait] impl ClusterInScan for MockConnection { - async fn refresh_if_topology_changed(&self) -> RedisResult<()> { - Ok(()) + async fn refresh_if_topology_changed(&self) { + (); } async fn get_address_by_slot(&self, _slot: u16) -> RedisResult { Ok("mock_address".to_string()) @@ -620,10 +610,7 @@ mod tests { vec![0, 1, 2] } } - async fn refresh_slots(&self) -> RedisResult<()> { - Ok(()) - } - async fn route_command(&self, _: &Cmd, _: &str) -> RedisResult { + async fn route_command(&self, _: Cmd, _: &str) -> RedisResult { unimplemented!() } async fn are_all_slots_covered(&self) -> bool { diff --git a/redis/tests/test_cluster_scan.rs b/redis/tests/test_cluster_scan.rs index fc65681ec2..b76bd2098c 100644 --- a/redis/tests/test_cluster_scan.rs +++ b/redis/tests/test_cluster_scan.rs @@ -215,14 +215,8 @@ mod test_cluster_scan_async { assert_ne!(slot_distribution, new_slot_distribution); } } - - // We expect an error of finding address covering slot or not all slots are covered - // Both errors message contains "please check the cluster configuration" - let res = result - .unwrap_err() - .to_string() - .contains("please check the cluster configuration"); - assert!(res); + // We expect an error of finding address + assert!(result.is_err()); } #[tokio::test] // Test cluster scan with killing all masters during scan @@ -765,4 +759,68 @@ mod test_cluster_scan_async { } assert!(keys.len() == expected_keys.len()); } + + #[tokio::test] + // Testing cluster scan when connection fails in the middle and we get an error + // then cluster up again and scanning can continue without 
any problem + async fn test_async_cluster_scan_failover() { + let mut cluster = TestClusterContext::new(3, 0); + let mut connection = cluster.async_connection(None).await; + let mut i = 0; + loop { + let key = format!("key{}", i); + let _: Result<(), redis::RedisError> = redis::cmd("SET") + .arg(&key) + .arg("value") + .query_async(&mut connection) + .await; + i += 1; + if i == 1000 { + break; + } + } + let mut scan_state_rc = ScanStateRC::new(); + let mut keys: Vec = Vec::new(); + let mut count = 0; + loop { + count += 1; + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, None) + .await; + if scan_response.is_err() { + println!("error: {:?}", scan_response); + } + let (next_cursor, scan_keys) = scan_response.unwrap(); + scan_state_rc = next_cursor; + keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap())); + if scan_state_rc.is_finished() { + break; + } + if count == 5 { + drop(cluster); + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc.clone(), None, None, None) + .await; + assert!(scan_response.is_err()); + break; + }; + } + cluster = TestClusterContext::new(3, 0); + connection = cluster.async_connection(None).await; + loop { + let scan_response: RedisResult<(ScanStateRC, Vec)> = connection + .cluster_scan(scan_state_rc, None, None, None) + .await; + if scan_response.is_err() { + println!("error: {:?}", scan_response); + } + let (next_cursor, scan_keys) = scan_response.unwrap(); + scan_state_rc = next_cursor; + keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap())); + if scan_state_rc.is_finished() { + assert!(true); + break; + } + } + } }