Skip to content

Commit

Permalink
Core: Add AzAffinityReplicasAndPrimary Read Strategy (#2986)
Browse files Browse the repository at this point in the history
Core: Implement AzAffinityReplicasAndPrimary Read Strategy

Signed-off-by: Muhammad Awawdi <[email protected]>
  • Loading branch information
Muhammad-awawdi-amazon authored Feb 4, 2025
1 parent a296703 commit 3eca0f4
Show file tree
Hide file tree
Showing 15 changed files with 469 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#### Changes

* Core: Add support for the AzAffinityReplicasAndPrimary read strategy ([#2792](https://github.com/valkey-io/valkey-glide/pull/2792))
* Go: Add `HScan` command ([#2917](https://github.com/valkey-io/valkey-glide/pull/2917))
* Java, Node, Python: Add transaction commands for JSON module ([#2862](https://github.com/valkey-io/valkey-glide/pull/2862))
* Go: Add HINCRBY command ([#2847](https://github.com/valkey-io/valkey-glide/pull/2847))
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/aio/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ where
async fn setup_connection<C>(
connection_info: &RedisConnectionInfo,
con: &mut C,
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity.
// This parameter is set to 'true' if ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary.
// An INFO command will be triggered in the connection's setup to update the 'availability_zone' property.
discover_az: bool,
) -> RedisResult<()>
Expand Down
2 changes: 1 addition & 1 deletion glide-core/redis-rs/redis/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ pub struct GlideConnectionOptions {
#[cfg(feature = "aio")]
/// Passive disconnect notifier
pub disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
/// If ReadFromReplica strategy is set to AZAffinity, this parameter will be set to 'true'.
/// If ReadFromReplica strategy is set to AZAffinity or AZAffinityReplicasAndPrimary, this parameter will be set to 'true'.
/// In this case, an INFO command will be triggered in the connection's setup to update the connection's 'availability_zone' property.
pub discover_az: bool,
/// Connection timeout duration.
Expand Down
192 changes: 183 additions & 9 deletions glide-core/redis-rs/redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,36 @@ where
&self,
slot_map_value: &SlotMapValue,
client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, false)
}

/// Picks a connection for a read under the `AZAffinityReplicasAndPrimary` strategy.
///
/// Delegates to `get_connection_by_az_affinity_strategy` with the primary check enabled:
/// replicas located in `client_az` are tried first, then the primary if it is in
/// `client_az`, and finally the regular round-robin replica selection is used as a fallback.
///
/// Returns `None` when the delegate finds no connection to hand out.
pub(crate) fn round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
    &self,
    slot_map_value: &SlotMapValue,
    client_az: String,
) -> Option<ConnectionAndAddress<Connection>> {
    // Under this strategy the primary is an eligible same-AZ candidate.
    let check_primary = true;
    self.get_connection_by_az_affinity_strategy(slot_map_value, client_az, check_primary)
}

fn get_connection_by_az_affinity_strategy(
&self,
slot_map_value: &SlotMapValue,
client_az: String,
check_primary: bool, // Strategy flag
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value.last_used_replica.load(Ordering::Relaxed);
let mut retries = 0usize;

// Step 1: Try to find a replica in the same AZ
loop {
retries = retries.saturating_add(1);
// Looped through all replicas; no connected replica found in the same availability zone.
if retries > addrs.replicas().len() {
// Attempt a fallback to any available replica or primary if needed.
return self.round_robin_read_from_replica(slot_map_value);
break;
}

// Calculate index based on initial index and check count.
Expand All @@ -286,6 +305,20 @@ where
}
}
}

// Step 2: Check if primary is in the same AZ
if check_primary {
if let Some((address, connection_details)) =
self.connection_details_for_address(addrs.primary().as_str())
{
if self.az_for_address(&address) == Some(client_az) {
return Some((address, connection_details.conn));
}
}
}

// Step 3: Fall back to any available replica using round-robin or primary if needed
self.round_robin_read_from_replica(slot_map_value)
}

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
Expand All @@ -311,6 +344,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
.round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
slot_map_value,
az.to_string(),
),
},
// when the user strategy per command is replica_preferred
SlotAddr::ReplicaRequired => match &self.read_from_replica_strategy {
Expand All @@ -319,6 +357,11 @@ where
slot_map_value,
az.to_string(),
),
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(az) => self
.round_robin_read_from_replica_with_az_awareness_replicas_and_primary(
slot_map_value,
az.to_string(),
),
_ => self.round_robin_read_from_replica(slot_map_value),
},
}
Expand Down Expand Up @@ -510,6 +553,7 @@ mod tests {

fn create_container_with_az_strategy(
use_management_connections: bool,
strategy: Option<ReadFromReplicaStrategy>,
) -> ConnectionsContainer<usize> {
let slot_map = SlotMap::new(
vec![
Expand Down Expand Up @@ -562,15 +606,12 @@ mod tests {
"replica3-3".into(),
create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
);
connection_map.insert(
"replica3-3".into(),
create_cluster_node(33, use_management_connections, Some("use-1a".to_string())),
);

ConnectionsContainer {
slot_map,
connection_map,
read_from_replica_strategy: ReadFromReplicaStrategy::AZAffinity("use-1a".to_string()),
read_from_replica_strategy: strategy
.unwrap_or(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
topology_hash: 0,
}
}
Expand Down Expand Up @@ -801,7 +842,10 @@ mod tests {

#[test]
fn get_connection_for_az_affinity_route() {
let container = create_container_with_az_strategy(false);
let container = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
);

// slot number does not exist
assert!(container
Expand Down Expand Up @@ -864,7 +908,10 @@ mod tests {

#[test]
fn get_connection_for_az_affinity_route_round_robin() {
let container = create_container_with_az_strategy(false);
let container = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinity("use-1a".to_string())),
);

let mut addresses = vec![
container
Expand All @@ -888,6 +935,133 @@ mod tests {
assert_eq!(addresses, vec![31, 31, 33, 33]);
}

/// Exercises route resolution when the container is configured with
/// `ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary("use-1a")`.
///
/// The test first moves the primaries (and `replica2-1`) out of the client's AZ, then
/// removes nodes step by step and asserts which connection `connection_for_route`
/// returns after each change. The steps are order-dependent: every assertion relies on
/// the removals and AZ changes made before it.
#[test]
fn get_connection_for_az_affinity_replicas_and_primary_route() {
// Create a container with AZAffinityReplicasAndPrimary strategy
let container: ConnectionsContainer<usize> = create_container_with_az_strategy(
false,
Some(ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(
"use-1a".to_string(),
)),
);
// NOTE(review): `container` is not declared `mut`; the `get_mut` calls below presumably
// rely on `connection_map` providing interior mutability (e.g. a concurrent map) — confirm.

// Modify the AZ of primary1
container
.connection_map
.get_mut("primary1")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Modify the AZ of primary2
container
.connection_map
.get_mut("primary2")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());

// Modify the AZ of primary3
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Modify the AZ of replica2-1
container
.connection_map
.get_mut("replica2-1")
.unwrap()
.user_connection
.az = Some("use-1c".to_string());

// Slot number does not exist (slot 1001 wasn't assigned to any primary)
assert!(container
.connection_for_route(&Route::new(1001, SlotAddr::ReplicaOptional))
.is_none());

// Test getting replica in client's AZ for slot 2001
// (connections 31 and 33 are the candidates accepted here)
assert!(one_of(
container.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired)),
&[31, 33],
));

// Remove one replica in the client's AZ
remove_nodes(&container, &["replica3-3"]);

// Should still get the remaining replica in the client's AZ
assert_eq!(
31,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Remove all replicas in the client's AZ
remove_nodes(&container, &["replica3-1"]);

// Test falling back to replica in different AZ
// (primary3 was moved to "use-1b" above, so the primary is not a same-AZ candidate here)
assert_eq!(
32,
container
.connection_for_route(&Route::new(2001, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Set the primary to be in the client's AZ
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1a".to_string());

// Remove the last replica
remove_nodes(&container, &["replica3-2"]);

// Should now fall back to the primary in the client's AZ
// NOTE(review): this route is `SlotAddr::Master`, which presumably resolves to the primary
// regardless of the read strategy; if so, this assertion does not exercise the same-AZ
// primary step of the strategy. Consider a `SlotAddr::ReplicaRequired` route — confirm.
assert_eq!(
3,
container
.connection_for_route(&Route::new(2001, SlotAddr::Master))
.unwrap()
.1
);

// Move the primary out of the client's AZ
container
.connection_map
.get_mut("primary3")
.unwrap()
.user_connection
.az = Some("use-1b".to_string());

// Test falling back to replica under different primary
// (replica2-1 was moved to "use-1c" above, so it is reached via the non-AZ fallback)
assert_eq!(
21,
container
.connection_for_route(&Route::new(1002, SlotAddr::ReplicaRequired))
.unwrap()
.1
);

// Remove all replicas
remove_nodes(&container, &["replica2-1"]);

// Test falling back to available primaries with their respective slots
// NOTE(review): both routes below are `SlotAddr::Master`, so they presumably check plain
// primary lookup rather than a replica-to-primary fallback — confirm.
assert!(one_of(
container.connection_for_route(&Route::new(1002, SlotAddr::Master)),
&[2],
));
assert!(one_of(
container.connection_for_route(&Route::new(500, SlotAddr::Master)),
&[1],
));
}

#[test]
fn get_connection_by_address() {
let container = create_container();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ where
let discover_az = matches!(
params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
);

match create_connection::<C>(
Expand Down
1 change: 1 addition & 0 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,7 @@ where
let discover_az = matches!(
cluster_params.read_from_replicas,
crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinity(_)
| crate::cluster_slotmap::ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_)
);

let glide_connection_options = GlideConnectionOptions {
Expand Down
2 changes: 2 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,8 @@ impl ClusterClientBuilder {
/// The parameter `read_strategy` can be one of:
/// `ReadFromReplicaStrategy::AZAffinity(availability_zone)` - attempt to access replicas in the same availability zone.
/// If no suitable replica is found (i.e. no replica could be found in the requested availability zone), choose any replica. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(availability_zone)` - attempt to access nodes in the same availability zone,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
/// `ReadFromReplicaStrategy::RoundRobin` - reads are distributed across replicas for load balancing using round-robin algorithm. Falling back to primary if needed.
/// `ReadFromReplicaStrategy::AlwaysFromPrimary` ensures all read and write queries are directed to the primary node.
///
Expand Down
4 changes: 4 additions & 0 deletions glide-core/redis-rs/redis/src/cluster_slotmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ pub enum ReadFromReplicaStrategy {
/// Spread the read requests between replicas in the client's Availability Zone (AZ) in a round robin manner,
/// falling back to other replicas or the primary if needed.
AZAffinity(String),
/// Spread the read requests among nodes within the client's Availability Zone (AZ) in a round robin manner,
/// prioritizing local replicas, then the local primary, and falling back to any replica or the primary if needed.
AZAffinityReplicasAndPrimary(String),
}

#[derive(Debug, Default)]
Expand Down Expand Up @@ -60,6 +63,7 @@ fn get_address_from_slot(
addrs.replicas()[index].clone()
}
ReadFromReplicaStrategy::AZAffinity(_az) => todo!(), // Drop sync client
ReadFromReplicaStrategy::AZAffinityReplicasAndPrimary(_az) => todo!(), // Drop sync client
}
}

Expand Down
Loading

0 comments on commit 3eca0f4

Please sign in to comment.