diff --git a/requirements-dev.in b/requirements-dev.in
index ceecf7a3b..699b7fa50 100644
--- a/requirements-dev.in
+++ b/requirements-dev.in
@@ -4,7 +4,6 @@
pip-tools
pytest-cov
pytest-django
-pytest-mock
pytest-playwright
pytest-randomly
pytest
@@ -15,5 +14,6 @@ tox
# See https://github.com/microsoft/playwright-python/issues/2190
git+https://github.com/microsoft/playwright-python.git@d9cdfbb1e178b6770625e9f857139aff77516af0#egg=playwright
-# coverage 7.6.2 dropped support for Python 3.8, so pinning it for now.
+# These dependencies dropped support for Python 3.8, so pinning them for now.
coverage==7.6.1
+pytest-randomly==3.15.0
diff --git a/requirements-dev.txt b/requirements-dev.txt
index ba7298580..e981db5f6 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -12,9 +12,9 @@ asgiref==3.8.1
# django
bagit==1.8.1
# via -r requirements.txt
-boto3==1.35.45
+boto3==1.35.49
# via -r requirements.txt
-botocore==1.35.45
+botocore==1.35.49
# via
# -r requirements.txt
# boto3
@@ -258,7 +258,6 @@ pytest==8.3.3
# pytest-base-url
# pytest-cov
# pytest-django
- # pytest-mock
# pytest-playwright
# pytest-randomly
pytest-base-url==2.1.0
@@ -267,8 +266,6 @@ pytest-cov==5.0.0
# via -r requirements-dev.in
pytest-django==4.9.0
# via -r requirements-dev.in
-pytest-mock==3.14.0
- # via -r requirements-dev.in
pytest-playwright==0.5.2
# via -r requirements-dev.in
pytest-randomly==3.15.0
@@ -387,7 +384,7 @@ zope-event==5.0
# via
# -r requirements.txt
# gevent
-zope-interface==7.1.0
+zope-interface==7.1.1
# via
# -r requirements.txt
# gevent
diff --git a/requirements.in b/requirements.in
index c1ce1481a..ce3439f5c 100644
--- a/requirements.in
+++ b/requirements.in
@@ -5,7 +5,6 @@ brotli
Django>=4.2,<5
django-csp
django-tastypie
-dj-database-url
gunicorn
importlib_resources
jsonfield
@@ -31,7 +30,8 @@ django-cas-ng
# Required for OpenID Connect authentication
mozilla-django-oidc
-# gevent 24.10.1 dropped support for Python 3.8, so pinning it for now.
+# These dependencies dropped support for Python 3.8, so pinning them for now.
+dj-database-url==2.2.0
django-auth-ldap==5.0.0
gevent==24.2.1
pyparsing==3.1.4
diff --git a/requirements.txt b/requirements.txt
index 2f5871968..3e46541d8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -10,9 +10,9 @@ asgiref==3.8.1
# via django
bagit==1.8.1
# via -r requirements.in
-boto3==1.35.45
+boto3==1.35.49
# via -r requirements.in
-botocore==1.35.45
+botocore==1.35.49
# via
# boto3
# s3transfer
@@ -219,7 +219,7 @@ zipp==3.20.2
# via importlib-resources
zope-event==5.0
# via gevent
-zope-interface==7.1.0
+zope-interface==7.1.1
# via gevent
# The following packages are considered to be unsafe in a requirements file:
diff --git a/tests/administration/test_views.py b/tests/administration/test_views.py
index 7906361ae..e51fcfe9f 100644
--- a/tests/administration/test_views.py
+++ b/tests/administration/test_views.py
@@ -9,7 +9,7 @@
@pytest.mark.parametrize(
- "check_output,expected_result",
+ "output,expected_result",
[
(
b"d9c93f388a770287cf6337d4f9bcbbe60c25fdb8\n",
@@ -19,8 +19,9 @@
],
ids=["success", "error"],
)
-def test_get_git_commit(check_output, expected_result, mocker):
- mocker.patch("subprocess.check_output", side_effect=[check_output])
+@mock.patch("subprocess.check_output")
+def test_get_git_commit(check_output, output, expected_result):
+ check_output.side_effect = [output]
assert get_git_commit() == expected_result
diff --git a/tests/common/conftest.py b/tests/common/conftest.py
index 7cf1f6129..4bdbbe135 100644
--- a/tests/common/conftest.py
+++ b/tests/common/conftest.py
@@ -1,12 +1,15 @@
+from unittest import mock
+
import pytest
@pytest.fixture(autouse=True)
-def do_not_reset_the_root_logger(mocker):
+def do_not_reset_the_root_logger():
# Some management commands suppress logging from models/package.py
# by calling logging.config.dictConfig directly which breaks the caplog
# fixture. See https://github.com/pytest-dev/pytest/discussions/11011
#
# This avoids breaking the caplog fixture when those management command
# modules are imported or called during tests.
- mocker.patch("logging.config")
+ with mock.patch("logging.config"):
+ yield
diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py
index 7fdf13272..da47f0f65 100644
--- a/tests/common/test_utils.py
+++ b/tests/common/test_utils.py
@@ -1,6 +1,5 @@
import pathlib
import shutil
-import subprocess
import tarfile
from collections import namedtuple
from io import StringIO
@@ -160,10 +159,10 @@ def test_get_tool_info_command(compression, command):
)
@mock.patch("subprocess.check_output")
def test_get_compression_event_detail(
- mock_subprocess, compression, cmd_output, expected_detail
+ check_output, compression, cmd_output, expected_detail
):
# subprocess.check_output returns bytes in python3
- mock_subprocess.return_value = cmd_output.encode("utf8")
+ check_output.return_value = cmd_output.encode("utf8")
detail = utils.get_compression_event_detail(compression)
assert (
@@ -299,17 +298,19 @@ def test_package_is_file(package_path, is_file):
ExTarCase(path="/a/b/c", isdir=True, raises=True, expected="fail"),
],
)
-def test_extract_tar(mocker, path, will_be_dir, sp_raises, expected):
+@mock.patch("pathlib.Path.rename")
+@mock.patch("pathlib.Path.unlink")
+@mock.patch("pathlib.Path.is_dir")
+@mock.patch("subprocess.check_output")
+def test_extract_tar(
+ check_output, is_dir, unlink, rename, path, will_be_dir, sp_raises, expected
+):
if sp_raises:
- mocker.patch.object(subprocess, "check_output", side_effect=OSError("gotcha!"))
- else:
- mocker.patch.object(subprocess, "check_output")
- mocker.patch.object(pathlib.Path, "rename")
- mocker.patch.object(pathlib.Path, "unlink")
+ check_output.side_effect = OSError("gotcha!")
if will_be_dir:
- mocker.patch.object(pathlib.Path, "is_dir", return_value=True)
+ is_dir.return_value = True
else:
- mocker.patch.object(pathlib.Path, "is_dir", return_value=False)
+ is_dir.return_value = False
path = pathlib.Path(path)
tarpath_ext = path.with_suffix(".tar")
dirname = tarpath_ext.parent
@@ -324,9 +325,7 @@ def test_extract_tar(mocker, path, will_be_dir, sp_raises, expected):
tarpath_ext.rename.assert_any_call(path)
assert not pathlib.Path.unlink.called
path.rename.assert_any_call(tarpath_ext)
- subprocess.check_output.assert_called_once_with(
- ["tar", "-xf", tarpath_ext, "-C", dirname]
- )
+ check_output.assert_called_once_with(["tar", "-xf", tarpath_ext, "-C", dirname])
@pytest.mark.parametrize(
@@ -390,17 +389,29 @@ def test_extract_tar(mocker, path, will_be_dir, sp_raises, expected):
),
],
)
+@mock.patch("pathlib.Path.rename")
+@mock.patch("shutil.rmtree")
+@mock.patch("pathlib.Path.is_file")
+@mock.patch("tarfile.is_tarfile")
+@mock.patch("subprocess.check_output")
def test_create_tar(
- mocker, path, will_be_file, will_be_tar, sp_raises, expected, extension
+ check_output,
+ is_tarfile,
+ is_file,
+ rmtree,
+ rename,
+ path,
+ will_be_file,
+ will_be_tar,
+ sp_raises,
+ expected,
+ extension,
):
if sp_raises:
- mocker.patch.object(subprocess, "check_output", side_effect=OSError("gotcha!"))
- else:
- mocker.patch.object(subprocess, "check_output")
- mocker.patch.object(pathlib.Path, "is_file", return_value=will_be_file)
- mocker.patch.object(tarfile, "is_tarfile", return_value=will_be_tar)
- mocker.patch.object(pathlib.Path, "rename")
- mocker.patch.object(shutil, "rmtree")
+ check_output.side_effect = OSError("gotcha!")
+ is_file.return_value = will_be_file
+ is_tarfile.return_value = will_be_tar
+
fixed_path = pathlib.Path(path)
tarpath = fixed_path.with_suffix(".tar")
if expected == "success":
@@ -480,11 +491,12 @@ def test_strip_quad_dirs_from_path(input_path, expected_path):
(["tagmanifest-md5.txt"], "tagmanifest-md5.txt"),
],
)
-def test_find_tagmanifest(mocker, tmp_path, dir_listing, tagmanifest_file):
+@mock.patch("pathlib.Path.iterdir")
+def test_find_tagmanifest(iterdir, tmp_path, dir_listing, tagmanifest_file):
aip_path = tmp_path / "aip"
aip_path.mkdir()
mock_files = [aip_path / file_ for file_ in dir_listing]
- mocker.patch.object(pathlib.Path, "iterdir", return_value=mock_files)
+ iterdir.return_value = mock_files
if tagmanifest_file is None:
assert utils.find_tagmanifest(aip_path) is None
@@ -496,16 +508,15 @@ def test_find_tagmanifest(mocker, tmp_path, dir_listing, tagmanifest_file):
assert utils.find_tagmanifest(file_path) is None
-def test_generate_checksum_uncompressed_aip(mocker, tmp_path):
+@mock.patch("common.utils.find_tagmanifest")
+@mock.patch("pathlib.Path.is_dir", return_value=True)
+def test_generate_checksum_uncompressed_aip(is_dir, find_tag_manifest, tmp_path):
aip_path = tmp_path / "aip"
aip_path.mkdir()
tagmanifest = aip_path / "tagmanifest-md5.txt"
tagmanifest.write_text("some test data")
- mocker.patch.object(pathlib.Path, "is_dir", return_value=True)
- find_tag_manifest = mocker.patch(
- "common.utils.find_tagmanifest", return_value=tagmanifest
- )
+ find_tag_manifest.return_value = tagmanifest
utils.generate_checksum(aip_path)
find_tag_manifest.assert_called_once()
diff --git a/tests/locations/test_dspace_rest.py b/tests/locations/test_dspace_rest.py
index 4f3dace03..dd9a95709 100644
--- a/tests/locations/test_dspace_rest.py
+++ b/tests/locations/test_dspace_rest.py
@@ -4,6 +4,7 @@
import os
import subprocess
from collections import namedtuple
+from unittest import mock
from uuid import uuid4
import agentarchives
@@ -305,8 +306,23 @@ def add_digital_object(self, *args, **kwargs):
MoveFromCaseDIP(as_credentials_set=True, as_credentials_valid=as_ado_exc),
],
)
+@mock.patch("agentarchives.archivesspace.ArchivesSpaceClient")
+@mock.patch("requests.post")
+@mock.patch("os.walk")
+@mock.patch("os.listdir")
+@mock.patch("builtins.open")
+@mock.patch("os.remove")
+@mock.patch("subprocess.Popen", return_value=MockProcess(["fake-command"]))
+@mock.patch("os.path.isfile")
def test_move_from_storage_service(
- mocker,
+ isfile,
+ popen,
+ remove,
+ open,
+ listdir,
+ walk,
+ post,
+ archives_space_client,
package,
ds_aip_collection,
metadata,
@@ -317,7 +333,7 @@ def test_move_from_storage_service(
as_credentials_valid,
upload_to_tsm,
):
- mocker.patch("os.path.isfile", return_value=getattr(package, "isfile", True))
+ isfile.return_value = getattr(package, "isfile", True)
dspace_rest_space = DSpaceREST(
space=Space(),
ds_rest_url=DS_REST_URL,
@@ -354,12 +370,8 @@ def test_move_from_storage_service(
return
# Simple patches
- mocker.patch("subprocess.Popen", return_value=MockProcess(["fake-command"]))
- mocker.patch("lxml.etree.parse", return_value=etree.parse(package.fake_mets_file))
- mocker.patch("os.remove")
- mocker.patch("builtins.open")
- mocker.patch("os.listdir", return_value=[AIP_METS_FILENAME])
- mocker.patch("os.walk", return_value=[("", [], [AIP_METS_FILENAME])])
+ listdir.return_value = [AIP_METS_FILENAME]
+ walk.return_value = [("", [], [AIP_METS_FILENAME])]
# Patch ``requests.post``
def mock_requests_post(*args, **kwargs):
@@ -371,7 +383,7 @@ def mock_requests_post(*args, **kwargs):
raise ds_request_validity.exc
return FakeDSpaceRESTPOSTResponse()
- mocker.patch("requests.post", side_effect=mock_requests_post)
+ post.side_effect = mock_requests_post
# Patch ``agentarchives.archivesspace.ArchivesSpaceClient``
if (
@@ -380,48 +392,49 @@ def mock_requests_post(*args, **kwargs):
fake_as_client = FakeArchivesSpaceClient(exc=as_credentials_valid.exc)
else:
fake_as_client = FakeArchivesSpaceClient()
- mocker.patch(
- "agentarchives.archivesspace.ArchivesSpaceClient", return_value=fake_as_client
- )
+ archives_space_client.return_value = fake_as_client
if not as_credentials_valid.is_valid:
agentarchives.archivesspace.ArchivesSpaceClient.side_effect = (
as_credentials_valid.exc
)
- # Simulate AS request-related failure
- if not as_credentials_valid.is_valid:
- with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
- dspace_rest_space.move_from_storage_service(
- package_source_path, AIP_DEST_PATH, package=package
- )
- return
-
- # Simulate AS "add digital object" failure
- if (
- not as_credentials_valid.is_valid
- ) and as_credentials_valid.method_that_raises == "constructor":
- with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
- dspace_rest_space.move_from_storage_service(
- package_source_path, AIP_DEST_PATH, package=package
+ with mock.patch(
+ "lxml.etree.parse", return_value=etree.parse(package.fake_mets_file)
+ ) as parse:
+ # Simulate AS request-related failure
+ if not as_credentials_valid.is_valid:
+ with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
+ dspace_rest_space.move_from_storage_service(
+ package_source_path, AIP_DEST_PATH, package=package
+ )
+ return
+
+ # Simulate AS "add digital object" failure
+ if (
+ not as_credentials_valid.is_valid
+ ) and as_credentials_valid.method_that_raises == "constructor":
+ with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
+ dspace_rest_space.move_from_storage_service(
+ package_source_path, AIP_DEST_PATH, package=package
+ )
+ assert excinfo.value.message == (
+ "Error depositing to DSpace or ArchiveSpace: Could not login to"
+ f" ArchivesSpace server: {AS_URL_NO_PORT}, port: {AS_PORT}, user: {AS_USER}, repository:"
+ f" {AS_REPOSITORY}"
)
- assert excinfo.value.message == (
- "Error depositing to DSpace or ArchiveSpace: Could not login to"
- f" ArchivesSpace server: {AS_URL_NO_PORT}, port: {AS_PORT}, user: {AS_USER}, repository:"
- f" {AS_REPOSITORY}"
+ return
+
+ if not ds_request_validity.is_valid:
+ with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
+ dspace_rest_space.move_from_storage_service(
+ package_source_path, AIP_DEST_PATH, package=package
+ )
+ return
+
+ # Call the test-targeting method in the happy path
+ dspace_rest_space.move_from_storage_service(
+ package_source_path, AIP_DEST_PATH, package=package
)
- return
-
- if not ds_request_validity.is_valid:
- with pytest.raises(dspace_rest.DSpaceRESTException) as excinfo:
- dspace_rest_space.move_from_storage_service(
- package_source_path, AIP_DEST_PATH, package=package
- )
- return
-
- # Call the test-targeting method in the happy path
- dspace_rest_space.move_from_storage_service(
- package_source_path, AIP_DEST_PATH, package=package
- )
# Assertions about the 4 requests.post calls:
# 1. login to DSpace,
@@ -435,14 +448,14 @@ def mock_requests_post(*args, **kwargs):
(_, actual_bitstream_args, actual_bitstream_kwargs),
actual_logout_call,
) = requests.post.mock_calls
- assert actual_login_call == mocker.call(
+ assert actual_login_call == mock.call(
DS_REST_LOGIN_URL,
cookies=None,
data={"password": DS_PASSWORD, "email": DS_EMAIL},
headers=None,
verify=VERIFY_SSL,
)
- assert actual_logout_call == mocker.call(
+ assert actual_logout_call == mock.call(
DS_REST_LOGOUT_URL,
cookies=COOKIES,
data=None,
@@ -462,7 +475,7 @@ def mock_requests_post(*args, **kwargs):
assert actual_bitstream_kwargs["verify"] == VERIFY_SSL
assert actual_bitstream_kwargs["cookies"] == COOKIES
assert actual_bitstream_kwargs["headers"] == JSON_HEADERS
- etree.parse.assert_called_once_with(package_mets_path)
+ parse.assert_called_once_with(package_mets_path)
if package.package_type == Package.DIP:
# No METS extraction happens with DIP, therefore no removal needed
diff --git a/tests/locations/test_duracloud.py b/tests/locations/test_duracloud.py
index a5e865004..5b49d2a7f 100644
--- a/tests/locations/test_duracloud.py
+++ b/tests/locations/test_duracloud.py
@@ -1,3 +1,5 @@
+from unittest import mock
+
import pytest
import requests
from locations.models import Duracloud
@@ -18,27 +20,33 @@ def test_duraspace_url(space):
@pytest.mark.django_db
-def test_browse(space, mocker):
- page1 = """
-
- - /foo/
- - /foo/bar/bar-a.zip
- - /foo/bar/bar-b.zip
- - /foo/foo-a.zip
- - /baz/
-
- """
- page2 = """
-
-
- """
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(status_code=200, content=page1, spec=requests.Response),
- mocker.Mock(status_code=200, content=page2, spec=requests.Response),
- ],
- )
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content="""
+
+ - /foo/
+ - /foo/bar/bar-a.zip
+ - /foo/bar/bar-b.zip
+ - /foo/foo-a.zip
+ - /baz/
+
+ """,
+ spec=requests.Response,
+ ),
+ mock.Mock(
+ status_code=200,
+ content="""
+
+
+ """,
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_browse(get, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
result = d.browse("/foo")
@@ -51,30 +59,36 @@ def test_browse(space, mocker):
@pytest.mark.django_db
-def test_browse_strips_manifest_and_chunk_suffixes(space, mocker):
- page1 = """
-
- - /foo/
- - /foo/bar/bar-a.zip
- - /foo/bar/bar-b.zip.dura-manifest
- - /foo/bar/bar-b.zip.dura-chunk-0000
- - /foo/foo-a.zip
- - /foo/foo-b.zip.dura-manifest
- - /foo/foo-b.zip.dura-chunk-0000
- - /baz/
-
- """
- page2 = """
-
-
- """
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(status_code=200, content=page1, spec=requests.Response),
- mocker.Mock(status_code=200, content=page2, spec=requests.Response),
- ],
- )
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content="""
+
+ - /foo/
+ - /foo/bar/bar-a.zip
+ - /foo/bar/bar-b.zip.dura-manifest
+ - /foo/bar/bar-b.zip.dura-chunk-0000
+ - /foo/foo-a.zip
+ - /foo/foo-b.zip.dura-manifest
+ - /foo/foo-b.zip.dura-chunk-0000
+ - /baz/
+
+ """,
+ spec=requests.Response,
+ ),
+ mock.Mock(
+ status_code=200,
+ content="""
+
+
+ """,
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_browse_strips_manifest_and_chunk_suffixes(get, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
result = d.browse("/foo")
@@ -87,11 +101,11 @@ def test_browse_strips_manifest_and_chunk_suffixes(space, mocker):
@pytest.mark.django_db
-def test_browse_fails_if_it_cannot_retrieve_files_initially(space, mocker):
- mocker.patch(
- "requests.Session.get",
- side_effect=[mocker.Mock(status_code=503, spec=requests.Response)],
- )
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[mock.Mock(status_code=503, spec=requests.Response)],
+)
+def test_browse_fails_if_it_cannot_retrieve_files_initially(get, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
with pytest.raises(StorageException, match="Unable to get list of files in /foo/"):
@@ -99,23 +113,26 @@ def test_browse_fails_if_it_cannot_retrieve_files_initially(space, mocker):
@pytest.mark.django_db
-def test_browse_fails_if_it_cannot_retrieve_additional_files(space, mocker):
- page1 = """
-
- - /foo/
- - /foo/bar/bar-a.zip
- - /foo/bar/bar-b.zip
- - /foo/foo-a.zip
- - /baz/
-
- """
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(status_code=200, content=page1, spec=requests.Response),
- mocker.Mock(status_code=503, spec=requests.Response),
- ],
- )
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content="""
+
+ - /foo/
+ - /foo/bar/bar-a.zip
+ - /foo/bar/bar-b.zip
+ - /foo/foo-a.zip
+ - /baz/
+
+ """,
+ spec=requests.Response,
+ ),
+ mock.Mock(status_code=503, spec=requests.Response),
+ ],
+)
+def test_browse_fails_if_it_cannot_retrieve_additional_files(get, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
with pytest.raises(
@@ -125,103 +142,99 @@ def test_browse_fails_if_it_cannot_retrieve_additional_files(space, mocker):
@pytest.mark.django_db
-def test_delete_path_deletes_file(space, mocker):
- delete = mocker.patch(
- "requests.Session.delete",
- side_effect=[mocker.Mock(status_code=200, spec=requests.Response)],
- )
+@mock.patch(
+ "requests.Session.delete",
+ side_effect=[mock.Mock(status_code=200, spec=requests.Response)],
+)
+def test_delete_path_deletes_file(delete, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
d.delete_path("some/file.zip")
assert delete.mock_calls == [
- mocker.call("https://duracloud.org/durastore/myspace/some/file.zip")
+ mock.call("https://duracloud.org/durastore/myspace/some/file.zip")
]
@pytest.mark.django_db
-def test_delete_path_deletes_chunked_file(space, mocker):
- delete = mocker.patch(
- "requests.Session.delete",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, spec=requests.Response),
- ],
- )
- get = mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(
- status_code=200,
- content=b"""\
+@mock.patch(
+ "requests.Session.delete",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content=b"""\
8084
dcbfdd6ff7f78194e1084f900514a194
- """,
- spec=requests.Response,
- )
- ],
- )
+ """,
+ spec=requests.Response,
+ )
+ ],
+)
+def test_delete_path_deletes_chunked_file(get, delete, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
d.delete_path("some/file.zip")
assert delete.mock_calls == [
- mocker.call("https://duracloud.org/durastore/myspace/some/file.zip"),
- mocker.call(
+ mock.call("https://duracloud.org/durastore/myspace/some/file.zip"),
+ mock.call(
"https://duracloud.org/durastore/myspace/some/file.zip.dura-manifest"
),
]
assert get.mock_calls == [
- mocker.call(
- "https://duracloud.org/durastore/myspace/some/file.zip.dura-manifest"
- )
+ mock.call("https://duracloud.org/durastore/myspace/some/file.zip.dura-manifest")
]
@pytest.mark.django_db
-def test_delete_path_deletes_folder(space, mocker):
- delete = mocker.patch(
- "requests.Session.delete",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, spec=requests.Response),
- mocker.Mock(status_code=200, spec=requests.Response),
- ],
- )
- get = mocker.patch(
- "requests.Session.get",
- side_effect=[mocker.Mock(status_code=404, ok=False, spec=requests.Response)],
- )
- mocker.patch(
- "locations.models.duracloud.Duracloud._get_files_list",
- side_effect=[["some/folder/a.zip", "some/folder/b.zip"]],
- )
+@mock.patch(
+ "requests.Session.delete",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, spec=requests.Response),
+ mock.Mock(status_code=200, spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[mock.Mock(status_code=404, ok=False, spec=requests.Response)],
+)
+@mock.patch(
+ "locations.models.duracloud.Duracloud._get_files_list",
+ side_effect=[["some/folder/a.zip", "some/folder/b.zip"]],
+)
+def test_delete_path_deletes_folder(_get_files_list, get, delete, space):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
d.delete_path("some/folder")
assert delete.mock_calls == [
- mocker.call("https://duracloud.org/durastore/myspace/some/folder"),
- mocker.call("https://duracloud.org/durastore/myspace/some/folder/a.zip"),
- mocker.call("https://duracloud.org/durastore/myspace/some/folder/b.zip"),
+ mock.call("https://duracloud.org/durastore/myspace/some/folder"),
+ mock.call("https://duracloud.org/durastore/myspace/some/folder/a.zip"),
+ mock.call("https://duracloud.org/durastore/myspace/some/folder/b.zip"),
]
assert get.mock_calls == [
- mocker.call("https://duracloud.org/durastore/myspace/some/folder.dura-manifest")
+ mock.call("https://duracloud.org/durastore/myspace/some/folder.dura-manifest")
]
@pytest.mark.django_db
-def test_move_to_storage_service_downloads_file(space, mocker, tmp_path):
- mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=200, content=b"a file", spec=requests.Response)
- ],
- )
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[mock.Mock(status_code=200, content=b"a file", spec=requests.Response)],
+)
+def test_move_to_storage_service_downloads_file(send, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -231,21 +244,20 @@ def test_move_to_storage_service_downloads_file(space, mocker, tmp_path):
@pytest.mark.django_db
-def test_move_to_storage_service_downloads_chunked_file(space, mocker, tmp_path):
- mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, content=b"a ch", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"unked file", spec=requests.Response),
- ],
- )
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(
- status_code=200,
- content=b"""\
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, content=b"a ch", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"unked file", spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content=b"""\
@@ -266,11 +278,12 @@ def test_move_to_storage_service_downloads_chunked_file(space, mocker, tmp_path)
- """,
- spec=requests.Response,
- ),
- ],
- )
+ """,
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_move_to_storage_service_downloads_chunked_file(get, send, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -280,23 +293,25 @@ def test_move_to_storage_service_downloads_chunked_file(space, mocker, tmp_path)
@pytest.mark.django_db
-def test_move_to_storage_service_downloads_folder(space, mocker, tmp_path):
- mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, content=b"file A", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"file B", spec=requests.Response),
- ],
- )
- mocker.patch(
- "requests.Session.get",
- side_effect=[mocker.Mock(status_code=404, ok=False, spec=requests.Response)],
- )
- mocker.patch(
- "locations.models.duracloud.Duracloud._get_files_list",
- side_effect=[["some/folder/a.txt", "some/folder/b.txt"]],
- )
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, content=b"file A", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"file B", spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[mock.Mock(status_code=404, ok=False, spec=requests.Response)],
+)
+@mock.patch(
+ "locations.models.duracloud.Duracloud._get_files_list",
+ side_effect=[["some/folder/a.txt", "some/folder/b.txt"]],
+)
+def test_move_to_storage_service_downloads_folder(
+ _get_files_list, get, send, space, tmp_path
+):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst"
@@ -309,13 +324,13 @@ def test_move_to_storage_service_downloads_folder(space, mocker, tmp_path):
@pytest.mark.django_db
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[mock.Mock(status_code=503, spec=requests.Response)],
+)
def test_move_to_storage_service_fails_if_it_cannot_download_file(
- space, mocker, tmp_path
+ send, space, tmp_path
):
- mocker.patch(
- "requests.Session.send",
- side_effect=[mocker.Mock(status_code=503, spec=requests.Response)],
- )
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -327,24 +342,21 @@ def test_move_to_storage_service_fails_if_it_cannot_download_file(
@pytest.mark.django_db
-def test_move_to_storage_service_retries_after_chunk_download_errors(
- space, mocker, tmp_path
-):
- send = mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, content=b"a", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"a ch", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"unked file", spec=requests.Response),
- ],
- )
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(
- status_code=200,
- content=b"""\
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, content=b"a", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"a ch", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"unked file", spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content=b"""\
@@ -365,11 +377,14 @@ def test_move_to_storage_service_retries_after_chunk_download_errors(
- """,
- spec=requests.Response,
- ),
- ],
- )
+ """,
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_move_to_storage_service_retries_after_chunk_download_errors(
+ get, send, space, tmp_path
+):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -382,26 +397,23 @@ def test_move_to_storage_service_retries_after_chunk_download_errors(
@pytest.mark.django_db
-def test_move_to_storage_service_fails_if_chunk_size_does_not_match(
- space, mocker, tmp_path
-):
- send = mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- # Fail all the download retry attempts.
- mocker.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"unked file", spec=requests.Response),
- ],
- )
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(
- status_code=200,
- content=b"""\
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ # Fail all the download retry attempts.
+ mock.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"ERRORERROR", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"unked file", spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content=b"""\
@@ -423,10 +435,13 @@ def test_move_to_storage_service_fails_if_chunk_size_does_not_match(
""",
- spec=requests.Response,
- ),
- ],
- )
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_move_to_storage_service_fails_if_chunk_size_does_not_match(
+ get, send, space, tmp_path
+):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -449,23 +464,20 @@ def test_move_to_storage_service_fails_if_chunk_size_does_not_match(
@pytest.mark.django_db
-def test_move_to_storage_service_fails_if_chunk_checksum_does_not_match(
- space, mocker, tmp_path
-):
- send = mocker.patch(
- "requests.Session.send",
- side_effect=[
- mocker.Mock(status_code=404, spec=requests.Response),
- mocker.Mock(status_code=200, content=b"a ch", spec=requests.Response),
- mocker.Mock(status_code=200, content=b"unked file", spec=requests.Response),
- ],
- )
- mocker.patch(
- "requests.Session.get",
- side_effect=[
- mocker.Mock(
- status_code=200,
- content=b"""\
+@mock.patch(
+ "requests.Session.send",
+ side_effect=[
+ mock.Mock(status_code=404, spec=requests.Response),
+ mock.Mock(status_code=200, content=b"a ch", spec=requests.Response),
+ mock.Mock(status_code=200, content=b"unked file", spec=requests.Response),
+ ],
+)
+@mock.patch(
+ "requests.Session.get",
+ side_effect=[
+ mock.Mock(
+ status_code=200,
+ content=b"""\
@@ -487,10 +499,13 @@ def test_move_to_storage_service_fails_if_chunk_checksum_does_not_match(
""",
- spec=requests.Response,
- ),
- ],
- )
+ spec=requests.Response,
+ ),
+ ],
+)
+def test_move_to_storage_service_fails_if_chunk_checksum_does_not_match(
+ get, send, space, tmp_path
+):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
dst = tmp_path / "dst" / "file.txt"
@@ -515,11 +530,11 @@ def test_move_to_storage_service_fails_if_chunk_checksum_does_not_match(
@pytest.mark.django_db
-def test_move_from_storage_service_uploads_file(space, mocker, tmp_path):
- put = mocker.patch(
- "requests.Session.put",
- side_effect=[mocker.Mock(status_code=201, spec=requests.Response)],
- )
+@mock.patch(
+ "requests.Session.put",
+ side_effect=[mock.Mock(status_code=201, spec=requests.Response)],
+)
+def test_move_from_storage_service_uploads_file(put, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
src = tmp_path / "src"
src.mkdir()
@@ -529,9 +544,9 @@ def test_move_from_storage_service_uploads_file(space, mocker, tmp_path):
d.move_from_storage_service(f.as_posix(), "some/file.txt")
assert put.mock_calls == [
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/file.txt",
- data=mocker.ANY,
+ data=mock.ANY,
headers={
"Content-MD5": "d6d0c756fb8abfb33e652a20e85b70bc",
"Content-Type": "text/plain",
@@ -541,15 +556,15 @@ def test_move_from_storage_service_uploads_file(space, mocker, tmp_path):
@pytest.mark.django_db
-def test_move_from_storage_service_uploads_chunked_file(space, mocker, tmp_path):
- put = mocker.patch(
- "requests.Session.put",
- side_effect=[
- mocker.Mock(status_code=201, spec=requests.Response),
- mocker.Mock(status_code=201, spec=requests.Response),
- mocker.Mock(status_code=201, spec=requests.Response),
- ],
- )
+@mock.patch(
+ "requests.Session.put",
+ side_effect=[
+ mock.Mock(status_code=201, spec=requests.Response),
+ mock.Mock(status_code=201, spec=requests.Response),
+ mock.Mock(status_code=201, spec=requests.Response),
+ ],
+)
+def test_move_from_storage_service_uploads_chunked_file(put, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
d.CHUNK_SIZE = 4
d.BUFFER_SIZE = 2
@@ -561,33 +576,33 @@ def test_move_from_storage_service_uploads_chunked_file(space, mocker, tmp_path)
d.move_from_storage_service(f.as_posix(), "some/file.txt")
assert put.mock_calls == [
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/file.txt.dura-chunk-0000",
- data=mocker.ANY,
+ data=mock.ANY,
headers={"Content-MD5": "417a095d4148cb184601f536fb626765"},
),
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/file.txt.dura-chunk-0001",
- data=mocker.ANY,
+ data=mock.ANY,
headers={"Content-MD5": "d9180594744f870aeefb086982e980bb"},
),
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/file.txt.dura-manifest",
- data=mocker.ANY,
+ data=mock.ANY,
headers={"Content-MD5": "59e5f62e5ed85ba339e73db5756e57c7"},
),
]
@pytest.mark.django_db
-def test_move_from_storage_service_uploads_folder_contents(space, mocker, tmp_path):
- put = mocker.patch(
- "requests.Session.put",
- side_effect=[
- mocker.Mock(status_code=201, spec=requests.Response),
- mocker.Mock(status_code=201, spec=requests.Response),
- ],
- )
+@mock.patch(
+ "requests.Session.put",
+ side_effect=[
+ mock.Mock(status_code=201, spec=requests.Response),
+ mock.Mock(status_code=201, spec=requests.Response),
+ ],
+)
+def test_move_from_storage_service_uploads_folder_contents(put, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
src = tmp_path / "src"
src.mkdir()
@@ -598,17 +613,17 @@ def test_move_from_storage_service_uploads_folder_contents(space, mocker, tmp_pa
put.assert_has_calls(
[
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/folder//a.txt",
- data=mocker.ANY,
+ data=mock.ANY,
headers={
"Content-MD5": "31d97c4d04593b21b399ace73b061c34",
"Content-Type": "text/plain",
},
),
- mocker.call(
+ mock.call(
"https://duracloud.org/durastore/myspace/some/folder//b.txt",
- data=mocker.ANY,
+ data=mock.ANY,
headers={
"Content-MD5": "1651d570b74339e94cace90cde7d3147",
"Content-Type": "text/plain",
@@ -620,13 +635,13 @@ def test_move_from_storage_service_uploads_folder_contents(space, mocker, tmp_pa
@pytest.mark.django_db
+@mock.patch(
+ "requests.Session.put",
+ side_effect=mock.Mock(status_code=503, spec=requests.Response),
+)
def test_move_from_storage_service_fails_uploading_after_exceeding_retries(
- space, mocker, tmp_path
+ put, space, tmp_path
):
- put = mocker.patch(
- "requests.Session.put",
- side_effect=mocker.Mock(status_code=503, spec=requests.Response),
- )
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
src = tmp_path / "src"
src.mkdir()
@@ -641,11 +656,11 @@ def test_move_from_storage_service_fails_uploading_after_exceeding_retries(
@pytest.mark.django_db
-def test_move_from_storage_service_reraises_requests_exception(space, mocker, tmp_path):
- put = mocker.patch(
- "requests.Session.put",
- side_effect=requests.exceptions.ConnectionError(),
- )
+@mock.patch(
+ "requests.Session.put",
+ side_effect=requests.exceptions.ConnectionError(),
+)
+def test_move_from_storage_service_reraises_requests_exception(put, space, tmp_path):
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
src = tmp_path / "src"
src.mkdir()
@@ -671,11 +686,11 @@ def test_move_from_storage_service_fails_if_source_file_does_not_exist(space, tm
@pytest.mark.django_db
+@mock.patch("os.path.exists", return_value=True)
def test_move_from_storage_service_fails_if_source_file_cannot_be_determined(
- space, mocker, tmp_path
+ exists, space, tmp_path
):
# Mocking exists like this forces all move_from_storage_service conditions to fail.
- mocker.patch("os.path.exists", return_value=True)
d = Duracloud.objects.create(space=space, host="duracloud.org", duraspace="myspace")
src = tmp_path / "src"
src.mkdir()
diff --git a/tests/locations/test_gpg.py b/tests/locations/test_gpg.py
index 0a959b43b..657df8f4b 100644
--- a/tests/locations/test_gpg.py
+++ b/tests/locations/test_gpg.py
@@ -6,6 +6,7 @@
from collections import namedtuple
from typing import Any
from typing import Dict
+from unittest import mock
import pytest
from common import gpgutils
@@ -120,48 +121,50 @@ def should_have_pointer_file(self):
],
)
def test_move_to_storage_service(
- mocker, src_path, dst_path, src_exists1, src_exists2, encr_path, expect
+ src_path, dst_path, src_exists1, src_exists2, encr_path, expect
):
gpg_space = gpg.GPG(key=SOME_FINGERPRINT, space=space.Space())
- mocker.patch.object(gpg_space.space, "create_local_directory")
- mocker.patch.object(gpg_space.space, "move_rsync")
- mocker.patch.object(gpg, "_gpg_decrypt")
- mocker.patch.object(gpg, "_gpg_encrypt")
- mocker.patch.object(
- gpg, "_encr_path2key_fingerprint", return_value=SOME_FINGERPRINT
- )
- mocker.patch.object(gpg, "_get_encrypted_path", return_value=encr_path)
- mocker.patch.object(os.path, "exists", side_effect=(src_exists1, src_exists2))
- if expect == "success":
- ret = gpg_space.move_to_storage_service(src_path, dst_path, None)
- assert ret is None
- else:
- with pytest.raises(gpg.GPGException) as excinfo:
- gpg_space.move_to_storage_service(src_path, dst_path, None)
- if not encr_path:
- assert (
- f"Unable to move {src_path}; this file/dir does not exist;"
- " nor is it in an encrypted directory." == str(excinfo.value)
- )
- if not src_exists2:
- assert (
- f"Unable to move {src_path}; this file/dir does not"
- " exist, not even in encrypted directory"
- f" {encr_path}." == str(excinfo.value)
- )
- if src_exists2 and encr_path:
- gpg_space.space.move_rsync.assert_called_once_with(src_path, dst_path)
- else:
- assert not gpg_space.space.move_rsync.called
- if src_exists1:
- gpg._gpg_decrypt.assert_called_once_with(dst_path)
- assert not gpg._gpg_encrypt.called
- else:
- gpg._get_encrypted_path.assert_called_once_with(src_path)
- if encr_path:
- gpg._gpg_encrypt.assert_called_once_with(encr_path, SOME_FINGERPRINT)
- gpg._gpg_decrypt.assert_called_once_with(encr_path)
- gpg_space.space.create_local_directory.assert_called_once_with(dst_path)
+ with (
+ mock.patch.object(gpg_space.space, "create_local_directory"),
+ mock.patch.object(gpg_space.space, "move_rsync"),
+ mock.patch.object(gpg, "_gpg_decrypt"),
+ mock.patch.object(gpg, "_gpg_encrypt"),
+ mock.patch.object(
+ gpg, "_encr_path2key_fingerprint", return_value=SOME_FINGERPRINT
+ ),
+ mock.patch.object(gpg, "_get_encrypted_path", return_value=encr_path),
+ mock.patch.object(os.path, "exists", side_effect=(src_exists1, src_exists2)),
+ ):
+ if expect == "success":
+ ret = gpg_space.move_to_storage_service(src_path, dst_path, None)
+ assert ret is None
+ else:
+ with pytest.raises(gpg.GPGException) as excinfo:
+ gpg_space.move_to_storage_service(src_path, dst_path, None)
+ if not encr_path:
+ assert (
+ f"Unable to move {src_path}; this file/dir does not exist;"
+ " nor is it in an encrypted directory." == str(excinfo.value)
+ )
+ if not src_exists2:
+ assert (
+ f"Unable to move {src_path}; this file/dir does not"
+ " exist, not even in encrypted directory"
+ f" {encr_path}." == str(excinfo.value)
+ )
+ if src_exists2 and encr_path:
+ gpg_space.space.move_rsync.assert_called_once_with(src_path, dst_path)
+ else:
+ assert not gpg_space.space.move_rsync.called
+ if src_exists1:
+ gpg._gpg_decrypt.assert_called_once_with(dst_path)
+ assert not gpg._gpg_encrypt.called
+ else:
+ gpg._get_encrypted_path.assert_called_once_with(src_path)
+ if encr_path:
+ gpg._gpg_encrypt.assert_called_once_with(encr_path, SOME_FINGERPRINT)
+ gpg._gpg_decrypt.assert_called_once_with(encr_path)
+ gpg_space.space.create_local_directory.assert_called_once_with(dst_path)
@pytest.mark.parametrize(
diff --git a/tests/locations/test_space.py b/tests/locations/test_space.py
index f51f481fc..87504e17e 100644
--- a/tests/locations/test_space.py
+++ b/tests/locations/test_space.py
@@ -3,6 +3,7 @@
import subprocess
from os import makedirs
from os import scandir
+from unittest import mock
import pytest
from locations.models import LocalFilesystem
@@ -55,12 +56,10 @@ def tree(tmpdir):
return result
+@mock.patch("common.utils.get_setting", return_value=False)
def test_path2browse_dict_object_counting_ignores_read_protected_directories(
- tree, mocker
+ get_setting, tree
):
- # Enable object counting in spaces.
- mocker.patch("common.utils.get_setting", return_value=False)
-
# Count objects in the tree without access restrictions.
result = path2browse_dict(str(tree))
assert result == {
@@ -76,63 +75,63 @@ def test_path2browse_dict_object_counting_ignores_read_protected_directories(
}
# Restrict read access to the "empty" directory.
- mocker.patch("os.scandir", side_effect=_restrict_access_to(tree.join("empty")))
- assert path2browse_dict(str(tree)) == {
- "directories": ["empty", "first", "second"],
- "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
- "properties": {
- "empty": {"object count": 0},
- "error.txt": {"size": 8},
- "first": {"object count": 2},
- "second": {"object count": 2},
- "tree_a.txt": {"size": 6},
- },
- }
+ with mock.patch("os.scandir", side_effect=_restrict_access_to(tree.join("empty"))):
+ assert path2browse_dict(str(tree)) == {
+ "directories": ["empty", "first", "second"],
+ "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
+ "properties": {
+ "empty": {"object count": 0},
+ "error.txt": {"size": 8},
+ "first": {"object count": 2},
+ "second": {"object count": 2},
+ "tree_a.txt": {"size": 6},
+ },
+ }
# Restrict read access to the "first" directory.
- mocker.patch("os.scandir", side_effect=_restrict_access_to(tree.join("first")))
- assert path2browse_dict(str(tree)) == {
- "directories": ["empty", "first", "second"],
- "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
- "properties": {
- "empty": {"object count": 0},
- "error.txt": {"size": 8},
- "first": {"object count": 0},
- "second": {"object count": 2},
- "tree_a.txt": {"size": 6},
- },
- }
+ with mock.patch("os.scandir", side_effect=_restrict_access_to(tree.join("first"))):
+ assert path2browse_dict(str(tree)) == {
+ "directories": ["empty", "first", "second"],
+ "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
+ "properties": {
+ "empty": {"object count": 0},
+ "error.txt": {"size": 8},
+ "first": {"object count": 0},
+ "second": {"object count": 2},
+ "tree_a.txt": {"size": 6},
+ },
+ }
# Restrict read access to the "second" directory.
- mocker.patch("os.scandir", side_effect=_restrict_access_to(tree.join("second")))
- assert path2browse_dict(str(tree)) == {
- "directories": ["empty", "first", "second"],
- "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
- "properties": {
- "empty": {"object count": 0},
- "error.txt": {"size": 8},
- "first": {"object count": 2},
- "second": {"object count": 0},
- "tree_a.txt": {"size": 6},
- },
- }
+ with mock.patch("os.scandir", side_effect=_restrict_access_to(tree.join("second"))):
+ assert path2browse_dict(str(tree)) == {
+ "directories": ["empty", "first", "second"],
+ "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
+ "properties": {
+ "empty": {"object count": 0},
+ "error.txt": {"size": 8},
+ "first": {"object count": 2},
+ "second": {"object count": 0},
+ "tree_a.txt": {"size": 6},
+ },
+ }
# Restrict read access to the "third" directory (child of the "second" directory).
- mocker.patch(
+ with mock.patch(
"os.scandir",
side_effect=_restrict_access_to(tree.join("second").join("third")),
- )
- assert path2browse_dict(str(tree)) == {
- "directories": ["empty", "first", "second"],
- "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
- "properties": {
- "empty": {"object count": 0},
- "error.txt": {"size": 8},
- "first": {"object count": 2},
- "second": {"object count": 1},
- "tree_a.txt": {"size": 6},
- },
- }
+ ):
+ assert path2browse_dict(str(tree)) == {
+ "directories": ["empty", "first", "second"],
+ "entries": ["empty", "error.txt", "first", "second", "tree_a.txt"],
+ "properties": {
+ "empty": {"object count": 0},
+ "error.txt": {"size": 8},
+ "first": {"object count": 2},
+ "second": {"object count": 1},
+ "tree_a.txt": {"size": 6},
+ },
+ }
# AIP store directory structure with components of the quad structure
@@ -281,7 +280,7 @@ def aipstore_compressed(tmpdir):
],
)
def test_delete_compressed_path_local(
- aipstore_compressed, path, aip_name, mocker, remaining_directory, deleted_directory
+ aipstore_compressed, path, aip_name, remaining_directory, deleted_directory
):
"""Test that local compressed or packaged paths e.g. tar.gz are
deleted as expected.
@@ -293,14 +292,13 @@ def test_delete_compressed_path_local(
# rmtree is used to delete directories, we want to make sure we
# don't call it from the delete function.
- mocker.patch("shutil.rmtree", side_effect=shutil.rmtree)
-
- # Make sure there is something to delete and delete it.
- assert os.path.exists(path_to_delete)
- sp._delete_path_local(path_to_delete)
+ with mock.patch("shutil.rmtree") as rmtree:
+ # Make sure there is something to delete and delete it.
+ assert os.path.exists(path_to_delete)
+ sp._delete_path_local(path_to_delete)
- # Verify that we called shutil was not called to remove a file.
- shutil.rmtree.assert_not_called()
+ # Verify that we called shutil was not called to remove a file.
+ rmtree.assert_not_called()
# Make sure the path is gone.
assert not os.path.exists(path_to_delete)
@@ -450,7 +448,6 @@ def test_delete_uncompressed_path_local(
aip_name,
remaining_directory,
deleted_directory,
- mocker,
):
"""Test that local uncompressed paths are deleted as expected."""
@@ -460,14 +457,13 @@ def test_delete_uncompressed_path_local(
# rmtree is used to delete directories, we want to make sure we
# do call it from the delete function.
- mocker.patch("shutil.rmtree", side_effect=shutil.rmtree)
-
- # Make sure there is something to delete and delete it.
- assert os.path.exists(path_to_delete)
- sp._delete_path_local(path_to_delete)
+ with mock.patch("shutil.rmtree", side_effect=shutil.rmtree) as rmtree:
+ # Make sure there is something to delete and delete it.
+ assert os.path.exists(path_to_delete)
+ sp._delete_path_local(path_to_delete)
- # Verify that we called shutil to remove a directory, not a file.
- shutil.rmtree.assert_called_with(path_to_delete)
+ # Verify that we called shutil to remove a directory, not a file.
+ rmtree.assert_called_with(path_to_delete)
# Make sure the path is gone.
assert not os.path.exists(path_to_delete)
@@ -522,13 +518,13 @@ def test_delete_non_existant_path_local(tmpdir, aipstore_uncompressed):
assert os.path.exists(remaining_aip)
-def test_move_rsync_command_decodes_paths(mocker):
- popen = mocker.patch(
- "subprocess.Popen",
- return_value=mocker.Mock(
- **{"communicate.return_value": ("command output", None), "returncode": 0}
- ),
- )
+@mock.patch(
+ "subprocess.Popen",
+ return_value=mock.Mock(
+ **{"communicate.return_value": ("command output", None), "returncode": 0}
+ ),
+)
+def test_move_rsync_command_decodes_paths(popen):
space = Space()
space.move_rsync("source_dir", "destination_dir")
@@ -549,20 +545,21 @@ def test_move_rsync_command_decodes_paths(mocker):
)
-def test_create_rsync_directory_commands_decode_paths(tmp_path, mocker):
+@mock.patch("tempfile.mkdtemp")
+@mock.patch("subprocess.check_call")
+def test_create_rsync_directory_commands_decode_paths(check_call, mkdtemp, tmp_path):
temp_dir = tmp_path / "tmp"
temp_dir.mkdir()
dest_dir = "/a/mock/path/"
- check_call = mocker.patch("subprocess.check_call")
- mocker.patch("tempfile.mkdtemp", return_value=str(temp_dir))
+ mkdtemp.return_value = str(temp_dir)
space = Space()
space.create_rsync_directory(dest_dir, "user", "host")
check_call.assert_has_calls(
[
- mocker.call(
+ mock.call(
[
"rsync",
"-vv",
@@ -573,7 +570,7 @@ def test_create_rsync_directory_commands_decode_paths(tmp_path, mocker):
"user@host:/",
]
),
- mocker.call(
+ mock.call(
[
"rsync",
"-vv",
@@ -584,7 +581,7 @@ def test_create_rsync_directory_commands_decode_paths(tmp_path, mocker):
"user@host:/a/",
]
),
- mocker.call(
+ mock.call(
[
"rsync",
"-vv",
@@ -595,7 +592,7 @@ def test_create_rsync_directory_commands_decode_paths(tmp_path, mocker):
"user@host:/a/mock/",
]
),
- mocker.call(
+ mock.call(
[
"rsync",
"-vv",