From 3bc0b0481825cbe628e1fb7c86cb0e054b42fbea Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 Nov 2024 01:21:00 +0100 Subject: [PATCH 1/3] Item: remove .chunks_healthy, fixes #8559 Well, it's not totally removed, some code in Item, Archive and borg transfer --from-borg1 needs to stay in place, so that we can pick the CORRECT chunks list that is in .chunks_healthy for all-zero-replacement-chunk-patched items when transferring archives from borg1 to borg2 repos. transfer: do not transfer replacement chunks, deal with missing chunks in other_repo FUSE fs read: IOError or all-zero result --- docs/internals/frontends.rst | 4 +- src/borg/archive.py | 103 +++--------------- src/borg/archiver/check_cmd.py | 23 +--- src/borg/archiver/compact_cmd.py | 33 ++---- src/borg/archiver/mount_cmds.py | 6 +- src/borg/archiver/recreate_cmd.py | 14 +-- src/borg/archiver/transfer_cmd.py | 97 ++++++++++------- src/borg/fuse.py | 25 ++--- src/borg/helpers/fs.py | 6 +- src/borg/helpers/parseformat.py | 6 - src/borg/testsuite/archiver/check_cmd_test.py | 63 +++-------- .../testsuite/archiver/mount_cmds_test.py | 14 ++- .../testsuite/archiver/transfer_cmd_test.py | 4 +- src/borg/upgrade.py | 9 +- 14 files changed, 134 insertions(+), 273 deletions(-) diff --git a/docs/internals/frontends.rst b/docs/internals/frontends.rst index 8b9b7faa3d..c29aa72cd3 100644 --- a/docs/internals/frontends.rst +++ b/docs/internals/frontends.rst @@ -480,8 +480,8 @@ Refer to the *borg list* documentation for the available keys and their meaning. Example (excerpt) of ``borg list --json-lines``:: - {"type": "d", "mode": "drwxr-xr-x", "user": "user", "group": "user", "uid": 1000, "gid": 1000, "path": "linux", "healthy": true, "target": "", "flags": null, "mtime": "2017-02-27T12:27:20.023407", "size": 0} - {"type": "d", "mode": "drwxr-xr-x", "user": "user", "group": "user", "uid": 1000, "gid": 1000, "path": "linux/baz", "healthy": true, "target": "", "flags": null, "mtime": "2017-02-27T12:27:20.585407", "size": 0} + {"type": "d", "mode": "drwxr-xr-x", "user": "user", "group": "user", "uid": 1000, "gid": 1000, "path": "linux", "target": "", "flags": null, "mtime": "2017-02-27T12:27:20.023407", "size": 0} + {"type": "d", "mode": "drwxr-xr-x", "user": "user", "group": "user", "uid": 1000, "gid": 1000, "path": "linux/baz", "target": "", "flags": null, "mtime": "2017-02-27T12:27:20.585407", "size": 0} Archive Differencing diff --git a/src/borg/archive.py b/src/borg/archive.py index 01d1617d17..384f68398b 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -280,7 +280,7 @@ def unpack_many(self, ids, *, filter=None): if filter is None or filter(item): if "chunks" in item: item.chunks = [ChunkListEntry(*e) for e in item.chunks] - if "chunks_healthy" in item: + if "chunks_healthy" in item: # legacy item.chunks_healthy = [ChunkListEntry(*e) for e in item.chunks_healthy] yield item @@ -762,7 +762,6 @@ def same_item(item, st): # if a previous extraction was interrupted between setting the mtime and setting non-default flags. return True - has_damaged_chunks = "chunks_healthy" in item if dry_run or stdout: with self.extract_helper(item, "", hlm, dry_run=dry_run or stdout) as hardlink_set: if not hardlink_set: @@ -789,8 +788,6 @@ def same_item(item, st): item_size, item_chunks_size ) ) - if has_damaged_chunks: - raise BackupError("File has damaged (all-zero) chunks. 
Try running borg check --repair.") return dest = self.cwd @@ -845,8 +842,6 @@ def make_parent(path): raise BackupError( f"Size inconsistency detected: size {item_size}, chunks size {item_chunks_size}" ) - if has_damaged_chunks: - raise BackupError("File has damaged (all-zero) chunks. Try running borg check --repair.") return with backup_io: # No repository access beyond this point. @@ -1159,10 +1154,6 @@ def chunk_processor(chunk): return chunk_entry item.chunks = [] - # if we rechunkify, we'll get a fundamentally different chunks list, thus we need - # to get rid of .chunks_healthy, as it might not correspond to .chunks any more. - if self.rechunkify and "chunks_healthy" in item: - del item.chunks_healthy for chunk in chunk_iter: chunk_entry = chunk_processor(chunk) item.chunks.append(chunk_entry) @@ -1779,13 +1770,10 @@ def verify_data(self): if defect_chunks: if self.repair: # if we kill the defect chunk here, subsequent actions within this "borg check" - # run will find missing chunks and replace them with all-zero replacement - # chunks and flag the files as "repaired". - # if another backup is done later and the missing chunks get backed up again, - # a "borg check" afterwards can heal all files where this chunk was missing. + # run will find missing chunks. logger.warning( - "Found defect chunks. They will be deleted now, so affected files can " - "get repaired now and maybe healed later." + "Found defect chunks and will delete them now. " + "Reading files referencing these chunks will result in an I/O error." ) for defect_chunk in defect_chunks: # remote repo (ssh): retry might help for strange network / NIC / RAM errors @@ -1805,10 +1793,7 @@ def verify_data(self): else: logger.warning("chunk %s not deleted, did not consistently fail.", bin_to_hex(defect_chunk)) else: - logger.warning( - "Found defect chunks. With --repair, they would get deleted, so affected " - "files could get repaired then and maybe healed later." - ) + logger.warning("Found defect chunks. With --repair, they would get deleted.") for defect_chunk in defect_chunks: logger.debug("chunk %s is defect.", bin_to_hex(defect_chunk)) log = logger.error if errors else logger.info @@ -1919,80 +1904,18 @@ def add_reference(id_, size, cdata): self.repository.put(id_, cdata) def verify_file_chunks(archive_name, item): - """Verifies that all file chunks are present. - - Missing file chunks will be replaced with new chunks of the same length containing all zeros. - If a previously missing file chunk re-appears, the replacement chunk is replaced by the correct one. - """ - - def replacement_chunk(size): - chunk = Chunk(None, allocation=CH_ALLOC, size=size) - chunk_id, data = cached_hash(chunk, self.key.id_hash) - cdata = self.repo_objs.format(chunk_id, {}, data, ro_type=ROBJ_FILE_STREAM) - return chunk_id, size, cdata - + """Verifies that all file chunks are present. Missing file chunks will be logged.""" offset = 0 - chunk_list = [] - chunks_replaced = False - has_chunks_healthy = "chunks_healthy" in item - chunks_current = item.chunks - chunks_healthy = item.chunks_healthy if has_chunks_healthy else chunks_current - if has_chunks_healthy and len(chunks_current) != len(chunks_healthy): - # should never happen, but there was issue #3218. 
- logger.warning(f"{archive_name}: {item.path}: Invalid chunks_healthy metadata removed!") - del item.chunks_healthy - has_chunks_healthy = False - chunks_healthy = chunks_current - for chunk_current, chunk_healthy in zip(chunks_current, chunks_healthy): - chunk_id, size = chunk_healthy + for chunk in item.chunks: + chunk_id, size = chunk if chunk_id not in self.chunks: - # a chunk of the healthy list is missing - if chunk_current == chunk_healthy: - logger.error( - "{}: {}: New missing file chunk detected (Byte {}-{}, Chunk {}). " - "Replacing with all-zero chunk.".format( - archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id) - ) + logger.error( + "{}: {}: Missing file chunk detected (Byte {}-{}, Chunk {}).".format( + archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id) ) - self.error_found = chunks_replaced = True - chunk_id, size, cdata = replacement_chunk(size) - add_reference(chunk_id, size, cdata) - else: - logger.info( - "{}: {}: Previously missing file chunk is still missing (Byte {}-{}, Chunk {}). " - "It has an all-zero replacement chunk already.".format( - archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id) - ) - ) - chunk_id, size = chunk_current - if chunk_id not in self.chunks: - logger.warning( - "{}: {}: Missing all-zero replacement chunk detected (Byte {}-{}, Chunk {}). " - "Generating new replacement chunk.".format( - archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id) - ) - ) - self.error_found = chunks_replaced = True - chunk_id, size, cdata = replacement_chunk(size) - add_reference(chunk_id, size, cdata) - else: - if chunk_current == chunk_healthy: - pass # normal case, all fine. - else: - logger.info( - "{}: {}: Healed previously missing file chunk! (Byte {}-{}, Chunk {}).".format( - archive_name, item.path, offset, offset + size, bin_to_hex(chunk_id) - ) - ) - chunk_list.append([chunk_id, size]) # list-typed element as chunks_healthy is list-of-lists + ) + self.error_found = True offset += size - if chunks_replaced and not has_chunks_healthy: - # if this is first repair, remember the correct chunk IDs, so we can maybe heal the file later - item.chunks_healthy = item.chunks - if has_chunks_healthy and chunk_list == chunks_healthy: - logger.info(f"{archive_name}: {item.path}: Completely healed previously damaged file!") - del item.chunks_healthy - item.chunks = chunk_list if "size" in item: item_size = item.size item_chunks_size = item.get_size(from_chunks=True) diff --git a/src/borg/archiver/check_cmd.py b/src/borg/archiver/check_cmd.py index 0bfda9b78b..a9bd9f05b3 100644 --- a/src/borg/archiver/check_cmd.py +++ b/src/borg/archiver/check_cmd.py @@ -168,28 +168,7 @@ def build_parser_check(self, subparsers, common_parser, mid_common_parser): 2. When checking the consistency and correctness of archives, repair mode might remove whole archives from the manifest if their archive metadata chunk is - corrupt or lost. On a chunk level (i.e. the contents of files), repair mode - will replace corrupt or lost chunks with a same-size replacement chunk of - zeroes. If a previously zeroed chunk reappears, repair mode will restore - this lost chunk using the new chunk. - - Most steps taken by repair mode have a one-time effect on the repository, like - removing a lost archive from the repository. 
However, replacing a corrupt or - lost chunk with an all-zero replacement will have an ongoing effect on the - repository: When attempting to extract a file referencing an all-zero chunk, - the ``extract`` command will distinctly warn about it. The FUSE filesystem - created by the ``mount`` command will reject reading such a "zero-patched" - file unless a special mount option is given. - - As mentioned earlier, Borg might be able to "heal" a "zero-patched" file in - repair mode, if all its previously lost chunks reappear (e.g. via a later - backup). This is achieved by Borg not only keeping track of the all-zero - replacement chunks, but also by keeping metadata about the lost chunks. In - repair mode Borg will check whether a previously lost chunk reappeared and will - replace the all-zero replacement chunk by the reappeared chunk. If all lost - chunks of a "zero-patched" file reappear, this effectively "heals" the file. - Consequently, if lost chunks were repaired earlier, it is advised to run - ``--repair`` a second time after creating some new backups. + corrupt or lost. Borg will also report files that reference missing chunks. If ``--repair --find-lost-archives`` is given, previously lost entries will be recreated in the archive directory. This is only possible before diff --git a/src/borg/archiver/compact_cmd.py b/src/borg/archiver/compact_cmd.py index 133630820f..1dbbfa355a 100644 --- a/src/borg/archiver/compact_cmd.py +++ b/src/borg/archiver/compact_cmd.py @@ -6,7 +6,7 @@ from ..cache import write_chunkindex_to_repo_cache, build_chunkindex_from_repo from ..constants import * # NOQA from ..hashindex import ChunkIndex, ChunkIndexEntry -from ..helpers import set_ec, EXIT_WARNING, EXIT_ERROR, format_file_size, bin_to_hex +from ..helpers import set_ec, EXIT_ERROR, format_file_size, bin_to_hex from ..helpers import ProgressIndicatorPercent from ..manifest import Manifest from ..remote import RemoteRepository @@ -39,9 +39,7 @@ def garbage_collect(self): logger.info("Starting compaction / garbage collection...") self.chunks = self.get_repository_chunks() logger.info("Computing object IDs used by archives...") - (self.missing_chunks, self.reappeared_chunks, self.total_files, self.total_size, self.archives_count) = ( - self.analyze_archives() - ) + (self.missing_chunks, self.total_files, self.total_size, self.archives_count) = self.analyze_archives() self.report_and_delete() self.save_chunk_index() logger.info("Finished compaction / garbage collection...") @@ -73,28 +71,24 @@ def save_chunk_index(self): self.chunks.clear() # we already have updated the repo cache in get_repository_chunks self.chunks = None # nothing there (cleared!) - def analyze_archives(self) -> Tuple[Set, Set, int, int, int]: - """Iterate over all items in all archives, create the dicts id -> size of all used/wanted chunks.""" + def analyze_archives(self) -> Tuple[Set, int, int, int]: + """Iterate over all items in all archives, create the dicts id -> size of all used chunks.""" - def use_it(id, *, wanted=False): + def use_it(id): entry = self.chunks.get(id) if entry is not None: # the chunk is in the repo, mark it used. self.chunks[id] = entry._replace(flags=entry.flags | ChunkIndex.F_USED) - if wanted: - # chunk id is from chunks_healthy list: a lost chunk has re-appeared! - reappeared_chunks.add(id) else: # with --stats: we do NOT have this chunk in the repository! # without --stats: we do not have this chunk or the chunks index is incomplete. 
missing_chunks.add(id) missing_chunks: set[bytes] = set() - reappeared_chunks: set[bytes] = set() archive_infos = self.manifest.archives.list(sort_by=["ts"]) num_archives = len(archive_infos) pi = ProgressIndicatorPercent( - total=num_archives, msg="Computing used/wanted chunks %3.1f%%", step=0.1, msgid="compact.analyze_archives" + total=num_archives, msg="Computing used chunks %3.1f%%", step=0.1, msgid="compact.analyze_archives" ) total_size, total_files = 0, 0 for i, info in enumerate(archive_infos): @@ -114,25 +108,14 @@ def use_it(id, *, wanted=False): for id, size in item.chunks: total_size += size # original, uncompressed file content size use_it(id) - if "chunks_healthy" in item: - # we also consider the chunks_healthy chunks as referenced - do not throw away - # anything that borg check --repair might still need. - for id, size in item.chunks_healthy: - use_it(id, wanted=True) pi.finish() - return missing_chunks, reappeared_chunks, total_files, total_size, num_archives + return missing_chunks, total_files, total_size, num_archives def report_and_delete(self): - run_repair = " Run borg check --repair!" - if self.missing_chunks: - logger.error(f"Repository has {len(self.missing_chunks)} missing objects." + run_repair) + logger.error(f"Repository has {len(self.missing_chunks)} missing objects!") set_ec(EXIT_ERROR) - if self.reappeared_chunks: - logger.warning(f"{len(self.reappeared_chunks)} previously missing objects re-appeared!" + run_repair) - set_ec(EXIT_WARNING) - logger.info("Cleaning archives directory from soft-deleted archives...") archive_infos = self.manifest.archives.list(sort_by=["ts"], deleted=True) for archive_info in archive_infos: diff --git a/src/borg/archiver/mount_cmds.py b/src/borg/archiver/mount_cmds.py index 7535198cc0..7dcb2eda43 100644 --- a/src/borg/archiver/mount_cmds.py +++ b/src/borg/archiver/mount_cmds.py @@ -104,9 +104,9 @@ def build_parser_mount_umount(self, subparsers, common_parser, mid_common_parser - ``versions``: when used with a repository mount, this gives a merged, versioned view of the files in the archives. EXPERIMENTAL, layout may change in future. - - ``allow_damaged_files``: by default damaged files (where missing chunks were - replaced with runs of zeros by ``borg check --repair``) are not readable and - return EIO (I/O error). Set this option to read such files. + - ``allow_damaged_files``: by default damaged files (where chunks are missing) + will return EIO (I/O error) when trying to read the related parts of the file. + Set this option to replace the missing parts with all-zero bytes. - ``ignore_permissions``: for security reasons the ``default_permissions`` mount option is internally enforced by borg. ``ignore_permissions`` can be given to not enforce ``default_permissions``. diff --git a/src/borg/archiver/recreate_cmd.py b/src/borg/archiver/recreate_cmd.py index 26efe46fcd..c528d9018c 100644 --- a/src/borg/archiver/recreate_cmd.py +++ b/src/borg/archiver/recreate_cmd.py @@ -95,16 +95,10 @@ def build_parser_recreate(self, subparsers, common_parser, mid_common_parser): at least the entire deduplicated size of the archives using the previous chunker params. - If you recently ran borg check --repair and it had to fix lost chunks with all-zero - replacement chunks, please first run another backup for the same data and re-run - borg check --repair afterwards to heal any archives that had lost chunks which are - still generated from the input data. 
- - Important: running borg recreate to re-chunk will remove the chunks_healthy - metadata of all items with replacement chunks, so healing will not be possible - any more after re-chunking (it is also unlikely it would ever work: due to the - change of chunking parameters, the missing chunk likely will never be seen again - even if you still have the data that produced it). + If your most recent borg check found missing chunks, please first run another + backup for the same data, before doing any rechunking. If you are lucky, that + will re-create the missing chunks. Optionally, do another borg check, to see + if the chunks are still missing). """ ) subparser = subparsers.add_parser( diff --git a/src/borg/archiver/transfer_cmd.py b/src/borg/archiver/transfer_cmd.py index 73a9ad79a2..8b748da7ed 100644 --- a/src/borg/archiver/transfer_cmd.py +++ b/src/borg/archiver/transfer_cmd.py @@ -9,6 +9,8 @@ from ..helpers import location_validator, Location, archivename_validator, comment_validator from ..helpers import format_file_size, bin_to_hex from ..manifest import Manifest +from ..legacyrepository import LegacyRepository +from ..repository import Repository from ..logger import create_logger @@ -111,51 +113,64 @@ def do_transfer(self, args, *, repository, manifest, cache, other_repository=Non # so let's remove them from old archives also, considering there is no # code any more that deals with them in special ways (e.g. to get stats right). continue - if "chunks" in item: + if "chunks_healthy" in item: # legacy + other_chunks = item.chunks_healthy # chunks_healthy has the CORRECT chunks list, if present. + elif "chunks" in item: + other_chunks = item.chunks + else: + other_chunks = None + if other_chunks is not None: chunks = [] - for chunk_id, size in item.chunks: + for chunk_id, size in other_chunks: chunk_present = cache.seen_chunk(chunk_id, size) if not chunk_present: # target repo does not yet have this chunk if not dry_run: - cdata = other_repository.get(chunk_id) - if args.recompress == "never": - # keep compressed payload same, verify via assert_id (that will - # decompress, but avoid needing to compress it again): - meta, data = other_manifest.repo_objs.parse( - chunk_id, - cdata, - decompress=True, - want_compressed=True, - ro_type=ROBJ_FILE_STREAM, - ) - meta, data = upgrader.upgrade_compressed_chunk(meta, data) - chunk_entry = cache.add_chunk( - chunk_id, - meta, - data, - stats=archive.stats, - wait=False, - compress=False, - size=size, - ctype=meta["ctype"], - clevel=meta["clevel"], - ro_type=ROBJ_FILE_STREAM, - ) - elif args.recompress == "always": - # always decompress and re-compress file data chunks - meta, data = other_manifest.repo_objs.parse( - chunk_id, cdata, ro_type=ROBJ_FILE_STREAM - ) - chunk_entry = cache.add_chunk( - chunk_id, - meta, - data, - stats=archive.stats, - wait=False, - ro_type=ROBJ_FILE_STREAM, - ) + try: + cdata = other_repository.get(chunk_id) + except (Repository.ObjectNotFound, LegacyRepository.ObjectNotFound): + # missing correct chunk in other_repository (source) will result in + # a missing chunk in repository (destination). + # we do NOT want to transfer all-zero replacement chunks from borg1 repos. 
+ pass else: - raise ValueError(f"unsupported recompress mode: {args.recompress}") + if args.recompress == "never": + # keep compressed payload same, verify via assert_id (that will + # decompress, but avoid needing to compress it again): + meta, data = other_manifest.repo_objs.parse( + chunk_id, + cdata, + decompress=True, + want_compressed=True, + ro_type=ROBJ_FILE_STREAM, + ) + meta, data = upgrader.upgrade_compressed_chunk(meta, data) + chunk_entry = cache.add_chunk( + chunk_id, + meta, + data, + stats=archive.stats, + wait=False, + compress=False, + size=size, + ctype=meta["ctype"], + clevel=meta["clevel"], + ro_type=ROBJ_FILE_STREAM, + ) + elif args.recompress == "always": + # always decompress and re-compress file data chunks + meta, data = other_manifest.repo_objs.parse( + chunk_id, cdata, ro_type=ROBJ_FILE_STREAM + ) + chunk_entry = cache.add_chunk( + chunk_id, + meta, + data, + stats=archive.stats, + wait=False, + ro_type=ROBJ_FILE_STREAM, + ) + else: + raise ValueError(f"unsupported recompress mode: {args.recompress}") cache.repository.async_response(wait=False) chunks.append(chunk_entry) transfer_size += size @@ -165,7 +180,7 @@ def do_transfer(self, args, *, repository, manifest, cache, other_repository=Non chunks.append(chunk_entry) present_size += size if not dry_run: - item.chunks = chunks # TODO: overwrite? IDs and sizes are same. + item.chunks = chunks archive.stats.nfiles += 1 if not dry_run: item = upgrader.upgrade_item(item=item) diff --git a/src/borg/fuse.py b/src/borg/fuse.py index a15d4714e6..d518b028d1 100644 --- a/src/borg/fuse.py +++ b/src/borg/fuse.py @@ -10,7 +10,7 @@ from collections import defaultdict, Counter from signal import SIGINT -from .constants import ROBJ_FILE_STREAM +from .constants import ROBJ_FILE_STREAM, zeros from .fuse_impl import llfuse, has_pyfuse3 @@ -46,6 +46,7 @@ def async_wrapper(fn): from .item import Item from .platform import uid2user, gid2group from .platformflags import is_darwin +from .repository import Repository from .remote import RemoteRepository @@ -652,17 +653,6 @@ def lookup(self, parent_inode, name, ctx=None): @async_wrapper def open(self, inode, flags, ctx=None): - if not self.allow_damaged_files: - item = self.get_item(inode) - if "chunks_healthy" in item: - # Processed archive items don't carry the path anymore; for converting the inode - # to the path we'd either have to store the inverse of the current structure, - # or search the entire archive. So we just don't print it. It's easy to correlate anyway. - logger.warning( - "File has damaged (all-zero) chunks. Try running borg check --repair. " - "Mount with allow_damaged_files to read damaged files." 
- ) - raise llfuse.FUSEError(errno.EIO) return llfuse.FileInfo(fh=inode) if has_pyfuse3 else inode @async_wrapper @@ -699,7 +689,16 @@ def read(self, fh, offset, size): # evict fully read chunk from cache del self.data_cache[id] else: - _, data = self.repo_objs.parse(id, self.repository_uncached.get(id), ro_type=ROBJ_FILE_STREAM) + try: + cdata = self.repository_uncached.get(id) + except Repository.ObjectNotFound: + if self.allow_damaged_files: + data = zeros[:s] + assert len(data) == s + else: + raise llfuse.FUSEError(errno.EIO) from None + else: + _, data = self.repo_objs.parse(id, cdata, ro_type=ROBJ_FILE_STREAM) if offset + n < len(data): # chunk was only partially read, cache it self.data_cache[id] = data diff --git a/src/borg/helpers/fs.py b/src/borg/helpers/fs.py index b3c602f90b..44db2eeb80 100644 --- a/src/borg/helpers/fs.py +++ b/src/borg/helpers/fs.py @@ -308,8 +308,8 @@ class HardLinkManager: If we encounter the same hlid again later, we hardlink to the path of the already extracted content of same hlid. C) When transferring from a borg1 archive, we need: - path -> chunks, chunks_healthy # for borg1_hl_targets - If we encounter a regular file item with source == path later, we reuse chunks and chunks_healthy + path -> chunks_correct # for borg1_hl_targets, chunks_correct must be either from .chunks_healthy or .chunks. + If we encounter a regular file item with source == path later, we reuse chunks_correct and create the same hlid = hardlink_id_from_path(source). D) When importing a tar file (simplified 1-pass way for now, not creating borg hardlink items): @@ -353,7 +353,7 @@ def remember(self, *, id, info): a hlid (new borg style) [bytes] a (dev, inode) tuple (filesystem) :param info: information to remember, could be: - chunks / chunks_healthy list + chunks list hlid """ assert isinstance(id, self.id_type), f"id is {id!r}, not of type {self.id_type}" diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index 705b4e290c..9849acd0bc 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -826,7 +826,6 @@ class ItemFormatter(BaseFormatter): "isoctime": "file change time (ISO 8601 format)", "isoatime": "file access time (ISO 8601 format)", "xxh64": "XXH64 checksum of this file (note: this is NOT a cryptographic hash!)", - "health": 'either "healthy" (file ok) or "broken" (if file has all-zero replacement chunks)', "archiveid": "internal ID of the archive", "archivename": "name of the archive", } @@ -836,7 +835,6 @@ class ItemFormatter(BaseFormatter): ("mtime", "ctime", "atime", "isomtime", "isoctime", "isoatime"), tuple(sorted(hash_algorithms)), ("archiveid", "archivename", "extra"), - ("health",), ) KEYS_REQUIRING_CACHE = () @@ -893,10 +891,6 @@ def get_item_data(self, item, jsonline=False): item_data.update(text_to_json("user", item.get("user", str(item_data["uid"])))) item_data.update(text_to_json("group", item.get("group", str(item_data["gid"])))) - if jsonline: - item_data["healthy"] = "chunks_healthy" not in item - else: - item_data["health"] = "broken" if "chunks_healthy" in item else "healthy" item_data["flags"] = item.get("bsdflags") # int if flags known, else (if flags unknown) None for key in self.used_call_keys: item_data[key] = self.call_keys[key](item) diff --git a/src/borg/testsuite/archiver/check_cmd_test.py b/src/borg/testsuite/archiver/check_cmd_test.py index 4b628b44bf..bf851ecf9e 100644 --- a/src/borg/testsuite/archiver/check_cmd_test.py +++ b/src/borg/testsuite/archiver/check_cmd_test.py @@ -155,28 
+155,19 @@ def test_missing_file_chunk(archivers, request): else: pytest.fail("should not happen") # convert 'fail' - cmd(archiver, "check", exit_code=1) + output = cmd(archiver, "check", exit_code=1) + assert "Missing file chunk detected" in output output = cmd(archiver, "check", "--repair", exit_code=0) - assert "New missing file chunk detected" in output - - cmd(archiver, "check", exit_code=0) - output = cmd(archiver, "list", "archive1", "--format={health}#{path}{NL}", exit_code=0) - assert "broken#" in output + assert "Missing file chunk detected" in output # repair is not changing anything, just reporting. - # check that the file in the old archives has now a different chunk list without the killed chunk. - # also check that the correct original chunks list is preserved in item.chunks_healthy. + # check does not modify the chunks list. for archive_name in ("archive1", "archive2"): archive, repository = open_archive(archiver.repository_path, archive_name) with repository: for item in archive.iter_items(): if item.path.endswith(src_file): assert len(valid_chunks) == len(item.chunks) - assert killed_chunk not in item.chunks - assert valid_chunks != item.chunks - assert "chunks_healthy" in item - assert len(valid_chunks) == len(item.chunks_healthy) - assert killed_chunk in item.chunks_healthy - assert valid_chunks == item.chunks_healthy + assert valid_chunks == item.chunks break else: pytest.fail("should not happen") # convert 'fail' @@ -185,32 +176,9 @@ def test_missing_file_chunk(archivers, request): with patch.object(ChunkBuffer, "BUFFER_SIZE", 10): create_src_archive(archiver, "archive3") - # check should be able to heal the file now: + # check should not complain anymore about missing chunks: output = cmd(archiver, "check", "-v", "--repair", exit_code=0) - assert "Healed previously missing file chunk" in output - assert f"{src_file}: Completely healed previously damaged file!" in output - - # check that the file in the old archives has the correct chunks again. - # also check that chunks_healthy list is removed as it is not needed any more. - for archive_name in ("archive1", "archive2"): - archive, repository = open_archive(archiver.repository_path, archive_name) - with repository: - for item in archive.iter_items(): - if item.path.endswith(src_file): - assert valid_chunks == item.chunks - assert "chunks_healthy" not in item - break - else: - pytest.fail("should not happen") - - # list is also all-healthy again - output = cmd(archiver, "list", "archive1", "--format={health}#{path}{NL}", exit_code=0) - assert "broken#" not in output - - # check should be fine now (and not show it has healed anything). - output = cmd(archiver, "check", "-v", "--repair", exit_code=0) - assert "Healed previously missing file chunk" not in output - assert "testsuite/archiver.py: Completely healed previously damaged file!" 
not in output + assert "Missing file chunk detected" not in output def test_missing_archive_item_chunk(archivers, request): @@ -425,13 +393,14 @@ def fake_xxh64(data, seed=0): output = cmd(archiver, "check", "--archives-only", "--verify-data", exit_code=1) assert f"{bin_to_hex(chunk.id)}, integrity error" in output - # repair (heal is tested in another test) + # repair will find the defect chunk and remove it output = cmd(archiver, "check", "--repair", "--verify-data", exit_code=0) assert f"{bin_to_hex(chunk.id)}, integrity error" in output - assert f"{src_file}: New missing file chunk detected" in output + assert f"{src_file}: Missing file chunk detected" in output - # run with --verify-data again, all fine now (file was patched with a replacement chunk). - cmd(archiver, "check", "--archives-only", "--verify-data", exit_code=0) + # run with --verify-data again, it will notice the missing chunk. + output = cmd(archiver, "check", "--archives-only", "--verify-data", exit_code=1) + assert f"{src_file}: Missing file chunk detected" in output @pytest.mark.parametrize("init_args", [["--encryption=repokey-aes-ocb"], ["--encryption", "none"]]) @@ -457,13 +426,15 @@ def test_corrupted_file_chunk(archivers, request, init_args): output = cmd(archiver, "check", "--repository-only", exit_code=1) assert f"{bin_to_hex(chunk.id)} is corrupted: data does not match checksum." in output - # repair (heal is tested in another test) + # repair: the defect chunk will be removed by repair. output = cmd(archiver, "check", "--repair", exit_code=0) assert f"{bin_to_hex(chunk.id)} is corrupted: data does not match checksum." in output - assert f"{src_file}: New missing file chunk detected" in output + assert f"{src_file}: Missing file chunk detected" in output - # run normal check again, all fine now (file was patched with a replacement chunk). 
+ # run normal check again cmd(archiver, "check", "--repository-only", exit_code=0) + output = cmd(archiver, "check", "--archives-only", exit_code=1) + assert f"{src_file}: Missing file chunk detected" in output def test_empty_repository(archivers, request): diff --git a/src/borg/testsuite/archiver/mount_cmds_test.py b/src/borg/testsuite/archiver/mount_cmds_test.py index 1a1ea12925..55a3d645f4 100644 --- a/src/borg/testsuite/archiver/mount_cmds_test.py +++ b/src/borg/testsuite/archiver/mount_cmds_test.py @@ -233,15 +233,19 @@ def test_fuse_allow_damaged_files(archivers, request): break else: assert False # missed the file - cmd(archiver, "check", "--repair", exit_code=0) mountpoint = os.path.join(archiver.tmpdir, "mountpoint") with fuse_mount(archiver, mountpoint, "-a", "archive"): - with pytest.raises(OSError) as excinfo: - open(os.path.join(mountpoint, "archive", path)) - assert excinfo.value.errno == errno.EIO + with open(os.path.join(mountpoint, "archive", path), "rb") as f: + with pytest.raises(OSError) as excinfo: + f.read() + assert excinfo.value.errno == errno.EIO + with fuse_mount(archiver, mountpoint, "-a", "archive", "-o", "allow_damaged_files"): - open(os.path.join(mountpoint, "archive", path)).close() + with open(os.path.join(mountpoint, "archive", path), "rb") as f: + # no exception raised, missing data will be all-zero + data = f.read() + assert data.endswith(b"\0\0") @pytest.mark.skipif(not llfuse, reason="llfuse not installed") diff --git a/src/borg/testsuite/archiver/transfer_cmd_test.py b/src/borg/testsuite/archiver/transfer_cmd_test.py index 3084a425c8..e5398e59cc 100644 --- a/src/borg/testsuite/archiver/transfer_cmd_test.py +++ b/src/borg/testsuite/archiver/transfer_cmd_test.py @@ -164,7 +164,6 @@ def convert_tz(local_naive, tzoffset, tzinfo): # fix expectation for size e["size"] = g["size"] # Note: size == 0 for all items without a size or chunks list (like e.g. directories) - # Note: healthy == True indicates the *absence* of the additional chunks_healthy list del g["hlid"] # borg 1 used "linktarget" and "source" for links, borg 2 uses "target" for symlinks. 
@@ -177,6 +176,9 @@ def convert_tz(local_naive, tzoffset, tzinfo): # The S_IFBLK macro is broken on MINGW del e["type"], g["type"] del e["mode"], g["mode"] + + del e["healthy"] # not supported anymore + assert g == e if name == "archive1": diff --git a/src/borg/upgrade.py b/src/borg/upgrade.py index 6277f0172d..92b26b8908 100644 --- a/src/borg/upgrade.py +++ b/src/borg/upgrade.py @@ -48,7 +48,7 @@ def __init__(self, *, cache): def new_archive(self, *, archive): self.archive = archive - self.hlm = HardLinkManager(id_type=bytes, info_type=tuple) # hlid -> (chunks, chunks_healthy) + self.hlm = HardLinkManager(id_type=bytes, info_type=list) # hlid -> chunks_correct def upgrade_item(self, *, item): """upgrade item as needed, get rid of legacy crap""" @@ -56,7 +56,6 @@ def upgrade_item(self, *, item): "path", "rdev", "chunks", - "chunks_healthy", "hlid", "mode", "user", @@ -78,16 +77,14 @@ def upgrade_item(self, *, item): if self.hlm.borg1_hardlink_master(item): item.hlid = hlid = self.hlm.hardlink_id_from_path(item.path) - self.hlm.remember(id=hlid, info=(item.get("chunks"), item.get("chunks_healthy"))) + self.hlm.remember(id=hlid, info=item.get("chunks")) elif self.hlm.borg1_hardlink_slave(item): item.hlid = hlid = self.hlm.hardlink_id_from_path(item.source) - chunks, chunks_healthy = self.hlm.retrieve(id=hlid, default=(None, None)) + chunks = self.hlm.retrieve(id=hlid) if chunks is not None: item.chunks = chunks for chunk_id, chunk_size in chunks: self.cache.reuse_chunk(chunk_id, chunk_size, self.archive.stats) - if chunks_healthy is not None: - item.chunks_healthy = chunks del item.source # not used for hardlinks any more, replaced by hlid # make sure we only have desired stuff in the new item. specifically, make sure to get rid of: # - 'acl' remnants of bug in attic <= 0.13 From c429decbee826a84cb6c11744cdfa36c0c78624d Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Mon, 25 Nov 2024 21:55:30 +0100 Subject: [PATCH 2/3] simplify fetch_many api it now takes either: - a list of ChunkListEntry namedtuples - a list of chunk IDs [bytes] --- src/borg/archive.py | 31 ++++++++++++++++++++----------- src/borg/archiver/tar_cmds.py | 4 +--- src/borg/helpers/misc.py | 2 +- src/borg/helpers/parseformat.py | 2 +- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/borg/archive.py b/src/borg/archive.py index 384f68398b..0228568c1b 100644 --- a/src/borg/archive.py +++ b/src/borg/archive.py @@ -312,10 +312,22 @@ def preload_item_chunks(self, item, optimize_hardlinks=False): self.repository.preload([c.id for c in item.chunks]) return preload_chunks - def fetch_many(self, ids, is_preloaded=False, ro_type=None): + def fetch_many(self, chunks, is_preloaded=False, ro_type=None): assert ro_type is not None - for id_, cdata in zip(ids, self.repository.get_many(ids, is_preloaded=is_preloaded)): - _, data = self.repo_objs.parse(id_, cdata, ro_type=ro_type) + ids = [] + sizes = [] + if all(isinstance(chunk, ChunkListEntry) for chunk in chunks): + for chunk in chunks: + ids.append(chunk.id) + sizes.append(chunk.size) + elif all(isinstance(chunk, bytes) for chunk in chunks): + ids = list(chunks) + sizes = [None] * len(ids) + else: + raise TypeError(f"unsupported or mixed element types: {chunks}") + for id, size, cdata in zip(ids, sizes, self.repository.get_many(ids, is_preloaded=is_preloaded)): + _, data = self.repo_objs.parse(id, cdata, ro_type=ro_type) + assert size is None or len(data) == size yield data @@ -770,9 +782,7 @@ def same_item(item, st): # it would get stuck. 
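The simplified ``fetch_many`` accepts either a list of ``ChunkListEntry`` namedtuples (id and size, as stored in ``item.chunks``) or a plain list of chunk IDs (bytes). A minimal usage sketch under that assumption, where ``archive`` is an open archive and ``process`` is a placeholder for whatever the caller does with the data::

    # pass ChunkListEntry namedtuples: sizes are known and checked against the fetched data
    for data in archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM):
        process(data)

    # pass plain chunk IDs (bytes): sizes are unknown, so no size check is done
    ids = [entry.id for entry in item.chunks]
    for data in archive.pipeline.fetch_many(ids, ro_type=ROBJ_FILE_STREAM):
        process(data)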
if "chunks" in item: item_chunks_size = 0 - for data in self.pipeline.fetch_many( - [c.id for c in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM - ): + for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) if stdout: @@ -821,8 +831,7 @@ def make_parent(path): with backup_io("open"): fd = open(path, "wb") with fd: - ids = [c.id for c in item.chunks] - for data in self.pipeline.fetch_many(ids, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): + for data in self.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM): if pi: pi.show(increase=len(data), info=[remove_surrogates(item.path)]) with backup_io("write"): @@ -1005,8 +1014,8 @@ def compare_items(path: str, item1: Item, item2: Item): path, item1, item2, - archive1.pipeline.fetch_many([c.id for c in item1.get("chunks", [])], ro_type=ROBJ_FILE_STREAM), - archive2.pipeline.fetch_many([c.id for c in item2.get("chunks", [])], ro_type=ROBJ_FILE_STREAM), + archive1.pipeline.fetch_many(item1.get("chunks", []), ro_type=ROBJ_FILE_STREAM), + archive2.pipeline.fetch_many(item2.get("chunks", []), ro_type=ROBJ_FILE_STREAM), can_compare_chunk_ids=can_compare_chunk_ids, ) @@ -2193,7 +2202,7 @@ def chunk_processor(self, target, chunk): return chunk_entry def iter_chunks(self, archive, target, chunks): - chunk_iterator = archive.pipeline.fetch_many([chunk_id for chunk_id, _ in chunks], ro_type=ROBJ_FILE_STREAM) + chunk_iterator = archive.pipeline.fetch_many(chunks, ro_type=ROBJ_FILE_STREAM) if target.recreate_rechunkify: # The target.chunker will read the file contents through ChunkIteratorFileWrapper chunk-by-chunk # (does not load the entire file into memory) diff --git a/src/borg/archiver/tar_cmds.py b/src/borg/archiver/tar_cmds.py index d95b992ef2..0be7fa02ea 100644 --- a/src/borg/archiver/tar_cmds.py +++ b/src/borg/archiver/tar_cmds.py @@ -113,9 +113,7 @@ def item_content_stream(item): """ Return a file-like object that reads from the chunks of *item*. 
""" - chunk_iterator = archive.pipeline.fetch_many( - [chunk_id for chunk_id, _ in item.chunks], is_preloaded=True, ro_type=ROBJ_FILE_STREAM - ) + chunk_iterator = archive.pipeline.fetch_many(item.chunks, is_preloaded=True, ro_type=ROBJ_FILE_STREAM) if pi: info = [remove_surrogates(item.path)] return ChunkIteratorFileWrapper( diff --git a/src/borg/helpers/misc.py b/src/borg/helpers/misc.py index 6028b93a30..a46ab2fa96 100644 --- a/src/borg/helpers/misc.py +++ b/src/borg/helpers/misc.py @@ -124,7 +124,7 @@ def read(self, nbytes): def open_item(archive, item): """Return file-like object for archived item (with chunks).""" - chunk_iterator = archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM) + chunk_iterator = archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM) return ChunkIteratorFileWrapper(chunk_iterator) diff --git a/src/borg/helpers/parseformat.py b/src/borg/helpers/parseformat.py index 9849acd0bc..b5cc1fb903 100644 --- a/src/borg/helpers/parseformat.py +++ b/src/borg/helpers/parseformat.py @@ -910,7 +910,7 @@ def hash_item(self, hash_function, item): hash = self.xxh64() elif hash_function in self.hash_algorithms: hash = hashlib.new(hash_function) - for data in self.archive.pipeline.fetch_many([c.id for c in item.chunks], ro_type=ROBJ_FILE_STREAM): + for data in self.archive.pipeline.fetch_many(item.chunks, ro_type=ROBJ_FILE_STREAM): hash.update(data) return hash.hexdigest() From 69d504db917f34f8dc84b50349c4bf512228f5b6 Mon Sep 17 00:00:00 2001 From: Thomas Waldmann Date: Fri, 29 Nov 2024 14:43:45 +0100 Subject: [PATCH 3/3] get/get_many: add raise_missing=True param --- src/borg/legacyrepository.py | 11 +++++++---- src/borg/remote.py | 30 ++++++++++++++++++------------ src/borg/repository.py | 11 +++++++---- 3 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/borg/legacyrepository.py b/src/borg/legacyrepository.py index 06e166532d..0ff463a86e 100644 --- a/src/borg/legacyrepository.py +++ b/src/borg/legacyrepository.py @@ -1211,18 +1211,21 @@ def list(self, limit=None, marker=None): self.index = self.open_index(self.get_transaction_id()) return [id_ for id_, _ in islice(self.index.iteritems(marker=marker), limit)] - def get(self, id, read_data=True): + def get(self, id, read_data=True, raise_missing=True): if not self.index: self.index = self.open_index(self.get_transaction_id()) try: in_index = NSIndex1Entry(*(self.index[id][:2])) # legacy: index entries have no size element return self.io.read(in_index.segment, in_index.offset, id, read_data=read_data) except KeyError: - raise self.ObjectNotFound(id, self.path) from None + if raise_missing: + raise self.ObjectNotFound(id, self.path) from None + else: + return None - def get_many(self, ids, read_data=True, is_preloaded=False): + def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): for id_ in ids: - yield self.get(id_, read_data=read_data) + yield self.get(id_, read_data=read_data, raise_missing=raise_missing) def put(self, id, data, wait=True): """put a repo object diff --git a/src/borg/remote.py b/src/borg/remote.py index 03fd633f77..6197234eba 100644 --- a/src/borg/remote.py +++ b/src/borg/remote.py @@ -965,7 +965,7 @@ def handle_error(unpacked): self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: cmd, ARGS: args})) if not self.to_send and self.preload_ids: chunk_id = self.preload_ids.pop(0) - args = {"id": chunk_id} + args = {"id": chunk_id, "raise_missing": True} self.msgid += 1 self.chunkid_to_msgids.setdefault(chunk_id, 
[]).append(self.msgid) self.to_send.push_back(msgpack.packb({MSGID: self.msgid, MSG: "get", ARGS: args})) @@ -1024,12 +1024,16 @@ def __len__(self): def list(self, limit=None, marker=None): """actual remoting is done via self.call in the @api decorator""" - def get(self, id, read_data=True): - for resp in self.get_many([id], read_data=read_data): + def get(self, id, read_data=True, raise_missing=True): + for resp in self.get_many([id], read_data=read_data, raise_missing=raise_missing): return resp - def get_many(self, ids, read_data=True, is_preloaded=False): - yield from self.call_many("get", [{"id": id, "read_data": read_data} for id in ids], is_preloaded=is_preloaded) + def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): + yield from self.call_many( + "get", + [{"id": id, "read_data": read_data, "raise_missing": raise_missing} for id in ids], + is_preloaded=is_preloaded, + ) @api(since=parse_version("1.0.0")) def put(self, id, data, wait=True): @@ -1131,11 +1135,11 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def get(self, key, read_data=True): - return next(self.get_many([key], read_data=read_data, cache=False)) + def get(self, key, read_data=True, raise_missing=True): + return next(self.get_many([key], read_data=read_data, raise_missing=raise_missing, cache=False)) - def get_many(self, keys, read_data=True, cache=True): - for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data)): + def get_many(self, keys, read_data=True, raise_missing=True, cache=True): + for key, data in zip(keys, self.repository.get_many(keys, read_data=read_data, raise_missing=raise_missing)): yield self.transform(key, data) def log_instrumentation(self): @@ -1240,10 +1244,12 @@ def close(self): self.cache.clear() shutil.rmtree(self.basedir) - def get_many(self, keys, read_data=True, cache=True): + def get_many(self, keys, read_data=True, raise_missing=True, cache=True): # It could use different cache keys depending on read_data and cache full vs. meta-only chunks. 
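With the new ``raise_missing`` parameter, a missing object can either raise ``Repository.ObjectNotFound`` (the default) or come back as ``None``. A small sketch of the latter, assuming ``repository``, ``repo_objs`` and ``chunk_id`` are given and ``handle_missing`` is a placeholder for the caller's reaction (e.g. FUSE ``read`` returning EIO or serving all-zero bytes)::

    cdata = repository.get(chunk_id, raise_missing=False)
    if cdata is None:
        # the object is not present in the repository; the caller decides how to react
        handle_missing(chunk_id)
    else:
        _, data = repo_objs.parse(chunk_id, cdata, ro_type=ROBJ_FILE_STREAM)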
unknown_keys = [key for key in keys if self.prefixed_key(key, complete=read_data) not in self.cache] - repository_iterator = zip(unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data)) + repository_iterator = zip( + unknown_keys, self.repository.get_many(unknown_keys, read_data=read_data, raise_missing=raise_missing) + ) for key in keys: pkey = self.prefixed_key(key, complete=read_data) if pkey in self.cache: @@ -1261,7 +1267,7 @@ def get_many(self, keys, read_data=True, cache=True): else: # slow path: eviction during this get_many removed this key from the cache t0 = time.perf_counter() - data = self.repository.get(key, read_data=read_data) + data = self.repository.get(key, read_data=read_data, raise_missing=raise_missing) self.slow_lat += time.perf_counter() - t0 transformed = self.add_entry(key, data, cache, complete=read_data) self.slow_misses += 1 diff --git a/src/borg/repository.py b/src/borg/repository.py index 395e312962..5181a5c406 100644 --- a/src/borg/repository.py +++ b/src/borg/repository.py @@ -426,7 +426,7 @@ def list(self, limit=None, marker=None): # note: do not collect the marker id return result - def get(self, id, read_data=True): + def get(self, id, read_data=True, raise_missing=True): self._lock_refresh() id_hex = bin_to_hex(id) key = "data/" + id_hex @@ -453,11 +453,14 @@ def get(self, id, read_data=True): raise IntegrityError(f"Object too small [id {id_hex}]: expected {meta_size}, got {len(meta)} bytes") return hdr + meta except StoreObjectNotFound: - raise self.ObjectNotFound(id, str(self._location)) from None + if raise_missing: + raise self.ObjectNotFound(id, str(self._location)) from None + else: + return None - def get_many(self, ids, read_data=True, is_preloaded=False): + def get_many(self, ids, read_data=True, is_preloaded=False, raise_missing=True): for id_ in ids: - yield self.get(id_, read_data=read_data) + yield self.get(id_, read_data=read_data, raise_missing=raise_missing) def put(self, id, data, wait=True): """put a repo object