Skip to content

Commit

Permalink
addresed comments
Browse files Browse the repository at this point in the history
Signed-off-by: avifenesh <[email protected]>
  • Loading branch information
avifenesh committed Dec 18, 2024
1 parent 29024eb commit 84adcfe
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 25 deletions.
51 changes: 26 additions & 25 deletions glide-core/redis-rs/redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,13 @@ pub struct ClusterScanArgs {
/// Optional pattern to match keys during the scan.
pub match_pattern: Option<Vec<u8>>,

/// Optional number of keys to return per scan.
/// A "hint" to the cluster of how much keys to return per scan iteration, if none is sent to the server, the default value is 10.
pub count: Option<u32>,

/// Optional filter to include only keys of a specific data type.
pub object_type: Option<ObjectType>,

/// Flag indicating whether to allow scanning when there are slots not covered by the cluster.
/// Flag indicating whether to allow scanning when there are slots not covered by the cluster, by default it is set to false and the scan will stop if some slots are not covered.
pub allow_non_covered_slots: bool,
}

Expand Down Expand Up @@ -142,9 +142,13 @@ impl ClusterScanArgs {
/// .build();
/// ```
pub struct ClusterScanArgsBuilder {
/// By default, the match pattern is set to `None` and no filtering is applied.
match_pattern: Option<Vec<u8>>,
/// A "hint" to the cluster of how much keys to return per scan iteration, by default none is sent to the server, the default value is 10.
count: Option<u32>,
/// By default, the object type is set to `None` and no filtering is applied.
object_type: Option<ObjectType>,
/// By default, the flag to allow scanning non-covered slots is set to `false`, meaning scanning will stop if some slots are not covered.
allow_non_covered_slots: Option<bool>,
}

Expand Down Expand Up @@ -566,33 +570,29 @@ where
if slot == END_OF_SCAN {
return Ok(NextNodeResult::AllSlotsCompleted);
}
// Attempt to retrieve the node address responsible for the current slot
match core

if let Some(addr) = core
.conn_lock
.read()
.expect(MUTEX_READ_ERR)
.slot_map
.node_address_for_slot(slot, SlotAddr::ReplicaRequired)
{
Some(addr) => {
// Found a valid address for the slot
return Ok(NextNodeResult::Address(addr));
}
None if allow_non_covered_slots => {
// Mark the current slot as scanned
mark_slot_as_scanned(scanned_slots_map, slot);
// Move to the next slot
slot = slot.saturating_add(1);
}
None => {
// Error if slots are not covered and scanning is not allowed
return Err(RedisError::from((
ErrorKind::NotAllSlotsCovered,
"Could not find an address covering a slot, SCAN operation cannot continue \n
If you want to continue scanning even if some slots are not covered, set allow_non_covered_slots to true \n
Note that this may lead to incomplete scanning, and the SCAN operation lose its all guarantees ",
)));
}
// Found a valid address for the slot
return Ok(NextNodeResult::Address(addr));
} else if allow_non_covered_slots {
// Mark the current slot as scanned
mark_slot_as_scanned(scanned_slots_map, slot);
// Move to the next slot
slot = slot.saturating_add(1);
} else {
// Error if slots are not covered and scanning is not allowed
return Err(RedisError::from((
ErrorKind::NotAllSlotsCovered,
"Could not find an address covering a slot, SCAN operation cannot continue \n
If you want to continue scanning even if some slots are not covered, set allow_non_covered_slots to true \n
Note that this may lead to incomplete scanning, and the SCAN operation lose its all guarantees ",
)));
}
}
}
Expand Down Expand Up @@ -644,7 +644,7 @@ where
},
};
// Send the SCAN command using the current scan state and scan arguments
let ((new_cursor, new_keys), mut scan_state): ((u64, Vec<Value>), ScanState) =
let ((new_cursor, new_keys), mut scan_state) =
try_scan(&scan_state, &cluster_scan_args, core.clone()).await?;

// Check if the cursor indicates the end of the current scan segment
Expand Down Expand Up @@ -713,7 +713,8 @@ where
} else {
Err(RedisError::from((
ErrorKind::ConnectionNotFoundForRoute,
"No connection available for address",
"Cluster scan failed. No connection available for address: ",
format!("{}", scan_state.address_in_scan),
)))
}
}
Expand Down
1 change: 1 addition & 0 deletions python/python/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,7 @@ async def test_cluster_scan_non_covered_slots(
in str(e_info)
)
break
cursor = ClusterScanCursor()
# Scan with allow_non_covered_slots=True
while not cursor.is_finished():
result = await glide_client_scoped.scan(
Expand Down

0 comments on commit 84adcfe

Please sign in to comment.