From 7593d20b5a81b604861dfec88fbc551657d121a6 Mon Sep 17 00:00:00 2001 From: Sebastien Poirier Date: Fri, 21 May 2021 00:51:28 +0200 Subject: [PATCH] refactored benchmark class inheritance to implement an execution plan that can run multiple frameworks, benchmarks and so on --- amlb/__init__.py | 8 +- amlb/benchmark.py | 391 ++++++++---------------------------- amlb/plan.py | 31 +++ amlb/results.py | 3 +- amlb/runners/__init__.py | 4 +- amlb/runners/aws.py | 61 +++--- amlb/runners/container.py | 18 +- amlb/runners/docker.py | 4 +- amlb/runners/local.py | 271 +++++++++++++++++++++++++ amlb/runners/singularity.py | 4 +- recover_results.py | 1 - resources/config.yaml | 1 - runbenchmark.py | 54 ++--- runscores.py | 1 - 14 files changed, 455 insertions(+), 397 deletions(-) create mode 100644 amlb/plan.py create mode 100644 amlb/runners/local.py diff --git a/amlb/__init__.py b/amlb/__init__.py index a1549d76e..0de3b5ab0 100644 --- a/amlb/__init__.py +++ b/amlb/__init__.py @@ -2,11 +2,11 @@ amlb entrypoint package. """ +from .benchmark import SetupMode from .logger import app_logger as log from .errors import AutoMLError from .resources import Resources -from .benchmark import Benchmark, SetupMode -from .runners import AWSBenchmark, DockerBenchmark, SingularityBenchmark +from .runners import LocalBenchmark, AWSBenchmark, DockerBenchmark, SingularityBenchmark from .results import TaskResult from .__version__ import __version__ @@ -14,10 +14,10 @@ "log", "AutoMLError", "Resources", - "Benchmark", + "LocalBenchmark", + "AWSBenchmark", "DockerBenchmark", "SingularityBenchmark", - "AWSBenchmark", "SetupMode", "TaskResult", "__version__" diff --git a/amlb/benchmark.py b/amlb/benchmark.py index aa6e43578..2e976ec5f 100644 --- a/amlb/benchmark.py +++ b/amlb/benchmark.py @@ -1,5 +1,5 @@ """ -**benchmark** module handles all the main logic: +**main** module handles all the main running logic: - load specified framework and benchmark. - extract the tasks and configure them. @@ -7,32 +7,25 @@ - run the jobs. - collect and save results. """ -from copy import copy from enum import Enum -from importlib import import_module, invalidate_caches import logging import math import os import re import signal -import sys +from typing import List, Union from .job import Job, JobError, SimpleJobRunner, MultiThreadingJobRunner -from .datasets import DataLoader, DataSourceType -from .data import DatasetType +from .datasets import DataLoader from .resources import get as rget, config as rconfig, output_dirs as routput_dirs -from .results import ErrorResult, Scoreboard, TaskResult -from .utils import Namespace as ns, OSMonitoring, as_list, datetime_iso, flatten, json_dump, lazy_property, profile, repr_def, \ - run_cmd, run_script, signal_handler, str2bool, str_sanitize, system_cores, system_memory_mb, system_volume_mb, touch +from .results import Scoreboard +from .utils import Namespace as ns, OSMonitoring, datetime_iso, flatten, lazy_property, \ + signal_handler, str2bool, str_sanitize, system_cores, system_memory_mb, system_volume_mb log = logging.getLogger(__name__) -__installed_file__ = '.installed' -__setup_env_file__ = '.setup_env' - - class SetupMode(Enum): auto = 0 skip = 1 @@ -42,26 +35,16 @@ class SetupMode(Enum): class Benchmark: - """Benchmark. 
- Structure containing the generic information needed to run a benchmark: - - the datasets - - the automl framework - - - we need to support: - - openml tasks - - openml datasets - - openml studies (=benchmark suites) - - user-defined (list of) datasets - """ + run_mode = None data_loader = None def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str): self.job_runner = None - if rconfig().run_mode == 'script': - self.framework_def, self.framework_name, self.framework_module = None, None, None + if not any([framework_name, benchmark_name, constraint_name]): + # used to disable normal init in some unusual cases. + self.framework_def, self.framework_name = None, None self.benchmark_def, self.benchmark_name, self.benchmark_path = None, None, None self.constraint_def, self.constraint_name = None, None self.parallel_jobs = 1 @@ -87,140 +70,82 @@ def __init__(self, framework_name: str, benchmark_name: str, constraint_name: st str_sanitize(framework_name), str_sanitize(benchmark_name), constraint_name, - rconfig().run_mode, + self.run_mode, datetime_iso(micros=True, no_sep=True) ]).lower()) self._validate() - self.framework_module = import_module(self.framework_def.module) def _validate(self): - if self.parallel_jobs > 1: - log.warning("Parallelization is not supported in local mode: ignoring `parallel_jobs=%s` parameter.", self.parallel_jobs) - self.parallel_jobs = 1 - - def setup(self, mode: SetupMode): - """ - ensure all dependencies needed by framework are available - and possibly download them if necessary. - Delegates specific setup to the framework module - """ - Benchmark.data_loader = DataLoader(rconfig()) + pass - if mode == SetupMode.skip or mode == SetupMode.auto and self._is_setup_done(): - return + @lazy_property + def output_dirs(self): + return routput_dirs(rconfig().output_dir, session=self.sid, subdirs=['predictions', 'scores', 'logs']) - log.info("Setting up framework {}.".format(self.framework_name)) - - self._write_setup_env(self.framework_module.__path__[0], **dict(self.framework_def.setup_env)) - self._mark_setup_start() - - if hasattr(self.framework_module, 'setup'): - self.framework_module.setup(*self.framework_def.setup_args, - _live_output_=rconfig().setup.live_output, - _activity_timeout_=rconfig().setup.activity_timeout) - - if self.framework_def.setup_script is not None: - run_script(self.framework_def.setup_script, - _live_output_=rconfig().setup.live_output, - _activity_timeout_=rconfig().setup.activity_timeout) - - if self.framework_def.setup_cmd is not None: - def resolve_venv(cmd): - venvs = [ - *[os.path.join(p, "venv") for p in self.framework_module.__path__], - os.path.join(rconfig().root_dir, "venv"), - ] - venv = next((ve for ve in venvs if os.path.isdir(ve)), None) - py = os.path.join(venv, "bin", "python") if venv else "python" - pip = os.path.join(venv, "bin", "pip") if venv else "pip" - return cmd.format(py=py, pip=pip) - - setup_cmd = [resolve_venv(cmd) for cmd in self.framework_def.setup_cmd] - run_cmd('\n'.join(setup_cmd), - _executable_="/bin/bash", - _live_output_=rconfig().setup.live_output, - _activity_timeout_=rconfig().setup.activity_timeout) - - invalidate_caches() - log.info("Setup of framework {} completed successfully.".format(self.framework_name)) - - self._mark_setup_done() - - def _write_setup_env(self, dest_dir, **kwargs): - setup_env = dict( - AMLB_ROOT=rconfig().root_dir, - PY_EXEC_PATH=sys.executable - ) - setup_env.update(**kwargs) - with open(os.path.join(dest_dir, __setup_env_file__), 'w') as f: - 
f.write('\n'.join([f"{k}={v}" for k, v in setup_env.items()]+[""])) - - def _installed_file(self): - return os.path.join(self._framework_dir, __installed_file__) - - def _installed_version(self): - installed = self._installed_file() - versions = [] - if os.path.isfile(installed): - with open(installed, 'r') as f: - versions = list(filter(None, map(str.strip, f.readlines()))) - return versions - - def _is_setup_done(self): - return self.framework_def.version in self._installed_version() - - def _mark_setup_start(self): - installed = self._installed_file() - if os.path.isfile(installed): - os.remove(installed) - - def _mark_setup_done(self): - installed = self._installed_file() - versions = [] - if hasattr(self.framework_module, 'version'): - versions.append(self.framework_module.version()) - versions.extend([self.framework_def.version, ""]) - with open(installed, 'a') as f: - f.write('\n'.join(versions)) + def setup(self, mode: SetupMode): + self.data_loader = DataLoader(rconfig()) def cleanup(self): - # anything to do? pass - def run(self, task_name=None, fold=None): + def run(self, tasks: Union[str, List[str]] = None, folds: Union[int, List[int]] = None): """ - :param task_name: a single task name [str] or a list of task names to run. If None, then the whole benchmark will be used. - :param fold: a fold [str] or a list of folds to run. If None, then the all folds from each task definition will be used. + :param tasks: a single task name [str] or a list of task names to run. If None, then the whole benchmark will be used. + :param folds: a fold [str] or a list of folds to run. If None, then the all folds from each task definition will be used. """ - assert self._is_setup_done(), f"Framework {self.framework_name} [{self.framework_def.version}] is not installed." 
+ jobs = self.create_jobs(tasks, folds) + return self.run_jobs(jobs) + + def create_jobs(self, tasks: Union[str, List[str]], folds: Union[int, List[int]]) -> List[Job]: + task_defs = self._get_task_defs(tasks) + jobs = flatten([self._task_jobs(task_def, folds) for task_def in task_defs]) + return jobs - task_defs = self._get_task_defs(task_name) - jobs = flatten([self._task_jobs(task_def, fold) for task_def in task_defs]) + def run_jobs(self, jobs): try: results = self._run_jobs(jobs) log.info(f"Processing results for {self.sid}") log.debug(results) - if task_name is None: - scoreboard = self._process_results(results) - else: - for task_def in task_defs: - task_results = filter(lambda res: res.result is not None and res.result.task == task_def.name, results) - scoreboard = self._process_results(task_results, task_name=task_def.name) - return scoreboard + return self._process_results(results) finally: self.cleanup() - def _create_job_runner(self, jobs): - if self.parallel_jobs == 1: - return SimpleJobRunner(jobs) - else: - # return ThreadPoolExecutorJobRunner(jobs, self.parallel_jobs) - return MultiThreadingJobRunner(jobs, self.parallel_jobs, - delay_secs=rconfig().job_scheduler.delay_between_jobs, - done_async=True) + def _get_task_defs(self, task_name): + task_defs = (self._benchmark_tasks() if task_name is None + else [self._get_task_def(name) for name in task_name] if isinstance(task_name, list) + else [self._get_task_def(task_name)]) + if len(task_defs) == 0: + raise ValueError("No task available.") + return task_defs - def _run_jobs(self, jobs): + def _benchmark_tasks(self): + return [task_def for task_def in self.benchmark_def if self._is_task_enabled(task_def)] + + def _get_task_def(self, task_name, include_disabled=False, fail_on_missing=True): + try: + task_def = next(task for task in self.benchmark_def if task.name.lower() == str_sanitize(task_name.lower())) + except StopIteration: + if fail_on_missing: + raise ValueError("Incorrect task name: {}.".format(task_name)) + return None + if not include_disabled and not self._is_task_enabled(task_def): + raise ValueError(f"Task {task_def.name} is disabled, please enable it first.") + return task_def + + def _task_jobs(self, task_def, folds=None): + folds = (range(task_def.folds) if folds is None + else folds if isinstance(folds, list) and all(isinstance(f, int) for f in folds) + else [folds] if isinstance(folds, int) + else None) + if folds is None: + raise ValueError("Fold value should be None, an int, or a list of ints.") + return list(filter(None, [self._make_job(task_def, f) for f in folds])) + + def _make_job(self, task_def, fold: int): + pass + + def _run_jobs(self, jobs: List[Job]): self.job_runner = self._create_job_runner(jobs) def on_interrupt(*_): @@ -249,66 +174,25 @@ def on_interrupt(*_): res.result.duration = res.duration return results - def _benchmark_tasks(self): - return [task_def for task_def in self.benchmark_def if Benchmark._is_task_enabled(task_def)] - - def _get_task_defs(self, task_name): - task_defs = (self._benchmark_tasks() if task_name is None - else [self._get_task_def(name) for name in task_name] if isinstance(task_name, list) - else [self._get_task_def(task_name)]) - if len(task_defs) == 0: - raise ValueError("No task available.") - return task_defs - - def _get_task_def(self, task_name, include_disabled=False, fail_on_missing=True): - try: - task_def = next(task for task in self.benchmark_def if task.name.lower() == str_sanitize(task_name.lower())) - except StopIteration: - if fail_on_missing: - raise 
ValueError("Incorrect task name: {}.".format(task_name)) - return None - if not include_disabled and not Benchmark._is_task_enabled(task_def): - raise ValueError(f"Task {task_def.name} is disabled, please enable it first.") - return task_def - - def _task_jobs(self, task_def, folds=None): - folds = (range(task_def.folds) if folds is None - else folds if isinstance(folds, list) and all(isinstance(f, int) for f in folds) - else [folds] if isinstance(folds, int) - else None) - if folds is None: - raise ValueError("Fold value should be None, an int, or a list of ints.") - return list(filter(None, [self._make_job(task_def, f) for f in folds])) - - def _make_job(self, task_def, fold: int): - """ - runs the framework against a given fold - :param task_def: the task to run - :param fold: the specific fold to use on this task - """ - if fold < 0 or fold >= task_def.folds: - # raise ValueError(f"Fold value {fold} is out of range for task {task_def.name}.") - log.warning(f"Fold value {fold} is out of range for task {task_def.name}, skipping it.") - return - - return BenchmarkTask(self, task_def, fold).as_job() + def _create_job_runner(self, jobs): + if self.parallel_jobs == 1: + return SimpleJobRunner(jobs) + else: + # return ThreadPoolExecutorJobRunner(jobs, self.parallel_jobs) + return MultiThreadingJobRunner(jobs, self.parallel_jobs, + delay_secs=rconfig().job_scheduler.delay_between_jobs, + done_async=True) - def _process_results(self, results, task_name=None): + def _process_results(self, results): scores = list(filter(None, flatten([res.result for res in results]))) if len(scores) == 0: return None - board = (Scoreboard(scores, - framework_name=self.framework_name, - task_name=task_name, - scores_dir=self.output_dirs.scores) if task_name - else Scoreboard(scores, - framework_name=self.framework_name, - benchmark_name=self.benchmark_name, - scores_dir=self.output_dirs.scores)) - - if rconfig().results.save: - self._save(board) + board = Scoreboard(scores, + framework_name=self.framework_name, + benchmark_name=self.benchmark_name, + scores_dir=self.output_dirs.scores) + self._save(board) log.info("Summing up scores for current run:\n%s", board.as_printable_data_frame(verbosity=2).dropna(how='all', axis='columns').to_string(index=False)) @@ -322,16 +206,8 @@ def _append(self, board): Scoreboard.all().append(board).save() Scoreboard.all(rconfig().output_dir).append(board).save() - @lazy_property - def output_dirs(self): - return routput_dirs(rconfig().output_dir, session=self.sid, subdirs=['predictions', 'scores', 'logs']) - - @property - def _framework_dir(self): - return os.path.dirname(self.framework_module.__file__) - - @staticmethod - def _is_task_enabled(task_def): + @classmethod + def _is_task_enabled(cls, task_def): return not hasattr(task_def, 'enabled') or str2bool(str(task_def.enabled)) @@ -339,7 +215,7 @@ class TaskConfig: def __init__(self, name, fold, metrics, seed, max_runtime_seconds, cores, max_mem_size_mb, min_vol_size_mb, - input_dir, output_dir): + input_dir, output_dir, run_mode): self.framework = None self.framework_params = None self.framework_version = None @@ -355,6 +231,7 @@ def __init__(self, name, fold, metrics, seed, self.input_dir = input_dir self.output_dir = output_dir self.output_predictions_file = os.path.join(output_dir, "predictions.csv") + self.run_mode = run_mode self.ext = ns() # used if frameworks require extra config points def __setattr__(self, name, value): @@ -410,109 +287,3 @@ def handle_unfulfilled(message, on_auto='warn'): 
handle_unfulfilled(f"Available storage ({sys_vol.free} MB / total={sys_vol.total} MB) does not meet requirements ({self.min_vol_size_mb+os_recommended_vol} MB)!") -class BenchmarkTask: - - def __init__(self, benchmark: Benchmark, task_def, fold): - """ - - :param task_def: - :param fold: - """ - self.benchmark = benchmark - self._task_def = task_def - self.fold = fold - self.task_config = TaskConfig( - name=task_def.name, - fold=fold, - metrics=task_def.metric, - seed=rget().seed(fold), - max_runtime_seconds=task_def.max_runtime_seconds, - cores=task_def.cores, - max_mem_size_mb=task_def.max_mem_size_mb, - min_vol_size_mb=task_def.min_vol_size_mb, - input_dir=rconfig().input_dir, - output_dir=benchmark.output_dirs.session, - ) - # allowing to override some task parameters through command line, e.g.: -Xt.max_runtime_seconds=60 - if rconfig()['t'] is not None: - for c in dir(self.task_config): - if rconfig().t[c] is not None: - setattr(self.task_config, c, rconfig().t[c]) - self._dataset = None - - def load_data(self): - """ - Loads the training dataset for the current given task - :return: path to the dataset file - """ - if hasattr(self._task_def, 'openml_task_id'): - self._dataset = Benchmark.data_loader.load(DataSourceType.openml_task, task_id=self._task_def.openml_task_id, fold=self.fold) - log.debug("Loaded OpenML dataset for task_id %s.", self._task_def.openml_task_id) - elif hasattr(self._task_def, 'openml_dataset_id'): - # TODO - raise NotImplementedError("OpenML datasets without task_id are not supported yet.") - elif hasattr(self._task_def, 'dataset'): - self._dataset = Benchmark.data_loader.load(DataSourceType.file, dataset=self._task_def.dataset, fold=self.fold) - else: - raise ValueError("Tasks should have one property among [openml_task_id, openml_dataset_id, dataset].") - - def as_job(self): - job = Job(name=rconfig().token_separator.join([ - 'local', - self.benchmark.benchmark_name, - self.benchmark.constraint_name, - self.task_config.name, - str(self.fold), - self.benchmark.framework_name - ]), - # specifying a job timeout to handle edge cases where framework never completes or hangs - # (adding 5min safety to let the potential subprocess handle the interruption first). 
- timeout_secs=self.task_config.job_timeout_seconds+5*60, - raise_on_failure=rconfig().job_scheduler.exit_on_job_failure, - ) - job._setup = self.setup - job._run = self.run - return job - - def setup(self): - self.task_config.estimate_system_params() - self.load_data() - - @profile(logger=log) - def run(self): - results = TaskResult(task_def=self._task_def, fold=self.fold, - constraint=self.benchmark.constraint_name, - predictions_dir=self.benchmark.output_dirs.predictions) - framework_def = self.benchmark.framework_def - task_config = copy(self.task_config) - task_config.type = 'regression' if self._dataset.type == DatasetType.regression else 'classification' - task_config.type_ = self._dataset.type.name - task_config.framework = self.benchmark.framework_name - task_config.framework_params = framework_def.params - task_config.framework_version = self.benchmark._installed_version()[0] - - # allowing to pass framework parameters through command line, e.g.: -Xf.verbose=True -Xf.n_estimators=3000 - if rconfig()['f'] is not None: - task_config.framework_params = ns.dict(ns(framework_def.params) + rconfig().f) - - task_config.output_predictions_file = results._predictions_file - task_config.output_metadata_file = results._metadata_file - touch(os.path.dirname(task_config.output_predictions_file), as_dir=True) - if task_config.metrics is None: - task_config.metrics = as_list(rconfig().benchmarks.metrics[self._dataset.type.name]) - task_config.metric = task_config.metrics[0] - - result = meta_result = None - try: - log.info("Running task %s on framework %s with config:\n%s", task_config.name, self.benchmark.framework_name, repr_def(task_config)) - json_dump(task_config, task_config.output_metadata_file, style='pretty') - meta_result = self.benchmark.framework_module.run(self._dataset, task_config) - except Exception as e: - if rconfig().job_scheduler.exit_on_job_failure: - raise - log.exception(e) - result = ErrorResult(e) - finally: - self._dataset.release() - return results.compute_score(result=result, meta_result=meta_result) - diff --git a/amlb/plan.py b/amlb/plan.py new file mode 100644 index 000000000..3ea1958b5 --- /dev/null +++ b/amlb/plan.py @@ -0,0 +1,31 @@ +from dataclasses import dataclass +import logging +from typing import List + +from .benchmark import Benchmark + +log = logging.getLogger(__name__) + + +@dataclass +class RunTask: + framework: str + benchmark: str + constraint: str + tasks: [str] + folds: [int] + + +class ExecutionPlan: + + def __init__(self, benchmark_cls, plan: List[RunTask]): + self.benchmark_cls = benchmark_cls + self.plan = plan + + def run(self): + main_instance = Benchmark(None, None, None) + bench_instances = [self.benchmark_cls(rt.framework, rt.benchmark, rt.constraint) for rt in self.plan] + jobs = [] + for instance, rt in zip(bench_instances, self.plan): + jobs.extend(instance.create_jobs(tasks=rt.tasks, folds=rt.folds)) + return main_instance.run_jobs(jobs) diff --git a/amlb/results.py b/amlb/results.py index 3a34529a5..525e9199b 100644 --- a/amlb/results.py +++ b/amlb/results.py @@ -50,7 +50,6 @@ class ResultError(Exception): pass - class Scoreboard: results_file = 'results.csv' @@ -404,7 +403,7 @@ def compute_score(self, result=None, meta_result=None): version=metadata.framework_version, params=repr(metadata.framework_params) if metadata.framework_params else '', fold=self.fold, - mode=rconfig().run_mode, + mode=metadata.run_mode, seed=metadata.seed, app_version=rget().app_version, utc=datetime_iso(), diff --git a/amlb/runners/__init__.py 
b/amlb/runners/__init__.py index ecef451da..b97c19ed4 100644 --- a/amlb/runners/__init__.py +++ b/amlb/runners/__init__.py @@ -2,12 +2,14 @@ benchmark runners """ +from .local import LocalBenchmark from .aws import AWSBenchmark from .docker import DockerBenchmark from .singularity import SingularityBenchmark __all__ = [ + "LocalBenchmark", + "AWSBenchmark", "DockerBenchmark", "SingularityBenchmark", - "AWSBenchmark", ] diff --git a/amlb/runners/aws.py b/amlb/runners/aws.py index 9e7be47ed..7992d28ec 100644 --- a/amlb/runners/aws.py +++ b/amlb/runners/aws.py @@ -62,6 +62,7 @@ class AWSBenchmark(Benchmark): """AWSBenchmark an extension of Benchmark class, to run benchmarks on AWS """ + run_mode = 'aws' @classmethod def fetch_results(cls, instances_file, instance_selector=None): @@ -112,7 +113,7 @@ def _on_done(job_self): finally: bench.cleanup() - def __init__(self, framework_name, benchmark_name, constraint_name, region=None): + def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str, region: str = None): """ :param framework_name: @@ -183,41 +184,40 @@ def cleanup(self): if rconfig().aws.s3.temporary is True: self._delete_s3_bucket() - def run(self, task_name=None, fold=None): - task_defs = self._get_task_defs(task_name) # validates tasks - self._exec_start() - self._monitoring_start() + def run(self, tasks=None, folds=None): + task_defs = self._get_task_defs(tasks) # validates tasks + jobs = None if self.parallel_jobs > 1: if rconfig().aws.minimize_instances: # use one instance per task: all folds executed on same instance - try: - jobs = flatten([self._make_aws_job([task_def.name], fold) for task_def in task_defs]) - results = self._run_jobs(jobs) - return self._process_results(results, task_name=task_name) - finally: - self.cleanup() - else: - # use one instance per fold per task - return super().run(task_name, fold) + jobs = flatten([self._make_aws_job([task_def.name], folds) for task_def in task_defs]) else: # use one instance for all - try: - task_names = None if task_name is None else [task_def.name for task_def in task_defs] - job = self._make_aws_job(task_names, fold) - results = self._run_jobs([job]) - return self._process_results(results, task_name=task_name) - finally: - self.cleanup() + task_names = None if tasks is None else [task_def.name for task_def in task_defs] + jobs = [self._make_aws_job(task_names, folds)] - def _create_job_runner(self, jobs): - if self.parallel_jobs == 1: - return SimpleJobRunner(jobs) + if jobs is not None: + self.run_jobs(jobs) else: - queueing_strategy = MultiThreadingJobRunner.QueueingStrategy.enforce_job_priority - return MultiThreadingJobRunner(jobs, self.parallel_jobs, + # use one instance per fold per task + return super().run(tasks, folds) + + def _create_job_runner(self, jobs): + jr = (SimpleJobRunner(jobs) if self.parallel_jobs == 1 + else MultiThreadingJobRunner(jobs, self.parallel_jobs, delay_secs=rconfig().job_scheduler.delay_between_jobs, done_async=True, - queueing_strategy=queueing_strategy) + queueing_strategy=MultiThreadingJobRunner.QueueingStrategy.enforce_job_priority)) + + super_on_state = jr._on_state + + def _on_state(_self, state): + if state is JobState.starting: + self._exec_start() + self._monitoring_start() + super_on_state(state) + + jr._on_state = _on_state.__get__(jr) def _make_job(self, task_def, fold=int): return self._make_aws_job([task_def.name], [fold]) @@ -274,8 +274,8 @@ def _reset_retry(self): for j in self.jobs: j.ext.retry = None - def _make_aws_job(self, task_names=None, 
folds=None): - task_names = [] if task_names is None else task_names + def _make_aws_job(self, tasks=None, folds=None): + task_names = [] if tasks is None else tasks folds = [] if folds is None else [str(f) for f in folds] task_def = (self._get_task_def(task_names[0]) if len(task_names) >= 1 else self._get_task_def('__defaults__', include_disabled=True, fail_on_missing=False) or ns(name='all')) @@ -888,8 +888,7 @@ def download_file(obj, dest, dest_display_path=None): download_file(obj, dest_path) # if obj.key == result_key: if is_result and not success: - if rconfig().results.save: - self._exec_send(lambda path: self._append(Scoreboard.load_df(path)), dest_path) + self._exec_send(lambda path: self._append(Scoreboard.load_df(path)), dest_path) success = True except Exception as e: if is_result: diff --git a/amlb/runners/container.py b/amlb/runners/container.py index de3d37e12..e4af09daa 100644 --- a/amlb/runners/container.py +++ b/amlb/runners/container.py @@ -6,14 +6,12 @@ """ from abc import abstractmethod import logging -import os import re from ..benchmark import Benchmark, SetupMode from ..errors import InvalidStateError from ..job import Job from ..resources import config as rconfig, get as rget -from ..utils import dir_of, run_cmd from ..__version__ import __version__, _dev_version as dev @@ -40,7 +38,7 @@ def image_name(cls, framework_def, label=None, **kwargs): return f"{author}/{image}:{tag}" @abstractmethod - def __init__(self, framework_name, benchmark_name, constraint_name): + def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str): """ :param framework_name: @@ -82,17 +80,13 @@ def cleanup(self): # TODO: remove generated script? anything else? pass - def run(self, task_name=None, fold=None): - self._get_task_defs(task_name) # validates tasks + def run(self, tasks=None, folds=None): + self._get_task_defs(tasks) # validates tasks if self.parallel_jobs > 1 or not self.minimize_instances: - return super().run(task_name, fold) + return super().run(tasks, folds) else: - job = self._make_container_job(task_name, fold) - try: - results = self._run_jobs([job]) - return self._process_results(results, task_name=task_name) - finally: - self.cleanup() + job = self._make_container_job(tasks, folds) + return self.run_jobs([job]) def _make_job(self, task_def, fold=int): return self._make_container_job([task_def.name], [fold]) diff --git a/amlb/runners/docker.py b/amlb/runners/docker.py index 8a2b82f65..04ba6f5d2 100644 --- a/amlb/runners/docker.py +++ b/amlb/runners/docker.py @@ -21,7 +21,9 @@ class DockerBenchmark(ContainerBenchmark): an extension of ContainerBenchmark to run benchmarks inside docker. """ - def __init__(self, framework_name, benchmark_name, constraint_name): + run_mode = 'docker' + + def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str): """ :param framework_name: diff --git a/amlb/runners/local.py b/amlb/runners/local.py new file mode 100644 index 000000000..b79044e7f --- /dev/null +++ b/amlb/runners/local.py @@ -0,0 +1,271 @@ +""" +**main** module handles all the main running logic: + +- load specified framework and benchmark. +- extract the tasks and configure them. +- create jobs for each task. +- run the jobs. +- collect and save results. 
+""" +from copy import copy +from importlib import import_module, invalidate_caches +import logging +import os +import sys +from typing import List, Union + +from ..benchmark import SetupMode, Benchmark, TaskConfig +from ..data import DatasetType +from ..datasets import DataSourceType +from ..job import Job +from ..resources import get as rget, config as rconfig +from ..results import ErrorResult, TaskResult +from ..utils import Namespace as ns, as_list, json_dump, profile, repr_def, run_cmd, run_script, touch + + +log = logging.getLogger(__name__) + + +__installed_file__ = '.installed' +__setup_env_file__ = '.setup_env' + + +class LocalBenchmark(Benchmark): + """Benchmark. + Structure containing the generic information needed to run a benchmark: + - the datasets + - the automl framework + + + we need to support: + - openml tasks + - openml datasets + - openml studies (=benchmark suites) + - user-defined (list of) datasets + """ + + data_loader = None + run_mode = 'local' + + def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str): + super().__init__(framework_name, benchmark_name, constraint_name) + self.framework_module = import_module(self.framework_def.module) + + def _validate(self): + if self.parallel_jobs > 1: + log.warning("Parallelization is not supported in local mode: ignoring `parallel_jobs=%s` parameter.", self.parallel_jobs) + self.parallel_jobs = 1 + + def setup(self, mode: SetupMode): + """ + ensure all dependencies needed by framework are available + and possibly download them if necessary. + Delegates specific setup to the framework module + """ + super().setup(mode) + if mode == SetupMode.skip or mode == SetupMode.auto and self._is_setup_done(): + return + + log.info("Setting up framework {}.".format(self.framework_name)) + + self._write_setup_env(self.framework_module.__path__[0], **dict(self.framework_def.setup_env)) + self._mark_setup_start() + + if hasattr(self.framework_module, 'setup'): + self.framework_module.setup(*self.framework_def.setup_args, + _live_output_=rconfig().setup.live_output, + _activity_timeout_=rconfig().setup.activity_timeout) + + if self.framework_def.setup_script is not None: + run_script(self.framework_def.setup_script, + _live_output_=rconfig().setup.live_output, + _activity_timeout_=rconfig().setup.activity_timeout) + + if self.framework_def.setup_cmd is not None: + def resolve_venv(cmd): + venvs = [ + *[os.path.join(p, "venv") for p in self.framework_module.__path__], + os.path.join(rconfig().root_dir, "venv"), + ] + venv = next((ve for ve in venvs if os.path.isdir(ve)), None) + py = os.path.join(venv, "bin", "python") if venv else "python" + pip = os.path.join(venv, "bin", "pip") if venv else "pip" + return cmd.format(py=py, pip=pip) + + setup_cmd = [resolve_venv(cmd) for cmd in self.framework_def.setup_cmd] + run_cmd('\n'.join(setup_cmd), + _executable_="/bin/bash", + _live_output_=rconfig().setup.live_output, + _activity_timeout_=rconfig().setup.activity_timeout) + + invalidate_caches() + log.info("Setup of framework {} completed successfully.".format(self.framework_name)) + + self._mark_setup_done() + + def _write_setup_env(self, dest_dir, **kwargs): + setup_env = dict( + AMLB_ROOT=rconfig().root_dir, + PY_EXEC_PATH=sys.executable + ) + setup_env.update(**kwargs) + with open(os.path.join(dest_dir, __setup_env_file__), 'w') as f: + f.write('\n'.join([f"{k}={v}" for k, v in setup_env.items()]+[""])) + + def _installed_file(self): + return os.path.join(self._framework_dir, __installed_file__) + + def 
_installed_version(self): + installed = self._installed_file() + versions = [] + if os.path.isfile(installed): + with open(installed, 'r') as f: + versions = list(filter(None, map(str.strip, f.readlines()))) + return versions + + def _is_setup_done(self): + return self.framework_def.version in self._installed_version() + + def _mark_setup_start(self): + installed = self._installed_file() + if os.path.isfile(installed): + os.remove(installed) + + def _mark_setup_done(self): + installed = self._installed_file() + versions = [] + if hasattr(self.framework_module, 'version'): + versions.append(self.framework_module.version()) + versions.extend([self.framework_def.version, ""]) + with open(installed, 'a') as f: + f.write('\n'.join(versions)) + + def run(self, tasks: Union[str, List[str]] = None, folds: Union[int, List[int]] = None): + assert self._is_setup_done(), f"Framework {self.framework_name} [{self.framework_def.version}] is not installed." + super().run(tasks, folds) + + def _make_job(self, task_def, fold: int): + """ + runs the framework against a given fold + :param task_def: the task to run + :param fold: the specific fold to use on this task + """ + if fold < 0 or fold >= task_def.folds: + # raise ValueError(f"Fold value {fold} is out of range for task {task_def.name}.") + log.warning(f"Fold value {fold} is out of range for task {task_def.name}, skipping it.") + return + + return LocalTask(self, task_def, fold).as_job() + + @property + def _framework_dir(self): + return os.path.dirname(self.framework_module.__file__) + + +class LocalTask: + + def __init__(self, benchmark: LocalBenchmark, task_def, fold): + """ + + :param task_def: + :param fold: + """ + self.benchmark = benchmark + self._task_def = task_def + self.fold = fold + self.task_config = TaskConfig( + name=task_def.name, + fold=fold, + metrics=task_def.metric, + seed=rget().seed(fold), + max_runtime_seconds=task_def.max_runtime_seconds, + cores=task_def.cores, + max_mem_size_mb=task_def.max_mem_size_mb, + min_vol_size_mb=task_def.min_vol_size_mb, + input_dir=rconfig().input_dir, + output_dir=benchmark.output_dirs.session, + run_mode=rconfig().run_mode + ) + # allowing to override some task parameters through command line, e.g.: -Xt.max_runtime_seconds=60 + if rconfig()['t'] is not None: + for c in dir(self.task_config): + if rconfig().t[c] is not None: + setattr(self.task_config, c, rconfig().t[c]) + self._dataset = None + + def load_data(self): + """ + Loads the training dataset for the current given task + :return: path to the dataset file + """ + if hasattr(self._task_def, 'openml_task_id'): + self._dataset = self.benchmark.data_loader.load(DataSourceType.openml_task, task_id=self._task_def.openml_task_id, fold=self.fold) + log.debug("Loaded OpenML dataset for task_id %s.", self._task_def.openml_task_id) + elif hasattr(self._task_def, 'openml_dataset_id'): + # TODO + raise NotImplementedError("OpenML datasets without task_id are not supported yet.") + elif hasattr(self._task_def, 'dataset'): + self._dataset = self.benchmark.data_loader.load(DataSourceType.file, dataset=self._task_def.dataset, fold=self.fold) + else: + raise ValueError("Tasks should have one property among [openml_task_id, openml_dataset_id, dataset].") + + def as_job(self): + job = Job(name=rconfig().token_separator.join([ + 'local', + self.benchmark.benchmark_name, + self.benchmark.constraint_name, + self.task_config.name, + str(self.fold), + self.benchmark.framework_name + ]), + # specifying a job timeout to handle edge cases where framework never 
completes or hangs + # (adding 5min safety to let the potential subprocess handle the interruption first). + timeout_secs=self.task_config.job_timeout_seconds+5*60, + raise_on_failure=rconfig().job_scheduler.exit_on_job_failure, + ) + job._setup = self.setup + job._run = self.run + return job + + def setup(self): + self.task_config.estimate_system_params() + self.load_data() + + @profile(logger=log) + def run(self): + results = TaskResult(task_def=self._task_def, fold=self.fold, + constraint=self.benchmark.constraint_name, + predictions_dir=self.benchmark.output_dirs.predictions) + framework_def = self.benchmark.framework_def + task_config = copy(self.task_config) + task_config.type = 'regression' if self._dataset.type == DatasetType.regression else 'classification' + task_config.type_ = self._dataset.type.name + task_config.framework = self.benchmark.framework_name + task_config.framework_params = framework_def.params + task_config.framework_version = self.benchmark._installed_version()[0] + + # allowing to pass framework parameters through command line, e.g.: -Xf.verbose=True -Xf.n_estimators=3000 + if rconfig()['f'] is not None: + task_config.framework_params = ns.dict(ns(framework_def.params) + rconfig().f) + + task_config.output_predictions_file = results._predictions_file + task_config.output_metadata_file = results._metadata_file + touch(os.path.dirname(task_config.output_predictions_file), as_dir=True) + if task_config.metrics is None: + task_config.metrics = as_list(rconfig().benchmarks.metrics[self._dataset.type.name]) + task_config.metric = task_config.metrics[0] + + result = meta_result = None + try: + log.info("Running task %s on framework %s with config:\n%s", task_config.name, self.benchmark.framework_name, repr_def(task_config)) + json_dump(task_config, task_config.output_metadata_file, style='pretty') + meta_result = self.benchmark.framework_module.run(self._dataset, task_config) + except Exception as e: + if rconfig().job_scheduler.exit_on_job_failure: + raise + log.exception(e) + result = ErrorResult(e) + finally: + self._dataset.release() + return results.compute_score(result=result, meta_result=meta_result) + diff --git a/amlb/runners/singularity.py b/amlb/runners/singularity.py index 1c00b8b76..481d85563 100644 --- a/amlb/runners/singularity.py +++ b/amlb/runners/singularity.py @@ -22,6 +22,8 @@ class SingularityBenchmark(ContainerBenchmark): an extension of ContainerBenchmark to run benchmarks inside Singularity. 
""" + run_mode = 'singularity' + @classmethod def image_name(cls, framework_def, label=None, as_docker_image=False, **kwargs): """ @@ -43,7 +45,7 @@ def image_name(cls, framework_def, label=None, as_docker_image=False, **kwargs): tag = re.sub(r"([^\w.-])", '.', '-'.join(tags)) return f"{author}{image}{separator}{tag}" - def __init__(self, framework_name, benchmark_name, constraint_name): + def __init__(self, framework_name: str, benchmark_name: str, constraint_name: str): """ :param framework_name: diff --git a/recover_results.py b/recover_results.py index 5055af378..af16ca37d 100644 --- a/recover_results.py +++ b/recover_results.py @@ -28,7 +28,6 @@ config_args = ns.parse( root_dir=root_dir, script=os.path.basename(__file__), - run_mode='script', ) + ns.parse(extras) config_args = ns({k: v for k, v in config_args if v is not None}) amlb.resources.from_configs(config, config_args) diff --git a/resources/config.yaml b/resources/config.yaml index 3d8754a89..38319cea1 100644 --- a/resources/config.yaml +++ b/resources/config.yaml @@ -78,7 +78,6 @@ monitoring: # configuration namespace describing the basic monitor results: # configuration namespace for the results.csv file. error_max_length: 200 # the max length of the error message as rendered in the results file. - save: true # set by runbenchmark.py openml: # configuration namespace for openML. apikey: c1994bdb7ecb3c6f3c8f3b35f4b47f1f diff --git a/runbenchmark.py b/runbenchmark.py index e70d1cf8e..383251242 100644 --- a/runbenchmark.py +++ b/runbenchmark.py @@ -9,7 +9,7 @@ import amlb.logger import amlb -from amlb.utils import Namespace as ns, config_load, datetime_iso, str2bool, str_sanitize, zip_path +from amlb.utils import Namespace as ns, config_load, datetime_iso, str2bool, str_iter, str_sanitize, zip_path from amlb import log, AutoMLError @@ -67,10 +67,22 @@ "\n• force: setup is always executed before the benchmark." "\n• only: only setup is executed (no benchmark)." "\n(default: '%(default)s')") -parser.add_argument('-k', '--keep-scores', type=str2bool, metavar='true|false', nargs='?', const=True, default=True, - help="Set to true (default) to save/add scores in output directory.") parser.add_argument('-e', '--exit-on-error', action='store_true', dest="exit_on_error", help="If set, terminates on the first task that does not complete with a model.") +parser.add_argument('-X', '--extra', default=[], action='append', + help="Any property defined in resources/config.yaml can be overridden using" + "\n -Xpath.to.property=value" + "\nIn addition framework params can be overridden using" + "\n -Xf.param=value" + "\nFinally task config properties (the config arg of the framework integration `run` function)" + "\ncan be overridden using" + "\n -Xt.param=value" + "\nNote that the value is converted using ast.literal_eval to allow passing non-string values." + "\nExamples:" + "\n -Xseed=42 (to apply a fixed seed)" + "\n -Xarchive=None (to avoid archiving logs by default)" + "\n -Xf._encode=True (for `constantpredictor`, will use encoded data)" + "\n -Xt.max_runtime_seconds=300 (force each task to timeout after 5min)") parser.add_argument('--logging', type=str, default="console:info,app:debug,root:info", help="Set the log levels for the 3 available loggers:" "\n• console" @@ -79,22 +91,11 @@ "\nAccepted values for each logger are: notset, debug, info, warning, error, fatal, critical." 
"\nExamples:" "\n --logging=info (applies the same level to all loggers)" - "\n --logging=root:debug (keeps defaults for non-specified loggers)" + "\n --logging=root:debug (keeps defaults for non-specified loggers)" "\n --logging=console:warning,app:info" "\n(default: '%(default)s')") parser.add_argument('--profiling', nargs='?', const=True, default=False, help=argparse.SUPPRESS) parser.add_argument('--session', type=str, default=None, help=argparse.SUPPRESS) -parser.add_argument('-X', '--extra', default=[], action='append', help=argparse.SUPPRESS) -# group = parser.add_mutually_exclusive_group() -# group.add_argument('--keep-scores', dest='keep_scores', action='store_true', -# help="Set to true [default] to save/add scores in output directory") -# group.add_argument('--no-keep-scores', dest='keep_scores', action='store_false') -# parser.set_defaults(keep_scores=True) - -# removing this command line argument for now: by default, we're using the user default region as defined in ~/aws/config -# on top of this, user can now override the aws.region setting in his custom ~/.config/automlbenchmark/config.yaml settings. -# parser.add_argument('-r', '--region', metavar='aws_region', default=None, -# help="The region on which to run the benchmark when using AWS.") args = parser.parse_args() script_name = os.path.splitext(os.path.basename(__file__))[0] @@ -133,7 +134,6 @@ config_user = config_load(extras.get('config', os.path.join(args.userdir or default_dirs.user_dir, "config.yaml"))) # config listing properties set by command line config_args = ns.parse( - {'results.save': args.keep_scores}, input_dir=args.indir, output_dir=args.outdir, user_dir=args.userdir, @@ -144,7 +144,7 @@ exit_on_error=args.exit_on_error, ) + ns.parse(extras) if args.mode != 'local': - config_args + ns.parse({'monitoring.frequency_seconds': 0}) + config_args + ns.parse({'monitoring.frequency_seconds': 0}) # disable system monitoring if benchmark doesn't run locally. 
config_args = ns({k: v for k, v in config_args if v is not None}) log.debug("Config args: %s.", config_args) # merging all configuration files @@ -153,26 +153,16 @@ code = 0 bench = None try: - if args.mode == 'local': - bench = amlb.Benchmark(args.framework, args.benchmark, args.constraint) - elif args.mode == 'docker': - bench = amlb.DockerBenchmark(args.framework, args.benchmark, args.constraint) - elif args.mode == 'singularity': - bench = amlb.SingularityBenchmark(args.framework, args.benchmark, args.constraint) - elif args.mode == 'aws': - bench = amlb.AWSBenchmark(args.framework, args.benchmark, args.constraint) - # bench = amlb.AWSBenchmark(args.framework, args.benchmark, args.constraint, region=args.region) - # elif args.mode == "aws-remote": - # bench = amlb.AWSRemoteBenchmark(args.framework, args.benchmark, args.constraint, region=args.region) + benchmark_classes = [amlb.LocalBenchmark, amlb.DockerBenchmark, amlb.SingularityBenchmark, amlb.AWSBenchmark] + cls = next((c for c in benchmark_classes if c.run_mode == args.mode), None) + if cls: + bench = cls(args.framework, args.benchmark, args.constraint) else: - raise ValueError("`mode` must be one of 'aws', 'docker', 'singularity' or 'local'.") + raise ValueError("`mode` must be one of %s.".format(str_iter([c.run_mode for c in benchmark_classes]))) if args.setup == 'only': log.warning("Setting up %s environment only for %s, no benchmark will be run.", args.mode, args.framework) - if not args.keep_scores and args.mode != 'local': - log.warning("`keep_scores` parameter is currently ignored in %s mode, scores are always saved in this mode.", args.mode) - bench.setup(amlb.SetupMode[args.setup]) if args.setup != 'only': res = bench.run(args.task, args.fold) diff --git a/runscores.py b/runscores.py index 44c868c72..14e7549ee 100644 --- a/runscores.py +++ b/runscores.py @@ -24,7 +24,6 @@ amlb.logger.setup(root_level='DEBUG', console_level='INFO') config = config_load("resources/config.yaml") -config.run_mode = 'script' config.root_dir = root_dir config.script = os.path.basename(__file__) amlb.resources.from_config(config)
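The refactor above turns `Benchmark` into a template base class: `run()` is now split into `create_jobs()` + `run_jobs()`, subclasses declare a `run_mode` and implement the `_make_job()` hook, and `runbenchmark.py` simply picks the runner class whose `run_mode` matches `--mode`. The sketch below is not part of the patch; it is a minimal, hypothetical runner (`EchoBenchmark`, the `'echo'` mode and its job body are invented for illustration) showing how the new hooks are meant to be filled in.

```python
# Hypothetical sketch, not amlb code: a toy runner built on the refactored base class.
# `Benchmark`, `SetupMode`, `Job`, `run_mode` and `_make_job` come from this patch;
# `EchoBenchmark`, the 'echo' mode and the job body are invented for illustration.
from amlb.benchmark import Benchmark, SetupMode
from amlb.job import Job


class EchoBenchmark(Benchmark):
    """Toy runner: one job per (task, fold) that only logs what it would run."""

    run_mode = 'echo'  # recorded in results and matched against the --mode argument

    def setup(self, mode: SetupMode):
        super().setup(mode)  # the base class setup now only prepares the shared DataLoader

    def _make_job(self, task_def, fold: int):
        # create_jobs()/_task_jobs() in the base class call this hook for every
        # (task, fold) pair; returning None skips the pair.
        job = Job(name=f"echo_{task_def.name}_{fold}")
        job._run = lambda: print(f"would run {task_def.name} fold {fold}")
        return job
```

With the new selection logic in `runbenchmark.py`, a class like this would only need to be appended to `benchmark_classes` to become reachable as `--mode echo`.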
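The new `amlb/plan.py` is the piece that actually allows one session to run multiple frameworks and benchmarks: each `RunTask` is expanded into jobs by its own runner instance, and a single shared `Benchmark` instance executes and scores them all. Nothing in this patch wires `ExecutionPlan` into the CLI yet, so the snippet below is only a hedged usage sketch; it assumes `amlb.resources` has already been initialized the way `runbenchmark.py` does, and the framework, benchmark and constraint names are placeholders.

```python
# Hypothetical usage of the new ExecutionPlan/RunTask API (amlb/plan.py).
# Assumes amlb.resources is already configured (as runbenchmark.py does);
# the framework/benchmark/constraint names below are placeholders.
from amlb import LocalBenchmark
from amlb.plan import ExecutionPlan, RunTask

exec_plan = ExecutionPlan(
    benchmark_cls=LocalBenchmark,
    plan=[
        RunTask(framework='constantpredictor', benchmark='validation',
                constraint='test', tasks=['iris'], folds=[0]),
        RunTask(framework='RandomForest', benchmark='validation',
                constraint='test', tasks=['iris', 'cholesterol'], folds=[0, 1]),
    ],
)
results = exec_plan.run()  # all jobs are run and scored under one shared session
```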
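One subtle part of the AWS changes: `_exec_start()`/`_monitoring_start()` are no longer invoked up front in `run()`; instead `_create_job_runner` wraps the job runner's `_on_state` and re-binds the wrapper with `__get__`, so the shared resources start lazily and only that runner instance is affected. The stand-alone snippet below (plain Python, not amlb code; a string stands in for the real `JobState` enum) illustrates just that binding pattern.

```python
# Stand-alone illustration of the per-instance method override used in
# AWSBenchmark._create_job_runner; not amlb code, and a plain string stands
# in for the JobState enum.
class Runner:
    def _on_state(self, state):
        print(f"state -> {state}")

r = Runner()
super_on_state = r._on_state  # original method, already bound to r

def _on_state(_self, state):
    if state == "starting":
        print("lazily starting shared resources")  # e.g. _exec_start()/_monitoring_start()
    super_on_state(state)

r._on_state = _on_state.__get__(r)  # bind the wrapper to this one instance only
r._on_state("starting")   # -> lazily starting shared resources / state -> starting
r._on_state("running")    # -> state -> running
```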