Skip to content

Commit

Permalink
tmp commit
Browse files Browse the repository at this point in the history
Signed-off-by: barshaul <[email protected]>
  • Loading branch information
barshaul committed Nov 13, 2024
1 parent c374d29 commit c6bf0a2
Show file tree
Hide file tree
Showing 8 changed files with 1,422 additions and 239 deletions.
45 changes: 21 additions & 24 deletions glide-core/redis-rs/redis/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,6 @@
//! .expire(key, 60).ignore()
//! .query(&mut connection).unwrap();
//! ```
use std::cell::RefCell;
use std::collections::HashSet;
use std::str::FromStr;
use std::thread;
use std::time::Duration;

use rand::{seq::IteratorRandom, thread_rng};

pub use crate::cluster_client::{ClusterClient, ClusterClientBuilder};
use crate::cluster_pipeline::UNROUTABLE_ERROR;
pub use crate::cluster_pipeline::{cluster_pipe, ClusterPipeline};
Expand All @@ -63,6 +55,13 @@ use crate::{
cluster_routing::{Redirect, Route, RoutingInfo},
IntoConnectionInfo, PushInfo,
};
use rand::{seq::IteratorRandom, thread_rng};
use std::cell::RefCell;
use std::collections::HashSet;
use std::str::FromStr;
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use tokio::sync::mpsc;

Expand Down Expand Up @@ -342,22 +341,20 @@ where
let mut slots = self.slots.borrow_mut();
*slots = self.create_new_slots()?;

let mut nodes = slots.values().flatten().collect::<Vec<_>>();
nodes.sort_unstable();
nodes.dedup();

