Skip to content

Commit

Permalink
Stop tagging output files as temporary (#106)
Browse files Browse the repository at this point in the history
* stop tagging output files as intermediate

* linting

* even more linting

* bump version

* fix swipe intermediate test

* change tag to intermediate_output
  • Loading branch information
rzlim08 authored Mar 10, 2023
1 parent b7987f1 commit aa46c82
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 8 deletions.
18 changes: 12 additions & 6 deletions miniwdl-plugins/s3upload/miniwdl_s3upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def flag_temporary(s3uri):
Tagging={
'TagSet': [
{
'Key': 'swipe_temporary',
'Key': 'intermediate_output',
'Value': 'true'
},
]
Expand All @@ -93,7 +93,7 @@ def remove_temporary_flag(s3uri, retry=0):
)
remaining_tags = []
for tag in tags["TagSet"]:
if not (tag["Key"] == "swipe_temporary" and tag["Value"] == "true"):
if not (tag["Key"] == "intermediate_output" and tag["Value"] == "true"):
remaining_tags.append(tag)
try:
if remaining_tags:
Expand Down Expand Up @@ -219,7 +219,7 @@ def task(cfg, logger, run_id, run_dir, task, **recv):
recv = yield recv

def upload_file(abs_fn, s3uri):
s3cp(logger, abs_fn, s3uri)
s3cp(logger, abs_fn, s3uri, flag_temporary_file=True)
# record in _uploaded_files (keyed by inode, so that it can be found from any
# symlink or hardlink)
with _uploaded_files_lock:
Expand Down Expand Up @@ -339,13 +339,18 @@ def rewriter(fd):
elif output_file and output_file.startswith("s3://"):
remove_temporary_flag(output_file)

s3cp(logger, fn, os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json")))
s3cp(
logger,
fn,
os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json")),
flag_temporary_file=False
)


_s3parcp_lock = threading.Lock()


def s3cp(logger, fn, s3uri):
def s3cp(logger, fn, s3uri, flag_temporary_file=False):
with _s3parcp_lock:
# when uploading many small outputs from the same pipeline you end up with a
# quick intense burst of load that can bump into the S3 rate limit
Expand All @@ -363,4 +368,5 @@ def s3cp(logger, fn, s3uri):
)
)
raise WDL.Error.RuntimeError("failed: " + " ".join(cmd))
flag_temporary(s3uri)
if flag_temporary_file:
flag_temporary(s3uri)
2 changes: 1 addition & 1 deletion test/test_wdl.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ def test_temp_tag(self):
Key=f"{output_prefix}/test-temp-1/temporary.txt"
).get("TagSet", [])
self.assertEqual(len(temporary_tagset), 1)
self.assertEqual(temporary_tagset[0].get("Key"), "swipe_temporary")
self.assertEqual(temporary_tagset[0].get("Key"), "intermediate_output")
self.assertEqual(temporary_tagset[0].get("Value"), "true")

# test temporary tag got removed for output file
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.4.1
v1.4.2

0 comments on commit aa46c82

Please sign in to comment.