From 4ea60c7561b9cceac25d5830b4a5912b2f504703 Mon Sep 17 00:00:00 2001 From: Sebastiaan Huber Date: Sun, 11 Oct 2020 22:33:32 +0200 Subject: [PATCH] Improvements for the repository data migration --- .../db/migrations/0047_migrate_repository.py | 42 +++++++- aiida/backends/general/migrations/utils.py | 101 +++++++++++++----- .../1feaea71bd5a_migrate_repository.py | 30 +++++- 3 files changed, 143 insertions(+), 30 deletions(-) diff --git a/aiida/backends/djsite/db/migrations/0047_migrate_repository.py b/aiida/backends/djsite/db/migrations/0047_migrate_repository.py index fcdf9dfadf..357951e389 100644 --- a/aiida/backends/djsite/db/migrations/0047_migrate_repository.py +++ b/aiida/backends/djsite/db/migrations/0047_migrate_repository.py @@ -23,22 +23,60 @@ def migrate_repository(apps, _): """Migrate the repository.""" + import json + import tempfile + from aiida.manage.configuration import get_profile + DbNode = apps.get_model('db', 'DbNode') - mapping_node_repository_metadata = utils.migrate_legacy_repository() + profile = get_profile() + node_count = DbNode.objects.count() + missing_node_uuids = [] + + mapping_node_repository_metadata, missing_sub_repo_folder = utils.migrate_legacy_repository(node_count) if mapping_node_repository_metadata is None: return for node_uuid, repository_metadata in mapping_node_repository_metadata.items(): try: + # This can happen if the node was deleted but the repo folder wasn't, or the repo folder just never + # corresponded to an actual node. In any case, we don't want to fail but just log the warning. node = DbNode.objects.get(uuid=node_uuid) except ObjectDoesNotExist: - echo.echo_warning(f'repo contained folder for Node<{node_uuid}>, but the node does not exist, skipping.') + missing_node_uuids.append((node_uuid, repository_metadata)) else: node.repository_metadata = repository_metadata node.save() + if not profile.is_test_profile: + + if missing_node_uuids: + prefix = 'migration-repository-missing-nodes-' + with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+') as handle: + json.dump(missing_node_uuids, handle) + echo.echo_warning( + 'Detected node repository folders for nodes that do not exist in the database. The UUIDs of those ' + f'nodes have been written to a log file: {handle.name}' + ) + + if missing_sub_repo_folder: + prefix = 'migration-repository-missing-subfolder-' + with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+') as handle: + json.dump(missing_sub_repo_folder, handle) + echo.echo_warning( + 'Detected node repository folders that were missing the required subfolder `path` or `raw_input`. ' + f'The paths of those nodes repository folders have been written to a log file: {handle.name}' + ) + + # If there were no nodes, most likely a new profile, there is not need to print the warning + if node_count: + import pathlib + echo.echo_warning( + 'Migrated the file repository to the new disk object store. The old repository has not been deleted out' + f' of safety and can be found at {pathlib.Path(profile.repository_path, "repository")}.' + ) + class Migration(migrations.Migration): """Migrate the file repository to the new disk object store based implementation.""" diff --git a/aiida/backends/general/migrations/utils.py b/aiida/backends/general/migrations/utils.py index 9cf414f3cf..2da9ea63a5 100644 --- a/aiida/backends/general/migrations/utils.py +++ b/aiida/backends/general/migrations/utils.py @@ -21,7 +21,6 @@ from disk_objectstore import Container from disk_objectstore.utils import LazyOpener -from aiida.cmdline.utils import echo from aiida.common import json from aiida.repository.backend import AbstractRepositoryBackend from aiida.repository.common import File, FileType @@ -33,7 +32,11 @@ class LazyFile(File): - """Subclass of `File` where `key` also allows `LazyOpener` in addition to a string.""" + """Subclass of `File` where `key` also allows `LazyOpener` in addition to a string. + + This subclass is necessary because the migration will be storing instances of `LazyOpener` as the `key` which should + normally only be a string. This subclass updates the `key` type check to allow this. + """ def __init__( self, @@ -74,7 +77,13 @@ class MigrationRepository(Repository): class NoopRepositoryBackend(AbstractRepositoryBackend): - """Implementation of the ``AbstractRepositoryBackend`` where all write operations are no-ops.""" + """Implementation of the ``AbstractRepositoryBackend`` where all write operations are no-ops. + + This repository backend is used to use the ``Repository`` interface to build repository metadata but instead of + actually writing the content of the current repository to disk elsewhere, it will simply open a lazy file opener. + In a subsequent step, all these streams are passed to the new Disk Object Store that will write their content + directly to pack files for optimal efficiency. + """ def put_object_from_filelike(self, handle: io.BufferedIOBase) -> str: """Store the byte contents of a file in the repository. @@ -94,7 +103,7 @@ def has_object(self, key: str) -> bool: raise NotImplementedError() -def migrate_legacy_repository(): +def migrate_legacy_repository(node_count): """Migrate the legacy file repository to the new disk object store and return mapping of repository metadata. The format of the return value will be a dictionary where the keys are the UUIDs of the nodes whose repository @@ -102,6 +111,12 @@ def migrate_legacy_repository(): the keys for the generated files with which the files in the disk object store can be retrieved. The format of the repository metadata follows exactly that of what is generated normally by the ORM. + This implementation consciously uses the ``Repository`` interface in order to not have to rewrite the logic that + builds the nested repository metadata based on the contents of a folder on disk. The advantage is that in this way + it is guarantee that the exact same repository metadata is generated as it would have during normal operation. + However, if the ``Repository`` interface or its implementation ever changes, it is possible that this solution will + have to be adapted and the significant parts of the implementation will have to be copy pasted here. + :return: mapping of node UUIDs onto the new repository metadata. """ # pylint: disable=too-many-locals @@ -109,28 +124,45 @@ def migrate_legacy_repository(): profile = get_profile() backend = NoopRepositoryBackend() - container = profile.get_repository_container() repository = MigrationRepository(backend=backend) # Initialize the new container: don't go through the profile, because that will not check if it already exists filepath = pathlib.Path(profile.repository_path) / 'container' + basepath = pathlib.Path(profile.repository_path) / 'repository' / 'node' container = Container(filepath) - if not container.is_initialised: - raise RuntimeError(f'the container {filepath} already exists.') + if not basepath.is_dir(): + # If the database is empty, this is a new profile and so it is normal the repo folder doesn't exist. We simply + # return as there is nothing to migrate + if profile.is_test_profile or node_count == 0: + return None, None + + raise RuntimeError( + f'the file repository `{basepath}` does not exist but the database is not empty, it contains {node_count} ' + 'nodes. Aborting the migration.' + ) - basepath = pathlib.Path(profile.repository_path) / 'repository' / 'node' + if container.is_initialised: + raise RuntimeError( + f'the container {filepath} already exists. If you ran this migration before and it failed simply delete ' + 'this directory and restart the migration.' + ) - if not basepath.is_dir(): - echo.echo_warning(f'could not find the repository basepath {basepath}: nothing to migrate') - return + container.init_container(clear=True) - node_repository_dirpaths = get_node_repository_dirpaths(basepath) + node_repository_dirpaths, missing_sub_repo_folder = get_node_repository_dirpaths(basepath) filepaths = [] streams = [] mapping_metadata = {} + # Loop over all the folders for each node that was found in the existing file repository and generate the repository + # metadata that will have to be stored on the node. Calling `put_object_from_tree` will generate the virtual + # hierarchy in memory, writing the files not actually to disk but opening lazy file handles, and then the call to + # `serialize_repository` serializes the virtual hierarchy into JSON storable dictionary. This will later be stored + # on the nodes in the database, and so it is added to the `mapping_metadata` which will be returned from this + # function. After having constructed the virtual hierarchy, we walk over the contents and take just the files and + # add the value (which is the `LazyOpener`) to the `streams` list as well as its relative path to `filepaths`. for node_uuid, node_dirpath in node_repository_dirpaths.items(): repository.put_object_from_tree(node_dirpath) metadata = serialize_repository(repository) @@ -144,26 +176,33 @@ def migrate_legacy_repository(): # Reset the repository to a clean node repository, which removes the internal virtual file hierarchy repository.reset() - hashkeys = container.add_streamed_objects_to_pack(streams, compress=True, open_streams=True) + # Free up the memory of this mapping that is no longer needed and can be big + del node_repository_dirpaths + + hashkeys = container.add_streamed_objects_to_pack(streams, compress=False, open_streams=True) - # Regroup by node UUID + # Now all that remains is to go through all the generated repository metadata, stored for each node in the + # `mapping_metadata` and replace the "values" for all the files, which are currently still the `LazyOpener` + # instances, and replace them with the hashkey that was generated from its content by the DOS container. for hashkey, (node_uuid, parts) in zip(hashkeys, filepaths): repository_metadata = mapping_metadata[node_uuid] - filename = parts.pop() - file_object = repository_metadata['o'] - for part in parts: - file_object = file_object[part]['o'] - file_object[filename]['k'] = hashkey + functools.reduce(lambda objects, part: objects['o'].get(part), parts, repository_metadata)['k'] = hashkey - return mapping_metadata + del filepaths + del streams + + return mapping_metadata, missing_sub_repo_folder def get_node_repository_dirpaths(basepath): """Return a mapping of node UUIDs onto the path to their current repository folder in the old repository. - :return: dictionary of node UUID onto absolute filepath + :return: dictionary of node UUID onto absolute filepath and list of node repo missing one of the two known sub + folders, ``path`` or ``raw_input``, which is unexpected. """ mapping = {} + missing_sub_repo_folder = [] + contains_both = [] for shard_one in basepath.iterdir(): @@ -184,18 +223,28 @@ def get_node_repository_dirpaths(basepath): dirpath = basepath / shard_one / shard_two / shard_three subdirs = [path.name for path in dirpath.iterdir()] - if 'path' in subdirs: + if 'path' in subdirs and 'raw_input' in subdirs: + contains_both.append(dirpath) + elif 'path' in subdirs: path = dirpath / 'path' elif 'raw_input' in subdirs: path = dirpath / 'raw_input' else: - echo.echo_warning( - f'skipping node repository folder {dirpath} as it does not contain `path` nor `raw_input`' - ) + missing_sub_repo_folder.append(dirpath) mapping[uuid] = path - return mapping + if contains_both: + raise RuntimeError( + f'The file repository `{basepath}` contained node repository folders that contained both the `path` as well' + ' as the `raw_input` subfolders. This should not have happen, as the latter is used for calculation job ' + 'nodes, and the former for all other nodes. The migration will be aborted and the paths of the offending ' + 'node folders will be printed below. If you know which of the subpaths is incorrect, you can manually ' + 'delete it and then restart the migration. Here is the list of offending node folders:\n' + + '\n'.join(contains_both) + ) + + return mapping, missing_sub_repo_folder def serialize_repository(repository: Repository) -> dict: diff --git a/aiida/backends/sqlalchemy/migrations/versions/1feaea71bd5a_migrate_repository.py b/aiida/backends/sqlalchemy/migrations/versions/1feaea71bd5a_migrate_repository.py index 2caa6be7f1..38323f7d04 100644 --- a/aiida/backends/sqlalchemy/migrations/versions/1feaea71bd5a_migrate_repository.py +++ b/aiida/backends/sqlalchemy/migrations/versions/1feaea71bd5a_migrate_repository.py @@ -10,9 +10,10 @@ from alembic import op from sqlalchemy import Integer, cast from sqlalchemy.dialects.postgresql import UUID, JSONB -from sqlalchemy.sql import table, column +from sqlalchemy.sql import table, column, select, func from aiida.backends.general.migrations import utils +from aiida.cmdline.utils import echo # revision identifiers, used by Alembic. revision = '1feaea71bd5a' @@ -23,6 +24,10 @@ def upgrade(): """Migrations for the upgrade.""" + import json + import tempfile + from aiida.manage.configuration import get_profile + connection = op.get_bind() DbNode = table( @@ -32,7 +37,9 @@ def upgrade(): column('repository_metadata', JSONB), ) - mapping_node_repository_metadata = utils.migrate_legacy_repository() + profile = get_profile() + node_count = connection.execute(select([func.count()]).select_from(DbNode)).scalar() + mapping_node_repository_metadata, missing_sub_repo_folder = utils.migrate_legacy_repository(node_count) if mapping_node_repository_metadata is None: return @@ -41,6 +48,25 @@ def upgrade(): value = cast(repository_metadata, JSONB) connection.execute(DbNode.update().where(DbNode.c.uuid == node_uuid).values(repository_metadata=value)) + if not profile.is_test_profile: + + if missing_sub_repo_folder: + prefix = 'migration-repository-missing-subfolder-' + with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+') as handle: + json.dump(missing_sub_repo_folder, handle) + echo.echo_warning( + 'Detected node repository folders that were missing the required subfolder `path` or `raw_input`. ' + f'The paths of those nodes repository folders have been written to a log file: {handle.name}' + ) + + # If there were no nodes, most likely a new profile, there is not need to print the warning + if node_count: + import pathlib + echo.echo_warning( + 'Migrated the file repository to the new disk object store. The old repository has not been deleted out' + f' of safety and can be found at {pathlib.Path(get_profile().repository_path, "repository")}.' + ) + def downgrade(): """Migrations for the downgrade."""