Commit

Add leader fallback for files without filesize and import them on the leader
stxue1 committed Dec 18, 2024
1 parent fe9a811 commit 097740a
Showing 2 changed files with 137 additions and 57 deletions.
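In brief, the change: workflow input files whose size can be determined are still batched and imported on workers, while files whose size is unknown now fall back to being imported on the leader. A minimal sketch of that split (the FileMetadata shape here is an assumption; only the optional size field matters):

from typing import NamedTuple, Optional

class FileMetadata(NamedTuple):
    # Assumed shape of the real toil.job.FileMetadata; only size matters here.
    source: str
    size: Optional[int]

def split_by_known_size(
    files_to_data: dict[str, FileMetadata],
) -> tuple[dict[str, FileMetadata], dict[str, FileMetadata]]:
    # Return (worker-importable files, leader-fallback files).
    valid: dict[str, FileMetadata] = {}
    leftover: dict[str, FileMetadata] = {}
    for filename, data in files_to_data.items():
        (leftover if data.size is None else valid)[filename] = data
    return valid, leftover

files = {
    "a.txt": FileMetadata("file:///a.txt", 1024),
    "b.txt": FileMetadata("file:///b.txt", None),
}
valid, leftover = split_by_known_size(files)
assert list(valid) == ["a.txt"] and list(leftover) == ["b.txt"]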
src/toil/cwl/cwltoil.py: 182 changes (129 additions & 53 deletions)
@@ -51,7 +51,8 @@
TypeVar,
Union,
cast,
- Literal, Protocol,
+ Literal,
+ Protocol,
)
from urllib.parse import quote, unquote, urlparse, urlsplit

@@ -66,6 +67,7 @@
import cwltool.main
import cwltool.resolver
import schema_salad.ref_resolver

# This is also in configargparse but MyPy doesn't know it
from argparse import RawDescriptionHelpFormatter
from configargparse import ArgParser, Namespace
@@ -132,6 +134,8 @@
unwrap,
ImportsJob,
get_file_sizes,
+ FileMetadata,
+ WorkerImportJob,
)
from toil.jobStores.abstractJobStore import (
AbstractJobStore,
@@ -1893,12 +1897,20 @@ def extract_file_uri_once(
return rp
return None


V = TypeVar("V", covariant=True)


class VisitFunc(Protocol[V]):
- def __call__(self, fileindex: dict[str, str], existing: dict[str, str],
- file_metadata: CWLObjectType, mark_broken: bool,
- skip_remote: bool) -> V: ...
+ def __call__(
+ self,
+ fileindex: dict[str, str],
+ existing: dict[str, str],
+ file_metadata: CWLObjectType,
+ mark_broken: bool,
+ skip_remote: bool,
+ ) -> V: ...
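# Illustration only (not part of the commit): a visitor satisfying the
# VisitFunc protocol above. Assumes cwltool is installed; CWLObjectType is
# the CWL object mapping type used throughout this module.
from typing import cast
from cwltool.utils import CWLObjectType

def count_local_files(
    fileindex: dict[str, str],
    existing: dict[str, str],
    file_metadata: CWLObjectType,
    mark_broken: bool,
    skip_remote: bool,
) -> int:
    # Count this File if its location is a local file:// URI.
    location = cast(str, file_metadata.get("location", ""))
    return 1 if location.startswith("file://") else 0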


def visit_files(
func: VisitFunc[V],
@@ -2188,7 +2200,9 @@ def extract_and_convert_file_to_toil_uri(
Unless skip_remote is set, this also runs on remote files and sets their
locations to Toil URIs.
"""
- location = extract_file_uri_once(fileindex, existing, file_metadata, mark_broken, skip_remote)
+ location = extract_file_uri_once(
+ fileindex, existing, file_metadata, mark_broken, skip_remote
+ )
if location is not None:
file_metadata["location"] = convert_file_uri_to_toil_uri(
convertfunc, fileindex, existing, location
@@ -2896,7 +2910,9 @@ def file_import_function(url: str, log_level: int = logging.DEBUG) -> FileID:
logger.log(log_level, "Loading %s...", url)
return writeGlobalFileWrapper(file_store, url)

- file_upload_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)
+ file_upload_function = functools.partial(
+ extract_and_convert_file_to_toil_uri, file_import_function
+ )

# Upload all the Files and set their and the Directories' locations, if
# needed.
@@ -2948,8 +2964,39 @@ def makeRootJob(
:return:
"""
if options.run_imports_on_workers:
+ filenames = extract_workflow_inputs(options, initialized_job_order, tool)
+ files_to_data = get_file_sizes(
+ filenames, toil._jobStore, include_remote_files=options.reference_inputs
+ )
+
+ # files with an associated filesize that are valid to be imported on workers
+ valid_files_to_data = dict()
+ # files without an associated filesize that should be imported on the leader
+ leftover_files_to_data = dict()
+ for filename, file_data in files_to_data.items():
+ if file_data.size is None:
+ leftover_files_to_data[filename] = file_data
+ else:
+ valid_files_to_data[filename] = file_data
+
+ # import the leftover files on the leader first
+ path_to_fileid = WorkerImportJob.import_files(
+ list(leftover_files_to_data.keys()), toil._jobStore
+ )
+
+ # then install the imported files before importing the remaining files;
+ # this way control flow can pass from the leader to the workers
+ tool, initialized_job_order = CWLInstallImportsJob.convert_files(
+ initialized_job_order,
+ tool,
+ path_to_fileid,
+ options.basedir,
+ options.reference_inputs,
+ options.bypass_file_store,
+ )
+
import_job = CWLImportWrapper(
- initialized_job_order, tool, runtime_context, options
+ initialized_job_order, tool, runtime_context, valid_files_to_data, options
)
return import_job
else:
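# Rationale note (an assumption from the commit message and the batching
# parameters used elsewhere, not stated in the code): worker import jobs are
# provisioned using known file sizes, so a file whose size cannot be
# determined cannot be placed in a size-bounded batch and is imported on the
# leader instead. A toy sketch of size-bounded batching, where a None size
# would break the arithmetic:
def batch_by_size(sizes: dict[str, int], max_batch: int) -> list[list[str]]:
    batches: list[list[str]] = []
    current: list[str] = []
    used = 0
    for name, size in sorted(sizes.items(), key=lambda kv: kv[1], reverse=True):
        if current and used + size > max_batch:
            batches.append(current)
            current, used = [], 0
        current.append(name)
        used += size  # impossible if size is None
    if current:
        batches.append(current)
    return batches

assert batch_by_size({"a": 60, "b": 50, "c": 10}, 100) == [["a"], ["b", "c"]]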
@@ -3538,22 +3585,23 @@ def __init__(
self.bypass_file_store = bypass_file_store
self.import_data = import_data

- def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]:
- """
- Convert the filenames in the workflow inputs into the URIs
- :return: Promise of transformed workflow inputs. A tuple of the job order and process
- """
- candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data)
-
- initialized_job_order = unwrap(self.initialized_job_order)
- tool = unwrap(self.tool)

+ @staticmethod
+ def convert_files(
+ initialized_job_order: CWLObjectType,
+ tool: Process,
+ candidate_to_fileid: dict[str, FileID],
+ basedir: str,
+ skip_remote: bool,
+ bypass_file_store: bool,
+ ) -> tuple[Process, CWLObjectType]:
def convert_file(filename: str) -> FileID:
fileid = candidate_to_fileid[filename]
return fileid

- file_convert_function = functools.partial(extract_and_convert_file_to_toil_uri, convert_file)
- fs_access = ToilFsAccess(self.basedir)
+ file_convert_function = functools.partial(
+ extract_and_convert_file_to_toil_uri, convert_file
+ )
+ fs_access = ToilFsAccess(basedir)
fileindex: dict[str, str] = {}
existing: dict[str, str] = {}
visit_files(
@@ -3563,8 +3611,8 @@ def convert_file(filename: str) -> FileID:
existing,
initialized_job_order,
mark_broken=True,
- skip_remote=self.skip_remote,
- bypass_file_store=self.bypass_file_store,
+ skip_remote=skip_remote,
+ bypass_file_store=bypass_file_store,
)
visitSteps(
tool,
@@ -3575,8 +3623,8 @@ def convert_file(filename: str) -> FileID:
fileindex,
existing,
mark_broken=True,
- skip_remote=self.skip_remote,
- bypass_file_store=self.bypass_file_store,
+ skip_remote=skip_remote,
+ bypass_file_store=bypass_file_store,
),
)

@@ -3588,9 +3636,26 @@ def convert_file(filename: str) -> FileID:
# This will properly make them cause an error later if they
# were required.
rm_unprocessed_secondary_files(param_value)

+ return tool, initialized_job_order

+ def run(self, file_store: AbstractFileStore) -> Tuple[Process, CWLObjectType]:
+ """
+ Convert the filenames in the workflow inputs into Toil URIs.
+
+ :return: Promise of the transformed workflow inputs: a tuple of the process and the job order.
+ """
+ candidate_to_fileid: dict[str, FileID] = unwrap(self.import_data)
+
+ initialized_job_order = unwrap(self.initialized_job_order)
+ tool = unwrap(self.tool)
+ return CWLInstallImportsJob.convert_files(
+ initialized_job_order,
+ tool,
+ candidate_to_fileid,
+ self.basedir,
+ self.skip_remote,
+ self.bypass_file_store,
+ )
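# Refactor note (plain-Python sketch, not Toil API): the conversion logic
# now lives in a staticmethod so the leader can call it directly (as
# makeRootJob does above), while run() only unwraps its promised inputs
# and delegates.
class Installer:
    def __init__(self, data_promise):
        # data_promise may be a deferred value, like a Toil promise.
        self.data_promise = data_promise

    @staticmethod
    def convert(data: dict[str, str]) -> dict[str, str]:
        # Pure logic: callable with or without a job context.
        return {key: value.strip() for key, value in data.items()}

    def run(self) -> dict[str, str]:
        data = self.data_promise() if callable(self.data_promise) else self.data_promise
        return Installer.convert(data)

assert Installer.convert({"a": " x "}) == {"a": "x"}  # leader-style call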


class CWLImportWrapper(CWLNamedJob):
"""
@@ -3605,22 +3670,22 @@ def __init__(
initialized_job_order: CWLObjectType,
tool: Process,
runtime_context: cwltool.context.RuntimeContext,
+ file_to_data: dict[str, FileMetadata],
options: Namespace,
):
super().__init__(local=False, disk=options.import_workers_threshold)
self.initialized_job_order = initialized_job_order
self.tool = tool
self.options = options
self.runtime_context = runtime_context
+ self.file_to_data = file_to_data

def run(self, file_store: AbstractFileStore) -> Any:
- filenames = extract_workflow_inputs(
- self.options, self.initialized_job_order, self.tool
+ imports_job = ImportsJob(
+ self.file_to_data,
+ self.options.import_workers_threshold,
+ self.options.import_workers_disk,
)
- file_to_data = get_file_sizes(
- filenames, file_store.jobStore, include_remote_files=self.options.reference_inputs
- )
- imports_job = ImportsJob(file_to_data, self.options.import_workers_threshold, self.options.import_workers_disk)
self.addChild(imports_job)
install_imports_job = CWLInstallImportsJob(
initialized_job_order=self.initialized_job_order,
Expand All @@ -3634,7 +3699,9 @@ def run(self, file_store: AbstractFileStore) -> Any:
imports_job.addFollowOn(install_imports_job)

start_job = CWLStartJob(
- install_imports_job.rv(0), install_imports_job.rv(1), runtime_context=self.runtime_context
+ install_imports_job.rv(0),
+ install_imports_job.rv(1),
+ runtime_context=self.runtime_context,
)
self.addChild(start_job)
install_imports_job.addFollowOn(start_job)
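# The wiring above yields, roughly, this chain at runtime (toy sketch of
# child/follow-on ordering, not real Toil scheduling semantics):
class MiniJob:
    def __init__(self, name: str):
        self.name = name
        self.successors: list["MiniJob"] = []

    def then(self, job: "MiniJob") -> "MiniJob":
        self.successors.append(job)
        return job

    def order(self) -> list[str]:
        out = [self.name]
        for s in self.successors:
            out.extend(s.order())
        return out

wrapper = MiniJob("CWLImportWrapper")
chain = wrapper.then(MiniJob("ImportsJob"))
chain = chain.then(MiniJob("CWLInstallImportsJob"))
chain.then(MiniJob("CWLStartJob"))
print(wrapper.order())
# ['CWLImportWrapper', 'ImportsJob', 'CWLInstallImportsJob', 'CWLStartJob']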
@@ -3645,7 +3712,7 @@
class CWLStartJob(CWLNamedJob):
"""
Job responsible for starting the CWL workflow.
Takes in the workflow/tool and inputs after all files are imported
and creates jobs to run those workflows.
"""
@@ -3744,7 +3811,10 @@ def import_workflow_inputs(
def file_import_function(url: str) -> FileID:
logger.log(log_level, "Loading %s...", url)
return jobstore.import_file(url, symlink=True)
- import_function = functools.partial(extract_and_convert_file_to_toil_uri, file_import_function)
+
+ import_function = functools.partial(
+ extract_and_convert_file_to_toil_uri, file_import_function
+ )
# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
@@ -3763,8 +3833,13 @@ def file_import_function(url: str) -> FileID:
# Make another function for importing tool files. This one doesn't allow
# symlinking, since the tools might be coming from storage not accessible
# to all nodes.
- tool_import_function = functools.partial(extract_and_convert_file_to_toil_uri,
- cast(Callable[[str], FileID], functools.partial(jobstore.import_file, symlink=False)))
+ tool_import_function = functools.partial(
+ extract_and_convert_file_to_toil_uri,
+ cast(
+ Callable[[str], FileID],
+ functools.partial(jobstore.import_file, symlink=False),
+ ),
+ )
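# Pattern note (illustrative, hypothetical names): each importer in this
# file is built by partially applying extract_and_convert_file_to_toil_uri
# with a different "convert" callable, so the traversal logic stays the
# same while the import policy (symlink or not, leader or worker) varies.
import functools

def convert_and_record(convert, registry: dict[str, str], url: str) -> str:
    registry[url] = convert(url)
    return registry[url]

seen: dict[str, str] = {}
importer = functools.partial(convert_and_record, str.upper, seen)
importer("file:///tmp/x")
assert seen == {"file:///tmp/x": "FILE:///TMP/X"}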

# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
@@ -3795,6 +3870,8 @@


T = TypeVar("T")


def visitSteps(
cmdline_tool: Process,
op: Callable[[CommentedMap], list[T]],
@@ -3818,12 +3895,10 @@
# if they bothered to run the Process __init__.
return op(cmdline_tool.tool)
raise RuntimeError(
- f"Unsupported type encountered in workflow "
- f"traversal: {type(cmdline_tool)}"
+ f"Unsupported type encountered in workflow " f"traversal: {type(cmdline_tool)}"
)
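# Side note on the merged line above (illustrative): adjacent string
# literals, including f-strings, are concatenated at parse time, so the
# two fragments still form one message.
name = "workflow"
msg = f"Unsupported type encountered in " f"{name} traversal"
assert msg == "Unsupported type encountered in workflow traversal"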



def rm_unprocessed_secondary_files(job_params: Any) -> None:
if isinstance(job_params, list):
for j in job_params:
@@ -4081,7 +4156,8 @@ def get_options(args: list[str]) -> Namespace:
parser = ArgParser(
allow_abbrev=False,
usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]",
- description=textwrap.dedent("""
+ description=textwrap.dedent(
+ """
positional arguments:
WORKFLOW CWL file to run.
@@ -4096,10 +4172,11 @@
If an input has the same name as a Toil option, pass
'--' before it.
- """),
+ """
+ ),
formatter_class=RawDescriptionHelpFormatter,
)
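# Note on the pairing above (illustrative): textwrap.dedent strips the
# common leading indentation from the triple-quoted help text, and
# RawDescriptionHelpFormatter keeps argparse from re-wrapping it, so the
# hand-formatted usage layout survives in --help output.
import textwrap

raw = """
    WORKFLOW    CWL file to run.
    INFILE      YAML or JSON file of workflow inputs (hypothetical line).
"""
print(textwrap.dedent(raw))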

addOptions(parser, jobstore_as_flag=True, cwl=True)
options: Namespace
options, extra = parser.parse_known_args(args)
@@ -4264,14 +4341,12 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:

options.tool_help = None
options.debug = options.logLevel == "DEBUG"
- job_order_object, options.basedir, jobloader = (
- cwltool.main.load_job_order(
- options,
- sys.stdin,
- loading_context.fetcher_constructor,
- loading_context.overrides_list,
- tool_file_uri,
- )
+ job_order_object, options.basedir, jobloader = cwltool.main.load_job_order(
+ options,
+ sys.stdin,
+ loading_context.fetcher_constructor,
+ loading_context.overrides_list,
+ tool_file_uri,
)
if options.overrides:
loading_context.overrides_list.extend(
@@ -4332,7 +4407,8 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
if err.code == 2: # raised by argparse's parse_args() function
print(
"\nIf both a CWL file and an input object (YAML/JSON) file were "
- "provided, the problem may be the argument order." + usage_message,
+ "provided, the problem may be the argument order."
+ + usage_message,
file=sys.stderr,
)
raise
@@ -4345,9 +4421,9 @@
shortname(inp["id"]) in initialized_job_order
and inp["type"] == "File"
):
- cast(
- CWLObjectType, initialized_job_order[shortname(inp["id"])]
- )["streamable"] = inp.get("streamable", False)
+ cast(CWLObjectType, initialized_job_order[shortname(inp["id"])])[
+ "streamable"
+ ] = inp.get("streamable", False)
# TODO also for nested types that contain streamable Files

runtime_context.use_container = not options.no_container