Skip to content

Commit

Permalink
improve lock handling and reduce number of API calls (#15)
Browse files Browse the repository at this point in the history
- detect non-existent lock file using backend-specific methods
  - TODO: implement specific checks for azure, ftp, google and sftp backends
- reduce the number of calls to `isLocked()` to cut the total number of
  backend-specific API calls (e.g. S3 API calls) and the number of log messages
  • Loading branch information
pymonger authored May 27, 2020
1 parent 029b52b commit dd3f0d4
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 38 deletions.
8 changes: 1 addition & 7 deletions osaka/lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import os
import json
import socket
import traceback

# Py2k-3k imports
try:
Expand Down Expand Up @@ -102,12 +101,7 @@ def getLockMetadata(self, field):
try:
filelike = handle.get(self.luri)
return json.load(filelike).get(field, None)
except Exception as e:
osaka.utils.LOGGER.warning(
"Ignoring encountered exception: {}\n{}".format(
e, traceback.format_exc()
)
)
except osaka.utils.OsakaFileNotFound:
return None

def setLockMetadata(self, field, value):
Expand Down
9 changes: 8 additions & 1 deletion osaka/storage/az.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import urllib.parse
import datetime
import os.path
import traceback
import osaka.base
import osaka.utils
import osaka.storage.file
Expand Down Expand Up @@ -105,7 +106,13 @@ def get(self, uri):
pass
fh = open(fname, "r+b")
self.tmpfiles.append(fh)
self.service.get_blob_to_path(container, key, fname)
try:
self.service.get_blob_to_path(container, key, fname)
except Exception as e:
osaka.utils.LOGGER.warning(
"Encountered exception: {}\n{}".format(e, traceback.format_exc())
)
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(uri))
fh.seek(0)
return fh

Expand Down
6 changes: 5 additions & 1 deletion osaka/storage/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ def get(self, uri):
"Non-absolute paths and non-null hostnames not supported with 'file://' schemed uris"
)
osaka.utils.LOGGER.debug("Getting stream from URI: {0}".format(uri))
fh = open(urllib.parse.urlparse(uri).path, "r")
path = urllib.parse.urlparse(uri).path
try:
fh = open(path, "r")
except FileNotFoundError:
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(uri))
self.files.append(fh)
return fh

Expand Down
13 changes: 10 additions & 3 deletions osaka/storage/ftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import urllib.parse
import datetime
import os.path
import traceback
import osaka.base
import osaka.utils
import osaka.storage.file
Expand Down Expand Up @@ -69,9 +70,15 @@ def get(self, uri):
osaka.utils.LOGGER.debug("Getting stream from URI: {0}".format(uri))
filename = urllib.parse.urlparse(uri).path
fname = "/tmp/osaka-ftp-" + str(datetime.datetime.now())
with open(fname, "w") as tmpf:
self.ftp.retrbinary("RETR %s" % filename, tmpf.write)
fh = open(fname, "r+b")
try:
with open(fname, "w") as tmpf:
self.ftp.retrbinary("RETR %s" % filename, tmpf.write)
fh = open(fname, "r+b")
except Exception as e:
osaka.utils.LOGGER.warning(
"Encountered exception: {}\n{}".format(e, traceback.format_exc())
)
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(uri))
self.tmpfiles.append(fh)
fh.seek(0)
return fh # obj.get()["Body"]
Expand Down
9 changes: 8 additions & 1 deletion osaka/storage/gs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

standard_library.install_aliases()
import re
import traceback
from google.cloud import storage
from google.cloud.exceptions import NotFound
import urllib.parse
Expand Down Expand Up @@ -85,7 +86,13 @@ def get(self, uri):
)
bucket = self.bucket(container, create=False)
blob = bucket.blob(key)
stream = StringIO(blob.download_as_string())
try:
stream = StringIO(blob.download_as_string())
except Exception as e:
osaka.utils.LOGGER.warning(
"Encountered exception: {}\n{}".format(e, traceback.format_exc())
)
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(uri))
return stream

def put(self, stream, uri):
Expand Down
4 changes: 4 additions & 0 deletions osaka/storage/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def get(self, uri, text=False):
osaka.utils.LOGGER.debug(
"Got HTTP status code: {}".format(response.status_code)
)

# catch 404 status code
if response.status_code == 404:
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(uri))
response.raise_for_status()

# catch 202 status code
Expand Down
13 changes: 12 additions & 1 deletion osaka/storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
# S3 region info
S3_REGION_INFO = None

