Skip to content

Commit

Permalink
do not start multipart upload for small writes
Browse files Browse the repository at this point in the history
  • Loading branch information
mrk-its committed Nov 17, 2019
1 parent 3696a90 commit 695b879
Showing 1 changed file with 27 additions and 25 deletions.
52 changes: 27 additions & 25 deletions fs_s3fs/_s3fs_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,12 @@ def __init__(

self._object = s3_object
self._min_part_size = min_part_size
self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs)

self._buf = b''
self._mp = None
self._buf = bytearray()
self._total_bytes = 0
self._total_parts = 0
self._parts = []
self._closed = False

#
# This member is part of the io.BufferedIOBase interface.
Expand All @@ -156,7 +156,7 @@ def flush(self):

@property
def closed(self):
return self._mp is None
return self._closed

def writable(self):
"""Return True if the stream supports writing."""
Expand All @@ -180,10 +180,7 @@ def write(self, b):
There's buffering happening under the covers, so this may not actually
do any HTTP transfer right away."""

if self._buf:
self._buf += b
else:
self._buf = b
self._buf.extend(b)

length = len(b)
self._total_bytes += length
Expand All @@ -199,36 +196,41 @@ def close(self):

if tuple(sys.exc_info()) != (None, None, None):
self.terminate()
self._closed = True
return

if self._buf:
self._upload_next_part()

if self._total_bytes:
self._mp.complete(MultipartUpload={"Parts": self._parts})
logger.debug("completed multipart upload")
else:
if self._total_bytes < self._min_part_size:
# if we wrote less than min_part_size bytes
# then directly put buffer contents instead of starting
# multipart upload. It also fixes following:
#
# AWS complains with "The XML you provided was not well-formed or
# did not validate against our published schema" when the input is
# completely empty => abort the upload, no file created.
#
# We work around this by creating an empty file explicitly.
#
logger.debug("empty input, ignoring multipart upload")
self.terminate()
self._object.put(Body=b"", **self._upload_kwargs)
assert not self._mp
logger.debug("small input, ignoring multipart upload")
self._object.put(Body=self._buf, **self._upload_kwargs)
else:
if self._buf:
self._upload_next_part()
self._mp.complete(MultipartUpload={"Parts": self._parts})
logger.debug("completed multipart upload")

self._mp = None
self._closed = True
logger.debug("successfully closed")

@check_if_open
def terminate(self):
"""Cancel the underlying multipart upload."""
assert self._mp, "no multipart upload in progress"
self._mp.abort()
self._mp = None
"""Cancel the underlying multipart upload if any"""
if self._mp:
self._mp.abort()
self._mp = None

def _upload_next_part(self):
if not self._mp:
self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs)
part_num = self._total_parts + 1
logger.info(
"uploading part #%i, %i bytes (total %.3fGB)",
Expand All @@ -242,4 +244,4 @@ def _upload_next_part(self):
logger.debug("upload of part #%i finished" % part_num)

self._total_parts += 1
self._buf = bytes()
self._buf.clear()

0 comments on commit 695b879

Please sign in to comment.