Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jan 29, 2025
1 parent 89823bb commit d1d1696
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 38 deletions.
35 changes: 18 additions & 17 deletions streamflow/cwl/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,14 @@ async def _process_file_token(
for t in token_value["listing"]
)
)
if filepath:
for file in listing:
await register_data(
context=streamflow_context,
connector=connector,
locations=locations,
base_path=job.output_directory,
token_value=file,
)
for file in listing:
await register_data(
context=streamflow_context,
connector=connector,
locations=locations,
base_path=job.output_directory,
token_value=file,
)
new_token_value |= {"listing": listing}
return new_token_value

Expand Down Expand Up @@ -533,20 +532,22 @@ async def _update_listing(
existing = []
tasks = []
for element in token_value["listing"]:
if src_location and self.workflow.context.data_manager.get_data_locations(
path=element["path"],
deployment=src_location.deployment,
location_name=src_location.name,
# data_type=DataType.PRIMARY # todo: is this param needed?
if (
src_location
and self.workflow.context.data_manager.get_data_locations(
path=element["path"],
deployment=src_location.deployment,
location_name=src_location.name,
)
):
# adjust the path
existing.append(
utils.remap_file_value(
utils.remap_token_value(
path_processor=get_path_processor(
self.workflow.context.scheduler.get_connector(job.name)
),
output_directory=token_value["path"],
new_dir=dest_path,
old_dir=token_value["path"],
new_dir=str(dest_path),
value=element,
)
)
Expand Down
10 changes: 6 additions & 4 deletions streamflow/cwl/translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
LoadListing,
SecondaryFile,
process_embedded_tool,
remap_file_value,
remap_token_value,
resolve_dependencies,
)
from streamflow.cwl.workflow import CWLWorkflow
Expand Down Expand Up @@ -1500,9 +1500,11 @@ def _inject_input(
is not None
):
path_processor = os.path if target.deployment.type == "local" else posixpath
value = remap_file_value(
path_processor, output_directory, target.workdir, value
value = remap_token_value(
path_processor, output_directory, target.workdir, _inject_value(value)
)
else:
value = _inject_value(value)
# Create a schedule step and connect it to the local DeployStep
schedule_step = workflow.create_step(
cls=ScheduleStep,
Expand All @@ -1522,7 +1524,7 @@ def _inject_input(
)
# Create an input port and inject values
input_port = workflow.create_port()
input_port.put(Token(value=_inject_value(value)))
input_port.put(Token(value=value))
input_port.put(TerminationToken())
# Connect input and output ports to the injector step
injector_step.add_input_port(port_name, input_port)
Expand Down
27 changes: 14 additions & 13 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -848,8 +848,9 @@ async def register_data(
elif "listing" in token_value:
paths.extend(
[
t["path"] if "path" in t else t["location"]
t.get("path", t["location"])
for t in token_value["listing"]
if "path" in t or "location" in t
]
)
if "secondaryFiles" in token_value:
Expand Down Expand Up @@ -1101,12 +1102,12 @@ def remap_path(
)


def remap_file_value(
path_processor: ModuleType, output_directory: str, new_dir, value: Any
def remap_token_value(
path_processor: ModuleType, old_dir: str, new_dir: str, value: Any
) -> Any:
if isinstance(value, MutableSequence):
return [
remap_file_value(path_processor, output_directory, new_dir, v)
remap_token_value(path_processor, old_dir, new_dir, v)
for v in value
]
elif isinstance(
Expand All @@ -1116,25 +1117,25 @@ def remap_file_value(
value.path = remap_path(
path_processor=path_processor,
path=value.path,
old_dir=output_directory,
old_dir=old_dir,
new_dir=new_dir,
)
if value.location:
value.location = remap_path(
path_processor=path_processor,
path=value.location,
old_dir=output_directory,
old_dir=old_dir,
new_dir=new_dir,
)
if isinstance(value, get_args(cwl_utils.parser.File)):
if value.secondaryFiles:
value.secondaryFiles = [
remap_file_value(path_processor, output_directory, new_dir, sf)
remap_token_value(path_processor, old_dir, new_dir, sf)
for sf in value.secondaryFiles
]
elif value.listing:
value.listing = [
remap_file_value(path_processor, output_directory, new_dir, sf)
remap_token_value(path_processor, old_dir, new_dir, sf)
for sf in value.listing
]
return value
Expand All @@ -1144,30 +1145,30 @@ def remap_file_value(
value["location"] = remap_path(
path_processor=path_processor,
path=value["location"],
old_dir=output_directory,
old_dir=old_dir,
new_dir=new_dir,
)
if "path" in value:
value["path"] = remap_path(
path_processor=path_processor,
path=value["path"],
old_dir=output_directory,
old_dir=old_dir,
new_dir=new_dir,
)
if "secondaryFiles" in value:
value["secondaryFiles"] = [
remap_file_value(path_processor, output_directory, new_dir, sf)
remap_token_value(path_processor, old_dir, new_dir, sf)
for sf in value["secondaryFiles"]
]
if "listing" in value:
value["listing"] = [
remap_file_value(path_processor, output_directory, new_dir, sf)
remap_token_value(path_processor, old_dir, new_dir, sf)
for sf in value["listing"]
]
return value
else:
return {
k: remap_file_value(path_processor, output_directory, new_dir, v)
k: remap_token_value(path_processor, old_dir, new_dir, v)
for k, v in value.items()
}
else:
Expand Down
6 changes: 2 additions & 4 deletions tests/test_translator.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,8 @@ async def test_inject_remote_input(context: StreamFlowContext, config: str) -> N

# Check input tokens
input_tokens = input_injector_step.get_input_port(port_name).token_list
assert (
input_tokens[0].value["class"] == file_type
and input_tokens[0].value["path"] == cwl_inputs[port_name].path
)
assert input_tokens[0].value["class"] == file_type
assert input_tokens[0].value["path"] == str(remote_path)
assert isinstance(input_tokens[1], TerminationToken)

# Execute workflow
Expand Down

0 comments on commit d1d1696

Please sign in to comment.