From fc19bd291e8a96294be75bc9a8b19ddd557b9479 Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Wed, 31 Jan 2024 21:06:05 +0500 Subject: [PATCH] node expanding numeration fix (#568) --- bob-apps/bin/ccg.rs | 11 +- .../bin/config_cluster_generator/pattern.rs | 173 ++++++++++++++++-- bob-common/src/configs/cluster.rs | 26 +-- 3 files changed, 162 insertions(+), 48 deletions(-) diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index 13c71074..5b5bc159 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -9,7 +9,7 @@ use bob_common::configs::{cluster::DistributionFunc}; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, - pattern::{pattern_to_nodes}, + pattern::{pattern_extend_nodes}, utils::{ceil, init_logger, read_config_from_file, write_to_file}, }; @@ -208,19 +208,20 @@ fn simple_gen( } fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { - let nodes = pattern_to_nodes(pattern, node_pattern)?; + let nodes = pattern_extend_nodes(vec![], pattern, node_pattern)?; let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default()); debug!("pattern gen: OK [\n{:#?}\n]", config); Ok(config) } fn pattern_expand( - mut config: ClusterConfig, + config: ClusterConfig, pattern: String, node_pattern: String, ) -> AnyResult { - let nodes = pattern_to_nodes(pattern, node_pattern)?; - config.disjoint_union_nodes(nodes); + let nodes = pattern_extend_nodes(config.nodes().to_owned(), pattern, node_pattern)?; + let config = ClusterConfig::new(nodes, config.vdisks().to_owned(), config.racks().to_owned(), config.distribution_func()); + debug!("pattern extending: OK [\n{:#?}\n]", config); Ok(config) } diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index 0ff9f534..9aa5ec49 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Result as AnyResult}; -use itertools::Itertools; use bob_common::configs::cluster::Node; use bob_common::core_types::{DiskName, DiskPath}; +use itertools::Itertools; fn parse_address_pattern(pattern: &String) -> AnyResult<((String, u16), String)> { debug!("Pattern to parse: {}", pattern); @@ -51,39 +51,73 @@ fn generate_range_samples(pattern: &str) -> impl Iterator { .into_iter() } -pub fn pattern_to_nodes(pattern: String, node_pattern: String) -> AnyResult> { +pub fn pattern_extend_nodes( + mut old_nodes: Vec, + pattern: String, + node_pattern: String, +) -> AnyResult> { let mut err = Ok(()); - let nodes = generate_range_samples(&pattern) + let mut node_counter = old_nodes.len(); + let new_nodes: Vec = generate_range_samples(&pattern) .map_while(|key| { let result = parse_address_pattern(&key); match result { Ok(((ip, port), path)) => Some(((ip, port), path)), - Err(e) => { err = Err(e); None }, + Err(e) => { + err = Err(e); + None + } } }) .group_by(|key| key.0.clone()) .into_iter() - .enumerate() - .map(|(node_count, (ip_port, addresses))| { - let disks: Vec = addresses - .enumerate() - .map(|(disk_count, disk)| { - DiskPath::new( - DiskName::new(format!("disk{}", disk_count + 1).as_str()), - disk.1.as_str(), - ) + .filter_map(|(ip_port, addresses)| { + let address = format!("{}:{}", ip_port.0, ip_port.1); + let existed_node = old_nodes + .iter() + .find_position(|node| node.address() == address); + + let mut old_disks = existed_node + .map(|(_, node)| node.disks().to_vec()) + .unwrap_or_default(); + + let mut disk_counter = old_disks.len(); + let new_disks: Vec = addresses + .filter_map(|(_, path)| { + let disk_path = path.as_str(); + if !old_nodes.is_empty() && old_disks.iter().any(|d| d.path() == disk_path) { + None + } else { + disk_counter += 1; + Some(DiskPath::new( + DiskName::new(&format!("disk{}", disk_counter)), + disk_path, + )) + } }) .collect(); - - Node::new( - substitute_node(node_pattern.as_str(), ip_port.0.as_str(), ip_port.1, node_count + 1), - format!("{}:{}", ip_port.0, ip_port.1), - disks, - ) + old_disks.extend_from_slice(&new_disks); + if let Some((pos, node)) = existed_node { + old_nodes[pos] = Node::new(node.name().to_string(), address, old_disks); + None + } else { + node_counter += 1; + Some(Node::new( + substitute_node( + node_pattern.as_str(), + ip_port.0.as_str(), + ip_port.1, + node_counter, + ), + address, + old_disks, + )) + } }) .collect(); err?; - Ok(nodes) + old_nodes.extend_from_slice(&new_nodes); + Ok(old_nodes) } #[cfg(test)] @@ -131,4 +165,103 @@ mod tests { let result = parse_address_pattern(&pattern); assert!(result.is_err()); } + #[test] + fn test_pattern_extend_nodes() { + let old_nodes = vec![]; + let pattern = "test[1-3]:10000/a[1-2]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + ] + ); + // extending + let old_nodes = result; + let pattern = "test[2-4]:10000/a[2-5]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test4_10000_4".to_string(), + "test4:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a2"), + DiskPath::new(DiskName::new("disk2"), "/a3"), + DiskPath::new(DiskName::new("disk3"), "/a4"), + DiskPath::new(DiskName::new("disk4"), "/a5"), + ], + ), + ] + ); + } + + #[test] + fn test_pattern_extend_nodes_invalid() { + let old_nodes = vec![]; + let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u8 + let node_pattern = "{ip}_{port}_{id}".to_string(); + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern); + assert!(result.is_err()); + } } diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index c6d47422..e822a8f7 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -28,7 +28,7 @@ impl Validatable for DiskPath { } /// Rack config struct, with name and [`Node`] names. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Rack { name: String, nodes: Vec, @@ -75,7 +75,7 @@ impl Validatable for Rack { } /// Node config struct, with name, address and [`DiskPath`]s. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Node { name: String, address: String, @@ -176,7 +176,7 @@ impl Validatable for Replica { } /// Config for virtual disks, stores replicas locations. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct VDisk { id: u32, #[serde(default)] @@ -458,26 +458,6 @@ impl Cluster { Ok(config) } } - - pub fn disjoint_union_nodes(&mut self, other_nodes: Vec) { - for node2 in other_nodes { - if let Some(node1) = self - .nodes - .iter_mut() - .find(|node1| node1.address() == node2.address()) { - node2.disks.into_iter().for_each(|disk2| { - if !node1.disks.iter().any(|disk1| disk1.path() == disk2.path()) { - node1.disks.push(DiskPath::new( - DiskName::new(&format!("disk{}", node1.disks.len() + 1)), - disk2.path(), - )); - } - }); - } else { - self.nodes.push(node2); - } - } - } } impl Validatable for Cluster {