Skip to content

Commit

Permalink
Stop tagging array files (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
rzlim08 authored Mar 10, 2023
1 parent aa46c82 commit 47fc1fe
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 19 deletions.
40 changes: 22 additions & 18 deletions miniwdl-plugins/s3upload/miniwdl_s3upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,22 @@ def get_s3_get_prefix(cfg: config.Loader) -> str:
def flag_temporary(s3uri):
uri = urlparse(s3uri)
bucket, key = uri.hostname, uri.path[1:]
s3_client.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={
'TagSet': [
{
'Key': 'intermediate_output',
'Value': 'true'
},
]
},
)
try:
s3_client.put_object_tagging(
Bucket=bucket,
Key=key,
Tagging={
'TagSet': [
{
'Key': 'intermediate_output',
'Value': 'true'
},
]
},
)
except botocore.exceptions.ClientError:
# If we get throttled better not to tag the file at all
pass


def remove_temporary_flag(s3uri, retry=0):
Expand All @@ -104,7 +108,7 @@ def remove_temporary_flag(s3uri, retry=0):
'TagSet': remaining_tags
},
)
else:
elif len(tags["TagSet"]) > 0: # Delete tags if they exist
s3_client.delete_object_tagging(
Bucket=bucket,
Key=key,
Expand Down Expand Up @@ -218,8 +222,8 @@ def task(cfg, logger, run_id, run_dir, task, **recv):
# ignore command/runtime/container
recv = yield recv

def upload_file(abs_fn, s3uri):
s3cp(logger, abs_fn, s3uri, flag_temporary_file=True)
def upload_file(abs_fn, s3uri, flag_temporary_file=False):
s3cp(logger, abs_fn, s3uri, flag_temporary_file=flag_temporary_file)
# record in _uploaded_files (keyed by inode, so that it can be found from any
# symlink or hardlink)
with _uploaded_files_lock:
Expand Down Expand Up @@ -259,13 +263,13 @@ def _raise(ex):
for fn in files:
abs_fn = os.path.join(dn, fn)
s3uri = os.path.join(s3prefix, os.path.relpath(abs_fn, abs_output))
upload_file(abs_fn, s3uri)
upload_file(abs_fn, s3uri, flag_temporary_file=False)
elif len(output_contents) == 1 and os.path.isfile(output_contents[0]):
# file output
basename = os.path.basename(output_contents[0])
abs_fn = os.path.join(abs_output, basename)
s3uri = os.path.join(s3prefix, basename)
upload_file(abs_fn, s3uri)
upload_file(abs_fn, s3uri, flag_temporary_file=True)
else:
# file array output
assert all(os.path.basename(abs_fn).isdigit() for abs_fn in output_contents), output_contents
Expand All @@ -274,7 +278,7 @@ def _raise(ex):
assert len(fns) == 1
abs_fn = os.path.join(index_dir, fns[0])
s3uri = os.path.join(s3prefix, fns[0])
upload_file(abs_fn, s3uri)
upload_file(abs_fn, s3uri, flag_temporary_file=False)
yield recv


Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.4.2
v1.4.3

0 comments on commit 47fc1fe

Please sign in to comment.