Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Blocked timestepping support to PyRevolve #55

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 54 additions & 24 deletions pyrevolve/pyrevolve.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from abc import ABCMeta, abstractproperty, abstractmethod
import numpy as np
import math
from . import crevolve as cr
from .compression import init_compression as init
from .schedulers import CRevolve, HRevolve, Action, Architecture
Expand Down Expand Up @@ -69,11 +70,12 @@ def __init__(
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
storage_list=None,
scheduler=None,
timings=None,
profiler=None,
block_size=1,
):
"""
Initialises checkpointer for a given forward- and reverse operator, a
Expand All @@ -82,10 +84,10 @@ def __init__(
methods and a scheduler object must be provided as well. Otherwise
NumpyStorage and CRevolve are used as default
"""
if n_timesteps is None:
if op_timesteps is None:
raise Exception(
"Online checkpointing not yet supported. Specify \
number of time steps!"
number of Operator time steps!"
)

if profiler is None:
Expand All @@ -100,12 +102,15 @@ def __init__(

self.checkpoint = checkpoint
self.n_checkpoints = n_checkpoints
self.n_timesteps = n_timesteps
self.block_size = block_size
self.op_timesteps = op_timesteps
self.timings = timings
self.fwd_operator = fwd_operator
self.rev_operator = rev_operator
self.scheduler = scheduler

self.cp_timesteps = int(math.ceil(self.op_timesteps / self.block_size))

def addStorage(self, new_storage):
self.storage_list.append(new_storage)

Expand Down Expand Up @@ -175,6 +180,20 @@ def makespan(self):
def ratio(self):
return 0

@property
def op_old_capo(self):
return self.scheduler.old_capo * self.block_size

@property
def op_capo(self):
_op_capo = self.scheduler.capo * self.block_size
return _op_capo if _op_capo < self.op_timesteps else self.op_timesteps

@property
def next_op_capo(self):
_op_capo = (self.scheduler.capo + 1) * self.block_size
return _op_capo if _op_capo < self.op_timesteps else self.op_timesteps

def apply_forward(self):
"""Executes only the forward computation while storing checkpoints,
then returns."""
Expand All @@ -185,7 +204,7 @@ def apply_forward(self):
# advance forward computation
with self.profiler.get_timer("forward", "advance"):
self.fwd_operator.apply(
t_start=self.scheduler.old_capo, t_end=self.scheduler.capo
t_start=self.op_old_capo, t_end=self.op_capo
)
elif action.type == Action.TAKESHOT:
# take a snapshot: copy from workspace into storage
Expand All @@ -199,7 +218,7 @@ def apply_forward(self):
# final step in the forward computation
with self.profiler.get_timer("forward", "lastfw"):
self.fwd_operator.apply(
t_start=self.scheduler.old_capo, t_end=self.n_timesteps
t_start=self.op_old_capo, t_end=self.op_timesteps
)
break
elif action.type == Action.REVERSE:
Expand All @@ -224,10 +243,10 @@ def apply_reverse(self):
# advance adjoint computation by a single step
with self.profiler.get_timer("reverse", "reverse"):
self.fwd_operator.apply(
t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1
t_start=self.op_capo, t_end=self.next_op_capo
)
self.rev_operator.apply(
t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1
t_start=self.op_capo, t_end=self.next_op_capo
)
elif action.type == Action.REVSTART:
"""Sets the rev_operator to 'nt' only if its not already there.
Expand All @@ -236,7 +255,7 @@ def apply_reverse(self):
"""
with self.profiler.get_timer("reverse", "reverse"):
self.rev_operator.apply(
t_start=self.scheduler.capo, t_end=self.scheduler.capo + 1
t_start=self.op_capo, t_end=self.next_op_capo
)
elif action.type == Action.TAKESHOT:
# take a snapshot: copy from workspace into storage
Expand All @@ -246,7 +265,7 @@ def apply_reverse(self):
# advance forward computation
with self.profiler.get_timer("reverse", "advance"):
self.fwd_operator.apply(
t_start=self.scheduler.old_capo, t_end=self.scheduler.capo
t_start=self.op_old_capo, t_end=self.op_capo
)
elif action.type == Action.RESTORE:
# restore a snapshot: copy from storage into workspace
Expand Down Expand Up @@ -303,13 +322,14 @@ def __init__(
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=None,
profiler=None,
compression_params=None,
diskstorage=False,
filedir="./",
singlefile=True,
block_size=1,
):
"""
Initializes a single-level Revolver
Expand All @@ -318,7 +338,7 @@ def __init__(
fwd_operator: forward operator
rev_operator: backward operator
n_checkpoints: number of checkpoints
n_timesteps: number of timesteps
op_timesteps: number of timesteps
timings: timings
profiler: Profiler
compression_params: compression scheme
Expand All @@ -331,20 +351,21 @@ def __init__(
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=timings,
profiler=profiler,
block_size=block_size,
)

self.filedir = filedir
self.singlefile = singlefile

if n_checkpoints is None:
self.n_checkpoints = cr.adjust(n_timesteps)
self.n_checkpoints = cr.adjust(self.cp_timesteps)
else:
self.n_checkpoints = n_checkpoints

self.scheduler = CRevolve(self.n_checkpoints, self.n_timesteps)
self.scheduler = CRevolve(self.n_checkpoints, self.cp_timesteps)

# remove storage list to avoid memory overflow
self.resetStorageList()
Expand Down Expand Up @@ -388,13 +409,14 @@ def __init__(
checkpoint,
fwd_operator,
rev_operator,
n_timesteps,
op_timesteps,
storage_list,
timings=None,
profiler=None,
uf=1,
ub=1,
up=1,
block_size=1,
):
"""
Initializes a multi-level Revolver using HRevolve
Expand All @@ -408,7 +430,8 @@ def __init__(
checkpoint: checkpoint object
fwd_operator: forward operator
rev_operator: backward operator
n_timesteps: number of timesteps
n_checkpoints: number of checkpoints
op_timesteps: number of timesteps
timings: timings
profiler: profiler
storage_list: list of storage objects
Expand All @@ -420,12 +443,14 @@ def __init__(
checkpoint,
fwd_operator,
rev_operator,
n_timesteps,
n_timesteps,
op_timesteps,
op_timesteps,
storage_list=storage_list,
timings=timings,
profiler=profiler,
block_size=block_size,
)

self.uf = uf # forward cost (default=1)
self.ub = ub # backward cost (default=1)
self.up = up # turn cost (default=1)
Expand All @@ -449,7 +474,8 @@ def reload_scheduler(self, uf=1, ub=1, up=1):
self.up = up
self.arch = Architecture(self.storage_list)
self.scheduler = HRevolve(
self.n_checkpoints, self.n_timesteps, self.arch, self.uf, self.ub, self.up
self.n_checkpoints, self.cp_timesteps, self.arch, self.uf, self.ub,
self.up
)
else:
raise ValueError(
Expand Down Expand Up @@ -498,21 +524,23 @@ def __init__(
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=None,
profiler=None,
compression_params=None,
block_size=1,
):
super().__init__(
checkpoint,
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=timings,
profiler=profiler,
compression_params=compression_params,
diskstorage=False,
block_size=block_size,
)


Expand All @@ -537,23 +565,25 @@ def __init__(
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=None,
profiler=None,
filedir="./",
singlefile=True,
block_size=1,
):
super().__init__(
checkpoint,
fwd_operator,
rev_operator,
n_checkpoints,
n_timesteps,
op_timesteps,
timings=timings,
profiler=profiler,
diskstorage=True,
filedir=filedir,
singlefile=singlefile,
block_size=block_size,
)


Expand Down
6 changes: 0 additions & 6 deletions pyrevolve/schedulers/hrevolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -630,12 +630,6 @@ class HRevolve(Scheduler):
def __init__(self, n_checkpoints, n_timesteps, architecture=None, uf=1, ub=1, up=1):
super().__init__(n_checkpoints, n_timesteps)

if n_checkpoints != n_timesteps:
raise ValueError(
"HRevolveError: the number of checkpoints \
must be equal to the number of timesteps"
)

self.hsequence = None
if architecture is None:
self.architecture = Architecture() # loads default arch
Expand Down
34 changes: 21 additions & 13 deletions tests/test_multilevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,11 @@
@pytest.mark.parametrize("drd", [0, 2])
@pytest.mark.parametrize("uf", [1, 2])
@pytest.mark.parametrize("ub", [1, 2])
def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("block_size", [1, 5, 10])
def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size):
nx = 10
ny = 10
df = np.zeros([nx, ny])
df = np.zeros([block_size, nx, ny])
db = np.zeros([nx, ny])
cp = IncrementCheckpoint([df, db])
f = IncOperator(1, df)
Expand All @@ -28,7 +29,8 @@ def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd
)
st_list = [npStorage, dkStorage]
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub)
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub,
block_size=block_size)
assert f.counter == 0
rev.apply_forward()
assert f.counter == nt
Expand All @@ -42,10 +44,11 @@ def test_forward_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("drd", [0, 2])
@pytest.mark.parametrize("uf", [1, 2])
@pytest.mark.parametrize("ub", [1, 2])
def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("block_size", [1, 5, 10])
def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size):
nx = 10
ny = 10
df = np.zeros([nx, ny])
df = np.zeros([block_size, nx, ny])
db = np.zeros([nx, ny])
cp = IncrementCheckpoint([df])
f = IncOperator(1, df)
Expand All @@ -56,7 +59,8 @@ def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd
)
st_list = [npStorage, dkStorage]
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub)
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub,
block_size=block_size)

