From d7c883cd7b138a9a983b12a1bff7a45a8c769a00 Mon Sep 17 00:00:00 2001 From: ikopylov Date: Fri, 20 Oct 2023 01:20:16 +0200 Subject: [PATCH 1/8] Update pearl to v0.20.0 (#848) * Update pearl to v0.20.0 * Update changelog From f972a59373da2e94a1901604b665da333e90e9b6 Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Mon, 13 Nov 2023 02:53:31 +0500 Subject: [PATCH 2/8] add range pattern expansion --- bob-apps/Cargo.toml | 1 + bob-apps/bin/ccg.rs | 105 +++++++++++++++++- .../bin/config_cluster_generator/utils.rs | 49 +++++++- bob-common/src/configs/cluster.rs | 15 +++ 4 files changed, 166 insertions(+), 4 deletions(-) diff --git a/bob-apps/Cargo.toml b/bob-apps/Cargo.toml index e76cd2b63..bfc2cf22a 100644 --- a/bob-apps/Cargo.toml +++ b/bob-apps/Cargo.toml @@ -43,6 +43,7 @@ env_logger = { workspace = true } clap = { workspace = true } tokio = { workspace = true, features = ["signal"] } pearl = { workspace = true } +itertools = "0.10.5" [target.'cfg(all(target_env = "musl", target_arch = "x86_64", target_pointer_width = "64"))'.dependencies] diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index b03fa1398..c5a180399 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -8,8 +8,15 @@ use bob::{ClusterConfig}; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, - utils::{init_logger, ceil, read_config_from_file, write_to_file}, + utils::{ + init_logger, ceil, read_config_from_file, write_to_file, + generate_range_samples, substitute_node, parse_address_pattern + }, }; +use itertools::Itertools; + +use bob_common::configs::cluster::{DistributionFunc, Node}; +use bob_common::core_types::{DiskName, DiskPath}; #[tokio::main] async fn main() { @@ -28,6 +35,7 @@ fn try_main() -> AnyResult<()> { match get_matches().subcommand() { ("new", Some(matches)) => subcommand_new(matches), ("expand", Some(matches)) => subcommand_expand(matches), + ("new-hw", Some(matches)) => subcommand_new_hw(matches), _ => Err(anyhow!("incorrect arguments: ERR")), } } @@ -67,6 +75,22 @@ fn subcommand_expand(matches: &ArgMatches) -> AnyResult<()> { Ok(()) } +fn subcommand_new_hw(matches: &ArgMatches) -> AnyResult<()> { + debug!("start new config by pattern generation"); + debug!("arguments: {:?}", matches); + let output = generate_hw_config(matches)?; + let output = serde_yaml::to_string(&output).expect("config serialization error"); + debug!("config cluster generation: OK"); + if let Some(name) = matches.value_of("output") { + write_to_file(output, name.to_owned()); + debug!("output to file: OK"); + } else { + println!("{}", output); + debug!("no file provided, stdout print: OK"); + } + Ok(()) +} + fn generate_config(matches: &ArgMatches, input: ClusterConfig) -> AnyResult { let replicas_count = get_replicas_count(matches)?; let (total_vdisks, vdisks_per_disk) = get_vdisks_total_and_per_disk(matches)?; @@ -93,6 +117,15 @@ fn expand_config( Ok(res) } +fn generate_hw_config(matches: &ArgMatches) -> AnyResult { + debug!("arguments: {:?}", matches); + let pattern = get_pattern(matches)?; + let node_pattern = get_nodename(matches); + let res = pattern_gen(pattern, node_pattern)?; + debug!("generate hw config: OK"); + Ok(res) +} + fn simple_expand( config: ClusterConfig, mut hardware_config: ClusterConfig, @@ -153,6 +186,39 @@ fn simple_gen( Ok(config) } +fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { + let nodes: Vec = generate_range_samples(&pattern) + .iter() + .map(|key| { + let (ip, port, path) = parse_address_pattern(&key).unwrap(); + ((ip, port), path) + }) + .group_by(|key| key.0) + .into_iter() + .enumerate() + .map(|(node_count, (ip_port, addresses))| { + let disks: Vec = addresses + .enumerate() + .map(|(disk_count, (_, disk))| { + DiskPath::new( + DiskName::new(format!("disk{}", disk_count + 1).as_str()), + disk.as_str(), + ) + }) + .collect(); + + Node::new( + substitute_node(&node_pattern, ip_port.0, ip_port.1, node_count).clone(), + format!("{}:{}", ip_port.0, ip_port.1), + disks, + ) + }) + .collect(); + let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default()); + debug!("pattern gen: OK [\n{:#?}\n]", config); + Ok(config) +} + fn get_input_config_name(matches: &ArgMatches) -> String { let name = matches .value_of("input") @@ -197,6 +263,24 @@ fn get_vdisks_total_and_per_disk(matches: &ArgMatches) -> AnyResult<(Option AnyResult { + if let Some(name) = matches.value_of("pattern") { + debug!("get_pattern: OK [{}]", name); + Ok(name.to_owned()) + } else { + debug!("get_pattern: No value"); + Err(anyhow!("Failed no pattern present")) + } +} + +fn get_nodename(matches: &ArgMatches) -> String { + let name = matches + .value_of("nodename") + .expect("is some, because of default arg value"); + debug!("get_nodename: OK [{}]", name); + name.to_owned() +} + fn get_matches() -> ArgMatches<'static> { let input = Arg::with_name("input") .short("i") @@ -231,6 +315,15 @@ fn get_matches() -> ArgMatches<'static> { .long("use-racks") .help("Use racks field in config") .takes_value(false); + let pattern_config = Arg::with_name("pattern") + .short("p") + .help("Pattern for pattern generation") + .takes_value(true); + let nodename_config = Arg::with_name("nodename") + .short("n") + .default_value("Node_{ip}:{port}_{id}") + .help("Node name pattern for pattern generation") + .takes_value(true); debug!("input arg: OK"); let subcommand_expand = SubCommand::with_name("expand") .arg(input.clone()) @@ -239,15 +332,21 @@ fn get_matches() -> ArgMatches<'static> { .arg(hardware_config); let subcommand_new = SubCommand::with_name("new") - .arg(input) - .arg(output) + .arg(input.clone()) + .arg(output.clone()) .arg(vdisks_per_disk) .arg(vdisks_count) .arg(use_racks) .arg(replicas); + let subcommand_new_hw = SubCommand::with_name("new-hw") + .arg(output) + .arg(pattern_config) + .arg(nodename_config); + App::new("Config Cluster Generator") .subcommand(subcommand_expand) .subcommand(subcommand_new) + .subcommand(subcommand_new_hw) .get_matches() } diff --git a/bob-apps/bin/config_cluster_generator/utils.rs b/bob-apps/bin/config_cluster_generator/utils.rs index 6b2ef9867..9194fbc62 100644 --- a/bob-apps/bin/config_cluster_generator/utils.rs +++ b/bob-apps/bin/config_cluster_generator/utils.rs @@ -5,6 +5,7 @@ use env_logger::fmt::Color; use log::{Level, LevelFilter}; use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; +use std::net::Ipv4Addr; pub fn init_logger() { env_logger::builder() @@ -89,4 +90,50 @@ pub fn ceil(a: usize, b: usize) -> usize { } else { a / b } -} \ No newline at end of file +} + +pub fn parse_address_pattern(pattern: &String) -> AnyResult<(Ipv4Addr, u16, String)> { + let re: regex::Regex = regex::Regex::new(r"^(\d+\.\d+\.\d+\.\d+):(\d+)(/.+)$").unwrap(); + + if let Some(captures) = re.captures(pattern) { + let ip = captures.get(1).unwrap().as_str(); + let port = captures.get(2).unwrap().as_str(); + let path = captures.get(3).unwrap().as_str().to_owned(); + + let ip: Ipv4Addr = ip.parse().map_err(|_| anyhow!("Failed to parse ip"))?; + let port: u16 = port.parse().map_err(|_| anyhow!("Failed to parse port"))?; + Ok((ip, port, path)) + } else { + Err(anyhow!("Failed to match the pattern")) + } +} +pub fn substitute_node(node_pattern: &String, ip: Ipv4Addr, port: u16, id: usize) -> String { + let substituted = node_pattern + .replace("{ip}", &ip.to_string()) + .replace("{port}", &port.to_string()) + .replace("{id}", &id.to_string()); + substituted +} + +pub fn generate_range_samples(pattern: &String) -> Vec { + let re = regex::Regex::new(r"\[(\d+)-(\d+)]").unwrap(); + let ranges = re.captures_iter(pattern).map(|captures| { + let start: usize = captures[1].parse().unwrap(); + let end: usize = captures[2].parse().unwrap(); + (start, end) + }); + + let mut samples: Vec = vec![pattern.to_string()]; + + for (start, end) in ranges { + samples = samples + .iter() + .flat_map(|template| { + (start..=end).map(move |i| { + template.replacen(&format!("[{}-{}]", start, end), &i.to_string(), 1) + }) + }) + .collect(); + } + samples +} diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index 206cb375b..7f048cce5 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -83,6 +83,13 @@ pub struct Node { } impl Node { + pub fn new(name: String, address: String, disks: Vec) -> Node { + Node { + name, + address, + disks, + } + } /// Returns node name, empty if name wasn't set in config. #[inline] #[must_use] @@ -247,6 +254,14 @@ pub struct Cluster { } impl Cluster { + pub fn new(nodes: Vec, vdisks: Vec, racks: Vec, distribution_func: DistributionFunc) -> Cluster { + Cluster { + nodes, + vdisks, + racks, + distribution_func, + } + } /// Returns slice with [`Node`]s. #[must_use] pub fn nodes(&self) -> &[Node] { From 15df29fe6a3c47e895d5df5550d35c1147f3719b Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Mon, 29 Jan 2024 21:22:34 +0500 Subject: [PATCH 3/8] implement extending config by pattern --- bob-apps/bin/ccg.rs | 90 +++++++----- bob-apps/bin/config_cluster_generator/mod.rs | 1 + .../bin/config_cluster_generator/pattern.rs | 134 ++++++++++++++++++ .../bin/config_cluster_generator/utils.rs | 47 ------ bob-common/src/configs/cluster.rs | 37 ++++- 5 files changed, 217 insertions(+), 92 deletions(-) create mode 100644 bob-apps/bin/config_cluster_generator/pattern.rs diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index c5a180399..13c710746 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -4,19 +4,14 @@ mod config_cluster_generator; extern crate log; use anyhow::{anyhow, Result as AnyResult}; -use bob::{ClusterConfig}; +use bob::ClusterConfig; +use bob_common::configs::{cluster::DistributionFunc}; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, - utils::{ - init_logger, ceil, read_config_from_file, write_to_file, - generate_range_samples, substitute_node, parse_address_pattern - }, + pattern::{pattern_to_nodes}, + utils::{ceil, init_logger, read_config_from_file, write_to_file}, }; -use itertools::Itertools; - -use bob_common::configs::cluster::{DistributionFunc, Node}; -use bob_common::core_types::{DiskName, DiskPath}; #[tokio::main] async fn main() { @@ -36,6 +31,7 @@ fn try_main() -> AnyResult<()> { ("new", Some(matches)) => subcommand_new(matches), ("expand", Some(matches)) => subcommand_expand(matches), ("new-hw", Some(matches)) => subcommand_new_hw(matches), + ("expand-hw", Some(matches)) => subcommand_expand_hw(matches), _ => Err(anyhow!("incorrect arguments: ERR")), } } @@ -91,6 +87,23 @@ fn subcommand_new_hw(matches: &ArgMatches) -> AnyResult<()> { Ok(()) } +fn subcommand_expand_hw(matches: &ArgMatches) -> AnyResult<()> { + debug!("start config extending with new nodes by range pattern"); + debug!("arguments: {:?}", matches); + let config = read_config_from_file(&get_input_config_name(matches))?; + let output = expand_hw_config(matches, config)?; + let output = serde_yaml::to_string(&output).expect("config serialization error"); + debug!("config cluster extending: OK"); + if let Some(name) = matches.value_of("output") { + write_to_file(output, name.to_owned()); + debug!("output to file: OK"); + } else { + println!("{}", output); + debug!("no file provided, stdout print: OK"); + } + Ok(()) +} + fn generate_config(matches: &ArgMatches, input: ClusterConfig) -> AnyResult { let replicas_count = get_replicas_count(matches)?; let (total_vdisks, vdisks_per_disk) = get_vdisks_total_and_per_disk(matches)?; @@ -126,6 +139,14 @@ fn generate_hw_config(matches: &ArgMatches) -> AnyResult { Ok(res) } +fn expand_hw_config(matches: &ArgMatches, config: ClusterConfig) -> AnyResult { + let pattern = get_pattern(matches)?; + let node_pattern = get_nodename(matches); + let res = pattern_expand(config, pattern, node_pattern)?; + debug!("expand hw config: OK"); + Ok(res) +} + fn simple_expand( config: ClusterConfig, mut hardware_config: ClusterConfig, @@ -187,38 +208,22 @@ fn simple_gen( } fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { - let nodes: Vec = generate_range_samples(&pattern) - .iter() - .map(|key| { - let (ip, port, path) = parse_address_pattern(&key).unwrap(); - ((ip, port), path) - }) - .group_by(|key| key.0) - .into_iter() - .enumerate() - .map(|(node_count, (ip_port, addresses))| { - let disks: Vec = addresses - .enumerate() - .map(|(disk_count, (_, disk))| { - DiskPath::new( - DiskName::new(format!("disk{}", disk_count + 1).as_str()), - disk.as_str(), - ) - }) - .collect(); - - Node::new( - substitute_node(&node_pattern, ip_port.0, ip_port.1, node_count).clone(), - format!("{}:{}", ip_port.0, ip_port.1), - disks, - ) - }) - .collect(); + let nodes = pattern_to_nodes(pattern, node_pattern)?; let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default()); debug!("pattern gen: OK [\n{:#?}\n]", config); Ok(config) } +fn pattern_expand( + mut config: ClusterConfig, + pattern: String, + node_pattern: String, +) -> AnyResult { + let nodes = pattern_to_nodes(pattern, node_pattern)?; + config.disjoint_union_nodes(nodes); + Ok(config) +} + fn get_input_config_name(matches: &ArgMatches) -> String { let name = matches .value_of("input") @@ -269,7 +274,7 @@ fn get_pattern(matches: &ArgMatches) -> AnyResult { Ok(name.to_owned()) } else { debug!("get_pattern: No value"); - Err(anyhow!("Failed no pattern present")) + Err(anyhow!("Failed: no pattern present")) } } @@ -321,7 +326,7 @@ fn get_matches() -> ArgMatches<'static> { .takes_value(true); let nodename_config = Arg::with_name("nodename") .short("n") - .default_value("Node_{ip}:{port}_{id}") + .default_value("Node_{ip}_{port}_{id}") .help("Node name pattern for pattern generation") .takes_value(true); debug!("input arg: OK"); @@ -340,7 +345,13 @@ fn get_matches() -> ArgMatches<'static> { .arg(replicas); let subcommand_new_hw = SubCommand::with_name("new-hw") - .arg(output) + .arg(output.clone()) + .arg(pattern_config.clone()) + .arg(nodename_config.clone()); + + let subcommand_expand_hw = SubCommand::with_name("expand-hw") + .arg(input.clone()) + .arg(output.clone()) .arg(pattern_config) .arg(nodename_config); @@ -348,5 +359,6 @@ fn get_matches() -> ArgMatches<'static> { .subcommand(subcommand_expand) .subcommand(subcommand_new) .subcommand(subcommand_new_hw) + .subcommand(subcommand_expand_hw) .get_matches() } diff --git a/bob-apps/bin/config_cluster_generator/mod.rs b/bob-apps/bin/config_cluster_generator/mod.rs index 0642c25b8..a5b08aee6 100644 --- a/bob-apps/bin/config_cluster_generator/mod.rs +++ b/bob-apps/bin/config_cluster_generator/mod.rs @@ -1,2 +1,3 @@ pub mod center; +pub mod pattern; pub mod utils; diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs new file mode 100644 index 000000000..0ff9f534c --- /dev/null +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -0,0 +1,134 @@ +use anyhow::{anyhow, Result as AnyResult}; +use itertools::Itertools; +use bob_common::configs::cluster::Node; +use bob_common::core_types::{DiskName, DiskPath}; + +fn parse_address_pattern(pattern: &String) -> AnyResult<((String, u16), String)> { + debug!("Pattern to parse: {}", pattern); + let re: regex::Regex = regex::Regex::new(r"^([\w.]+):(\d+)(/[\w/]+)$").unwrap(); + + if let Some(captures) = re.captures(pattern) { + let ip = captures.get(1).unwrap().as_str().to_owned(); + let port = captures.get(2).unwrap().as_str(); + let path = captures.get(3).unwrap().as_str().to_owned(); + + let port: u16 = port + .parse() + .map_err(|_| anyhow!("Failed to parse port {}", port))?; + Ok(((ip, port), path)) + } else { + Err(anyhow!("Failed to parse the pattern {}", pattern)) + } +} + +fn substitute_node(node_pattern: &str, ip: &str, port: u16, id: usize) -> String { + node_pattern + .replace("{ip}", &ip) + .replace("{port}", &port.to_string()) + .replace("{id}", &id.to_string()) +} + +fn generate_range_samples(pattern: &str) -> impl Iterator { + let re = regex::Regex::new(r"\[(\d+)-(\d+)]").unwrap(); + + let ranges = re.captures_iter(pattern).map(|captures| { + let start: usize = captures[1].parse().unwrap(); + let end: usize = captures[2].parse().unwrap(); + start..=end + }); + + re.split(pattern) + .zip_longest(ranges) + .map(|x| { + if let itertools::EitherOrBoth::Both(part, range) = x { + range.map(|i| part.to_string() + &i.to_string()).collect() + } else { + vec![x.left().unwrap().to_string()] + } + }) + .multi_cartesian_product() + .map(|x| x.concat()) + .into_iter() +} + +pub fn pattern_to_nodes(pattern: String, node_pattern: String) -> AnyResult> { + let mut err = Ok(()); + let nodes = generate_range_samples(&pattern) + .map_while(|key| { + let result = parse_address_pattern(&key); + match result { + Ok(((ip, port), path)) => Some(((ip, port), path)), + Err(e) => { err = Err(e); None }, + } + }) + .group_by(|key| key.0.clone()) + .into_iter() + .enumerate() + .map(|(node_count, (ip_port, addresses))| { + let disks: Vec = addresses + .enumerate() + .map(|(disk_count, disk)| { + DiskPath::new( + DiskName::new(format!("disk{}", disk_count + 1).as_str()), + disk.1.as_str(), + ) + }) + .collect(); + + Node::new( + substitute_node(node_pattern.as_str(), ip_port.0.as_str(), ip_port.1, node_count + 1), + format!("{}:{}", ip_port.0, ip_port.1), + disks, + ) + }) + .collect(); + err?; + Ok(nodes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_generate_range_samples() { + let pattern = "abc[1-3]def"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["abc1def", "abc2def", "abc3def"]); + + let pattern = "[0-1]a[1-2]b[2-3]"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!( + samples, + vec!["0a1b2", "0a1b3", "0a2b2", "0a2b3", "1a1b2", "1a1b3", "1a2b2", "1a2b3"] + ); + + let pattern = "a[5-6]b[2-3]c"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["a5b2c", "a5b3c", "a6b2c", "a6b3c"]); + + let pattern = "[5-5]a[0-0]"; + let samples: Vec = generate_range_samples(pattern).collect(); + assert_eq!(samples, vec!["5a0"]); + } + + #[test] + fn test_parse_address_pattern() { + let pattern = String::from("127.0.0.1:8080/disk/path"); + let result = parse_address_pattern(&pattern); + assert!(result.is_ok()); + + let ((ip, port), path) = result.unwrap(); + assert_eq!(ip, "127.0.0.1"); + assert_eq!(port, 8080); + assert_eq!(path, "/disk/path"); + + let pattern = String::from("127.0.0.1:65536/disk/path"); + let result = parse_address_pattern(&pattern); + assert!(result.is_err()); + + let pattern = String::from("a,a:8080/disk/path"); + let result = parse_address_pattern(&pattern); + assert!(result.is_err()); + } +} diff --git a/bob-apps/bin/config_cluster_generator/utils.rs b/bob-apps/bin/config_cluster_generator/utils.rs index 9194fbc62..65f9900ee 100644 --- a/bob-apps/bin/config_cluster_generator/utils.rs +++ b/bob-apps/bin/config_cluster_generator/utils.rs @@ -5,7 +5,6 @@ use env_logger::fmt::Color; use log::{Level, LevelFilter}; use std::fs::{File, OpenOptions}; use std::io::{Read, Write}; -use std::net::Ipv4Addr; pub fn init_logger() { env_logger::builder() @@ -91,49 +90,3 @@ pub fn ceil(a: usize, b: usize) -> usize { a / b } } - -pub fn parse_address_pattern(pattern: &String) -> AnyResult<(Ipv4Addr, u16, String)> { - let re: regex::Regex = regex::Regex::new(r"^(\d+\.\d+\.\d+\.\d+):(\d+)(/.+)$").unwrap(); - - if let Some(captures) = re.captures(pattern) { - let ip = captures.get(1).unwrap().as_str(); - let port = captures.get(2).unwrap().as_str(); - let path = captures.get(3).unwrap().as_str().to_owned(); - - let ip: Ipv4Addr = ip.parse().map_err(|_| anyhow!("Failed to parse ip"))?; - let port: u16 = port.parse().map_err(|_| anyhow!("Failed to parse port"))?; - Ok((ip, port, path)) - } else { - Err(anyhow!("Failed to match the pattern")) - } -} -pub fn substitute_node(node_pattern: &String, ip: Ipv4Addr, port: u16, id: usize) -> String { - let substituted = node_pattern - .replace("{ip}", &ip.to_string()) - .replace("{port}", &port.to_string()) - .replace("{id}", &id.to_string()); - substituted -} - -pub fn generate_range_samples(pattern: &String) -> Vec { - let re = regex::Regex::new(r"\[(\d+)-(\d+)]").unwrap(); - let ranges = re.captures_iter(pattern).map(|captures| { - let start: usize = captures[1].parse().unwrap(); - let end: usize = captures[2].parse().unwrap(); - (start, end) - }); - - let mut samples: Vec = vec![pattern.to_string()]; - - for (start, end) in ranges { - samples = samples - .iter() - .flat_map(|template| { - (start..=end).map(move |i| { - template.replacen(&format!("[{}-{}]", start, end), &i.to_string(), 1) - }) - }) - .collect(); - } - samples -} diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index 7f048cce5..c6d474221 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -1,16 +1,16 @@ use super::{ node::BackendType, reader::YamlBobConfig, - validation::{Validatable, Validator} + validation::{Validatable, Validator}, }; use crate::{ configs::node::Node as NodeConfig, + core_types::{DiskName, DiskPath, NodeDisk, VDiskId}, node::NodeName, - core_types::{DiskPath, VDiskId, NodeDisk, DiskName}, }; -use anyhow::{Result as AnyResult, anyhow}; +use anyhow::{anyhow, Result as AnyResult}; use http::Uri; -use std::collections::{ HashMap, HashSet }; +use std::collections::{HashMap, HashSet}; impl Validatable for DiskPath { fn validate(&self) -> Result<(), String> { @@ -254,7 +254,12 @@ pub struct Cluster { } impl Cluster { - pub fn new(nodes: Vec, vdisks: Vec, racks: Vec, distribution_func: DistributionFunc) -> Cluster { + pub fn new( + nodes: Vec, + vdisks: Vec, + racks: Vec, + distribution_func: DistributionFunc, + ) -> Cluster { Cluster { nodes, vdisks, @@ -287,7 +292,7 @@ impl Cluster { } /// Extends the vdisks collection with contents of the iterator. - pub fn vdisks_extend(&mut self, iter: impl IntoIterator) { + pub fn vdisks_extend(&mut self, iter: impl IntoIterator) { self.vdisks.extend(iter) } @@ -453,6 +458,26 @@ impl Cluster { Ok(config) } } + + pub fn disjoint_union_nodes(&mut self, other_nodes: Vec) { + for node2 in other_nodes { + if let Some(node1) = self + .nodes + .iter_mut() + .find(|node1| node1.address() == node2.address()) { + node2.disks.into_iter().for_each(|disk2| { + if !node1.disks.iter().any(|disk1| disk1.path() == disk2.path()) { + node1.disks.push(DiskPath::new( + DiskName::new(&format!("disk{}", node1.disks.len() + 1)), + disk2.path(), + )); + } + }); + } else { + self.nodes.push(node2); + } + } + } } impl Validatable for Cluster { From fc19bd291e8a96294be75bc9a8b19ddd557b9479 Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Wed, 31 Jan 2024 21:06:05 +0500 Subject: [PATCH 4/8] node expanding numeration fix (#568) --- bob-apps/bin/ccg.rs | 11 +- .../bin/config_cluster_generator/pattern.rs | 173 ++++++++++++++++-- bob-common/src/configs/cluster.rs | 26 +-- 3 files changed, 162 insertions(+), 48 deletions(-) diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index 13c710746..5b5bc1593 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -9,7 +9,7 @@ use bob_common::configs::{cluster::DistributionFunc}; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, - pattern::{pattern_to_nodes}, + pattern::{pattern_extend_nodes}, utils::{ceil, init_logger, read_config_from_file, write_to_file}, }; @@ -208,19 +208,20 @@ fn simple_gen( } fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { - let nodes = pattern_to_nodes(pattern, node_pattern)?; + let nodes = pattern_extend_nodes(vec![], pattern, node_pattern)?; let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default()); debug!("pattern gen: OK [\n{:#?}\n]", config); Ok(config) } fn pattern_expand( - mut config: ClusterConfig, + config: ClusterConfig, pattern: String, node_pattern: String, ) -> AnyResult { - let nodes = pattern_to_nodes(pattern, node_pattern)?; - config.disjoint_union_nodes(nodes); + let nodes = pattern_extend_nodes(config.nodes().to_owned(), pattern, node_pattern)?; + let config = ClusterConfig::new(nodes, config.vdisks().to_owned(), config.racks().to_owned(), config.distribution_func()); + debug!("pattern extending: OK [\n{:#?}\n]", config); Ok(config) } diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index 0ff9f534c..9aa5ec490 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -1,7 +1,7 @@ use anyhow::{anyhow, Result as AnyResult}; -use itertools::Itertools; use bob_common::configs::cluster::Node; use bob_common::core_types::{DiskName, DiskPath}; +use itertools::Itertools; fn parse_address_pattern(pattern: &String) -> AnyResult<((String, u16), String)> { debug!("Pattern to parse: {}", pattern); @@ -51,39 +51,73 @@ fn generate_range_samples(pattern: &str) -> impl Iterator { .into_iter() } -pub fn pattern_to_nodes(pattern: String, node_pattern: String) -> AnyResult> { +pub fn pattern_extend_nodes( + mut old_nodes: Vec, + pattern: String, + node_pattern: String, +) -> AnyResult> { let mut err = Ok(()); - let nodes = generate_range_samples(&pattern) + let mut node_counter = old_nodes.len(); + let new_nodes: Vec = generate_range_samples(&pattern) .map_while(|key| { let result = parse_address_pattern(&key); match result { Ok(((ip, port), path)) => Some(((ip, port), path)), - Err(e) => { err = Err(e); None }, + Err(e) => { + err = Err(e); + None + } } }) .group_by(|key| key.0.clone()) .into_iter() - .enumerate() - .map(|(node_count, (ip_port, addresses))| { - let disks: Vec = addresses - .enumerate() - .map(|(disk_count, disk)| { - DiskPath::new( - DiskName::new(format!("disk{}", disk_count + 1).as_str()), - disk.1.as_str(), - ) + .filter_map(|(ip_port, addresses)| { + let address = format!("{}:{}", ip_port.0, ip_port.1); + let existed_node = old_nodes + .iter() + .find_position(|node| node.address() == address); + + let mut old_disks = existed_node + .map(|(_, node)| node.disks().to_vec()) + .unwrap_or_default(); + + let mut disk_counter = old_disks.len(); + let new_disks: Vec = addresses + .filter_map(|(_, path)| { + let disk_path = path.as_str(); + if !old_nodes.is_empty() && old_disks.iter().any(|d| d.path() == disk_path) { + None + } else { + disk_counter += 1; + Some(DiskPath::new( + DiskName::new(&format!("disk{}", disk_counter)), + disk_path, + )) + } }) .collect(); - - Node::new( - substitute_node(node_pattern.as_str(), ip_port.0.as_str(), ip_port.1, node_count + 1), - format!("{}:{}", ip_port.0, ip_port.1), - disks, - ) + old_disks.extend_from_slice(&new_disks); + if let Some((pos, node)) = existed_node { + old_nodes[pos] = Node::new(node.name().to_string(), address, old_disks); + None + } else { + node_counter += 1; + Some(Node::new( + substitute_node( + node_pattern.as_str(), + ip_port.0.as_str(), + ip_port.1, + node_counter, + ), + address, + old_disks, + )) + } }) .collect(); err?; - Ok(nodes) + old_nodes.extend_from_slice(&new_nodes); + Ok(old_nodes) } #[cfg(test)] @@ -131,4 +165,103 @@ mod tests { let result = parse_address_pattern(&pattern); assert!(result.is_err()); } + #[test] + fn test_pattern_extend_nodes() { + let old_nodes = vec![]; + let pattern = "test[1-3]:10000/a[1-2]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + ] + ); + // extending + let old_nodes = result; + let pattern = "test[2-4]:10000/a[2-5]".to_string(); + let node_pattern = "{ip}_{port}_{id}".to_string(); + + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + + assert_eq!( + result, + vec![ + Node::new( + "test1_10000_1".to_string(), + "test1:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + ], + ), + Node::new( + "test2_10000_2".to_string(), + "test2:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test3_10000_3".to_string(), + "test3:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a1"), + DiskPath::new(DiskName::new("disk2"), "/a2"), + DiskPath::new(DiskName::new("disk3"), "/a3"), + DiskPath::new(DiskName::new("disk4"), "/a4"), + DiskPath::new(DiskName::new("disk5"), "/a5"), + ], + ), + Node::new( + "test4_10000_4".to_string(), + "test4:10000".to_string(), + vec![ + DiskPath::new(DiskName::new("disk1"), "/a2"), + DiskPath::new(DiskName::new("disk2"), "/a3"), + DiskPath::new(DiskName::new("disk3"), "/a4"), + DiskPath::new(DiskName::new("disk4"), "/a5"), + ], + ), + ] + ); + } + + #[test] + fn test_pattern_extend_nodes_invalid() { + let old_nodes = vec![]; + let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u8 + let node_pattern = "{ip}_{port}_{id}".to_string(); + let result = pattern_extend_nodes(old_nodes, pattern, node_pattern); + assert!(result.is_err()); + } } diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index c6d474221..e822a8f72 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -28,7 +28,7 @@ impl Validatable for DiskPath { } /// Rack config struct, with name and [`Node`] names. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Rack { name: String, nodes: Vec, @@ -75,7 +75,7 @@ impl Validatable for Rack { } /// Node config struct, with name, address and [`DiskPath`]s. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct Node { name: String, address: String, @@ -176,7 +176,7 @@ impl Validatable for Replica { } /// Config for virtual disks, stores replicas locations. -#[derive(Debug, PartialEq, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct VDisk { id: u32, #[serde(default)] @@ -458,26 +458,6 @@ impl Cluster { Ok(config) } } - - pub fn disjoint_union_nodes(&mut self, other_nodes: Vec) { - for node2 in other_nodes { - if let Some(node1) = self - .nodes - .iter_mut() - .find(|node1| node1.address() == node2.address()) { - node2.disks.into_iter().for_each(|disk2| { - if !node1.disks.iter().any(|disk1| disk1.path() == disk2.path()) { - node1.disks.push(DiskPath::new( - DiskName::new(&format!("disk{}", node1.disks.len() + 1)), - disk2.path(), - )); - } - }); - } else { - self.nodes.push(node2); - } - } - } } impl Validatable for Cluster { From 592e4a9984ec84ac9709a26e97a072d624d6212f Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Mon, 5 Feb 2024 18:50:01 +0500 Subject: [PATCH 5/8] Code improvements (#568) --- CHANGELOG.md | 1 + .../bin/config_cluster_generator/pattern.rs | 129 +++++++----------- .../bin/config_cluster_generator/utils.rs | 2 +- bob-common/src/configs/cluster.rs | 25 +++- 4 files changed, 70 insertions(+), 87 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 98b826591..8e2947aba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Bob versions changelog ## [Unreleased] #### Added - Blob performes fsync if buffered bytes are larger than max_dirty_bytes_before_sync config param (#748) +- Command to generate nodes section for cluster.yaml or add new nodes to an existing cluster.yaml using range syntax patterns. (#568) #### Changed - Use cargo workspace to declare dependencies to avoid their duplication (#821) diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index 9aa5ec490..93ae429b5 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -1,35 +1,37 @@ use anyhow::{anyhow, Result as AnyResult}; use bob_common::configs::cluster::Node; -use bob_common::core_types::{DiskName, DiskPath}; use itertools::Itertools; - -fn parse_address_pattern(pattern: &String) -> AnyResult<((String, u16), String)> { - debug!("Pattern to parse: {}", pattern); - let re: regex::Regex = regex::Regex::new(r"^([\w.]+):(\d+)(/[\w/]+)$").unwrap(); - - if let Some(captures) = re.captures(pattern) { - let ip = captures.get(1).unwrap().as_str().to_owned(); - let port = captures.get(2).unwrap().as_str(); - let path = captures.get(3).unwrap().as_str().to_owned(); - +use regex::{Captures, Regex}; + +fn parse_address_sample(sample: &str) -> AnyResult<((String, u16), String)> { + debug!("Sample to parse: {}", sample); + let re = Regex::new(r"^(?[\w.]+):(?\d+)(?/[\w/]+)$").unwrap(); + if let Some(captures) = re.captures(sample) { + let ip = captures["addr"].to_owned(); + let port = &captures["port"]; + let path = captures["path"].to_owned(); let port: u16 = port .parse() .map_err(|_| anyhow!("Failed to parse port {}", port))?; Ok(((ip, port), path)) } else { - Err(anyhow!("Failed to parse the pattern {}", pattern)) + Err(anyhow!("Failed to parse the sample {}", sample)) } } -fn substitute_node(node_pattern: &str, ip: &str, port: u16, id: usize) -> String { - node_pattern - .replace("{ip}", &ip) - .replace("{port}", &port.to_string()) - .replace("{id}", &id.to_string()) +fn substitute_node_pattern(node_pattern: &str, ip: &str, port: u16, id: usize) -> String { + let re = Regex::new(r"\{(\w+)}").unwrap(); + re.replace_all(node_pattern, |caps: &Captures| match &caps[1] { + "ip" => ip.to_string(), + "port" => port.to_string(), + "id" => id.to_string(), + _ => caps[0].to_string(), + }) + .to_string() } fn generate_range_samples(pattern: &str) -> impl Iterator { - let re = regex::Regex::new(r"\[(\d+)-(\d+)]").unwrap(); + let re = Regex::new(r"\[(\d+)-(\d+)]").unwrap(); let ranges = re.captures_iter(pattern).map(|captures| { let start: usize = captures[1].parse().unwrap(); @@ -43,7 +45,7 @@ fn generate_range_samples(pattern: &str) -> impl Iterator { if let itertools::EitherOrBoth::Both(part, range) = x { range.map(|i| part.to_string() + &i.to_string()).collect() } else { - vec![x.left().unwrap().to_string()] + vec![x.left().expect("is some because split > range").to_string()] } }) .multi_cartesian_product() @@ -56,73 +58,38 @@ pub fn pattern_extend_nodes( pattern: String, node_pattern: String, ) -> AnyResult> { - let mut err = Ok(()); + let parsed_samples = generate_range_samples(&pattern) + .map(|key| parse_address_sample(&key)) + .collect::>>()?; + let mut node_counter = old_nodes.len(); - let new_nodes: Vec = generate_range_samples(&pattern) - .map_while(|key| { - let result = parse_address_pattern(&key); - match result { - Ok(((ip, port), path)) => Some(((ip, port), path)), - Err(e) => { - err = Err(e); - None - } - } - }) - .group_by(|key| key.0.clone()) + for (ip_port, addresses) in parsed_samples + .iter() + .group_by(|(ip_port, _)| ip_port) .into_iter() - .filter_map(|(ip_port, addresses)| { - let address = format!("{}:{}", ip_port.0, ip_port.1); - let existed_node = old_nodes - .iter() - .find_position(|node| node.address() == address); - - let mut old_disks = existed_node - .map(|(_, node)| node.disks().to_vec()) - .unwrap_or_default(); + { + let address = format!("{}:{}", ip_port.0, ip_port.1); + if let Some(node) = old_nodes.iter_mut().find(|node| node.address() == address) { + node.merge_disks(addresses.map(|(_, path)| path.to_owned())); + } else { + node_counter += 1; + let mut new_node = Node::new( + substitute_node_pattern(&node_pattern, &ip_port.0, ip_port.1, node_counter), + address, + vec![], + ); + new_node.merge_disks(addresses.map(|(_, path)| path.to_owned())); + old_nodes.push(new_node); + } + } - let mut disk_counter = old_disks.len(); - let new_disks: Vec = addresses - .filter_map(|(_, path)| { - let disk_path = path.as_str(); - if !old_nodes.is_empty() && old_disks.iter().any(|d| d.path() == disk_path) { - None - } else { - disk_counter += 1; - Some(DiskPath::new( - DiskName::new(&format!("disk{}", disk_counter)), - disk_path, - )) - } - }) - .collect(); - old_disks.extend_from_slice(&new_disks); - if let Some((pos, node)) = existed_node { - old_nodes[pos] = Node::new(node.name().to_string(), address, old_disks); - None - } else { - node_counter += 1; - Some(Node::new( - substitute_node( - node_pattern.as_str(), - ip_port.0.as_str(), - ip_port.1, - node_counter, - ), - address, - old_disks, - )) - } - }) - .collect(); - err?; - old_nodes.extend_from_slice(&new_nodes); Ok(old_nodes) } #[cfg(test)] mod tests { use super::*; + use bob_common::core_types::{DiskName, DiskPath}; #[test] fn test_generate_range_samples() { @@ -149,7 +116,7 @@ mod tests { #[test] fn test_parse_address_pattern() { let pattern = String::from("127.0.0.1:8080/disk/path"); - let result = parse_address_pattern(&pattern); + let result = parse_address_sample(&pattern); assert!(result.is_ok()); let ((ip, port), path) = result.unwrap(); @@ -158,11 +125,11 @@ mod tests { assert_eq!(path, "/disk/path"); let pattern = String::from("127.0.0.1:65536/disk/path"); - let result = parse_address_pattern(&pattern); + let result = parse_address_sample(&pattern); assert!(result.is_err()); let pattern = String::from("a,a:8080/disk/path"); - let result = parse_address_pattern(&pattern); + let result = parse_address_sample(&pattern); assert!(result.is_err()); } #[test] @@ -259,7 +226,7 @@ mod tests { #[test] fn test_pattern_extend_nodes_invalid() { let old_nodes = vec![]; - let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u8 + let pattern = "test[1-4]:[65535-65537]/a[2-5]".to_string(); // port type: u16 let node_pattern = "{ip}_{port}_{id}".to_string(); let result = pattern_extend_nodes(old_nodes, pattern, node_pattern); assert!(result.is_err()); diff --git a/bob-apps/bin/config_cluster_generator/utils.rs b/bob-apps/bin/config_cluster_generator/utils.rs index 65f9900ee..6b2ef9867 100644 --- a/bob-apps/bin/config_cluster_generator/utils.rs +++ b/bob-apps/bin/config_cluster_generator/utils.rs @@ -89,4 +89,4 @@ pub fn ceil(a: usize, b: usize) -> usize { } else { a / b } -} +} \ No newline at end of file diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index e822a8f72..a87e86fdf 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -1,16 +1,16 @@ use super::{ node::BackendType, reader::YamlBobConfig, - validation::{Validatable, Validator}, + validation::{Validatable, Validator} }; use crate::{ configs::node::Node as NodeConfig, - core_types::{DiskName, DiskPath, NodeDisk, VDiskId}, node::NodeName, + core_types::{DiskPath, VDiskId, NodeDisk, DiskName}, }; -use anyhow::{anyhow, Result as AnyResult}; +use anyhow::{Result as AnyResult, anyhow}; use http::Uri; -use std::collections::{HashMap, HashSet}; +use std::collections::{ HashMap, HashSet }; impl Validatable for DiskPath { fn validate(&self) -> Result<(), String> { @@ -83,6 +83,7 @@ pub struct Node { } impl Node { + #[must_use] pub fn new(name: String, address: String, disks: Vec) -> Node { Node { name, @@ -104,6 +105,20 @@ impl Node { &self.disks } + /// Merges new disk paths into a node's disk list, without duplicates. + pub fn merge_disks(&mut self, new_disks_paths: impl Iterator) { + let mut disk_counter = self.disks().len() + 1; + for disk_path in new_disks_paths { + if !self.disks().iter().any(|d| d.path() == disk_path) { + self.disks.push(DiskPath::new( + DiskName::new(&format!("disk{}", disk_counter)), + disk_path.as_str(), + )); + disk_counter += 1; + } + } + } + /// Returns node address, empty if address wasn't set in config. #[inline] #[must_use] @@ -292,7 +307,7 @@ impl Cluster { } /// Extends the vdisks collection with contents of the iterator. - pub fn vdisks_extend(&mut self, iter: impl IntoIterator) { + pub fn vdisks_extend(&mut self, iter: impl IntoIterator) { self.vdisks.extend(iter) } From 4e42b77e52a0f2df5725704fa2e900987628da48 Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Wed, 21 Feb 2024 01:23:42 +0500 Subject: [PATCH 6/8] Minor code improvements (#568) --- CHANGELOG.md | 2 +- bob-apps/bin/ccg.rs | 5 +- .../bin/config_cluster_generator/pattern.rs | 53 ++++++++++++------- bob-common/src/configs/cluster.rs | 24 +++------ 4 files changed, 42 insertions(+), 42 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e2947aba..4f6401e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,7 @@ Bob versions changelog ## [Unreleased] #### Added - Blob performes fsync if buffered bytes are larger than max_dirty_bytes_before_sync config param (#748) -- Command to generate nodes section for cluster.yaml or add new nodes to an existing cluster.yaml using range syntax patterns. (#568) +- Command to generate nodes section for cluster.yaml or add new nodes to an existing cluster.yaml using range syntax patterns (#568) #### Changed - Use cargo workspace to declare dependencies to avoid their duplication (#821) diff --git a/bob-apps/bin/ccg.rs b/bob-apps/bin/ccg.rs index 5b5bc1593..f7098d8c1 100644 --- a/bob-apps/bin/ccg.rs +++ b/bob-apps/bin/ccg.rs @@ -5,7 +5,6 @@ extern crate log; use anyhow::{anyhow, Result as AnyResult}; use bob::ClusterConfig; -use bob_common::configs::{cluster::DistributionFunc}; use clap::{App, Arg, ArgMatches, SubCommand}; use config_cluster_generator::{ center::{check_expand_configs, get_new_disks, get_new_racks, Center}, @@ -209,7 +208,7 @@ fn simple_gen( fn pattern_gen(pattern: String, node_pattern: String) -> AnyResult { let nodes = pattern_extend_nodes(vec![], pattern, node_pattern)?; - let config = ClusterConfig::new(nodes, vec![], vec![], DistributionFunc::default()); + let config = ClusterConfig::new(nodes); debug!("pattern gen: OK [\n{:#?}\n]", config); Ok(config) } @@ -220,7 +219,7 @@ fn pattern_expand( node_pattern: String, ) -> AnyResult { let nodes = pattern_extend_nodes(config.nodes().to_owned(), pattern, node_pattern)?; - let config = ClusterConfig::new(nodes, config.vdisks().to_owned(), config.racks().to_owned(), config.distribution_func()); + let config = ClusterConfig::new(nodes); debug!("pattern extending: OK [\n{:#?}\n]", config); Ok(config) } diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index 93ae429b5..674679bc9 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Result as AnyResult}; use bob_common::configs::cluster::Node; +use bob_common::core_types::{DiskName, DiskPath}; use itertools::Itertools; use regex::{Captures, Regex}; @@ -61,35 +62,47 @@ pub fn pattern_extend_nodes( let parsed_samples = generate_range_samples(&pattern) .map(|key| parse_address_sample(&key)) .collect::>>()?; - - let mut node_counter = old_nodes.len(); - for (ip_port, addresses) in parsed_samples + parsed_samples .iter() .group_by(|(ip_port, _)| ip_port) .into_iter() - { - let address = format!("{}:{}", ip_port.0, ip_port.1); - if let Some(node) = old_nodes.iter_mut().find(|node| node.address() == address) { - node.merge_disks(addresses.map(|(_, path)| path.to_owned())); - } else { - node_counter += 1; - let mut new_node = Node::new( - substitute_node_pattern(&node_pattern, &ip_port.0, ip_port.1, node_counter), - address, - vec![], - ); - new_node.merge_disks(addresses.map(|(_, path)| path.to_owned())); - old_nodes.push(new_node); - } - } - + .for_each(|(ip_port, addresses)| { + let address = format!("{}:{}", ip_port.0, ip_port.1); + let mut p_node = old_nodes.iter_mut().find(|node| node.address() == address); + if p_node.is_none() { + old_nodes.push(Node::new( + substitute_node_pattern( + &node_pattern, + &ip_port.0, + ip_port.1, + old_nodes.len() + 1, + ), + address, + vec![], + )); + p_node = old_nodes.last_mut(); + } + let p_node = p_node.expect("is some because it's either the pushed one or found"); + let old_disks = p_node.disks(); + let new_disks: Vec = addresses + .map(|(_, path)| path) + .filter(|disk_path| !p_node.disks().iter().any(|d| d.path() == *disk_path)) + .enumerate() + .map(|(idx, disk_path)| { + DiskPath::new( + DiskName::new(&format!("disk{}", idx + old_disks.len() + 1)), + disk_path.as_str(), + ) + }) + .collect(); + p_node.disks_extend(new_disks); + }); Ok(old_nodes) } #[cfg(test)] mod tests { use super::*; - use bob_common::core_types::{DiskName, DiskPath}; #[test] fn test_generate_range_samples() { diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index a87e86fdf..15b7ec57e 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -105,18 +105,9 @@ impl Node { &self.disks } - /// Merges new disk paths into a node's disk list, without duplicates. - pub fn merge_disks(&mut self, new_disks_paths: impl Iterator) { - let mut disk_counter = self.disks().len() + 1; - for disk_path in new_disks_paths { - if !self.disks().iter().any(|d| d.path() == disk_path) { - self.disks.push(DiskPath::new( - DiskName::new(&format!("disk{}", disk_counter)), - disk_path.as_str(), - )); - disk_counter += 1; - } - } + /// Extends the disks collection with contents of the iterator. + pub fn disks_extend(&mut self, iter: impl IntoIterator) { + self.disks.extend(iter) } /// Returns node address, empty if address wasn't set in config. @@ -271,15 +262,12 @@ pub struct Cluster { impl Cluster { pub fn new( nodes: Vec, - vdisks: Vec, - racks: Vec, - distribution_func: DistributionFunc, ) -> Cluster { Cluster { nodes, - vdisks, - racks, - distribution_func, + vdisks: Vec::default(), + racks: Vec::default(), + distribution_func: DistributionFunc::default(), } } /// Returns slice with [`Node`]s. From 348daad4954d3ad9429866f60a817d58947b3d8e Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Wed, 21 Feb 2024 13:13:33 +0500 Subject: [PATCH 7/8] Decompose nodes extending (#568) --- .../bin/config_cluster_generator/pattern.rs | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index 674679bc9..a8ed565d9 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -54,6 +54,21 @@ fn generate_range_samples(pattern: &str) -> impl Iterator { .into_iter() } +fn pattern_extend_disks(node: &mut Node, disk_paths: impl Iterator) { + let old_disks = node.disks(); + let new_disks: Vec = disk_paths + .filter(|disk_path| !old_disks.iter().any(|d| d.path() == *disk_path)) + .enumerate() + .map(|(idx, disk_path)| { + DiskPath::new( + DiskName::new(&format!("disk{}", idx + old_disks.len() + 1)), + disk_path.as_str(), + ) + }) + .collect(); + node.disks_extend(new_disks); +} + pub fn pattern_extend_nodes( mut old_nodes: Vec, pattern: String, @@ -83,19 +98,7 @@ pub fn pattern_extend_nodes( p_node = old_nodes.last_mut(); } let p_node = p_node.expect("is some because it's either the pushed one or found"); - let old_disks = p_node.disks(); - let new_disks: Vec = addresses - .map(|(_, path)| path) - .filter(|disk_path| !p_node.disks().iter().any(|d| d.path() == *disk_path)) - .enumerate() - .map(|(idx, disk_path)| { - DiskPath::new( - DiskName::new(&format!("disk{}", idx + old_disks.len() + 1)), - disk_path.as_str(), - ) - }) - .collect(); - p_node.disks_extend(new_disks); + pattern_extend_disks(p_node, addresses.map(|(_, path)| path.to_owned())); }); Ok(old_nodes) } From 0505a72396e5ee424cd81b46425d53c71cd5b8da Mon Sep 17 00:00:00 2001 From: Tagir Asadullin Date: Mon, 18 Mar 2024 17:23:24 +0500 Subject: [PATCH 8/8] HashSet usage, split cycle in pattern extending (#568) --- .../bin/config_cluster_generator/pattern.rs | 86 +++++++++++-------- bob-common/src/configs/cluster.rs | 4 +- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/bob-apps/bin/config_cluster_generator/pattern.rs b/bob-apps/bin/config_cluster_generator/pattern.rs index a8ed565d9..f256dbb0a 100644 --- a/bob-apps/bin/config_cluster_generator/pattern.rs +++ b/bob-apps/bin/config_cluster_generator/pattern.rs @@ -4,16 +4,17 @@ use bob_common::core_types::{DiskName, DiskPath}; use itertools::Itertools; use regex::{Captures, Regex}; -fn parse_address_sample(sample: &str) -> AnyResult<((String, u16), String)> { +type Sample = ((String, u16), String); + +fn parse_address_sample(sample: &str) -> AnyResult { debug!("Sample to parse: {}", sample); let re = Regex::new(r"^(?[\w.]+):(?\d+)(?/[\w/]+)$").unwrap(); if let Some(captures) = re.captures(sample) { let ip = captures["addr"].to_owned(); - let port = &captures["port"]; + let port = captures["port"] + .parse::() + .map_err(|_| anyhow!("Failed to parse port {}", &captures["port"]))?; let path = captures["path"].to_owned(); - let port: u16 = port - .parse() - .map_err(|_| anyhow!("Failed to parse port {}", port))?; Ok(((ip, port), path)) } else { Err(anyhow!("Failed to parse the sample {}", sample)) @@ -55,9 +56,9 @@ fn generate_range_samples(pattern: &str) -> impl Iterator { } fn pattern_extend_disks(node: &mut Node, disk_paths: impl Iterator) { - let old_disks = node.disks(); + let old_disks: std::collections::HashSet<_> = node.disks().iter().map(|d| d.path()).collect(); let new_disks: Vec = disk_paths - .filter(|disk_path| !old_disks.iter().any(|d| d.path() == *disk_path)) + .filter(|disk_path| !old_disks.contains(disk_path.as_str())) .enumerate() .map(|(idx, disk_path)| { DiskPath::new( @@ -69,42 +70,59 @@ fn pattern_extend_disks(node: &mut Node, disk_paths: impl Iterator, + parsed_samples: &Vec, + node_pattern: String, +) { + let existing_addresses: std::collections::HashSet<_> = + nodes.iter().map(|node| node.address()).collect(); + let mut new_nodes = Vec::new(); + for (ip_port, _) in &parsed_samples.iter().group_by(|(ip_port, _)| ip_port) { + let address = format!("{}:{}", ip_port.0, ip_port.1); + if !existing_addresses.contains(address.as_str()) { + let new_node = Node::new( + substitute_node_pattern( + &node_pattern, + &ip_port.0, + ip_port.1, + nodes.len() + new_nodes.len() + 1, + ), + address.to_owned(), + vec![], + ); + new_nodes.push(new_node); + } + } + nodes.extend(new_nodes); +} + pub fn pattern_extend_nodes( - mut old_nodes: Vec, + mut nodes: Vec, pattern: String, node_pattern: String, ) -> AnyResult> { let parsed_samples = generate_range_samples(&pattern) .map(|key| parse_address_sample(&key)) .collect::>>()?; - parsed_samples - .iter() - .group_by(|(ip_port, _)| ip_port) - .into_iter() - .for_each(|(ip_port, addresses)| { - let address = format!("{}:{}", ip_port.0, ip_port.1); - let mut p_node = old_nodes.iter_mut().find(|node| node.address() == address); - if p_node.is_none() { - old_nodes.push(Node::new( - substitute_node_pattern( - &node_pattern, - &ip_port.0, - ip_port.1, - old_nodes.len() + 1, - ), - address, - vec![], - )); - p_node = old_nodes.last_mut(); - } - let p_node = p_node.expect("is some because it's either the pushed one or found"); - pattern_extend_disks(p_node, addresses.map(|(_, path)| path.to_owned())); - }); - Ok(old_nodes) + + extend_nodes_by_samples(&mut nodes, &parsed_samples, node_pattern); + for (ip_port, paths) in &parsed_samples.iter().group_by(|(ip_port, _)| ip_port) { + if let Some(node) = nodes + .iter_mut() + .find(|node| node.address() == format!("{}:{}", ip_port.0, ip_port.1)) + { + let disks_to_extend = paths.map(|(_, path)| path.to_owned()); + pattern_extend_disks(node, disks_to_extend); + } + } + + Ok(nodes) } #[cfg(test)] mod tests { + use super::Node; use super::*; #[test] @@ -150,11 +168,11 @@ mod tests { } #[test] fn test_pattern_extend_nodes() { - let old_nodes = vec![]; + let nodes = vec![]; let pattern = "test[1-3]:10000/a[1-2]".to_string(); let node_pattern = "{ip}_{port}_{id}".to_string(); - let result = pattern_extend_nodes(old_nodes, pattern, node_pattern).unwrap(); + let result = pattern_extend_nodes(nodes, pattern, node_pattern).unwrap(); assert_eq!( result, diff --git a/bob-common/src/configs/cluster.rs b/bob-common/src/configs/cluster.rs index 15b7ec57e..bf6c56781 100755 --- a/bob-common/src/configs/cluster.rs +++ b/bob-common/src/configs/cluster.rs @@ -28,7 +28,7 @@ impl Validatable for DiskPath { } /// Rack config struct, with name and [`Node`] names. -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct Rack { name: String, nodes: Vec, @@ -182,7 +182,7 @@ impl Validatable for Replica { } /// Config for virtual disks, stores replicas locations. -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize)] pub struct VDisk { id: u32, #[serde(default)]