Add migrations for existing file repositories
The migration of an existing legacy file repository consists of:

 1. Make an inventory of all files in the current file repository
 2. Write these files to the new backend (disk object store)
 3. Store the metadata of a node's repository contents, which encodes
    the virtual file hierarchy, in the corresponding
    `repository_metadata` column in the database.

The repository metadata contains, for each file that was written to the
disk object store, a hashkey that was generated by the latter and that
can be used to retrieve the content of the file.
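
For illustration, the serialized `repository_metadata` for a node with a
single file `sub/data.txt` would look roughly as follows (the hashkey is
made up; the `o`/`k` format is the one produced by `serialize_repository`
in the diff below):

```python
repository_metadata = {
    'o': {                # 'o' (objects): the contents of a directory
        'sub': {
            'o': {
                'data.txt': {
                    'k': 'a1b2c3...'  # 'k' (key): made-up disk object store hashkey
                }
            }
        }
    }
}
```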

The migration is performed in a single database migration, meaning that
everything is executed in a single transaction: either the entire
migration succeeds or, in case of an error, it is a no-op. This is why
the migration does not delete the legacy file repository in the same
migration.

The advantage of this approach is that there is no risk of data loss
and/or corruption: if the migration fails, all original data remain
intact. The downside, however, is that the content of the file
repository is more or less duplicated. This means that the migration
will require a potentially significant amount of disk space, at worst
equal to the size of the existing file repository. This should be the
upper limit, since the disk object store will both deduplicate and
compress the content that is written.
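
As a rough sketch of why deduplication and compression cap the extra disk
usage (illustrative only; the container path and contents are made up,
using the public `disk_objectstore` API):

```python
from disk_objectstore import Container

container = Container('/tmp/example-container')  # made-up location
container.init_container(clear=True)

# Identical content produces the same hashkey, so it is only stored once.
key_a = container.add_object(b'identical file content')
key_b = container.add_object(b'identical file content')
assert key_a == key_b

# The migration additionally writes everything to packs with compression
# enabled (cf. `add_streamed_objects_to_pack(..., compress=True)` below).
assert container.get_object_content(key_a) == b'identical file content'
```
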
sphuber committed Oct 11, 2020
1 parent 3a35690 commit 6df445e
Showing 6 changed files with 465 additions and 10 deletions.
53 changes: 53 additions & 0 deletions aiida/backends/djsite/db/migrations/0047_migrate_repository.py
@@ -0,0 +1,53 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
# pylint: disable=invalid-name,too-few-public-methods
"""Migrate the file repository to the new disk object store based implementation."""
# pylint: disable=no-name-in-module,import-error
from django.core.exceptions import ObjectDoesNotExist
from django.db import migrations

from aiida.backends.djsite.db.migrations import upgrade_schema_version
from aiida.backends.general.migrations import utils
from aiida.cmdline.utils import echo

REVISION = '1.0.47'
DOWN_REVISION = '1.0.46'


def migrate_repository(apps, _):
    """Migrate the repository."""
    DbNode = apps.get_model('db', 'DbNode')

    mapping_node_repository_metadata = utils.migrate_legacy_repository()

    if mapping_node_repository_metadata is None:
        return

    for node_uuid, repository_metadata in mapping_node_repository_metadata.items():
        try:
            node = DbNode.objects.get(uuid=node_uuid)
        except ObjectDoesNotExist:
            echo.echo_warning(f'repo contained folder for Node<{node_uuid}>, but the node does not exist, skipping.')
        else:
            node.repository_metadata = repository_metadata
            node.save()


class Migration(migrations.Migration):
    """Migrate the file repository to the new disk object store based implementation."""

    dependencies = [
        ('db', '0046_add_node_repository_metadata'),
    ]

    operations = [
        migrations.RunPython(migrate_repository, reverse_code=migrations.RunPython.noop),
        upgrade_schema_version(REVISION, DOWN_REVISION),
    ]
2 changes: 1 addition & 1 deletion aiida/backends/djsite/db/migrations/__init__.py
@@ -21,7 +21,7 @@ class DeserializationException(AiidaException):
     pass
 
 
-LATEST_MIGRATION = '0046_add_node_repository_metadata'
+LATEST_MIGRATION = '0047_migrate_repository'
 
 
 def _update_schema_version(version, apps, _):
209 changes: 201 additions & 8 deletions aiida/backends/general/migrations/utils.py
@@ -10,15 +10,207 @@
# pylint: disable=invalid-name
"""Various utils that should be used during migrations and migrations tests because the AiiDA ORM cannot be used."""
import datetime
import errno
import functools
import io
import os
import pathlib
import re
import typing

import numpy
from disk_objectstore import Container
from disk_objectstore.utils import LazyOpener

from aiida.cmdline.utils import echo
from aiida.common import json
from aiida.repository.backend import AbstractRepositoryBackend
from aiida.repository.common import File, FileType
from aiida.repository.repository import Repository

ISOFORMAT_DATETIME_REGEX = re.compile(r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+(\+\d{2}:\d{2})?$')
REGEX_SHARD_SUB_LEVEL = re.compile(r'^[0-9a-f]{2}$')
REGEX_SHARD_FINAL_LEVEL = re.compile(r'^[0-9a-f-]{32}$')


class LazyFile(File):
    """Subclass of `File` where `key` also allows `LazyOpener` in addition to a string."""

    def __init__(
        self,
        name: str = '',
        file_type: FileType = FileType.DIRECTORY,
        key: typing.Union[str, None, LazyOpener] = None,
        objects: typing.Dict[str, 'File'] = None
    ):
        # pylint: disable=super-init-not-called
        if not isinstance(name, str):
            raise TypeError('name should be a string.')

        if not isinstance(file_type, FileType):
            raise TypeError('file_type should be an instance of `FileType`.')

        if key is not None and not isinstance(key, (str, LazyOpener)):
            raise TypeError('key should be `None`, a string or a `LazyOpener`.')

        if objects is not None and any(not isinstance(obj, self.__class__) for obj in objects.values()):
            raise TypeError('objects should be `None` or a dictionary of `File` instances.')

        if file_type == FileType.DIRECTORY and key is not None:
            raise ValueError('an object of type `FileType.DIRECTORY` cannot define a key.')

        if file_type == FileType.FILE and objects is not None:
            raise ValueError('an object of type `FileType.FILE` cannot define any objects.')

        self._name = name
        self._file_type = file_type
        self._key = key
        self._objects = objects or {}


class MigrationRepository(Repository):
    """Subclass of `Repository` that uses `LazyFile` instead of `File` as its file class."""

    _file_cls = LazyFile


class NoopRepositoryBackend(AbstractRepositoryBackend):
    """Implementation of the ``AbstractRepositoryBackend`` where all write operations are no-ops."""

    def put_object_from_filelike(self, handle: io.BufferedIOBase) -> str:
        """Store the byte contents of a file in the repository.

        :param handle: filelike object with the byte content to be stored.
        :return: the generated fully qualified identifier for the object within the repository.
        :raises TypeError: if the handle is not a byte stream.
        """
        # Nothing is written here: return a `LazyOpener` so the file is only opened once it
        # is actually streamed to the disk object store through `open_streams=True`.
        return LazyOpener(handle.name)

    def has_object(self, key: str) -> bool:
        """Return whether the repository has an object with the given key.

        :param key: fully qualified identifier for the object within the repository.
        :return: True if the object exists, False otherwise.
        """
        raise NotImplementedError()


def migrate_legacy_repository():
    """Migrate the legacy file repository to the new disk object store and return mapping of repository metadata.

    The format of the return value will be a dictionary where the keys are the UUIDs of the nodes whose repository
    folder contents have been migrated to the disk object store. The values are the repository metadata that contain
    the keys for the generated files with which the files in the disk object store can be retrieved. The format of the
    repository metadata follows exactly that of what is generated normally by the ORM.

    :return: mapping of node UUIDs onto the new repository metadata.
    """
    # pylint: disable=too-many-locals
    from aiida.manage.configuration import get_profile

    profile = get_profile()
    backend = NoopRepositoryBackend()
    repository = MigrationRepository(backend=backend)

    # Initialize the new container: don't go through the profile, because that will not check if it already exists
    filepath = pathlib.Path(profile.repository_path) / 'container'
    container = Container(filepath)

    if container.is_initialised:
        raise RuntimeError(f'the container {filepath} already exists.')

    container.init_container()

    basepath = pathlib.Path(profile.repository_path) / 'repository' / 'node'

    if not basepath.is_dir():
        echo.echo_warning(f'could not find the repository basepath {basepath}: nothing to migrate')
        return

    node_repository_dirpaths = get_node_repository_dirpaths(basepath)

    filepaths = []
    streams = []
    mapping_metadata = {}

    for node_uuid, node_dirpath in node_repository_dirpaths.items():
        repository.put_object_from_tree(node_dirpath)
        metadata = serialize_repository(repository)
        mapping_metadata[node_uuid] = metadata

        for root, _, filenames in repository.walk():
            for filename in filenames:
                parts = list(pathlib.Path(root / filename).parts)
                filepaths.append((node_uuid, parts))
                # Navigate the nested metadata down to the file entry and collect its `LazyOpener` stream.
                streams.append(functools.reduce(lambda objects, part: objects['o'].get(part), parts, metadata)['k'])

        # Reset the repository to a clean node repository, which removes the internal virtual file hierarchy
        repository.reset()

    hashkeys = container.add_streamed_objects_to_pack(streams, compress=True, open_streams=True)

    # Regroup by node UUID, replacing each placeholder key with the hashkey generated by the disk object store
    for hashkey, (node_uuid, parts) in zip(hashkeys, filepaths):
        repository_metadata = mapping_metadata[node_uuid]
        filename = parts.pop()
        file_object = repository_metadata['o']

        for part in parts:
            file_object = file_object[part]['o']

        file_object[filename]['k'] = hashkey

    return mapping_metadata
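
To make the two-pass bookkeeping above concrete, here is a hypothetical
walk-through for a node with a single file `sub/data.txt` (the hashkey is
made up):

```python
# First pass: the file's 'k' holds a `LazyOpener` placeholder and its
# path parts are recorded in `filepaths`.
metadata = {'o': {'sub': {'o': {'data.txt': {'k': None}}}}}  # placeholder key
parts = ['sub', 'data.txt']

# Second pass: descend the nested 'o' dictionaries and swap in the
# hashkey returned by `add_streamed_objects_to_pack`.
filename = parts.pop()
file_object = metadata['o']
for part in parts:
    file_object = file_object[part]['o']
file_object[filename]['k'] = 'a1b2c3'  # made-up hashkey

assert metadata == {'o': {'sub': {'o': {'data.txt': {'k': 'a1b2c3'}}}}}
```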


def get_node_repository_dirpaths(basepath):
    """Return a mapping of node UUIDs onto the path to their current repository folder in the old repository.

    :return: dictionary of node UUID onto absolute filepath
    """
    mapping = {}

    for shard_one in basepath.iterdir():

        if not REGEX_SHARD_SUB_LEVEL.match(shard_one.name):
            continue

        for shard_two in shard_one.iterdir():

            if not REGEX_SHARD_SUB_LEVEL.match(shard_two.name):
                continue

            for shard_three in shard_two.iterdir():

                if not REGEX_SHARD_FINAL_LEVEL.match(shard_three.name):
                    continue

                uuid = shard_one.name + shard_two.name + shard_three.name
                dirpath = basepath / shard_one / shard_two / shard_three
                subdirs = [path.name for path in dirpath.iterdir()]

                if 'path' in subdirs:
                    path = dirpath / 'path'
                elif 'raw_input' in subdirs:
                    path = dirpath / 'raw_input'
                else:
                    echo.echo_warning(
                        f'skipping node repository folder {dirpath} as it does not contain `path` nor `raw_input`'
                    )
                    continue  # without this, `path` would be unbound or stale from a previous iteration

                mapping[uuid] = path

    return mapping
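
For reference, a sketch of the legacy sharded layout this function walks
(the UUID is made up): the first two characters of the node UUID form the
first shard level, the next two the second, and the remaining 32
characters the leaf directory.

```python
# Node 4a231ade-5678-4f0e-9b2c-0d3e4f5a6b7c is stored under (made-up UUID):
#   <repository_path>/repository/node/4a/23/1ade-5678-4f0e-9b2c-0d3e4f5a6b7c/path/
shard_one, shard_two, shard_three = '4a', '23', '1ade-5678-4f0e-9b2c-0d3e4f5a6b7c'
assert shard_one + shard_two + shard_three == '4a231ade-5678-4f0e-9b2c-0d3e4f5a6b7c'
```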


def serialize_repository(repository: Repository) -> dict:
    """Serialize the metadata into a JSON-serializable format.

    .. note:: the serialization format is optimized to reduce the size in bytes.

    :return: dictionary with the content metadata.
    """
    file_object = repository._directory  # pylint: disable=protected-access

    if file_object.file_type == FileType.DIRECTORY:
        if file_object.objects:
            return {'o': {key: obj.serialize() for key, obj in file_object.objects.items()}}
        return {}

    return {'k': file_object.key}


@@ -27,12 +219,7 @@ def ensure_repository_folder_created(uuid):
     :param uuid: UUID of the node
     """
     dirpath = get_node_repository_sub_folder(uuid)
-
-    try:
-        os.makedirs(dirpath)
-    except OSError as exception:
-        if exception.errno != errno.EEXIST:
-            raise
+    os.makedirs(dirpath, exist_ok=True)


def put_object_from_string(uuid, name, content):
Expand All @@ -43,7 +230,13 @@ def put_object_from_string(uuid, name, content):
:param content: the content to write to the file
"""
ensure_repository_folder_created(uuid)
filepath = os.path.join(get_node_repository_sub_folder(uuid), name)
basepath = get_node_repository_sub_folder(uuid)
dirname = os.path.dirname(name)

if dirname:
os.makedirs(os.path.join(basepath, dirname), exist_ok=True)

filepath = os.path.join(basepath, name)

with open(filepath, 'w', encoding='utf-8') as handle:
handle.write(content)
@@ -0,0 +1,46 @@
# -*- coding: utf-8 -*-
# pylint: disable=invalid-name,no-member
"""Migrate the file repository to the new disk object store based implementation.
Revision ID: 1feaea71bd5a
Revises: 7536a82b2cc4
Create Date: 2020-10-01 15:05:49.271958
"""
from alembic import op
from sqlalchemy import Integer, cast
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy.sql import table, column

from aiida.backends.general.migrations import utils

# revision identifiers, used by Alembic.
revision = '1feaea71bd5a'
down_revision = '7536a82b2cc4'
branch_labels = None
depends_on = None


def upgrade():
    """Migrations for the upgrade."""
    connection = op.get_bind()

    DbNode = table(
        'db_dbnode',
        column('id', Integer),
        column('uuid', UUID),
        column('repository_metadata', JSONB),
    )

    mapping_node_repository_metadata = utils.migrate_legacy_repository()

    if mapping_node_repository_metadata is None:
        return

    for node_uuid, repository_metadata in mapping_node_repository_metadata.items():
        value = cast(repository_metadata, JSONB)
        connection.execute(DbNode.update().where(DbNode.c.uuid == node_uuid).values(repository_metadata=value))


def downgrade():
    """Migrations for the downgrade."""