# regexes
NOT_FOUND_RE = re.compile(r"Not Found")


def get_region_info():
"""
Expand Down Expand Up @@ -144,7 +147,15 @@ def get(self, uri):
pass
fh = open(fname, "r+b")
self.tmpfiles.append(fh)
obj.download_fileobj(fh)
try:
obj.download_fileobj(fh)
except botocore.exceptions.ClientError as e:
if NOT_FOUND_RE.search(str(e)):
raise osaka.utils.OsakaFileNotFound(
"File {} doesn't exist.".format(uri)
)
else:
raise
fh.seek(0)
return fh # obj.get()["Body"]

Expand Down
10 changes: 9 additions & 1 deletion osaka/storage/sftp.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import stat
import urllib.parse
import paramiko
import traceback
import osaka.utils

"""
A backend used to handle sftp using paramiko
Expand Down Expand Up @@ -120,7 +122,13 @@ def get(self, url, path):
@param path - path to place fetched files
"""
rpath = urllib.parse.urlparse(url).path
self.sftp.get(rpath, path)
try:
self.sftp.get(rpath, path)
except Exception as e:
osaka.utils.LOGGER.warning(
"Encountered exception: {}\n{}".format(e, traceback.format_exc())
)
raise osaka.utils.OsakaFileNotFound("File {} doesn't exist.".format(url))

def rm(self, url):
"""
Expand Down
6 changes: 2 additions & 4 deletions osaka/tests/test_http.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import unittest
import requests.exceptions

import osaka.storage.http


Expand Down Expand Up @@ -36,8 +34,8 @@ def test_404_http_status(self):
storage_http = osaka.storage.http.HTTP()
storage_http.connect(test_url)
self.assertRaisesRegex(
requests.exceptions.HTTPError,
"404 Client Error.+$",
osaka.utils.OsakaFileNotFound,
"File {} doesn't exist.".format(test_url),
storage_http.get,
test_url,
)
Expand Down
40 changes: 21 additions & 19 deletions osaka/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,18 @@ def transfer(
)
osaka.utils.LOGGER.error(error)
raise osaka.utils.NoClobberException(error)
if slock.isLocked() and not force:
error = "Source {0} has not completed previous tranfer. Will not continue.".format(
source
)
osaka.utils.LOGGER.error(error)
raise osaka.utils.OsakaException(error)
elif slock.isLocked() and force:
error = "Source {0} has not completed previous tranfer. Will continue by force.".format(
source
)
osaka.utils.LOGGER.warning(error)
if slock.isLocked():
if force:
error = "Source {0} has not completed previous tranfer. Will continue by force.".format(
source
)
osaka.utils.LOGGER.warning(error)
else:
error = "Source {0} has not completed previous tranfer. Will not continue.".format(
source
)
osaka.utils.LOGGER.error(error)
raise osaka.utils.OsakaException(error)
osaka.utils.LOGGER.info(
"Transferring between {0} and {1}".format(source, dest)
)
Expand Down Expand Up @@ -171,14 +172,15 @@ def remove(self, uri, params={}, unlock=False, retries=0):
for retry in range(0, retries + 1):
try:
handle.connect(uri, params)
if not unlock and lock.isLocked():
error = "URI {0} has not completed previous tranfer. Will not continue.".format(
uri
)
osaka.utils.LOGGER.error(error)
raise osaka.utils.OsakaException(error)
elif lock.isLocked():
lock.unlock()
if lock.isLocked():
if not unlock:
error = "URI {0} has not completed previous tranfer. Will not continue.".format(
uri
)
osaka.utils.LOGGER.error(error)
raise osaka.utils.OsakaException(error)
else:
lock.unlock()

def remove_one(item):
""" Remove one item """
Expand Down
4 changes: 4 additions & 0 deletions osaka/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,3 +121,7 @@ class TimeoutException(OsakaException):

class NoClobberException(OsakaException):
    """Osaka error raised by the transfer path after the error is logged.

    NOTE(review): presumably signals that the destination already exists and
    overwriting was not permitted -- confirm against the raising code in
    ``osaka/transfer.py``.
    """


class OsakaFileNotFound(OsakaException):
    """Raised by a storage backend's ``get`` when the requested URI is missing.

    The lock metadata lookup catches this exception and returns ``None``, so
    an absent lock file is handled as "no metadata" rather than as a failure.

    NOTE(review): the azure, ftp, google and sftp backends currently raise
    this for any exception during download, not only for a missing file.
    """

0 comments on commit dd3f0d4

Please sign in to comment.