diff --git a/nmigen_soc/csr/__init__.py b/nmigen_soc/csr/__init__.py index 3b2d416..9c70f5f 100644 --- a/nmigen_soc/csr/__init__.py +++ b/nmigen_soc/csr/__init__.py @@ -1 +1,2 @@ from .bus import * +from .periph import * diff --git a/nmigen_soc/csr/periph.py b/nmigen_soc/csr/periph.py new file mode 100644 index 0000000..c404085 --- /dev/null +++ b/nmigen_soc/csr/periph.py @@ -0,0 +1,328 @@ +from nmigen import * +from nmigen import tracer + +from .bus import * + + +__all__ = ["Peripheral", "IRQLine", "Event", "PeripheralBridge"] + + +class Peripheral: + """CSR-capable peripheral. + + A helper class to reduce the boilerplate needed to control a peripheral with a CSR interface. + It provides facilities for instantiating CSR registers and sending interrupt requests to the + CPU. + + The ``Peripheral`` class is not meant to be instantiated as-is, but rather as a base class for + actual peripherals. + + Usage example + ------------- + + ``` + class ExamplePeripheral(csr.Peripheral, Elaboratable): + def __init__(self): + super().__init__() + self._data = self.csr(8, "w") + self._rdy = self.event(mode="rise") + + self._bridge = self.csr_bridge() + self.csr_bus = self._bridge.bus + self.irq = self._bridge.irq + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + # ... + return m + ``` + + Parameters + ---------- + name : str + Name of this peripheral. If ``None`` (default) the name is inferred from the variable + name this peripheral is assigned to. + + Attributes + ---------- + name : str + Name of the peripheral. + """ + def __init__(self, name=None, src_loc_at=1): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + + self._csr_regs = [] + self._csr_bus = None + + self._events = [] + self._irq = None + + @property + def csr_bus(self): + """CSR bus providing access to registers. + + Return value + ------------ + An instance of :class:`Interface`. + + Exceptions + ---------- + Raises :exn:`NotImplementedError` if the peripheral does not have a CSR bus. + """ + if self._csr_bus is None: + raise NotImplementedError("Peripheral {!r} does not have a CSR bus interface" + .format(self)) + return self._csr_bus + + @csr_bus.setter + def csr_bus(self, csr_bus): + if not isinstance(csr_bus, Interface): + raise TypeError("CSR bus interface must be an instance of csr.Interface, not {!r}" + .format(csr_bus)) + self._csr_bus = csr_bus + + @property + def irq(self): + """Interrupt request line. + + Return value + ------------ + An instance of :class:`IRQLine`. + + Exceptions + ---------- + Raises :exn:`NotImplementedError` if the peripheral does not have an IRQ line. + """ + if self._irq is None: + raise NotImplementedError("Peripheral {!r} does not have an IRQ line" + .format(self)) + return self._irq + + @irq.setter + def irq(self, irq): + if not isinstance(irq, IRQLine): + raise TypeError("IRQ line must be an instance of IRQLine, not {!r}" + .format(irq)) + self._irq = irq + + def csr(self, width, access, *, addr=None, alignment=0, name=None, src_loc_at=0): + """Request a CSR register. + + Parameters + ---------- + width : int + Width of the register. See :class:`Element`. + access : :class:`Access` + Register access mode. See :class:`Element`. + addr : int + Address of the register. See :meth:`Multiplexer.add`. + alignment : int + Register alignment. See :class:`Multiplexer`. + name : str + Name of the register. If ``None`` (default) the name is inferred from the variable + name this register is assigned to. + + Return value + ------------ + An instance of :class:`Element`. + """ + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + elem_name = name or tracer.get_var_name(depth=2 + src_loc_at) + + elem = Element(width, access, name="{}_{}".format(self.name, elem_name)) + self._csr_regs.append((elem, addr, alignment)) + return elem + + def event(self, *, mode="level", name=None, src_loc_at=0): + """Request an event source. + + See :class:`Event` for details. + + Return value + ------------ + An instance of :class:`Event`. + """ + event = Event(mode=mode, name=name, src_loc_at=1 + src_loc_at) + self._events.append(event) + return event + + def csr_bridge(self, *, data_width=8, alignment=0): + """Request a bridge to the resources of the peripheral. + + See :class:`PeripheralBridge` for details. + + Return value + ------------ + An instance of :class:`PeripheralBridge` providing access to the registers + of the peripheral and managing its event sources. + """ + return PeripheralBridge(self, data_width=data_width, alignment=alignment) + + def csr_registers(self): + """Iterate requested CSR registers and their parameters. + + Yield values + ------------ + A tuple ``elem, addr, alignment`` describing the register and its parameters. + """ + for elem, addr, alignment in self._csr_regs: + yield elem, addr, alignment + + def events(self): + """Iterate requested event sources. + + Event sources are ordered by request order. + + Yield values + ------------ + An instance of :class:`Event`. + """ + for event in self._events: + yield event + + +class IRQLine(Signal): + """Interrupt request line.""" + def __init__(self, *, name=None, src_loc_at=0): + super().__init__(name=name, src_loc_at=1 + src_loc_at) + + +class Event: + """Event source. + + Parameters + ---------- + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. If ``"level"``, a notification is raised when the ``stb`` signal is high. + If ``"rise"`` (or ``"fall"``) a notification is raised on a rising (or falling) edge + of ``stb``. + name : str + Name of the event. If ``None`` (default) the name is inferred from the variable + name this event source is assigned to. + + Attributes + ---------- + name : str + Name of the event + mode : ``"level"``, ``"rise"``, ``"fall"`` + Trigger mode. + stb : Signal, in + Event strobe. + """ + def __init__(self, *, mode, name=None, src_loc_at=0): + if name is not None and not isinstance(name, str): + raise TypeError("Name must be a string, not {!r}".format(name)) + + choices = ("level", "rise", "fall") + if mode not in choices: + raise ValueError("Invalid trigger mode {!r}; must be one of {}" + .format(mode, ", ".join(choices))) + + self.name = name or tracer.get_var_name(depth=2 + src_loc_at) + self.mode = mode + self.stb = Signal(name="{}_stb".format(self.name)) + + +class PeripheralBridge(Elaboratable): + """Peripheral bridge. + + A bridge providing access to the registers of a peripheral, and support for interrupt + requests (IRQs) from its event sources. + + CSR registers + ------------- + ev_status : read-only + Event status. Each bit displays the value of the ``stb`` signal of an event source. + The register width is ``len(list(periph.events())`` bits. Event sources are ordered by + request order. + ev_pending : read/write + Event pending. Each bit displays whether an event source has a pending notification. + Writing 1 to a bit clears the notification. + The register width is ``len(list(periph.events())`` bits. Event sources are ordered by + request order. + ev_enable : read/write + Event enable. Writing 1 to a bit enables an event source. Writing 0 disables it. + The register width is ``len(list(periph.events())`` bits. Event sources are ordered by + request order. + + Parameters + ---------- + periph : :class:`Peripheral` + The peripheral whose resources are exposed by this bridge. + data_width : int + Data width of the CSR bus. See :class:`Multiplexer`. + alignment : int + Register alignment. See :class:`Multiplexer`. + + Attributes + ---------- + bus : :class:`Interface` + CSR bus providing access to the registers of the peripheral. + irq : :class:`IRQLine` or None + IRQ line providing notifications from local events to the CPU. It is raised if any + event source is both enabled and has a pending notification. If the peripheral has + no event sources, it is set to ``None``. + """ + def __init__(self, periph, *, data_width, alignment): + if not isinstance(periph, Peripheral): + raise TypeError("Peripheral must be an instance of Peripheral, not {!r}" + .format(periph)) + + self._mux = Multiplexer(addr_width=1, data_width=data_width, alignment=alignment) + for elem, elem_addr, elem_alignment in periph.csr_registers(): + self._mux.add(elem, addr=elem_addr, alignment=elem_alignment, extend=True) + + self._events = list(periph.events()) + if len(self._events) > 0: + width = len(self._events) + self._ev_status = Element(width, "r", name="{}_ev_status".format(periph.name)) + self._ev_pending = Element(width, "rw", name="{}_ev_pending".format(periph.name)) + self._ev_enable = Element(width, "rw", name="{}_ev_enable".format(periph.name)) + self._mux.add(self._ev_status, extend=True) + self._mux.add(self._ev_pending, extend=True) + self._mux.add(self._ev_enable, extend=True) + self.irq = IRQLine(name="{}_irq".format(periph.name)) + else: + self.irq = None + + self.bus = self._mux.bus + + def elaborate(self, platform): + m = Module() + + m.submodules.mux = self._mux + + if self.irq is not None: + with m.If(self._ev_pending.w_stb): + m.d.sync += self._ev_pending.r_data.eq( self._ev_pending.r_data + & ~self._ev_pending.w_data) + with m.If(self._ev_enable.w_stb): + m.d.sync += self._ev_enable.r_data.eq(self._ev_enable.w_data) + + for i, ev in enumerate(self._events): + m.d.sync += self._ev_status.r_data[i].eq(ev.stb) + + if ev.mode in ("rise", "fall"): + ev_stb_r = Signal.like(ev.stb, name_suffix="_r") + m.d.sync += ev_stb_r.eq(ev.stb) + + ev_trigger = Signal(name="{}_trigger".format(ev.name)) + if ev.mode == "level": + m.d.comb += ev_trigger.eq(ev.stb) + elif ev.mode == "rise": + m.d.comb += ev_trigger.eq(~ev_stb_r & ev.stb) + elif ev.mode == "fall": + m.d.comb += ev_trigger.eq( ev_stb_r & ~ev.stb) + else: + assert False # :nocov: + + with m.If(ev_trigger): + m.d.sync += self._ev_pending.r_data[i].eq(1) + + m.d.comb += self.irq.eq((self._ev_pending.r_data & self._ev_enable.r_data).any()) + + return m diff --git a/nmigen_soc/test/test_csr_periph.py b/nmigen_soc/test/test_csr_periph.py new file mode 100644 index 0000000..cfcef3f --- /dev/null +++ b/nmigen_soc/test/test_csr_periph.py @@ -0,0 +1,279 @@ +# nmigen: UnusedElaboratable=no + +import unittest +from nmigen import * +from nmigen.back.pysim import * + +from ..csr.bus import * +from ..csr.periph import * + + +def simulation_test(dut, process): + with Simulator(dut, vcd_file=open("test.vcd", "w")) as sim: + sim.add_clock(1e-6) + sim.add_sync_process(process) + sim.run() + + +class PeripheralTestCase(unittest.TestCase): + def test_periph_name(self): + class Wrapper(Peripheral): + def __init__(self): + super().__init__() + periph_0 = Wrapper() + periph_1 = Peripheral(name="periph_1") + self.assertEqual(periph_0.name, "periph_0") + self.assertEqual(periph_1.name, "periph_1") + + def test_periph_name_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + periph = Peripheral(name=2) + + def test_set_csr_bus_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"CSR bus interface must be an instance of csr.Interface, not 'foo'"): + periph.csr_bus = "foo" + + def test_get_csr_bus_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(NotImplementedError, + r"Peripheral <.*> does not have a CSR bus interface"): + periph.csr_bus + + def test_set_irq_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"IRQ line must be an instance of IRQLine, not 'foo'"): + periph.irq = "foo" + + def test_get_irq_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(NotImplementedError, + r"Peripheral <.*> does not have an IRQ line"): + periph.irq + + def test_iter_csr_registers(self): + periph = Peripheral(src_loc_at=0) + csr_0 = periph.csr(1, "r") + csr_1 = periph.csr(8, "rw", addr=0x4, alignment=2) + self.assertEqual( + (csr_0.name, csr_0.width, csr_0.access), + ("periph_csr_0", 1, Element.Access.R) + ) + self.assertEqual( + (csr_1.name, csr_1.width, csr_1.access), + ("periph_csr_1", 8, Element.Access.RW) + ) + self.assertEqual(list(periph.csr_registers()), [ + (csr_0, None, 0), + (csr_1, 0x4, 2), + ]) + + def test_csr_name_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + periph.csr(1, "r", name=2) + + def test_iter_events(self): + periph = Peripheral(src_loc_at=0) + ev_0 = periph.event() + ev_1 = periph.event(mode="rise") + self.assertEqual((ev_0.name, ev_0.mode), ("ev_0", "level")) + self.assertEqual((ev_1.name, ev_1.mode), ("ev_1", "rise")) + self.assertEqual(list(periph.events()), [ + ev_0, + ev_1, + ]) + + def test_event_name_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(TypeError, + r"Name must be a string, not 2"): + periph.event(name=2) + + def test_event_mode_wrong(self): + periph = Peripheral(src_loc_at=0) + with self.assertRaisesRegex(ValueError, + r"Invalid trigger mode 'foo'; must be one of level, rise, fall"): + periph.event(mode="foo") + + +class PeripheralBridgeTestCase(unittest.TestCase): + def test_periph_wrong(self): + with self.assertRaisesRegex(TypeError, + r"Peripheral must be an instance of Peripheral, not 'foo'"): + PeripheralBridge('foo', data_width=8, alignment=0) + + +class PeripheralSimulationTestCase(unittest.TestCase): + def test_csrs(self): + class TestPeripheral(Peripheral, Elaboratable): + def __init__(self): + super().__init__() + self.csr_0 = self.csr(8, "r") + self.csr_1 = self.csr(8, "r", addr=8, alignment=4) + self.csr_2 = self.csr(8, "w") + self._bridge = self.csr_bridge(data_width=8, alignment=0) + self.csr_bus = self._bridge.bus + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + return m + + dut = TestPeripheral() + + def process(): + yield dut.csr_0.r_data.eq(0xa) + yield dut.csr_1.r_data.eq(0xb) + + yield dut.csr_bus.addr.eq(0) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + self.assertEqual((yield dut.csr_0.r_stb), 1) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0xa) + + yield dut.csr_bus.addr.eq(8) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + self.assertEqual((yield dut.csr_1.r_stb), 1) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0xb) + + yield dut.csr_bus.addr.eq(24) + yield dut.csr_bus.w_stb.eq(1) + yield dut.csr_bus.w_data.eq(0xc) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + self.assertEqual((yield dut.csr_2.w_stb), 1) + self.assertEqual((yield dut.csr_2.w_data), 0xc) + + simulation_test(dut, process) + + def test_events(self): + class TestPeripheral(Peripheral, Elaboratable): + def __init__(self): + super().__init__() + self.ev_0 = self.event() + self.ev_1 = self.event(mode="rise") + self.ev_2 = self.event(mode="fall") + self._bridge = self.csr_bridge(data_width=8) + self.csr_bus = self._bridge.bus + self.irq = self._bridge.irq + + def elaborate(self, platform): + m = Module() + m.submodules.bridge = self._bridge + return m + + dut = TestPeripheral() + + ev_status_addr = 0x0 + ev_pending_addr = 0x1 + ev_enable_addr = 0x2 + + def process(): + yield dut.ev_0.stb.eq(1) + yield dut.ev_1.stb.eq(0) + yield dut.ev_2.stb.eq(1) + yield + self.assertEqual((yield dut.irq), 0) + + # read ev_status, check that ev_0 and ev_2 are active + yield dut.csr_bus.addr.eq(ev_status_addr) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0b101) + yield + + # enable all events, check that IRQ line goes high + yield dut.csr_bus.addr.eq(ev_enable_addr) + yield dut.csr_bus.w_data.eq(0b111) + yield dut.csr_bus.w_stb.eq(1) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 1) + + # clear pending ev_0 + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.w_data.eq(0b001) + yield dut.csr_bus.w_stb.eq(1) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + + # check that ev_0 is still pending, and IRQ line is still high + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0b001) + self.assertEqual((yield dut.irq), 1) + yield + + # deactivate ev_0, clear ev_pending, check that IRQ line goes low + yield dut.ev_0.stb.eq(0) + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.w_data.eq(0b001) + yield dut.csr_bus.w_stb.eq(1) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + # activate ev_1, check that ev_1 is pending, and IRQ line goes high + yield dut.ev_1.stb.eq(1) + yield + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0b010) + self.assertEqual((yield dut.irq), 1) + + # clear ev_pending, check that IRQ line goes low + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.w_data.eq(0b010) + yield dut.csr_bus.w_stb.eq(1) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + # deactivate ev_2, check that ev_2 is pending, and IRQ line goes high + yield dut.ev_2.stb.eq(0) + yield + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.r_stb.eq(1) + yield + yield dut.csr_bus.r_stb.eq(0) + yield + self.assertEqual((yield dut.csr_bus.r_data), 0b100) + self.assertEqual((yield dut.irq), 1) + + # clear ev_pending, check that IRQ line goes low + yield dut.csr_bus.addr.eq(ev_pending_addr) + yield dut.csr_bus.w_data.eq(0b100) + yield dut.csr_bus.w_stb.eq(1) + yield + yield dut.csr_bus.w_stb.eq(0) + yield + yield + self.assertEqual((yield dut.irq), 0) + + simulation_test(dut, process)