let nodes = slots.all_node_addresses();
let mut connections = self.connections.borrow_mut();
*connections = nodes
.into_iter()
.filter_map(|addr| {
if connections.contains_key(addr) {
let mut conn = connections.remove(addr).unwrap();
let addr = addr.to_string();
if connections.contains_key(&addr) {
let mut conn = connections.remove(&addr).unwrap();
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
}

if let Ok(mut conn) = self.connect(addr) {
if let Ok(mut conn) = self.connect(&addr) {
if conn.check_connection() {
return Some((addr.to_string(), conn));
}
Expand Down Expand Up @@ -423,7 +420,7 @@ where
if let Some(addr) = slots.slot_addr_for_route(route) {
Ok((
addr.to_string(),
self.get_connection_by_addr(connections, addr)?,
self.get_connection_by_addr(connections, &addr)?,
))
} else {
// try a random node next. This is safe if slots are involved
Expand Down Expand Up @@ -491,13 +488,13 @@ where
fn execute_on_all<'a>(
&'a self,
input: Input,
addresses: HashSet<&'a str>,
addresses: HashSet<Arc<String>>,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
) -> Vec<RedisResult<(Arc<String>, Value)>> {
addresses
.into_iter()
.map(|addr| {
let connection = self.get_connection_by_addr(connections, addr)?;
let connection = self.get_connection_by_addr(connections, &addr)?;
match input {
Input::Slice { cmd, routable: _ } => connection.req_packed_command(cmd),
Input::Cmd(cmd) => connection.req_command(cmd),
Expand All @@ -522,16 +519,16 @@ where
input: Input,
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
self.execute_on_all(input, slots.addresses_for_all_nodes(), connections)
) -> Vec<RedisResult<(Arc<String>, Value)>> {
self.execute_on_all(input, slots.all_node_addresses(), connections)
}

fn execute_on_all_primaries<'a>(
&'a self,
input: Input,
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
) -> Vec<RedisResult<(&'a str, Value)>> {
) -> Vec<RedisResult<(Arc<String>, Value)>> {
self.execute_on_all(input, slots.addresses_for_all_primaries(), connections)
}

Expand All @@ -541,7 +538,7 @@ where
slots: &'a mut SlotMap,
connections: &'a mut HashMap<String, C>,
routes: &'b [(Route, Vec<usize>)],
) -> Vec<RedisResult<(&'a str, Value)>>
) -> Vec<RedisResult<(Arc<String>, Value)>>
where
'b: 'a,
{
Expand All @@ -553,7 +550,7 @@ where
ErrorKind::IoError,
"Couldn't find connection",
)))?;
let connection = self.get_connection_by_addr(connections, addr)?;
let connection = self.get_connection_by_addr(connections, &addr)?;
let (_, indices) = routes.get(index).unwrap();
let cmd =
crate::cluster_routing::command_for_multi_slot_indices(&input, indices.iter());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::cluster_async::ConnectionFuture;
use crate::cluster_routing::{Route, SlotAddr};
use crate::cluster_routing::{Route, ShardAddrs, SlotAddr};
use crate::cluster_slotmap::{ReadFromReplicaStrategy, SlotMap, SlotMapValue};
use crate::cluster_topology::TopologyHash;
use dashmap::DashMap;
use futures::FutureExt;
use rand::seq::IteratorRandom;
use std::net::IpAddr;
use std::sync::Arc;
use telemetrylib::Telemetry;

/// Count the number of connections in a connections_map object
Expand Down Expand Up @@ -175,6 +176,16 @@ where
}
}

/// Returns an iterator over the nodes in the `slot_map`, yielding pairs of the node address and its associated shard addresses.
pub(crate) fn slot_map_nodes(
&self,
) -> impl Iterator<Item = (Arc<String>, Arc<ShardAddrs>)> + '_ {
self.slot_map
.nodes_map()
.iter()
.map(|item| (item.key().clone(), item.value().clone()))
}

// Extends the current connection map with the provided one
pub(crate) fn extend_connection_map(
&mut self,
Expand All @@ -189,11 +200,7 @@ where

/// Returns true if the address represents a known primary node.
pub(crate) fn is_primary(&self, address: &String) -> bool {
self.connection_for_address(address).is_some()
&& self
.slot_map
.values()
.any(|slot_addrs| slot_addrs.primary.as_str() == address)
self.connection_for_address(address).is_some() && self.slot_map.is_primary(address)
}

fn round_robin_read_from_replica(
Expand All @@ -202,19 +209,20 @@ where
) -> Option<ConnectionAndAddress<Connection>> {
let addrs = &slot_map_value.addrs;
let initial_index = slot_map_value
.latest_used_replica
.last_used_replica
.load(std::sync::atomic::Ordering::Relaxed);
let mut check_count = 0;
loop {
check_count += 1;

// Looped through all replicas, no connected replica was found.
if check_count > addrs.replicas.len() {
return self.connection_for_address(addrs.primary.as_str());
if check_count > addrs.replicas().len() {
return self.connection_for_address(addrs.primary().as_str());
}
let index = (initial_index + check_count) % addrs.replicas.len();
if let Some(connection) = self.connection_for_address(addrs.replicas[index].as_str()) {
let _ = slot_map_value.latest_used_replica.compare_exchange_weak(
let index = (initial_index + check_count) % addrs.replicas().len();
if let Some(connection) = self.connection_for_address(addrs.replicas()[index].as_str())
{
let _ = slot_map_value.last_used_replica.compare_exchange_weak(
initial_index,
index,
std::sync::atomic::Ordering::Relaxed,
Expand All @@ -228,15 +236,15 @@ where
fn lookup_route(&self, route: &Route) -> Option<ConnectionAndAddress<Connection>> {
let slot_map_value = self.slot_map.slot_value_for_route(route)?;
let addrs = &slot_map_value.addrs;
if addrs.replicas.is_empty() {
return self.connection_for_address(addrs.primary.as_str());
if addrs.replicas().is_empty() {
return self.connection_for_address(addrs.primary().as_str());
}

match route.slot_addr() {
SlotAddr::Master => self.connection_for_address(addrs.primary.as_str()),
SlotAddr::Master => self.connection_for_address(addrs.primary().as_str()),
SlotAddr::ReplicaOptional => match self.read_from_replica_strategy {
ReadFromReplicaStrategy::AlwaysFromPrimary => {
self.connection_for_address(addrs.primary.as_str())
self.connection_for_address(addrs.primary().as_str())
}
ReadFromReplicaStrategy::RoundRobin => {
self.round_robin_read_from_replica(slot_map_value)
Expand Down Expand Up @@ -274,7 +282,7 @@ where
self.slot_map
.addresses_for_all_primaries()
.into_iter()
.flat_map(|addr| self.connection_for_address(addr))
.flat_map(|addr| self.connection_for_address(&addr))
}

pub(crate) fn node_for_address(&self, address: &str) -> Option<ClusterNode<Connection>> {
Expand Down
Loading

0 comments on commit c6bf0a2

Please sign in to comment.