diff --git a/reports/duplicates/accruals.py b/reports/duplicates/accruals.py
new file mode 100644
index 00000000..91b669f9
--- /dev/null
+++ b/reports/duplicates/accruals.py
@@ -0,0 +1,126 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+from __future__ import print_function, unicode_literals
+
+import copy
+import logging
+import os
+import sys
+
+try:
+    from .appconfig import AppConfig
+    from .digital_object import DigitalObject
+    from . import duplicates
+    from . import loggingconfig
+    from .serialize_to_csv import CSVOut
+    from . import utils
+except (ImportError, ValueError):
+    # Relative imports raise ValueError under Python 2 and ImportError under
+    # Python 3 when the module is run as a script; fall back in both cases.
+    from appconfig import AppConfig
+    from digital_object import DigitalObject
+    import duplicates
+    import loggingconfig
+    from serialize_to_csv import CSVOut
+    import utils
+
+logging_dir = os.path.dirname(os.path.abspath(__file__))
+
+logger = logging.getLogger("accruals")
+logger.disabled = False
+
+# Location purpose = Transfer Source (TS)
+location_purpose = "TS"
+default_location = "accruals"
+
+
+# TODO: make this configurable rather than hard-coded.
+DOCKER = True
+
+# Store our appraisal paths.
+accrual_paths = []
+
+
+def create_manifest(aip_index, accrual_objs):
+    """Compare accrual objects against the AIP index and return a list of
+    objects that already appear in a package, annotated with that package's
+    name.
+    """
+    dupes = []
+    aip_obj_hashes = aip_index.get(duplicates.MANIFEST_DATA)
+    for accrual_obj in accrual_objs:
+        for accrual_hash in accrual_obj.hashes:
+            if accrual_hash in aip_obj_hashes.keys():
+                for _, aip_items in aip_obj_hashes.items():
+                    for item in aip_items:
+                        if accrual_obj == item:
+                            cp = copy.copy(accrual_obj)
+                            cp.package_name = item.package_name
+                            dupes.append(cp)
+                # Only one hash needs to match, so break. This may also be
+                # redundant, as we only have one hash per file from the bag
+                # manifests...
+                break
+    return dupes
+
+
+def create_comparison_obj(transfer_path):
+    """Walk the transfer path and return a DigitalObject for every file
+    found under it.
+    """
+    transfer_arr = []
+    for root, dirs, files in os.walk(transfer_path, topdown=True):
+        for name in files:
+            file_ = os.path.join(root, name)
+            if os.path.isfile(file_):
+                transfer_arr.append(DigitalObject(file_, transfer_path))
+    return transfer_arr
+
+
+def stat_transfers(accruals_path, all_transfers):
+    """Retrieve all transfer paths and generate statistics about the
+    objects in each transfer path.
+    """
+    aip_index = duplicates.retrieve_aip_index()
+    reports = []
+    transfers = []
+    for transfer in all_transfers:
+        transfer_home = os.path.join(accruals_path, transfer)
+        if DOCKER:
+            transfer_home = utils.get_docker_path(transfer_home)
+        objs = create_comparison_obj(transfer_home)
+        transfers.append(objs)
+        reports.append({transfer: create_manifest(aip_index, objs)})
+    CSVOut.stat_manifests(aip_index, transfers)
+    CSVOut.csv_out(reports, "")
+
+
+def main(location=default_location):
+    """Primary entry point for this script."""
+
+    am = AppConfig().get_am_client()
+    sources = am.list_storage_locations()
+
+    accruals = False
+    for source in sources.get("objects"):
+        if (
+            source.get("purpose") == location_purpose
+            and source.get("description") == location
+        ):
+            # Configure the transfer source from the matched location.
+            am.transfer_source = source.get("uuid")
+            am.transfer_path = source.get("path")
+            accruals = True
+    if not accruals:
+        logger.info("Exiting. No transfer source: %s", location)
+        sys.exit()
+
+    # All transfer directories. The assumption, as in Archivematica, is that
+    # each transfer is organized into a single directory at this level.
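+    # For example (hypothetical layout), a transfer source containing:
+    #     accruals/transfer-1/image-001.tif
+    #     accruals/transfer-2/report.pdf
+    # yields two transferable directories: "transfer-1" and "transfer-2".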
+    all_transfers = am.transferables().get("directories")
+    stat_transfers(am.transfer_path, all_transfers)
+
+
+if __name__ == "__main__":
+    loggingconfig.setup("INFO", os.path.join(logging_dir, "report.log"))
+    source = default_location
+    try:
+        source = sys.argv[1]
+        logger.info("Attempting to find transfers at: %s", source)
+    except IndexError:
+        pass
+    sys.exit(main(source))
diff --git a/reports/duplicates/digital_object.py b/reports/duplicates/digital_object.py
new file mode 100644
index 00000000..72ece0d3
--- /dev/null
+++ b/reports/duplicates/digital_object.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+"""Digital object class to help with matching."""
+
+import json
+import os
+import time
+
+try:
+    from . import hashutils
+except (ImportError, ValueError):
+    import hashutils
+
+
+class DigitalObjectException(Exception):
+    """If there's a problem raise this."""
+
+
+class DigitalObject(object):
+
+    # Object members.
+    basename = None
+    date_modified = None
+    dirname = None
+    filepath = None
+    hashes = None
+    package_uuid = None
+    package_name = None
+
+    def __init__(self, path=None, transfer_path=None):
+        """Populate the digital object metadata. If we don't supply a path
+        we'll just return an empty object to be populated on our own terms.
+        """
+        if not path:
+            self.basename = None
+            self.date_modified = None
+            self.dirname = None
+            self.filepath = None
+            self.hashes = {}
+            self.package_uuid = None
+            self.package_name = None
+
+        if path:
+            if not transfer_path:
+                raise DigitalObjectException("Transfer path isn't set")
+            # Construct the path as if it is in a Bag object.
+            comparison_path = path.replace(
+                transfer_path, os.path.join("data", "objects")
+            )
+            self.filepath = comparison_path
+            self.set_basename(comparison_path)
+            self.set_dirname(comparison_path)
+            self.hashes = hashutils.hash(path)
+            self.date_modified = self.get_timestamp(path)
+
+    def set_basename(self, path):
+        """Set the object's basename from the given path."""
+        self.basename = os.path.basename(path)
+
+    def set_dirname(self, path):
+        """Set the object's directory name from the given path."""
+        self.dirname = os.path.dirname(path)
+
+    def as_dict(self):
+        return self.__dict__
+
+    def __str__(self):
+        """Return the object's metadata as a pretty-printed JSON string."""
+        return json.dumps(
+            self.__dict__, sort_keys=True, indent=4, separators=(",", ": ")
+        )
+
+    def __eq__(self, other):
+        """Comparison operator for the digital object class. Return True
+        only when at least one hash, the file path, and the date modified
+        all match.
+        """
+        ret = False
+        for key in self.hashes.keys():
+            if key in other.hashes.keys():
+                ret = True
+                break
+        if self.filepath != other.filepath:
+            ret = False
+        if self.date_modified != other.date_modified:
+            ret = False
+        return ret
+
+    def __ne__(self, other):
+        """Python 2 does not derive __ne__ from __eq__, so define it."""
+        return not self.__eq__(other)
+
+    def __mod__(self, other):
+        """Modulo operator for the digital object class. If the two objects
+        compare equal, return zero. If there is any partial match, return
+        the basis information instead. % is potentially useful for
+        debugging, or enhanced reporting.
+        """
+        if self.__eq__(other):
+            return 0
+        # ret is False; repurpose it to carry the basis information.
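+        # For example (hypothetical values), two otherwise identical files
+        # saved in different directories would yield the basis string:
+        #     "date modified match; filename match"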
+        ret = ""
+        if self.date_modified == other.date_modified:
+            msg = "date modified match"
+            ret = self.__concat_basis__(ret, msg)
+        if self.basename == other.basename:
+            msg = "filename match"
+            ret = self.__concat_basis__(ret, msg)
+        if self.dirname == other.dirname:
+            msg = "directory name match"
+            ret = self.__concat_basis__(ret, msg)
+        if not ret:
+            return "No matching components"
+        return ret
+
+    @staticmethod
+    def __concat_basis__(ret, msg):
+        """Helper function to bring basis information together usefully."""
+        if ret:
+            return "{}; {}".format(ret, msg)
+        return msg
+
+    @staticmethod
+    def get_timestamp(path):
+        """Return the file's date modified as a YYYY-MM-DD string."""
+        return time.strftime("%Y-%m-%d", time.localtime(os.path.getmtime(path)))
diff --git a/reports/duplicates/duplicates.py b/reports/duplicates/duplicates.py
index 69a08068..4c65ea0a 100644
--- a/reports/duplicates/duplicates.py
+++ b/reports/duplicates/duplicates.py
@@ -57,24 +57,25 @@ from __future__ import print_function, unicode_literals
 
 import logging
-import json
 import os
 import shutil
 import sys
 from tempfile import mkdtemp
 
 try:
+    from .digital_object import DigitalObject
+    from . import hashutils
     from . import loggingconfig
     from .appconfig import AppConfig
     from .parsemets import read_premis_data
-    from .serialize_to_csv import CSVOut
+    from . import utils
 except ValueError:
+    from digital_object import DigitalObject
+    import hashutils
     import loggingconfig
     from appconfig import AppConfig
     from parsemets import read_premis_data
-    from serialize_to_csv import CSVOut
-
-import utils
+    import utils
 
 logging_dir = os.path.dirname(os.path.abspath(__file__))
@@ -84,13 +85,11 @@
 logger.disabled = False
 
 
-class ExtractError(Exception):
-    """Custom exception for handling extract errors."""
+MANIFEST_DATA = "manifest_data"
 
 
-def json_pretty_print(json_string):
-    """Pretty print a JSON string."""
-    return json.dumps(json_string, sort_keys=True, indent=4)
+class ExtractError(Exception):
+    """Custom exception for handling extract errors."""
 
 
 def retrieve_file(am, package_uuid, save_as_loc, relative_path):
@@ -136,15 +135,17 @@ def filter_aip_files(filepath, package_uuid):
 
 
 def augment_data(package_uuid, duplicate_report, date_info):
+    """Augment the report's manifest entries with the date-modified
+    information parsed from each package's METS file.
+    """
     manifest_data = duplicate_report.get("manifest_data", {})
     for key, value in manifest_data.items():
         for package in value:
-            if package_uuid != package.get("package_uuid", ""):
+            if package_uuid != package.package_uuid:
                 continue
             for dates in date_info:
-                path_ = package.get("filepath", "").strip(os.path.join("data", ""))
+                path_ = package.filepath.replace(os.path.join("data", ""), "", 1)
                 if path_ == dates.get("filepath", ""):
-                    package["date_modified"] = dates.get("date_modified", "")
+                    package.date_modified = dates.get("date_modified", "")
+                    break
 
 
 def read_mets(mets_loc):
@@ -157,7 +158,6 @@ def retrieve_mets(am, duplicate_report, temp_dir):
     information.
""" for key, value in duplicate_report.get("packages", {}).items(): - """do nothing""" package_uuid = key package_name = value mets = "{}/data/METS.{}.xml".format(package_name, package_uuid) @@ -172,42 +172,28 @@ def retrieve_mets(am, duplicate_report, temp_dir): continue -def filter_duplicates(duplicate_report): +def filter_packages(duplicate_report): """Filter our report for packages containing duplicates only.""" - dupes = dict(duplicate_report.get("manifest_data", {})) + logger.info("Filtering duplicates only...") packages = {} - for key, values in dupes.items(): - if len(values) > 1: - for entry in values: - packages[entry.get("package_uuid")] = entry.get("package_name") - else: - try: - duplicate_report.get("manifest_data", {}).pop(key) - logger.info("Popped checksum: %s", key) - except (AttributeError, KeyError): - raise ExtractError("Error filtering report for duplicates") + for key, values in duplicate_report.get(MANIFEST_DATA, "").items(): + for entry in values: + packages[entry.package_uuid] = entry.package_name duplicate_report["packages"] = packages return duplicate_report -def output_report(duplicate_report): - """Provide mechanisms to output different serializations.""" - with open("aipstore-duplicates.json", "w") as json_file: - json_file.write(json_pretty_print(duplicate_report)) - print(json_pretty_print(duplicate_report)) - CSVOut.csv_out(duplicate_report, "aipstore-duplicates.csv") +def output_report(aip_report): + print("We still need to implement this") -def main(): +def retrieve_aip_index(): """Script's primary entry-point.""" temp_dir = mkdtemp() - loggingconfig.setup("INFO", os.path.join(logging_dir, "report.log")) am = AppConfig().get_am_client() # Maintain state of all values across the aipstore. duplicate_report = {} manifest_data = {} - # Checksum algorithms to test for. - checksum_algorithms = ("md5", "sha1", "sha256") # Get all AIPS that the storage service knows about. aips = am.aips() for aip in aips: @@ -216,7 +202,7 @@ def main(): # TODO: make this more accurate... package_name = package_name.replace(ext, "") package_uuid = aip.get("uuid") - for algorithm in checksum_algorithms: + for algorithm in hashutils.checksum_algorithms: # Store our manifest somewhere. relative_path = "{}/manifest-{}.txt".format(package_name, algorithm) save_path = "{}-manifest-{}.txt".format(package_name, algorithm) @@ -239,23 +225,37 @@ def main(): # entry. checksum, filepath = line.split(" ", 1) if not filter_aip_files(filepath, package_uuid): - entry = {} filepath = filepath.strip() - entry["package_uuid"] = am.package_uuid.strip() - entry["package_name"] = package_name.strip() - entry["filepath"] = filepath - entry["basename"] = os.path.basename(filepath) - entry["dirname"] = os.path.dirname(filepath) + obj = DigitalObject() + obj.package_uuid = am.package_uuid.strip() + obj.package_name = package_name.strip() + obj.filepath = filepath + obj.set_basename(filepath) + obj.set_dirname(filepath) + obj.hashes = {checksum.strip(): algorithm} manifest_data.setdefault(checksum.strip(), []) - manifest_data[checksum].append(entry) - duplicate_report["manifest_data"] = manifest_data - duplicate_report = filter_duplicates(duplicate_report) + manifest_data[checksum].append(obj) + duplicate_report[MANIFEST_DATA] = manifest_data + + # Add packages to report to make it easier to retrieve METS. + filter_packages(duplicate_report) + + # Retrieve METS and augment the data with date_modified information. retrieve_mets(am, duplicate_report, temp_dir) - # Save to JSON and CSV. 
-    output_report(duplicate_report)
+    # Clean up our temporary folder.
     shutil.rmtree(temp_dir)
+    # Return our complete AIP manifest to the caller.
+    return duplicate_report
+
+
+def main():
+    """Script's primary entry-point."""
+    report = retrieve_aip_index()
+    output_report(report)
+
 
 if __name__ == "__main__":
+    loggingconfig.setup("INFO", os.path.join(logging_dir, "report.log"))
     sys.exit(main())
diff --git a/reports/duplicates/hashutils.py b/reports/duplicates/hashutils.py
new file mode 100644
index 00000000..09b4b55f
--- /dev/null
+++ b/reports/duplicates/hashutils.py
@@ -0,0 +1,23 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import hashlib
+
+# Checksum algorithms to test for, based on the manifest algorithms allowed
+# in Archivematica bags.
+checksum_algorithms = ["md5", "sha1", "sha256", "sha512"]
+
+
+def hash(fname):
+    """Run all the available hashes against a given file. Return a
+    dictionary allowing the consumer to use the hash value (the key) to
+    look up the hash type (the value) if needed.
+    """
+    hash_list = {}
+    for algorithm in checksum_algorithms:
+        # Create a fresh hash object per file and per algorithm; a shared
+        # module-level object would accumulate state across calls and
+        # corrupt every digest after the first.
+        hash_func = hashlib.new(algorithm)
+        with open(fname, "rb") as f:
+            for chunk in iter(lambda: f.read(4096), b""):
+                hash_func.update(chunk)
+        hash_list[hash_func.hexdigest()] = hash_func.name
+    return hash_list
diff --git a/reports/duplicates/loggingconfig.py b/reports/duplicates/loggingconfig.py
index c29d3247..91a027b4 100644
--- a/reports/duplicates/loggingconfig.py
+++ b/reports/duplicates/loggingconfig.py
@@ -25,7 +25,8 @@ def setup(log_level, log_file_name):
             },
         },
         "loggers": {
-            "duplicates": {"level": log_level, "handlers": ["console", "file"]}
+            "duplicates": {"level": log_level, "handlers": ["console", "file"]},
+            "accruals": {"level": log_level, "handlers": ["console", "file"]},
         },
     }
diff --git a/reports/duplicates/serialize_to_csv.py b/reports/duplicates/serialize_to_csv.py
index fd174630..d65c3c09 100644
--- a/reports/duplicates/serialize_to_csv.py
+++ b/reports/duplicates/serialize_to_csv.py
@@ -1,13 +1,91 @@
 # -*- coding: utf-8 -*-
 
+import logging
+
 from pandas import DataFrame
 
+try:
+    from . import utils
+except (ImportError, ValueError):
+    import utils
+
+
+logger = logging.getLogger("accruals")
+
+
+class CSVException(Exception):
+    """Exception to raise if there is a problem generating the report."""
+
 
 class CSVOut:
     """Conveniently wrap CSV output capability."""
 
+    @staticmethod
+    def stat_manifests(aip_index, transfers):
+        """Output some statistics about the transfers and the AIP store."""
+        SUMMARY_FILE = "accruals_aip_store_summary.json"
+        MANIFEST_DATA = "manifest_data"
+        PACKAGES = "packages"
+        summary = {}
+        aipcount = 0
+        aipdict = aip_index.get(MANIFEST_DATA)
+        keys = aipdict.keys()
+        for key in keys:
+            aipcount += len(aipdict.get(key, []))
+        number_of_packages = len(aip_index.get(PACKAGES, {}).keys())
+        summary["count_of_files_across_aips"] = aipcount
+        summary["number_of_aips"] = number_of_packages
+        logger.info(
+            "Number of files across %s AIPs in the AIP store: %s",
+            number_of_packages,
+            aipcount,
+        )
+        summary["number_of_transfers"] = len(transfers)
+        logger.info("Number of transfers: %s", len(transfers))
+        for no, transfer in enumerate(transfers, 1):
+            summary["files_in_transfer-{}".format(no)] = len(transfer)
+            logger.info("Number of items in transfer %s: %s", no, len(transfer))
+        print(utils.json_pretty_print(summary))
+        with open(SUMMARY_FILE, "w") as summary_file:
+            summary_file.write(utils.json_pretty_print(summary))
+
     @staticmethod
     def csv_out(duplicate_report, filename):
+        """Reworked copy of the original csv_out, kept separate until we
+        understand where this code is going.
+        """
+        accrual_comparison_csv = "accrual_to_aip_store_comparison.csv"
+        cols = [
+            "path",
+            "in_transfer_name",
+            "hash",
+            "modified_date",
+            "already_in_package",
+        ]
+        csv = []
+        for transfer in duplicate_report:
+            transfer_names = list(transfer.keys())
+            if len(transfer_names) > 1:
+                raise CSVException(
+                    "Too many keys to deal with: {}".format(transfer_names)
+                )
+            transfer_name = transfer_names[0]
+            row_data = transfer.get(transfer_name, {})
+            for datum in row_data:
+                row = []
+                row.append(datum.filepath)
+                row.append(transfer_name)
+                hash_ = list(datum.hashes.keys())[0]
+                row.append("{} ({})".format(hash_, datum.hashes[hash_]))
+                row.append(datum.date_modified)
+                row.append(datum.package_name)
+                csv.append(row)
+        df = DataFrame(csv, columns=cols)
+        df = df.sort_values(by=["in_transfer_name"])
+        logger.info("Outputting report to: %s", accrual_comparison_csv)
+        df.to_csv(accrual_comparison_csv, index=False, header=True, encoding="utf8")
+
+    @staticmethod
+    def _csv_out(duplicate_report, filename):
        """Output a CSV using Pandas and a bit of magic."""
         dupes = duplicate_report.get("manifest_data", {})
         cols = 0
@@ -59,4 +137,7 @@ def csv_out(duplicate_report, filename):
         cols_no_prefix = [x.split("_", 1)[1] for x in cols_no_suffix if "_" in x]
         cols_no_prefix = ["Checksum"] + cols_no_prefix
         df = df[cols]
+        # TODO: if cols_no_prefix is empty we can't do anything here...
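+        # Presumably the case when no duplicate columns could be derived
+        # from the report; better to return early than write a header-less
+        # CSV.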
+        if not cols_no_prefix:
+            return
         df.to_csv(filename, index=None, header=cols_no_prefix, encoding="utf8")
diff --git a/reports/duplicates/tests/test_digital_object.py b/reports/duplicates/tests/test_digital_object.py
new file mode 100644
index 00000000..0e07eaaf
--- /dev/null
+++ b/reports/duplicates/tests/test_digital_object.py
@@ -0,0 +1,34 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+
+import copy
+import os
+import sys
+
+# Make the duplicates module importable when the tests run standalone.
+duplicates_module = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+sys.path.append(duplicates_module)
+
+
+from digital_object import DigitalObject
+
+
+def test_equality():
+    """Test the equality and modulo (partial-match) operators of
+    DigitalObject.
+    """
+    test_path = "data/objects/sub-dir-1/2"
+    obj = DigitalObject()
+    obj.set_basename(test_path)
+    obj.set_dirname(test_path)
+    obj.filepath = test_path
+    obj.hashes = {"d41d8cd98f00b204e9800998ecf8427e": "md5"}
+    obj.date_modified = "2018-08-14"
+
+    assert obj == obj
+    assert obj % obj == 0
+
+    new_obj = copy.copy(obj)
+    new_obj.date_modified = "2018-08-16"
+
+    assert new_obj != obj
+    assert new_obj % obj != 0
diff --git a/reports/duplicates/utils.py b/reports/duplicates/utils.py
index d4acdcff..b6117d2d 100644
--- a/reports/duplicates/utils.py
+++ b/reports/duplicates/utils.py
@@ -1,4 +1,19 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
 
+import json
+
+# Strip "/home/" from a path and graft the remainder onto DOCKER_HOME,
+# e.g. "/home/appraise-accruals" becomes
+# "/home/ross-spencer/.am/ss-location-data/appraise-accruals".
+DOCKER_HOME = "/home/ross-spencer/.am/ss-location-data/"
 EXTS = [".7z", ".tar.gz", ".tar.bz2"]
+
+
+def get_docker_path(path):
+    """Translate a transfer-source path into the equivalent path on the
+    local machine where the Docker location data is mounted.
+    """
+    return path.replace("/home/", DOCKER_HOME)
+
+
+def json_pretty_print(data):
+    """Pretty print a data structure as a JSON string."""
+    return json.dumps(data, sort_keys=True, indent=4)
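+
+
+# Example usage (hypothetical path, for illustration only):
+#   get_docker_path("/home/accruals/transfer-1")
+#   -> "/home/ross-spencer/.am/ss-location-data/accruals/transfer-1"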