Skip to content

Commit

Permalink
Replace os.path in common package and its tests
Browse files Browse the repository at this point in the history
  • Loading branch information
klavman authored Mar 15, 2024
1 parent 7d62d76 commit 0d3f029
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 105 deletions.
6 changes: 2 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ repos:
files: |
(?x)^(
storage_service/storage_service/settings/.*\.py |
storage_service/common/gpgutils.py |
storage_service/common/startup.py |
storage_service/common/management/commands/populate_aip_checksums.py |
storage_service/common/management/commands/import_aip.py
storage_service/common/.*\.py |
tests/common/.*\.py
)
- repo: https://github.com/pre-commit/mirrors-eslint
rev: v8.56.0
Expand Down
146 changes: 77 additions & 69 deletions storage_service/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import logging
import mimetypes
import os
import pathlib
import re
import shutil
import subprocess
import tarfile
import uuid
from collections import deque
from collections import namedtuple

from administration import models
Expand Down Expand Up @@ -160,21 +162,23 @@ def download_file_stream(filepath, temp_dir=None):
Deletes temp_dir once stream created if it exists.
"""
file_path = pathlib.Path(filepath)

# If not found, return 404
if not os.path.exists(filepath):
if not file_path.exists():
return http.HttpResponseNotFound(_("File not found"))

filename = os.path.basename(filepath)
filename = file_path.name

# Open file in binary mode
response = http.FileResponse(open(filepath, "rb"))
response = http.FileResponse(file_path.open("rb"))

response["Content-type"] = get_mimetype(filename)
response["Content-Disposition"] = 'attachment; filename="' + filename + '"'
response["Content-Length"] = os.path.getsize(filepath)
response["Content-Length"] = file_path.stat().st_size

# Delete temp dir if created
if temp_dir and os.path.exists(temp_dir):
if temp_dir and pathlib.Path(temp_dir).exists():
shutil.rmtree(temp_dir, ignore_errors=True)

return response
Expand Down Expand Up @@ -347,28 +351,31 @@ def get_compress_command(compression, extract_path, basename, full_path):
`command` is the compression command (as a list of strings)
`compressed_filename` is the full path to the compressed file
"""
extract_path = pathlib.Path(extract_path) / basename
full_path = pathlib.Path(full_path)

if compression in (COMPRESSION_TAR, COMPRESSION_TAR_BZIP2, COMPRESSION_TAR_GZIP):
compressed_filename = os.path.join(extract_path, basename + TAR_EXTENSION)
relative_path = os.path.dirname(full_path)
compressed_filename = extract_path.with_suffix(TAR_EXTENSION)
relative_path = full_path.parent
algo = ""
if compression == COMPRESSION_TAR_BZIP2:
algo = "-j" # Compress with bzip2
compressed_filename += ".bz2"
compressed_filename = extract_path.with_suffix(TAR_EXTENSION + ".bz2")
elif compression == COMPRESSION_TAR_GZIP:
algo = "-z" # Compress with gzip
compressed_filename += ".gz"
compressed_filename = extract_path.with_suffix(TAR_EXTENSION + ".gz")
command = [
"tar",
"c", # Create tar
algo, # Optional compression flag
"-C",
relative_path, # Work in this directory
str(relative_path), # Work in this directory
"-f",
compressed_filename, # Output file
os.path.basename(full_path), # Relative path to source files
str(compressed_filename), # Output file
full_path.name, # Relative path to source files
]
elif compression in (COMPRESSION_7Z_BZIP, COMPRESSION_7Z_LZMA, COMPRESSION_7Z_COPY):
compressed_filename = os.path.join(extract_path, basename + ".7z")
compressed_filename = extract_path.with_suffix(".7z")
if compression == COMPRESSION_7Z_BZIP:
algo = COMPRESS_ALGO_BZIP2
elif compression == COMPRESSION_7Z_LZMA:
Expand All @@ -386,8 +393,8 @@ def get_compress_command(compression, extract_path, basename, full_path):
"-mtm=on",
"-mta=on", # Keep timestamps (create, mod, access)
"-mmt=on", # Multithreaded
compressed_filename, # Destination
full_path, # Source
str(compressed_filename), # Destination
str(full_path), # Source
]

