From 26c0a520810bcf9f8a9affff5e1e559978d05f58 Mon Sep 17 00:00:00 2001 From: morrisnein Date: Mon, 20 May 2024 15:47:44 +0000 Subject: [PATCH] experiment stability update --- .../{ => configs}/config.yaml | 2 + .../{ => configs}/config_debug.yaml | 13 +- .../{ => configs}/config_light.yaml | 0 .../{ => configs}/evaluation_config.yaml | 0 .../{ => configs}/fedot_config.yaml | 2 +- .../use_configs.yaml} | 2 +- experiments/fedot_warm_start/run.py | 693 +++++++++++------- experiments/fedot_warm_start/run_v2.py | 0 .../approaches/knn_similarity_model_advice.py | 11 +- .../pymfe_extractor.py | 8 +- .../datasets_train_test_split.py | 33 +- 11 files changed, 470 insertions(+), 294 deletions(-) rename experiments/fedot_warm_start/{ => configs}/config.yaml (90%) rename experiments/fedot_warm_start/{ => configs}/config_debug.yaml (57%) rename experiments/fedot_warm_start/{ => configs}/config_light.yaml (100%) rename experiments/fedot_warm_start/{ => configs}/evaluation_config.yaml (100%) rename experiments/fedot_warm_start/{ => configs}/fedot_config.yaml (92%) rename experiments/fedot_warm_start/{configs_list.yaml => configs/use_configs.yaml} (69%) delete mode 100644 experiments/fedot_warm_start/run_v2.py diff --git a/experiments/fedot_warm_start/config.yaml b/experiments/fedot_warm_start/configs/config.yaml similarity index 90% rename from experiments/fedot_warm_start/config.yaml rename to experiments/fedot_warm_start/configs/config.yaml index 5effe3f9..cbf58399 100644 --- a/experiments/fedot_warm_start/config.yaml +++ b/experiments/fedot_warm_start/configs/config.yaml @@ -1,6 +1,8 @@ --- seed: 42 tmpdir: '/var/essdata/tmp' +update_train_test_datasets_split: true + #data_settings: n_datasets: null # null for all available datasets test_size: 0.25 diff --git a/experiments/fedot_warm_start/config_debug.yaml b/experiments/fedot_warm_start/configs/config_debug.yaml similarity index 57% rename from experiments/fedot_warm_start/config_debug.yaml rename to experiments/fedot_warm_start/configs/config_debug.yaml index 45cfbf20..99e1ec83 100644 --- a/experiments/fedot_warm_start/config_debug.yaml +++ b/experiments/fedot_warm_start/configs/config_debug.yaml @@ -1,16 +1,19 @@ --- seed: 42 save_dir_prefix: debug_ +update_train_test_datasets_split: true #data_settings: -n_datasets: 3 # null for all available datasets -test_size: 0.33 -train_timeout: 1 -test_timeout: 1 +n_datasets: 10 # null for all available datasets +test_size: 0.4 +train_timeout: 15 +test_timeout: 15 n_automl_repetitions: 1 #meta_learning_params: n_best_dataset_models_to_memorize: 10 mf_extractor_params: - groups: general + # groups: general + features: + - nr_inst assessor_params: n_neighbors: 2 advisor_params: diff --git a/experiments/fedot_warm_start/config_light.yaml b/experiments/fedot_warm_start/configs/config_light.yaml similarity index 100% rename from experiments/fedot_warm_start/config_light.yaml rename to experiments/fedot_warm_start/configs/config_light.yaml diff --git a/experiments/fedot_warm_start/evaluation_config.yaml b/experiments/fedot_warm_start/configs/evaluation_config.yaml similarity index 100% rename from experiments/fedot_warm_start/evaluation_config.yaml rename to experiments/fedot_warm_start/configs/evaluation_config.yaml diff --git a/experiments/fedot_warm_start/fedot_config.yaml b/experiments/fedot_warm_start/configs/fedot_config.yaml similarity index 92% rename from experiments/fedot_warm_start/fedot_config.yaml rename to experiments/fedot_warm_start/configs/fedot_config.yaml index 951024cc..bd8f5825 100644 --- 
a/experiments/fedot_warm_start/fedot_config.yaml +++ b/experiments/fedot_warm_start/configs/fedot_config.yaml @@ -1,7 +1,7 @@ fedot_params: problem: classification logging_level: 10 - n_jobs: -2 + n_jobs: 1 show_progress: false cache_dir: '/var/essdata/tmp/fedot_cache' use_auto_preprocessing: true diff --git a/experiments/fedot_warm_start/configs_list.yaml b/experiments/fedot_warm_start/configs/use_configs.yaml similarity index 69% rename from experiments/fedot_warm_start/configs_list.yaml rename to experiments/fedot_warm_start/configs/use_configs.yaml index b3e2b11f..ac61e8d0 100644 --- a/experiments/fedot_warm_start/configs_list.yaml +++ b/experiments/fedot_warm_start/configs/use_configs.yaml @@ -1,3 +1,3 @@ -- config_debug.yaml +- config.yaml - evaluation_config.yaml - fedot_config.yaml diff --git a/experiments/fedot_warm_start/run.py b/experiments/fedot_warm_start/run.py index 2d3280bd..154129ef 100644 --- a/experiments/fedot_warm_start/run.py +++ b/experiments/fedot_warm_start/run.py @@ -3,16 +3,18 @@ import json import logging import os +import sys import pickle import shutil import timeit -from datetime import datetime -from functools import partial, wraps, reduce +from datetime import datetime, timedelta +from functools import partial, wraps from pathlib import Path from typing import Any, Dict, List, Optional, Sequence, Tuple, Union from uuid import uuid4 import loguru +import numpy as np import openml import pandas as pd import yaml @@ -21,80 +23,118 @@ from fedot.core.optimisers.objective import MetricsObjective, PipelineObjectiveEvaluate from fedot.core.pipelines.pipeline import Pipeline from fedot.core.pipelines.pipeline_builder import PipelineBuilder -from fedot.core.repository.metrics_repository import MetricsRepository, QualityMetricsEnum +from fedot.core.repository.metrics_repository import ( + MetricsRepository, + QualityMetricsEnum, +) from golem.core.optimisers.fitness import Fitness -from golem.core.optimisers.opt_history_objects.opt_history import OptHistory from pecapiku import CacheDict from sklearn.model_selection import train_test_split from tqdm import tqdm from typing_extensions import Literal +sys.path.insert(0, str(Path(__file__).parents[2])) + from gamlet.approaches.knn_similarity_model_advice import KNNSimilarityModelAdvice from gamlet.data_preparation.dataset import DatasetIDType, OpenMLDataset, TabularData -from gamlet.data_preparation.datasets_train_test_split import openml_datasets_train_test_split +from gamlet.data_preparation.datasets_train_test_split import ( + openml_datasets_train_test_split, +) from gamlet.data_preparation.file_system import get_cache_dir -CONFIGS_DIR = Path(__file__).parent +CONFIGS_DIR = Path(__file__).parent / "configs" -with open(CONFIGS_DIR / 'configs_list.yaml', 'r') as config_file: +with open(CONFIGS_DIR / "use_configs.yaml", "r") as config_file: configs_list = yaml.load(config_file, yaml.Loader) config = {} for conf_name in configs_list: - with open(CONFIGS_DIR / conf_name, 'r') as config_file: + with open(CONFIGS_DIR / conf_name, "r") as config_file: conf = yaml.load(config_file, yaml.Loader) intersection = set(config).intersection(set(conf)) if intersection: - raise ValueError(f'Parameter values given twice: {conf_name}, {intersection}.') + raise ValueError(f"Parameter values given twice: {conf_name}, {intersection}.") config.update(conf) # Load constants -SEED = config['seed'] -N_DATASETS = config['n_datasets'] -TEST_SIZE = config['test_size'] -TRAIN_TIMEOUT = config['train_timeout'] -TEST_TIMEOUT = 
config['test_timeout'] -N_BEST_DATASET_MODELS_TO_MEMORIZE = config['n_best_dataset_models_to_memorize'] -ASSESSOR_PARAMS = config['assessor_params'] -ADVISOR_PARAMS = config['advisor_params'] -MF_EXTRACTOR_PARAMS = config['mf_extractor_params'] -COLLECT_METRICS = config['collect_metrics'] -FEDOT_PARAMS = config['fedot_params'] -DATA_TEST_SIZE = config['data_test_size'] -DATA_SPLIT_SEED = config['data_split_seed'] -BASELINE_MODEL = config['baseline_model'] -N_AUTOML_REPETITIONS = config['n_automl_repetitions'] +SEED = config["seed"] +N_DATASETS = config["n_datasets"] +TEST_SIZE = config["test_size"] +TRAIN_TIMEOUT = config["train_timeout"] +TEST_TIMEOUT = config["test_timeout"] +N_BEST_DATASET_MODELS_TO_MEMORIZE = config["n_best_dataset_models_to_memorize"] +ASSESSOR_PARAMS = config["assessor_params"] +ADVISOR_PARAMS = config["advisor_params"] +MF_EXTRACTOR_PARAMS = config["mf_extractor_params"] +COLLECT_METRICS = config["collect_metrics"] +FEDOT_PARAMS = config["fedot_params"] +DATA_TEST_SIZE = config["data_test_size"] +DATA_SPLIT_SEED = config["data_split_seed"] +BASELINE_MODEL = config["baseline_model"] +N_AUTOML_REPETITIONS = config["n_automl_repetitions"] # Optional values -TMPDIR = config.get('tmpdir') -SAVE_DIR_PREFIX = config.get('save_dir_prefix') +TMPDIR = config.get("tmpdir") +SAVE_DIR_PREFIX = config.get("save_dir_prefix") -UPDATE_TRAIN_TEST_DATASETS_SPLIT = config.get('update_train_test_datasets_split') +UPDATE_TRAIN_TEST_DATASETS_SPLIT = config.get("update_train_test_datasets_split") # Postprocess constants COLLECT_METRICS_ENUM = tuple(map(MetricsRepository.get_metric, COLLECT_METRICS)) -COLLECT_METRICS[COLLECT_METRICS.index('neg_log_loss')] = 'logloss' +COLLECT_METRICS[COLLECT_METRICS.index("neg_log_loss")] = "logloss" + + +def setup_experiment(): + # Preparation + experiment_date, experiment_date_iso, experiment_date_for_path = ( + get_current_formatted_date() + ) + save_dir = get_save_dir(experiment_date_for_path) + setup_logging(save_dir) + if TMPDIR: + os.environ.putenv("TMPDIR", TMPDIR) + meta_learner_path = save_dir.joinpath("meta_learner.pkl") + dataset_ids = get_dataset_ids() + dataset_ids_train, dataset_ids_test = split_datasets( + dataset_ids, N_DATASETS, UPDATE_TRAIN_TEST_DATASETS_SPLIT + ) + dataset_ids = dataset_ids_train + dataset_ids_test + experiment_params_dict = dict( + experiment_start_date_iso=experiment_date_iso, + input_config=config, + dataset_ids=dataset_ids, + dataset_ids_train=dataset_ids_train, + dataset_ids_test=dataset_ids_test, + baseline_pipeline=BASELINE_MODEL, + ) + save_experiment_params(experiment_params_dict, save_dir) + return ( + dataset_ids_test, + dataset_ids_train, + experiment_date, + meta_learner_path, + save_dir, + ) def setup_logging(save_dir: Path): - """ Creates "log.txt" at the "save_dir" and redirects all logging output to it. """ + """Creates "log.txt" at the "save_dir" and redirects all logging output to it.""" loguru.logger.add(save_dir / "file_{time}.log") - log_file = save_dir.joinpath('log.txt') + log_file = save_dir.joinpath("log.txt") logging.basicConfig( filename=log_file, - filemode='a', - format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', - datefmt='%H:%M:%S', + filemode="a", + format="%(asctime)s %(name)s %(levelname)s %(message)s", force=True, - level=logging.DEBUG, + level=logging.NOTSET, ) def get_current_formatted_date() -> Tuple[datetime, str, str]: - """ Returns current date in the following formats: + """Returns current date in the following formats: - 1. datetime - 2. str: ISO - 3. 
str: ISO compatible with Windows file system path (with "." instead of ":") """ + 1. datetime + 2. str: ISO + 3. str: ISO compatible with Windows file system path (with "." instead of ":")""" time_now = datetime.now() time_now_iso = time_now.isoformat(timespec="minutes") time_now_for_path = time_now_iso.replace(":", ".") @@ -102,8 +142,12 @@ def get_current_formatted_date() -> Tuple[datetime, str, str]: def get_save_dir(time_now_for_path) -> Path: - save_dir = get_cache_dir(). \ - joinpath('experiments').joinpath('fedot_warm_start').joinpath(f'run_{time_now_for_path}') + save_dir = ( + get_cache_dir() + .joinpath("experiments") + .joinpath("fedot_warm_start") + .joinpath(f"run_{time_now_for_path}") + ) if SAVE_DIR_PREFIX: save_dir = save_dir.with_name(SAVE_DIR_PREFIX + save_dir.name) if save_dir.exists(): @@ -121,18 +165,21 @@ def get_dataset_ids() -> List[DatasetIDType]: return list(dataset_ids) -def split_datasets(dataset_ids, n_datasets: Optional[int] = None, update_train_test_split: bool = False) \ - -> Tuple[pd.DataFrame, pd.DataFrame]: - split_path = Path(__file__).parent / 'train_test_datasets_split.csv' +def split_datasets( + dataset_ids, n_datasets: Optional[int] = None, update_train_test_split: bool = False +) -> Tuple[list, list]: + split_path = Path(__file__).parent / "train_test_datasets_split.csv" if update_train_test_split: - df_split_datasets = openml_datasets_train_test_split(dataset_ids, test_size=TEST_SIZE, seed=SEED) + df_split_datasets = openml_datasets_train_test_split( + dataset_ids, test_size=TEST_SIZE, seed=SEED + ) df_split_datasets.to_csv(split_path) else: df_split_datasets = pd.read_csv(split_path, index_col=0) - df_train = df_split_datasets[df_split_datasets['is_train'] == 1] - df_test = df_split_datasets[df_split_datasets['is_train'] == 0] + df_train = df_split_datasets[df_split_datasets["is_train"] == 1] + df_test = df_split_datasets[df_split_datasets["is_train"] == 0] if n_datasets is not None: frac = n_datasets / len(df_split_datasets) @@ -145,13 +192,14 @@ def split_datasets(dataset_ids, n_datasets: Optional[int] = None, update_train_t return datasets_train, datasets_test -def evaluate_pipeline(pipeline: Pipeline, - train_data: TabularData, - test_data: TabularData, - metrics: Sequence[QualityMetricsEnum] = COLLECT_METRICS_ENUM, - metric_names: Sequence[str] = COLLECT_METRICS, - mode: Literal['fitness', 'float'] = 'float' - ) -> Union[Dict[str, float], Tuple[Fitness, Sequence[str]]]: +def evaluate_pipeline( + pipeline: Pipeline, + train_data: TabularData, + test_data: TabularData, + metrics: Sequence[QualityMetricsEnum] = COLLECT_METRICS_ENUM, + metric_names: Sequence[str] = COLLECT_METRICS, + mode: Literal["fitness", "float"] = "float", +) -> Union[Dict[str, float], Tuple[Fitness, Sequence[str]]]: """Gets quality metrics for the fitted pipeline. 
The function is based on `Fedot.get_metrics()` @@ -165,87 +213,97 @@ def data_producer(): yield train_data, test_data objective = MetricsObjective(metrics) - obj_eval = PipelineObjectiveEvaluate(objective=objective, - data_producer=data_producer, - eval_n_jobs=-1) + obj_eval = PipelineObjectiveEvaluate( + objective=objective, data_producer=data_producer, eval_n_jobs=-1 + ) fitness = obj_eval.evaluate(pipeline) - if mode == 'float': + if mode == "float": metric_values = fitness.values - metric_values = {metric_name: round(value, 3) for (metric_name, value) in zip(metric_names, metric_values)} + metric_values = { + metric_name: round(value, 3) + for (metric_name, value) in zip(metric_names, metric_values) + } return metric_values - if mode == 'fitness': + if mode == "fitness": return fitness, metric_names -def timed(func, resolution: Literal['sec', 'min'] = 'min'): +def timed(func): @wraps(func) def wrapper(*args, **kwargs): time_start = timeit.default_timer() result = func(*args, **kwargs) - time_delta = timeit.default_timer() - time_start - if resolution == 'min': - time_delta /= 60 + time_delta = timedelta(seconds=timeit.default_timer() - time_start) return result, time_delta return wrapper -def fit_evaluate_automl(fit_func, evaluate_func) -> (Fedot, Dict[str, Any]): - """ Runs Fedot evaluation on the dataset, the evaluates the final pipeline on the dataset.. """ +def fit_evaluate_automl( + fit_func, evaluate_func +) -> Tuple[Fedot, Dict[str, Any], timedelta]: + """Runs Fedot fitting on the dataset, then evaluates the final pipeline on it.""" result, fit_time = timed(fit_func)() metrics = evaluate_func(result) return result, metrics, fit_time -def fit_evaluate_pipeline(pipeline, fit_func, evaluate_func) -> (Fedot, Dict[str, Any]): - """ Runs Fedot evaluation on the dataset, the evaluates the final pipeline on the dataset.. 
""" +def fit_evaluate_pipeline( + pipeline, fit_func, evaluate_func +) -> Tuple[Fedot, Dict[str, Any], timedelta]: + """Runs Fedot evaluation on the dataset, the evaluates the final pipeline on the dataset..""" _, fit_time = timed(fit_func)() metrics = evaluate_func(pipeline) return pipeline, metrics, fit_time def save_experiment_params(params_dict: Dict[str, Any], save_dir: Path): - """ Save the hyperparameters of the experiment """ - params_file_path = save_dir.joinpath('parameters.json') - with open(params_file_path, 'w') as params_file: + """Save the hyperparameters of the experiment""" + params_file_path = save_dir.joinpath("parameters.json") + with open(params_file_path, "w") as params_file: json.dump(params_dict, params_file, indent=2) def save_evaluation(save_dir: Path, dataset, pipeline, **kwargs): - run_results: Dict[str, Any] = dict(dataset_id=dataset.id, - dataset_name=dataset.name, - model_obj=pipeline, - model_str=pipeline.descriptive_id, - task_type='classification', - **kwargs) + run_results: Dict[str, Any] = dict( + dataset_id=dataset.id, + dataset_name=dataset.name, + model_obj=pipeline, + model_str=pipeline.descriptive_id, + task_type="classification", + **kwargs, + ) try: - histories_dir = save_dir.joinpath('histories') - models_dir = save_dir.joinpath('models') - eval_results_path = save_dir.joinpath('evaluation_results.csv') + histories_dir = save_dir.joinpath("histories") + models_dir = save_dir.joinpath("models") + eval_results_path = save_dir.joinpath("evaluation_results.csv") histories_dir.mkdir(exist_ok=True) models_dir.mkdir(exist_ok=True) - dataset_id = run_results['dataset_id'] - run_label = run_results['run_label'] + dataset_id = run_results["dataset_id"] + run_label = run_results["run_label"] # define saving paths uid = str(uuid4()) - model_path = models_dir.joinpath(f'{dataset_id}_{run_label}_{uid}') - history_path = histories_dir.joinpath(f'{dataset_id}_{run_label}_{uid}_history.json') + model_path = models_dir.joinpath(f"{dataset_id}_{run_label}_{uid}") + history_path = histories_dir.joinpath( + f"{dataset_id}_{run_label}_{uid}_history.json" + ) # replace objects with export paths for csv - run_results['model_path'] = str(model_path) - run_results.pop('model_obj').save(model_path, create_subdir=False) - run_results['history_path'] = str(history_path) - if 'history_obj' in run_results: - history_obj = run_results.pop('history_obj') + run_results["model_path"] = str(model_path) + run_results.pop("model_obj").save(model_path, create_subdir=False) + run_results["history_path"] = str(history_path) + if "history_obj" in run_results: + history_obj = run_results.pop("history_obj") if history_obj is not None: - history_obj.save(run_results['history_path']) + history_obj.save(run_results["history_path"]) + run_results["history_obj"] = history_obj df_evaluation_properties = pd.DataFrame([run_results]) if eval_results_path.exists(): - df_results = pd.read_csv(eval_results_path) + df_results = pd.read_csv(eval_results_path, index_col=None) df_results = pd.concat([df_results, df_evaluation_properties]) else: df_results = df_evaluation_properties @@ -257,138 +315,327 @@ def save_evaluation(save_dir: Path, dataset, pipeline, **kwargs): raise e -def run_fedot_attempt(train_data: TabularData, test_data: TabularData, timeout: float, - run_label: str, repetition: int, experiment_date: datetime, save_dir: Path, - fedot_evaluations_cache: CacheDict, - initial_assumption: Optional[Sequence[Pipeline]] = None, meta_learning_time_sec: float = 0.): - fedot = Fedot(timeout=timeout, 
initial_assumption=initial_assumption, **FEDOT_PARAMS) +def run_fedot_attempt( + train_data: TabularData, + test_data: TabularData, + timeout: float, + run_label: str, + repetition: int, + experiment_date: datetime, + save_dir: Path, + initial_assumption: Optional[Sequence[Pipeline]] = None, + fedot_evaluations_cache=None, +): + fedot = Fedot( + timeout=timeout, initial_assumption=initial_assumption, **FEDOT_PARAMS + ) fit_func = partial(fedot.fit, features=train_data.x, target=train_data.y) - evaluate_func = partial(evaluate_pipeline, train_data=train_data, test_data=test_data) + evaluate_func = partial( + evaluate_pipeline, train_data=train_data, test_data=test_data + ) run_date = datetime.now() - # cache_key = f'{run_label}_{train_data.id}_{timeout}_{repetition}' - # with fedot_evaluations_cache as cache_dict: - # cached_run = cache_dict[cache_key] - # if cached_run: - # fedot = cached_run['fedot'] - # pipeline = cached_run['pipeline'] - # metrics = cached_run['metrics'] - # fit_time = cached_run['fit_time'] - # else: - # pipeline, metrics, fit_time = fit_evaluate_automl(fit_func=fit_func, evaluate_func=evaluate_func) - # cached_run = dict( - # fedot=fedot, - # pipeline=pipeline, - # metrics=metrics, - # fit_time=fit_time, - # ) - # cache_dict[cache_key] = cached_run - pipeline, metrics, fit_time = fit_evaluate_automl(fit_func=fit_func, evaluate_func=evaluate_func) + cache_key = f"{run_label}_{train_data.id}_{timeout}_{repetition}" + with fedot_evaluations_cache as cache_dict: + cached_run = cache_dict[cache_key] + if cached_run: + fedot = cached_run["fedot"] + pipeline = cached_run["pipeline"] + metrics = cached_run["metrics"] + fit_time = cached_run["fit_time"] + else: + # pipeline, metrics, fit_time = fit_evaluate_automl(fit_func=fit_func, evaluate_func=evaluate_func) + # cached_run = dict( + # fedot=fedot, + # pipeline=pipeline, + # metrics=metrics, + # fit_time=fit_time, + # ) + # cache_dict[cache_key] = cached_run + pipeline, metrics, fit_time = fit_evaluate_automl( + fit_func=fit_func, evaluate_func=evaluate_func + ) eval_result = dict( dataset=train_data.dataset, run_label=run_label, pipeline=pipeline, - meta_learning_time_sec=meta_learning_time_sec, - automl_time_min=fit_time, + automl_time_min=fit_time.total_seconds() / 60, automl_timeout_min=fedot.params.timeout, generations_count=fedot.history.generations_count, history_obj=fedot.history, run_data=run_date, experiment_date=experiment_date, save_dir=save_dir, - **metrics + **metrics, ) return eval_result -def run_pipeline(train_data: TabularData, test_data: TabularData, pipeline: Pipeline, - run_label: str, experiment_date: datetime, save_dir: Path): +def run_pipeline( + train_data: TabularData, + test_data: TabularData, + pipeline: Pipeline, + run_label: str, + experiment_date: datetime, + save_dir: Path, +): train_data_for_fedot = array_to_input_data(train_data.x, train_data.y) fit_func = partial(pipeline.fit, train_data_for_fedot) - evaluate_func = partial(evaluate_pipeline, train_data=train_data, test_data=test_data) + evaluate_func = partial( + evaluate_pipeline, train_data=train_data, test_data=test_data + ) run_date = datetime.now() - pipeline, metrics, fit_time = fit_evaluate_pipeline(pipeline=pipeline, fit_func=fit_func, - evaluate_func=evaluate_func) - save_evaluation(dataset=train_data.dataset, - run_label=run_label, - pipeline=pipeline, - automl_time_min=0, - pipeline_fit_time=fit_time, - automl_timeout_min=0, - meta_learning_time_sec=0, - run_data=run_date, - experiment_date=experiment_date, - save_dir=save_dir, 
- **metrics) + pipeline, metrics, fit_time = fit_evaluate_pipeline( + pipeline=pipeline, fit_func=fit_func, evaluate_func=evaluate_func + ) + save_evaluation( + dataset=train_data.dataset, + run_label=run_label, + pipeline=pipeline, + automl_time_min=0, + pipeline_fit_time_sec=fit_time.total_seconds(), + automl_timeout_min=0, + meta_learning_time_sec=0, + run_data=run_date, + experiment_date=experiment_date, + save_dir=save_dir, + **metrics, + ) return pipeline +def get_datasets_eval_funcs(dataset_ids_train, dataset_splits): + dataset_eval_funcs = [] + for dataset_id in dataset_ids_train: + split = dataset_splits[dataset_id] + train_data, test_data = split["train"], split["test"] + model_eval_func = partial( + evaluate_pipeline, + train_data=train_data, + test_data=test_data, + mode="fitness", + ) + dataset_eval_funcs.append(model_eval_func) + return dataset_eval_funcs + + +def get_datasets_data_splits(dataset_ids): + dataset_splits = {} + for dataset_id in dataset_ids: + dataset = OpenMLDataset(dataset_id) + dataset_data = dataset.get_data() + if isinstance(dataset_data.y[0], bool): + dataset_data.y = np.array(list(map(str, dataset_data.y))) + idx_train, idx_test = train_test_split( + range(len(dataset_data.y)), + test_size=DATA_TEST_SIZE, + stratify=dataset_data.y, + shuffle=True, + random_state=DATA_SPLIT_SEED, + ) + train_data, test_data = dataset_data[idx_train], dataset_data[idx_test] + dataset_splits[dataset_id] = dict(train=train_data, test=test_data) + return dataset_splits + + +def evaluate_fedot_on_dataset( + train_data: TabularData, + test_data: TabularData, + timeout: float, + run_label: str, + experiment_date: datetime, + save_dir: Path, + fedot_evaluations_cache: CacheDict, + initial_assumption: Optional[Sequence[Pipeline]] = None, + meta_learning_time: Optional[timedelta] = None, +): + meta_learning_time = meta_learning_time or timedelta(0) + dataset = train_data.dataset + + eval_results = [] + for repetition in range(N_AUTOML_REPETITIONS): + try: + eval_result, time_delta = timed(run_fedot_attempt)( + train_data, + test_data, + timeout, + run_label, + repetition, + experiment_date, + save_dir, + initial_assumption, + fedot_evaluations_cache, + ) + time_limit = timedelta(minutes=timeout * 2) + if time_delta > time_limit: + logging.warning( + f'Dataset "{dataset.id}" TIMEOUT REACHED, {time_delta}.' 
+ ) + return None + + eval_results.append(eval_result) + except Exception as e: + logging.warning(f'Dataset "{dataset.id}" skipped: {e}') + logging.exception(f'Dataset "{dataset.id}"') + if __debug__: + raise e + return None + + generations_total = sum( + map(lambda ev_res: ev_res["history_obj"].generations_count, eval_results) + ) + if generations_total == 0: + logging.warning(f'Dataset "{dataset.id}": zero generations obtained.') + return None + + for eval_result in eval_results: + eval_result["meta_learning_time_sec"] = meta_learning_time.total_seconds() + save_evaluation(**eval_result) + + histories = list(map(lambda r: r["history_obj"], eval_results)) + + return histories + + @loguru.logger.catch def main(): - dataset_ids_test, dataset_ids_train, experiment_date, meta_learner_path, save_dir = setup_experiment() + ( + dataset_ids_test, + dataset_ids_train, + experiment_date, + meta_learner_path, + save_dir, + ) = setup_experiment() - # fit_fedot_cached = CacheDict.decorate(fit_evaluate_automl, get_cache_dir() / 'fedot_runs.pkl', inner_key='dataset.id') dataset_splits = get_datasets_data_splits(dataset_ids_test + dataset_ids_train) - datasets_eval_funcs = get_datasets_eval_funcs(dataset_ids_train, dataset_splits) algorithm = KNNSimilarityModelAdvice( N_BEST_DATASET_MODELS_TO_MEMORIZE, MF_EXTRACTOR_PARAMS, ASSESSOR_PARAMS, - ADVISOR_PARAMS + ADVISOR_PARAMS, ) - # Experiment start - knowledge_base = {dataset_id: [] for dataset_id in dataset_ids_train} - fedot_evaluations_cache = CacheDict(get_cache_dir() / 'fedot_runs.pkl') - description = 'FEDOT, train datasets' - for dataset_id in (pbar := tqdm(dataset_ids_train, description)): - pbar.set_description(description + f' ({dataset_id})') - train_data, test_data = dataset_splits[dataset_id]['train'], dataset_splits[dataset_id]['test'] - run_label = 'FEDOT' - evaluate_fedot_on_dataset(train_data, test_data, TRAIN_TIMEOUT, run_label, experiment_date, save_dir, - fedot_evaluations_cache) - # knowledge_base[dataset_id] = gain_knowledge_base_for_dataset(dataset_id, experiment_date, - # fedot_evaluations_cache, - # run_label, save_dir, - # test_data, TRAIN_TIMEOUT, train_data) - # knowledge_base[dataset_id] = [fedot.history for fedot in fedots] - - description = 'FEDOT, test datasets' - for dataset_id in (pbar := tqdm(dataset_ids_test, description)): - pbar.set_description(description + f' ({dataset_id})') - train_data, test_data = dataset_splits[dataset_id]['train'], dataset_splits[dataset_id]['test'] - run_label = 'FEDOT' - evaluate_fedot_on_dataset(train_data, test_data, TEST_TIMEOUT, run_label, experiment_date, save_dir, - fedot_evaluations_cache) + # knowledge_base = {dataset_id: [] for dataset_id in dataset_ids_train} + knowledge_base = {} + skipped_datasets = set() + fedot_evaluations_cache = CacheDict(get_cache_dir() / "fedot_runs.pkl") + # fedot_evaluations_cache = None + # evaluate_fedot_on_dataset_cached = CacheDict.decorate(evaluate_fedot_on_dataset, get_cache_dir() / 'fedot_runs.pkl', inner_key='train_data.id') + description = "FEDOT, all datasets ({dataset_id})" + for dataset_id in (pbar := tqdm(dataset_ids_train + dataset_ids_test, description)): + pbar.set_description(description.format(dataset_id=dataset_id)) + train_data, test_data = ( + dataset_splits[dataset_id]["train"], + dataset_splits[dataset_id]["test"], + ) + run_label = "FEDOT" + timeout = TRAIN_TIMEOUT if dataset_id in dataset_ids_train else TEST_TIMEOUT + histories = evaluate_fedot_on_dataset( + train_data, + test_data, + timeout, + run_label, + experiment_date, 
save_dir, + fedot_evaluations_cache, + ) + if histories is not None: + if dataset_id in dataset_ids_train: + knowledge_base[dataset_id] = histories + continue + # Error processing - throw the dataset out + skipped_datasets.add(dataset_id) + if dataset_id in dataset_ids_train: + del dataset_ids_train[dataset_ids_train.index(dataset_id)] + else: + del dataset_ids_test[dataset_ids_test.index(dataset_id)] + + with open(save_dir / "skipped_datasets.txt", "w") as f: + f.write("\n".join(map(str, skipped_datasets))) ############################### - kb_datasets_data = [OpenMLDataset(dataset).get_data() for dataset in knowledge_base.keys()] + kb_datasets_data = [ + OpenMLDataset(dataset).get_data() for dataset in knowledge_base.keys() + ] + # datasets_eval_funcs = get_datasets_eval_funcs(dataset_ids_train, dataset_splits) + datasets_eval_funcs = None kb_histories = list(knowledge_base.values()) ############################### # Meta-Learning algorithm.fit(kb_datasets_data, kb_histories, datasets_eval_funcs) - with open(meta_learner_path, 'wb') as meta_learner_file: + for dataset_id in dataset_ids_train: + if dataset_id not in algorithm.data.dataset_ids: + skipped_datasets.add(dataset_id) + del dataset_ids_train[dataset_ids_train.index(dataset_id)] + with open(save_dir / "skipped_datasets.txt", "w") as f: + f.write("\n".join(map(str, skipped_datasets))) + + with open(meta_learner_path, "wb") as meta_learner_file: pickle.dump(algorithm, meta_learner_file) # Application - description = 'MetaFEDOT, Test datasets' + # evaluate_metafedot_on_dataset_cached = CacheDict.decorate(evaluate_fedot_on_dataset, get_cache_dir() / 'metafedot_runs.pkl', inner_key='train_data.id') + fedot_evaluations_cache = CacheDict(get_cache_dir() / "metafedot_runs.pkl") + description = "MetaFEDOT, test datasets ({dataset_id})" for dataset_id in (pbar := tqdm(dataset_ids_test, description)): - pbar.set_description(description + f' ({dataset_id})') - train_data, test_data = dataset_splits[dataset_id]['train'], dataset_splits[dataset_id]['test'] + pbar.set_description(description.format(dataset_id=dataset_id)) + train_data, test_data = ( + dataset_splits[dataset_id]["train"], + dataset_splits[dataset_id]["test"], + ) # Run meta AutoML # 1 - initial_assumptions, meta_learning_time_sec = timed(algorithm.predict, resolution='sec')([train_data]) + try: + initial_assumptions, meta_learning_time = timed(algorithm.predict)( + [train_data] + ) + if not initial_assumptions: + raise ValueError("No initial assumptions.") + except Exception as e: + logging.exception( + f'Dataset "{dataset_id}" skipped, meta learner could not predict: {e}' + ) + skipped_datasets.add(dataset_id) + del dataset_ids_test[dataset_ids_test.index(dataset_id)] + continue + initial_assumptions = initial_assumptions[0] assumption_pipelines = [model.predictor for model in initial_assumptions] # 2 baseline_pipeline = PipelineBuilder().add_node(BASELINE_MODEL).build() - run_label = 'MetaFEDOT' - evaluate_fedot_on_dataset(train_data, test_data, TEST_TIMEOUT, run_label, experiment_date, save_dir, - fedot_evaluations_cache, assumption_pipelines, meta_learning_time_sec) + run_label = "MetaFEDOT" + try: + histories = evaluate_fedot_on_dataset( + train_data, + test_data, + TEST_TIMEOUT, + run_label, + experiment_date, + save_dir, + fedot_evaluations_cache, + assumption_pipelines, + meta_learning_time, + ) + if histories is None: + raise ValueError("No results.") + except Exception as e: + logging.exception( + f'Dataset "{dataset_id}" skipped, MetaFEDOT could not finish: {e}' + ) + 
skipped_datasets.add(dataset_id) + del dataset_ids_test[dataset_ids_test.index(dataset_id)] + continue # Fit & evaluate simple baseline - run_label = 'simple baseline' + run_label = "simple baseline" try: - run_pipeline(train_data, test_data, baseline_pipeline, run_label, experiment_date, save_dir) + run_pipeline( + train_data, + test_data, + baseline_pipeline, + run_label, + experiment_date, + save_dir, + ) except Exception as e: logging.exception(f'Test dataset "{dataset_id}", {run_label}') if __debug__: @@ -397,107 +644,27 @@ def main(): for i, assumption in enumerate(initial_assumptions): try: pipeline = assumption.predictor - run_label = f'MetaFEDOT - initial assumption {i}' - run_pipeline(train_data, test_data, pipeline, run_label, experiment_date, save_dir) + run_label = f"MetaFEDOT - initial assumption {i}" + run_pipeline( + train_data, + test_data, + pipeline, + run_label, + experiment_date, + save_dir, + ) except Exception as e: logging.exception(f'Test dataset "{dataset_id}", {run_label}') if __debug__: raise e - -def get_datasets_eval_funcs(dataset_ids_train, dataset_splits): - dataset_eval_funcs = [] - for dataset_id in dataset_ids_train: - split = dataset_splits[dataset_id] - train_data, test_data = split['train'], split['test'] - model_eval_func = partial(evaluate_pipeline, train_data=train_data, test_data=test_data, mode='fitness') - dataset_eval_funcs.append(model_eval_func) - return dataset_eval_funcs - - -def get_datasets_data_splits(dataset_ids): - dataset_splits = {} - for dataset_id in dataset_ids: - dataset = OpenMLDataset(dataset_id) - dataset_data = dataset.get_data() - idx_train, idx_test = train_test_split(range(len(dataset_data.y)), - test_size=DATA_TEST_SIZE, - stratify=dataset_data.y, - shuffle=True, - random_state=DATA_SPLIT_SEED) - train_data, test_data = dataset_data[idx_train], dataset_data[idx_test] - dataset_splits[dataset_id] = dict(train=train_data, test=test_data) - return dataset_splits - - -def setup_experiment(): - # Preparation - experiment_date, experiment_date_iso, experiment_date_for_path = get_current_formatted_date() - save_dir = get_save_dir(experiment_date_for_path) - setup_logging(save_dir) - if TMPDIR: - os.environ.putenv('TMPDIR', TMPDIR) - meta_learner_path = save_dir.joinpath('meta_learner.pkl') - dataset_ids = get_dataset_ids() - dataset_ids_train, dataset_ids_test = split_datasets(dataset_ids, N_DATASETS, UPDATE_TRAIN_TEST_DATASETS_SPLIT) - dataset_ids = dataset_ids_train + dataset_ids_test - experiment_params_dict = dict( - experiment_start_date_iso=experiment_date_iso, - input_config=config, - dataset_ids=dataset_ids, - dataset_ids_train=dataset_ids_train, - dataset_ids_test=dataset_ids_test, - baseline_pipeline=BASELINE_MODEL, - ) - save_experiment_params(experiment_params_dict, save_dir) - return dataset_ids_test, dataset_ids_train, experiment_date, meta_learner_path, save_dir - - -def evaluate_fedot_on_dataset(train_data: TabularData, test_data: TabularData, timeout: float, - run_label: str, experiment_date: datetime, save_dir: Path, - fedot_evaluations_cache: CacheDict, - initial_assumption: Optional[Sequence[Pipeline]] = None, - meta_learning_time_sec: float = 0.): - dataset = train_data.dataset - eval_results = [] - for repetition in range(N_AUTOML_REPETITIONS): - try: - eval_result, time_delta = timed( - run_fedot_attempt(train_data, test_data, timeout, run_label, repetition, experiment_date, save_dir, - fedot_evaluations_cache)) - # TODO: - # x Start FEDOT `N_BEST_DATASET_MODELS_TO_MEMORIZE` times, but not in one run - - # 
TODO: Условие на прерывание - eval_results.append(eval_result) - except Exception as e: - logging.exception(f'Dataset "{dataset.id}"') - if __debug__: - raise e - - for eval_result in eval_results: - save_evaluation(**eval_result) - - return eval_results - - -def gain_knowledge_base_for_dataset(train_data: TabularData, test_data: TabularData, timeout: float, - run_label: str, experiment_date: datetime, save_dir: Path, - fedot_evaluations_cache: CacheDict, - initial_assumption: Optional[Sequence[Pipeline]] = None, - meta_learning_time_sec: float = 0.): - eval_results = evaluate_fedot_on_dataset(train_data, test_data, timeout, - run_label, experiment_date, save_dir, - fedot_evaluations_cache, - initial_assumption, - meta_learning_time_sec) - histories = reduce([OptHistory.load, ], [res['history_path'] for res in eval_results]) - return histories + with open(save_dir / "skipped_datasets.txt", "w") as f: + f.write("\n".join(map(str, skipped_datasets))) if __name__ == "__main__": try: main() except Exception as e: - logging.exception('Exception at main().') + logging.exception("Exception at main().") raise e diff --git a/experiments/fedot_warm_start/run_v2.py b/experiments/fedot_warm_start/run_v2.py deleted file mode 100644 index e69de29b..00000000 diff --git a/gamlet/approaches/knn_similarity_model_advice.py b/gamlet/approaches/knn_similarity_model_advice.py index d47b0835..9a42c204 100644 --- a/gamlet/approaches/knn_similarity_model_advice.py +++ b/gamlet/approaches/knn_similarity_model_advice.py @@ -4,6 +4,7 @@ from typing import Callable, List, Optional, Sequence from golem.core.optimisers.opt_history_objects.opt_history import OptHistory +import pandas as pd from sklearn.preprocessing import MinMaxScaler from gamlet.approaches import MetaLearningApproach @@ -55,7 +56,7 @@ class Components: class Data: meta_features: DatasetMetaFeatures = None datasets: List[OpenMLDataset] = None - datasets_data: List[OpenMLDataset] = None + datasets_data: List[TabularData] = None dataset_ids: List[DatasetIDType] = None best_models: List[List[EvaluatedModel]] = None @@ -66,11 +67,11 @@ def fit(self, data = self.data params = self.parameters - data.datasets_data = list(datasets_data) - data.datasets = [d.dataset for d in datasets_data] - data.dataset_ids = [d.id for d in datasets_data] + data.meta_features = self.extract_train_meta_features(datasets_data) + data.dataset_ids = list(data.meta_features.index) + data.datasets_data = [d_d for d_d in datasets_data if d_d.id in data.dataset_ids] + data.datasets = [d_d.dataset for d_d in data.datasets_data] - data.meta_features = self.extract_train_meta_features(data.datasets_data) self.fit_datasets_similarity_assessor(data.meta_features, data.dataset_ids) data.best_models = self.load_models(data.datasets, histories, params.n_best_dataset_models_to_memorize, diff --git a/gamlet/components/meta_features_extractors/pymfe_extractor.py b/gamlet/components/meta_features_extractors/pymfe_extractor.py index 91702523..4a02f29b 100644 --- a/gamlet/components/meta_features_extractors/pymfe_extractor.py +++ b/gamlet/components/meta_features_extractors/pymfe_extractor.py @@ -5,6 +5,7 @@ from functools import partial from typing import Any, Dict, Optional, Sequence, Tuple, Union +import numpy as np import pandas as pd from pymfe.mfe import MFE from tqdm import tqdm @@ -31,8 +32,11 @@ def extract(self, data_sequence: Sequence[Union[DatasetBase, TabularData]], for i, dataset_data in enumerate(tqdm(data_sequence, desc='Extracting meta features of the datasets')): if 
isinstance(dataset_data, DatasetBase): dataset_data = dataset_data.get_data() - meta_features = self._extract_single(dataset_data, fill_input_nans, fit_kwargs, extract_kwargs) - accumulated_meta_features.append(meta_features) + try: + meta_features = self._extract_single(dataset_data, fill_input_nans, fit_kwargs, extract_kwargs) + accumulated_meta_features.append(meta_features) + except Exception: + logger.exception(f'Dataset {dataset_data.dataset}: error while extracting meta-features.') output = DatasetMetaFeatures(pd.concat(accumulated_meta_features), is_summarized=self.summarize_features, features=self.features) diff --git a/gamlet/data_preparation/datasets_train_test_split.py b/gamlet/data_preparation/datasets_train_test_split.py index 75e97e19..ebd26abf 100644 --- a/gamlet/data_preparation/datasets_train_test_split.py +++ b/gamlet/data_preparation/datasets_train_test_split.py @@ -29,24 +29,23 @@ def openml_datasets_train_test_split(dataset_ids: List[OpenMLDatasetIDType], tes single_value_categories = cat_counts[cat_counts == 1].index idx = df_split_categories[df_split_categories['category'].isin(single_value_categories)].index df_split_categories.loc[idx, 'category'] = 'single_value' - df_datasets_to_split = df_split_categories[df_split_categories['category'] != 'single_value'] - df_test_only_datasets = df_split_categories[df_split_categories['category'] == 'single_value'] - if not df_datasets_to_split.empty: - df_train_datasets, df_test_datasets = train_test_split( - df_datasets_to_split, - test_size=test_size, - shuffle=True, - stratify=df_datasets_to_split['category'], - random_state=seed - ) - df_test_datasets = pd.concat([df_test_datasets, df_test_only_datasets]) + single_value_datasets = df_split_categories[df_split_categories['category'] == 'single_value'] + if len(single_value_datasets) >= 1: + df_datasets_to_split = df_split_categories + additional_datasets = pd.DataFrame([]) else: - df_train_datasets, df_test_datasets = train_test_split( - df_split_categories, - test_size=test_size, - shuffle=True, - random_state=seed - ) + df_datasets_to_split = df_split_categories[df_split_categories['category'] != 'single_value'] + additional_datasets = single_value_datasets + + df_train_datasets, df_test_datasets = train_test_split( + df_datasets_to_split, + test_size=test_size, + shuffle=True, + stratify=df_datasets_to_split['category'], + random_state=seed + ) + df_train_datasets = pd.concat([df_train_datasets, additional_datasets]) + df_train_datasets['is_train'] = 1 df_test_datasets['is_train'] = 0 df_split_datasets = pd.concat([df_train_datasets, df_test_datasets]).join(