Skip to content

Commit

Permalink
Merge branch 'main' into prep-skydriver-2.0-2
Browse files Browse the repository at this point in the history
  • Loading branch information
ric-evans committed Oct 2, 2024
2 parents 9b36003 + c3b0755 commit 5d4a44e
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 61 deletions.
3 changes: 3 additions & 0 deletions skymap_scanner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
# HTTP source to download data from.
REMOTE_DATA_SOURCE: Final[str] = "http://prod-exe.icecube.wisc.edu"

REMOTE_DATA_DOWNLOAD_RETRIES: Final[int] = 2 # note: attempts = retries + 1
REMOTE_DATA_DOWNLOAD_TIMEOUT: Final[int] = 15 # sec

# Local ephemeral directory to stage files.
LOCAL_DATA_CACHE: Final[Path] = Path("./data-staging-cache")

Expand Down
67 changes: 47 additions & 20 deletions skymap_scanner/utils/data_handling.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
"""data_handling.py."""

import itertools
import logging
import subprocess
import time
from pathlib import Path
from typing import List

import requests

from .. import config as cfg

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -41,12 +44,12 @@ def stage_files(self, file_list: List[str]):
LOGGER.debug("File is available on staging path.")
else:
LOGGER.debug("Staging from HTTP source.")
self.fetch_file(basename)
self.download_file(basename)

else:
LOGGER.debug(f"File {basename} is available at {filepath}.")

def fetch_file(self, basename: str):
def download_file(self, basename: str):
"""Retrieves a file from the HTTP source.
Args:
Expand All @@ -55,24 +58,48 @@ def fetch_file(self, basename: str):
Raises:
RuntimeError: if the file retrieval fails.
"""
local_destination_path = self.staging_path / basename
http_source_path = f"{self.remote_path}/{basename}"
# not sure why we use the -O pattern here
cmd = [
"wget",
"-nv",
"-t",
"5",
"-O",
str(local_destination_path),
http_source_path,
]

subprocess.run(cmd, check=True)

if not local_destination_path.is_file():
dest = self.staging_path / basename
url = f"{self.remote_path}/{basename}"

def backoff_sleep(attempt: int):
"""Sleep with exponential backoff."""
sleep_duration = 2**attempt # Exponential backoff: 2, 4, 8 seconds...
LOGGER.info(f"Retrying file download in {sleep_duration} seconds...")
time.sleep(sleep_duration)

# Step 1: Download the file
for attempt in itertools.count(1):
if attempt > 1:
backoff_sleep(attempt)
# get
try:
response = requests.get(
url,
stream=True,
timeout=cfg.REMOTE_DATA_DOWNLOAD_TIMEOUT,
)
response.raise_for_status() # Check if the request was successful (2xx)
break
except requests.exceptions.RequestException as e:
if attempt > cfg.REMOTE_DATA_DOWNLOAD_RETRIES: # 'attempt' is 1-indexed
raise RuntimeError(
f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: {e}"
) from e

# Step 2: Write the file
try:
with open(dest, "wb") as file:
for chunk in response.iter_content(chunk_size=8192):
file.write(chunk)
except IOError as e:
raise RuntimeError(f"File download failed during file write: {e}") from e

# Step 3: Ensure the file was created successfully
if dest.is_file():
LOGGER.debug(f"File successfully created at {dest}.")
else:
raise RuntimeError(
f"Subprocess `wget` succeeded but the resulting file is invalid:\n-> {cmd}"
f"File download failed during file write (file is invalid):\n-> {dest}."
)

def get_filepath(self, filename: str) -> str:
Expand Down
96 changes: 55 additions & 41 deletions tests/file_staging.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,74 @@
"""Tests for file-staging logic."""

import logging
import subprocess
from typing import Dict


from skymap_scanner.utils.data_handling import DataStager
from skymap_scanner import config as cfg
from skymap_scanner.utils.data_handling import DataStager

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)


