From 428e8c1e2c8bf394ed66dd312f6fd82e7ab992e6 Mon Sep 17 00:00:00 2001 From: simonsan <14062932+simonsan@users.noreply.github.com> Date: Mon, 25 Nov 2024 14:17:25 +0100 Subject: [PATCH] feat: refactor summary handling in merge and archiver Signed-off-by: simonsan <14062932+simonsan@users.noreply.github.com> --- crates/core/src/archiver.rs | 15 +- crates/core/src/commands/check.rs | 285 ++++++++++++++++++++++-------- crates/core/src/commands/merge.rs | 10 +- crates/core/src/repository.rs | 14 +- 4 files changed, 236 insertions(+), 88 deletions(-) diff --git a/crates/core/src/archiver.rs b/crates/core/src/archiver.rs index fa2f6db6a..035be123b 100644 --- a/crates/core/src/archiver.rs +++ b/crates/core/src/archiver.rs @@ -84,11 +84,12 @@ impl<'a, BE: DecryptFullBackend, I: ReadGlobalIndex> Archiver<'a, BE, I> { mut snap: SnapshotFile, ) -> RusticResult { let indexer = Indexer::new(be.clone()).into_shared(); - let mut summary = snap.summary.take().unwrap_or_default(); - summary.backup_start = Local::now(); + let mut snapshot_summary = snap.summary.take().unwrap_or_default(); + snapshot_summary.backup_start = Local::now(); let file_archiver = FileArchiver::new(be.clone(), index, indexer.clone(), config)?; - let tree_archiver = TreeArchiver::new(be.clone(), index, indexer.clone(), config, summary)?; + let tree_archiver = + TreeArchiver::new(be.clone(), index, indexer.clone(), config, snapshot_summary)?; Ok(Self { file_archiver, @@ -213,20 +214,20 @@ impl<'a, BE: DecryptFullBackend, I: ReadGlobalIndex> Archiver<'a, BE, I> { })?; let stats = self.file_archiver.finalize()?; - let (id, mut summary) = self.tree_archiver.finalize(self.parent.tree_id())?; - stats.apply(&mut summary, BlobType::Data); + let (id, mut snapshot_summary) = self.tree_archiver.finalize(self.parent.tree_id())?; + stats.apply(&mut snapshot_summary, BlobType::Data); self.snap.tree = id; self.indexer.write().unwrap().finalize()?; - summary.finalize(self.snap.time).map_err(|err| { + snapshot_summary.finalize(self.snap.time).map_err(|err| { RusticError::with_source( ErrorKind::Internal, "Could not finalize summary, please check the logs for more information.", err, ) })?; - self.snap.summary = Some(summary); + self.snap.summary = Some(snapshot_summary); if !skip_identical_parent || Some(self.snap.tree) != self.parent.tree_id() { let id = self.be.save_file(&self.snap)?; diff --git a/crates/core/src/commands/check.rs b/crates/core/src/commands/check.rs index 5dcc82db3..7f6094299 100644 --- a/crates/core/src/commands/check.rs +++ b/crates/core/src/commands/check.rs @@ -11,7 +11,7 @@ use bytes::Bytes; use bytesize::ByteSize; use chrono::{Datelike, Local, NaiveDateTime, Timelike}; use derive_setters::Setters; -use log::{debug, error, warn}; +use log::{debug, error}; use rand::{prelude::SliceRandom, thread_rng, Rng}; use rayon::prelude::{IntoParallelIterator, ParallelIterator}; use zstd::stream::decode_all; @@ -21,7 +21,7 @@ use crate::{ blob::{tree::TreeStreamerOnce, BlobId, BlobType}, crypto::hasher::hash, error::{ - summary::{self, IssueScope, Summary}, + summary::{IssueScope, Summary}, RusticError, RusticResult, }, id::Id, @@ -241,7 +241,7 @@ pub(crate) fn check_repository( opts: CheckOptions, trees: Vec, ) -> RusticResult { - let summary = Summary::new("check"); + let summary = Arc::new(Mutex::new(Summary::new("check"))); let be = repo.dbe(); let cache = repo.cache(); let hot_be = &repo.be_hot; @@ -261,7 +261,9 @@ pub(crate) fn check_repository( // TODO: Make concurrency (20) customizable let cache_file_summary = check_cache_files(20, cache, raw_be, file_type, &p)?; - summary.merge(cache_file_summary); + if let Some(cache_file_summary) = cache_file_summary { + summary.lock().unwrap().merge(cache_file_summary); + } } } } @@ -269,12 +271,16 @@ pub(crate) fn check_repository( if let Some(hot_be) = hot_be { for file_type in [FileType::Snapshot, FileType::Index] { let hot_file_summary = check_hot_files(raw_be, hot_be, file_type, pb)?; - - summary.merge(hot_file_summary); + { + summary.lock().unwrap().merge(hot_file_summary); + } } } - let index_collector = check_packs(be, hot_be, pb)?; + let (index_collector, check_packs_summary) = check_packs(be, hot_be, pb)?; + { + summary.lock().unwrap().merge(check_packs_summary); + } if let Some(cache) = &cache { let p = pb.progress_spinner("cleaning up packs from cache..."); @@ -284,23 +290,33 @@ pub(crate) fn check_repository( .map(|(id, size)| (**id, *size)) .collect(); if let Err(err) = cache.remove_not_in_list(FileType::Pack, &ids) { - warn!( - "Error in cache backend removing pack files: {}", - err.display_log() + summary.lock().unwrap().add_warning( + IssueScope::Internal, + "Error in cache backend removing pack files", + Some(err.display_log().into()), ); } + p.finish(); if !opts.trust_cache { let p = pb.progress_bytes("checking packs in cache..."); + // TODO: Make concurrency (5) customizable - check_cache_files(5, cache, raw_be, FileType::Pack, &p)?; + let cache_file_summary = check_cache_files(5, cache, raw_be, FileType::Pack, &p)?; + + if let Some(cache_file_summary) = cache_file_summary { + summary.lock().unwrap().merge(cache_file_summary); + } } } let index_be = GlobalIndex::new_from_index(index_collector.into_index()); - let packs = check_trees(be, &index_be, trees, pb)?; + let (packs, check_trees_summary) = check_trees(be, &index_be, trees, pb)?; + { + summary.lock().unwrap().merge(check_trees_summary); + } if opts.read_data { let packs = index_be @@ -322,19 +338,42 @@ pub(crate) fn check_repository( let data = match be.read_full(FileType::Pack, &id) { Ok(data) => data, Err(err) => { - error!("Error reading data for pack {id} : {}", err.display_log()); + summary.lock().unwrap().add_error( + IssueScope::Internal, + format!("Error reading pack {id} from backend",), + Some(err.display_log().into()), + ); return; } }; match check_pack(be, pack, data, &p) { Ok(()) => {} - Err(err) => error!("Pack {id} is not valid: {}", err.display_log()), + Err(err) => { + summary.lock().unwrap().add_error( + IssueScope::Internal, + format!("Pack {id} is not valid",), + Some(err.display_log().into()), + ); + } } }); p.finish(); } - Ok(()) + let mut summary = Arc::try_unwrap(summary) + .map_err(|_err| RusticError::new(ErrorKind::Internal, "Error unwrapping Mutex from Arc."))? + .into_inner() + .map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Mutex poisoned while getting summary for check command.", + err, + ) + })?; + + summary.complete(); + + Ok(summary) } /// Checks if all files in the backend are also in the hot backend @@ -385,11 +424,18 @@ fn check_hot_files( } for (id, _) in files { - error!("hot file Type: {file_type:?}, Id: {id} is missing!",); + summary.add_error( + IssueScope::Internal, + format!("hot file Type: {file_type:?}, Id: {id} is missing in hot repo",), + None, + ); } + p.finish(); - Ok(()) + summary.complete(); + + Ok(summary) } /// Checks if all files in the cache are also in the backend @@ -411,12 +457,12 @@ fn check_cache_files( be: &impl ReadBackend, file_type: FileType, p: &impl Progress, -) -> RusticResult { +) -> RusticResult> { let summary = Arc::new(Mutex::new(Summary::new("cache files").enable_log())); let files = cache.list_with_size(file_type)?; if files.is_empty() { - return Ok(()); + return Ok(None); } let total_size = files.values().map(|size| u64::from(*size)).sum(); @@ -425,30 +471,28 @@ fn check_cache_files( files .into_par_iter() .for_each_with((cache, be, p.clone()), |(cache, be, p), (id, size)| { + let summary = Arc::clone(&summary); // Read file from cache and from backend and compare match ( cache.read_full(file_type, &id), be.read_full(file_type, &id), ) { (Err(err), _) => { - let mut summary = summary.lock().unwrap(); - summary.add_error( + summary.lock().unwrap().add_error( IssueScope::Internal, format!("Error reading cached file Type: {file_type:?}, Id: {id}",), - Some(err.display_log()), + Some(err.display_log().into()), ); } (_, Err(err)) => { - let mut summary = summary.lock().unwrap(); - summary.add_error( + summary.lock().unwrap().add_error( IssueScope::Internal, format!("Error reading file Type: {file_type:?}, Id: {id}",), - Some(err.display_log()), + Some(err.display_log().into()), ); } (Ok(Some(data_cached)), Ok(data)) if data_cached != data => { - let mut summary = summary.lock().unwrap(); - summary.add_error( + summary.lock().unwrap().add_error( IssueScope::Internal, format!("Cached file Type: {file_type:?}, Id: {id} is not identical to backend!",), None @@ -462,9 +506,20 @@ fn check_cache_files( p.finish(); - summary.lock().unwrap().complete(); + let mut summary = Arc::try_unwrap(summary) + .map_err(|_err| RusticError::new(ErrorKind::Internal, "Error unwrapping Mutex from Arc."))? + .into_inner() + .map_err(|err| { + RusticError::with_source( + ErrorKind::Internal, + "Mutex poisoned while getting summary for check command.", + err, + ) + })?; + + summary.complete(); - Ok(summary.into_inner().unwrap()) + Ok(Some(summary)) } /// Check if packs correspond to index and are present in the backend @@ -487,7 +542,8 @@ fn check_packs( be: &impl DecryptReadBackend, hot_be: &Option, pb: &impl ProgressBars, -) -> RusticResult { +) -> RusticResult<(IndexCollector, Summary)> { + let mut summary = Summary::new("packs").enable_log(); let mut packs = HashMap::new(); let mut tree_packs = HashMap::new(); let mut index_collector = IndexCollector::new(IndexType::Full); @@ -507,7 +563,11 @@ fn check_packs( // Check if time is set _ if check_time && p.time.is_none() { - error!("pack {}: No time is set! Run prune to correct this!", p.id); + summary.add_error( + IssueScope::Internal, + format!("pack {}: No time is set! Run prune to correct this!", p.id), + None, + ); } // check offsests in index @@ -516,16 +576,24 @@ fn check_packs( blobs.sort_unstable(); for blob in blobs { if blob.tpe != blob_type { - error!( - "pack {}: blob {} blob type does not match: type: {:?}, expected: {:?}", - p.id, blob.id, blob.tpe, blob_type + summary.add_error( + IssueScope::Internal, + format!( + "pack {}: blob {} blob type does not match: type: {:?}, expected: {blob_type:?}", + p.id, blob.id, blob.tpe + ), + None, ); } if blob.offset != expected_offset { - error!( - "pack {}: blob {} offset in index: {}, expected: {}", - p.id, blob.id, blob.offset, expected_offset + summary.add_error( + IssueScope::Internal, + format!( + "pack {}: blob {} offset in index: {}, expected: {expected_offset}", + p.id, blob.id, blob.offset + ), + None, ); } expected_offset += blob.length; @@ -537,15 +605,19 @@ fn check_packs( if let Some(hot_be) = hot_be { let p = pb.progress_spinner("listing packs in hot repo..."); - check_packs_list_hot(hot_be, tree_packs, &packs)?; + let packs_list_hot_summary = check_packs_list_hot(hot_be, tree_packs, &packs)?; + summary.merge(packs_list_hot_summary); p.finish(); } let p = pb.progress_spinner("listing packs..."); - check_packs_list(be, packs)?; + let check_packs_list_summary = check_packs_list(be, packs)?; + summary.merge(check_packs_list_summary); p.finish(); - Ok(index_collector) + summary.complete(); + + Ok((index_collector, summary)) } // TODO: Add documentation @@ -559,21 +631,42 @@ fn check_packs( /// # Errors /// /// * If a pack is missing or has a different size -fn check_packs_list(be: &impl ReadBackend, mut packs: HashMap) -> RusticResult<()> { +fn check_packs_list( + be: &impl ReadBackend, + mut packs: HashMap, +) -> RusticResult { + let mut summary = Summary::new("packs").enable_log(); for (id, size) in be.list_with_size(FileType::Pack)? { match packs.remove(&PackId::from(id)) { - None => warn!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."), + None => { + summary.add_warning( + IssueScope::Internal, + format!("pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."), + None, + ); + } Some(index_size) if index_size != size => { - error!("pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'."); + summary.add_error( + IssueScope::Internal, + format!("pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'."), + None, + ); } _ => {} //everything ok } } for (id, _) in packs { - error!("pack {id} is referenced by the index but not present! To repair: 'rustic repair index'.",); + summary.add_error( + IssueScope::Internal, + format!("pack {id} is referenced by the index but not present! To repair: 'rustic repair index'.",), + None, + ); } - Ok(()) + + summary.complete(); + + Ok(summary) } /// Checks if all packs in the backend are also in the index @@ -590,27 +683,48 @@ fn check_packs_list_hot( be: &impl ReadBackend, mut treepacks: HashMap, packs: &HashMap, -) -> RusticResult<()> { +) -> RusticResult { + let mut summary = Summary::new("hot packs").enable_log(); + for (id, size) in be.list_with_size(FileType::Pack)? { match treepacks.remove(&PackId::from(id)) { None => { if packs.contains_key(&PackId::from(id)) { - warn!("hot pack {id} is a data pack. This should not happen."); + summary.add_warning( + IssueScope::Internal, + format!("hot pack {id} is a data pack. This should not happen."), + None, + ); } else { - warn!("hot pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."); + summary.add_warning( + IssueScope::Internal, + format!("hot pack {id} not referenced in index. Can be a parallel backup job. To repair: 'rustic repair index'."), + None, + ); } } Some(index_size) if index_size != size => { - error!("hot pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'."); + summary.add_error( + IssueScope::Internal, + format!("hot pack {id}: size computed by index: {index_size}, actual size: {size}. To repair: 'rustic repair index'."), + None, + ); } _ => {} //everything ok } } for (id, _) in treepacks { - error!("tree pack {id} is referenced by the index but not present in hot repo! To repair: 'rustic repair index'.",); + summary.add_error( + IssueScope::Internal, + format!("hot pack {id} is referenced by the index but not present in hot repo! To repair: 'rustic repair index'.",), + None, + ); } - Ok(()) + + summary.complete(); + + Ok(summary) } /// Check if all snapshots and contained trees can be loaded and contents exist in the index @@ -628,7 +742,8 @@ fn check_trees( index: &impl ReadGlobalIndex, snap_trees: Vec, pb: &impl ProgressBars, -) -> RusticResult> { +) -> RusticResult<(BTreeSet, Summary)> { + let mut summary = Box::new(Summary::new("trees").enable_log()); let mut packs = BTreeSet::new(); let p = pb.progress_counter("checking trees..."); let mut tree_streamer = TreeStreamerOnce::new(be, index, snap_trees, p)?; @@ -636,22 +751,30 @@ fn check_trees( let (path, tree) = item; for node in tree.nodes { match node.node_type { - NodeType::File => node.content.as_ref().map_or_else( - || { - error!("file {:?} doesn't have a content", path.join(node.name())); - }, - |content| { + NodeType::File => { + if let Some(content) = node.content.as_ref() { for (i, id) in content.iter().enumerate() { if id.is_null() { - error!("file {:?} blob {} has null ID", path.join(node.name()), i); + summary.add_error( + IssueScope::Internal, + format!( + "file {:?} blob {} has null ID", + path.join(node.name()), + i + ), + None, + ); } match index.get_data(id) { None => { - error!( - "file {:?} blob {} is missing in index", - path.join(node.name()), - id + summary.add_error( + IssueScope::Internal, + format!( + "file {:?} blob {id} is missing in index", + path.join(node.name()) + ), + None, ); } Some(entry) => { @@ -659,23 +782,41 @@ fn check_trees( } } } - }, - ), + } else { + summary.add_error( + IssueScope::Internal, + format!("file {:?} doesn't have a content", path.join(node.name())), + None, + ); + } + } NodeType::Dir => { match node.subtree { None => { - error!("dir {:?} subtree does not exist", path.join(node.name())); + summary.add_error( + IssueScope::Internal, + format!("dir {:?} subtree does not exist", path.join(node.name())), + None, + ); } Some(tree) if tree.is_null() => { - error!("dir {:?} subtree has null ID", path.join(node.name())); + summary.add_error( + IssueScope::Internal, + format!("dir {:?} subtree has null ID", path.join(node.name())), + None, + ); } Some(id) => match index.get_tree(&id) { None => { - error!( - "dir {:?} subtree blob {} is missing in index", - path.join(node.name()), - id + summary.add_error( + IssueScope::Internal, + format!( + "dir {:?} subtree tree {} is missing in index", + path.join(node.name()), + id + ), + None, ); } Some(entry) => { @@ -690,7 +831,9 @@ fn check_trees( } } - Ok(packs) + summary.complete(); + + Ok((packs, *summary)) } /// Check if a pack is valid diff --git a/crates/core/src/commands/merge.rs b/crates/core/src/commands/merge.rs index 10d15dd19..4cf78610a 100644 --- a/crates/core/src/commands/merge.rs +++ b/crates/core/src/commands/merge.rs @@ -59,16 +59,16 @@ pub(crate) fn merge_snapshots( .max_by_key(|sn| sn.time) .map_or(now, |sn| sn.time); - let mut summary = snap.summary.take().unwrap_or_default(); - summary.backup_start = Local::now(); + let mut snapshot_summary = snap.summary.take().unwrap_or_default(); + snapshot_summary.backup_start = Local::now(); let trees: Vec = snapshots.iter().map(|sn| sn.tree).collect(); - snap.tree = merge_trees(repo, &trees, cmp, &mut summary)?; + snap.tree = merge_trees(repo, &trees, cmp, &mut snapshot_summary)?; - summary.finalize(now).map_err(|err| { + snapshot_summary.finalize(now).map_err(|err| { RusticError::with_source(ErrorKind::Internal, "Failed to finalize summary.", err) })?; - snap.summary = Some(summary); + snap.summary = Some(snapshot_summary); snap.id = repo.dbe().save_file(&snap)?.into(); Ok(snap) diff --git a/crates/core/src/repository.rs b/crates/core/src/repository.rs index d15d8f21d..2627198fc 100644 --- a/crates/core/src/repository.rs +++ b/crates/core/src/repository.rs @@ -46,7 +46,7 @@ use crate::{ restore::{collect_and_prepare, restore_repository, RestoreOptions, RestorePlan}, }, crypto::aespoly1305::Key, - error::{ErrorKind, RusticResult}, + error::{summary::Summary, ErrorKind, RusticResult}, index::{ binarysorted::{IndexCollector, IndexType}, GlobalIndex, IndexEntry, ReadGlobalIndex, ReadIndex, @@ -1146,16 +1146,16 @@ impl Repository { /// # Panics /// // TODO: Document panics - pub fn check(&self, opts: CheckOptions) -> RusticResult<()> { + pub fn check(&self, opts: CheckOptions) -> RusticResult { let trees = self .get_all_snapshots()? .into_iter() .map(|snap| snap.tree) .collect(); - check_repository(self, opts, trees)?; + let summary = check_repository(self, opts, trees)?; - Ok(()) + Ok(summary) } /// Check the repository and given trees for errors or inconsistencies @@ -1170,7 +1170,11 @@ impl Repository { /// # Panics /// // TODO: Document panics - pub fn check_with_trees(&self, opts: CheckOptions, trees: Vec) -> RusticResult<()> { + pub fn check_with_trees( + &self, + opts: CheckOptions, + trees: Vec, + ) -> RusticResult { check_repository(self, opts, trees) }