Skip to content

Commit

Permalink
chenged logic to use internal existing function
Browse files Browse the repository at this point in the history
  • Loading branch information
avifenesh committed Jun 25, 2024
1 parent 53cdc6f commit 84d2b35
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 75 deletions.
46 changes: 20 additions & 26 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ pub(crate) struct InnerCore<C> {
pub(crate) conn_lock: RwLock<ConnectionsContainer<C>>,
cluster_params: ClusterParams,
pending_requests: Mutex<Vec<PendingRequest<C>>>,
pub(crate) slot_refresh_in_progress: AtomicBool,
slot_refresh_in_progress: AtomicBool,
initial_nodes: Vec<ConnectionInfo>,
push_sender: Option<mpsc::UnboundedSender<PushInfo>>,
subscriptions_by_address: RwLock<HashMap<ArcStr, PubSubSubscriptionInfo>>,
Expand Down Expand Up @@ -383,20 +383,6 @@ where
.slot_map
.get_slots_of_node(node_address)
}

// Route command to the given address
pub(crate) async fn route_command_inner(
core: Arc<InnerCore<C>>,
cmd: Cmd,
address: &str,
) -> RedisResult<Value> {
let mut node_conn = ClusterConnInner::get_connection(
InternalSingleNodeRouting::ByAddress(address.to_string()),
core,
)
.await?;
node_conn.1.req_packed_command(&cmd).await
}
}

