Skip to content

Commit

Permalink
SlotMap refactor: Added new NodesMap, changed shard addresses to be s…
Browse files Browse the repository at this point in the history
…hard between shard nodes and slot map values
  • Loading branch information
barshaul committed Aug 27, 2024
1 parent eafaadb commit 6ce81db
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 210 deletions.
34 changes: 16 additions & 18 deletions redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@
//! .expire(key, 60).ignore()
//! .query(&mut connection).unwrap();
//! ```
use rand::{seq::IteratorRandom, thread_rng, Rng};
use std::cell::RefCell;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng, Rng};

use crate::cluster_pipeline::UNROUTABLE_ERROR;
use crate::cluster_routing::{
MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr,
Expand Down Expand Up @@ -343,22 +343,20 @@ where
let mut slots = self.slots.borrow_mut();
*slots = self.create_new_slots()?;

let mut nodes = slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
nodes.dedup();

let nodes = slots.all_node_addresses();
let mut connections = self.connections.borrow_mut();
*connections = nodes
.into_iter()
.filter_map(|addr| {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
let addr = addr.to_string();
if connections.contains_key(&addr) {
let mut conn = connections.remove(&addr).unwrap();
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
}

if let Ok(mut conn) = self.connect(addr) {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
Expand Down Expand Up @@ -424,7 +422,7 @@ where
if let Some(addr) = slots.slot_addr_for_route(route) {
Ok((
addr.to_string(),
self.get_connection_by_addr(connections, addr)?,
self.get_connection_by_addr(connections, &addr)?,
))
} else {
// try a random node next. This is safe if slots are involved
Expand Down Expand Up @@ -495,13 +493,13 @@ where
fn execute_on_all<'a>(
&'a self,
input: Input,
addresses: HashSet<&'a str>,
addresses: HashSet<Arc<String>>,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
) -> Vec<RedisResult<(Arc<String>, Value)>> {
addresses
.into_iter()
.map(|addr| {
let connection = self.get_connection_by_addr(connections, addr)?;
let connection = self.get_connection_by_addr(connections, &addr)?;
match input {
Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd),
Input::Cmd(cmd) => connection.req_command(cmd),
Expand All @@ -526,16 +524,16 @@ where
input: Input,
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
self.execute_on_all(input, slots.addresses_for_all_nodes(), connections)
) -> Vec<RedisResult<(Arc<String>, Value)>> {
self.execute_on_all(input, slots.all_node_addresses(), connections)
}

fn execute_on_all_primaries<'a>(
&'a self,
input: Input,
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
) -> Vec<RedisResult<(Arc<String>, Value)>> {
self.execute_on_all(input, slots.addresses_for_all_primaries(), connections)
}

Expand All @@ -545,7 +543,7 @@ where
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
routes: &'b [(Route, Vec<usize>)],
) -> Vec<RedisResult<(&'a str, Value)>>
) -> Vec<RedisResult<(Arc<String>, Value)>>
where
'b: 'a,
{
Expand All @@ -557,7 +555,7 @@ where
ErrorKind::IoError,
"Couldn't find connection",
)))?;
let connection = self.get_connection_by_addr(connections, addr)?;
let connection = self.get_connection_by_addr(connections, &addr)?;
let (_, indices) = routes.get(index).unwrap();
let cmd =
crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());
Expand Down
12 changes: 4 additions & 8 deletions redis/src/cluster_async/connections_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,18 +147,14 @@ where

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
&& self
.slot_map
.values()
.any(|slot_addrs| slot_addrs.primary.as_str() == address)
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
}

fn round_robin_read_from_replica(
&self,
slot_map_value: &SlotMapValue,
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let addrs = &slot_map_value.addrs.read().unwrap();
let initial_index = slot_map_value
.latest_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
Expand All @@ -185,7 +181,7 @@ where

fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
let addrs = &slot_map_value.addrs.read().unwrap();
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
}
Expand Down Expand Up @@ -232,7 +228,7 @@ where
self.slot_map
.addresses_for_all_primaries()
.into_iter()
.flat_map(|addr| self.connection_for_address(addr))
.flat_map(|addr| self.connection_for_address(&addr))
}

pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {
Expand Down
40 changes: 11 additions & 29 deletions redis/src/cluster_async/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ where
&self,
slot: u16,
slot_addr: SlotAddr,
) -> Option<String> {
) -> Option<Arc<String>> {
self.conn_lock
.read()
.await
Expand Down Expand Up @@ -444,7 +444,7 @@ where
}

// return slots of node
pub(crate) async fn get_slots_of_address(&self, node_address: &str) -> Vec<u16> {
pub(crate) async fn get_slots_of_address(&self, node_address: Arc<String>) -> Vec<u16> {
self.conn_lock
.read()
.await
Expand Down Expand Up @@ -1020,7 +1020,6 @@ where
Self::refresh_slots_and_subscriptions_with_retries(
connection.inner.clone(),
&RefreshPolicy::NotThrottable,
None,
)
.await?;

Expand Down Expand Up @@ -1164,7 +1163,6 @@ where
if let Err(err) = Self::refresh_slots_and_subscriptions_with_retries(
inner.clone(),
&RefreshPolicy::Throttable,
None,
)
.await
{
Expand Down Expand Up @@ -1336,7 +1334,6 @@ where
async fn refresh_slots_and_subscriptions_with_retries(
inner: Arc<InnerCore<C>>,
policy: &RefreshPolicy,
moved_redirect: Option<RedirectNode>,
) -> RedisResult<()> {
let SlotRefreshState {
in_progress,
Expand Down Expand Up @@ -1388,10 +1385,6 @@ where
Self::refresh_slots(inner.clone(), curr_retry)
})
.await;
} else if moved_redirect.is_some() {
// Update relevant slots in the slots map based on the moved_redirect address,
// rather than refreshing all slots by querying the cluster nodes for their topology view.
Self::update_slots_for_redirect_change(inner.clone(), moved_redirect).await?;
}
in_progress.store(false, Ordering::Relaxed);

Expand All @@ -1400,15 +1393,6 @@ where
res
}

/// Update relevant slots in the slots map based on the moved_redirect address
pub(crate) async fn update_slots_for_redirect_change(
_inner: Arc<InnerCore<C>>,
_moved_redirect: Option<RedirectNode>,
) -> RedisResult<()> {
// TODO: Add implementation
Ok(())
}

/// Determines if the cluster topology has changed and refreshes slots and subscriptions if needed.
/// Returns `RedisResult` with `true` if changes were detected and slots were refreshed,
/// or `false` if no changes were found. Raises an error if refreshing the topology fails.
Expand All @@ -1418,7 +1402,7 @@ where
) -> RedisResult<bool> {
let topology_changed = Self::check_for_topology_diff(inner.clone()).await;
if topology_changed {
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy, None).await?;
Self::refresh_slots_and_subscriptions_with_retries(inner.clone(), policy).await?;
}
Ok(topology_changed)
}
Expand Down Expand Up @@ -1629,21 +1613,20 @@ where
.0?;
let connections = &*read_guard;
// Create a new connection vector of the found nodes
let mut nodes = new_slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
nodes.dedup();
let nodes = new_slots.all_node_addresses();
let nodes_len = nodes.len();
let addresses_and_connections_iter = stream::iter(nodes)
.fold(
Vec::with_capacity(nodes_len),
|mut addrs_and_conns, addr| async move {
let addr = addr.to_string();
if let Some(node) = connections.node_for_address(addr.as_str()) {
addrs_and_conns.push((addr, Some(node)));
return addrs_and_conns;
}
// If it's a DNS endpoint, it could have been stored in the existing connections vector using the resolved IP address instead of the DNS endpoint's name.
// We shall check if a connection is already exists under the resolved IP name.
let (host, port) = match get_host_and_port_from_addr(addr) {
let (host, port) = match get_host_and_port_from_addr(&addr) {
Some((host, port)) => (host, port),
None => {
addrs_and_conns.push((addr, None));
Expand All @@ -1669,18 +1652,18 @@ where
|connections, (addr, node)| async {
let mut cluster_params = inner.cluster_params.clone();
let subs_guard = inner.subscriptions_by_address.read().await;
cluster_params.pubsub_subscriptions = subs_guard.get(addr).cloned();
cluster_params.pubsub_subscriptions = subs_guard.get(&addr).cloned();
drop(subs_guard);
let node = get_or_create_conn(
addr,
&addr,
node,
&cluster_params,
RefreshConnectionType::AllConnections,
inner.push_sender.clone(),
)
.await;
if let Ok(node) = node {
connections.0.insert(addr.into(), node);
connections.0.insert(addr, node);
}
connections
},
Expand Down Expand Up @@ -2024,7 +2007,6 @@ where
*future = Box::pin(Self::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
None,
));
Poll::Ready(Err(err))
}
Expand Down Expand Up @@ -2271,12 +2253,12 @@ where

match ready!(self.poll_complete(cx)) {
PollFlushAction::None => return Poll::Ready(Ok(())),
PollFlushAction::RebuildSlots(moved_redirect) => {
PollFlushAction::RebuildSlots(_moved_redirect) => {
// TODO: Add logic to update the slots map based on the MOVED error
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
self.inner.clone(),
&RefreshPolicy::Throttable,
moved_redirect,
),
)));
}
Expand Down
38 changes: 10 additions & 28 deletions redis/src/cluster_routing.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::cmp::min;
use std::collections::HashMap;

use crate::cluster_topology::get_slot;
use crate::cmd::{Arg, Cmd};
use crate::types::Value;
use crate::{ErrorKind, RedisResult};
use std::cmp::min;
use std::collections::HashMap;
use std::iter::Once;
use std::sync::Arc;

#[derive(Clone)]
pub(crate) enum Redirect {
Expand Down Expand Up @@ -866,14 +866,6 @@ impl Slot {
}
}

pub fn start(&self) -> u16 {
self.start
}

pub fn end(&self) -> u16 {
self.end
}

#[allow(dead_code)] // used in tests
pub(crate) fn master(&self) -> &str {
self.master.as_str()
Expand Down Expand Up @@ -902,25 +894,15 @@ pub enum SlotAddr {
/// which stores only the master and [optional] replica
/// to avoid the need to choose a replica each time
/// a command is executed
#[derive(Debug, Eq, PartialEq)]
pub(crate) struct SlotAddrs {
pub(crate) primary: String,
pub(crate) replicas: Vec<String>,
}

impl SlotAddrs {
pub(crate) fn new(primary: String, replicas: Vec<String>) -> Self {
Self { primary, replicas }
}

pub(crate) fn from_slot(slot: Slot) -> Self {
SlotAddrs::new(slot.master, slot.replicas)
}
#[derive(Debug, Eq, PartialEq, Clone, PartialOrd, Ord)]
pub(crate) struct ShardAddrs {
pub(crate) primary: Arc<String>,
pub(crate) replicas: Vec<Arc<String>>,
}

impl<'a> IntoIterator for &'a SlotAddrs {
type Item = &'a String;
type IntoIter = std::iter::Chain<Once<&'a String>, std::slice::Iter<'a, String>>;
impl<'a> IntoIterator for &'a ShardAddrs {
type Item = &'a Arc<String>;
type IntoIter = std::iter::Chain<Once<&'a Arc<String>>, std::slice::Iter<'a, Arc<String>>>;

fn into_iter(self) -> Self::IntoIter {
std::iter::once(&self.primary).chain(self.replicas.iter())
Expand Down
Loading

0 comments on commit 6ce81db

Please sign in to comment.