diff --git a/miniwdl-plugins/s3upload/miniwdl_s3upload.py b/miniwdl-plugins/s3upload/miniwdl_s3upload.py index 0b6cd38c..06719e51 100644 --- a/miniwdl-plugins/s3upload/miniwdl_s3upload.py +++ b/miniwdl-plugins/s3upload/miniwdl_s3upload.py @@ -75,7 +75,7 @@ def flag_temporary(s3uri): Tagging={ 'TagSet': [ { - 'Key': 'swipe_temporary', + 'Key': 'intermediate_output', 'Value': 'true' }, ] @@ -93,7 +93,7 @@ def remove_temporary_flag(s3uri, retry=0): ) remaining_tags = [] for tag in tags["TagSet"]: - if not (tag["Key"] == "swipe_temporary" and tag["Value"] == "true"): + if not (tag["Key"] == "intermediate_output" and tag["Value"] == "true"): remaining_tags.append(tag) try: if remaining_tags: @@ -219,7 +219,7 @@ def task(cfg, logger, run_id, run_dir, task, **recv): recv = yield recv def upload_file(abs_fn, s3uri): - s3cp(logger, abs_fn, s3uri) + s3cp(logger, abs_fn, s3uri, flag_temporary_file=True) # record in _uploaded_files (keyed by inode, so that it can be found from any # symlink or hardlink) with _uploaded_files_lock: @@ -339,13 +339,18 @@ def rewriter(fd): elif output_file and output_file.startswith("s3://"): remove_temporary_flag(output_file) - s3cp(logger, fn, os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json"))) + s3cp( + logger, + fn, + os.environ.get("WDL_OUTPUT_URI", os.path.join(s3prefix, "outputs.s3.json")), + flag_temporary_file=False + ) _s3parcp_lock = threading.Lock() -def s3cp(logger, fn, s3uri): +def s3cp(logger, fn, s3uri, flag_temporary_file=False): with _s3parcp_lock: # when uploading many small outputs from the same pipeline you end up with a # quick intense burst of load that can bump into the S3 rate limit @@ -363,4 +368,5 @@ def s3cp(logger, fn, s3uri): ) ) raise WDL.Error.RuntimeError("failed: " + " ".join(cmd)) - flag_temporary(s3uri) + if flag_temporary_file: + flag_temporary(s3uri) diff --git a/test/test_wdl.py b/test/test_wdl.py index 335633c0..419a7331 100644 --- a/test/test_wdl.py +++ b/test/test_wdl.py @@ -441,7 +441,7 @@ def test_temp_tag(self): Key=f"{output_prefix}/test-temp-1/temporary.txt" ).get("TagSet", []) self.assertEqual(len(temporary_tagset), 1) - self.assertEqual(temporary_tagset[0].get("Key"), "swipe_temporary") + self.assertEqual(temporary_tagset[0].get("Key"), "intermediate_output") self.assertEqual(temporary_tagset[0].get("Value"), "true") # test temporary tag got removed for output file diff --git a/version b/version index 9bdb566f..c432e90f 100644 --- a/version +++ b/version @@ -1 +1 @@ -v1.4.1 \ No newline at end of file +v1.4.2