Skip to content

Commit

Permalink
changed check from refreshing slots to periodic check for more effici…
Browse files Browse the repository at this point in the history
…ent scan
  • Loading branch information
avifenesh committed Jun 25, 2024
1 parent b0ac611 commit 53cdc6f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1305,7 +1305,7 @@ where
/// Queries log2n nodes (where n represents the number of cluster nodes) to determine whether their
/// topology view differs from the one currently stored in the connection manager.
/// Returns true if change was detected, otherwise false.
async fn check_for_topology_diff(inner: Arc<InnerCore<C>>) -> bool {
pub(crate) async fn check_for_topology_diff(inner: Arc<InnerCore<C>>) -> bool {
let read_guard = inner.conn_lock.read().await;
let num_of_nodes: usize = read_guard.len();
// TODO: Starting from Rust V1.67, integers has logarithms support.
Expand Down
15 changes: 14 additions & 1 deletion redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ pub(crate) trait ClusterInScan {

/// Check if all slots are covered by the cluster
async fn are_all_slots_covered(&self) -> bool;

/// Check if the topology of the cluster has changed and refresh the slots if needed
async fn refresh_if_topology_changed(&self) -> RedisResult<()>;
}

/// Represents the state of a scan operation in a Redis cluster.
Expand Down Expand Up @@ -283,7 +286,7 @@ impl ScanState {
&mut self,
connection: &C,
) -> RedisResult<ScanState> {
let _ = connection.refresh_slots().await;
let _ = connection.refresh_if_topology_changed().await;
let mut scanned_slots_map = self.scanned_slots_map;
// If the address epoch changed it mean that some slots in the address are new, so we cant know which slots been there from the beginning and which are new, or out and in later.
// In this case we will skip updating the scanned_slots_map and will just update the address and the cursor
Expand Down Expand Up @@ -374,6 +377,13 @@ where
async fn are_all_slots_covered(&self) -> bool {
ClusterConnInner::<C>::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map)
}
async fn refresh_if_topology_changed(&self) -> RedisResult<()> {
if ClusterConnInner::check_for_topology_diff(self.to_owned()).await {
self.refresh_slots().await
} else {
Ok(())
}
}
}

/// Perform a cluster scan operation.
Expand Down Expand Up @@ -594,6 +604,9 @@ mod tests {
struct MockConnection;
#[async_trait]
impl ClusterInScan for MockConnection {
async fn refresh_if_topology_changed(&self) -> RedisResult<()> {
Ok(())
}
async fn get_address_by_slot(&self, _slot: u16) -> RedisResult<String> {
Ok("mock_address".to_string())
}
Expand Down

0 comments on commit 53cdc6f

Please sign in to comment.