From aafcdcfacd89233d749d1bb324c5f3bcd13de277 Mon Sep 17 00:00:00 2001
From: davidparks21
Date: Thu, 27 Jul 2023 08:30:00 -0700
Subject: [PATCH] Corrected for the fact that ActualObjectSize may not be
 provided by all S3 implementations.

---
 smart_open/s3.py | 174 ++---------------------------------------------
 1 file changed, 5 insertions(+), 169 deletions(-)

diff --git a/smart_open/s3.py b/smart_open/s3.py
index b573de23..d01a3d8f 100644
--- a/smart_open/s3.py
+++ b/smart_open/s3.py
@@ -369,171 +369,6 @@ def _unwrap_ioerror(ioe):
         return None
 
 
-# class _SeekableRawReader(object):
-#     """Read an S3 object.
-#
-#     This class is internal to the S3 submodule.
-#     """
-#
-#     def __init__(
-#         self,
-#         client,
-#         bucket,
-#         key,
-#         stream_range,
-#         version_id=None,
-#     ):
-#         self._client = client
-#         self._bucket = bucket
-#         self._key = key
-#         self._version_id = version_id
-#
-#         self._content_length = None
-#         self._position = 0
-#         self._body = None
-#
-#         # the max_stream_size setting limits how much data will be read in a single HTTP request, this is an
-#         # important protection for the S3 server, ensuring the S3 server doesn't get an open-ended byte-range request
-#         # which can cause it to internally queue up a massive file when only a small bit of it may ultimately be
-#         # read by the user. The variable _stream_range_[from|to] tracks the range of bytes that can be read
-#         # from the current request body (e.g. from the same HTTP request). Note that the first read call
-#         # will always set the byte range header to exactly the read size, an optimization for uses cases in which a
-#         # single small read is performed against a large file (example: random sampling small data samples from
-#         # large files in machine learning contexts).
-#         self._stream_range = stream_range
-#         self._stream_range_from = None  # a None value signifies the first call to `read` where this will be set
-#         self._stream_range_to = None
-#
-#     def seek(self, offset, whence=constants.WHENCE_START):
-#         """Seek to the specified position.
-#
-#         :param int offset: The offset in bytes.
-#         :param int whence: Where the offset is from.
-#
-#         :returns: the position after seeking.
-#         :rtype: int
-#         """
-#         if whence not in constants.WHENCE_CHOICES:
-#             raise ValueError('invalid whence, expected one of %r' % constants.WHENCE_CHOICES)
-#         if whence == constants.WHENCE_END and offset > 0:
-#             raise ValueError('offset must be <= 0 when whence == WHENCE_END, got offset: ' + offset)
-#
-#         if whence == constants.WHENCE_END and self._content_length is None:
-#             # WHENCE_END: head request is necessary to determine file length if it's not known yet
-#             # this is necessary to return the absolute position as specified by io.IOBase
-#             response = _head(self._client, self._bucket, self._key, self._version_id)
-#             _log_retry_attempts(response)
-#             self._content_length = int(response['ContentLength'])
-#             self._position = self._content_length + offset
-#         elif whence == constants.WHENCE_END:
-#             # WHENCE_END: we already have file length, no API call needed to compute the absolute position
-#             self._position = self._content_length + offset
-#         else:
-#             # WHENCE_START or WHENCE_CURRENT
-#             start = 0 if whence == constants.WHENCE_START else self._position
-#             self._position = start + offset
-#
-#         return self._position
-#
-#     def _open_body(self, start=None, stop=None):
-#         """Open a connection to download the specified range of bytes. Store
-#         the open file handle in self._body.
-#
-#         If no range is specified, start defaults to self._position.
-#         start and stop follow the semantics of the http range header,
-#         so a stop without a start will read bytes beginning at stop.
-#
-#         As a side effect, set self._content_length. Set self._position
-#         to self._content_length if start is past end of file.
-#         """
-#         if start is None and stop is None:
-#             start = self._position
-#         range_string = smart_open.utils.make_range_string(start, stop)
-#
-#         try:
-#             # Optimistically try to fetch the requested content range.
-#             response = _get(
-#                 self._client,
-#                 self._bucket,
-#                 self._key,
-#                 self._version_id,
-#                 range_string,
-#             )
-#         except IOError as ioe:
-#             # Handle requested content range exceeding content size.
-#             error_response = _unwrap_ioerror(ioe)
-#             if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
-#                 raise
-#             self._position = self._content_length = int(error_response['ActualObjectSize'])
-#             self._body = io.BytesIO()
-#         else:
-#             _log_retry_attempts(response)  # keep track of how many retries boto3 attempted
-#             units, start, stop, length = smart_open.utils.parse_content_range(response['ContentRange'])
-#             self._content_length = length
-#             self._position = start
-#             self._body = response['Body']
-#
-#     def read(self, size=-1):
-#         """Read from the continuous connection with the remote peer."""
-#
-#         # If we can figure out that we've read past the EOF, then we can save
-#         # an extra API call.
-#         reached_eof = True if self._content_length is not None and self._position >= self._content_length else False
-#
-#         if reached_eof or size == 0:
-#             return b''
-#
-#         if self._body is None:
-#             stop = None if size == -1 else self._position + size
-#             self._open_body(start=self._position, stop=stop)
-#
-#         #
-#         # Boto3 has built-in error handling and retry mechanisms:
-#         #
-#         # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
-#         # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/retries.html
-#         #
-#         # Unfortunately, it isn't always enough. There is still a non-zero
-#         # possibility that an exception will slip past these mechanisms and
-#         # terminate the read prematurely. Luckily, at this stage, it's very
-#         # simple to recover from the problem: wait a little bit, reopen the
-#         # HTTP connection and try again. Usually, a single retry attempt is
-#         # enough to recover, but we try multiple times "just in case".
-#         #
-#         for attempt, seconds in enumerate([1, 2, 4, 8, 16], 1):
-#             try:
-#                 if size == -1:
-#                     binary = self._body.read()
-#                 else:
-#                     binary = self._body.read(size)
-#             except (
-#                 ConnectionResetError,
-#                 botocore.exceptions.BotoCoreError,
-#                 urllib3.exceptions.HTTPError,
-#             ) as err:
-#                 logger.warning(
-#                     '%s: caught %r while reading %d bytes, sleeping %ds before retry',
-#                     self,
-#                     err,
-#                     size,
-#                     seconds,
-#                 )
-#                 time.sleep(seconds)
-#                 self._open_body()
-#             else:
-#                 self._position += len(binary)
-#                 if self._optimize == 'reading' or (self._optimize == 'auto' and self._read_call_counter == 0):
-#                     self._body.close()
-#                     self._body = None
-#                 self._read_call_counter += 0
-#                 return binary
-#
-#         raise IOError('%s: failed to read %d bytes after %d attempts' % (self, size, attempt))
-#
-#     def __str__(self):
-#         return 'smart_open.s3._SeekableReader(%r, %r)' % (self._bucket, self._key)
-
-
 def _initialize_boto3(client, client_kwargs, bucket, key):
     """Created the required objects for accessing S3.
     Ideally, they have been already created for us and we can just reuse them."""
@@ -590,11 +425,11 @@ def __init__(
         # which can cause it to internally queue up a massive file when only a small bit of it may ultimately be
         # read by the user. The variable _stream_range_[from|to] tracks the range of bytes that can be read
         # from the current request body (e.g. from the same HTTP request). Note that the first read call
-        # will always set the byte range header to exactly the read size, an optimization for uses cases in which a
+        # will always set the byte range header to exactly the read size, an optimization for use cases in which a
        # single small read is performed against a large file (example: random sampling small data samples from
         # large files in machine learning contexts).
         self._stream_range = stream_range
-        self._stream_range_from = None  # a None value signifies the first call to `read` where this will be set
+        self._stream_range_from = None  # this will be set on the first call to `read`
         self._stream_range_to = None
 
         #
@@ -855,8 +690,9 @@ def _open_body(self, start=None, stop=None):
             error_response = _unwrap_ioerror(ioe)
             if error_response is None or error_response.get('Code') != _OUT_OF_RANGE:
                 raise
-            # self._position = self._content_length = int(error_response['ActualObjectSize'])
-            self._content_length = int(error_response['ActualObjectSize'])
+            if 'ActualObjectSize' in error_response:
+                # Note: Not all S3 implementations will return an ActualObjectSize
+                self._content_length = int(error_response['ActualObjectSize'])
             self._body = io.BytesIO()
         else:
             self._log_retry_attempts(response)  # keep track of how many retries boto3 attempted
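
The guarded hunk above simply leaves self._content_length unset when the error
response omits ActualObjectSize. For illustration only (not part of this
patch), below is a minimal caller-level sketch of the same situation using
plain boto3: it assumes a boto3 S3 client, hypothetical bucket/key names, and
a fallback HEAD request as one possible way to recover the object size when
the InvalidRange error carries no ActualObjectSize.

    import io

    import boto3
    import botocore.exceptions


    def open_body_with_fallback(client, bucket, key, start):
        # Attempt a ranged GET beginning at `start`. AWS S3 answers a
        # past-the-end range with an InvalidRange error that includes
        # ActualObjectSize; other S3 implementations may omit that field.
        try:
            response = client.get_object(
                Bucket=bucket, Key=key, Range='bytes=%d-' % start)
            # In range: ContentLength is the size of the returned range,
            # not of the whole object.
            return response['Body'], int(response['ContentLength'])
        except botocore.exceptions.ClientError as err:
            error = err.response.get('Error', {})
            if error.get('Code') != 'InvalidRange':
                raise
            if 'ActualObjectSize' in error:
                length = int(error['ActualObjectSize'])
            else:
                # The field is missing (the case this patch tolerates), so
                # fall back to a HEAD request to learn the object size.
                length = int(client.head_object(
                    Bucket=bucket, Key=key)['ContentLength'])
            # Past EOF: hand back an empty body plus the object size.
            return io.BytesIO(), length


    # Usage sketch: a read far past EOF yields an empty body and the size.
    client = boto3.client('s3')
    body, size = open_body_with_fallback(client, 'my-bucket', 'my-key', 10**12)

The HEAD fallback costs one extra round trip, but only on the error path,
which matches the intent of the patch: degrade gracefully instead of assuming
AWS-specific error metadata.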