pub(crate) struct ClusterConnInner<C> {
Expand All @@ -416,7 +402,7 @@ impl<C> Dispose for ClusterConnInner<C> {
}

#[derive(Clone)]
enum InternalRoutingInfo<C> {
pub(crate) enum InternalRoutingInfo<C> {
SingleNode(InternalSingleNodeRouting<C>),
MultiNode((MultipleNodeRoutingInfo, Option<ResponsePolicy>)),
}
Expand All @@ -441,7 +427,7 @@ impl<C> From<InternalSingleNodeRouting<C>> for InternalRoutingInfo<C> {
}

#[derive(Clone)]
enum InternalSingleNodeRouting<C> {
pub(crate) enum InternalSingleNodeRouting<C> {
Random,
SpecificNode(Route),
ByAddress(String),
Expand Down Expand Up @@ -536,13 +522,13 @@ fn boxed_sleep(duration: Duration) -> BoxFuture<'static, ()> {
return Box::pin(async_std::task::sleep(duration));
}

enum Response {
pub(crate) enum Response {
Single(Value),
ClusterScanResult(ScanStateRC, Vec<Value>),
Multiple(Vec<Value>),
}

enum OperationTarget {
pub(crate) enum OperationTarget {
Node { address: ArcStr },
FanOut,
NotFound,
Expand Down Expand Up @@ -754,6 +740,7 @@ impl<C> Future for Request<C> {
return Next::Done.into();
}
OperationTarget::NotFound => {
println!("Request error `{}` not found", err);
// TODO - this is essentially a repeat of the retirable error. probably can remove duplication.
let mut request = this.request.take().unwrap();
request.info.reset_redirect();
Expand Down Expand Up @@ -1040,7 +1027,7 @@ where
}
}

pub(crate) async fn refresh_connections(
async fn refresh_connections(
inner: Arc<InnerCore<C>>,
addresses: Vec<ArcStr>,
conn_type: RefreshConnectionType,
Expand Down Expand Up @@ -1165,7 +1152,7 @@ where
}

// Query a node to discover slot-> master mappings with retries
pub(crate) async fn refresh_slots_and_subscriptions_with_retries(
async fn refresh_slots_and_subscriptions_with_retries(
inner: Arc<InnerCore<C>>,
) -> RedisResult<()> {
if inner
Expand Down Expand Up @@ -1196,6 +1183,15 @@ where
res
}

pub(crate) async fn check_topology_and_refresh_if_diff(inner: Arc<InnerCore<C>>) -> bool {
if Self::check_for_topology_diff(inner.clone()).await {
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone()).await;
true
} else {
false
}
}

async fn periodic_topology_check(
inner: Arc<InnerCore<C>>,
interval_duration: Duration,
Expand All @@ -1207,9 +1203,7 @@ where
}
let _ = boxed_sleep(interval_duration).await;

if Self::check_for_topology_diff(inner.clone()).await {
let _ = Self::refresh_slots_and_subscriptions_with_retries(inner.clone()).await;
} else {
if !Self::check_topology_and_refresh_if_diff(inner.clone()).await {
Self::refresh_pubsub_subscriptions(inner.clone()).await;
}
}
Expand Down Expand Up @@ -1337,7 +1331,7 @@ where
false
}

pub(crate) async fn refresh_slots(
async fn refresh_slots(
inner: Arc<InnerCore<C>>,
curr_retry: usize,
) -> Result<(), BackoffError<RedisError>> {
Expand Down Expand Up @@ -1552,7 +1546,7 @@ where
.map_err(|err| (OperationTarget::FanOut, err))
}

async fn try_cmd_request(
pub(crate) async fn try_cmd_request(
cmd: Arc<Cmd>,
routing: InternalRoutingInfo<C>,
core: Core<C>,
Expand Down
67 changes: 26 additions & 41 deletions redis/src/commands/cluster_scan.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::aio::ConnectionLike;
use crate::cluster_async::{ClusterConnInner, Connect, Core, InnerCore};
use crate::cluster_async::{
ClusterConnInner, Connect, Core, InternalRoutingInfo, InternalSingleNodeRouting, Response,
};
use crate::cluster_routing::SlotAddr;
use crate::cluster_topology::SLOT_SIZE;
use crate::{cmd, from_redis_value, Cmd, ErrorKind, RedisError, RedisResult, Value};
Expand Down Expand Up @@ -138,17 +140,14 @@ pub(crate) trait ClusterInScan {
/// Retrieves the slots assigned to a given address in the cluster.
async fn get_slots_of_address(&self, address: &str) -> Vec<u16>;

/// Refreshes the slot mapping of the cluster.
async fn refresh_slots(&self) -> RedisResult<()>;

/// Routes a Redis command to a specific address in the cluster.
async fn route_command(&self, cmd: &Cmd, address: &str) -> RedisResult<Value>;
async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult<Value>;

/// Check if all slots are covered by the cluster
async fn are_all_slots_covered(&self) -> bool;

/// Check if the topology of the cluster has changed and refresh the slots if needed
async fn refresh_if_topology_changed(&self) -> RedisResult<()>;
async fn refresh_if_topology_changed(&self);
}

/// Represents the state of a scan operation in a Redis cluster.
Expand Down Expand Up @@ -366,23 +365,27 @@ where
async fn get_slots_of_address(&self, address: &str) -> Vec<u16> {
self.as_ref().get_slots_of_address(address).await
}
// Refresh the topology of the cluster
async fn refresh_slots(&self) -> RedisResult<()> {
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(self.to_owned()).await
}
async fn route_command(&self, cmd: &Cmd, address: &str) -> RedisResult<Value> {
async fn route_command(&self, cmd: Cmd, address: &str) -> RedisResult<Value> {
let routing = InternalRoutingInfo::SingleNode(InternalSingleNodeRouting::ByAddress(
address.to_string(),
));
let core = self.to_owned();
InnerCore::route_command_inner(core, cmd.clone(), address).await
let response = ClusterConnInner::<C>::try_cmd_request(Arc::new(cmd), routing, core)
.await
.map_err(|err| err.1)?;
match response {
Response::Single(value) => Ok(value),
_ => Err(RedisError::from((
ErrorKind::ResponseError,
"Expected single response, got unexpected response",
))),
}
}
async fn are_all_slots_covered(&self) -> bool {
ClusterConnInner::<C>::check_if_all_slots_covered(&self.conn_lock.read().await.slot_map)
}
async fn refresh_if_topology_changed(&self) -> RedisResult<()> {
if ClusterConnInner::check_for_topology_diff(self.to_owned()).await {
self.refresh_slots().await
} else {
Ok(())
}
async fn refresh_if_topology_changed(&self) {
ClusterConnInner::check_topology_and_refresh_if_diff(self.to_owned()).await;
}
}

Expand Down Expand Up @@ -440,19 +443,6 @@ where
retry_scan(&scan_state, &core, match_pattern, count, object_type).await?;
(from_redis_value(&retry.0?)?, retry.1)
}
ErrorKind::BusyLoadingError | ErrorKind::TryAgain => (
from_redis_value(
&send_scan(
&scan_state,
&core,
match_pattern.clone(),
count,
object_type.clone(),
)
.await?,
)?,
scan_state.clone(),
),
_ => return Err(err),
},
};
Expand Down Expand Up @@ -502,7 +492,8 @@ where
if let Some(object_type) = object_type {
scan_command.arg("TYPE").arg(object_type.to_string());
}
core.route_command(&scan_command, &scan_state.address_in_scan)

core.route_command(scan_command, &scan_state.address_in_scan)
.await
}

Expand All @@ -521,8 +512,7 @@ async fn retry_scan<C>(
where
C: ClusterInScan,
{
core.refresh_slots().await?;

core.refresh_if_topology_changed().await;
if !core.are_all_slots_covered().await {
return Err(RedisError::from((
ErrorKind::NotAllSlotsCovered,
Expand Down Expand Up @@ -604,9 +594,7 @@ mod tests {
struct MockConnection;
#[async_trait]
impl ClusterInScan for MockConnection {
async fn refresh_if_topology_changed(&self) -> RedisResult<()> {
Ok(())
}
async fn refresh_if_topology_changed(&self) {}
async fn get_address_by_slot(&self, _slot: u16) -> RedisResult<String> {
Ok("mock_address".to_string())
}
Expand All @@ -620,10 +608,7 @@ mod tests {
vec![0, 1, 2]
}
}
async fn refresh_slots(&self) -> RedisResult<()> {
Ok(())
}
async fn route_command(&self, _: &Cmd, _: &str) -> RedisResult<Value> {
async fn route_command(&self, _: Cmd, _: &str) -> RedisResult<Value> {
unimplemented!()
}
async fn are_all_slots_covered(&self) -> bool {
Expand Down
73 changes: 65 additions & 8 deletions redis/tests/test_cluster_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,8 @@ mod test_cluster_scan_async {
assert_ne!(slot_distribution, new_slot_distribution);
}
}

// We expect an error of finding address covering slot or not all slots are covered
// Both errors message contains "please check the cluster configuration"
let res = result
.unwrap_err()
.to_string()
.contains("please check the cluster configuration");
assert!(res);
// We expect an error of finding address
assert!(result.is_err());
}

#[tokio::test] // Test cluster scan with killing all masters during scan
Expand Down Expand Up @@ -765,4 +759,67 @@ mod test_cluster_scan_async {
}
assert!(keys.len() == expected_keys.len());
}

#[tokio::test]
// Testing cluster scan when connection fails in the middle and we get an error
// then cluster up again and scanning can continue without any problem
async fn test_async_cluster_scan_failover() {
let mut cluster = TestClusterContext::new(3, 0);
let mut connection = cluster.async_connection(None).await;
let mut i = 0;
loop {
let key = format!("key{}", i);
let _: Result<(), redis::RedisError> = redis::cmd("SET")
.arg(&key)
.arg("value")
.query_async(&mut connection)
.await;
i += 1;
if i == 1000 {
break;
}
}
let mut scan_state_rc = ScanStateRC::new();
let mut keys: Vec<String> = Vec::new();
let mut count = 0;
loop {
count += 1;
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
let (next_cursor, scan_keys) = scan_response.unwrap();
scan_state_rc = next_cursor;
keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap()));
if scan_state_rc.is_finished() {
break;
}
if count == 5 {
drop(cluster);
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc.clone(), None, None, None)
.await;
assert!(scan_response.is_err());
break;
};
}
cluster = TestClusterContext::new(3, 0);
connection = cluster.async_connection(None).await;
loop {
let scan_response: RedisResult<(ScanStateRC, Vec<Value>)> = connection
.cluster_scan(scan_state_rc, None, None, None)
.await;
if scan_response.is_err() {
println!("error: {:?}", scan_response);
}
let (next_cursor, scan_keys) = scan_response.unwrap();
scan_state_rc = next_cursor;
keys.extend(scan_keys.into_iter().map(|v| from_redis_value(&v).unwrap()));
if scan_state_rc.is_finished() {
break;
}
}
}
}

0 comments on commit 84d2b35

Please sign in to comment.