From 804637cf8bffe32fa3b4eaab67123eac0b2da930 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sat, 28 May 2022 22:55:32 +0100 Subject: [PATCH 01/13] Initial implementation of archive repack --- disk_objectstore/container.py | 148 +++++++++++++++++++++++++++++++++- disk_objectstore/database.py | 20 +++++ 2 files changed, 165 insertions(+), 3 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 63f9b8f..dcab04d 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -1,15 +1,17 @@ """ The main implementation of the ``Container`` class of the object store. """ -# pylint: disable=too-many-lines import io import json import os import shutil import uuid import warnings +import zipfile from collections import defaultdict, namedtuple from contextlib import contextmanager + +# pylint: disable=too-many-lines from enum import Enum from pathlib import Path from typing import ( @@ -37,7 +39,7 @@ from sqlalchemy.sql import func from sqlalchemy.sql.expression import delete, select, text, update -from .database import Obj, get_session +from .database import Obj, Pack, PackState, get_session from .exceptions import InconsistentContent, NotExistent, NotInitialised from .utils import ( CallbackStreamWrapper, @@ -391,6 +393,7 @@ def init_container( for folder in [ self._get_pack_folder(), + self._get_archive_folder(), self._get_loose_folder(), self._get_duplicates_folder(), self._get_sandbox_folder(), @@ -2621,7 +2624,7 @@ def repack_pack( obj_dicts = [] # Now we can safely delete the old object. I just check that there is no object still - # refencing the old pack, to be sure. + # referencing the old pack, to be sure. one_object_in_pack = session.execute( select(Obj.id).where(Obj.pack_id == pack_id).limit(1) ).all() @@ -2662,3 +2665,142 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. + + def archive_pack(self, pack_id, run_read_test): + """ + Archive the pack - transfer the pack into a ZIP archive + + This is accepted as a slow operation. + The destination may not be on the same file system, e.g. the archive folder may be on a + different (networked) file system. + """ + # Check that it does not exist + assert not os.path.exists( + self._get_pack_path_from_pack_id( + self._REPACK_PACK_ID, allow_repack_pack=True + ) + ), f"The repack pack '{self._REPACK_PACK_ID}' already exists, probably a previous repacking aborted?" + + session = self._get_cached_session() + one_object_in_pack = session.execute( + select(Obj.id).where(Obj.pack_id == pack_id).limit(1) + ).all() + # No thing to seal + if not one_object_in_pack: + # No objects. Clean up the pack file, if it exists. 
+ if os.path.exists(self._get_pack_path_from_pack_id(pack_id)): + os.remove(self._get_pack_path_from_pack_id(pack_id)) + return + # Gather all information of the objects to be stored + stmt = ( + select(Obj.id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed) + .where(Obj.pack_id == pack_id) + .order_by(Obj.offset) + ) + obj_dicts = [] + for rowid, hashkey, size, offset, length, compressed in session.execute(stmt): + obj_dict = {} + obj_dict["id"] = rowid + obj_dict["hashkey"] = hashkey + obj_dict["size"] = size + obj_dict["compressed"] = compressed + obj_dict["length"] = length + obj_dict["offset"] = offset + + # Process the file name of the zipped files + if self.loose_prefix_len: + obj_dict["filename"] = os.path.join( + hashkey[: self.loose_prefix_len], + hashkey[self.loose_prefix_len :], + ) + else: + obj_dict["filename"] = hashkey + obj_dicts.append(obj_dict) + # We now have a list of objects to be stored archived in the ZIP file + # Proceed with writing out the file + + # Createa temporary pack with id -1 + with self.lock_pack( + str(self._REPACK_PACK_ID), allow_repack_pack=True + ) as write_pack_handle: + with zipfile.ZipFile(write_pack_handle, "w") as archive: + # Iterate all objects in the pack to be archived + for obj_dict in obj_dicts: + zinfo = zipfile.ZipInfo(filename=obj_dict["filename"]) + zinfo.file_size = obj_dict["size"] + # Using deflated compression algorithm + zinfo.compress_type = zipfile.ZIP_DEFLATED + with archive.open(zinfo, "w") as zip_write_handle: + with self.get_object_stream( + obj_dict["hashkey"] + ) as source_handle: + shutil.copyfileobj(source_handle, zip_write_handle) + # Now the field of the zinfo has been set + obj_dict["length"] = zinfo.compress_size + # Offset was the current location minus the size of the compressed information written + obj_dict["offset"] = write_pack_handle.tell() - zinfo.compress_size + obj_dict["compressed"] = True + # The writing of the zip file is now completed + + # Update the SQLite database so this pack is "archived" + for obj_dict in obj_dicts: + obj_dict.pop("filename") + + # Now test the archvie file + if run_read_test is True: + test_result = self._validate_archvie_file( + self._get_archive_path_from_pack_id(pack_id), obj_dicts + ) + if test_result is False: + raise ValueError("Created archive file does not pass read tests") + + # Update the pack table such that the pack is "archived" + session.execute( + update(Pack) + .where(Pack.pack_id == pack_id) + .values( + state=PackState.ARCHIVED, + location=self._get_archive_path_from_pack_id(pack_id), + ) + ) + + # Commit change in compress, offset, length, for each object + session.bulk_update_mappings(Obj, obj_dicts) + + # Delete the original pack file + os.unlink(self._get_pack_path_from_pack_id(pack_id)) + + def _get_archive_path_from_pack_id(self, pack_id): + """ + Return the path to the archived pack. + + The name includes the container_id, as the archive fold can be mounted from other + file systems. 
+ """ + pack_id = str(pack_id) + session = self._get_cached_session() + pack = session.execute(select(Pack).where(Pack.id == pack_id).limit(1)).all() + # No location set - use the default one + if not pack or not pack[0].location: + return os.path.join( + self._get_archive_folder(), self.container_id + "-" + pack_id + ".zip" + ) + return pack[0].location + + def _get_archive_folder(self): + """Return folder of the archive""" + return os.path.join(self._folder, "archives") + + def _validate_archvie_file(self, fpath, obj_dicts: List[Dict[str, Any]]) -> bool: + """Test reading from an archive file""" + if len(obj_dicts) < 1: + return False + with open(fpath, "rb") as fhandle: + for obj_dict in obj_dicts: + reader = get_stream_decompresser(self.compression_algorithm)( + PackedObjectReader(fhandle, obj_dict["offset"], obj_dict["length"]) + ) + hashkey, size = compute_hash_and_size(reader, self.hash_type) + if hashkey != obj_dict["hashkey"] or size != obj_dict["size"]: + return False + return True diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 73acc06..07a02dd 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -1,4 +1,5 @@ """Models for the container index file (SQLite DB).""" +import enum import os from typing import Optional @@ -31,6 +32,25 @@ class Obj(Base): # pylint: disable=too-few-public-methods ) # integer ID of the pack in which this entry is stored +class PackState(enum.Enum): + """Enum for valid sate of seal packs""" + + ARCHIVED = "Archived" + UNSEALED = "Unsealed" + + +class Pack(Base): # pylint: disable=too-few-public-methods + """The table for storing the state of pack files. If missing, it means that the pack is currently active""" + + __tablename__ = "db_pack" + + id = Column(Integer, primary_key=True) + pack_id = Column(Integer, primary_key=False) + state = Column(String, nullable=False, unique=False) + md5 = Column(String, unique=True, nullable=False) + location = Column(String, nullable=True) + + def get_session( path: str, create: bool = False, raise_if_missing: bool = False ) -> Optional[Session]: From 5cb655ef0498962e44b9b29c0c254da1c3495faa Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 29 May 2022 12:09:51 +0100 Subject: [PATCH 02/13] Added tests for pack path/pack id allocation Archive packs needs special handling. 1. They should not be selected for writing 2. They can reside on different locations --- disk_objectstore/container.py | 80 ++++++++++++++--- disk_objectstore/database.py | 5 +- tests/test_container.py | 162 ++++++++++++++++++++++++++++++++++ 3 files changed, 231 insertions(+), 16 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index dcab04d..244773d 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -251,6 +251,19 @@ def _get_pack_path_from_pack_id( assert self._is_valid_pack_id( pack_id, allow_repack_pack=allow_repack_pack ), f"Invalid pack ID {pack_id}" + + # Are we trying to read from an archived pack? + session = self._get_session() + archived = ( + session.execute( + select(Pack).filter_by(pack_id=pack_id, state=PackState.ARCHIVED.value) + ) + .scalars() + .first() + ) + if archived is not None: + return self._get_archive_path_from_pack_id(pack_id) + return os.path.join(self._get_pack_folder(), pack_id) def _get_pack_index_path(self) -> str: @@ -269,14 +282,26 @@ def _get_pack_id_to_write_to(self) -> int: """ # Default to zero if not set (e.g. 
if it's None) pack_id = self._current_pack_id or 0 + + # Find all archived pack_ids + # We do not expect this to change over the concurrent operation, so more efficient to do it in one go + session = self._get_cached_session() + archived = session.execute( + select(Pack.pack_id).filter_by(state=PackState.ARCHIVED.value) + ).all() + # Convert the string representation back to int + all_archived_ids = [int(entry[0]) for entry in archived] + while True: pack_path = self._get_pack_path_from_pack_id(pack_id) - if not os.path.exists(pack_path): - # Use this ID - the pack file does not exist yet - break - if os.path.getsize(pack_path) < self.pack_size_target: - # Use this ID - the pack file is not "full" yet - break + # Check if we are trying to accessing an archive pack + if pack_id not in all_archived_ids: + if not os.path.exists(pack_path): + # Use this ID - the pack file does not exist yet + break + if os.path.getsize(pack_path) < self.pack_size_target: + # Use this ID - the pack file is not "full" yet + break # Try the next pack pack_id += 1 @@ -1150,6 +1175,24 @@ def lock_pack( """ assert self._is_valid_pack_id(pack_id, allow_repack_pack=allow_repack_pack) + # Check that this is not an archived pack + session = self._get_cached_session() + this_pack = ( + session.execute(select(Pack).filter(Pack.pack_id == pack_id)) + .scalars() + .first() + ) + if this_pack is not None and this_pack.state == PackState.ARCHIVED.value: + raise ValueError( + f"Pack {pack_id} is archived, so it cannot be locked for writing!" + ) + + # This is a new pack, so we update the Pack table in the SQLite database + if this_pack is None: + this_pack = Pack(state=PackState.ACTIVE.value, pack_id=pack_id) + session.add(this_pack) + session.commit() + # Open file in exclusive mode lock_file = os.path.join(self._get_pack_folder(), f"{pack_id}.lock") pack_file = self._get_pack_path_from_pack_id( @@ -2666,7 +2709,7 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. - def archive_pack(self, pack_id, run_read_test): + def archive_pack(self, pack_id, run_read_test=True): """ Archive the pack - transfer the pack into a ZIP archive @@ -2720,8 +2763,8 @@ def archive_pack(self, pack_id, run_read_test): # Proceed with writing out the file # Createa temporary pack with id -1 - with self.lock_pack( - str(self._REPACK_PACK_ID), allow_repack_pack=True + with open( + self._get_archive_path_from_pack_id(pack_id), "wb" ) as write_pack_handle: with zipfile.ZipFile(write_pack_handle, "w") as archive: # Iterate all objects in the pack to be archived @@ -2759,7 +2802,7 @@ def archive_pack(self, pack_id, run_read_test): update(Pack) .where(Pack.pack_id == pack_id) .values( - state=PackState.ARCHIVED, + state=PackState.ARCHIVED.value, location=self._get_archive_path_from_pack_id(pack_id), ) ) @@ -2776,16 +2819,27 @@ def _get_archive_path_from_pack_id(self, pack_id): The name includes the container_id, as the archive fold can be mounted from other file systems. + + There are three possible cases: + + 1. The pack is not archived, return the default path + 2. The pack is archived, but not explicity path, return the default one + 3. 
The pack is archvied, but ha an explicity path, return the stored path + """ pack_id = str(pack_id) session = self._get_cached_session() - pack = session.execute(select(Pack).where(Pack.id == pack_id).limit(1)).all() + pack = ( + session.execute(select(Pack).where(Pack.pack_id == pack_id)) + .scalars() + .first() + ) # No location set - use the default one - if not pack or not pack[0].location: + if not pack or not pack.location: return os.path.join( self._get_archive_folder(), self.container_id + "-" + pack_id + ".zip" ) - return pack[0].location + return pack.location def _get_archive_folder(self): """Return folder of the archive""" diff --git a/disk_objectstore/database.py b/disk_objectstore/database.py index 07a02dd..59fa9ee 100644 --- a/disk_objectstore/database.py +++ b/disk_objectstore/database.py @@ -36,7 +36,7 @@ class PackState(enum.Enum): """Enum for valid sate of seal packs""" ARCHIVED = "Archived" - UNSEALED = "Unsealed" + ACTIVE = "Active" class Pack(Base): # pylint: disable=too-few-public-methods @@ -45,9 +45,8 @@ class Pack(Base): # pylint: disable=too-few-public-methods __tablename__ = "db_pack" id = Column(Integer, primary_key=True) - pack_id = Column(Integer, primary_key=False) + pack_id = Column(String, primary_key=False) state = Column(String, nullable=False, unique=False) - md5 = Column(String, unique=True, nullable=False) location = Column(String, nullable=True) diff --git a/tests/test_container.py b/tests/test_container.py index da1b1df..cefe43b 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -15,6 +15,8 @@ import disk_objectstore.exceptions as exc from disk_objectstore import CompressMode, Container, ObjectType, database, utils +from disk_objectstore.container import select, update +from disk_objectstore.database import Pack, PackState COMPRESSION_ALGORITHMS_TO_TEST = ["zlib+1", "zlib+9"] @@ -3325,3 +3327,163 @@ def test_unknown_compressers(temp_container, compression_algorithm): temp_container.init_container( clear=True, compression_algorithm=compression_algorithm ) + + +def _test_archive(temp_dir): + """Test the repacking functionality.""" + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. Will fill two packs. 
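+    # (with pack_size_target=39, a pack counts as full after four 10-byte objects,
+    #  so these nine objects end up in packs 0, 1 and 2, as asserted below)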
+ data = [ + b"-123456789", + b"a123456789", + b"b123456789", + b"c123456789", + b"d123456789", + b"e123456789", + b"f123456789", + b"g123456789", + b"h123456789", + ] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum in data: + hashkeys.append(temp_container.add_objects_to_pack([datum])[0]) + + assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 0 + assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 1 + assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2 + + # I check which packs exist + assert sorted(temp_container._list_packs()) == [ + "0", + "1", + "2", + ] + + counts = temp_container.count_objects() + assert counts["packed"] == len(data) + size = temp_container.get_total_size() + assert size["total_size_packed"] == 10 * len(data) + assert size["total_size_packfiles_on_disk"] == 10 * len(data) + + # I now archive the pack + temp_container.archive_pack(1) + + # Important before exiting from the tests + temp_container.close() + + +def test_get_archive_path(temp_container): + """ + Test getting archvie path + """ + + # New pack + path = temp_container._get_archive_path_from_pack_id(999) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-999.zip" + ) + + # Existing active pack + session = temp_container._get_cached_session() + pack = Pack(pack_id="998", state=PackState.ACTIVE.value) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(998) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-998.zip" + ) + + # Existing archive pack + pack = Pack(pack_id="997", state=PackState.ARCHIVED.value) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(997) + assert ( + path + == os.path.join( + temp_container._get_archive_folder(), temp_container.container_id + ) + + "-997.zip" + ) + + pack = Pack( + pack_id="996", state=PackState.ARCHIVED.value, location="/tmp/archive.zip" + ) + session.add(pack) + session.commit() + path = temp_container._get_archive_path_from_pack_id(996) + assert path == "/tmp/archive.zip" + + +def test_get_pack_id_with_archive(temp_dir): + """Test get_pack_id_to_write_to with archived packs""" + + temp_container = Container(temp_dir) + temp_container.init_container(clear=True, pack_size_target=39) + + # data of 10 bytes each. Will fill two packs. 
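+    # (same data layout as the archive test above: packs 0 and 1 become full,
+    #  the ninth object goes to pack 2)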
+ data = [ + b"-123456789", + b"a123456789", + b"b123456789", + b"c123456789", + b"d123456789", + b"e123456789", + b"f123456789", + b"g123456789", + b"h123456789", + ] + + hashkeys = [] + # Add them one by one, so I am sure in wich pack they go + for datum in data: + hashkeys.append(temp_container.add_objects_to_pack([datum])[0]) + + session = temp_container._get_cached_session() + packs = session.execute(select(Pack)).scalars().all() + # Check there should be three packs + assert len(packs) == 3 + + # Three packs should be recorded in the Pack table as well + assert ( + len( + session.execute( + select(Pack.pack_id).filter_by(state=PackState.ACTIVE.value) + ).all() + ) + == 3 + ) + + # Now, the next object should writ to pack 2, since it is not ful + assert temp_container._get_pack_id_to_write_to() == 2 + # Mark the third pack as ARCHIVE + session.execute( + update(Pack) + .where(Pack.id == packs[-1].id) + .values(state=PackState.ARCHIVED.value, location="/tmp/2.zip") + ) + session.commit() + + # Writing to pack 2 is not allowed anymore + assert temp_container._get_pack_id_to_write_to() == 3 + + # Getting the "pack_path" for pack 2 should now point to the custom location + assert temp_container._get_pack_path_from_pack_id(2) == "/tmp/2.zip" From 1f53429646f5b41f4d1993befb7b12860ca9c0f3 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 29 May 2022 17:01:22 +0100 Subject: [PATCH 03/13] Implemented archive_pack function --- disk_objectstore/container.py | 118 ++++++++++++++++++++++++++++------ disk_objectstore/utils.py | 8 +++ tests/test_container.py | 21 +++++- 3 files changed, 125 insertions(+), 22 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 244773d..db96b2f 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -270,6 +270,15 @@ def _get_pack_index_path(self) -> str: """Return the path to the SQLite file containing the index of packed objects.""" return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}") + def get_archived_pack_ids(self) -> List[int]: + """Return ids of the archived packs""" + session = self._get_cached_session() + archived = session.execute( + select(Pack.pack_id).filter_by(state=PackState.ARCHIVED.value) + ).all() + # Convert the string representation back to int + return [int(entry[0]) for entry in archived] + def _get_pack_id_to_write_to(self) -> int: """Return the pack ID to write the next object. 
@@ -285,17 +294,11 @@ def _get_pack_id_to_write_to(self) -> int: # Find all archived pack_ids # We do not expect this to change over the concurrent operation, so more efficient to do it in one go - session = self._get_cached_session() - archived = session.execute( - select(Pack.pack_id).filter_by(state=PackState.ARCHIVED.value) - ).all() - # Convert the string representation back to int - all_archived_ids = [int(entry[0]) for entry in archived] - + archived_int_ids = self.get_archived_pack_ids() while True: pack_path = self._get_pack_path_from_pack_id(pack_id) # Check if we are trying to accessing an archive pack - if pack_id not in all_archived_ids: + if pack_id not in archived_int_ids: if not os.path.exists(pack_path): # Use this ID - the pack file does not exist yet break @@ -625,6 +628,8 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too obj_reader: StreamReadBytesType + archived_pack_int_ids = self.get_archived_pack_ids() + if len(hashkeys_set) <= self._MAX_CHUNK_ITERATE_LENGTH: # Operate in chunks, due to the SQLite limits # (see comment above the definition of self._IN_SQL_MAX_LENGTH) @@ -688,6 +693,8 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too ) if metadata.compressed: obj_reader = self._get_stream_decompresser()(obj_reader) + if pack_int_id in archived_pack_int_ids: + obj_reader.set_zip_mode() yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -827,7 +834,12 @@ def _get_objects_stream_meta_generator( # pylint: disable=too-many-branches,too length=metadata.length, ) if metadata.compressed: + # We reading from archived pack we need to set it to ZIP mode. + # This is because the stream in the ZIP file does not contain zlib + # header/trailer (WBITS=-15) obj_reader = self._get_stream_decompresser()(obj_reader) + if pack_int_id in archived_pack_int_ids: + obj_reader.set_zip_mode() yield metadata.hashkey, obj_reader, meta else: yield metadata.hashkey, meta @@ -1231,7 +1243,7 @@ def _list_loose(self) -> Iterator[str]: continue yield first_level - def _list_packs(self) -> Iterator[str]: + def _list_packs(self, include_archived=True) -> Iterator[str]: """Iterate over packs. .. note:: this returns a generator of the pack IDs. @@ -1244,6 +1256,11 @@ def _list_packs(self) -> Iterator[str]: if self._is_valid_pack_id(fname): yield fname + # Include also archived packs + if include_archived: + for pack_id in self.get_archived_pack_ids(): + yield str(pack_id) + def list_all_objects(self) -> Iterator[str]: """Iterate of all object hashkeys. @@ -2356,6 +2373,8 @@ def callback(self, action, value): session = self._get_cached_session() + is_archived = pack_id in self.get_archived_pack_ids() + if callback: # If we have a callback, compute the total count of objects in this pack total = session.scalar( @@ -2385,7 +2404,10 @@ def callback(self, action, value): fhandle=pack_handle, offset=offset, length=length ) if compressed: + obj_reader = self._get_stream_decompresser()(obj_reader) + if is_archived: + obj_reader.set_zip_mode() computed_hash, computed_size = compute_hash_and_size( obj_reader, self.hash_type @@ -2709,7 +2731,9 @@ def repack_pack( # We are now done. The temporary pack is gone, and the old `pack_id` # has now been replaced with an udpated, repacked pack. 
- def archive_pack(self, pack_id, run_read_test=True): + def archive_pack( + self, pack_id, run_read_test=True, trim_filenames=True + ): # pylint: disable=too-many-locals """ Archive the pack - transfer the pack into a ZIP archive @@ -2736,7 +2760,9 @@ def archive_pack(self, pack_id, run_read_test=True): return # Gather all information of the objects to be stored stmt = ( - select(Obj.id, Obj.hashkey, Obj.offset, Obj.length, Obj.compressed) + select( + Obj.id, Obj.hashkey, Obj.size, Obj.offset, Obj.length, Obj.compressed + ) .where(Obj.pack_id == pack_id) .order_by(Obj.offset) ) @@ -2760,19 +2786,35 @@ def archive_pack(self, pack_id, run_read_test=True): obj_dict["filename"] = hashkey obj_dicts.append(obj_dict) # We now have a list of objects to be stored archived in the ZIP file - # Proceed with writing out the file - # Createa temporary pack with id -1 + # Check if we can trim the filename to save some overhead + if trim_filenames is True: + lname = minimum_length_without_duplication( + [x["filename"] for x in obj_dicts] + ) + for obj_dict in obj_dicts: + obj_dict["filename"] = obj_dict["filename"][:lname] + + # Proceed with writing out the file + assert self.compression_algorithm.startswith( + "zlib" + ), "Cannot make archive with compression algorithm other than zlib" + compress_level = int(self.compression_algorithm.split("+")[1]) with open( self._get_archive_path_from_pack_id(pack_id), "wb" ) as write_pack_handle: - with zipfile.ZipFile(write_pack_handle, "w") as archive: + with zipfile.ZipFile( + write_pack_handle, + "w", + compression=zipfile.ZIP_DEFLATED, + compresslevel=compress_level, + ) as archive: # Iterate all objects in the pack to be archived for obj_dict in obj_dicts: zinfo = zipfile.ZipInfo(filename=obj_dict["filename"]) zinfo.file_size = obj_dict["size"] - # Using deflated compression algorithm - zinfo.compress_type = zipfile.ZIP_DEFLATED + # Set the correct compresslevel + self._set_compresslevel(zinfo) with archive.open(zinfo, "w") as zip_write_handle: with self.get_object_stream( obj_dict["hashkey"] @@ -2780,7 +2822,7 @@ def archive_pack(self, pack_id, run_read_test=True): shutil.copyfileobj(source_handle, zip_write_handle) # Now the field of the zinfo has been set obj_dict["length"] = zinfo.compress_size - # Offset was the current location minus the size of the compressed information written + # Offset was the current location minus the size of the compressed stream written obj_dict["offset"] = write_pack_handle.tell() - zinfo.compress_size obj_dict["compressed"] = True # The writing of the zip file is now completed @@ -2789,7 +2831,7 @@ def archive_pack(self, pack_id, run_read_test=True): for obj_dict in obj_dicts: obj_dict.pop("filename") - # Now test the archvie file + # Validate the archvie file just created if run_read_test is True: test_result = self._validate_archvie_file( self._get_archive_path_from_pack_id(pack_id), obj_dicts @@ -2797,6 +2839,12 @@ def archive_pack(self, pack_id, run_read_test=True): if test_result is False: raise ValueError("Created archive file does not pass read tests") + # We can now update the database so the Obj table with the new offsets + + # First, record the original "pack" file's path, since it will be changed once + # the archvied pack is active + original_pack_path = self._get_pack_path_from_pack_id(pack_id) + # Update the pack table such that the pack is "archived" session.execute( update(Pack) @@ -2807,11 +2855,12 @@ def archive_pack(self, pack_id, run_read_test=True): ) ) - # Commit change in compress, offset, length, for 
each object + # Commit change in compress, offset, length, for each object in the Obj table session.bulk_update_mappings(Obj, obj_dicts) + session.commit() - # Delete the original pack file - os.unlink(self._get_pack_path_from_pack_id(pack_id)) + # We can now delete the original pack file + os.unlink(original_pack_path) def _get_archive_path_from_pack_id(self, pack_id): """ @@ -2854,7 +2903,34 @@ def _validate_archvie_file(self, fpath, obj_dicts: List[Dict[str, Any]]) -> bool reader = get_stream_decompresser(self.compression_algorithm)( PackedObjectReader(fhandle, obj_dict["offset"], obj_dict["length"]) ) + reader.set_zip_mode() hashkey, size = compute_hash_and_size(reader, self.hash_type) if hashkey != obj_dict["hashkey"] or size != obj_dict["size"]: return False return True + + def _set_compresslevel(self, zinfo): + # The compression used in the archvie must be the same as that defined by the container + if self.compression_algorithm.startswith("zlib"): + zinfo.compress_type = zipfile.ZIP_DEFLATED + zinfo._compresslevel = int( # pylint: disable=protected-access + self.compression_algorithm.split("+")[1] + ) + else: + raise ValueError( + "Compression algorithm is in-compatible with pack archiving." + ) + + +def minimum_length_without_duplication(names, min_length=8): + """ + Find how many characters is needed to ensure there is no conflict among a set of filenames + """ + length = min_length + length_ok = False + while not length_ok: + length += 1 + trimmed = {name[:length] for name in names} + length_ok = len(trimmed) == len(names) + + return length diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 5622aa0..3cbb571 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -653,6 +653,14 @@ def __init__(self, compressed_stream: StreamSeekBytesType) -> None: self._internal_buffer = b"" self._pos = 0 + def set_zip_mode(self): + """Switch to ZIP mode - decompresss with WBITS=-15""" + self._decompressor = self.decompressobj_class(-15) + + def set_zlib_mode(self): + """Switch to normal operation mode""" + self._decompressor = self.decompressobj_class() + @property def mode(self) -> str: return getattr(self._compressed_stream, "mode", "rb") diff --git a/tests/test_container.py b/tests/test_container.py index cefe43b..e9a63e0 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -9,6 +9,7 @@ import shutil import stat import tempfile +import zipfile import psutil import pytest @@ -3329,7 +3330,7 @@ def test_unknown_compressers(temp_container, compression_algorithm): ) -def _test_archive(temp_dir): +def test_archive(temp_dir): """Test the repacking functionality.""" temp_container = Container(temp_dir) temp_container.init_container(clear=True, pack_size_target=39) @@ -3378,6 +3379,24 @@ def _test_archive(temp_dir): # I now archive the pack temp_container.archive_pack(1) + assert temp_container._get_pack_path_from_pack_id(1).endswith("zip") + + # Can still read everything + for idx, value in enumerate(data): + assert temp_container.get_object_content(hashkeys[idx]) == value + + # Validate the hashes + temp_container._validate_hashkeys_pack(1) + + size = temp_container.get_total_size() + # Due to the overhead, packs on the disk now takes more space.... 
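+    # (each entry in a ZIP archive adds a local file header and a central directory
+    #  record, so the archived pack is larger than the bare compressed streams)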
+ assert size["total_size_packfiles_on_disk"] > 10 * len(data) + + # Check that the Zipfile generated is valid + with zipfile.ZipFile(temp_container._get_pack_path_from_pack_id(1)) as zfile: + assert len(zfile.infolist()) == 4 + zfile.testzip() + # Important before exiting from the tests temp_container.close() From 8c499f449f89975d7ed0a4a7330ef9ffa0ec0dae Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 29 May 2022 17:59:24 +0100 Subject: [PATCH 04/13] Added methods to "loosen" archives This allow an archived pack to be imported into other containers, without the need of the SQLite database file. --- disk_objectstore/container.py | 72 ++++++++++++++++++++++++++--------- tests/test_container.py | 24 ++++++++++++ 2 files changed, 77 insertions(+), 19 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index db96b2f..0cf5bc4 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2603,6 +2603,14 @@ def repack_pack( pack_id != self._REPACK_PACK_ID ), f"The specified pack_id '{pack_id}' is invalid, it is the one used for repacking" + # We cannot repack a packed file since the DEFLATE stream in the ZIP is not the same + # as that written by zlib.compress, due to the requirement of WBITS=-15 for those in the + # ZIP file, while those written in unpacked file has WBITS=15 (with zlib header/trailers) + # This limitation can be lifted if other CompressMode is supported. + assert ( + pack_id not in self.get_archived_pack_ids() + ), f"The specified pack_id '{pack_id}' is archived, but repacking archived packs is not currently implemented." + # Check that it does not exist assert not os.path.exists( self._get_pack_path_from_pack_id( @@ -2741,18 +2749,12 @@ def archive_pack( The destination may not be on the same file system, e.g. the archive folder may be on a different (networked) file system. """ - # Check that it does not exist - assert not os.path.exists( - self._get_pack_path_from_pack_id( - self._REPACK_PACK_ID, allow_repack_pack=True - ) - ), f"The repack pack '{self._REPACK_PACK_ID}' already exists, probably a previous repacking aborted?" session = self._get_cached_session() one_object_in_pack = session.execute( select(Obj.id).where(Obj.pack_id == pack_id).limit(1) ).all() - # No thing to seal + # No thing to archive for if not one_object_in_pack: # No objects. Clean up the pack file, if it exists. 
if os.path.exists(self._get_pack_path_from_pack_id(pack_id)): @@ -2788,6 +2790,8 @@ def archive_pack( # We now have a list of objects to be stored archived in the ZIP file # Check if we can trim the filename to save some overhead + # NOTE: If the names are trimmed, the archvie files should not be unzipped into the same + # folder if trim_filenames is True: lname = minimum_length_without_duplication( [x["filename"] for x in obj_dicts] @@ -2795,11 +2799,12 @@ def archive_pack( for obj_dict in obj_dicts: obj_dict["filename"] = obj_dict["filename"][:lname] - # Proceed with writing out the file assert self.compression_algorithm.startswith( "zlib" ), "Cannot make archive with compression algorithm other than zlib" compress_level = int(self.compression_algorithm.split("+")[1]) + + # Proceed with writing out the archive file with open( self._get_archive_path_from_pack_id(pack_id), "wb" ) as write_pack_handle: @@ -2820,17 +2825,14 @@ def archive_pack( obj_dict["hashkey"] ) as source_handle: shutil.copyfileobj(source_handle, zip_write_handle) - # Now the field of the zinfo has been set + # Update the obj_dict according to the zinfo files that have been set obj_dict["length"] = zinfo.compress_size # Offset was the current location minus the size of the compressed stream written obj_dict["offset"] = write_pack_handle.tell() - zinfo.compress_size obj_dict["compressed"] = True - # The writing of the zip file is now completed - - # Update the SQLite database so this pack is "archived" - for obj_dict in obj_dicts: - obj_dict.pop("filename") + # The writing of the zip file is now completed, now we update the database + # but before that we valid the archvie file, just to be sure # Validate the archvie file just created if run_read_test is True: test_result = self._validate_archvie_file( @@ -2839,13 +2841,13 @@ def archive_pack( if test_result is False: raise ValueError("Created archive file does not pass read tests") - # We can now update the database so the Obj table with the new offsets - - # First, record the original "pack" file's path, since it will be changed once + # Reading from the archive file works, now update the database + # I record the original "pack" file's path, since it will be changed once # the archvied pack is active original_pack_path = self._get_pack_path_from_pack_id(pack_id) # Update the pack table such that the pack is "archived" + session.execute( update(Pack) .where(Pack.pack_id == pack_id) @@ -2855,14 +2857,18 @@ def archive_pack( ) ) - # Commit change in compress, offset, length, for each object in the Obj table + # Update the Obj table with new length and compression + for obj_dict in obj_dicts: + obj_dict.pop("filename") session.bulk_update_mappings(Obj, obj_dicts) + + # Commit change in compress, offset, length, for each object in the Obj table session.commit() # We can now delete the original pack file os.unlink(original_pack_path) - def _get_archive_path_from_pack_id(self, pack_id): + def _get_archive_path_from_pack_id(self, pack_id) -> str: """ Return the path to the archived pack. @@ -2921,6 +2927,34 @@ def _set_compresslevel(self, zinfo): "Compression algorithm is in-compatible with pack archiving." ) + def lossen_archive(self, archive_file: Union[str, Path], dst: Union[str, Path]): + """ + Extract archived pack into a destination folder as loose objects + + The destination folder must not exist to begin with. + This method renames the file names (which may be truncated) to their full hash values. 
+ The destination folder can be copied into "loose" folder of a container which effectively + imports the data from the archive into that container. + """ + assert not Path(dst).exists(), "Destination folder already exists!" + with zipfile.ZipFile(archive_file, "r") as zfile: + zfile.extractall(path=dst) + + # Walk the directory and rename files according to the configuration of this container + for dirname, _, files in list(os.walk(dst)): + for file in files: + fullname = os.path.join(dirname, file) + with open(fullname, "rb") as fhandle: + hash_value, _ = compute_hash_and_size( + fhandle, hash_type=self.hash_type + ) + # Rename the file + new_filename = hash_value[self.loose_prefix_len :] + # Make parent directories if not exist already + Path(new_filename).parent.mkdir(exist_ok=True) + # Rename the files + os.rename(fullname, os.path.join(dirname, new_filename)) + def minimum_length_without_duplication(names, min_length=8): """ diff --git a/tests/test_container.py b/tests/test_container.py index e9a63e0..523b393 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3397,6 +3397,30 @@ def test_archive(temp_dir): assert len(zfile.infolist()) == 4 zfile.testzip() + # Test loosen the objects and then import them into a new container + temp_container.lossen_archive( + temp_container._get_pack_path_from_pack_id(1), + os.path.join(temp_dir, "temp_loose"), + ) + temp_container2 = Container(os.path.join(temp_dir, "sub")) + temp_container2.init_container() + + shutil.copytree( + os.path.join(temp_dir, "temp_loose"), + os.path.join(temp_dir, "sub/loose"), + dirs_exist_ok=True, + ) + assert temp_container2.count_objects()["loose"] == 4 + temp_container2.pack_all_loose() + temp_container2.clean_storage() + assert temp_container2.count_objects()["packed"] == 4 + assert temp_container2.count_objects()["loose"] == 0 + size2 = temp_container2.get_total_size() + size2["total_size_packfiles_on_disk"] = 10 * 4 + # Validate that we are all good + for value in temp_container2.validate().values(): + assert not value + # Important before exiting from the tests temp_container.close() From 637a0011de0188f3a27ad283316b73116d786d05 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 29 May 2022 19:00:12 +0100 Subject: [PATCH 05/13] Update tests and docstrings --- disk_objectstore/container.py | 52 ++++++++++++++++------------------- disk_objectstore/utils.py | 14 ++++++++++ tests/test_container.py | 14 +++++----- tests/test_utils.py | 7 +++++ 4 files changed, 52 insertions(+), 35 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 0cf5bc4..9c01aa1 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -59,6 +59,7 @@ get_stream_decompresser, is_known_hash, merge_sorted, + minimum_length_without_duplication, nullcontext, rename_callback, safe_flush_to_disk, @@ -253,15 +254,8 @@ def _get_pack_path_from_pack_id( ), f"Invalid pack ID {pack_id}" # Are we trying to read from an archived pack? - session = self._get_session() - archived = ( - session.execute( - select(Pack).filter_by(pack_id=pack_id, state=PackState.ARCHIVED.value) - ) - .scalars() - .first() - ) - if archived is not None: + archived = self.get_archived_pack_ids() + if pack_id in archived: return self._get_archive_path_from_pack_id(pack_id) return os.path.join(self._get_pack_folder(), pack_id) @@ -2748,8 +2742,15 @@ def archive_pack( This is accepted as a slow operation. The destination may not be on the same file system, e.g. 
the archive folder may be on a different (networked) file system. + + :param pack_id: ID of the pack to be archived + :param run_read_test: Test reading the archive file before committing changes. + :trim_filenames: Trim the filenames in the ZIP archive created so it only contains the first + few characters of the hash. + """ + pack_id = str(pack_id) session = self._get_cached_session() one_object_in_pack = session.execute( select(Obj.id).where(Obj.pack_id == pack_id).limit(1) @@ -2868,7 +2869,7 @@ def archive_pack( # We can now delete the original pack file os.unlink(original_pack_path) - def _get_archive_path_from_pack_id(self, pack_id) -> str: + def _get_archive_path_from_pack_id(self, pack_id: Union[str, int]) -> str: """ Return the path to the archived pack. @@ -2896,12 +2897,21 @@ def _get_archive_path_from_pack_id(self, pack_id) -> str: ) return pack.location - def _get_archive_folder(self): - """Return folder of the archive""" + def _get_archive_folder(self) -> str: + """Return folder of the default `archive` folder""" return os.path.join(self._folder, "archives") - def _validate_archvie_file(self, fpath, obj_dicts: List[Dict[str, Any]]) -> bool: - """Test reading from an archive file""" + def _validate_archvie_file( + self, fpath: Union[Path, str], obj_dicts: List[Dict[str, Any]] + ) -> bool: + """ + Test reading from an archive file using the interface in this package. + + :param fpath: Path to the archive file + :obj_dicts: A list of dictionary containing the information of each objects to be read + + :returns: True if everything is OK, False if mismatch in hash is found. + """ if len(obj_dicts) < 1: return False with open(fpath, "rb") as fhandle: @@ -2954,17 +2964,3 @@ def lossen_archive(self, archive_file: Union[str, Path], dst: Union[str, Path]): Path(new_filename).parent.mkdir(exist_ok=True) # Rename the files os.rename(fullname, os.path.join(dirname, new_filename)) - - -def minimum_length_without_duplication(names, min_length=8): - """ - Find how many characters is needed to ensure there is no conflict among a set of filenames - """ - length = min_length - length_ok = False - while not length_ok: - length += 1 - trimmed = {name[:length] for name in names} - length_ok = len(trimmed) == len(names) - - return length diff --git a/disk_objectstore/utils.py b/disk_objectstore/utils.py index 3cbb571..d4859ff 100644 --- a/disk_objectstore/utils.py +++ b/disk_objectstore/utils.py @@ -1279,3 +1279,17 @@ def merge_sorted(iterator1: Iterable[Any], iterator2: Iterable[Any]) -> Iterator for item, _ in detect_where_sorted(iterator1, iterator2): # Whereever it is (only left, only right, on both) I return the object. yield item + + +def minimum_length_without_duplication(names, min_length=8): + """ + Find how many characters is needed to ensure there is no conflict among a set of filenames + """ + length = min_length - 1 + length_ok = False + while not length_ok: + length += 1 + trimmed = {name[:length] for name in names} + length_ok = len(trimmed) == len(names) + + return length diff --git a/tests/test_container.py b/tests/test_container.py index 523b393..9e0f64b 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -1073,7 +1073,7 @@ def test_sizes( def test_get_objects_stream_closes(temp_container, generate_random_data): """Test that get_objects_stream_and_meta closes intermediate streams. - I also check that at most one additional file is open at any given time. + I also check that at most two additional file is open at any given time. .. 
note: apparently, at least on my Mac, even if I forget to close a file, this is automatically closed when it goes out of scope - so I add also the test that, inside the loop, at most one more file is open. @@ -1096,7 +1096,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): obj_md5s.keys(), skip_if_missing=True ): # I don't use the triplets - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1106,7 +1106,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1117,7 +1117,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _, stream, _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 stream.read() # Check that at the end nothing is left open @@ -1134,7 +1134,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): with temp_container.get_objects_stream_and_meta(obj_md5s.keys()): # I don't use the triplets - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1142,7 +1142,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): with temp_container.get_objects_stream_and_meta(obj_md5s.keys()) as triplets: # I loop over the triplets, but I don't do anything for _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files @@ -1153,7 +1153,7 @@ def test_get_objects_stream_closes(temp_container, generate_random_data): ) as triplets: # I loop over the triplets, but I don't do anything for _, stream, _ in triplets: - assert len(current_process.open_files()) <= start_open_files + 1 + assert len(current_process.open_files()) <= start_open_files + 2 stream.read() # Check that at the end nothing is left open assert len(current_process.open_files()) == start_open_files diff --git a/tests/test_utils.py b/tests/test_utils.py index 1c85731..a96a189 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1680,3 +1680,10 @@ def test_unknown_compressers(): utils.get_compressobj_instance(invalid) with pytest.raises(ValueError): utils.get_stream_decompresser(invalid) + + +def test_minimum_length_compute(): + """Test finding minmum string length without duplication""" + assert utils.minimum_length_without_duplication(["a", "b"]) == 8 + assert utils.minimum_length_without_duplication(["aaaaaaaa", "bbbbbbbb"]) == 8 + assert utils.minimum_length_without_duplication(["aaaaaaaac", "aaaaaaaab"]) == 9 From 37870b4082e4981e360103a1b49a4968c92d94d7 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Sun, 29 May 2022 22:26:56 +0100 Subject: 
[PATCH 06/13] Update precommit settings --- .pre-commit-config.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index dce4dbe..971897c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -64,6 +64,7 @@ repos: rev: 21.12b0 hooks: - id: black + additional_dependencies: ['click<8.1'] - repo: https://github.com/PyCQA/pylint rev: v2.12.2 From d41ed0f5481cd7de103f956eebc4566f8c35e024 Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Mon, 30 May 2022 08:06:23 +0100 Subject: [PATCH 07/13] Fix a bug about pack_id handling Sometimes it is a string, sometimes it can be an int. sqlite does not seem to care? But matching inside python differentiate the two. --- disk_objectstore/container.py | 11 ++++++----- tests/test_container.py | 10 +++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 9c01aa1..40198c3 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -254,7 +254,7 @@ def _get_pack_path_from_pack_id( ), f"Invalid pack ID {pack_id}" # Are we trying to read from an archived pack? - archived = self.get_archived_pack_ids() + archived = self.get_archived_pack_ids(return_str=True) if pack_id in archived: return self._get_archive_path_from_pack_id(pack_id) @@ -264,14 +264,16 @@ def _get_pack_index_path(self) -> str: """Return the path to the SQLite file containing the index of packed objects.""" return os.path.join(self._folder, f"packs{self._PACK_INDEX_SUFFIX}") - def get_archived_pack_ids(self) -> List[int]: + def get_archived_pack_ids(self, return_str=False) -> Union[List[int], List[str]]: """Return ids of the archived packs""" session = self._get_cached_session() archived = session.execute( select(Pack.pack_id).filter_by(state=PackState.ARCHIVED.value) ).all() # Convert the string representation back to int - return [int(entry[0]) for entry in archived] + if return_str is False: + return [int(entry[0]) for entry in archived] + return [entry[0] for entry in archived] def _get_pack_id_to_write_to(self) -> int: """Return the pack ID to write the next object. @@ -1252,8 +1254,7 @@ def _list_packs(self, include_archived=True) -> Iterator[str]: # Include also archived packs if include_archived: - for pack_id in self.get_archived_pack_ids(): - yield str(pack_id) + yield from self.get_archived_pack_ids(return_str=True) def list_all_objects(self) -> Iterator[str]: """Iterate of all object hashkeys. 
diff --git a/tests/test_container.py b/tests/test_container.py index 9e0f64b..7561ea0 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -3518,10 +3518,11 @@ def test_get_pack_id_with_archive(temp_dir): # Now, the next object should writ to pack 2, since it is not ful assert temp_container._get_pack_id_to_write_to() == 2 # Mark the third pack as ARCHIVE + to_archive = packs[-1].pack_id session.execute( update(Pack) - .where(Pack.id == packs[-1].id) - .values(state=PackState.ARCHIVED.value, location="/tmp/2.zip") + .where(Pack.pack_id == str(to_archive)) + .values(state=PackState.ARCHIVED.value, location=f"/tmp/{to_archive}.zip") ) session.commit() @@ -3529,4 +3530,7 @@ def test_get_pack_id_with_archive(temp_dir): assert temp_container._get_pack_id_to_write_to() == 3 # Getting the "pack_path" for pack 2 should now point to the custom location - assert temp_container._get_pack_path_from_pack_id(2) == "/tmp/2.zip" + assert ( + temp_container._get_pack_path_from_pack_id(to_archive) + == f"/tmp/{to_archive}.zip" + ) From 2ddf5613d76f6fdbc49bd636c766051c0043f76b Mon Sep 17 00:00:00 2001 From: Bonan Zhu Date: Mon, 30 May 2022 09:45:50 +0100 Subject: [PATCH 08/13] Added methods to get and set archive locations This allows archive files to exist outside of the container folder. --- disk_objectstore/cli.py | 83 +++++++++++++++++++++++++++++++++++ disk_objectstore/container.py | 53 ++++++++++++++++++++++ tests/test_container.py | 54 +++++++++++++++++++---- 3 files changed, 182 insertions(+), 8 deletions(-) diff --git a/disk_objectstore/cli.py b/disk_objectstore/cli.py index 86c484f..0920071 100644 --- a/disk_objectstore/cli.py +++ b/disk_objectstore/cli.py @@ -122,3 +122,86 @@ def optimize( container.clean_storage(vacuum=vacuum) size = sum(f.stat().st_size for f in dostore.path.glob("**/*") if f.is_file()) click.echo(f"Final container size: {round(size/1000, 2)} Mb") + + +@main.group("archive") +def archive(): + """ + Interface for managing archived packs. + + An archived pack is essentially an ZIP file and will no longer be considered for writing new + data. Once an pack is archived, it can be moved to other locations, for example, networked storages. 
+ """ + + +@archive.command("list") +@pass_dostore +def archive_list(dosstore: ContainerContext): + """List all archives in the container""" + + with dosstore.container as container: + location = container.get_archive_locations() + click.echo(json.dumps(location, indent=2)) + + +@archive.command("update-location") +@click.argument("pack_id") +@click.argument("new_location") +@click.option("--force", help="Skip checks if passed", is_flag=True, default=False) +@pass_dostore +def archive_update_location( + dosstore: ContainerContext, pack_id: str, new_location: str, force: bool +): + """Update the location of archive files""" + + with dosstore.container as container: + container._update_archive_location( # pylint: disable=protected-access + pack_id, new_location, force + ) + click.echo(f"Updated location of pack {pack_id} to {new_location}") + + +@archive.command("create") +@click.argument("--pack_id") +@click.option( + "--validate/--no-validate", + show_default=True, + help="Validate the created archive or not.", +) +@click.option( + "--trim-names/--no-trim-names", + show_default=True, + help="Trim the filenames in the archive, reduce storage overheads.", +) +@pass_dostore +def archive_create( + dosstore: ContainerContext, pack_id: str, validate: bool, trim_names: bool +): + """Turn the pack_id into an archive pack""" + with dosstore.container as container: + archives = container.get_archived_pack_ids(return_str=True) + if pack_id in archives: + raise click.Abort(f"Pack {pack_id} is already archived!") + container.archive_pack( + pack_id, run_read_test=validate, trim_filenames=trim_names + ) + location = container.get_archive_locations()[pack_id] + click.echo(f"Successfully archvied pack {pack_id} at {location}") + + +@archive.command("extract") +@click.argument("archive_path") +@click.argument("destination", type=click.Path(exists=False)) +def archive_extract( + dosstore: ContainerContext, + archive_path: str, + destination: str, +): + """ + Extract an existing archive and renames the files in the destination folder + the same that used for the loose objects, so that they can be imported into a another container. 
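+
+    The destination folder must not exist yet; it is created during the extraction.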
+ """ + + with dosstore.container as container: + container.lossen_archive(archive_path, destination) + click.echo(f"Objects from {archive_path} have been extracted to {destination}") diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py index 40198c3..bc8bd0e 100644 --- a/disk_objectstore/container.py +++ b/disk_objectstore/container.py @@ -2965,3 +2965,56 @@ def lossen_archive(self, archive_file: Union[str, Path], dst: Union[str, Path]): Path(new_filename).parent.mkdir(exist_ok=True) # Rename the files os.rename(fullname, os.path.join(dirname, new_filename)) + + def get_archive_locations(self) -> Dict[str, str]: + """ + Return the location of each archived packs + """ + paths = {} + for pack_id in self.get_archived_pack_ids(return_str=True): + paths[str(pack_id)] = self._get_pack_path_from_pack_id(pack_id) + return paths + + def _update_archive_location(self, pack_id, location, force=False): + """ + Set the location of archive packs + """ + location = Path(location) + + if str(pack_id) not in self.get_archived_pack_ids(return_str=True): + raise ValueError(f"Pack {pack_id} is not an archived pack.") + + if not force and not Path(location).exists(): + raise ValueError(f"Suppied pack location {location} does not exist!") + + # Update the location and validate the new archive + + session = self._get_cached_session() + result = session.execute( + update(Pack) + .where(Pack.pack_id == str(pack_id)) + .values(location=str(location)) + ) + # Check we have actually updated the content + assert result.rowcount == 1 # type: ignore + + if not force: + # See if we can read the first few objects from the new archive + objs = ( + session.execute(select(Obj).filter_by(pack_id=int(pack_id)).limit(10)) + .scalars() + .all() + ) + try: + for obj in objs: + with self.get_object_stream(obj.hashkey) as stream: + hashkey, _ = compute_hash_and_size(stream, self.hash_type) + assert ( + hashkey == obj.hashkey + ), "Error reading object from the new archive location!" 
+ except Exception as error: + session.rollback() + raise error + + # All good - commit the changes + session.commit() diff --git a/tests/test_container.py b/tests/test_container.py index 7561ea0..c15ab90 100644 --- a/tests/test_container.py +++ b/tests/test_container.py @@ -1,5 +1,5 @@ """Test of the object-store container module.""" -# pylint: disable=too-many-lines,protected-access +# pylint: disable=too-many-lines,protected-access, redefined-outer-name import functools import hashlib import io @@ -10,6 +10,7 @@ import stat import tempfile import zipfile +from pathlib import Path import psutil import pytest @@ -3330,8 +3331,9 @@ def test_unknown_compressers(temp_container, compression_algorithm): ) -def test_archive(temp_dir): - """Test the repacking functionality.""" +@pytest.fixture +def container_with_archive(temp_dir): + """Return an contain with pack 1 archived""" temp_container = Container(temp_dir) temp_container.init_container(clear=True, pack_size_target=39) @@ -3375,10 +3377,16 @@ def test_archive(temp_dir): size = temp_container.get_total_size() assert size["total_size_packed"] == 10 * len(data) assert size["total_size_packfiles_on_disk"] == 10 * len(data) - - # I now archive the pack temp_container.archive_pack(1) + yield temp_dir, temp_container, data, hashkeys + temp_container.close() + + +def test_archive(container_with_archive): + """Test the repacking functionality.""" + temp_dir, temp_container, data, hashkeys = container_with_archive + assert temp_container._get_pack_path_from_pack_id(1).endswith("zip") # Can still read everything @@ -3405,10 +3413,10 @@ def test_archive(temp_dir): temp_container2 = Container(os.path.join(temp_dir, "sub")) temp_container2.init_container() + shutil.rmtree(os.path.join(temp_dir, "sub/loose")) shutil.copytree( os.path.join(temp_dir, "temp_loose"), os.path.join(temp_dir, "sub/loose"), - dirs_exist_ok=True, ) assert temp_container2.count_objects()["loose"] == 4 temp_container2.pack_all_loose() @@ -3421,8 +3429,38 @@ def test_archive(temp_dir): for value in temp_container2.validate().values(): assert not value - # Important before exiting from the tests - temp_container.close() + +def test_archive_path_settings(container_with_archive): + """Setting getting/setting container paths""" + _, temp_container, _, _ = container_with_archive + + assert "1" in temp_container.get_archive_locations() + assert temp_container.get_archive_locations()["1"].endswith( + f"{temp_container.container_id}-1.zip" + ) + + location = Path(temp_container.get_archive_locations()["1"]) + new_location = location.with_name("11.zip") + # Update the location + with pytest.raises(ValueError): + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(location) + + # New location exists, but contains invalid data + Path(new_location).write_text("aa") + with pytest.raises(ValueError): + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(location) + + # Not valid, but we forced it + temp_container._update_archive_location(1, new_location, force=True) + assert temp_container.get_archive_locations()["1"] == str(new_location) + temp_container._update_archive_location(1, location) + + # This should work + location.rename(new_location) + temp_container._update_archive_location(1, new_location) + assert temp_container.get_archive_locations()["1"] == str(new_location) def test_get_archive_path(temp_container): From c8e2c948851c67bf7ddcb180b87710bd97f6e520 Mon 
From c8e2c948851c67bf7ddcb180b87710bd97f6e520 Mon Sep 17 00:00:00 2001
From: Bonan Zhu
Date: Mon, 30 May 2022 09:53:43 +0100
Subject: [PATCH 09/13] get/set archive location and CLI

Allow archive locations to be set/updated; also added a CLI.

---
 disk_objectstore/container.py | 7 +++++--
 tests/test_container.py       | 9 +++++++++
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py
index bc8bd0e..503776e 100644
--- a/disk_objectstore/container.py
+++ b/disk_objectstore/container.py
@@ -2855,7 +2855,6 @@ def archive_pack(
             .where(Pack.pack_id == pack_id)
             .values(
                 state=PackState.ARCHIVED.value,
-                location=self._get_archive_path_from_pack_id(pack_id),
             )
         )
 
@@ -2896,6 +2895,10 @@ def _get_archive_path_from_pack_id(self, pack_id: Union[str, int]) -> str:
             return os.path.join(
                 self._get_archive_folder(), self.container_id + "-" + pack_id + ".zip"
             )
+        # A location is set; if it is a relative path, it is interpreted relative to the container folder
+        if not Path(pack.location).is_absolute():
+            return os.path.join(self._folder, pack.location)
+        return pack.location
 
     def _get_archive_folder(self) -> str:
@@ -2972,7 +2975,7 @@ def get_archive_locations(self) -> Dict[str, str]:
         """
         paths = {}
         for pack_id in self.get_archived_pack_ids(return_str=True):
-            paths[str(pack_id)] = self._get_pack_path_from_pack_id(pack_id)
+            paths[str(pack_id)] = self._get_archive_path_from_pack_id(pack_id)
         return paths
 
     def _update_archive_location(self, pack_id, location, force=False):
diff --git a/tests/test_container.py b/tests/test_container.py
index c15ab90..ea3a8fd 100644
--- a/tests/test_container.py
+++ b/tests/test_container.py
@@ -3513,6 +3513,15 @@ def test_get_archive_path(temp_container):
     path = temp_container._get_archive_path_from_pack_id(996)
     assert path == "/tmp/archive.zip"
 
+    # Relative path is relative to the container base folder
+    pack = Pack(
+        pack_id="995", state=PackState.ARCHIVED.value, location="archive2/archive.zip"
+    )
+    session.add(pack)
+    session.commit()
+    path = temp_container._get_archive_path_from_pack_id(995)
+    assert path == os.path.join(temp_container._folder, "archive2/archive.zip")
+
 
 def test_get_pack_id_with_archive(temp_dir):
     """Test get_pack_id_to_write_to with archived packs"""
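
The lookup rule that PATCH 09 adds to _get_archive_path_from_pack_id can be summarised in isolation. The helper below is only an illustrative re-statement of that rule; the function name, the "archives" folder name and the paths are hypothetical and not the library's actual code.

    import os
    from typing import Optional

    def resolve_archive_path(container_folder: str, container_id: str,
                             pack_id: str, location: Optional[str]) -> str:
        """Resolve where an archived pack's ZIP lives, following the rule in PATCH 09."""
        if not location:
            # No explicit location recorded: use the default name in the archive folder.
            return os.path.join(container_folder, "archives", f"{container_id}-{pack_id}.zip")
        if not os.path.isabs(location):
            # Relative locations are resolved against the container's root folder.
            return os.path.join(container_folder, location)
        return location  # absolute locations are used as-is

    print(resolve_archive_path("/srv/dos", "abc123", "1", "archive2/archive.zip"))
    # -> /srv/dos/archive2/archive.zip
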
From bc62f1ee671282609803edf42330780ac86e31d3 Mon Sep 17 00:00:00 2001
From: Bonan Zhu
Date: Mon, 30 May 2022 10:18:16 +0100
Subject: [PATCH 10/13] Fix for tests on Windows

---
 tests/test_container.py | 23 +++++++++++++++--------
 1 file changed, 15 insertions(+), 8 deletions(-)

diff --git a/tests/test_container.py b/tests/test_container.py
index ea3a8fd..b668288 100644
--- a/tests/test_container.py
+++ b/tests/test_container.py
@@ -3458,6 +3458,7 @@ def test_archive_path_settings(container_with_archive):
     temp_container._update_archive_location(1, location)
 
     # This should work
+    os.unlink(new_location)
     location.rename(new_location)
     temp_container._update_archive_location(1, new_location)
     assert temp_container.get_archive_locations()["1"] == str(new_location)
@@ -3505,13 +3506,13 @@ def test_get_archive_path(temp_container):
         + "-997.zip"
     )
 
-    pack = Pack(
-        pack_id="996", state=PackState.ARCHIVED.value, location="/tmp/archive.zip"
-    )
+    temp_dir = temp_container._folder
+    abs_archive = os.path.join(temp_dir, "temp_archive/2.zip")
+    pack = Pack(pack_id="996", state=PackState.ARCHIVED.value, location=abs_archive)
     session.add(pack)
     session.commit()
     path = temp_container._get_archive_path_from_pack_id(996)
-    assert path == "/tmp/archive.zip"
+    assert path == abs_archive
 
     # Relative path is relative to the container base folder
     pack = Pack(
@@ -3522,6 +3523,8 @@ def test_get_archive_path(temp_container):
     path = temp_container._get_archive_path_from_pack_id(995)
     assert path == os.path.join(temp_container._folder, "archive2/archive.zip")
 
+    temp_container.close()
+
 
 def test_get_pack_id_with_archive(temp_dir):
     """Test get_pack_id_to_write_to with archived packs"""
@@ -3569,7 +3572,10 @@ def test_get_pack_id_with_archive(temp_dir):
     session.execute(
         update(Pack)
         .where(Pack.pack_id == str(to_archive))
-        .values(state=PackState.ARCHIVED.value, location=f"/tmp/{to_archive}.zip")
+        .values(
+            state=PackState.ARCHIVED.value,
+            location=os.path.join(temp_dir, f"{to_archive}.zip"),
+        )
     )
     session.commit()
 
@@ -3577,7 +3583,8 @@ def test_get_pack_id_with_archive(temp_dir):
     assert temp_container._get_pack_id_to_write_to() == 3
 
     # Getting the "pack_path" for pack 2 should now point to the custom location
-    assert (
-        temp_container._get_pack_path_from_pack_id(to_archive)
-        == f"/tmp/{to_archive}.zip"
+    assert temp_container._get_pack_path_from_pack_id(to_archive) == os.path.join(
+        temp_dir, f"{to_archive}.zip"
     )
+
+    temp_container.close()

From 083d63f83120126ea3401a8d56bcf0f0c2b49cb3 Mon Sep 17 00:00:00 2001
From: Bonan Zhu
Date: Mon, 30 May 2022 10:51:55 +0100
Subject: [PATCH 11/13] CLI update

---
 disk_objectstore/cli.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/disk_objectstore/cli.py b/disk_objectstore/cli.py
index 0920071..38c1f23 100644
--- a/disk_objectstore/cli.py
+++ b/disk_objectstore/cli.py
@@ -162,7 +162,7 @@ def archive_update_location(
 
 
 @archive.command("create")
-@click.argument("--pack_id")
+@click.argument("pack_id")
 @click.option(
     "--validate/--no-validate",
     show_default=True,
@@ -192,6 +192,7 @@ def archive_create(
 @archive.command("extract")
 @click.argument("archive_path")
 @click.argument("destination", type=click.Path(exists=False))
+@pass_dostore
 def archive_extract(
     dosstore: ContainerContext,
     archive_path: str,

From 47d4400eae5c330fae56ae6320e7d97fc2b89371 Mon Sep 17 00:00:00 2001
From: Bonan Zhu
Date: Thu, 2 Jun 2022 11:25:29 +0100
Subject: [PATCH 12/13] Docstring update

---
 disk_objectstore/cli.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/disk_objectstore/cli.py b/disk_objectstore/cli.py
index 38c1f23..a539e39 100644
--- a/disk_objectstore/cli.py
+++ b/disk_objectstore/cli.py
@@ -152,7 +152,11 @@ def archive_update_location(
     dosstore: ContainerContext, pack_id: str, new_location: str, force: bool
 ):
-    """Update the location of archive files"""
+    """
+    Update the location of archive files
+
+    NOTE: relative path is interpreted as relative to the root folder of the container.
+    """
     with dosstore.container as container:
         container._update_archive_location(  # pylint: disable=protected-access
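
Before the final patch, a usage sketch of the relocation flow that the tests above exercise and that the `archive update-location` docstring documents. This is illustrative only; it assumes a `container` with pack 1 already archived, the _update_archive_location() method from this series, and a hypothetical external mount point.

    import shutil
    from pathlib import Path

    current = Path(container.get_archive_locations()["1"])
    target = Path("/mnt/cold-storage") / current.name  # hypothetical external location

    # Remove any stale file at the target first: on Windows a plain rename() cannot
    # overwrite an existing file, which is why PATCH 10 adds os.unlink() in the test.
    if target.exists():
        target.unlink()
    shutil.move(str(current), str(target))

    # Record the new location; this validates the file and raises ValueError if it is
    # missing or not a readable archive, unless force=True is passed.
    container._update_archive_location(1, target)

    # A relative path would instead be resolved against the container's root folder.
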
From c7af5afe5dd78d3677f87c53b5edc43d96a93bf1 Mon Sep 17 00:00:00 2001
From: Bonan Zhu
Date: Thu, 2 Jun 2022 12:10:02 +0100
Subject: [PATCH 13/13] Implement CompressMode other than KEEP for repacking

Allow CompressMode.YES and CompressMode.NO to be used when repacking.
Previously, there was no way to change compression once an object is
stored in a packed file.

---
 disk_objectstore/container.py | 45 +++++++++++++++-----
 tests/test_container.py       | 79 +++++++++++++++++++++++++++++++++++
 2 files changed, 113 insertions(+), 11 deletions(-)

diff --git a/disk_objectstore/container.py b/disk_objectstore/container.py
index 503776e..e8dfe8d 100644
--- a/disk_objectstore/container.py
+++ b/disk_objectstore/container.py
@@ -2578,7 +2578,7 @@ def repack(self, compress_mode: CompressMode = CompressMode.KEEP) -> None:
             self.repack_pack(pack_id, compress_mode=compress_mode)
         self._vacuum()
 
-    def repack_pack(
+    def repack_pack(  # pylint: disable=too-many-branches, too-many-statements
         self, pack_id: str, compress_mode: CompressMode = CompressMode.KEEP
     ) -> None:
         """Perform a repack of a given pack object.
@@ -2586,13 +2586,9 @@ def repack_pack(
         This is a maintenance operation.
 
         :param compress_mode: must be a valid CompressMode enum type.
-            Currently, the only implemented mode is KEEP, meaning that it
-            preserves the same compression (this means that repacking is *much* faster
-            as it can simply transfer the bytes without decompressing everything first,
-            and recompressing it back again).
         """
-        if compress_mode != CompressMode.KEEP:
-            raise NotImplementedError("Only keep method currently implemented")
+        # if compress_mode != CompressMode.KEEP:
+        #     raise NotImplementedError("Only keep method currently implemented")
 
         assert (
             pack_id != self._REPACK_PACK_ID
@@ -2649,26 +2645,53 @@ def repack_pack(
                     # Since I am assuming above that the method is `KEEP`, I will just transfer
                     # the bytes. Otherwise I have to properly take into account compression in the
                     # source and in the destination.
+                    read_handle: Union[PackedObjectReader, ZlibStreamDecompresser]
                    read_handle = PackedObjectReader(read_pack, offset, length)
 
                     obj_dict = {}
                     obj_dict["id"] = rowid
                     obj_dict["hashkey"] = hashkey
                     obj_dict["pack_id"] = self._REPACK_PACK_ID
-                    obj_dict["compressed"] = compressed
                     obj_dict["size"] = size
                     obj_dict["offset"] = write_pack_handle.tell()
+                    if compress_mode == CompressMode.KEEP:
+                        obj_dict["compressed"] = compressed
+                    elif compress_mode == CompressMode.YES:
+                        obj_dict["compressed"] = True
+                    elif compress_mode == CompressMode.NO:
+                        obj_dict["compressed"] = False
 
                     # Transfer data in chunks.
                     # No need to rehash - it's the same container so the same hash.
-                    # Not checking the compression on source or destination - we are assuming
-                    # for now that the mode is KEEP.
+                    # Compression cases
+                    # Original    Mode    Action
+                    # -           KEEP    Copy in chunks
+                    # yes         KEEP    Copy in chunks
+                    # no          KEEP    Copy in chunks
+                    # yes         NO      Copy in chunks, decompress the stream
+                    # no          NO      Copy in chunks
+                    # yes         YES     Copy in chunks
+                    # no          YES     Copy in chunks, compress when writing
+                    if compress_mode == CompressMode.YES and not compressed:
+                        compressobj = self._get_compressobj_instance()
+                    else:
+                        compressobj = None
+
+                    # Read compressed, but write uncompressed
+                    if compressed is True and compress_mode == CompressMode.NO:
+                        read_handle = self._get_stream_decompresser()(read_handle)
+
                     while True:
                         chunk = read_handle.read(self._CHUNKSIZE)
                         if chunk == b"":
                             # Returns an empty bytes object on EOF.
                             break
-                        write_pack_handle.write(chunk)
+                        if compressobj is not None:
+                            write_pack_handle.write(compressobj.compress(chunk))
+                        else:
+                            write_pack_handle.write(chunk)
+                    if compressobj is not None:
+                        write_pack_handle.write(compressobj.flush())
 
                     obj_dict["length"] = write_pack_handle.tell() - obj_dict["offset"]
 
                     # Appending for later bulk commit
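
The heart of the new compress-mode handling is a chunked copy that compresses or decompresses on the fly, as laid out in the decision table above. The standalone sketch below illustrates the same pattern with plain zlib; it is not the library code, which goes through its own compressobj and stream-decompresser helpers shown in the hunk above.

    import io
    import zlib

    CHUNKSIZE = 65536

    def transcode_stream(read_handle, write_handle, source_compressed, target_compressed):
        """Copy read_handle to write_handle in chunks, changing compression if needed."""
        decompressor = zlib.decompressobj() if source_compressed and not target_compressed else None
        compressor = zlib.compressobj() if target_compressed and not source_compressed else None

        while True:
            chunk = read_handle.read(CHUNKSIZE)
            if chunk == b"":  # EOF
                break
            if decompressor is not None:
                chunk = decompressor.decompress(chunk)
            if compressor is not None:
                chunk = compressor.compress(chunk)
            write_handle.write(chunk)

        if decompressor is not None:
            write_handle.write(decompressor.flush())
        if compressor is not None:
            write_handle.write(compressor.flush())

    # Example: store an uncompressed payload as compressed (the "no -> YES" row of the
    # table), then verify the round trip.
    raw = b"0123456789" * 1000
    out = io.BytesIO()
    transcode_stream(io.BytesIO(raw), out, source_compressed=False, target_compressed=True)
    assert zlib.decompress(out.getvalue()) == raw
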
diff --git a/tests/test_container.py b/tests/test_container.py
index b668288..587e17e 100644
--- a/tests/test_container.py
+++ b/tests/test_container.py
@@ -3270,6 +3270,85 @@ def test_repack(temp_dir):
     temp_container.close()
 
 
+def test_repack_compress_modes(temp_dir):
+    """
+    Test the repacking functionality and handling of CompressMode.
+    """
+    temp_container = Container(temp_dir)
+    temp_container.init_container(clear=True, pack_size_target=39)
+
+    # data of 10 bytes each. Will fill three packs.
+    data = [
+        b"-123456789",
+        b"a123456789",
+        b"b123456789",
+        b"c123456789",
+        b"d123456789",
+        b"e123456789",
+        b"f123456789",
+        b"g123456789",
+        b"h123456789",
+    ]
+    compress_flags = [False, True, True, False, False, False, True, True, False]
+
+    hashkeys = []
+    # Add them one by one, so I am sure in which pack they go
+    for datum, compress in zip(data, compress_flags):
+        hashkeys.append(
+            temp_container.add_objects_to_pack([datum], compress=compress)[0]
+        )
+
+    assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 2
+    assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2
+
+    # I check which packs exist
+    assert sorted(temp_container._list_packs()) == [
+        "0",
+        "1",
+        "2",
+    ]
+
+    # I now repack
+    temp_container.repack_pack(0, compress_mode=CompressMode.NO)
+    assert temp_container.get_object_meta(hashkeys[0])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[0])["pack_compressed"] is False
+    assert temp_container.get_object_meta(hashkeys[1])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[1])["pack_compressed"] is False
+    assert temp_container.get_object_meta(hashkeys[2])["pack_id"] == 0
+    assert temp_container.get_object_meta(hashkeys[2])["pack_compressed"] is False
+
+    temp_container.repack_pack(1, compress_mode=CompressMode.YES)
+    assert temp_container.get_object_meta(hashkeys[3])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[3])["pack_compressed"] is True
+    assert temp_container.get_object_meta(hashkeys[4])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[4])["pack_compressed"] is True
+    assert temp_container.get_object_meta(hashkeys[5])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[5])["pack_compressed"] is True
+    assert temp_container.get_object_meta(hashkeys[6])["pack_id"] == 1
+    assert temp_container.get_object_meta(hashkeys[6])["pack_compressed"] is True
+
+    temp_container.repack_pack(2, compress_mode=CompressMode.KEEP)
+    assert temp_container.get_object_meta(hashkeys[7])["pack_id"] == 2
+    assert temp_container.get_object_meta(hashkeys[7])["pack_compressed"] is True
+    assert temp_container.get_object_meta(hashkeys[8])["pack_id"] == 2
+    assert temp_container.get_object_meta(hashkeys[8])["pack_compressed"] is False
+
+    # Check that the content is still correct
+    # Should not raise
+    errors = temp_container.validate()
+    assert not any(errors.values())
+
+    # Important before exiting from the tests
+    temp_container.close()
+
+
 def test_not_implemented_repacks(temp_container):
     """Check the error for not implemented repack methods."""
     # We need to have at least one pack