From 6789d088632a666c5702c1113d7d9af2ee779f84 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Mon, 8 Jan 2024 22:41:00 -0800 Subject: [PATCH 1/8] Update parameter_sweep.py --- backend/app/internal/parameter_sweep.py | 74 ++++++++++++++++++------- 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index 2204eb71..5ef670b7 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -3,13 +3,15 @@ import numpy as np from watertap.tools.parameter_sweep import LinearSample, ParameterSweep, parameter_sweep import watertap.examples.flowsheets.case_studies.wastewater_resource_recovery.amo_1575_magprex.magprex as magprex +from watertap.tools.parameter_sweep import ParameterSweepReader + from importlib import import_module import idaes.logger as idaeslog _log = idaeslog.getLogger(__name__) -def set_up_sensitivity(m, solve, output_params): +def set_up_sensitivity(solve, output_params): outputs = {} # optimize_kwargs = {"fail_flag": False} optimize_kwargs = {} @@ -19,14 +21,21 @@ def set_up_sensitivity(m, solve, output_params): # we should have the user provide outputs as a parameter i = 0 for each in output_params: - outputs[f'{i}: {each["name"]}'] = each["param"] + outputs[each["name"]] = each["param"] i += 1 return outputs, optimize_kwargs, opt_function +def build_outputs(model, output_keys): + outputs = {} + for key, pyo_object in output_keys.items(): + outputs[key] = model.find_component(pyo_object) + return outputs + + def run_analysis( - m, + ui_config, flowsheet, parameters, output_params, @@ -34,23 +43,39 @@ def run_analysis( interpolate_nan_outputs=True, custom_do_param_sweep_kwargs=None, ): - flowsheet = import_module(flowsheet) + flowsheet_ui = import_module(flowsheet) + flowsheet = import_module(flowsheet.replace("_ui", "")) + # try: + # solve_function = flowsheet_ui.solve_flowsheet + # except: try: - solve_function = flowsheet.solve - except: solve_function = flowsheet.optimize + except: + solve_function = flowsheet.solve + outputs, optimize_kwargs, opt_function = set_up_sensitivity( - m, solve_function, output_params + solve_function, output_params ) + try: + build_function = flowsheet_ui.build_flowsheet + build_kwargs = {"build_options": ui_config.fs_exp.build_options} + except: + build_function = flowsheet.build + build_kwargs = {} sweep_params = {} # sensitivity analysis i = 0 for each in parameters: - sweep_params[f'{i}: {each["name"]}'] = LinearSample( - each["param"], each["lb"], each["ub"], int(each["num_samples"]) - ) + sweep_params[each["param"]] = { + "type": "LinearSample", + "param": each["param"], + "lower_limit": each["lb"], + "upper_limit": each["ub"], + "num_samples": int(each["num_samples"]), + } i += 1 + print(sweep_params) # Check if user provided custom kwargs, if not don't use cutm swep param # else check if user provided custom sweep function, if not use watertap default (will be merged) if custom_do_param_sweep_kwargs is None: @@ -64,17 +89,22 @@ def run_analysis( ps = ParameterSweep( csv_results_file_name=results_path, - optimize_function=opt_function, - optimize_kwargs=optimize_kwargs, + optimize_function=solve_function, + # optimize_kwargs=optimize_kwargs, interpolate_nan_outputs=False, custom_do_param_sweep=custom_do_param_sweep, custom_do_param_sweep_kwargs=custom_do_param_sweep_kwargs, reinitialize_before_sweep=False, + parallel_back_end="MultiProcessing", + number_of_subprocesses=18, ) global_results = ps.parameter_sweep( - m, - sweep_params, - outputs, + build_model=build_function, + build_sweep_params=ParameterSweepReader()._dict_to_params, + build_sweep_params_kwargs={"input_dict": sweep_params}, + build_model_kwargs=build_kwargs, + build_outputs=build_outputs, + build_outputs_kwargs={"output_keys": outputs}, ) return global_results @@ -110,7 +140,7 @@ def run_parameter_sweep(flowsheet, info): "num_samples": flowsheet.fs_exp.model_objects[ key ].num_samples, - "param": flowsheet.fs_exp.model_objects[key].obj, + "param": flowsheet.fs_exp.model_objects[key].obj.name, } ) except: @@ -120,7 +150,7 @@ def run_parameter_sweep(flowsheet, info): "lb": flowsheet.fs_exp.model_objects[key].obj.lb, "ub": flowsheet.fs_exp.model_objects[key].obj.ub, "num_samples": "5", - "param": flowsheet.fs_exp.model_objects[key].obj, + "param": flowsheet.fs_exp.model_objects[key].obj.name, } ) # HTTPException(500, detail=f"Sweep failed: {parameters}") @@ -146,7 +176,7 @@ def run_parameter_sweep(flowsheet, info): output_params.append( { "name": flowsheet.fs_exp.model_objects[key].name, - "param": flowsheet.fs_exp.model_objects[key].obj, + "param": flowsheet.fs_exp.model_objects[key].obj.name, } ) keys.append(key) @@ -154,14 +184,16 @@ def run_parameter_sweep(flowsheet, info): Path.home() / ".watertap" / "sweep_outputs" / f"{info.name}_sweep.csv" ) results = run_analysis( - m=flowsheet.fs_exp.m, - # flowsheet=info.module[0:-3], # replace _ui instead? - flowsheet=info.module.replace("_ui", ""), + ui_config=flowsheet, + flowsheet=info.module, + # # flowsheet=info.module[0:-3], # replace _ui instead? + # flowsheet=info.module.replace("_ui", ""), parameters=parameters, output_params=output_params, results_path=output_path, # custom_do_param_sweep_kwargs=flowsheet.custom_do_param_sweep_kwargs, ) + print(results) except Exception as err: _log.error(f"err: {err}") raise HTTPException(500, detail=f"Sweep failed: {err}") From fc64f6f0c0d021e65cbff58e9a11b5699600d1da Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Sun, 14 Jan 2024 18:02:24 -0800 Subject: [PATCH 2/8] update ps interpgae --- backend/app/error.inp | 67 ++++++++++++++ backend/app/internal/parameter_sweep.py | 112 ++++++++++++++++++------ 2 files changed, 154 insertions(+), 25 deletions(-) create mode 100644 backend/app/error.inp diff --git a/backend/app/error.inp b/backend/app/error.inp new file mode 100644 index 00000000..198ed10f --- /dev/null +++ b/backend/app/error.inp @@ -0,0 +1,67 @@ +SOLUTION_RAW 1 + -temp 25.000000005076 + -pressure 94.065734782909 + -potential 0 + -total_h 1118.7845278908 + -total_o 566.77388207882 + -cb 2.9126215328468e-12 + -density 1.0276874827527 + -totals + Ba 2.0650117217683e-05 + C(4) 0.43981793263844 + Ca 0.0028886459979373 + Cl 2.1531752986115 + K 0.065280473324413 + Mg 0.43952104225307 + Na 2.6857345018792 + S(6) 0.53907297667772 + Si 2.065882020971 + Sr 0.011649119492816 + -pH 7.254354578426 + -pe 4 + -mu 0.46480683956184 + -ah2o 0.98641400341 + -mass_water 10 + -soln_vol 10.148180961603 + -total_alkalinity 0.42785263895589 + -activities + Ba -6.4750176244296 + C(4) -4.6208191402998 + Ca -4.1837746912445 + Cl -0.85029972227016 + E -4 + K -2.3585340074292 + Mg -1.9883833455922 + Na -0.71647716549494 + S(6) -2.1821343740859 + Si -0.66886028535939 + Sr -3.6725311479511 + -gammas + Ba+2 -0.78994014564475 + CO2 0.02956768971895 + CO3-2 -0.90162409095947 + Ca+2 -0.64446901378706 + Cl- -0.18337911118352 + H+ -0.15266362438484 + H2SiO4-2 -1.0491909982768 + H3SiO4- -0.27288528176593 + H4SiO4 0.018486455549226 + HCO3- -0.19083217413178 + HSO4- -0.17745896547341 + K+ -0.17331730209409 + Mg+2 -0.62932572696671 + MgOH+ -0.10976775155995 + Na+ -0.14554024379494 + OH- -0.23619234093398 + SO4-2 -0.91378154658329 + Sr+2 -0.73882424808883 +REACTION_RAW 1 + -reactant_list + HCl 19271.598908994 + -steps + 1 + -count_steps 0 + -equal_increments 0 + -units Mol + # REACTION workspace variables # + -element_list diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index 5ef670b7..848f3f31 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -42,12 +42,11 @@ def run_analysis( results_path="output.csv", interpolate_nan_outputs=True, custom_do_param_sweep_kwargs=None, + fixed_parameters=None, ): flowsheet_ui = import_module(flowsheet) flowsheet = import_module(flowsheet.replace("_ui", "")) - # try: - # solve_function = flowsheet_ui.solve_flowsheet - # except: + try: solve_function = flowsheet.optimize except: @@ -75,7 +74,45 @@ def run_analysis( "num_samples": int(each["num_samples"]), } i += 1 - print(sweep_params) + # generating paramters that are fixed- length 1! + for f_param in fixed_parameters: + if f_param["fixed"]: + sweep_params[f_param["param"] + "_fixed_state"] = { + "type": "LinearSample", + "param": f_param["param"], + "lower_limit": f_param["value"], + "upper_limit": f_param["value"], + "num_samples": 1, + } + else: + sweep_params[f_param["param"] + "_fixed_state"] = { + "type": "LinearSample", + "param": f_param["param"], + "lower_limit": 0, + "upper_limit": 0, + "num_samples": 1, + "set_mode": "set_fixed_state", + "default_value": f_param["value"], + } + if f_param["lb"] is not None: + sweep_params[f_param["param"] + "_lb"] = { + "type": "LinearSample", + "param": f_param["param"], + "lower_limit": f_param["lb"], + "upper_limit": f_param["lb"], + "num_samples": 1, + "set_mode": "set_lb", + } + if f_param["ub"] is not None: + sweep_params[f_param["param"] + "_ub"] = { + "type": "LinearSample", + "param": f_param["param"], + "lower_limit": f_param["ub"], + "upper_limit": f_param["ub"], + "num_samples": 1, + "set_mode": "set_ub", + } + print("sweep_params", sweep_params) # Check if user provided custom kwargs, if not don't use cutm swep param # else check if user provided custom sweep function, if not use watertap default (will be merged) if custom_do_param_sweep_kwargs is None: @@ -86,7 +123,7 @@ def run_analysis( ) if custom_do_param_sweep is None: custom_do_param_sweep_kwargs = None - + print(fixed_parameters) ps = ParameterSweep( csv_results_file_name=results_path, optimize_function=solve_function, @@ -96,7 +133,7 @@ def run_analysis( custom_do_param_sweep_kwargs=custom_do_param_sweep_kwargs, reinitialize_before_sweep=False, parallel_back_end="MultiProcessing", - number_of_subprocesses=18, + number_of_subprocesses=10, ) global_results = ps.parameter_sweep( build_model=build_function, @@ -114,7 +151,13 @@ def run_parameter_sweep(flowsheet, info): try: _log.info("trying to sweep") parameters = [] + # for keeping track of user defined paramters in flowsheet that are changed + fixed_parameters = [] + fixed_parameters_keys = [] output_params = [] + # for keeping track of key to export, will be a list in same order a keys, and conversion factors + # but defines sweep_params and outputs. + export_keys = [] keys = [] conversion_factors = [] results_table = {"headers": []} @@ -134,7 +177,7 @@ def run_parameter_sweep(flowsheet, info): try: parameters.append( { - "name": flowsheet.fs_exp.model_objects[key].name, + "name": flowsheet.fs_exp.model_objects[key].obj.name, "lb": flowsheet.fs_exp.model_objects[key].obj.lb, "ub": flowsheet.fs_exp.model_objects[key].obj.ub, "num_samples": flowsheet.fs_exp.model_objects[ @@ -146,17 +189,33 @@ def run_parameter_sweep(flowsheet, info): except: parameters.append( { - "name": flowsheet.fs_exp.model_objects[key].name, + "name": flowsheet.fs_exp.model_objects[key].obj.name, "lb": flowsheet.fs_exp.model_objects[key].obj.lb, "ub": flowsheet.fs_exp.model_objects[key].obj.ub, "num_samples": "5", "param": flowsheet.fs_exp.model_objects[key].obj.name, } ) + export_keys.append( + ["sweep_params", flowsheet.fs_exp.model_objects[key].obj.name] + ) # HTTPException(500, detail=f"Sweep failed: {parameters}") flowsheet.fs_exp.model_objects[key].obj.fix() conversion_factors.append(conversion_factor) keys.append(key) + elif flowsheet.fs_exp.model_objects[key].is_input: + fixed_parameters.append( + { + "name": flowsheet.fs_exp.model_objects[key].obj.name, + "value": flowsheet.fs_exp.model_objects[key].obj.value, + "num_samples": "1", + "param": flowsheet.fs_exp.model_objects[key].obj.name, + "fixed": flowsheet.fs_exp.model_objects[key].fixed, + "lb": flowsheet.fs_exp.model_objects[key].obj.lb, + "ub": flowsheet.fs_exp.model_objects[key].obj.ub, + } + ) + fixed_parameters_keys.append(flowsheet.fs_exp.model_objects[key].name) for key in flowsheet.fs_exp.model_objects: if ( flowsheet.fs_exp.model_objects[key].is_output @@ -175,15 +234,18 @@ def run_parameter_sweep(flowsheet, info): conversion_factors.append(conversion_factor) output_params.append( { - "name": flowsheet.fs_exp.model_objects[key].name, + "name": flowsheet.fs_exp.model_objects[key].obj.name, "param": flowsheet.fs_exp.model_objects[key].obj.name, } ) + export_keys.append( + ["outputs", flowsheet.fs_exp.model_objects[key].obj.name] + ) keys.append(key) output_path = ( Path.home() / ".watertap" / "sweep_outputs" / f"{info.name}_sweep.csv" ) - results = run_analysis( + results_arr, output_dict = run_analysis( ui_config=flowsheet, flowsheet=info.module, # # flowsheet=info.module[0:-3], # replace _ui instead? @@ -191,29 +253,29 @@ def run_parameter_sweep(flowsheet, info): parameters=parameters, output_params=output_params, results_path=output_path, + fixed_parameters=fixed_parameters, # custom_do_param_sweep_kwargs=flowsheet.custom_do_param_sweep_kwargs, ) - print(results) + # print(results) except Exception as err: _log.error(f"err: {err}") raise HTTPException(500, detail=f"Sweep failed: {err}") - results_table["values"] = results[0].tolist() - for value in results_table["values"]: - for i in range(len(value)): - if np.isnan(value[i]): - value[i] = None - # error_params = "" - # for j in range(len(parameters)): - # error_param = parameters[j]["name"] - # error_value = value[j] - # error_params += f'{error_param}: {error_value}, ' - # error_params = error_params[:-2] - # _log.error(f'Sweep produced invalid results: {error_params}') - # raise HTTPException(500, detail=f"Sweep produced invalid results for input parameters: {error_params}") + num_samples = output_dict[export_keys[0][0]][export_keys[0][1]]["value"].size + result_arr = [] + for ns in range(num_samples): + result_arr.append([]) + for i, (result_type, key) in enumerate(export_keys): + value = output_dict[export_keys[i][0]][export_keys[i][1]]["value"][ns] + + if np.isnan(value) or output_dict["solve_successful"][ns] == False: + value = None else: conversion_factor = conversion_factors[i] - value[i] = value[i] * conversion_factor + value = value * conversion_factor + result_arr[-1].append(value) + results_table["values"] = result_arr results_table["keys"] = keys results_table["num_parameters"] = len(parameters) results_table["num_outputs"] = len(output_params) + print(results_table) return results_table From c679c28312eaa6871b63c8a4319546c8ef674457 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Sun, 14 Jan 2024 21:09:22 -0800 Subject: [PATCH 3/8] Delete backend/app/error.inp --- backend/app/error.inp | 67 ------------------------------------------- 1 file changed, 67 deletions(-) delete mode 100644 backend/app/error.inp diff --git a/backend/app/error.inp b/backend/app/error.inp deleted file mode 100644 index 198ed10f..00000000 --- a/backend/app/error.inp +++ /dev/null @@ -1,67 +0,0 @@ -SOLUTION_RAW 1 - -temp 25.000000005076 - -pressure 94.065734782909 - -potential 0 - -total_h 1118.7845278908 - -total_o 566.77388207882 - -cb 2.9126215328468e-12 - -density 1.0276874827527 - -totals - Ba 2.0650117217683e-05 - C(4) 0.43981793263844 - Ca 0.0028886459979373 - Cl 2.1531752986115 - K 0.065280473324413 - Mg 0.43952104225307 - Na 2.6857345018792 - S(6) 0.53907297667772 - Si 2.065882020971 - Sr 0.011649119492816 - -pH 7.254354578426 - -pe 4 - -mu 0.46480683956184 - -ah2o 0.98641400341 - -mass_water 10 - -soln_vol 10.148180961603 - -total_alkalinity 0.42785263895589 - -activities - Ba -6.4750176244296 - C(4) -4.6208191402998 - Ca -4.1837746912445 - Cl -0.85029972227016 - E -4 - K -2.3585340074292 - Mg -1.9883833455922 - Na -0.71647716549494 - S(6) -2.1821343740859 - Si -0.66886028535939 - Sr -3.6725311479511 - -gammas - Ba+2 -0.78994014564475 - CO2 0.02956768971895 - CO3-2 -0.90162409095947 - Ca+2 -0.64446901378706 - Cl- -0.18337911118352 - H+ -0.15266362438484 - H2SiO4-2 -1.0491909982768 - H3SiO4- -0.27288528176593 - H4SiO4 0.018486455549226 - HCO3- -0.19083217413178 - HSO4- -0.17745896547341 - K+ -0.17331730209409 - Mg+2 -0.62932572696671 - MgOH+ -0.10976775155995 - Na+ -0.14554024379494 - OH- -0.23619234093398 - SO4-2 -0.91378154658329 - Sr+2 -0.73882424808883 -REACTION_RAW 1 - -reactant_list - HCl 19271.598908994 - -steps - 1 - -count_steps 0 - -equal_increments 0 - -units Mol - # REACTION workspace variables # - -element_list From 770f1888e466089496186d1f71f360f908994adf Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Fri, 27 Sep 2024 13:39:44 -0700 Subject: [PATCH 4/8] fixes to latest version --- backend/app/internal/parameter_sweep.py | 257 +++++++++++------------- 1 file changed, 119 insertions(+), 138 deletions(-) diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index 26710604..f228cfc5 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -1,11 +1,13 @@ from pathlib import Path from fastapi import HTTPException import numpy as np -from parameter_sweep import LinearSample, ParameterSweep, parameter_sweep +from parameter_sweep import ( + ParameterSweep, + ParameterSweepReader, +) from importlib import import_module import idaes.logger as idaeslog from pyomo.environ import ( - ConcreteModel, value, Var, units as pyunits, @@ -62,13 +64,15 @@ def run_analysis( solve_function = flowsheet.optimize except: solve_function = flowsheet.solve - - m = flowsheet.fs_exp.m - solve_function = flowsheet.get_action("solve") + # TODO: this does not return an actual solve function from the flowsheet... + # solve_function_action= ui_config.get_action("solve") + # print(solve_function_action) # This does not return a solve, but a add_action object... not sure why outputs, optimize_kwargs, opt_function = set_up_sensitivity( solve_function, output_params ) - + number_of_subprocess = ui_config.get_number_of_subprocesses() + print(number_of_subprocess) + assert False try: build_function = flowsheet_ui.build_flowsheet build_kwargs = {"build_options": ui_config.fs_exp.build_options} @@ -76,7 +80,6 @@ def run_analysis( build_function = flowsheet.build build_kwargs = {} sweep_params = {} - # sensitivity analysis i = 0 for each in parameters: sweep_params[each["param"]] = { @@ -91,41 +94,33 @@ def run_analysis( for f_param in fixed_parameters: if f_param["fixed"]: sweep_params[f_param["param"] + "_fixed_state"] = { - "type": "LinearSample", + "type": "PredeterminedFixedSample", "param": f_param["param"], - "lower_limit": f_param["value"], - "upper_limit": f_param["value"], - "num_samples": 1, + "array": np.array([f_param["value"]]), } else: sweep_params[f_param["param"] + "_fixed_state"] = { "type": "LinearSample", + "type": "PredeterminedFixedSample", "param": f_param["param"], - "lower_limit": 0, - "upper_limit": 0, - "num_samples": 1, + "array": np.array([0]), "set_mode": "set_fixed_state", "default_value": f_param["value"], } if f_param["lb"] is not None: sweep_params[f_param["param"] + "_lb"] = { - "type": "LinearSample", + "type": "PredeterminedFixedSample", "param": f_param["param"], - "lower_limit": f_param["lb"], - "upper_limit": f_param["lb"], - "num_samples": 1, + "array": np.array([f_param["lb"]]), "set_mode": "set_lb", } if f_param["ub"] is not None: sweep_params[f_param["param"] + "_ub"] = { - "type": "LinearSample", + "type": "PredeterminedFixedSample", "param": f_param["param"], - "lower_limit": f_param["ub"], - "upper_limit": f_param["ub"], - "num_samples": 1, + "array": np.array([f_param["ub"]]), "set_mode": "set_ub", } - print("sweep_params", sweep_params) # Check if user provided custom kwargs, if not don't use cutm swep param # else check if user provided custom sweep function, if not use watertap default (will be merged) if custom_do_param_sweep_kwargs is None: @@ -136,17 +131,15 @@ def run_analysis( ) if custom_do_param_sweep is None: custom_do_param_sweep_kwargs = None - print(fixed_parameters) ps = ParameterSweep( csv_results_file_name=results_path, optimize_function=solve_function, - # optimize_kwargs=optimize_kwargs, interpolate_nan_outputs=False, custom_do_param_sweep=custom_do_param_sweep, custom_do_param_sweep_kwargs=custom_do_param_sweep_kwargs, reinitialize_before_sweep=False, parallel_back_end="MultiProcessing", - number_of_subprocesses=10, + number_of_subprocesses=2, ) global_results = ps.parameter_sweep( build_model=build_function, @@ -161,133 +154,121 @@ def run_analysis( def run_parameter_sweep(flowsheet, info): - try: - _log.info("trying to sweep") - parameters = [] - # for keeping track of user defined paramters in flowsheet that are changed - fixed_parameters = [] - fixed_parameters_keys = [] - output_params = [] - # for keeping track of key to export, will be a list in same order a keys, and conversion factors - # but defines sweep_params and outputs. - export_keys = [] - keys = [] - conversion_factors = [] - results_table = {"headers": []} - for key in flowsheet.fs_exp.exports: - if flowsheet.fs_exp.exports[key].is_sweep: - if ( - flowsheet.fs_exp.exports[key].lb is not None - and flowsheet.fs_exp.exports[key].ub is not None - ): - results_table["headers"].append(flowsheet.fs_exp.exports[key].name) - conversion_factor = get_conversion_unit(flowsheet, key) - try: - parameters.append( - { - "name": flowsheet.fs_exp.exports[key].name, - "lb": flowsheet.fs_exp.exports[key].obj.lb, - "ub": flowsheet.fs_exp.exports[key].obj.ub, - "num_samples": flowsheet.fs_exp.exports[ - key - ].num_samples, - "param": flowsheet.fs_exp.exports[key].obj, - } - ) - except: - parameters.append( - { - "name": flowsheet.fs_exp.exports[key].name, - "lb": flowsheet.fs_exp.exports[key].obj.lb, - "ub": flowsheet.fs_exp.exports[key].obj.ub, - "num_samples": "5", - "param": flowsheet.fs_exp.exports[key].obj, - } - ) - export_keys.append( - ["sweep_params", flowsheet.fs_exp.model_objects[key].obj.name] - ) - # HTTPException(500, detail=f"Sweep failed: {parameters}") - flowsheet.fs_exp.exports[key].obj.fix() - conversion_factors.append(conversion_factor) - keys.append(key) - elif flowsheet.fs_exp.model_objects[key].is_input: - fixed_parameters.append( - { - "name": flowsheet.fs_exp.model_objects[key].obj.name, - "value": flowsheet.fs_exp.model_objects[key].obj.value, - "num_samples": "1", - "param": flowsheet.fs_exp.model_objects[key].obj.name, - "fixed": flowsheet.fs_exp.model_objects[key].fixed, - "lb": flowsheet.fs_exp.model_objects[key].obj.lb, - "ub": flowsheet.fs_exp.model_objects[key].obj.ub, - } - ) - fixed_parameters_keys.append(flowsheet.fs_exp.model_objects[key].name) - for key in flowsheet.fs_exp.exports: + # try: + _log.info("trying to sweep") + parameters = [] + # for keeping track of user defined paramters in flowsheet that are changed + fixed_parameters = [] + fixed_parameters_keys = [] + output_params = [] + # for keeping track of key to export, will be a list in same order a keys, and conversion factors + # but defines sweep_params and outputs. + export_keys = [] + keys = [] + conversion_factors = {} + results_table = {"headers": []} + for key in flowsheet.fs_exp.exports: + if flowsheet.fs_exp.exports[key].is_sweep: if ( - flowsheet.fs_exp.exports[key].is_output - or ( - not flowsheet.fs_exp.exports[key].is_output - and flowsheet.fs_exp.exports[key].is_input - and not flowsheet.fs_exp.exports[key].fixed - ) - # and not flowsheet.fs_exp.exports[key].is_input + flowsheet.fs_exp.exports[key].lb is not None + and flowsheet.fs_exp.exports[key].ub is not None ): results_table["headers"].append(flowsheet.fs_exp.exports[key].name) - + conversion_factor = get_conversion_unit(flowsheet, key) try: - conversion_factor = get_conversion_unit(flowsheet, key) - except Exception as e: - conversion_factor = 1 - conversion_factors.append(conversion_factor) - output_params.append( - { - "name": flowsheet.fs_exp.exports[key].name, - "param": flowsheet.fs_exp.exports[key].obj, - } - ) + parameters.append( + { + "name": flowsheet.fs_exp.exports[key].name, + "lb": flowsheet.fs_exp.exports[key].obj.lb, + "ub": flowsheet.fs_exp.exports[key].obj.ub, + "num_samples": flowsheet.fs_exp.exports[key].num_samples, + "param": flowsheet.fs_exp.exports[key].obj.name, + } + ) + except: + parameters.append( + { + "name": flowsheet.fs_exp.exports[key].name, + "lb": flowsheet.fs_exp.exports[key].obj.lb, + "ub": flowsheet.fs_exp.exports[key].obj.ub, + "num_samples": "5", + "param": flowsheet.fs_exp.exports[key].obj.name, + } + ) export_keys.append( - ["outputs", flowsheet.fs_exp.model_objects[key].obj.name] + ["sweep_params", flowsheet.fs_exp.exports[key].obj.name] ) + flowsheet.fs_exp.exports[key].obj.fix() + conversion_factors[key] = conversion_factor keys.append(key) - output_path = Path.home() / ".nawi" / "sweep_outputs" / f"{info.name}_sweep.csv" - results_arr, output_dict = run_analysis( - ui_config=flowsheet, - flowsheet=info.module, - # # flowsheet=info.module[0:-3], # replace _ui instead? - # flowsheet=info.module.replace("_ui", ""), - parameters=parameters, - output_params=output_params, - results_path=output_path, - fixed_parameters=fixed_parameters, - # custom_do_param_sweep_kwargs=flowsheet.custom_do_param_sweep_kwargs, - ) - # print(results) - except Exception as err: - _log.error(f"err: {err}") - raise HTTPException(500, detail=f"Sweep failed: {err}") + elif flowsheet.fs_exp.exports[key].is_input: + fixed_parameters.append( + { + "name": flowsheet.fs_exp.exports[key].obj.name, + "value": flowsheet.fs_exp.exports[key].obj.value, + "num_samples": "1", + "param": flowsheet.fs_exp.exports[key].obj.name, + "fixed": flowsheet.fs_exp.exports[key].fixed, + "lb": flowsheet.fs_exp.exports[key].obj.lb, + "ub": flowsheet.fs_exp.exports[key].obj.ub, + } + ) + fixed_parameters_keys.append(flowsheet.fs_exp.exports[key].name) + for key in flowsheet.fs_exp.exports: + if ( + flowsheet.fs_exp.exports[key].is_output + or ( + not flowsheet.fs_exp.exports[key].is_output + and flowsheet.fs_exp.exports[key].is_input + and not flowsheet.fs_exp.exports[key].fixed + ) + # and not flowsheet.fs_exp.exports[key].is_input + ): + results_table["headers"].append(flowsheet.fs_exp.exports[key].name) + + try: + conversion_factor = get_conversion_unit(flowsheet, key) + except Exception as e: + conversion_factor = 1 + conversion_factors[key] = conversion_factor + # we will use obj names through out + output_params.append( + { + "name": flowsheet.fs_exp.exports[key].obj.name, + "param": flowsheet.fs_exp.exports[key].obj.name, + } + ) + export_keys.append(["outputs", flowsheet.fs_exp.exports[key].obj.name]) + keys.append(key) + output_path = Path.home() / ".nawi" / "sweep_outputs" / f"{info.name}_sweep.csv" + results_arr, output_dict = run_analysis( + ui_config=flowsheet, + flowsheet=info.module, + parameters=parameters, + output_params=output_params, + results_path=output_path, + fixed_parameters=fixed_parameters, + ) + # except Exception as err: + # _log.error(f"err: {err}") + # raise HTTPException(500, detail=f"Sweep failed: {err}") + # we can't rely on result_arr order, as it will be async and out of order, + # so instead here, we will rebuild expected order from out output_dict num_samples = output_dict[export_keys[0][0]][export_keys[0][1]]["value"].size result_arr = [] - # for ns in range(num_samples): - # result_arr.append([]) - # for i, (result_type, key) in enumerate(export_keys): - # value = output_dict[export_keys[i][0]][export_keys[i][1]]["value"][ns] + for ns in range(num_samples): + result_arr.append([]) + for i, (result_type, key) in enumerate(export_keys): + value = output_dict[export_keys[i][0]][export_keys[i][1]]["value"][ns] - # if np.isnan(value) or output_dict["solve_successful"][ns] == False: - # value = None - results_table["values"] = results[0].tolist() - for value in results_table["values"]: - for i in range(len(value)): - if np.isnan(value[i]): - value[i] = None + if np.isnan(value) or output_dict["solve_successful"][ns] == False: + value = None else: - conversion_factor = conversion_factors[i] + conversion_factor = conversion_factors[key] value = value * conversion_factor result_arr[-1].append(value) results_table["values"] = result_arr results_table["keys"] = keys results_table["num_parameters"] = len(parameters) results_table["num_outputs"] = len(output_params) - print(results_table) return results_table From d4e1d7b55bd4a708fb87e92f47547cd7c7d56107 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Fri, 27 Sep 2024 14:27:27 -0700 Subject: [PATCH 5/8] removed false interapt --- backend/app/internal/parameter_sweep.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index f228cfc5..4dfc2a15 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -70,9 +70,9 @@ def run_analysis( outputs, optimize_kwargs, opt_function = set_up_sensitivity( solve_function, output_params ) - number_of_subprocess = ui_config.get_number_of_subprocesses() - print(number_of_subprocess) - assert False + # number_of_subprocess = ui_config.get_number_of_subprocesses() + # print(number_of_subprocess) + # assert False try: build_function = flowsheet_ui.build_flowsheet build_kwargs = {"build_options": ui_config.fs_exp.build_options} From adedf7e80292931311906c8c79571f1b07501177 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Fri, 27 Sep 2024 15:49:48 -0700 Subject: [PATCH 6/8] Update FlowsheetTesting.cy.js --- electron/ui/cypress/e2e/FlowsheetTesting.cy.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/electron/ui/cypress/e2e/FlowsheetTesting.cy.js b/electron/ui/cypress/e2e/FlowsheetTesting.cy.js index d26df8b9..80d86203 100644 --- a/electron/ui/cypress/e2e/FlowsheetTesting.cy.js +++ b/electron/ui/cypress/e2e/FlowsheetTesting.cy.js @@ -164,7 +164,7 @@ describe('WaterTAP UI Testing', () => { // run sweep cy.solve_flowsheet() - cy.wait(5000) + cy.wait(10000) cy.screenshot('ran parameter sweep '+flowsheet.name) // verify that sweep was successful From f0e9c5558d60f1ede1c1588a7e3f0c7ff12ae6b9 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:25:44 -0700 Subject: [PATCH 7/8] fixed multi proessing --- backend/app/internal/flowsheet_manager.py | 41 +++++++++++-------- backend/app/internal/parameter_sweep.py | 11 ++--- backend/app/routers/flowsheets.py | 50 +++++++++++++++-------- 3 files changed, 65 insertions(+), 37 deletions(-) diff --git a/backend/app/internal/flowsheet_manager.py b/backend/app/internal/flowsheet_manager.py index 4646bd71..dafed7d8 100644 --- a/backend/app/internal/flowsheet_manager.py +++ b/backend/app/internal/flowsheet_manager.py @@ -7,9 +7,11 @@ if sys.version_info < (3, 10): from importlib_resources import files + importlib_old = True else: from importlib.resources import files + importlib_old = False from importlib import metadata from pathlib import Path @@ -17,6 +19,7 @@ from types import ModuleType from typing import Optional, Dict, List, Union import app +from multiprocessing import cpu_count # third-party from fastapi import HTTPException @@ -78,7 +81,9 @@ def __init__(self, **kwargs): self.startup_time = time.time() # Add custom flowsheets path to the system path - self.custom_flowsheets_path = self.app_settings.data_basedir / "custom_flowsheets" + self.custom_flowsheets_path = ( + self.app_settings.data_basedir / "custom_flowsheets" + ) sys.path.append(str(self.custom_flowsheets_path)) for package in self.app_settings.packages: @@ -205,9 +210,7 @@ def get_diagram(self, id_: str) -> bytes: # _log.info(f"inside get diagram:: info is - {info}") if info.custom: # do this - data_path = ( - self.app_settings.custom_flowsheets_dir / f"{info.id_}.png" - ) + data_path = self.app_settings.custom_flowsheets_dir / f"{info.id_}.png" data = data_path.read_bytes() else: @@ -470,7 +473,7 @@ def add_custom_flowsheet(self, new_files, new_id): query = tinydb.Query() try: - + custom_flowsheets_dict = self._histdb.search( query.fragment({"custom_flowsheets_version": VERSION}) ) @@ -547,7 +550,7 @@ def remove_custom_flowsheet(self, id_): _log.info(f"unable to delete {id_} from flowsheets list") self.add_custom_flowsheets() - + def remove_custom_flowsheet_files(self, flowsheet_files): # remove each file for flowsheet_file in flowsheet_files: @@ -578,10 +581,15 @@ def add_custom_flowsheets(self): def get_number_of_subprocesses(self): # _log.info(f'getting number of subprocesses') - maxNumberOfSubprocesses = 8 + maxNumberOfSubprocesses = ( + int(cpu_count()) - 1 + ) # this will get max number of processors on system reserve 1 for UI + query = tinydb.Query() - item = self._histdb.search(query.fragment({"version": VERSION, "name": "numberOfSubprocesses"})) - # _log.info(f'item is : {item}') + item = self._histdb.search( + query.fragment({"version": VERSION, "name": "numberOfSubprocesses"}) + ) + _log.info(f"item MS is : {item}") if len(item) == 0: # _log.info(f'setting number of subprocesses to be 1') currentNumberOfSubprocesses = 1 @@ -589,24 +597,25 @@ def get_number_of_subprocesses(self): { "version": VERSION, "name": "numberOfSubprocesses", - "value": currentNumberOfSubprocesses + "value": currentNumberOfSubprocesses, }, (query.version == VERSION and query.name == "numberOfSubprocesses"), ) else: currentNumberOfSubprocesses = item[0]["value"] # _log.info(f'number of subprocesses is: {currentNumberOfSubprocesses}') + # prevent user from overspecifying number of sub processes on accident and killing thier system + print("get_number_of_subprocesses", currentNumberOfSubprocesses) + if currentNumberOfSubprocesses > maxNumberOfSubprocesses: + currentNumberOfSubprocesses = maxNumberOfSubprocesses + print("get_number_of_subprocesses", currentNumberOfSubprocesses) return currentNumberOfSubprocesses, maxNumberOfSubprocesses - + def set_number_of_subprocesses(self, value): # _log.info(f'setting number of subprocesses to {value}') query = tinydb.Query() self._histdb.upsert( - { - "version": VERSION, - "name": "numberOfSubprocesses", - "value": value - }, + {"version": VERSION, "name": "numberOfSubprocesses", "value": value}, (query.version == VERSION and query.name == "numberOfSubprocesses"), ) return value diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index 4dfc2a15..edbda836 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -56,6 +56,7 @@ def run_analysis( interpolate_nan_outputs=True, custom_do_param_sweep_kwargs=None, fixed_parameters=None, + number_of_subprocess=1, ): flowsheet_ui = import_module(flowsheet) flowsheet = import_module(flowsheet.replace("_ui", "")) @@ -70,9 +71,7 @@ def run_analysis( outputs, optimize_kwargs, opt_function = set_up_sensitivity( solve_function, output_params ) - # number_of_subprocess = ui_config.get_number_of_subprocesses() - # print(number_of_subprocess) - # assert False + try: build_function = flowsheet_ui.build_flowsheet build_kwargs = {"build_options": ui_config.fs_exp.build_options} @@ -139,8 +138,9 @@ def run_analysis( custom_do_param_sweep_kwargs=custom_do_param_sweep_kwargs, reinitialize_before_sweep=False, parallel_back_end="MultiProcessing", - number_of_subprocesses=2, + number_of_subprocesses=number_of_subprocess, ) + print("number_of_subprocess ", number_of_subprocess) global_results = ps.parameter_sweep( build_model=build_function, build_sweep_params=ParameterSweepReader()._dict_to_params, @@ -153,7 +153,7 @@ def run_analysis( return global_results -def run_parameter_sweep(flowsheet, info): +def run_parameter_sweep(flowsheet, info, number_of_subprocess): # try: _log.info("trying to sweep") parameters = [] @@ -248,6 +248,7 @@ def run_parameter_sweep(flowsheet, info): output_params=output_params, results_path=output_path, fixed_parameters=fixed_parameters, + number_of_subprocess=number_of_subprocess, ) # except Exception as err: # _log.error(f"err: {err}") diff --git a/backend/app/routers/flowsheets.py b/backend/app/routers/flowsheets.py index bac9e043..dac7a0b1 100644 --- a/backend/app/routers/flowsheets.py +++ b/backend/app/routers/flowsheets.py @@ -125,14 +125,20 @@ async def solve(flowsheet_id: str, request: Request): input_data = await request.json() try: if _log.isEnabledFor(idaeslog.DEBUG): - _log.debug(f"Solve: Loading new data into flowsheet '{flowsheet_id}':\n" - f"{json.dumps(input_data, indent=2)}\n") + _log.debug( + f"Solve: Loading new data into flowsheet '{flowsheet_id}':\n" + f"{json.dumps(input_data, indent=2)}\n" + ) flowsheet.load(input_data) except FlowsheetInterface.MissingObjectError as err: - _log.error(f"Solve: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Solve: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) # XXX: return something about the error to caller except ValidationError as err: - _log.error(f"Solve: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Solve: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) raise HTTPException( 400, f"Cannot update flowsheet id='{flowsheet_id}' due to invalid data input", @@ -163,19 +169,25 @@ async def solve(flowsheet_id: str, request: Request): async def sweep(flowsheet_id: str, request: Request): flowsheet = flowsheet_manager.get_obj(flowsheet_id) info = flowsheet_manager.get_info(flowsheet_id) - + number_of_subprocess, _ = flowsheet_manager.get_number_of_subprocesses() # update input data before running a sweep input_data = await request.json() try: if _log.isEnabledFor(idaeslog.DEBUG): - _log.debug(f"Sweep: Loading new data into flowsheet '{flowsheet_id}':\n" - f"{json.dumps(input_data, indent=2)}\n") + _log.debug( + f"Sweep: Loading new data into flowsheet '{flowsheet_id}':\n" + f"{json.dumps(input_data, indent=2)}\n" + ) flowsheet.load(input_data) except FlowsheetInterface.MissingObjectError as err: - _log.error(f"Sweep: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Sweep: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) # XXX: return something about the error to caller except ValidationError as err: - _log.error(f"Sweep: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Sweep: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) raise HTTPException( 400, f"Cannot update flowsheet id='{flowsheet_id}' due to invalid data input", @@ -192,8 +204,7 @@ async def sweep(flowsheet_id: str, request: Request): _log.info("trying to sweep") with idaeslog.solver_log(_log, level=idaeslog.INFO) as slc: results_table = run_parameter_sweep( - flowsheet=flowsheet, - info=info, + flowsheet=flowsheet, info=info, number_of_subprocess=number_of_subprocess ) flowsheet.fs_exp.sweep_results = results_table # set last run in tiny db @@ -233,16 +244,22 @@ async def update(flowsheet_id: str, request: Request): input_data = await request.json() try: if _log.isEnabledFor(idaeslog.DEBUG): - _log.debug(f"Update: Loading to flowsheet '{flowsheet_id}':\n" - f"{json.dumps(input_data, indent=2)}\n") + _log.debug( + f"Update: Loading to flowsheet '{flowsheet_id}':\n" + f"{json.dumps(input_data, indent=2)}\n" + ) flowsheet.load(input_data) except FlowsheetInterface.MissingObjectError as err: # this is unlikely, the model would need to change while running # (but could happen since 'build' and 'solve' can do anything they want) - _log.error(f"Update: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Update: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) # XXX: return something about the error to caller except ValidationError as err: - _log.error(f"Update: Loading new data into flowsheet {flowsheet_id} failed: {err}") + _log.error( + f"Update: Loading new data into flowsheet {flowsheet_id} failed: {err}" + ) raise HTTPException( 400, f"Cannot update flowsheet id='{flowsheet_id}' due to invalid data input", @@ -300,7 +317,8 @@ async def upload_flowsheet(files: List[UploadFile]) -> str: if "_ui.py" in file.filename: new_id = file.filename.replace(".py", "") async with aiofiles.open( - f"{str(flowsheet_manager.app_settings.custom_flowsheets_dir)}/{file.filename}", "wb" + f"{str(flowsheet_manager.app_settings.custom_flowsheets_dir)}/{file.filename}", + "wb", ) as out_file: content = await file.read() # async read await out_file.write(content) From 781211bdcf7f653c5c174893e6955dc887517436 Mon Sep 17 00:00:00 2001 From: avdudchenko <33663878+avdudchenko@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:04:43 -0700 Subject: [PATCH 8/8] test fix --- backend/app/internal/flowsheet_manager.py | 3 +-- backend/app/internal/parameter_sweep.py | 8 ++++++-- electron/ui/cypress/support/commands.js | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/backend/app/internal/flowsheet_manager.py b/backend/app/internal/flowsheet_manager.py index dafed7d8..a02c1fe9 100644 --- a/backend/app/internal/flowsheet_manager.py +++ b/backend/app/internal/flowsheet_manager.py @@ -605,10 +605,9 @@ def get_number_of_subprocesses(self): currentNumberOfSubprocesses = item[0]["value"] # _log.info(f'number of subprocesses is: {currentNumberOfSubprocesses}') # prevent user from overspecifying number of sub processes on accident and killing thier system - print("get_number_of_subprocesses", currentNumberOfSubprocesses) + if currentNumberOfSubprocesses > maxNumberOfSubprocesses: currentNumberOfSubprocesses = maxNumberOfSubprocesses - print("get_number_of_subprocesses", currentNumberOfSubprocesses) return currentNumberOfSubprocesses, maxNumberOfSubprocesses def set_number_of_subprocesses(self, value): diff --git a/backend/app/internal/parameter_sweep.py b/backend/app/internal/parameter_sweep.py index edbda836..6137ab12 100644 --- a/backend/app/internal/parameter_sweep.py +++ b/backend/app/internal/parameter_sweep.py @@ -130,6 +130,11 @@ def run_analysis( ) if custom_do_param_sweep is None: custom_do_param_sweep_kwargs = None + + if number_of_subprocess == 1: + parallel_back_end = "Serial" + else: + parallel_back_end = "MultiProcessing" ps = ParameterSweep( csv_results_file_name=results_path, optimize_function=solve_function, @@ -137,10 +142,9 @@ def run_analysis( custom_do_param_sweep=custom_do_param_sweep, custom_do_param_sweep_kwargs=custom_do_param_sweep_kwargs, reinitialize_before_sweep=False, - parallel_back_end="MultiProcessing", + parallel_back_end=parallel_back_end, number_of_subprocesses=number_of_subprocess, ) - print("number_of_subprocess ", number_of_subprocess) global_results = ps.parameter_sweep( build_model=build_function, build_sweep_params=ParameterSweepReader()._dict_to_params, diff --git a/electron/ui/cypress/support/commands.js b/electron/ui/cypress/support/commands.js index 56b35bd6..4dfd6438 100644 --- a/electron/ui/cypress/support/commands.js +++ b/electron/ui/cypress/support/commands.js @@ -81,7 +81,7 @@ Cypress.Commands.add('solve_flowsheet', () => { url: 'http://localhost:8001/flowsheets/**', }).as('run'); cy.findAllByRole('button', {name: /run/i}).eq(0).click(); - cy.wait('@run', {timeout: 180000}); + cy.wait('@run', {timeout: 600000}); }) /**