
Refactor slot scanning logic and improve test coverage for cluster scan functionality

Signed-off-by: avifenesh <[email protected]>
avifenesh committed Dec 22, 2024
1 parent 4fc7997 commit 0bf56e3
Showing 3 changed files with 204 additions and 76 deletions.
49 changes: 23 additions & 26 deletions glide-core/redis-rs/redis/src/commands/cluster_scan.rs
@@ -418,25 +418,6 @@ impl ScanState {
}
}

/// Get the next slot to be scanned based on the scanned slots map.
/// If all slots have been scanned, the method returns [`END_OF_SCAN`].
fn next_slot(&self, scanned_slots_map: &SlotsBitsArray) -> Option<u16> {
let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX);
if all_slots_scanned {
return Some(END_OF_SCAN);
}
for (i, slot) in scanned_slots_map.iter().enumerate() {
let mut mask = 1;
for j in 0..BITS_PER_U64 {
if (slot & mask) == 0 {
return Some(i as u16 * BITS_PER_U64 + j);
}
mask <<= 1;
}
}
None
}

/// Update the scan state without updating the scanned slots map.
/// This method is used when the address epoch has changed, and we can't determine which slots are new.
/// In this case, we skip updating the scanned slots map and only update the address and cursor.
@@ -454,7 +435,7 @@ impl ScanState {
// meaning that we could safely update the scanned slots map with the slots owned by the node.
// An epoch change means that some slots are new, and we can't determine which slots have been there from the beginning and which are new.
let mut scanned_slots_map = new_scanned_slots_map.unwrap_or(self.scanned_slots_map);
let next_slot = self.next_slot(&scanned_slots_map).unwrap_or(0);
let next_slot = next_slot(&scanned_slots_map).unwrap_or(0);
match next_address_to_scan(
&core,
next_slot,
@@ -583,8 +564,7 @@ where
} else if allow_non_covered_slots {
// Mark the current slot as scanned
mark_slot_as_scanned(scanned_slots_map, slot);
// Move to the next slot
slot = slot.saturating_add(1);
slot = next_slot(scanned_slots_map).unwrap();
} else {
// Error if slots are not covered and scanning is not allowed
return Err(RedisError::from((
@@ -597,6 +577,25 @@
}
}

/// Get the next slot to be scanned based on the scanned slots map.
/// If all slots have been scanned, the method returns [`END_OF_SCAN`].
fn next_slot(scanned_slots_map: &SlotsBitsArray) -> Option<u16> {
let all_slots_scanned = scanned_slots_map.iter().all(|&word| word == u64::MAX);
if all_slots_scanned {
return Some(END_OF_SCAN);
}
for (i, slot) in scanned_slots_map.iter().enumerate() {
let mut mask = 1;
for j in 0..BITS_PER_U64 {
if (slot & mask) == 0 {
return Some(i as u16 * BITS_PER_U64 + j);
}
mask <<= 1;
}
}
None
}

/// Performs a cluster-wide `SCAN` operation.
///
/// This function scans the cluster for keys based on the provided arguments.
@@ -747,9 +746,7 @@ async fn next_scan_state<C>(
where
C: ConnectionLike + Connect + Clone + Send + Sync + 'static,
{
let next_slot = scan_state
.next_slot(&scan_state.scanned_slots_map)
.unwrap_or(0);
let next_slot = next_slot(&scan_state.scanned_slots_map).unwrap_or(0);
let mut scanned_slots_map = scan_state.scanned_slots_map;
match next_address_to_scan(
core,
@@ -877,7 +874,7 @@ mod tests {
1,
ScanStateStage::InProgress,
);
let next_slot = scan_state.next_slot(&[0; BITS_ARRAY_SIZE as usize]);
let next_slot = next_slot(&scan_state.scanned_slots_map);

assert_eq!(next_slot, Some(0));
}
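The core change above moves `next_slot` from a `ScanState` method to a free function and, in the non-covered-slots branch, replaces the linear `slot.saturating_add(1)` step with a jump to the next unscanned slot returned by `next_slot`. For context, here is a minimal standalone sketch of the same first-unset-bit scan over the slot bitmap; the constants, the sentinel value, and the `SlotsBitsArray` alias are assumptions for illustration, not the crate's actual definitions:

// Sketch of the first-unset-bit scan performed by `next_slot`.
// BITS_PER_U64, BITS_ARRAY_SIZE, END_OF_SCAN and SlotsBitsArray are assumed
// definitions for this example, not the crate's real constants.
const BITS_PER_U64: u16 = 64;
const BITS_ARRAY_SIZE: usize = 16_384 / 64; // one bit per cluster slot
const END_OF_SCAN: u16 = u16::MAX; // assumed sentinel: every slot scanned
type SlotsBitsArray = [u64; BITS_ARRAY_SIZE];

fn next_slot(scanned_slots_map: &SlotsBitsArray) -> Option<u16> {
    if scanned_slots_map.iter().all(|&word| word == u64::MAX) {
        return Some(END_OF_SCAN);
    }
    for (i, word) in scanned_slots_map.iter().enumerate() {
        if *word != u64::MAX {
            // trailing_ones() is the index of the lowest zero bit, i.e. the
            // first unscanned slot inside this 64-slot word.
            return Some(i as u16 * BITS_PER_U64 + word.trailing_ones() as u16);
        }
    }
    None
}

fn main() {
    let mut map: SlotsBitsArray = [0; BITS_ARRAY_SIZE];
    map[0] = 0b0111; // slots 0, 1 and 2 already marked as scanned
    assert_eq!(next_slot(&map), Some(3));
}

Because the current slot is marked as scanned before the jump, non-covered slots in a deleted range are skipped in a single step rather than one slot at a time.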
154 changes: 154 additions & 0 deletions glide-core/redis-rs/redis/tests/test_cluster_scan.rs
@@ -12,6 +12,61 @@ mod test_cluster_scan_async {
cmd, from_redis_value, ClusterScanArgs, ObjectType, RedisResult, ScanStateRC, Value,
};
use std::time::Duration;
use tokio::time::{sleep, Instant};

async fn del_slots_range(
cluster: &TestClusterContext,
range: (u16, u16),
) -> Result<(), &'static str> {
let mut cluster_conn = cluster.async_connection(None).await;
let mut del_slots_cmd = cmd("CLUSTER");
let (start, end) = range;
del_slots_cmd.arg("DELSLOTSRANGE").arg(start).arg(end);
let _: RedisResult<Value> = cluster_conn
.route_command(
&del_slots_cmd,
RoutingInfo::MultiNode((
MultipleNodeRoutingInfo::AllNodes,
Some(ResponsePolicy::AllSucceeded),
)),
)
.await;

        let timeout = Duration::from_secs(10);
        let start = Instant::now();
        loop {
            sleep(Duration::from_millis(500)).await;

            if start.elapsed() > timeout {
                return Err("Timeout while waiting for slots to be deleted");
            }

            let mut invalid = false;

let slot_distribution =
cluster.get_slots_ranges_distribution(&cluster.get_cluster_nodes().await);
for (_, _, _, slot_ranges) in slot_distribution {
println!("slot_ranges: {:?}", slot_ranges);
for slot_range in slot_ranges {
let (slot_start, slot_end) = (slot_range[0], slot_range[1]);

println!("slot_start: {}, slot_end: {}", slot_start, slot_end);
if slot_start >= start && slot_start <= end {
invalid = true;
continue;
}
if slot_end >= start && slot_end <= end {
invalid = true;
continue;
}
}
}

if invalid {
continue;
}
return Ok(());
}
}

async fn kill_one_node(
cluster: &TestClusterContext,
@@ -97,6 +152,105 @@
}
}

#[tokio::test]
#[serial_test::serial]
async fn test_async_cluster_scan_with_allow_non_covered_slots() {
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);

let mut connection = cluster.async_connection(None).await;
let mut expected_keys: Vec<String> = Vec::new();

for i in 0..1000 {
let key = format!("key{}", i);
let _: Result<(), redis::RedisError> = redis::cmd("SET")
.arg(&key)
.arg("value")
.query_async(&mut connection)
.await;
expected_keys.push(key);
}

let mut scan_state_rc = ScanStateRC::new();
let mut keys: Vec<String> = Vec::new();
loop {
let cluster_scan_args = ClusterScanArgs::builder()
.allow_non_covered_slots(true)
.build();
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, cluster_scan_args)
.await
.unwrap();
scan_state_rc = next_cursor;
let mut scan_keys = scan_keys
.into_iter()
.map(|v| from_redis_value(&v).unwrap())
.collect::<Vec<String>>();
keys.append(&mut scan_keys);
if scan_state_rc.is_finished() {
break;
}
}

keys.sort();
expected_keys.sort();
assert_eq!(keys, expected_keys);
}

#[tokio::test]
#[serial_test::serial]
async fn test_async_cluster_scan_with_delslots() {
let cluster = TestClusterContext::new_with_cluster_client_builder(
3,
0,
|builder| builder.retries(1),
false,
);
let mut connection = cluster.async_connection(None).await;
let mut expected_keys: Vec<String> = Vec::new();

for i in 0..1000 {
let key = format!("key{}", i);
let _: Result<(), redis::RedisError> = redis::cmd("SET")
.arg(&key)
.arg("value")
.query_async(&mut connection)
.await;
expected_keys.push(key);
}

del_slots_range(&cluster, (1, 100)).await.unwrap();

let mut scan_state_rc = ScanStateRC::new();
let mut keys: Vec<String> = Vec::new();
loop {
let cluster_scan_args = ClusterScanArgs::builder()
.allow_non_covered_slots(true)
.build();
let (next_cursor, scan_keys): (ScanStateRC, Vec<Value>) = connection
.cluster_scan(scan_state_rc, cluster_scan_args)
.await
.unwrap();
scan_state_rc = next_cursor;
let mut scan_keys = scan_keys
.into_iter()
.map(|v| from_redis_value(&v).unwrap())
.collect::<Vec<String>>();
keys.append(&mut scan_keys);
if scan_state_rc.is_finished() {
break;
}
}

keys.sort();
expected_keys.sort();
assert_eq!(keys, expected_keys);
}

#[tokio::test]
#[serial_test::serial] // test cluster scan with slot migration in the middle
async fn test_async_cluster_scan_with_migration() {
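The `del_slots_range` helper added above deletes a slot range on every node and then polls the cluster's reported slot distribution until no node still claims a boundary inside the deleted range, giving up after a timeout. The waiting pattern in isolation looks roughly like the hedged sketch below; `wait_until` and its condition closure are illustrative names, not part of the test suite:

use std::time::Duration;
use tokio::time::{sleep, Instant};

// Poll `condition` every 500 ms until it holds or `timeout` elapses.
// The clock starts once, before the loop, so the deadline is honored.
async fn wait_until<F>(mut condition: F, timeout: Duration) -> Result<(), &'static str>
where
    F: FnMut() -> bool,
{
    let start = Instant::now();
    loop {
        sleep(Duration::from_millis(500)).await;
        if condition() {
            return Ok(());
        }
        if start.elapsed() > timeout {
            return Err("timed out waiting for condition");
        }
    }
}

In the test, the condition corresponds to no node still reporting a slot range boundary inside the deleted range; the two new tests (`test_async_cluster_scan_with_allow_non_covered_slots` and `test_async_cluster_scan_with_delslots`) then verify that `cluster_scan` with `allow_non_covered_slots(true)` still returns every expected key.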
77 changes: 27 additions & 50 deletions python/python/tests/test_scan.py
@@ -1,8 +1,9 @@

import asyncio
from typing import AsyncGenerator, List, cast

import pytest
from glide import ByAddressRoute
from glide import AllNodes, AllPrimaries, ByAddressRoute
from glide.async_commands.command_args import ObjectType
from glide.config import ProtocolVersion
from glide.exceptions import RequestError
@@ -17,40 +18,18 @@
async def is_cluster_ready(client: GlideClusterClient, count: int) -> bool:
# we allow max 20 seconds to get the nodes
timeout = 20
start_time = asyncio.get_event_loop().time()

start = asyncio.get_event_loop().time()
while True:
if asyncio.get_event_loop().time() - start_time > timeout:
if asyncio.get_event_loop().time() - start > timeout:
return False

cluster_info = await client.custom_command(["CLUSTER", "INFO"])
cluster_info_map = {}

if cluster_info:
info_str = (
cluster_info
if isinstance(cluster_info, str)
else (
cluster_info.decode()
if isinstance(cluster_info, bytes)
else str(cluster_info)
)
)
cluster_info_lines = info_str.split("\n")
cluster_info_lines = [line for line in cluster_info_lines if line]

for line in cluster_info_lines:
key, value = line.split(":")
cluster_info_map[key.strip()] = value.strip()

if (
cluster_info_map.get("cluster_state") == "ok"
and int(cluster_info_map.get("cluster_known_nodes", "0")) == count
):
break

await asyncio.sleep(2)

nodes_raw = await client.custom_command(["CLUSTER", "NODES"])
node_bytes_raw = cast(bytes, nodes_raw)
parsed_nodes = [
line for line in node_bytes_raw.decode().split("\n") if line.strip()
]
if len(parsed_nodes) == count:
break
await asyncio.sleep(1)
return True


@@ -85,6 +64,7 @@ async def glide_client_scoped(
True,
valkey_cluster=function_scoped_cluster,
protocol=protocol,
timeout=100,
)
assert isinstance(client, GlideClusterClient)
yield client
@@ -339,8 +319,11 @@ async def test_cluster_scan_non_covered_slots(
glide_client_scoped: GlideClusterClient,
):
key = get_random_string(10)
for i in range(1000):
await glide_client_scoped.set(f"{key}:{i}", "value")
for i in range(10000):
try:
await glide_client_scoped.set(f"{key}:{i}", "value")
except RequestError as e:
continue
cursor = ClusterScanCursor()
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
@@ -360,22 +343,16 @@
)
# now we give gossip a few seconds to propagate the new cluster configuration
await is_cluster_ready(glide_client_scoped, len(all_other_addresses))
# Iterate scan until error is returned, as it might take time for the inner core to forget the missing node
cursor = ClusterScanCursor()
while True:
try:
while not cursor.is_finished():
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
# Reset cursor for next iteration
cursor = ClusterScanCursor()
except RequestError as e_info:
assert (
"Could not find an address covering a slot, SCAN operation cannot continue"
in str(e_info)
)
break
# Iterate the scan until the missing-slots error is returned
cursor = ClusterScanCursor()
with pytest.raises(RequestError) as e_info:
while not cursor.is_finished():
result = await glide_client_scoped.scan(cursor)
cursor = cast(ClusterScanCursor, result[0])
assert (
"Could not find an address covering a slot, SCAN operation cannot continue"
in str(e_info.value)
)
# Scan with allow_non_covered_slots=True
while not cursor.is_finished():
result = await glide_client_scoped.scan(