else:
Expand All @@ -396,7 +403,7 @@ def get_compress_command(compression, extract_path, basename, full_path):
)

command = [_f for _f in command if _f]
return (command, compressed_filename)
return (command, str(compressed_filename))


def get_compressed_package_checksum(pointer_path):
Expand Down Expand Up @@ -591,10 +598,10 @@ def create_tar(path, extension=False):
:param path: Path to directory or file to tar (str)
:param extension: Flag indicating whether to add .tar extension (bool)
"""
path = path.rstrip("/")
tarpath = f"{path}{TAR_EXTENSION}"
changedir = os.path.dirname(tarpath)
source = os.path.basename(path)
path = pathlib.Path(path)
tarpath = path.with_suffix(TAR_EXTENSION)
changedir = tarpath.parent
source = path.name
cmd = ["tar", "-C", changedir, "-cf", tarpath, source]
LOGGER.info(
"creating archive of %s at %s, relative to %s", source, tarpath, changedir
Expand All @@ -608,21 +615,21 @@ def create_tar(path, extension=False):
raise TARException(fail_msg)

# Providing the TAR is successfully created then remove the original.
if os.path.isfile(tarpath) and tarfile.is_tarfile(tarpath):
if tarpath.is_file() and tarfile.is_tarfile(tarpath):
try:
shutil.rmtree(path)
except OSError:
# Remove a file-path as We're likely packaging a file, e.g. 7z.
os.remove(path)
path.unlink()
if not extension:
os.rename(tarpath, path)
tarpath.rename(path)
else:
raise TARException(fail_msg)

if not tarfile.is_tarfile(tarpath if extension else path):
raise TARException(fail_msg)

if os.path.exists(path if extension else tarpath):
if (path if extension else tarpath).exists():
raise TARException(fail_msg)


Expand All @@ -631,10 +638,10 @@ def extract_tar(tarpath):
:param tarpath: Path to tarfile to extract (str)
"""
newtarpath = tarpath
newtarpath = f"{tarpath}{TAR_EXTENSION}"
os.rename(tarpath, newtarpath)
changedir = os.path.dirname(newtarpath)
tarpath = pathlib.Path(tarpath)
newtarpath = tarpath.with_suffix(TAR_EXTENSION)
tarpath.rename(newtarpath)
changedir = newtarpath.parent
cmd = ["tar", "-xf", newtarpath, "-C", changedir]
try:
subprocess.check_output(cmd)
Expand All @@ -643,9 +650,9 @@ def extract_tar(tarpath):
"Failed to extract %(tarpath)s: %(error)s"
% {"tarpath": tarpath, "error": err}
)
os.rename(newtarpath, tarpath)
newtarpath.rename(tarpath)
raise TARException(fail_msg)
os.remove(newtarpath)
newtarpath.unlink()


# ########### OTHER ############
Expand All @@ -661,12 +668,13 @@ def generate_checksum(file_path, checksum_type="md5"):
If checksum_type is not a valid checksum, ValueError raised by hashlib.
"""
file_path = pathlib.Path(file_path)
checksum = hashlib.new(checksum_type)

if os.path.isdir(file_path):
if file_path.is_dir():
file_path = find_tagmanifest(file_path)

with open(file_path, "rb") as f:
with file_path.open("rb") as f:
for chunk in iter(lambda: f.read(128 * checksum.block_size), b""):
checksum.update(chunk)
return checksum
Expand All @@ -678,10 +686,10 @@ def find_tagmanifest(file_path):
If there are multiple, return the first of sha512, sha256, or md5,
respecting the BagIt spec's preference for sha512 or sha256, respectively.
"""
if not os.path.isdir(file_path):
if not file_path.is_dir():
return

