Commit

Improvements for the repository data migration
sphuber committed Oct 11, 2020
1 parent 6df445e commit 4ea60c7
Showing 3 changed files with 143 additions and 30 deletions.
42 changes: 40 additions & 2 deletions aiida/backends/djsite/db/migrations/0047_migrate_repository.py
@@ -23,22 +23,60 @@

def migrate_repository(apps, _):
"""Migrate the repository."""
import json
import tempfile
from aiida.manage.configuration import get_profile

DbNode = apps.get_model('db', 'DbNode')

mapping_node_repository_metadata = utils.migrate_legacy_repository()
profile = get_profile()
node_count = DbNode.objects.count()
missing_node_uuids = []

mapping_node_repository_metadata, missing_sub_repo_folder = utils.migrate_legacy_repository(node_count)

if mapping_node_repository_metadata is None:
return

for node_uuid, repository_metadata in mapping_node_repository_metadata.items():
try:
# This can happen if the node was deleted but the repo folder wasn't, or the repo folder just never
# corresponded to an actual node. In any case, we don't want to fail but just log the warning.
node = DbNode.objects.get(uuid=node_uuid)
except ObjectDoesNotExist:
echo.echo_warning(f'repo contained folder for Node<{node_uuid}>, but the node does not exist, skipping.')
missing_node_uuids.append((node_uuid, repository_metadata))
else:
node.repository_metadata = repository_metadata
node.save()

if not profile.is_test_profile:

if missing_node_uuids:
prefix = 'migration-repository-missing-nodes-'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+', delete=False) as handle:
json.dump(missing_node_uuids, handle)
echo.echo_warning(
'Detected node repository folders for nodes that do not exist in the database. The UUIDs of those '
f'nodes have been written to a log file: {handle.name}'
)

if missing_sub_repo_folder:
prefix = 'migration-repository-missing-subfolder-'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+', delete=False) as handle:
json.dump(missing_sub_repo_folder, handle)
echo.echo_warning(
'Detected node repository folders that were missing the required subfolder `path` or `raw_input`. '
f'The paths of those node repository folders have been written to a log file: {handle.name}'
)

# If there were no nodes, most likely because this is a new profile, there is no need to print the warning.
if node_count:
import pathlib
echo.echo_warning(
'Migrated the file repository to the new disk object store. The old repository has not been deleted as a'
f' safety measure and can be found at {pathlib.Path(profile.repository_path, "repository")}.'
)


class Migration(migrations.Migration):
"""Migrate the file repository to the new disk object store based implementation."""
101 changes: 75 additions & 26 deletions aiida/backends/general/migrations/utils.py
@@ -21,7 +21,6 @@
from disk_objectstore import Container
from disk_objectstore.utils import LazyOpener

from aiida.cmdline.utils import echo
from aiida.common import json
from aiida.repository.backend import AbstractRepositoryBackend
from aiida.repository.common import File, FileType
@@ -33,7 +32,11 @@


class LazyFile(File):
"""Subclass of `File` where `key` also allows `LazyOpener` in addition to a string."""
"""Subclass of `File` where `key` also allows `LazyOpener` in addition to a string.
This subclass is necessary because the migration will be storing instances of `LazyOpener` as the `key`, which
should normally only be a string. This subclass updates the `key` type check to allow this.
"""

def __init__(
self,
@@ -74,7 +77,13 @@ class MigrationRepository(Repository):


class NoopRepositoryBackend(AbstractRepositoryBackend):
"""Implementation of the ``AbstractRepositoryBackend`` where all write operations are no-ops."""
"""Implementation of the ``AbstractRepositoryBackend`` where all write operations are no-ops.
This repository backend makes it possible to use the ``Repository`` interface to build repository metadata, but
instead of actually writing the content of the current repository to disk elsewhere, it simply creates a lazy file
opener for each object. In a subsequent step, all these streams are passed to the new disk object store, which
writes their content directly to pack files for optimal efficiency.
"""

def put_object_from_filelike(self, handle: io.BufferedIOBase) -> str:
"""Store the byte contents of a file in the repository.
@@ -94,43 +103,66 @@ def has_object(self, key: str) -> bool:
raise NotImplementedError()


def migrate_legacy_repository():
def migrate_legacy_repository(node_count):
"""Migrate the legacy file repository to the new disk object store and return mapping of repository metadata.
The return value is a dictionary whose keys are the UUIDs of the nodes whose repository folder contents have been
migrated to the disk object store. The values are the repository metadata, which contain the keys for the generated
files with which the files in the disk object store can be retrieved. The format of the repository metadata is
exactly that which is normally generated by the ORM.
This implementation consciously uses the ``Repository`` interface in order to not have to rewrite the logic that
builds the nested repository metadata based on the contents of a folder on disk. The advantage is that in this way
it is guaranteed that the exact same repository metadata is generated as it would be during normal operation.
However, if the ``Repository`` interface or its implementation ever changes, it is possible that this solution will
have to be adapted and significant parts of the implementation will have to be copy-pasted here.
:param node_count: the total number of nodes in the database, used to determine whether an empty repository is
    expected.
:return: mapping of node UUIDs onto the new repository metadata.
"""
# pylint: disable=too-many-locals
from aiida.manage.configuration import get_profile

profile = get_profile()
backend = NoopRepositoryBackend()
container = profile.get_repository_container()
repository = MigrationRepository(backend=backend)

# Initialize the new container: don't go through the profile, because that will not check if it already exists
filepath = pathlib.Path(profile.repository_path) / 'container'
basepath = pathlib.Path(profile.repository_path) / 'repository' / 'node'
container = Container(filepath)

if not container.is_initialised:
raise RuntimeError(f'the container {filepath} already exists.')
if not basepath.is_dir():
# If the database is empty, this is a new profile and so it is normal that the repo folder doesn't exist. We
# simply return, as there is nothing to migrate.
if profile.is_test_profile or node_count == 0:
return None, None

raise RuntimeError(
f'the file repository `{basepath}` does not exist but the database is not empty, it contains {node_count} '
'nodes. Aborting the migration.'
)

basepath = pathlib.Path(profile.repository_path) / 'repository' / 'node'
if container.is_initialised:
raise RuntimeError(
f'the container {filepath} already exists. If you ran this migration before and it failed, simply delete '
'this directory and restart the migration.'
)

if not basepath.is_dir():
echo.echo_warning(f'could not find the repository basepath {basepath}: nothing to migrate')
return
container.init_container(clear=True)

node_repository_dirpaths = get_node_repository_dirpaths(basepath)
node_repository_dirpaths, missing_sub_repo_folder = get_node_repository_dirpaths(basepath)

filepaths = []
streams = []
mapping_metadata = {}

# Loop over the folders of all nodes found in the existing file repository and generate the repository metadata
# that will have to be stored on each node. Calling `put_object_from_tree` generates the virtual hierarchy in
# memory, not actually writing the files to disk but merely opening lazy file handles, and the subsequent call to
# `serialize_repository` serializes the virtual hierarchy into a JSON-storable dictionary. This will later be
# stored on the nodes in the database, and so it is added to `mapping_metadata`, which will be returned from this
# function. After having constructed the virtual hierarchy, we walk over its contents, taking just the files, and
# add the value (which is the `LazyOpener`) to the `streams` list and its relative path to `filepaths`.
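# As an illustration (hypothetical node contents): a node with files `file_a.txt` and `sub/file_b.txt` would
# contribute the tuples `(node_uuid, ['file_a.txt'])` and `(node_uuid, ['sub', 'file_b.txt'])` to `filepaths`, and
# the corresponding `LazyOpener` instances to `streams` in the same order, which is what allows the later `zip`
# with the hashkeys returned by the container.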
for node_uuid, node_dirpath in node_repository_dirpaths.items():
repository.put_object_from_tree(node_dirpath)
metadata = serialize_repository(repository)
@@ -144,26 +176,33 @@ def migrate_legacy_repository():
# Reset the repository to a clean node repository, which removes the internal virtual file hierarchy
repository.reset()

hashkeys = container.add_streamed_objects_to_pack(streams, compress=True, open_streams=True)
# Free up the memory of this mapping that is no longer needed and can be big
del node_repository_dirpaths

hashkeys = container.add_streamed_objects_to_pack(streams, compress=False, open_streams=True)

# Regroup by node UUID
# Now all that remains is to go through the generated repository metadata, stored for each node in
# `mapping_metadata`, and replace the "values" for all the files, which are currently still `LazyOpener`
# instances, with the hashkey that the disk object store container generated from their content.
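# As a worked example (hypothetical values): for `parts = ['sub', 'file_b.txt']` the `functools.reduce` call below
# resolves to `repository_metadata['o']['sub']['o']['file_b.txt']`, whose `'k'` entry is then set to the hashkey
# that the container returned for that file's content.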
for hashkey, (node_uuid, parts) in zip(hashkeys, filepaths):
repository_metadata = mapping_metadata[node_uuid]
filename = parts.pop()
file_object = repository_metadata['o']
for part in parts:
file_object = file_object[part]['o']
file_object[filename]['k'] = hashkey
functools.reduce(lambda objects, part: objects['o'].get(part), parts, repository_metadata)['k'] = hashkey

return mapping_metadata
del filepaths
del streams

return mapping_metadata, missing_sub_repo_folder


def get_node_repository_dirpaths(basepath):
"""Return a mapping of node UUIDs onto the path to their current repository folder in the old repository.
:return: dictionary of node UUID onto absolute filepath
:return: dictionary mapping node UUIDs onto absolute filepaths, and a list of node repository folders that are
missing one of the two known subfolders, ``path`` or ``raw_input``, which is unexpected.
"""
mapping = {}
missing_sub_repo_folder = []
contains_both = []

for shard_one in basepath.iterdir():

@@ -184,18 +223,28 @@ def get_node_repository_dirpaths(basepath):
dirpath = basepath / shard_one / shard_two / shard_three
subdirs = [path.name for path in dirpath.iterdir()]

if 'path' in subdirs:
if 'path' in subdirs and 'raw_input' in subdirs:
contains_both.append(dirpath)
elif 'path' in subdirs:
path = dirpath / 'path'
elif 'raw_input' in subdirs:
path = dirpath / 'raw_input'
else:
echo.echo_warning(
f'skipping node repository folder {dirpath} as it does not contain `path` nor `raw_input`'
)
missing_sub_repo_folder.append(dirpath)

mapping[uuid] = path

return mapping
if contains_both:
raise RuntimeError(
f'The file repository `{basepath}` contained node repository folders that contain both the `path` as well'
' as the `raw_input` subfolders. This should not have happened, as the latter is used for calculation job '
'nodes, and the former for all other nodes. The migration will be aborted and the paths of the offending '
'node folders will be printed below. If you know which of the subpaths is incorrect, you can manually '
'delete it and then restart the migration. Here is the list of offending node folders:\n' +
'\n'.join(contains_both)
)

return mapping, missing_sub_repo_folder


def serialize_repository(repository: Repository) -> dict:
aiida/backends/sqlalchemy/migrations/versions/1feaea71bd5a_migrate_repository.py
@@ -10,9 +10,10 @@
from alembic import op
from sqlalchemy import Integer, cast
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import table, column
from sqlalchemy.sql import table, column, select, func

from aiida.backends.general.migrations import utils
from aiida.cmdline.utils import echo

# revision identifiers, used by Alembic.
revision = '1feaea71bd5a'
@@ -23,6 +24,10 @@

def upgrade():
"""Migrations for the upgrade."""
import json
import tempfile
from aiida.manage.configuration import get_profile

connection = op.get_bind()

DbNode = table(
@@ -32,7 +37,9 @@ def upgrade():
column('repository_metadata', JSONB),
)

mapping_node_repository_metadata = utils.migrate_legacy_repository()
profile = get_profile()
node_count = connection.execute(select([func.count()]).select_from(DbNode)).scalar()
mapping_node_repository_metadata, missing_sub_repo_folder = utils.migrate_legacy_repository(node_count)

if mapping_node_repository_metadata is None:
return
@@ -41,6 +48,25 @@
value = cast(repository_metadata, JSONB)
connection.execute(DbNode.update().where(DbNode.c.uuid == node_uuid).values(repository_metadata=value))

if not profile.is_test_profile:

if missing_sub_repo_folder:
prefix = 'migration-repository-missing-subfolder-'
with tempfile.NamedTemporaryFile(prefix=prefix, suffix='.json', dir='.', mode='w+', delete=False) as handle:
json.dump(missing_sub_repo_folder, handle)
echo.echo_warning(
'Detected node repository folders that were missing the required subfolder `path` or `raw_input`. '
f'The paths of those node repository folders have been written to a log file: {handle.name}'
)

# If there were no nodes, most likely because this is a new profile, there is no need to print the warning.
if node_count:
import pathlib
echo.echo_warning(
'Migrated the file repository to the new disk object store. The old repository has not been deleted as a'
f' safety measure and can be found at {pathlib.Path(get_profile().repository_path, "repository")}.'
)


def downgrade():
"""Migrations for the downgrade."""
