Skip to content

Commit

Permalink
Improved data transfer logic
Browse files Browse the repository at this point in the history
As an optimisation of the DataManager logic, now each input dependency
is transferred only once to each remote resource.

This is managed using the path mapper as a source of truth, and creating
symlinks to a primary location of an input dependency, even if it has
been transferred by a different, independent job.
  • Loading branch information
GlassOfWhiskey committed Apr 17, 2021
1 parent ebedb2c commit 313ba9e
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 163 deletions.
38 changes: 27 additions & 11 deletions streamflow/core/data.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from enum import Enum
from typing import TYPE_CHECKING
Expand All @@ -19,18 +20,23 @@ def __init__(self, context: StreamFlowContext):
self.context: StreamFlowContext = context

@abstractmethod
def get_data_locations(self, resource: Text, path: Text) -> Set[DataLocation]:
def get_data_locations(self,
resource: Text,
path: Text,
location_type: Optional[DataLocationType] = None) -> Set[DataLocation]:
...

@abstractmethod
def invalidate_location(self, resource: Text, path: Text) -> None:
def invalidate_location(self,
resource: Text,
path: Text) -> None:
...

@abstractmethod
async def register_path(self,
job: Optional[Job],
resource: Optional[Text],
path: Text):
def register_path(self,
job: Optional[Job],
resource: Optional[Text],
path: Text):
...

@abstractmethod
Expand All @@ -49,25 +55,35 @@ class FileType(Enum):


class DataLocation(object):
__slots__ = ('path', 'job', 'resource', 'valid')
__slots__ = ('path', 'job', 'location_type', 'resource', 'available')

def __init__(self,
path: Text,
job: Optional[Text],
location_type: DataLocationType,
resource: Optional[Text] = None,
valid: bool = True):
available: bool = False):
self.path: Text = path
self.job: Optional[Text] = job
self.resource: Optional[Text] = resource
self.valid: bool = valid
self.location_type: DataLocationType = location_type
self.available: asyncio.Event = asyncio.Event()
if available:
self.available.set()

def __eq__(self, other):
if not isinstance(other, DataLocation):
return False
else:
return (self.path == other.path and
self.job == other.job and
self.resource == other.resource)

def __hash__(self):
return hash((self.path, self.job, self.resource))
return hash((self.path, self.resource))


class DataLocationType(Enum):
    """Classifies how the path stored in a ``DataLocation`` relates to the data it points to."""
    # The authoritative copy of the data on a resource.
    PRIMARY = 0
    # A symlink pointing at a primary location (created to avoid re-transferring inputs).
    SYMBOLIC_LINK = 1
    # NOTE(review): value 2 is skipped — possibly a removed member; confirm before renumbering.
    WRITABLE_COPY = 3
    # Location is no longer usable (e.g. after invalidate_location).
    INVALID = 4
26 changes: 11 additions & 15 deletions streamflow/cwl/token_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from cwltool.utils import CONTENT_LIMIT
from typing_extensions import Text

from streamflow.core.data import FileType, LOCAL_RESOURCE
from streamflow.core.data import FileType, LOCAL_RESOURCE, DataLocationType
from streamflow.core.deployment import Connector
from streamflow.core.exception import WorkflowExecutionException, WorkflowDefinitionException, \
UnrecoverableTokenException
Expand Down Expand Up @@ -453,10 +453,10 @@ async def _recover_path(self,
# Otherwise, get the list of other file locations from DataManager
data_locations = set()
for resource in resources:
data_locations.update(context.data_manager.get_data_locations(resource, path))
data_locations.update(context.data_manager.get_data_locations(resource, path, DataLocationType.PRIMARY))
# Check if path is still present in original resources
for location in data_locations:
if location.resource in job_resources and location.valid:
if location.resource in job_resources:
if await remotepath.exists(connector, location.resource, path):
return path
else:
Expand Down Expand Up @@ -531,16 +531,14 @@ async def _recover_token_value(self,
token['secondaryFiles'] = secondary_files
return token

async def _register_data(self,
job: Job,
token_value: Union[MutableSequence[MutableMapping[Text, Any]], MutableMapping[Text, Any]]):
def _register_data(self,
job: Job,
token_value: Union[MutableSequence[MutableMapping[Text, Any]], MutableMapping[Text, Any]]):
context = self.get_context()
# If `token_value` is a list, process every item independently
if isinstance(token_value, MutableSequence):
register_path_tasks = []
for t in token_value:
register_path_tasks.append(asyncio.create_task(self._register_data(job, t)))
await asyncio.gather(*register_path_tasks)
self._register_data(job, t)
# Otherwise, if token value is a dictionary and it refers to a File or a Directory, register the path
elif (isinstance(token_value, MutableMapping)
and 'class' in token_value
Expand All @@ -562,11 +560,8 @@ async def _register_data(self,
resources = job.get_resources() or [None]
for path in paths:
if resources:
await asyncio.gather(*[asyncio.create_task(
for resource in resources or [None]:
context.data_manager.register_path(job, resource, path)
) for resource in resources])
else:
await context.data_manager.register_path(job, None, path)

async def _transfer_file(self,
src_job: Optional[Job],
Expand Down Expand Up @@ -729,7 +724,7 @@ async def compute_token(self, job: Job, command_output: CWLCommandOutput) -> Any
if command_output.status == Status.SKIPPED:
return Token(name=self.port.name, value=None, job=job.name)
token_value = await self._get_value_from_command(job, command_output)
await self._register_data(job, token_value)
self._register_data(job, token_value)
weight = await self.weight_token(job, token_value)
return Token(name=self.port.name, value=token_value, job=job.name, weight=weight)

Expand All @@ -749,7 +744,8 @@ def get_related_resources(self, token: Token) -> Set[Text]:
data_locations = set()
for resource in resources:
for path in paths:
data_locations.update(context.data_manager.get_data_locations(resource, path))
data_locations.update(context.data_manager.get_data_locations(
resource, path, DataLocationType.PRIMARY))
return set(loc.resource for loc in filter(lambda l: l.resource not in resources, data_locations))
else:
return set()
Expand Down
Loading

0 comments on commit 313ba9e

Please sign in to comment.