Skip to content

Commit

Permalink
test: add del slots helper
Browse files Browse the repository at this point in the history
Signed-off-by: jhpung <[email protected]>
  • Loading branch information
jhpung committed Dec 21, 2024
1 parent 3e97a0b commit 992e71c
Showing 1 changed file with 57 additions and 26 deletions.
83 changes: 57 additions & 26 deletions glide-core/redis-rs/redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,61 @@ mod test_cluster_scan_async {
cmd, from_redis_value, ClusterScanArgs, ObjectType, RedisResult, ScanStateRC, Value,
};
use std::time::Duration;
use tokio::time::{sleep, Instant};

async fn del_slots_range(
cluster: &TestClusterContext,
range: (u16, u16),
) -> Result<(), &'static str> {
let mut cluster_conn = cluster.async_connection(None).await;
let mut del_slots_cmd = cmd("CLUSTER");
let (start, end) = range;
del_slots_cmd.arg("DELSLOTSRANGE").arg(start).arg(end);
let _: RedisResult<Value> = cluster_conn
.route_command(
&del_slots_cmd,
RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
Some(ResponsePolicy::AllSucceeded),
)),
)
.await;

let timeout = Duration::from_secs(10);
let mut invalid = false;
loop {
sleep(Duration::from_millis(500)).await;

let now = Instant::now();
if now.elapsed() > timeout {
return Err("Timeout while waiting for slots to be deleted");
}

let slot_distribution =
cluster.get_slots_ranges_distribution(&cluster.get_cluster_nodes().await);
for (_, _, _, slot_ranges) in slot_distribution {
println!("slot_ranges: {:?}", slot_ranges);
for slot_range in slot_ranges {
let (slot_start, slot_end) = (slot_range[0], slot_range[1]);

println!("slot_start: {}, slot_end: {}", slot_start, slot_end);
if slot_start >= start && slot_start <= end {
invalid = true;
continue;
}
if slot_end >= start && slot_end <= end {
invalid = true;
continue;
}
}
}

if invalid {
continue;
}
return Ok(());
}
}

async fn kill_one_node(
cluster: &TestClusterContext,
Expand Down Expand Up @@ -148,7 +203,7 @@ mod test_cluster_scan_async {

#[tokio::test]
#[serial_test::serial]
async fn test_async_cluster_scan_with_few_delslots() {
async fn test_async_cluster_scan_with_delslots() {
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
Expand All @@ -168,31 +223,7 @@ mod test_cluster_scan_async {
expected_keys.push(key);
}

let mut delslots_cmd = cmd("CLUSTER");
// delete a few slots
delslots_cmd.arg("DELSLOTSRANGE").arg("1").arg("100");
let _: RedisResult<Value> = connection
.route_command(
&delslots_cmd,
RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
Some(ResponsePolicy::AllSucceeded),
)),
)
.await;

// Wait for the slots to be refreshed
tokio::time::sleep(Duration::from_secs(5)).await;

let cluster_nodes = cluster.get_cluster_nodes().await;
let slot_distribution = cluster.get_slots_ranges_distribution(&cluster_nodes);

for (_, _, _, slot_ranges) in slot_distribution {
for slot_range in slot_ranges {
let (start, end) = (slot_range[0], slot_range[1]);
assert!(start > 100 || end < 1);
}
}
del_slots_range(&cluster, (1, 100)).await.unwrap();

let mut scan_state_rc = ScanStateRC::new();
let mut keys: Vec<String> = Vec::new();
Expand Down

0 comments on commit 992e71c

Please sign in to comment.