# Build list of all local files.
local_file_list = []
for path in cfg.LOCAL_DATA_SOURCES:
subpath = path / cfg.LOCAL_SPLINE_SUBDIR
directory_content = subpath.glob("*")
for path in directory_content:
if path.is_file(): # skip directories
local_file_list.append(path.name) # store filename without path
def test_file_staging() -> None:

# Declare at least one filename only expected to be available remotely.
remote_file_list = ["README"]
# Build list of all local files.
local_file_list = []
for path in cfg.LOCAL_DATA_SOURCES:
subpath = path / cfg.LOCAL_SPLINE_SUBDIR
directory_content = subpath.glob("*")
for path in directory_content:
if path.is_file(): # skip directories
local_file_list.append(path.name) # store filename without path

# Declare at least one filename that does not exist.
invalid_file_list = ["NONEXISTENT_FILE"]
# Declare at least one filename only expected to be available remotely.
remote_file_list = ["README"]

datastager = DataStager(
local_paths=cfg.LOCAL_DATA_SOURCES,
local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
)
# Declare at least one filename that does not exist.
invalid_file_list = ["NONEXISTENT_FILE"]

for file_list in [local_file_list, remote_file_list, invalid_file_list]:
try:
datastager = DataStager(
local_paths=cfg.LOCAL_DATA_SOURCES,
local_subdir=cfg.LOCAL_SPLINE_SUBDIR,
remote_path=f"{cfg.REMOTE_DATA_SOURCE}/{cfg.REMOTE_SPLINE_SUBDIR}",
)

# test stage_files()
# -> OK
for file_list in [local_file_list, remote_file_list]:
datastager.stage_files(file_list)
except subprocess.CalledProcessError:
logger.debug(f"Staging failed as expected for invalid file.")
# -> ERROR
try:
datastager.stage_files(invalid_file_list)
except Exception as e:
assert isinstance(e, RuntimeError)
assert str(e) == (
f"Download failed after {cfg.REMOTE_DATA_DOWNLOAD_RETRIES} retries: "
f"404 Client Error: Not Found for url: {datastager.remote_path}/{invalid_file_list[0]}"
)

# ensure that filepaths can be retrieved for all local files
local_filepaths: Dict[str, str] = dict()
for filename in local_file_list:
logger.debug(f"Testing local file: {filename}.")
local_filepaths[filename] = datastager.get_local_filepath(filename)
assert local_filepaths[filename] == datastager.get_filepath(filename)
logger.debug(f"File available at {local_filepaths[filename]}.")

# ensure that filepaths can be retrieved for all local files
local_filepaths: Dict[str, str] = dict()
for filename in local_file_list:
logger.debug(f"Testing local file: {filename}.")
local_filepaths[filename] = datastager.get_local_filepath(filename)
assert local_filepaths[filename] == datastager.get_filepath(filename)
logger.debug(f"File available at {local_filepaths[filename]}.")
for filename in remote_file_list:
logger.debug(f"Testing staging of remote file: {filename}")
filepath: str = datastager.get_filepath(filename)
logger.debug(f"File available at {filepath}.")

for filename in remote_file_list:
logger.debug(f"Testing staging of remote file: {filename}")
filepath: str = datastager.get_filepath(filename)
logger.debug(f"File available at {filepath}.")
for filename in invalid_file_list:
logger.debug(f"Testing staging of remote file: {filename}")
try:
filepath = datastager.get_filepath(filename)
except FileNotFoundError:
logger.debug(f"File not available as expected.")
else:
assert 0 # we shouldn't get here!


for filename in invalid_file_list:
logger.debug(f"Testing staging of remote file: {filename}")
try:
filepath = datastager.get_filepath(filename)
except FileNotFoundError:
logger.debug(f"File not available as expected.")
if __name__ == "__main__":
test_file_staging()

0 comments on commit 5d4a44e

Please sign in to comment.