Skip to content

Commit

Permalink
cocotb port optimizations, fixes and working multi-clocking
Browse files Browse the repository at this point in the history
  • Loading branch information
psychogenic committed Nov 22, 2024
1 parent d9102ce commit c729d6c
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from examples.tt_um_psychogenic_neptuneproportional.tt_um_psychogenic_neptuneproportional import run
# from examples.tt_um_psychogenic_neptuneproportional.tt_um_psychogenic_neptuneproportional import run

61 changes: 39 additions & 22 deletions src/examples/tt_um_psychogenic_neptuneproportional/tb.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,15 @@
@copyright: Copyright (C) 2024 Pat Deegan, https://psychogenic.com
'''
import math
import gc
from ttboard.demoboard import DemoBoard
gc.collect()
print(f'mf tb: {gc.mem_free()}')
from ttboard.cocotb.clock import Clock
print(f'mf tb: {gc.mem_free()}')
from ttboard.cocotb.triggers import Timer, ClockCycles # RisingEdge, FallingEdge, Timer, ClockCycles

print(f'mf tb: {gc.mem_free()}')
import ttboard.cocotb as cocotb

displayNotes = {
Expand Down Expand Up @@ -76,17 +82,19 @@ async def getDisplayValues(dut):
# dut._log.info(f'Display Segments: {displayedValues} ( [ {bin(displayedValues[0])} , {bin(displayedValues[1])}])')
return displayedValues

async def inputPulsesFor(dut, tunerInputFreqHz:int, inputTimeSecs=0.51, sysClkHz=1e3):
numPulses = tunerInputFreqHz * inputTimeSecs
pulsePeriod = 1/tunerInputFreqHz
pulseHalfCycleUs = round(1e6*pulsePeriod/2.0)

for _pidx in range(math.ceil(numPulses)):
dut.input_pulse.value = 1
await Timer(pulseHalfCycleUs, units='us')
dut.input_pulse.value = 0
await Timer(pulseHalfCycleUs, units='us')

async def inputPulsesFor(dut, tunerInputFreqHz:int, inputTimeSecs=0.51):
#numPulses = tunerInputFreqHz * inputTimeSecs
#pulsePeriod = 1/tunerInputFreqHz
#pulseHalfCycleUs = round(1e6*pulsePeriod/2.0)

pulseClock = Clock(dut.input_pulse, 1000*(1.0/tunerInputFreqHz), units='ms')
cocotb.start_soon(pulseClock.start())
# for _pidx in range(math.ceil(numPulses)):
# dut.input_pulse.value = 1
# await Timer(pulseHalfCycleUs, units='us')
# dut.input_pulse.value = 0
# await Timer(pulseHalfCycleUs, units='us')
await Timer(inputTimeSecs, 'sec')
dispV = await getDisplayValues(dut)

return dispV
Expand All @@ -98,7 +106,7 @@ async def setup_tuner(dut):
await startup(dut)


async def note_toggle(dut, freq, delta=0, msg="", toggleTime=4):
async def note_toggle(dut, freq, delta=0, msg="", toggleTime=1.2):
dut._log.info(msg)
await startup(dut)
dispValues = await inputPulsesFor(dut, freq + delta, toggleTime)
Expand All @@ -109,16 +117,18 @@ async def note_toggle(dut, freq, delta=0, msg="", toggleTime=4):
async def note_e(dut, eFreq=330, delta=0, msg=""):
dut._log.info(f"E @ {eFreq} delta {delta}")
dispValues = await note_toggle(dut, freq=eFreq, delta=delta, msg=msg);
assert dispValues[1] == (displayNotes['E'] & SegmentMask), "Note E FAIL"
dut._log.info(f"Note E @ {eFreq} pass")
note_target = (displayNotes['E'] & SegmentMask)
assert dispValues[1] == note_target, f"Note E FAIL: {dispValues[1]} != {note_target}"
dut._log.info(f"Note E @ {eFreq} pass ({bin(dispValues[1])})")
return dispValues



@cocotb.test()
async def note_e_highfar(dut):
dispValues = await note_e(dut, eFreq=330, delta=20, msg="little E high/far")
assert dispValues[0] == (displayProx['hifar'] & ProxSegMask) , "High/far Fail"
dispValues = await note_e(dut, eFreq=330, delta=12, msg="little E high/far")
target_value = (displayProx['hifar'] & ProxSegMask)
assert dispValues[0] == target_value, f"high/far fail {dispValues[0]} != {target_value}"
dut._log.info("Note E full pass")


Expand All @@ -128,14 +138,17 @@ async def note_g(dut, delta=0, msg=""):

dut._log.info(f"G delta {delta}")
dispValues = await note_toggle(dut, freq=gFreq, delta=delta, msg=msg);
assert dispValues[1] == (displayNotes['G'] & SegmentMask), "Note G FAIL"
dut._log.info("Note G: PASS")

note_target = (displayNotes['G'] & SegmentMask)
assert dispValues[1] == note_target, f"Note G FAIL: {dispValues[1]} != {note_target}"
dut._log.info(f"Note G: PASS ({bin(dispValues[1])})")
return dispValues

@cocotb.test()
async def note_g_highclose(dut):
dispValues = await note_g(dut, delta=3, msg="High/close")
assert dispValues[0] == (displayProx['hiclose'] & ProxSegMask) , "High/close fail"
target_value = (displayProx['hiclose'] & ProxSegMask)
assert dispValues[0] == target_value, f"High/close fail {dispValues[0]} != {target_value}"
dut._log.info("Note G full pass")


Expand All @@ -145,14 +158,18 @@ async def note_a(dut, delta=0, msg=""):

dut._log.info(f"A delta {delta}")
dispValues = await note_toggle(dut, freq=aFreq, delta=delta, msg=msg);
assert dispValues[1] == (displayNotes['A'] & SegmentMask), "Note A fail"
dut._log.info("Note A pass")

note_target = (displayNotes['A'] & SegmentMask)
assert dispValues[1] == note_target, f"Note A FAIL: {dispValues[1]} != {note_target}"
dut._log.info(f"Note A pass ({bin(dispValues[1])})")
return dispValues

@cocotb.test()
async def note_a_exact(dut):
dispValues = await note_a(dut, delta=0, msg="A exact")
assert dispValues[0] == (displayProx['exact'] & ProxSegMask) , "exact fail"

target_value = (displayProx['exact'] & ProxSegMask)
assert dispValues[0] == target_value, f"exact fail {dispValues[0]} != {target_value}"
dut._log.info("Note A full pass")

def main():
Expand Down
4 changes: 3 additions & 1 deletion src/ttboard/cocotb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@ def add_test(self, func, name:str=None):
self.tests_to_run[name] = func

def test(self, dut):
from ttboard.cocotb.time import SystemTime
failures = 0
for nm in self.test_names:
SystemTime.reset()
try:
self.tests_to_run[nm](dut)
except Exception as e:
Expand Down Expand Up @@ -64,7 +66,7 @@ def my_decorator_func(func):
runner = Runner.get()
test_name = func.__name__ if name is None else name
def wrapper_func(dut):
dut._log.info(f"Running {test_name}")
dut._log.info(f"Running Test: {test_name}")
asyncio.run(func(dut))

def skipper_func(dut):
Expand Down
28 changes: 22 additions & 6 deletions src/ttboard/cocotb/clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,23 @@ def __init__(self, signal, period, units):
self.running = False

self.half_period = TimeValue(period/2, units)
self.next_toggle = TimeValue(period/2, units)
self.next_toggle = TimeValue((period/2) - 1, units)

self.current_signal_value = 0
self.sleep_us = 0
half_per_secs = float(self.half_period)
if half_per_secs > 250e-6:
self.sleep_us = round(half_per_secs*1e-6)
if half_per_secs > 1e-3:
self.sleep_us = round(half_per_secs*1e6)

self._toggle_count = 0
self._period = None


@property
def period(self):
if self._period is None:
self._period = self.half_period * 2

return self._period
@property
def event_interval(self):
return self.half_period
Expand All @@ -61,12 +68,18 @@ def num_events_in(self, time_or_timevalue:int, units:str=None):
return tv / self.half_period

def time_has_passed(self):
while self.next_toggle <= SystemTime.current():
#print(f"time passed to {SystemTime.current()} next is {self.next_toggle}")
while self.next_toggle < SystemTime.current():
self.toggle()
self.next_toggle += self.half_period
#print(f"SMALL, next toggle {self.next_toggle}")


#print(f"increment done {SystemTime.current()} next is {self.next_toggle}")
#raise Exception('fuuuuk')

def toggle(self):
# print(f"toggle {self._toggle_count}")
# print(f"toggle!")
new_val = 1 if not self.current_signal_value else 0
self.signal.value = new_val
self.current_signal_value = new_val
Expand All @@ -78,4 +91,7 @@ def tick(self):
self.toggle()
self.toggle()

def __repr__(self):
return f'<Clock {self.period} on {self.signal}>'


22 changes: 18 additions & 4 deletions src/ttboard/cocotb/time.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,42 @@ def scale(cls, units:str):

@classmethod
def time_to_clockticks(cls, clock, t:int, units:str):
time_secs = t*cls.scale(units)
return round(time_secs / clock.period_s)
tval = TimeValue(t, units)
return round(tval / clock.period)

@classmethod
def rescale(cls,t:int, units:str, to_units:str):
if units == to_units:
return t
return t*(cls.scale(units)/cls.scale(to_units))

class TimeValue:
def __init__(self, time:int, units:str):
self.time = time
self._time = time
self._units = units
self.scale = TimeConverter.scale(units)
self._as_float = None

@property
def time(self):
return self._time

@time.setter
def time(self, set_to:int):
self._time = set_to
self._as_float = None
@property
def units(self):
return self._units
@units.setter
def units(self, set_to:str):
self._units = set_to
self.scale = TimeConverter.scale(set_to)
self._as_float = None
def __float__(self):
return self.time*self.scale
if self._as_float is None:
self._as_float = self.time*self.scale
return self._as_float

def __gt__(self, other):
if isinstance(other, (TimeValue, float)):
Expand Down
9 changes: 9 additions & 0 deletions src/ttboard/cocotb/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ def next(self):
all_clocks = sorted(Clock.all(), key=lambda x: float(x.half_period))
fastest_clock = all_clocks[0]
time_increment = fastest_clock.half_period
#print(f"Is now {SystemTime.current()}, running until {target_time}, increment is {time_increment}")
while SystemTime.current() < target_time:
#print("Advancing time")
SystemTime.advance(time_increment)
for clk in all_clocks:
#print("telling clocks")
clk.time_has_passed()
raise StopIteration

Expand All @@ -62,10 +65,16 @@ def __init__(self, time:int, units:str):

def run_timer(self):
all_clocks = sorted(Clock.all(), key=lambda x: float(x.half_period))
# print(f"All clocks on timer: {all_clocks}")
fastest_clock = all_clocks[0]
time_increment = fastest_clock.half_period
target_time = SystemTime.current() + self.time
increment_count = 0
while SystemTime.current() < target_time:
#if increment_count % 250 == 0:
# print(f"Systime: {SystemTime.current()} (target {target_time})")

#increment_count += 1
SystemTime.advance(time_increment)
for clk in all_clocks:
clk.time_has_passed()
Expand Down

0 comments on commit c729d6c

Please sign in to comment.