
Archive refactoring
nmassey001 committed Sep 18, 2024
1 parent 7762644 commit dafe58f
Showing 30 changed files with 438 additions and 139 deletions.
8 changes: 8 additions & 0 deletions nlds/errors.py
@@ -31,3 +31,11 @@ class CallbackError(BaseException):

def __init__(self, *args: object) -> None:
super().__init__(*args)

class MessageError(Exception):
def __init__(self, message, *args):
super().__init__(args)
self.message = message

def __str__(self):
return self.message
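
A minimal usage sketch (hypothetical call site, not part of this commit) showing what the new `MessageError` adds: the stored message comes back out of `str()`, so handlers can log a human-readable reason.

```python
from nlds.errors import MessageError

def require_key(body: dict, key: str):
    # Raise with a readable reason when a required message key is missing
    if key not in body:
        raise MessageError(f"Message is missing the {key} key")
    return body[key]

try:
    require_key({}, "transaction_id")
except MessageError as e:
    print(e)  # -> Message is missing the transaction_id key
```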
9 changes: 2 additions & 7 deletions nlds/rabbit/consumer.py
@@ -94,12 +94,13 @@ def __setup_queues(self, queue: str = None):
raise ValueError("No rabbit queues found in config.")

if queue not in [q.name for q in self.queues]:
raise ValueError("Requested queue not in configuration.")
raise ValueError(f"Requested queue {queue} not in configuration.")

else:
raise ValueError("No queue specified, switching to default " "config.")

except ValueError as e:
raise Exception(e)
print("Using default queue config - only fit for testing purposes.")
self.name = self.DEFAULT_QUEUE_NAME
self.queues = [
@@ -206,12 +207,6 @@ def load_config_value(self, config_option: str, path_listify_fl: bool = False):
try:
return_val = self.consumer_config[config_option]
if path_listify_fl:
# TODO: (2022-02-17) This is very specific to the use-case
# of the indexer, could potentially be divided up into
# listify and convert functions, but that's probably only
# necessary if we refactor this into Consumer – which is
# probably a good idea when we start fleshing out other
# consumers
return_val_list = self.consumer_config[config_option]
# Make sure returned value is a list and not a string
# Note: it can't be any other iterable because it's loaded
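
The TODO above describes splitting the path-listifying behaviour into separate listify and convert functions. A rough sketch of the assumed semantics (hypothetical helper names, not part of this commit): a bare string loaded from the JSON config becomes a one-element list before conversion to paths.

```python
from pathlib import Path
from typing import List, Union

def listify(value: Union[str, List[str]]) -> List[str]:
    # Config values arrive from JSON as either a string or a list of strings
    if isinstance(value, str):
        return [value]
    return value

def convert_to_paths(values: List[str]) -> List[Path]:
    return [Path(v) for v in values]

print(convert_to_paths(listify("/gws/nlds")))    # [PosixPath('/gws/nlds')]
print(convert_to_paths(listify(["/a", "/b"])))   # [PosixPath('/a'), PosixPath('/b')]
```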
2 changes: 1 addition & 1 deletion nlds/rabbit/publisher.py
@@ -216,7 +216,7 @@ def publish_message(
# For any Undelivered messages attempt to send again
logger.error(
"Message delivery was not confirmed, wasn't delivered "
f"properly (rk = {routing_key}). Attempting retry..."
f"properly (rk = {routing_key})."
)
logger.debug(f"{type(e).__name__}: {e}")
# NOTE: don't reraise in this case, can cause an infinite loop as
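
The NOTE at the end of this hunk is the design choice worth keeping in mind: re-raising from the delivery-failure handler could re-enter the publish path on the same message and loop forever. A stripped-down sketch of the pattern (hypothetical publisher, not the real class):

```python
import logging

logger = logging.getLogger("publisher")

def publish_message(channel, routing_key: str, body: bytes) -> bool:
    try:
        channel.basic_publish(exchange="", routing_key=routing_key, body=body)
        return True
    except Exception as e:
        # Log and swallow: re-raising here could trigger another attempt to
        # publish the same undeliverable message, looping indefinitely.
        logger.error(
            "Message delivery was not confirmed, wasn't delivered "
            f"properly (rk = {routing_key})."
        )
        logger.debug(f"{type(e).__name__}: {e}")
        return False
```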
19 changes: 0 additions & 19 deletions nlds/templates/server_config.j2
@@ -41,19 +41,12 @@
"max_bytes": "{{ logging_max_bytes }}",
"backup_count": "{{ logging_backup_count }}"
},
"general": {
"retry_delays": "{{ general_retry }}"
},
"nlds_q": {
"logging": "{{ nlds_q_logging_dict }}",
"max_retries": "{{ nlds_q_max_retries }}",
"retry_delays": "{{ nlds_q_retry_delays }}",
"print_tracebacks_fl": "{{ nlds_q_print_tracebacks_fl }}"
},
"index_q": {
"logging": "{{ index_q_logging_dict }}",
"max_retries": "{{ index_q_max_retries }}",
"retry_delays": "{{ index_q_retry_delays }}",
"print_tracebacks_fl": "{{ index_q_print_tracebacks_fl }}",
"filelist_max_length": "{{ index_q_filelist_max_length }}",
"message_threshold": "{{ index_q_message_threshold }}",
@@ -63,8 +56,6 @@
},
"catalog_q": {
"logging": "{{ catalog_q_logging_dict }}",
"max_retries": "{{ catalog_q_max_retries }}",
"retry_delays": "{{ catalog_q_retry_delays }}",
"print_tracebacks_fl": "{{ catalog_q_print_tracebacks_fl }}",
"db_engine": "{{ catalog_q_db_engine }}",
"db_options": {
@@ -78,8 +69,6 @@
},
"monitor_q": {
"logging": "{{ monitor_q_logging_dict }}",
"max_retries": "{{ monitor_q_max_retries }}",
"retry_delays": "{{ monitor_q_retry_delays }}",
"print_tracebacks_fl": "{{ monitor_q_print_tracebacks_fl }}",
"db_engine": "{{ monitor_q_db_engine }}",
"db_options": {
@@ -91,8 +80,6 @@
},
"transfer_put_q": {
"logging": "{{ transfer_put_q_logging_dict }}",
"max_retries": "{{ transfer_put_q_max_retries }}",
"retry_delays": "{{ transfer_put_q_retry_delays }}",
"print_tracebacks_fl": "{{ transfer_put_q_print_tracebacks_fl }}",
"filelist_max_length": "{{ transfer_put_q_filelist_max_length }}",
"check_permissions_fl": "{{ transfer_put_q_check_permissions_fl }}",
@@ -101,8 +88,6 @@
},
"transfer_get_q": {
"logging": "{{ transfer_put_q_logging_dict }}",
"max_retries": "{{ transfer_put_q_max_retries }}",
"retry_delays": "{{ transfer_put_q_retry_delays }}",
"print_tracebacks_fl": "{{ transfer_put_q_print_tracebacks_fl }}",
"filelist_max_length": "{{ transfer_put_q_filelist_max_length }}",
"check_permissions_fl": "{{ transfer_put_q_check_permissions_fl }}",
@@ -117,8 +102,6 @@
},
"archive_put_q": {
"logging": "{{ archive_put_q_logging_dict }}",
"max_retries": "{{ archive_put_q_max_retries }}",
"retry_delays": "{{ archive_put_q_retry_delays }}",
"print_tracebacks_fl": "{{ archive_put_q_print_tracebacks_fl }}",
"tenancy": "{{ archive_put_q_tenancy }}",
"check_permissions_fl": "{{ archive_put_q_check_permissions_fl }}",
@@ -130,8 +113,6 @@
},
"archive_get_q": {
"logging": "{{ archive_get_q_logging_dict }}",
"max_retries": "{{ archive_get_q_max_retries }}",
"retry_delays": "{{ archive_get_q_retry_delays }}",
"print_tracebacks_fl": "{{ archive_get_q_print_tracebacks_fl }}",
"tenancy": "{{ archive_get_q_tenancy }}",
"check_permissions_fl": "{{ archive_get_q_check_permissions_fl }}",
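
All of the per-queue `max_retries` and `retry_delays` keys (and the `general.retry_delays` block) are gone from the rendered config. A short sketch of reading one rendered queue section after this change (assumed config path; the real location is deployment-specific):

```python
import json

# Assumed path to the rendered template output
with open("/etc/nlds/server_config.json") as f:
    config = json.load(f)

catalog = config["catalog_q"]
# Only the surviving keys should be read; retry settings no longer exist here
print(catalog["print_tracebacks_fl"])
print(catalog["db_engine"])
```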
9 changes: 9 additions & 0 deletions nlds_processors/archiver/adler32file.py
@@ -1,3 +1,12 @@
"""
adler32file.py
"""
__author__ = "Neil Massey"
__date__ = "18 Sep 2024"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"
__license__ = "BSD - see LICENSE file in top-level package directory"
__contact__ = "[email protected]"

from zlib import adler32


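
The module wraps zlib's rolling Adler-32: `adler32()` takes the running value as its second argument (starting at 1), so a checksum can be accumulated block by block. A quick sketch of the underlying idiom:

```python
from zlib import adler32

def adler32_of_chunks(chunks) -> int:
    asum = 1  # Adler-32's defined starting value
    for chunk in chunks:
        asum = adler32(chunk, asum)
    return asum

# Accumulating over chunks matches checksumming the concatenated bytes
assert adler32_of_chunks([b"hello ", b"world"]) == adler32(b"hello world")
```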
9 changes: 2 additions & 7 deletions nlds_processors/archiver/archive_base.py
@@ -12,7 +12,7 @@

from abc import ABC, abstractmethod
import json
from typing import List, Dict
from typing import List, Dict, Tuple
from enum import Enum

from nlds_processors.transferers.base_transfer import (
@@ -119,11 +119,6 @@ def callback(self, ch, method, properties, body, connection):
self.log(str(e), RK.LOG_DEBUG)
raise

self.log(
f"Starting tape transfer between {tape_url} and object store " f"{tenancy}",
RK.LOG_INFO,
)

# Append route info to message to track the route of the message
body_dict = self.append_route_info(body_dict)

@@ -155,7 +150,7 @@ def get_tape_config(self, body_dict) -> Tuple:
# or the server_config - exit if not.
if tape_url is None:
reason = (
"No tenancy specified at server- or request-level, exiting " "callback."
"No tape_url specified at server- or request-level, exiting callback."
)
self.log(reason, RK.LOG_ERROR)
raise ArchiveError(reason)
61 changes: 39 additions & 22 deletions nlds_processors/archiver/archive_put.py
@@ -27,7 +27,6 @@

from nlds.rabbit.consumer import State
from nlds.details import PathDetails
from nlds.errors import CallbackError
import nlds.rabbit.routing_keys as RK
import nlds.rabbit.message_keys as MSG

@@ -58,14 +57,25 @@ def transfer(
# Create the S3 to tape or disk streamer
try:
if USE_DISKTAPE:
disk_loc = os.path.expanduser("~/DISKTAPE")
self.log(
f"Starting disk transfer between {disk_loc} and object store "
f"{tenancy}",
RK.LOG_INFO,
)
streamer = S3ToTarfileDisk(
s3_tenancy=tenancy,
s3_access_key=access_key,
s3_secret_key=secret_key,
disk_location=os.path.expanduser("~/DISKTAPE"),
disk_location=disk_loc,
logger=self.log,
)
else:
self.log(
f"Starting tape transfer between {tape_url} and object store "
f"{tenancy}",
RK.LOG_INFO,
)
streamer = S3ToTarfileTape(
s3_tenancy=tenancy,
s3_access_key=access_key,
@@ -74,29 +84,36 @@
logger=self.log,
)
except S3StreamError as e:
raise CallbackError(e)

# NOTE: For the purposes of tape reading and writing, the holding prefix
# has 'nlds.' prepended
holding_prefix = self.get_holding_prefix(body_json)

try:
completelist, failedlist, checksum = streamer.put(holding_prefix, filelist)
except (S3StreamError) as e:
raise CallbackError(e)
# assign the return data
body_json[MSG.DATA][MSG.CHECKSUM] = checksum
self.completelist.extend(completelist)
self.failedlist.extend(failedlist)

self.log(
"Archive complete, passing lists back to worker for re-routing and "
"cataloguing.",
RK.LOG_INFO,
)
# if a S3StreamError occurs then all files have failed
for path_details in filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
checksum = None
else:
# NOTE: For the purposes of tape reading and writing, the holding prefix
# has 'nlds.' prepended
holding_prefix = self.get_holding_prefix(body_json)

try:
self.completelist, self.failedlist, checksum = streamer.put(
holding_prefix, filelist
)
except S3StreamError as e:
# if a S3StreamError occurs then all files have failed
for path_details in filelist:
path_details.failure_reason = e.message
self.failedlist.append(path_details)
checksum = None
# assign the return data
body_json[MSG.DATA][MSG.CHECKSUM] = checksum

# Send whatever remains after all items have been put
if len(self.completelist) > 0:
self.log(
"Archive complete, passing lists back to worker for re-routing and "
"cataloguing.",
RK.LOG_INFO,
)
self.send_pathlist(
self.completelist, rk_complete, body_json, state=State.ARCHIVE_PUTTING
)
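
The refactored transfer above follows a try/except/else shape: a failure while constructing the streamer fails every file in the request, and the put only runs if construction succeeded, with a streaming failure likewise failing the whole list. A stripped-down sketch of that control flow (hypothetical names, not the real consumer):

```python
def transfer_sketch(filelist, make_streamer, put):
    completelist, failedlist, checksum = [], [], None
    try:
        streamer = make_streamer()
    except Exception as e:
        # Streamer setup failed: every file in the request fails
        for path_details in filelist:
            failedlist.append((path_details, str(e)))
    else:
        try:
            completelist, failedlist, checksum = put(streamer, filelist)
        except Exception as e:
            # A streaming error also fails the whole file list
            failedlist = [(path_details, str(e)) for path_details in filelist]
            checksum = None
    return completelist, failedlist, checksum
```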
45 changes: 24 additions & 21 deletions nlds_processors/archiver/s3_to_tarfile_disk.py
@@ -1,3 +1,13 @@
"""
s3_to_tarfile_disk.py
"""

__author__ = "Neil Massey"
__date__ = "18 Sep 2024"
__copyright__ = "Copyright 2024 United Kingdom Research and Innovation"
__license__ = "BSD - see LICENSE file in top-level package directory"
__contact__ = "[email protected]"

import os
from typing import List
from zlib import adler32
@@ -24,7 +34,7 @@ def __init__(
s3_tenancy: str,
s3_access_key: str,
s3_secret_key: str,
disk_loc: str,
disk_location: str,
logger,
) -> None:
# Initialise the S3 client first
@@ -36,7 +46,7 @@
)
# record and make the disk location directory if it doesn't exist
try:
self.disk_loc = os.path.expanduser(disk_loc)
self.disk_loc = os.path.expanduser(disk_location)
os.mkdir(self.disk_loc)
except FileExistsError:
# it's okay if the path already exists
@@ -68,11 +78,9 @@ def put(self, holding_prefix: str, filelist: List[PathDetails]):

try:
# open the tarfile to write to
file = open(self.tarfile_diskpath, 'wb')
file = open(self.tarfile_diskpath, "wb")
file_object = Adler32File(file, debug_fl=False)
completelist, failedlist, checksum = self._stream_to_fileobject(
file_object
)
completelist, failedlist, checksum = self._stream_to_fileobject(file_object)
except FileExistsError:
msg = (
f"Couldn't create tarfile ({self.tarfile_diskpath}). File already "
@@ -95,26 +103,21 @@
)
self.log(msg, RK.LOG_ERROR)
self._remove_tarfile_from_disktape()
# need to set all completed_files to failed
failedlist.extend(completelist)
completelist.clear()
raise S3StreamError(msg)

# now verify the checksum
try:
self._validate_tarfile_checksum(checksum)
except S3StreamError as e:
msg = (f"Exception occurred during validation of tarfile "
f"{self.tarfile_tapepath}. Original exception: {e}")
msg = (
f"Exception occurred during validation of tarfile "
f"{self.tarfile_tapepath}. Original exception: {e}"
)
self.log(msg, RK.LOG_ERROR)
self._remove_tarfile_from_disktape()
# need to set all completed_files to failed
failedlist.extend(completelist)
completelist.clear()
raise S3StreamError(msg)
return completelist, failedlist, checksum


def holding_diskpath(self):
"""Get the holding diskpath (i.e. the enclosing directory) on the DISKTAPE"""
assert self.disk_loc
@@ -131,13 +134,13 @@ def tarfile_diskpath(self):
def _validate_tarfile_checksum(self, tarfile_checksum: str):
"""Calculate the Adler32 checksum of the tarfile and compare it to the checksum
calculated when streaming from the S3 server to the DISKTAPE"""
blocksize = 256*1024*1024
blocksize = 256 * 1024 * 1024
asum = 1
with open(self.tarfile_diskpath, 'rb') as fh:
while (data := fh.read(blocksize)):
with open(self.tarfile_diskpath, "rb") as fh:
while data := fh.read(blocksize):
asum = adler32(data, asum)
try:
assert(asum == tarfile_checksum)
assert asum == tarfile_checksum
except AssertionError as e:
reason = (
f"Checksum {asum} differs from that calculated during streaming "
Expand All @@ -147,7 +150,7 @@ def _validate_tarfile_checksum(self, tarfile_checksum: str):
raise S3StreamError(
f"Failure occurred during DISKTAPE-write " f"({reason})."
)

def _remove_tarfile_from_disktape(self):
"""On failure, remove tarfile from disk"""
try:
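
With the constructor parameter renamed from `disk_loc` to `disk_location`, a caller now builds the disk streamer as below (placeholder credentials and tenancy; the real values come from the request message):

```python
import os
from nlds_processors.archiver.s3_to_tarfile_disk import S3ToTarfileDisk

streamer = S3ToTarfileDisk(
    s3_tenancy="s3.example.ac.uk",   # placeholder tenancy URL
    s3_access_key="ACCESS_KEY",      # placeholder credential
    s3_secret_key="SECRET_KEY",      # placeholder credential
    disk_location=os.path.expanduser("~/DISKTAPE"),
    logger=print,                    # the consumer passes self.log here
)
```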
(Diff truncated: the remaining changed files in this commit are not shown.)