diff --git a/lambdalib/cores/lcd/ssd1306.py b/lambdalib/cores/lcd/ssd1306.py index 71a6556..f2c82fe 100644 --- a/lambdalib/cores/lcd/ssd1306.py +++ b/lambdalib/cores/lcd/ssd1306.py @@ -36,6 +36,60 @@ SSD1306_SETCOMPINS = 0xDA SSD1306_SETVCOMDETECT = 0xDB + +class SSD1306_Wrapper(Elaboratable): + def __init__(self): + self.sink = stream.Endpoint([ + ("d_cn", 1), + ("data", 8), + ]) + self.source = stream.Endpoint([ + ("r_wn", 1), + ("data", 8), + ]) + + def elaborate(self, platform): + sink = self.sink + source = self.source + + m = Module() + + m.d.comb += source.r_wn.eq(0) # Write only + + with m.FSM(): + # Send the I2C address and control byte + with m.State("ADDR"): + m.d.comb += [ + source.data .eq(SSD1306_I2C_ADDRESS << 1), + source.valid.eq(sink.valid), + ] + with m.If(source.valid & source.ready): + m.next = "CONTROL" + + with m.State("CONTROL"): + m.d.comb += [ + # Control byte: + # Command: 0x00: Co = 0, D/C# = 0 + # Data: 0x40: Co = 0, D/C# = 1 + source.data .eq(Mux(sink.d_cn, 0x40, 0x00)), + source.valid.eq(sink.valid), + ] + with m.If(source.valid & source.ready): + m.next = "DATA" + + with m.State("DATA"): + m.d.comb += [ + source.data. eq(sink.data), + source.valid.eq(sink.valid), + source.last .eq(sink.last), + sink .ready.eq(source.ready), + ] + with m.If(source.valid & source.ready & source.last): + m.next = "ADDR" + + return m + + class SSD1306(Elaboratable): """ Driver for SSD1306 based LCD screen. @@ -48,19 +102,14 @@ class SSD1306(Elaboratable): The screen width in pixels. height : int The screen height in pixels. - burst_len : int - Specify the maximum amount of framebuffer bytes that can be - sent at a time before closing the I2C transaction. - 0 means unlimited. por_init : bool When True, the screen is automatically initialized upon power on reset. when False, the user need to assert `reset` for one clock cycle. """ - def __init__(self, width, height, burst_len=0, por_init=True): + def __init__(self, width, height, por_init=True): self.width = width self.height = height - self.burst_len = burst_len self.por_init = por_init # Table from https://github.com/rm-hull/luma.oled/blob/main/luma/oled/device/__init__.py @@ -80,9 +129,6 @@ def __init__(self, width, height, burst_len=0, por_init=True): self._colstart = settings["colstart"] self._colend = self._colstart + width - if self.burst_len == 0: - self.burst_len = self._size - self.reset = Signal() self.ready = Signal() @@ -96,16 +142,6 @@ def __init__(self, width, height, burst_len=0, por_init=True): ]) self.error = Signal() - def cmds_to_mem(self, cmds): - mem = [] - - for cmd in cmds: - mem.append(SSD1306_I2C_ADDRESS << 1) # Write - mem.append(0x00) # Co = 0, D/C# = 0 - mem.append(cmd) - - return mem - def elaborate(self, platform): sink = self.sink source = self.source @@ -140,10 +176,7 @@ def elaborate(self, platform): SSD1306_NORMALDISPLAY, SSD1306_DISPLAYON, ] - blob = self.cmds_to_mem(cmds) - - m.submodules.init = init = \ - LastInserter(3)(MemoryStreamReader(8, blob)) + m.submodules.init = init = MemoryStreamReader(8, cmds) # Recipe for sending a framebuffer cmds = [ @@ -154,12 +187,13 @@ def elaborate(self, platform): 0, # Page start address. (0 = reset) self._pages - 1, # Page end address. ] - blob = self.cmds_to_mem(cmds) + m.submodules.display = display = MemoryStreamReader(8, cmds) - m.submodules.display = display = \ - LastInserter(3)(MemoryStreamReader(8, blob)) + # Instanciate the I2C address and control byte wrapper + m.submodules.wrapper = wrapper = SSD1306_Wrapper() + m.d.comb += wrapper.source.connect(source) - cnt = Signal(range(self.burst_len + 2)) + cnt = Signal(range(self._size)) with m.FSM(): with m.State("UNKNOWN"): @@ -178,10 +212,11 @@ def elaborate(self, platform): # Send the appropriate sequence to power on # and initialize the display. m.d.comb += [ - init.source.connect(source), - source.r_wn.eq(0), # Write only + init.source.connect(wrapper.sink, exclude={"last"}), + wrapper.sink.d_cn.eq(0), # Commands + wrapper.sink.last.eq(1), ] - with m.If(init.done & ~source.valid): + with m.If(init.done & ~init.source.valid): m.next = "DISPLAY" with m.State("DISPLAY"): @@ -190,56 +225,44 @@ def elaborate(self, platform): # Send the appropriate sequence to prepare # for a frame buffer write. - with m.Elif(sink.valid): # ~sink.ready + with m.Elif(~self.ready | sink.valid): m.d.comb += [ - display.source.connect(source), - source.r_wn.eq(0), # Write only + display.source.connect(wrapper.sink, exclude={"last"}), + wrapper.sink.d_cn.eq(0), # Commands + wrapper.sink.last.eq(1), ] - with m.If(display.done & ~source.valid): + with m.If(display.done & ~display.source.valid): # On the first time after initialization # we want to clear the frame buffer to make # sure we do not display crap. - # with m.If(~self.ready): - # m.next = "CLEAR" - # with m.Else(): - m.next = "FRAMEBUFFER" + with m.If(~self.ready): + m.next = "CLEAR" + with m.Else(): + m.next = "FRAMEBUFFER" - with m.State("FRAMEBUFFER"): + with m.State("CLEAR"): m.d.comb += [ - source.r_wn.eq(0), - source.last.eq((cnt == self.burst_len+2-1) | sink.last), + wrapper.sink.valid.eq(1), + wrapper.sink.data .eq(0), # Black pixels + wrapper.sink.last .eq(cnt == self._size-1), + wrapper.sink.d_cn .eq(1), # Framebuffer data ] - - # Send the I2C address and control byte, - # then send the frame buffer data up to burst length. - with m.If(cnt == 0): - m.d.comb += [ - source.data .eq(SSD1306_I2C_ADDRESS << 1), - source.valid.eq(1), - ] - with m.Elif(cnt == 1): - m.d.comb += [ - source.data .eq(0x40), # Control byte: Co = 0, D/C# = 1 - source.valid.eq(1), - ] - with m.Else(): - m.d.comb += [ - source.data .eq(sink.data), - source.valid.eq(sink.valid), - sink .ready.eq(source.ready), - ] - - # End of burst detection - # Reset the counter and stay in this state - # to send the next burst, or go back to the - # DISPLAY state when the end of the framebuffer is reached. - with m.If(source.valid & source.ready): - with m.If(~source.last): + with m.If(wrapper.sink.ready): + with m.If(~wrapper.sink.last): m.d.sync += cnt.eq(cnt + 1) with m.Else(): m.d.sync += cnt.eq(0) - with m.If(sink.last): - m.d.comb += display.rewind.eq(1) - m.next = "DISPLAY" + m.d.sync += self.ready.eq(1) + m.d.comb += display.rewind.eq(1) + m.next = "DISPLAY" + + with m.State("FRAMEBUFFER"): + m.d.comb += [ + sink.connect(wrapper.sink), + wrapper.sink.d_cn.eq(1), + ] + with m.If(sink.valid & sink.ready & sink.last): + m.d.comb += display.rewind.eq(1) + m.next = "DISPLAY" return m