Commit
Remove the sync point inside poll_flush, as we understand it is not needed in Rust: Tokio runs the synced poll_flush to the end and only then lets the refresh task update the connection map.

Signed-off-by: GilboaAWS <[email protected]>
GilboaAWS committed Feb 3, 2025
1 parent 78a966f commit b3c292d
Showing 2 changed files with 27 additions and 167 deletions.
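
For orientation, here is a minimal sketch of the completion path this commit moves the refresh task to: the task writes the refreshed connection into the connection map itself and then drops its in-progress entry, instead of staging the result in refresh_addresses_done for poll_flush's update_refreshed_connection to apply. Only replace_or_add_connection_for_address and the refresh_address_in_progress map are taken from the diff below; the Node and RefreshTaskState stand-ins and the surrounding types are illustrative assumptions, not the project's real API.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

type Node = u32;            // stand-in for ClusterNode<Connection>
type RefreshTaskState = (); // stand-in; dropping the real type notifies waiting requests

struct ConnectionsContainer {
    connection_map: HashMap<String, Node>,
    refresh_address_in_progress: HashMap<String, RefreshTaskState>,
}

impl ConnectionsContainer {
    fn replace_or_add_connection_for_address(&mut self, address: String, node: Node) {
        self.connection_map.insert(address, node);
    }
}

// Old flow (removed by this commit): insert into refresh_addresses_done and let
// poll_flush's update_refreshed_connection move it into the connection map.
// New flow: update the map directly, then remove the in-progress entry.
fn on_refresh_success(core: &Arc<RwLock<ConnectionsContainer>>, address: &str, node: Node) {
    let mut container = core.write().expect("connection lock poisoned");
    container.replace_or_add_connection_for_address(address.to_string(), node);
    container.refresh_address_in_progress.remove(address);
}

fn main() {
    let core = Arc::new(RwLock::new(ConnectionsContainer {
        connection_map: HashMap::new(),
        refresh_address_in_progress: HashMap::new(),
    }));
    on_refresh_success(&core, "node-1:6379", 7);
    assert!(core.read().unwrap().connection_map.contains_key("node-1:6379"));
}
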
@@ -173,10 +173,6 @@ impl RefreshTaskNotifier {
// The status transitions from `Reconnecting` to `ReconnectingTooLong` under specific
// conditions (e.g., after one attempt of reconnecting inside the task or after a timeout).
//
// The transition from `Reconnecting` to `ReconnectingTooLong` is managed exclusively
// within the `update_refreshed_connection` function in `poll_flush`. This ensures that
// all requests maintain a consistent view of the connections.
//
// When transitioning from `Reconnecting` to `ReconnectingTooLong`, the associated
// notifier is triggered to unblock all awaiting tasks.
#[derive(Clone, Debug)]
@@ -251,6 +247,7 @@ impl RefreshTaskState {
}
}


impl Drop for RefreshTaskState {
fn drop(&mut self) {
if let RefreshTaskStatus::Reconnecting(ref notifier) = self.status {
@@ -271,22 +268,16 @@ impl Drop for RefreshTaskState {
}

// This struct is used to track the status of each address refresh
pub(crate) struct RefreshConnectionStates<Connection> {
// Holds all the failed addresses that started a refresh task.
pub(crate) refresh_addresses_started: HashSet<String>,
pub(crate) struct RefreshConnectionStates {
// Follow the refresh ops on the connections
pub(crate) refresh_address_in_progress: HashMap<String, RefreshTaskState>,
// Holds all the refreshed addresses that are ready to be inserted into the connection_map
pub(crate) refresh_addresses_done: HashMap<String, Option<ClusterNode<Connection>>>,
}

impl<Connection> RefreshConnectionStates<Connection> {
impl RefreshConnectionStates {
// Clears all ongoing refresh connection tasks and resets associated state tracking.
//
// - This method removes all entries in the `refresh_address_in_progress` map.
// - The `Drop` trait is responsible for notifying the associated notifiers and aborting any unfinished refresh tasks.
// - Additionally, this method clears `refresh_addresses_started` and `refresh_addresses_done`
// to ensure no stale data remains in the refresh state tracking.
pub(crate) fn clear_refresh_state(&mut self) {
debug!(
"clear_refresh_state: removing all in-progress refresh connection tasks for addresses: {:?}",
@@ -297,8 +288,6 @@ impl<Connection> RefreshConnectionStates<Connection> {
self.refresh_address_in_progress.clear();

// Clear other state tracking
self.refresh_addresses_started.clear();
self.refresh_addresses_done.clear();
}

// Collects the notifiers for the given addresses and returns them as a vector.
@@ -331,12 +320,10 @@ impl<Connection> RefreshConnectionStates<Connection> {
}
}

impl<Connection> Default for RefreshConnectionStates<Connection> {
impl Default for RefreshConnectionStates {
fn default() -> Self {
Self {
refresh_addresses_started: HashSet::new(),
refresh_address_in_progress: HashMap::new(),
refresh_addresses_done: HashMap::new(),
}
}
}
@@ -346,7 +333,7 @@ pub(crate) struct ConnectionsContainer<Connection> {
pub(crate) slot_map: SlotMap,
read_from_replica_strategy: ReadFromReplicaStrategy,
topology_hash: TopologyHash,
pub(crate) refresh_conn_state: RefreshConnectionStates<Connection>,
pub(crate) refresh_conn_state: RefreshConnectionStates,
}

impl<Connection> Drop for ConnectionsContainer<Connection> {
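
As a rough sketch of where the first file ends up after the deletions above: RefreshConnectionStates loses the refresh_addresses_started and refresh_addresses_done staging collections together with its Connection type parameter, leaving only the in-progress map. RefreshTaskState is reduced to a unit type here; in the real code it carries the task handle, status, and notifier.

use std::collections::HashMap;

type RefreshTaskState = (); // simplified stand-in

// Tracks only the refresh tasks that are still running; completed tasks now
// update the connection map themselves and remove their entry.
#[derive(Default)]
pub(crate) struct RefreshConnectionStates {
    pub(crate) refresh_address_in_progress: HashMap<String, RefreshTaskState>,
}

impl RefreshConnectionStates {
    // Dropping the entries aborts unfinished tasks and notifies waiting requests
    // via RefreshTaskState's Drop impl in the real code.
    pub(crate) fn clear_refresh_state(&mut self) {
        self.refresh_address_in_progress.clear();
    }
}

fn main() {
    let mut states = RefreshConnectionStates::default();
    states.refresh_address_in_progress.insert("node-1:6379".into(), ());
    states.clear_refresh_state();
    assert!(states.refresh_address_in_progress.is_empty());
}
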
171 changes: 22 additions & 149 deletions glide-core/redis-rs/redis/src/cluster_async/mod.rs
@@ -1405,9 +1405,6 @@ where
.map(|notify| notify.notified())
.collect();
futures::future::join_all(futures).await;

// Update the connections in the connection_container
Self::update_refreshed_connection(inner);
}

async fn trigger_refresh_connection_tasks(
@@ -1435,25 +1432,22 @@
let address_clone = address.clone();
let address_clone_for_task = address.clone();

// Add this address to be removed in poll_flush so all requests see a consistent connection map.
// See next comment for elaborated explanation.
inner_clone
// let node_option = if check_existing_conn {
// let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR);
// connections_container.remove_node(&address)
// } else {
// None
// };

let mut node_option = inner
.conn_lock
.write()
.read()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_started
.insert(address_clone_for_task.clone());

let node_option = if check_existing_conn {
let connections_container = inner_clone.conn_lock.read().expect(MUTEX_READ_ERR);
connections_container
.connection_map()
.get(&address_clone_for_task)
.map(|node| node.value().clone())
} else {
None
};
.remove_node(&address);

if !check_existing_conn {
node_option = None;
}

let handle = tokio::spawn(async move {
info!(
@@ -1504,45 +1498,30 @@ where
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task.clone(), Some(node));
.replace_or_add_connection_for_address(
address_clone_for_task.clone(),
node,
);
}
Err(err) => {
warn!(
"Failed to refresh connection for node {}. Error: `{:?}`",
address_clone_for_task, err
);
// TODO - When we move to retry more than once, we add this address to a new set of running to long, and then only move
// the RefreshTaskState.status to RunningTooLong in the poll_flush context inside update_refreshed_connection.
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_done
.insert(address_clone_for_task.clone(), None);
}
}

// Need to notify here the awaitng requests inorder to awaket the context of the poll_flush as
// Note!! - TODO
// Need to notify here the awaiting requests inorder to awake the context of the poll_flush as
// it awaits on this notifier inside the get_connection in the poll_next inside poll_complete.
// Otherwise poll_flush won't be polled until the next start_send or other requests I/O.
if let Some(task_state) = inner_clone
inner_clone
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_address_in_progress
.get_mut(&address_clone_for_task)
{
task_state.status.notify_waiting_requests();
} else {
warn!(
"No refresh task state found for address: {}",
address_clone_for_task
);
}
.remove(&address_clone_for_task);

info!("Refreshing connection task to {:?} is done", address_clone);
});
@@ -2623,112 +2602,6 @@ where
Self::try_request(info, core).await
}

fn update_refreshed_connection(inner: Arc<InnerCore<C>>) {
trace!("update_refreshed_connection started");
loop {
let connections_container = inner.conn_lock.read().expect(MUTEX_READ_ERR);

// Check if both sets are empty
if connections_container
.refresh_conn_state
.refresh_addresses_started
.is_empty()
&& connections_container
.refresh_conn_state
.refresh_addresses_done
.is_empty()
{
break;
}

let addresses_to_remove: Vec<String> = connections_container
.refresh_conn_state
.refresh_addresses_started
.iter()
.cloned()
.collect();

let addresses_done: Vec<String> = connections_container
.refresh_conn_state
.refresh_addresses_done
.keys()
.cloned()
.collect();

drop(connections_container);

// Process refresh_addresses_started
for address in addresses_to_remove {
inner
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.refresh_conn_state
.refresh_addresses_started
.remove(&address);
inner
.conn_lock
.write()
.expect(MUTEX_READ_ERR)
.remove_node(&address);
}

// Process refresh_addresses_done
for address in addresses_done {
// Check if the address exists in refresh_addresses_done
let mut conn_lock_write = inner.conn_lock.write().expect(MUTEX_READ_ERR);
if let Some(conn_option) = conn_lock_write
.refresh_conn_state
.refresh_addresses_done
.get_mut(&address)
{
// Match the content of the Option
match conn_option.take() {
Some(conn) => {
debug!(
"update_refreshed_connection: found refreshed connection for address {}",
address
);
// Move the node_conn to the function
conn_lock_write
.replace_or_add_connection_for_address(address.clone(), conn);
}
None => {
debug!(
"update_refreshed_connection: task completed, but no connection for address {}",
address
);
}
}
}

// Remove this address from refresh_addresses_done
conn_lock_write
.refresh_conn_state
.refresh_addresses_done
.remove(&address);

// Remove this entry from refresh_address_in_progress
if conn_lock_write
.refresh_conn_state
.refresh_address_in_progress
.remove(&address)
.is_some()
{
debug!(
"update_refreshed_connection: Successfully removed refresh state for address: {}",
address
);
} else {
warn!(
"update_refreshed_connection: No refresh state found to remove for address: {:?}",
address
);
}
}
}
}

fn poll_complete(&mut self, cx: &mut task::Context<'_>) -> Poll<PollFlushAction> {
let retry_params = self
.inner
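
Putting the mod.rs changes together, the refresh flow after this commit looks roughly like the sketch below. It assumes the tokio crate; Core, try_connect, and the u32 stand-in for a connection are illustrative, and only the order of operations mirrors the diff: remove the node from the map up front (no refresh_addresses_started bookkeeping), reconnect inside the spawned task, write the result straight into the map on success, and finally remove the refresh_address_in_progress entry, whose Drop impl notifies the requests awaiting that address in the real code.

use std::collections::HashMap;
use std::sync::{Arc, RwLock};

struct Core {
    connection_map: HashMap<String, u32>,             // stand-in for the real ClusterNode map
    refresh_address_in_progress: HashMap<String, ()>, // stand-in for RefreshTaskState entries
}

// Placeholder for the real reconnection logic.
async fn try_connect(_address: &str) -> Result<u32, String> {
    Ok(42)
}

fn trigger_refresh(core: Arc<RwLock<Core>>, address: String, check_existing_conn: bool) {
    // The node is removed from the map before the task starts; there is no longer a
    // refresh_addresses_started set for poll_flush to replay this removal from.
    let _existing = if check_existing_conn {
        core.write().unwrap().connection_map.remove(&address)
    } else {
        None
    };

    core.write()
        .unwrap()
        .refresh_address_in_progress
        .insert(address.clone(), ());

    tokio::spawn(async move {
        match try_connect(&address).await {
            // Success: the refreshed connection goes straight into the map
            // (replace_or_add_connection_for_address in the real code).
            Ok(node) => {
                core.write().unwrap().connection_map.insert(address.clone(), node);
            }
            Err(err) => {
                eprintln!("Failed to refresh connection for node {address}: {err}");
            }
        }
        // Removing the in-progress entry is what wakes requests waiting on this
        // address, via the entry's Drop impl in the real code.
        core.write().unwrap().refresh_address_in_progress.remove(&address);
    });
}

#[tokio::main(flavor = "current_thread")]
async fn main() {
    let core = Arc::new(RwLock::new(Core {
        connection_map: HashMap::from([("node-1:6379".to_string(), 1)]),
        refresh_address_in_progress: HashMap::new(),
    }));
    trigger_refresh(core.clone(), "node-1:6379".into(), true);
    tokio::task::yield_now().await; // let the spawned task run in this toy example
    println!("{:?}", core.read().unwrap().connection_map);
}
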
