diff --git a/streamflow/core/data.py b/streamflow/core/data.py index f38d0694d..af5e584cd 100644 --- a/streamflow/core/data.py +++ b/streamflow/core/data.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio from abc import ABC, abstractmethod from enum import Enum from typing import TYPE_CHECKING @@ -19,18 +20,23 @@ def __init__(self, context: StreamFlowContext): self.context: StreamFlowContext = context @abstractmethod - def get_data_locations(self, resource: Text, path: Text) -> Set[DataLocation]: + def get_data_locations(self, + resource: Text, + path: Text, + location_type: Optional[DataLocationType] = None) -> Set[DataLocation]: ... @abstractmethod - def invalidate_location(self, resource: Text, path: Text) -> None: + def invalidate_location(self, + resource: Text, + path: Text) -> None: ... @abstractmethod - async def register_path(self, - job: Optional[Job], - resource: Optional[Text], - path: Text): + def register_path(self, + job: Optional[Job], + resource: Optional[Text], + path: Text): ... @abstractmethod @@ -49,25 +55,35 @@ class FileType(Enum): class DataLocation(object): - __slots__ = ('path', 'job', 'resource', 'valid') + __slots__ = ('path', 'job', 'location_type', 'resource', 'available') def __init__(self, path: Text, job: Optional[Text], + location_type: DataLocationType, resource: Optional[Text] = None, - valid: bool = True): + available: bool = False): self.path: Text = path self.job: Optional[Text] = job self.resource: Optional[Text] = resource - self.valid: bool = valid + self.location_type: DataLocationType = location_type + self.available: asyncio.Event = asyncio.Event() + if available: + self.available.set() def __eq__(self, other): if not isinstance(other, DataLocation): return False else: return (self.path == other.path and - self.job == other.job and self.resource == other.resource) def __hash__(self): - return hash((self.path, self.job, self.resource)) + return hash((self.path, self.resource)) + + +class DataLocationType(Enum): + PRIMARY = 0 + SYMBOLIC_LINK = 1 + WRITABLE_COPY = 3 + INVALID = 4 diff --git a/streamflow/cwl/token_processor.py b/streamflow/cwl/token_processor.py index 7f55ad52e..a4c1fc78c 100644 --- a/streamflow/cwl/token_processor.py +++ b/streamflow/cwl/token_processor.py @@ -10,7 +10,7 @@ from cwltool.utils import CONTENT_LIMIT from typing_extensions import Text -from streamflow.core.data import FileType, LOCAL_RESOURCE +from streamflow.core.data import FileType, LOCAL_RESOURCE, DataLocationType from streamflow.core.deployment import Connector from streamflow.core.exception import WorkflowExecutionException, WorkflowDefinitionException, \ UnrecoverableTokenException @@ -453,10 +453,10 @@ async def _recover_path(self, # Otherwise, get the list of other file locations from DataManager data_locations = set() for resource in resources: - data_locations.update(context.data_manager.get_data_locations(resource, path)) + data_locations.update(context.data_manager.get_data_locations(resource, path, DataLocationType.PRIMARY)) # Check if path is still present in original resources for location in data_locations: - if location.resource in job_resources and location.valid: + if location.resource in job_resources: if await remotepath.exists(connector, location.resource, path): return path else: @@ -531,16 +531,14 @@ async def _recover_token_value(self, token['secondaryFiles'] = secondary_files return token - async def _register_data(self, - job: Job, - token_value: Union[MutableSequence[MutableMapping[Text, Any]], MutableMapping[Text, Any]]): + def _register_data(self, + job: Job, + token_value: Union[MutableSequence[MutableMapping[Text, Any]], MutableMapping[Text, Any]]): context = self.get_context() # If `token_value` is a list, process every item independently if isinstance(token_value, MutableSequence): - register_path_tasks = [] for t in token_value: - register_path_tasks.append(asyncio.create_task(self._register_data(job, t))) - await asyncio.gather(*register_path_tasks) + self._register_data(job, t) # Otherwise, if token value is a dictionary and it refers to a File or a Directory, register the path elif (isinstance(token_value, MutableMapping) and 'class' in token_value @@ -562,11 +560,8 @@ async def _register_data(self, resources = job.get_resources() or [None] for path in paths: if resources: - await asyncio.gather(*[asyncio.create_task( + for resource in resources or [None]: context.data_manager.register_path(job, resource, path) - ) for resource in resources]) - else: - await context.data_manager.register_path(job, None, path) async def _transfer_file(self, src_job: Optional[Job], @@ -729,7 +724,7 @@ async def compute_token(self, job: Job, command_output: CWLCommandOutput) -> Any if command_output.status == Status.SKIPPED: return Token(name=self.port.name, value=None, job=job.name) token_value = await self._get_value_from_command(job, command_output) - await self._register_data(job, token_value) + self._register_data(job, token_value) weight = await self.weight_token(job, token_value) return Token(name=self.port.name, value=token_value, job=job.name, weight=weight) @@ -749,7 +744,8 @@ def get_related_resources(self, token: Token) -> Set[Text]: data_locations = set() for resource in resources: for path in paths: - data_locations.update(context.data_manager.get_data_locations(resource, path)) + data_locations.update(context.data_manager.get_data_locations( + resource, path, DataLocationType.PRIMARY)) return set(loc.resource for loc in filter(lambda l: l.resource not in resources, data_locations)) else: return set() diff --git a/streamflow/data/data_manager.py b/streamflow/data/data_manager.py index b37fd6ec9..36154f959 100644 --- a/streamflow/data/data_manager.py +++ b/streamflow/data/data_manager.py @@ -8,7 +8,8 @@ from pathlib import Path, PosixPath from typing import TYPE_CHECKING -from streamflow.core.data import DataManager, DataLocation, LOCAL_RESOURCE +from streamflow.core.data import DataManager, DataLocation, LOCAL_RESOURCE, DataLocationType +from streamflow.core.deployment import Connector from streamflow.data import remotepath from streamflow.deployment.connector.base import ConnectorCopyKind from streamflow.log_handler import logger @@ -20,6 +21,59 @@ from typing_extensions import Text +async def _copy(src_connector: Optional[Connector], + src_resource: Optional[Text], + src: Text, + dst_connector: Optional[Connector], + dst_resources: Optional[MutableSequence[Text]], + dst: Text, + writable: False) -> None: + if src_connector is None and dst_connector is None: + if os.path.isdir(src): + os.makedirs(dst, exist_ok=True) + shutil.copytree(src, dst, dirs_exist_ok=True) + else: + shutil.copy(src, dst) + elif src_connector == dst_connector: + await dst_connector.copy( + src=src, + dst=dst, + resources=dst_resources, + kind=ConnectorCopyKind.REMOTE_TO_REMOTE, + source_remote=src_resource, + read_only=not writable) + elif src_connector is None: + await dst_connector.copy( + src=src, + dst=dst, + resources=dst_resources, + kind=ConnectorCopyKind.LOCAL_TO_REMOTE, + read_only=not writable) + elif dst_connector is None: + await src_connector.copy( + src=src, + dst=dst, + resources=[src_resource], + kind=ConnectorCopyKind.REMOTE_TO_LOCAL, + read_only=not writable) + else: + temp_dir = tempfile.mkdtemp() + await src_connector.copy( + src=src, + dst=temp_dir, + resources=[src_resource], + kind=ConnectorCopyKind.REMOTE_TO_LOCAL, + read_only=not writable) + await asyncio.gather(*[asyncio.create_task(dst_connector.copy( + src=os.path.join(temp_dir, element), + dst=dst, + resources=dst_resources, + kind=ConnectorCopyKind.LOCAL_TO_REMOTE, + read_only=not writable + )) for element in os.listdir(temp_dir)]) + shutil.rmtree(temp_dir) + + class DefaultDataManager(DataManager): def __init__(self, context: StreamFlowContext): @@ -43,118 +97,95 @@ async def _transfer_from_resource(self, await remotepath.mkdir(dst_connector, dst_resources, str(Path(dst).parent)) # Follow symlink for source path src = await remotepath.follow_symlink(src_connector, src_resource, src) - # If jobs are both local - if src_connector is None and dst_connector is None: - if src != dst: - if not writable: - await remotepath.symlink(None, None, src, dst) - else: - if os.path.isdir(src): - os.makedirs(dst, exist_ok=True) - shutil.copytree(src, dst, dirs_exist_ok=True) + primary_locations = self.path_mapper.get(src_resource, src, DataLocationType.PRIMARY) + copy_tasks = [] + remote_resources = [] + data_locations = [] + for dst_resource in (dst_resources or [None]): + # Check if a primary copy of the source path is already present on the destination resource + found_existing_loc = False + for primary_loc in primary_locations: + if primary_loc.resource == (dst_resource or LOCAL_RESOURCE): + # Wait for the source location to be available on the destination path + await primary_loc.available.wait() + # If yes, perform a symbolic link if possible + if not writable: + await remotepath.symlink(dst_connector, dst_resource, primary_loc.path, dst) + self.path_mapper.create_mapping( + location_type=DataLocationType.SYMBOLIC_LINK, + src_path=src, + src_resource=src_resource, + dst_path=dst, + dst_job=dst_job, + dst_resource=dst_resource, + available=True) + # Otherwise, perform a copy operation else: - shutil.copy(src, dst) - await self.path_mapper.create_mapping( - src, src_job, None, - dst, dst_job, None, - valid_dst=not writable) - # If jobs are scheduled on the same model - elif src_connector == dst_connector: - remote_resources = [] - mapping_tasks = [] - for dst_resource in dst_resources: - # If jobs are scheduled on the same resource and it is possible to link, only create a symlink - if not writable and len(dst_resources) == 1 and src_resource == dst_resource: - if src != dst: - await remotepath.symlink(dst_connector, dst_resource, src, dst) - mapping_tasks.append(asyncio.create_task( - self.path_mapper.create_mapping( - src, src_job, src_resource, - dst, dst_job, dst_resource))) - # Otherwise perform a remote copy managed by the connector - else: - remote_resources.append(dst_resource) - await asyncio.gather(*mapping_tasks) - if remote_resources: - await dst_connector.copy( - src=src, - dst=dst, - resources=remote_resources, - kind=ConnectorCopyKind.REMOTE_TO_REMOTE, - source_remote=src_resource, - read_only=not writable) - # Register the new remote copies of the data - await asyncio.gather(*[asyncio.create_task( - self.path_mapper.create_mapping( - src, src_job, src_resource, - dst, dst_job, dst_resource, - valid_dst=not writable) - ) for dst_resource in remote_resources]) - # If source job is local, copy files to the remote resources - elif src_connector is None: - await dst_connector.copy( - src=src, - dst=dst, - resources=dst_resources, - kind=ConnectorCopyKind.LOCAL_TO_REMOTE, - read_only=not writable) - # Register the new remote copies of the data - await asyncio.gather(*[asyncio.create_task( - self.path_mapper.create_mapping( - src, src_job, src_resource, - dst, dst_job, dst_resource, - valid_dst=not writable) - ) for dst_resource in dst_resources]) - # If destination job is local, copy files from the remote resource - elif dst_connector is None: - await src_connector.copy( - src=src, - dst=dst, - resources=[src_resource], - kind=ConnectorCopyKind.REMOTE_TO_LOCAL, - read_only=not writable) - # Register the new local copy of the data - await self.path_mapper.create_mapping( - src, src_job, src_resource, - dst, dst_job, None, - valid_dst=not writable) - # If jobs are both remote and scheduled on different models, perform an intermediate local copy - else: - temp_dir = tempfile.mkdtemp() - await src_connector.copy( + copy_tasks.append(asyncio.create_task(_copy( + src_connector=dst_connector, + src_resource=dst_resource, + src=primary_loc.path, + dst_connector=dst_connector, + dst_resources=[dst_resource], + dst=dst, + writable=True))) + data_locations.append(self.path_mapper.create_mapping( + location_type=DataLocationType.WRITABLE_COPY, + src_path=src, + src_resource=src_resource, + dst_path=dst, + dst_job=dst_job, + dst_resource=dst_resource)) + found_existing_loc = True + break + # Otherwise, perform a remote copy and mark the destination as primary + if not found_existing_loc: + remote_resources.append(dst_resource) + data_locations.append(self.path_mapper.create_mapping( + location_type=DataLocationType.WRITABLE_COPY if writable else DataLocationType.PRIMARY, + src_path=src, + src_resource=src_resource, + dst_path=dst, + dst_job=dst_job, + dst_resource=dst_resource)) + # Perform all the copy operations + if remote_resources: + copy_tasks.append(asyncio.create_task(_copy( + src_connector=src_connector, + src_resource=src_resource, src=src, - dst=temp_dir, - resources=[src_resource], - kind=ConnectorCopyKind.REMOTE_TO_LOCAL, - read_only=not writable) - await asyncio.gather(*[asyncio.create_task(dst_connector.copy( - src=os.path.join(temp_dir, element), + dst_connector=dst_connector, + dst_resources=remote_resources, dst=dst, - resources=dst_resources, - kind=ConnectorCopyKind.LOCAL_TO_REMOTE, - read_only=not writable - )) for element in os.listdir(temp_dir)]) - shutil.rmtree(temp_dir) - # Register the new remote copies of the data - await asyncio.gather(*[asyncio.create_task( - self.path_mapper.create_mapping( - src, src_job, src_resource, - dst, dst_job, dst_resource, - valid_dst=not writable) - ) for dst_resource in dst_resources]) + writable=writable))) + await asyncio.gather(*copy_tasks) + # Mark all destination data locations as available + for data_location in data_locations: + data_location.available.set() - def get_data_locations(self, resource: Text, path: Text) -> Set[DataLocation]: - data_locations = self.path_mapper.get(resource, path) - return set(filter(lambda l: l.valid, data_locations)) + def get_data_locations(self, + resource: Text, + path: Text, + location_type: Optional[DataLocationType] = None) -> Set[DataLocation]: + data_locations = self.path_mapper.get(resource, path, location_type) + return {loc for loc in data_locations if loc.location_type != DataLocationType.INVALID} def invalidate_location(self, resource: Text, path: Text) -> None: self.path_mapper.invalidate_location(resource, path) - async def register_path(self, - job: Optional[Job], - resource: Optional[Text], - path: Text): - await self.path_mapper.create_mapping(path, job, resource) + def register_path(self, + job: Optional[Job], + resource: Optional[Text], + path: Text): + self.path_mapper.put( + resource=resource, + path=path, + data_locations={DataLocation( + path=path, + job=job.name if job is not None else None, + resource=resource, + location_type=DataLocationType.PRIMARY, + available=True)}) self.context.checkpoint_manager.register_path(job, path) async def transfer_data(self, @@ -224,36 +255,27 @@ def _process_resource(self, resource: Text) -> Text: self._filesystems[resource] = RemotePathNode() return resource - async def create_mapping(self, - src_path: Text, - src_job: Optional[Job], - src_resource: Optional[Text], - dst_path: Optional[Text] = None, - dst_job: Optional[Job] = None, - dst_resource: Optional[Text] = None, - valid_dst: bool = True): + def create_mapping(self, + location_type: DataLocationType, + src_path: Text, + src_resource: Optional[Text], + dst_path: Text, + dst_job: Optional[Job], + dst_resource: Optional[Text], + available: bool = False) -> DataLocation: src_resource = self._process_resource(src_resource) - src_data_location = DataLocation( - path=src_path, - job=src_job.name if src_job is not None else None, - resource=src_resource) - src_connector = src_job.step.get_connector() if src_job is not None else None - src_path = await remotepath.follow_symlink(src_connector, src_resource, src_path) data_locations = self.get(src_resource, src_path) - if src_data_location not in data_locations: - data_locations.add(src_data_location) - if dst_path is not None: - dst_resource = self._process_resource(dst_resource) - dst_data_location = DataLocation( - path=dst_path, - job=dst_job.name if dst_job is not None else None, - resource=dst_resource, - valid=valid_dst) - if dst_data_location not in data_locations: - data_locations.add(dst_data_location) - self.put(src_resource, src_path, data_locations) - if dst_path is not None: - self.put(dst_resource, dst_path, data_locations) + dst_resource = self._process_resource(dst_resource) + dst_data_location = DataLocation( + path=dst_path, + job=dst_job.name if dst_job is not None else None, + location_type=location_type, + resource=dst_resource, + available=available) + if dst_data_location not in data_locations: + data_locations.add(dst_data_location) + self.put(dst_resource, dst_path, data_locations) + return dst_data_location def _remove_node(self, resource: Text, node: RemotePathNode): node.locations = set(filter(lambda l: l.resource != resource, node.locations)) @@ -263,7 +285,7 @@ def _remove_node(self, resource: Text, node: RemotePathNode): self._remove_node(resource, n) return node - def get(self, resource: Text, path: Text) -> Set[DataLocation]: + def get(self, resource: Text, path: Text, location_type: Optional[DataLocationType] = None) -> Set[DataLocation]: resource = resource or LOCAL_RESOURCE node = self._filesystems.get(resource) if not node: @@ -274,18 +296,19 @@ def get(self, resource: Text, path: Text) -> Set[DataLocation]: node = node.children[token] else: return set() - return node.locations + return ({loc for loc in node.locations if loc.location_type == location_type} if location_type + else node.locations) def invalidate_location(self, resource: Optional[Text], path: Text) -> None: resource = resource or LOCAL_RESOURCE locations = self.get(resource, path) for location in locations: if location.resource == resource: - location.valid = False + location.location_type = DataLocationType.INVALID self.put(resource, path, locations) def put(self, resource: Text, path: Text, data_locations: Set[DataLocation]) -> None: - resource = resource or LOCAL_RESOURCE + resource = self._process_resource(resource) node = self._filesystems[resource] path = Path(path) if resource == LOCAL_RESOURCE else PosixPath(path) for token in path.parts: