Skip to content

Commit

Permalink
Replace deprecated functions with new APIs
Browse files Browse the repository at this point in the history
The `cgi` Python module has been deprecated in PEP 594. In particular,
the `parse_header` function has been deprecated in favor of the new
`email.message.Message` API. This commit replaces all occurrences of
`parse_header` with the new API. This commit also adds a test for the
`remotepath.download` API to avoid regressions.

As of Python 3.3, the `logging.warn` API has been deprecated in favor of
the `logging.warning` method. This commit substitutes all occurrences of
the `warn()` method with the equivalent `warning()` API.
  • Loading branch information
GlassOfWhiskey committed Nov 26, 2023
1 parent 05dcd32 commit 4450053
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 161 deletions.
2 changes: 1 addition & 1 deletion streamflow/config/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def __init__(self, name: str, config: MutableMapping[str, Any]) -> None:
self.deplyoments = config.get("models", {})
if self.deplyoments:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `models` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployments` instead."
)
Expand Down
2 changes: 1 addition & 1 deletion streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ def _translate_command_line_tool(
# Otherwise, throw a warning and skip the DockerRequirement conversion
else:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
f"Skipping DockerRequirement conversion for step `{name_prefix}` "
f"when executing on `{target.deployment.name}` deployment."
)
Expand Down
27 changes: 17 additions & 10 deletions streamflow/data/remotepath.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from __future__ import annotations

import asyncio
import cgi
import errno
import glob
import hashlib
import os
import posixpath
import shutil
from email.message import Message
from typing import MutableSequence, TYPE_CHECKING

import aiohttp
from aiohttp import ClientResponse

from streamflow.core import utils
from streamflow.core.context import StreamFlowContext
Expand Down Expand Up @@ -41,6 +42,15 @@ def _file_checksum_local(path: str) -> str:
return sha1_checksum.hexdigest()


def _get_filename_from_response(response: ClientResponse, url: str):
if cd_header := response.headers.get("Content-Disposition"):
message = Message()
message["Content-Disposition"] = cd_header
if filename := message.get_param("filename"):
return filename
return url.rsplit("/", 1)[-1]


def _listdir_local(path: str, file_type: FileType | None) -> MutableSequence[str]:
content = []
dir_content = os.listdir(path)
Expand Down Expand Up @@ -95,25 +105,22 @@ async def download(
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
_, params = cgi.parse_header(
response.headers.get("Content-Disposition", "")
filepath = os.path.join(
parent_dir, _get_filename_from_response(response, url)
)
filepath = os.path.join(parent_dir, params["filename"])
content = await response.read()
with open(filepath, mode="wb") as f:
f.write(await response.read())
else:
raise Exception(
f"Downloading {url} failed with status {response.status}:\n{response.content}"
)
with open(filepath, mode="wb") as f:
f.write(content)
else:
async with aiohttp.ClientSession() as session:
async with session.head(url, allow_redirects=True) as response:
if response.status == 200:
_, params = cgi.parse_header(
response.headers.get("Content-Disposition", "")
filepath = posixpath.join(
parent_dir, _get_filename_from_response(response, url)
)
filepath = posixpath.join(parent_dir, params["filename"])
else:
raise Exception(
f"Downloading {url} failed with status {response.status}:\n{response.content}"
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/connector/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def __init__(
cacheSize = resourcesCacheSize
if cacheSize is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheSize` instead."
)
Expand All @@ -134,7 +134,7 @@ def __init__(
cacheTTL = resourcesCacheTTL
if cacheTTL is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheTTL` instead."
)
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/connector/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def __init__(
cacheSize = resourcesCacheSize
if cacheSize is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `resourcesCacheSize` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheSize` instead."
)
Expand All @@ -200,7 +200,7 @@ def __init__(
cacheTTL = resourcesCacheTTL
if cacheTTL is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `resourcesCacheTTL` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locationsCacheTTL` instead."
)
Expand Down
8 changes: 4 additions & 4 deletions streamflow/deployment/connector/queue_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def __init__(
self._inner_ssh_connector: bool = False
if hostname is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"Inline SSH options are deprecated and will be removed in StreamFlow 0.3.0. "
f"Define a standalone `SSHConnector` and link the `{self.__class__.__name__}` "
"to it using the `wraps` property."
Expand Down Expand Up @@ -375,7 +375,7 @@ def __init__(
files_map[name] = f.read()
if file is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `services` instead."
)
Expand Down Expand Up @@ -553,12 +553,12 @@ async def undeploy(self, external: bool) -> None:
self.scheduledJobs = {}
if self._inner_ssh_connector:
if logger.isEnabledFor(logging.INFO):
logger.warn(
logger.warning(
f"UNDEPLOYING inner SSH connector for {self.deployment_name} deployment."
)
await self.connector.undeploy(external)
if logger.isEnabledFor(logging.INFO):
logger.warn(
logger.warning(
f"COMPLETED Undeployment of inner SSH connector for {self.deployment_name} deployment."
)

Expand Down
2 changes: 1 addition & 1 deletion streamflow/deployment/connector/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def __init__(
services_map[name] = f.read()
if file is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `file` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `services` instead."
)
Expand Down
2 changes: 1 addition & 1 deletion streamflow/deployment/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ async def _inner_deploy(
else:
if deployment_config.wraps is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
f"The `wraps` directive has no effect on deployment {deployment_config.name}, "
f"as the `{deployment_config.type}` connector does not inherit from the ConnectorWrapper class."
)
Expand Down
4 changes: 2 additions & 2 deletions streamflow/deployment/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_binding_config(
else:
target_deployment = workflow_config.deplyoments[target["model"]]
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `model` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `deployment` instead."
)
Expand All @@ -40,7 +40,7 @@ def get_binding_config(
locations = target.get("resources")
if locations is not None:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"The `resources` keyword is deprecated and will be removed in StreamFlow 0.3.0. "
"Use `locations` instead."
)
Expand Down
2 changes: 1 addition & 1 deletion streamflow/ext/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _register(self, name: str, cls: type, extension_point: str):
)
if name in extension_points[extension_point]:
if logger.isEnabledFor(logging.WARN):
logger.warn(
logger.warning(
"{} is already installed and will be overridden by {}".format(
name, self.__class__.__module__ + "." + self.__class__.__name__
)
Expand Down
6 changes: 3 additions & 3 deletions streamflow/provenance/run_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ async def add_file(self, file: MutableMapping[str, str]) -> None:
dst_parent = posixpath.dirname(posixpath.normpath(dst))
if src in self.files_map:
if logger.isEnabledFor(logging.WARN):
logger.warn(f"File {src} is already present in the archive.")
logger.warning(f"File {src} is already present in the archive.")
else:
if os.path.isfile(os.path.realpath(src)):
checksum = _file_checksum(
Expand Down Expand Up @@ -681,7 +681,7 @@ async def add_property(self, key: str, value: str):
current_obj = current_obj[k]
if logger.isEnabledFor(logging.WARN):
if keys[-1] in current_obj:
logger.warn(
logger.warning(
f"Key {key} already exists in archive manifest and will be overridden."
)
value = ESCAPED_EQUAL.sub("=", value)
Expand Down Expand Up @@ -889,7 +889,7 @@ async def create_archive(
if dst not in archive.namelist():
archive.write(src, dst)
else:
logger.warn(f"File {src} does not exist.")
logger.warning(f"File {src} does not exist.")
print(f"Successfully created run_crate archive at {path}")

@abstractmethod
Expand Down
148 changes: 84 additions & 64 deletions tests/test_remotepath.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import posixpath
import tempfile

import pytest
import pytest_asyncio
Expand All @@ -7,6 +8,7 @@
from streamflow.core.data import FileType
from streamflow.core.deployment import Connector, Location
from streamflow.data import remotepath
from streamflow.deployment.connector import LocalConnector
from streamflow.deployment.utils import get_path_processor
from tests.utils.deployment import get_location

Expand All @@ -21,6 +23,88 @@ def connector(context, location) -> Connector:
return context.deployment_manager.get_connector(location.deployment)


@pytest.mark.asyncio
async def test_directory(context, connector, location):
    """Test directory creation and deletion."""
    root = utils.random_name()
    try:
        await remotepath.mkdir(connector, [location], root)
        assert await remotepath.exists(connector, location, root)
        assert await remotepath.isdir(connector, location, root)
        # Populate the directory with the following layout:
        # ./
        #   file1.txt
        #   file2.csv
        #   dir1/
        #   dir2/
        subdirs = [posixpath.join(root, name) for name in ("dir1", "dir2")]
        await remotepath.mkdirs(connector, [location], subdirs)
        for fname in ("file1.txt", "file2.csv"):
            await remotepath.write(
                connector, location, posixpath.join(root, fname), "StreamFlow"
            )
        files = await remotepath.listdir(connector, location, root, FileType.FILE)
        assert len(files) == 2
        assert posixpath.join(root, "file1.txt") in files
        assert posixpath.join(root, "file2.csv") in files
        dirs = await remotepath.listdir(connector, location, root, FileType.DIRECTORY)
        assert len(dirs) == 2
        assert subdirs[0] in dirs
        assert subdirs[1] in dirs
        await remotepath.rm(connector, location, root)
        assert not await remotepath.exists(connector, location, root)
    finally:
        # Clean up if an assertion failed before the explicit removal
        if await remotepath.exists(connector, location, root):
            await remotepath.rm(connector, location, root)


@pytest.mark.asyncio
async def test_download(context, connector, location):
    """Test remote file download."""
    urls = [
        "https://raw.githubusercontent.com/alpha-unito/streamflow/master/LICENSE",
        "https://github.com/alpha-unito/streamflow/archive/refs/tags/0.1.6.zip",
    ]
    parent_dir = (
        tempfile.gettempdir() if isinstance(connector, LocalConnector) else "/tmp"
    )
    for url in urls:
        # Reset on every iteration: otherwise, if `download` raises, `path`
        # would be unbound (NameError masking the real failure) or would
        # stale-reference the previous iteration's file.
        path = None
        try:
            path = await remotepath.download(connector, [location], url, parent_dir)
            assert await remotepath.exists(connector, location, path)
        finally:
            if path is not None and await remotepath.exists(
                connector, location, path
            ):
                await remotepath.rm(connector, location, path)


@pytest.mark.asyncio
async def test_file(context, connector, location):
    """Test file creation, size, checksum and deletion."""
    first = utils.random_name()
    second = utils.random_name()
    try:
        await remotepath.write(connector, location, first, "StreamFlow")
        assert await remotepath.exists(connector, location, first)
        assert await remotepath.isfile(connector, location, first)
        assert await remotepath.size(connector, location, first) == 10
        await remotepath.write(connector, location, second, "CWL")
        assert await remotepath.exists(connector, location, second)
        # Cumulative size of both files: 10 + 3 bytes
        assert await remotepath.size(connector, location, [first, second]) == 13
        digest = await remotepath.checksum(context, connector, location, first)
        assert digest == "e8abb7445e1c4061c3ef39a0e1690159b094d3b5"
        await remotepath.rm(connector, location, [first, second])
        assert not await remotepath.exists(connector, location, first)
        assert not await remotepath.exists(connector, location, second)
    finally:
        # Clean up anything left behind by a failed assertion
        for leftover in (first, second):
            if await remotepath.exists(connector, location, leftover):
                await remotepath.rm(connector, location, leftover)


@pytest.mark.asyncio
async def test_resolve(context, connector, location):
"""Test glob resolution."""
Expand Down Expand Up @@ -93,70 +177,6 @@ async def test_resolve(context, connector, location):
await remotepath.rm(connector, location, path)


@pytest.mark.asyncio
async def test_directory(context, connector, location):
    """Test directory creation and deletion."""
    dirname = utils.random_name()
    try:
        await remotepath.mkdir(connector, [location], dirname)
        assert await remotepath.exists(connector, location, dirname)
        assert await remotepath.isdir(connector, location, dirname)
        # Expected layout after setup:
        # ./
        #   file1.txt
        #   file2.csv
        #   dir1/
        #   dir2/
        dir1 = posixpath.join(dirname, "dir1")
        dir2 = posixpath.join(dirname, "dir2")
        await remotepath.mkdirs(connector, [location], [dir1, dir2])
        await remotepath.write(
            connector, location, posixpath.join(dirname, "file1.txt"), "StreamFlow"
        )
        await remotepath.write(
            connector, location, posixpath.join(dirname, "file2.csv"), "StreamFlow"
        )
        file_entries = await remotepath.listdir(
            connector, location, dirname, FileType.FILE
        )
        assert len(file_entries) == 2
        assert posixpath.join(dirname, "file1.txt") in file_entries
        assert posixpath.join(dirname, "file2.csv") in file_entries
        dir_entries = await remotepath.listdir(
            connector, location, dirname, FileType.DIRECTORY
        )
        assert len(dir_entries) == 2
        assert dir1 in dir_entries
        assert dir2 in dir_entries
        await remotepath.rm(connector, location, dirname)
        assert not await remotepath.exists(connector, location, dirname)
    finally:
        # Clean up if an assertion failed before the explicit removal
        if await remotepath.exists(connector, location, dirname):
            await remotepath.rm(connector, location, dirname)


@pytest.mark.asyncio
async def test_file(context, connector, location):
    """Test file creation, size, checksum and deletion."""
    path_a = utils.random_name()
    path_b = utils.random_name()
    try:
        await remotepath.write(connector, location, path_a, "StreamFlow")
        assert await remotepath.exists(connector, location, path_a)
        assert await remotepath.isfile(connector, location, path_a)
        # "StreamFlow" is 10 bytes long
        assert await remotepath.size(connector, location, path_a) == 10
        await remotepath.write(connector, location, path_b, "CWL")
        assert await remotepath.exists(connector, location, path_b)
        # 10 bytes + 3 bytes when sizing both paths together
        assert await remotepath.size(connector, location, [path_a, path_b]) == 13
        digest = await remotepath.checksum(context, connector, location, path_a)
        assert digest == "e8abb7445e1c4061c3ef39a0e1690159b094d3b5"
        await remotepath.rm(connector, location, [path_a, path_b])
        assert not await remotepath.exists(connector, location, path_a)
        assert not await remotepath.exists(connector, location, path_b)
    finally:
        # Clean up anything left behind by a failed assertion
        if await remotepath.exists(connector, location, path_a):
            await remotepath.rm(connector, location, path_a)
        if await remotepath.exists(connector, location, path_b):
            await remotepath.rm(connector, location, path_b)

@pytest.mark.asyncio
async def test_symlink(context, connector, location):
"""Test symlink creation, resolution and deletion."""
Expand Down
Empty file added tests/utils/__init__.py
Empty file.
Loading

0 comments on commit 4450053

Please sign in to comment.