bag_files = os.listdir(file_path)
bag_files = [file_.name for file_ in file_path.iterdir()]
tagmanifest_files = [
"tagmanifest-sha512.txt",
"tagmanifest-sha256.txt",
Expand All @@ -690,7 +698,7 @@ def find_tagmanifest(file_path):

for tagmanifest in tagmanifest_files:
if tagmanifest in bag_files:
return os.path.join(file_path, tagmanifest)
return file_path / tagmanifest


def uuid_to_path(uuid):
Expand All @@ -699,9 +707,9 @@ def uuid_to_path(uuid):
Every 4 alphanumeric characters of the UUID become a folder name."""
uuid = uuid.hex
path = [uuid[i : i + 4] for i in range(0, len(uuid), 4)]
path = os.path.join(*path)
path = pathlib.Path(*path)
LOGGER.debug("path %s", path)
return path
return str(path)


def removedirs(relative_path, base=None):
Expand All @@ -717,19 +725,15 @@ def removedirs(relative_path, base=None):
"""
if not base:
return os.removedirs(relative_path)

base_path = pathlib.Path(base)
full_path = base_path / relative_path
try:
os.rmdir(os.path.join(base, relative_path))
while full_path != base_path:
full_path.rmdir()
full_path = full_path.parent
except OSError:
pass
head, tail = os.path.split(relative_path)
if not tail:
head, tail = os.path.split(head)
while head and tail:
try:
os.rmdir(os.path.join(base, head))
except OSError:
break
head, tail = os.path.split(head)


def strip_quad_dirs_from_path(dest_path):
Expand All @@ -738,18 +742,24 @@ def strip_quad_dirs_from_path(dest_path):
Ensure that paths to uncompressed packages terminate in a trailing slash.
"""
UUID4_QUAD = re.compile(r"[0-9a-f]{4}\Z", re.I)
dest_path = dest_path.rstrip("/")
output_path, package_name = os.path.split(dest_path)
for _quad_dir in range(8):
head, tail = os.path.split(output_path)
if not re.match(UUID4_QUAD, tail):
continue
output_path = head
output_path = os.path.join(output_path, package_name)

dest_path = pathlib.Path(dest_path)
package_name = dest_path.name

filtered_parts = deque()
quad_dir_count = 0
for part in reversed(dest_path.parent.parts):
if quad_dir_count < 8 and UUID4_QUAD.match(part):
quad_dir_count += 1
else:
filtered_parts.appendleft(part)

output_path = pathlib.Path(*filtered_parts) / package_name

for extension in PACKAGE_EXTENSIONS:
if output_path.endswith(extension):
return output_path
return os.path.join(output_path, "")
if output_path.suffix == extension:
return str(output_path)
return str(output_path / "_")[:-1]


StorageEffects = namedtuple(
Expand All @@ -762,14 +772,15 @@ def recalculate_size(rein_aip_internal_path):
because of changed preservation derivatives or because of a metadata-only
reingest. If the AIP is a directory, then calculate the size recursively.
"""
if os.path.isdir(rein_aip_internal_path):
size = 0
for dirpath, ___, filenames in os.walk(rein_aip_internal_path):
for filename in filenames:
file_path = os.path.join(dirpath, filename)
size += os.path.getsize(file_path)
rein_aip_internal_path = pathlib.Path(rein_aip_internal_path)
size = 0

if rein_aip_internal_path.is_dir():
for file_path in rein_aip_internal_path.rglob("*"):
if file_path.is_file():
size += file_path.stat().st_size
else:
size = os.path.getsize(rein_aip_internal_path)
size = rein_aip_internal_path.stat().st_size
return size


Expand All @@ -778,10 +789,7 @@ def package_is_file(path):
or a directory. As paths are usually abstract, i.e. stored in the database,
we can't (usually) simply test whether the object is a file on disk.
"""
for ext in PACKAGE_EXTENSIONS:
if path.endswith(ext):
return True
return False
return pathlib.Path(path).suffix in PACKAGE_EXTENSIONS


def get_mimetype(path):
Expand Down
8 changes: 4 additions & 4 deletions tests/common/test_command_import_aip.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import os
import pathlib
import uuid

import pytest
from django.core.management import call_command
from locations import models

TEST_DIR = os.path.dirname(os.path.realpath(__file__))
FIXTURES_DIR = os.path.join(TEST_DIR, "fixtures")
AIP_PATH = os.path.join(FIXTURES_DIR, "import_aip_test.7z")
TEST_DIR = pathlib.Path(__file__).resolve().parent
FIXTURES_DIR = TEST_DIR / "fixtures"
AIP_PATH = FIXTURES_DIR / "import_aip_test.7z"


@pytest.fixture
Expand Down
Loading

0 comments on commit 0d3f029

Please sign in to comment.