Skip to content

Commit

Permalink
Corrected for fact that ActualObjectSize may not be provided by all S…
Browse files Browse the repository at this point in the history
…3 implementations.
  • Loading branch information
davidparks21 committed Jul 27, 2023
1 parent 5a321d0 commit aafcdcf
Showing 1 changed file with 5 additions and 169 deletions.
174 changes: 5 additions & 169 deletions smart_open/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -369,171 +369,6 @@ def _unwrap_ioerror(ioe):
return None


# class _SeekableRawReader(object):
# """Read an S3 object.
#
# This class is internal to the S3 submodule.
# """
#
# def __init__(
# self,
# client,
# bucket,
# key,
# stream_range,
# version_id=None,
# ):
# self._client = client
# self._bucket = bucket
# self._key = key
# self._version_id = version_id
#
# self._content_length = None
# self._position = 0
# self._body = None
#
# # the max_stream_size setting limits how much data will be read in a single HTTP request, this is an
# # important protection for the S3 server, ensuring the S3 server doesn't get an open-ended byte-range request
# # which can cause it to internally queue up a massive file when only a small bit of it may ultimately be
# # read by the user. The variable _stream_range_[from|to] tracks the range of bytes that can be read
# # from the current request body (e.g. from the same HTTP request). Note that the first read call
# # will always set the byte range header to exactly the read size, an optimization for uses cases in which a
# # single small read is performed against a large file (example: random sampling small data samples from
# # large files in machine learning contexts).
# self._stream_range = stream_range
# self._stream_range_from = None # a None value signifies the first call to `read` where this will be set
# self._stream_range_to = None
#
# def seek(self, offset, whence=constants.WHENCE_START):
# """Seek to the specified position.
#
# :param int offset: The offset in bytes.
# :param int whence: Where the offset is from.
#
# :returns: the position after seeking.
# :rtype: int
# """
# if whence not in constants.WHENCE_CHOICES:
# raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
# if whence == constants.WHENCE_END and offset > 0:
# raise ValueError('offset must be <= 0 when whence == WHENCE_END, got offset: ' + offset)
#
# if whence == constants.WHENCE_END and self._content_length is None:
# # WHENCE_END: head request is necessary to determine file length if it's not known yet
# # this is necessary to return the absolute position as specified by io.IOBase
# response = _head(self._client, self._bucket, self._key, self._version_id)
# _log_retry_attempts(response)
# self._content_length = int(response['ContentLength'])
# self._position = self._content_length + offset
# elif whence == constants.WHENCE_END:
# # WHENCE_END: we already have file length, no API call needed to compute the absolute position
# self._position = self._content_length + offset
# else:
# # WHENCE_START or WHENCE_CURRENT
# start = 0 if whence == constants.WHENCE_START else self._position
# self._position = start + offset
#
# return self._position
#
# def _open_body(self, start=None, stop=None):
# """Open a connection to download the specified range of bytes. Store
# the open file handle in self._body.
#
# If no range is specified, start defaults to self._position.
# start and stop follow the semantics of the http range header,
# so a stop without a start will read bytes beginning at stop.
#
# As a side effect, set self._content_length. Set self._position
# to self._content_length if start is past end of file.
# """
# if start is None and stop is None:
# start = self._position
# range_string = smart_open.utils.make_range_string(start, stop)
#
# try:
# # Optimistically try to fetch the requested content range.
# response = _get(
# self._client,
# self._bucket,
# self._key,
# self._version_id,
# range_string,
# )
# except IOError as ioe:
# # Handle requested content range exceeding content size.
# error_response = _unwrap_ioerror(ioe)
# if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
# raise
# self._position = self._content_length = int(error_response['ActualObjectSize'])
# self._body = io.BytesIO()
# else:
# _log_retry_attempts(response) # keep track of how many retries boto3 attempted
# units, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
# self._content_length = length
# self._position = start
# self._body = response['Body']
#
# def read(self, size=-1):
# """Read from the continuous connection with the remote peer."""
#
# # If we can figure out that we've read past the EOF, then we can save
# # an extra API call.
# reached_eof = True if self._content_length is not None and self._position >= self._content_length else False
#
# if reached_eof or size == 0:
# return b''
#
# if self._body is None:
# stop = None if size == -1 else self._position + size
# self._open_body(start=self._position, stop=stop)
#
# #
# # Boto3 has built-in error handling and retry mechanisms:
# #
# # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
# # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
# #
# # Unfortunately, it isn't always enough. There is still a non-zero
# # possibility that an exception will slip past these mechanisms and
# # terminate the read prematurely. Luckily, at this stage, it's very
# # simple to recover from the problem: wait a little bit, reopen the
# # HTTP connection and try again. Usually, a single retry attempt is
# # enough to recover, but we try multiple times "just in case".
# #
# for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
# try:
# if size == -1:
# binary = self._body.read()
# else:
# binary = self._body.read(size)
# except (
# ConnectionResetError,
# botocore.exceptions.BotoCoreError,
# urllib3.exceptions.HTTPError,
# ) as err:
# logger.warning(
# '%s: caught %r while reading %d bytes, sleeping %ds before retry',
# self,
# err,
# size,
# seconds,
# )
# time.sleep(seconds)
# self._open_body()
# else:
# self._position += len(binary)
# if self._optimize == 'reading' or (self._optimize == 'auto' and self._read_call_counter == 0):
# self._body.close()
# self._body = None
# self._read_call_counter += 0
# return binary
#
# raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))
#
# def __str__(self):
# return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key)


def _initialize_boto3(client, client_kwargs, bucket, key):
"""Created the required objects for accessing S3. Ideally, they have
been already created for us and we can just reuse them."""
Expand Down Expand Up @@ -590,11 +425,11 @@ def __init__(
# which can cause it to internally queue up a massive file when only a small bit of it may ultimately be
# read by the user. The variable _stream_range_[from|to] tracks the range of bytes that can be read
# from the current request body (e.g. from the same HTTP request). Note that the first read call
# will always set the byte range header to exactly the read size, an optimization for uses cases in which a
# will always set the byte range header to exactly the read size, an optimization for use cases in which a
# single small read is performed against a large file (example: random sampling small data samples from
# large files in machine learning contexts).
self._stream_range = stream_range
self._stream_range_from = None # a None value signifies the first call to `read` where this will be set
self._stream_range_from = None # this will be set on the first call to `read`
self._stream_range_to = None

#
Expand Down Expand Up @@ -855,8 +690,9 @@ def _open_body(self, start=None, stop=None):
error_response = _unwrap_ioerror(ioe)
if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
raise
# self._position = self._content_length = int(error_response['ActualObjectSize'])
self._content_length = int(error_response['ActualObjectSize'])
if 'ActualObjectSize' in error_response:
# Note: Not all S3 implementations will return an ActualObjectSize
self._content_length = int(error_response['ActualObjectSize'])
self._body = io.BytesIO()
else:
self._log_retry_attempts(response) # keep track of how many retries boto3 attempted
Expand Down

0 comments on commit aafcdcf

Please sign in to comment.