Skip to content

Commit

Permalink
Add switch for toggling deepcopy off (cadCAD-org#316)
Browse files Browse the repository at this point in the history
* fix tests + rm simulations/ folder

* add types.py

* single run / multi mc is ok

* fix for single run / single param

* add support for single proc runs

* add switch for using deepcopy + fix bug on additional_objs

* bug fix

* bug fix

---------

Co-authored-by: Emanuel Lima <[email protected]>
  • Loading branch information
danlessa and emanuellima1 authored Dec 14, 2023
1 parent 1b3241e commit 0aa86be
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 29 deletions.
6 changes: 4 additions & 2 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from time import time
from typing import Callable, Dict, List, Any, Tuple
from typing import Callable, Dict, List, Any, Tuple, Union

from cadCAD.utils import flatten
from cadCAD.utils.execution import print_exec_info
Expand Down Expand Up @@ -39,6 +39,7 @@ def auto_mode_switcher(config_amt: int):
class ExecutionContext:
def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None:
self.name = context
self.additional_objs = additional_objs
if context == 'local_proc':
self.method = local_simulations
elif context == 'single_proc':
Expand Down Expand Up @@ -74,6 +75,7 @@ def __init__(self,
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
self.exec_context = exec_context.name
self.additional_objs = exec_context.additional_objs
self.configs = configs
self.empty_return = empty_return

Expand Down Expand Up @@ -174,7 +176,7 @@ def get_final_results(simulations: List[StateHistory],
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N
ExpIDs, SubsetIDs, SubsetWindows, original_N, self.additional_objs
)

final_result = get_final_results(
Expand Down
17 changes: 10 additions & 7 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def single_proc_exec(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):

# HACK for making it run with N_Runs=1
Expand All @@ -38,7 +39,7 @@ def single_proc_exec(
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n, additional_objs
)
return flatten(result)

Expand All @@ -58,7 +59,8 @@ def parallelize_simulations(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):

print(f'Execution Mode: parallelized')
Expand Down Expand Up @@ -96,7 +98,7 @@ def process_executor(params):
if len_configs_structs > 1:
pp = PPool(processes=len_configs_structs)
results = pp.map(
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params
lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n, additional_objs), params
)
pp.close()
pp.join()
Expand All @@ -123,18 +125,19 @@ def local_simulations(
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
configured_n: List[N_Runs],
additional_objs=None
):
config_amt = len(configs_structs)

if config_amt == 1: # and configured_n != 1
return single_proc_exec(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
elif config_amt > 1: # and configured_n != 1
return parallelize_simulations(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs
)
# elif config_amt > 1 and configured_n == 1:
50 changes: 31 additions & 19 deletions cadCAD/engine/simulation.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from typing import Any, Callable, Dict, List, Tuple
from copy import deepcopy
from types import MappingProxyType
from functools import reduce
from funcy import curry
from funcy import curry # type: ignore

from cadCAD.utils import flatten
from cadCAD.engine.utils import engine_exception
from cadCAD.types import *

id_exception: Callable = curry(engine_exception)(KeyError)(KeyError)(None)

Expand Down Expand Up @@ -102,20 +104,30 @@ def env_composition(target_field, state_dict, target_value):
# mech_step
def partial_state_update(
self,
sweep_dict: Dict[str, List[Any]],
sub_step: int,
sL,
sH,
state_funcs: List[Callable],
policy_funcs: List[Callable],
env_processes: Dict[str, Callable],
sweep_dict: Parameters,
sub_step: Substep,
sL: list[State],
sH: StateHistory,
state_funcs: List[StateUpdateFunction],
policy_funcs: List[PolicyFunction],
env_processes: EnvProcesses,
time_step: int,
run: int,
additional_objs
) -> List[Dict[str, Any]]:

# last_in_obj: Dict[str, Any] = MappingProxyType(sL[-1])
last_in_obj: Dict[str, Any] = deepcopy(sL[-1])
if type(additional_objs) == dict:
if additional_objs.get('deepcopy_off', False) == True:
last_in_obj = MappingProxyType(sL[-1])
if len(additional_objs) == 1:
additional_objs = None
# XXX: drop the additional objects if only used for deepcopy
# toggling.
else:
last_in_obj = deepcopy(sL[-1])
else:
last_in_obj = deepcopy(sL[-1])

_input: Dict[str, Any] = self.policy_update_exception(
self.get_policy_input(sweep_dict, sub_step, sH, last_in_obj, policy_funcs, additional_objs)
)
Expand Down Expand Up @@ -206,18 +218,18 @@ def run_pipeline(

def simulation(
self,
sweep_dict: Dict[str, List[Any]],
states_list: List[Dict[str, Any]],
sweep_dict: SweepableParameters,
states_list: StateHistory,
configs,
env_processes: Dict[str, Callable],
time_seq: range,
simulation_id: int,
env_processes: EnvProcesses,
time_seq: TimeSeq,
simulation_id: SimulationID,
run: int,
subset_id,
subset_window,
configured_N,
subset_id: SubsetID,
subset_window: SubsetWindow,
configured_N: int,
# remote_ind
additional_objs=None
additional_objs: Union[None, Dict]=None
):
run += 1
subset_window.appendleft(subset_id)
Expand Down
4 changes: 3 additions & 1 deletion cadCAD/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ class ConfigurationDict(TypedDict):
M: Union[Parameters, SweepableParameters] # Parameters / List of Parameter to Sweep


EnvProcesses = object
TargetValue = object
EnvProcess: Callable[[State, SweepableParameters, TargetValue], TargetValue]
EnvProcesses = dict[str, Callable]
TimeSeq = Iterator
SimulationID = int
Run = int
Expand Down
187 changes: 187 additions & 0 deletions testing/test_additional_objs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
from typing import Dict, List
from cadCAD.engine import Executor, ExecutionContext, ExecutionMode
from cadCAD.configuration import Experiment
from cadCAD.configuration.utils import env_trigger, var_substep_trigger, config_sim, psub_list
from cadCAD.types import *
import pandas as pd # type: ignore
import types
import inspect
import pytest

def describe_or_return(v: object) -> object:
"""
Thanks @LinuxIsCool!
"""
if isinstance(v, types.FunctionType):
return f'function: {v.__name__}'
elif isinstance(v, types.LambdaType) and v.__name__ == '<lambda>':
return f'lambda: {inspect.signature(v)}'
else:
return v


def select_M_dict(M_dict: Dict[str, object], keys: set) -> Dict[str, object]:
"""
Thanks @LinuxIsCool!
"""
return {k: describe_or_return(v) for k, v in M_dict.items() if k in keys}


def select_config_M_dict(configs: list, i: int, keys: set) -> Dict[str, object]:
return select_M_dict(configs[i].sim_config['M'], keys)


def drop_substeps(_df):
first_ind = (_df.substep == 0) & (_df.timestep == 0)
last_ind = _df.substep == max(_df.substep)
inds_to_drop = first_ind | last_ind
return _df.copy().loc[inds_to_drop].drop(columns=['substep'])


def assign_params(_df: pd.DataFrame, configs) -> pd.DataFrame:
"""
Based on `cadCAD-tools` package codebase, by @danlessa
"""
M_dict = configs[0].sim_config['M']
params_set = set(M_dict.keys())
selected_params = params_set

# Attribute parameters to each row
# 1. Assign the parameter set from the first row first, so that
# columns are created
first_param_dict = select_config_M_dict(configs, 0, selected_params)

# 2. Attribute parameter on an (simulation, subset, run) basis
df = _df.assign(**first_param_dict).copy()
for i, (_, subset_df) in enumerate(df.groupby(['simulation', 'subset', 'run'])):
df.loc[subset_df.index] = subset_df.assign(**select_config_M_dict(configs,
i,
selected_params))
return df




SWEEP_PARAMS: Dict[str, List] = {
'alpha': [1],
'beta': [lambda x: 2 * x, lambda x: x],
'gamma': [3, 4],
'omega': [7]
}

SINGLE_PARAMS: Dict[str, object] = {
'alpha': 1,
'beta': lambda x: x,
'gamma': 3,
'omega': 5
}


def create_experiment(N_RUNS=2, N_TIMESTEPS=3, params: dict=SWEEP_PARAMS):
psu_steps = ['m1', 'm2', 'm3']
system_substeps = len(psu_steps)
var_timestep_trigger = var_substep_trigger([0, system_substeps])
env_timestep_trigger = env_trigger(system_substeps)
env_process = {}


# ['s1', 's2', 's3', 's4']
# Policies per Mechanism
def gamma(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwargs):
return {'gamma': params['gamma']}


def omega(params: Parameters, substep: Substep, history: StateHistory, state: State, **kwarg):
return {'omega': params['omega']}


# Internal States per Mechanism
def alpha(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'alpha_var', params['alpha']


def beta(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'beta_var', params['beta']

def gamma_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'gamma_var', params['gamma']

def omega_var(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'omega_var', params['omega']


def policies(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'policies', _input


def sweeped(params: Parameters, substep: Substep, history: StateHistory, state: State, _input: PolicyOutput, **kwargs):
return 'sweeped', {'beta': params['beta'], 'gamma': params['gamma']}

psu_block: dict = {k: {"policies": {}, "states": {}} for k in psu_steps}
for m in psu_steps:
psu_block[m]['policies']['gamma'] = gamma
psu_block[m]['policies']['omega'] = omega
psu_block[m]["states"]['alpha_var'] = alpha
psu_block[m]["states"]['beta_var'] = beta
psu_block[m]["states"]['gamma_var'] = gamma_var
psu_block[m]["states"]['omega_var'] = omega_var
psu_block[m]['states']['policies'] = policies
psu_block[m]["states"]['sweeped'] = var_timestep_trigger(y='sweeped', f=sweeped)


# Genesis States
genesis_states = {
'alpha_var': 0,
'beta_var': 0,
'gamma_var': 0,
'omega_var': 0,
'policies': {},
'sweeped': {}
}

# Environment Process
env_process['sweeped'] = env_timestep_trigger(trigger_field='timestep', trigger_vals=[5], funct_list=[lambda _g, x: _g['beta']])

sim_config = config_sim(
{
"N": N_RUNS,
"T": range(N_TIMESTEPS),
"M": params, # Optional
}
)

# New Convention
partial_state_update_blocks = psub_list(psu_block, psu_steps)

exp = Experiment()
exp.append_model(
sim_configs=sim_config,
initial_state=genesis_states,
env_processes=env_process,
partial_state_update_blocks=partial_state_update_blocks
)
return exp


def test_deepcopy_off():
exp = create_experiment()
mode = ExecutionMode().local_mode
exec_context = ExecutionContext(mode, additional_objs={'deepcopy_off': True})
executor = Executor(exec_context=exec_context, configs=exp.configs)
(records, tensor_field, _) = executor.execute()
df = drop_substeps(assign_params(pd.DataFrame(records), exp.configs))

# XXX: parameters should always be of the same type. Else, the test will fail
first_sim_config = exp.configs[0].sim_config['M']


for (i, row) in df.iterrows():
if row.timestep > 0:

assert row['alpha_var'] == row['alpha']
assert type(row['alpha_var']) == type(first_sim_config['alpha'])
assert row['gamma_var'] == row['gamma']
assert type(row['gamma_var']) == type(first_sim_config['gamma'])
assert row['omega_var'] == row['omega']
assert type(row['omega_var']) == type(first_sim_config['omega'])

Loading

0 comments on commit 0aa86be

Please sign in to comment.