From 11634ccaf7026d453da3f83e523b166226495052 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Speglich?= Date: Thu, 28 Apr 2022 14:07:36 +0100 Subject: [PATCH 1/3] ckp: implement blocked time stepping --- pyrevolve/pyrevolve.py | 78 +++++++++++++++++++++++++++++------------- 1 file changed, 54 insertions(+), 24 deletions(-) diff --git a/pyrevolve/pyrevolve.py b/pyrevolve/pyrevolve.py index a8ed59f..4aa21ef 100644 --- a/pyrevolve/pyrevolve.py +++ b/pyrevolve/pyrevolve.py @@ -1,5 +1,6 @@ from abc import ABCMeta, abstractproperty, abstractmethod import numpy as np +import math from . import crevolve as cr from .compression import init_compression as init from .schedulers import CRevolve, HRevolve, Action, Architecture @@ -69,11 +70,12 @@ def __init__( fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, storage_list=None, scheduler=None, timings=None, profiler=None, + block_size=1, ): """ Initialises checkpointer for a given forward- and reverse operator, a @@ -82,10 +84,10 @@ def __init__( methods and a scheduler object must be provided as well. Otherwise NumpyStorage and CRevolve are used as default """ - if n_timesteps is None: + if op_timesteps is None: raise Exception( "Online checkpointing not yet supported. Specify \ - number of time steps!" + number of Operator time steps!" ) if profiler is None: @@ -100,12 +102,15 @@ def __init__( self.checkpoint = checkpoint self.n_checkpoints = n_checkpoints - self.n_timesteps = n_timesteps + self.block_size = block_size + self.op_timesteps = op_timesteps self.timings = timings self.fwd_operator = fwd_operator self.rev_operator = rev_operator self.scheduler = scheduler + self.cp_timesteps = int(math.ceil(self.op_timesteps / self.block_size)) + def addStorage(self, new_storage): self.storage_list.append(new_storage) @@ -175,6 +180,20 @@ def makespan(self): def ratio(self): return 0 + @property + def op_old_capo(self): + return self.scheduler.old_capo * self.block_size + + @property + def op_capo(self): + _op_capo = self.scheduler.capo * self.block_size + return _op_capo if _op_capo < self.op_timesteps else self.op_timesteps + + @property + def next_op_capo(self): + _op_capo = (self.scheduler.capo + 1) * self.block_size + return _op_capo if _op_capo < self.op_timesteps else self.op_timesteps + def apply_forward(self): """Executes only the forward computation while storing checkpoints, then returns.""" @@ -185,7 +204,7 @@ def apply_forward(self): # advance forward computation with self.profiler.get_timer("forward", "advance"): self.fwd_operator.apply( - t_start=self.scheduler.old_capo, t_end=self.scheduler.capo + t_start=self.op_old_capo, t_end=self.op_capo ) elif action.type == Action.TAKESHOT: # take a snapshot: copy from workspace into storage @@ -199,7 +218,7 @@ def apply_forward(self): # final step in the forward computation with self.profiler.get_timer("forward", "lastfw"): self.fwd_operator.apply( - t_start=self.scheduler.old_capo, t_end=self.n_timesteps + t_start=self.op_old_capo, t_end=self.op_timesteps ) break elif action.type == Action.REVERSE: @@ -224,10 +243,10 @@ def apply_reverse(self): # advance adjoint computation by a single step with self.profiler.get_timer("reverse", "reverse"): self.fwd_operator.apply( - t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1 + t_start=self.op_capo, t_end=self.next_op_capo ) self.rev_operator.apply( - t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1 + t_start=self.op_capo, t_end=self.next_op_capo ) elif action.type == Action.REVSTART: """Sets the rev_operator to 'nt' only if its not already there. @@ -236,7 +255,7 @@ def apply_reverse(self): """ with self.profiler.get_timer("reverse", "reverse"): self.rev_operator.apply( - t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1 + t_start=self.op_capo, t_end=self.next_op_capo ) elif action.type == Action.TAKESHOT: # take a snapshot: copy from workspace into storage @@ -246,7 +265,7 @@ def apply_reverse(self): # advance forward computation with self.profiler.get_timer("reverse", "advance"): self.fwd_operator.apply( - t_start=self.scheduler.old_capo, t_end=self.scheduler.capo + t_start=self.op_old_capo, t_end=self.op_capo ) elif action.type == Action.RESTORE: # restore a snapshot: copy from storage into workspace @@ -303,13 +322,14 @@ def __init__( fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=None, profiler=None, compression_params=None, diskstorage=False, filedir="./", singlefile=True, + block_size=1, ): """ Initializes a single-level Revolver @@ -318,7 +338,7 @@ def __init__( fwd_operator: forward operator rev_operator: backward operator n_checkpoints: number of checkpoints - n_timesteps: number of timesteps + op_timesteps: number of timesteps timings: timings profiler: Profiler compression_params: compression scheme @@ -331,20 +351,21 @@ def __init__( fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=timings, profiler=profiler, + block_size=block_size, ) self.filedir = filedir self.singlefile = singlefile if n_checkpoints is None: - self.n_checkpoints = cr.adjust(n_timesteps) + self.n_checkpoints = cr.adjust(self.cp_timesteps) else: self.n_checkpoints = n_checkpoints - self.scheduler = CRevolve(self.n_checkpoints, self.n_timesteps) + self.scheduler = CRevolve(self.n_checkpoints, self.cp_timesteps) # remove storage list to avoid memory overflow self.resetStorageList() @@ -388,13 +409,14 @@ def __init__( checkpoint, fwd_operator, rev_operator, - n_timesteps, + op_timesteps, storage_list, timings=None, profiler=None, uf=1, ub=1, up=1, + block_size=1, ): """ Initializes a multi-level Revolver using HRevolve @@ -408,7 +430,8 @@ def __init__( checkpoint: checkpoint object fwd_operator: forward operator rev_operator: backward operator - n_timesteps: number of timesteps + n_checkpoints: number of checkpoints + op_timesteps: number of timesteps timings: timings profiler: profiler storage_list: list of storage objects @@ -420,12 +443,14 @@ def __init__( checkpoint, fwd_operator, rev_operator, - n_timesteps, - n_timesteps, + op_timesteps, + op_timesteps, storage_list=storage_list, timings=timings, profiler=profiler, + block_size=block_size, ) + self.uf = uf # forward cost (default=1) self.ub = ub # backward cost (default=1) self.up = up # turn cost (default=1) @@ -449,7 +474,8 @@ def reload_scheduler(self, uf=1, ub=1, up=1): self.up = up self.arch = Architecture(self.storage_list) self.scheduler = HRevolve( - self.n_checkpoints, self.n_timesteps, self.arch, self.uf, self.ub, self.up + self.n_checkpoints, self.cp_timesteps, self.arch, self.uf, self.ub, + self.up ) else: raise ValueError( @@ -498,21 +524,23 @@ def __init__( fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=None, profiler=None, compression_params=None, + block_size=1, ): super().__init__( checkpoint, fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=timings, profiler=profiler, compression_params=compression_params, diskstorage=False, + block_size=block_size, ) @@ -537,23 +565,25 @@ def __init__( fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=None, profiler=None, filedir="./", singlefile=True, + block_size=1, ): super().__init__( checkpoint, fwd_operator, rev_operator, n_checkpoints, - n_timesteps, + op_timesteps, timings=timings, profiler=profiler, diskstorage=True, filedir=filedir, singlefile=singlefile, + block_size=block_size, ) From 7f355149afac012ac171fd319e395cb2f90f14f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Speglich?= Date: Thu, 28 Apr 2022 14:09:21 +0100 Subject: [PATCH 2/3] ckp: remove n_checkpoint = n_timestep dependencie from H-Revolve --- pyrevolve/schedulers/hrevolve.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pyrevolve/schedulers/hrevolve.py b/pyrevolve/schedulers/hrevolve.py index 9ef8f3b..30931e6 100644 --- a/pyrevolve/schedulers/hrevolve.py +++ b/pyrevolve/schedulers/hrevolve.py @@ -630,12 +630,6 @@ class HRevolve(Scheduler): def __init__(self, n_checkpoints, n_timesteps, architecture=None, uf=1, ub=1, up=1): super().__init__(n_checkpoints, n_timesteps) - if n_checkpoints != n_timesteps: - raise ValueError( - "HRevolveError: the number of checkpoints \ - must be equal to the number of timesteps" - ) - self.hsequence = None if architecture is None: self.architecture = Architecture() # loads default arch From 573ae8e7a9f702aecbc2a6535ee1e7a7a50a3472 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Speglich?= Date: Thu, 28 Apr 2022 14:10:23 +0100 Subject: [PATCH 3/3] tests: add blocked time stepping parameter to PyRevolve tests --- tests/test_multilevel.py | 34 +++++++++++++++++++++------------- tests/test_storage.py | 10 ++++++---- tests/utils.py | 8 ++++++-- 3 files changed, 33 insertions(+), 19 deletions(-) diff --git a/tests/test_multilevel.py b/tests/test_multilevel.py index 3b5b124..a62b166 100644 --- a/tests/test_multilevel.py +++ b/tests/test_multilevel.py @@ -14,10 +14,11 @@ @pytest.mark.parametrize("drd", [0, 2]) @pytest.mark.parametrize("uf", [1, 2]) @pytest.mark.parametrize("ub", [1, 2]) -def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): +@pytest.mark.parametrize("block_size", [1, 5, 10]) +def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size): nx = 10 ny = 10 - df = np.zeros([nx, ny]) + df = np.zeros([block_size, nx, ny]) db = np.zeros([nx, ny]) cp = IncrementCheckpoint([df, db]) f = IncOperator(1, df) @@ -28,7 +29,8 @@ def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd ) st_list = [npStorage, dkStorage] - rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub) + rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub, + block_size=block_size) assert f.counter == 0 rev.apply_forward() assert f.counter == nt @@ -42,10 +44,11 @@ def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): @pytest.mark.parametrize("drd", [0, 2]) @pytest.mark.parametrize("uf", [1, 2]) @pytest.mark.parametrize("ub", [1, 2]) -def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): +@pytest.mark.parametrize("block_size", [1, 5, 10]) +def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size): nx = 10 ny = 10 - df = np.zeros([nx, ny]) + df = np.zeros([block_size, nx, ny]) db = np.zeros([nx, ny]) cp = IncrementCheckpoint([df]) f = IncOperator(1, df) @@ -56,7 +59,8 @@ def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd ) st_list = [npStorage, dkStorage] - rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub) + rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub, + block_size=block_size) rev.apply_forward() assert f.counter == nt @@ -73,7 +77,8 @@ def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): @pytest.mark.parametrize("drd", [0, 2]) @pytest.mark.parametrize("uf", [1, 2]) @pytest.mark.parametrize("ub", [1, 2]) -def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): +@pytest.mark.parametrize("block_size", [1, 5, 10]) +def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size): cp = SimpleCheckpoint() f = SimpleOperator() b = SimpleOperator() @@ -83,7 +88,8 @@ def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd ) st_list = [npStorage, dkStorage] - rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub) + rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub, + block_size=block_size) rev.apply_forward() assert cp.load_counter == 0 @@ -99,7 +105,8 @@ def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile): @pytest.mark.parametrize("drd", [0, 2]) @pytest.mark.parametrize("uf", [1, 2]) @pytest.mark.parametrize("ub", [1, 2]) -def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub): +@pytest.mark.parametrize("block_size", [1, 5, 10]) +def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub, block_size): """ Tests whether SingleLevelRevolver and MultilevelRevolver are producing the same outputs @@ -107,12 +114,12 @@ def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub): nx = 10 ny = 10 const = 1 - m_df = np.zeros([nx, ny]) + m_df = np.zeros([block_size, nx, ny]) m_db = np.zeros([nx, ny]) m_cp = IncrementCheckpoint([m_df]) m_fwd = IncOperator(const, m_df) m_rev = IncOperator((-1) * const, m_df, m_db) - s_df = np.zeros([nx, ny]) + s_df = np.zeros([block_size, nx, ny]) s_db = np.zeros([nx, ny]) s_cp = IncrementCheckpoint([s_df]) s_fwd = IncOperator(const, s_df) @@ -124,10 +131,11 @@ def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub): ) st_list = [m_npStorage, m_dkStorage] m_wrp = MultiLevelRevolver( - m_cp, m_fwd, m_rev, nt, storage_list=st_list, uf=uf, ub=ub + m_cp, m_fwd, m_rev, nt, storage_list=st_list, uf=uf, ub=ub, + block_size=block_size ) - s_wrp = MemoryRevolver(s_cp, s_fwd, s_rev, nt, nt) + s_wrp = MemoryRevolver(s_cp, s_fwd, s_rev, nt, nt, block_size=block_size) m_wrp.apply_forward() s_wrp.apply_forward() diff --git a/tests/test_storage.py b/tests/test_storage.py index 7736f35..dfa3e87 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -30,8 +30,9 @@ def test_save_and_restore_with_compression(scheme): (10, 9), (10, 10), (10, 11), (10, 12)]) @pytest.mark.parametrize("singlefile", [True, False]) @pytest.mark.parametrize("diskckp", [True, False]) -def test_forward_nt(nt, ncp, singlefile, diskckp): - df = np.zeros([nt, ncp]) +@pytest.mark.parametrize("block_size", [1]) +def test_forward_nt(nt, ncp, singlefile, diskckp, block_size): + df = np.zeros([block_size, nt, ncp]) db = np.zeros([nt, ncp]) cp = IncrementCheckpoint([df, db]) f = IncOperator(1, df) @@ -50,8 +51,9 @@ def test_forward_nt(nt, ncp, singlefile, diskckp): (10, 9), (10, 10), (10, 11), (10, 12)]) @pytest.mark.parametrize("singlefile", [True, False]) @pytest.mark.parametrize("diskckp", [True, False]) -def test_reverse_nt(nt, ncp, singlefile, diskckp): - df = np.zeros([nt, ncp]) +@pytest.mark.parametrize("block_size", [1]) +def test_reverse_nt(nt, ncp, singlefile, diskckp, block_size): + df = np.zeros([block_size, nt, ncp]) db = np.zeros([nt, ncp]) cp = IncrementCheckpoint([df]) f = IncOperator(1, df) diff --git a/tests/utils.py b/tests/utils.py index da7fdfe..2aaddd2 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -123,7 +123,11 @@ def apply(self, **kwargs): t_end = kwargs['t_end'] assert(t_start <= t_end) if self.direction == 1: - self.u[:] = self.u[:] + self.direction * abs(t_start - t_end) + for t in range(t_start, t_end): + idx = t % np.shape(self.u)[0] + past_idx = (t-1) % np.shape(self.u)[0] + self.u[idx][:] = self.u[past_idx][:] + self.direction * 1 else: - self.v[:] = (self.u[:]*(-1) + 1) + idx = (t_start) % np.shape(self.u)[0] + self.v[:] = (self.u[idx][:]*(-1) + 1) self.counter += abs(t_end - t_start)