rev.apply_forward()
assert f.counter == nt
Expand All @@ -73,7 +77,8 @@ def test_reverse_nt(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("drd", [0, 2])
@pytest.mark.parametrize("uf", [1, 2])
@pytest.mark.parametrize("ub", [1, 2])
def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("block_size", [1, 5, 10])
def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile, block_size):
cp = SimpleCheckpoint()
f = SimpleOperator()
b = SimpleOperator()
Expand All @@ -83,7 +88,8 @@ def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
cp.size, nt, cp.dtype, filedir="./", singlefile=singlefile, wd=dwd, rd=drd
)
st_list = [npStorage, dkStorage]
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub)
rev = MultiLevelRevolver(cp, f, b, nt, storage_list=st_list, uf=uf, ub=ub,
block_size=block_size)

rev.apply_forward()
assert cp.load_counter == 0
Expand All @@ -99,20 +105,21 @@ def test_num_loads_and_saves(nt, mwd, mrd, dwd, drd, uf, ub, singlefile):
@pytest.mark.parametrize("drd", [0, 2])
@pytest.mark.parametrize("uf", [1, 2])
@pytest.mark.parametrize("ub", [1, 2])
def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub):
@pytest.mark.parametrize("block_size", [1, 5, 10])
def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub, block_size):
"""
Tests whether SingleLevelRevolver and MultilevelRevolver are producing
the same outputs
"""
nx = 10
ny = 10
const = 1
m_df = np.zeros([nx, ny])
m_df = np.zeros([block_size, nx, ny])
m_db = np.zeros([nx, ny])
m_cp = IncrementCheckpoint([m_df])
m_fwd = IncOperator(const, m_df)
m_rev = IncOperator((-1) * const, m_df, m_db)
s_df = np.zeros([nx, ny])
s_df = np.zeros([block_size, nx, ny])
s_db = np.zeros([nx, ny])
s_cp = IncrementCheckpoint([s_df])
s_fwd = IncOperator(const, s_df)
Expand All @@ -124,10 +131,11 @@ def test_multi_and_single_outputs(nt, mwd, mrd, dwd, drd, uf, ub):
)
st_list = [m_npStorage, m_dkStorage]
m_wrp = MultiLevelRevolver(
m_cp, m_fwd, m_rev, nt, storage_list=st_list, uf=uf, ub=ub
m_cp, m_fwd, m_rev, nt, storage_list=st_list, uf=uf, ub=ub,
block_size=block_size
)

s_wrp = MemoryRevolver(s_cp, s_fwd, s_rev, nt, nt)
s_wrp = MemoryRevolver(s_cp, s_fwd, s_rev, nt, nt, block_size=block_size)

m_wrp.apply_forward()
s_wrp.apply_forward()
Expand Down
Loading