Skip to content

Commit

Permalink
FIXED data type when inner location are involved
Browse files Browse the repository at this point in the history
  • Loading branch information
LanderOtto committed Jan 31, 2025
1 parent 7b1e02b commit 9351079
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 30 deletions.
37 changes: 27 additions & 10 deletions streamflow/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,10 +563,10 @@ async def get_file_token(
) -> MutableMapping[str, Any]:
path_processor = get_path_processor(connector)
basename = basename or path_processor.basename(filepath)
location = "".join(["file://", urllib.parse.quote(filepath)])
file_location = "".join(["file://", urllib.parse.quote(filepath)])
token = {
"class": token_class,
"location": location,
"location": file_location,
"basename": basename,
"path": filepath,
"dirname": path_processor.dirname(filepath),
Expand All @@ -585,9 +585,12 @@ async def get_file_token(
token["size"],
cwl_version,
)
token["checksum"] = "sha1${checksum}".format(
checksum=await real_path.checksum()
)
if (checksum := await real_path.checksum()) is not None:
token["checksum"] = f"sha1${checksum}"
else:
raise WorkflowExecutionException(
f"Impossible to retrieve checksum of {real_path} on {location}"
)
break
elif token_class == "Directory" and load_listing != LoadListing.no_listing: # nosec
for location in locations:
Expand Down Expand Up @@ -1072,11 +1075,25 @@ async def write_remote_file(
)
)
await path.write_text(content)
context.data_manager.register_path(
location=location,
path=str(path),
relpath=relpath,
)
for loc in context.data_manager.get_data_locations(
path=str(path.parent),
deployment=location.deployment,
location_name=location.name,
):
if loc.path == str(path.parent):
context.data_manager.register_path(
location=location,
path=str(path),
relpath=relpath,
data_type=loc.data_type,
)
else:
context.data_manager.register_path(
location=loc.location,
path=os.path.join(loc.path, os.path.basename(path)),
relpath=os.path.join(loc.relpath, os.path.basename(path)),
data_type=loc.data_type,
)


def remap_path(
Expand Down
42 changes: 24 additions & 18 deletions streamflow/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,36 +265,42 @@ def register_path(
relpath: str | None = None,
data_type: DataType = DataType.PRIMARY,
) -> DataLocation:
data_location = DataLocation(
location=location,
path=path,
relpath=relpath or path,
data_type=data_type,
available=True,
)
self.path_mapper.put(path=path, data_location=data_location, recursive=True)
self.context.checkpoint_manager.register(data_location)
data_locations = [
DataLocation(
location=location,
path=path,
relpath=relpath or path,
data_type=data_type,
available=False,
)
]
self.path_mapper.put(path=path, data_location=data_locations[0], recursive=True)
self.context.checkpoint_manager.register(data_locations[0])
# Process wrapped locations if any
while (
path := _get_inner_path(
path=StreamFlowPath(path, context=self.context, location=location)
)
) is not None:
inner_location = DataLocation(
location=location.wraps,
path=str(path),
relpath=relpath or path,
data_type=data_type,
available=True,
data_locations.append(
DataLocation(
location=location.wraps,
path=str(path),
relpath=relpath or str(path),
data_type=data_type,
available=False,
)
)
self.path_mapper.put(
path=str(path), data_location=inner_location, recursive=True
path=str(path), data_location=data_locations[-1], recursive=True
)
self.register_relation(
src_location=data_location, dst_location=inner_location
src_location=data_locations[0], dst_location=data_locations[-1]
)
location = location.wraps
return data_location
for loc in data_locations:
loc.available.set()
return data_locations[0]

def register_relation(
self, src_location: DataLocation, dst_location: DataLocation
Expand Down
2 changes: 0 additions & 2 deletions streamflow/data/remotepath.py
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,6 @@ async def _get_inner_path(self) -> StreamFlowPath | None:
return self._inner_path
else:
path = path.parent
if str(path) == os.sep:
self._inner_path = None
return self._inner_path

async def _test(self, command: list[str]) -> bool:
Expand Down

0 comments on commit 9351079

Please sign in to comment.