diff --git a/fs_s3fs/_s3fs_file.py b/fs_s3fs/_s3fs_file.py index ed18f4e..36224a0 100644 --- a/fs_s3fs/_s3fs_file.py +++ b/fs_s3fs/_s3fs_file.py @@ -139,12 +139,12 @@ def __init__( self._object = s3_object self._min_part_size = min_part_size - self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs) - - self._buf = b'' + self._mp = None + self._buf = bytearray() self._total_bytes = 0 self._total_parts = 0 self._parts = [] + self._closed = False # # This member is part of the io.BufferedIOBase interface. @@ -156,7 +156,7 @@ def flush(self): @property def closed(self): - return self._mp is None + return self._closed def writable(self): """Return True if the stream supports writing.""" @@ -180,10 +180,7 @@ def write(self, b): There's buffering happening under the covers, so this may not actually do any HTTP transfer right away.""" - if self._buf: - self._buf += b - else: - self._buf = b + self._buf.extend(b) length = len(b) self._total_bytes += length @@ -199,36 +196,41 @@ def close(self): if tuple(sys.exc_info()) != (None, None, None): self.terminate() + self._closed = True return - if self._buf: - self._upload_next_part() - - if self._total_bytes: - self._mp.complete(MultipartUpload={"Parts": self._parts}) - logger.debug("completed multipart upload") - else: + if self._total_bytes < self._min_part_size: + # if we wrote less than min_part_size bytes + # then directly put buffer contents instead of starting + # multipart upload. It also fixes following: # # AWS complains with "The XML you provided was not well-formed or # did not validate against our published schema" when the input is # completely empty => abort the upload, no file created. # - # We work around this by creating an empty file explicitly. - # - logger.debug("empty input, ignoring multipart upload") - self.terminate() - self._object.put(Body=b"", **self._upload_kwargs) + assert not self._mp + logger.debug("small input, ignoring multipart upload") + self._object.put(Body=self._buf, **self._upload_kwargs) + else: + if self._buf: + self._upload_next_part() + self._mp.complete(MultipartUpload={"Parts": self._parts}) + logger.debug("completed multipart upload") + self._mp = None + self._closed = True logger.debug("successfully closed") @check_if_open def terminate(self): - """Cancel the underlying multipart upload.""" - assert self._mp, "no multipart upload in progress" - self._mp.abort() - self._mp = None + """Cancel the underlying multipart upload if any""" + if self._mp: + self._mp.abort() + self._mp = None def _upload_next_part(self): + if not self._mp: + self._mp = self._object.initiate_multipart_upload(**self._upload_kwargs) part_num = self._total_parts + 1 logger.info( "uploading part #%i, %i bytes (total %.3fGB)", @@ -242,4 +244,4 @@ def _upload_next_part(self): logger.debug("upload of part #%i finished" % part_num) self._total_parts += 1 - self._buf = bytes() + self._buf.clear()