diff --git a/adafruit_esp32spi/adafruit_esp32spi.py b/adafruit_esp32spi/adafruit_esp32spi.py index fae4cc6..2cd2d4e 100644 --- a/adafruit_esp32spi/adafruit_esp32spi.py +++ b/adafruit_esp32spi/adafruit_esp32spi.py @@ -194,7 +194,7 @@ def _wait_for_ready(self): print(".", end="") time.sleep(0.05) else: - raise RuntimeError("ESP32 not responding") + raise TimeoutError("ESP32 not responding") if self._debug >= 3: print() @@ -242,7 +242,7 @@ def _send_command(self, cmd, params=None, *, param_len_16=False): if self._ready.value: # ok ready to send! break else: - raise RuntimeError("ESP32 timed out on SPI select") + raise TimeoutError("ESP32 timed out on SPI select") spi.write( self._sendbuf, start=0, end=packet_len ) # pylint: disable=no-member @@ -271,17 +271,17 @@ def _wait_spi_char(self, spi, desired): for _ in range(10): r = self._read_byte(spi) if r == _ERR_CMD: - raise RuntimeError("Error response to command") + raise BrokenPipeError("Error response to command") if r == desired: return True time.sleep(0.01) - raise RuntimeError("Timed out waiting for SPI char") + raise TimeoutError("Timed out waiting for SPI char") def _check_data(self, spi, desired): """Read a byte and verify its the value we want""" r = self._read_byte(spi) if r != desired: - raise RuntimeError("Expected %02X but got %02X" % (desired, r)) + raise BrokenPipeError("Expected %02X but got %02X" % (desired, r)) def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): """Wait for ready, then parse the response""" @@ -294,7 +294,7 @@ def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False): if self._ready.value: # ok ready to send! break else: - raise RuntimeError("ESP32 timed out on SPI select") + raise TimeoutError("ESP32 timed out on SPI select") self._wait_spi_char(spi, _START_CMD) self._check_data(spi, cmd | _REPLY_FLAG) @@ -377,7 +377,7 @@ def start_scan_networks(self): print("Start scan") resp = self._send_command_get_response(_START_SCAN_NETWORKS) if resp[0][0] != 1: - raise RuntimeError("Failed to start AP scan") + raise OSError("Failed to start AP scan") def get_scan_networks(self): """The results of the latest SSID scan. Returns a list of dictionaries with @@ -440,7 +440,7 @@ def set_dns_config(self, dns1, dns2): _SET_DNS_CONFIG, [b"\x00", self.unpretty_ip(dns1), self.unpretty_ip(dns2)] ) if resp[0][0] != 1: - raise RuntimeError("Failed to set dns with esp32") + raise OSError("Failed to set dns with esp32") def set_hostname(self, hostname): """Tells the ESP32 to set hostname for DHCP. @@ -449,49 +449,49 @@ def set_hostname(self, hostname): """ resp = self._send_command_get_response(_SET_HOSTNAME, [hostname.encode()]) if resp[0][0] != 1: - raise RuntimeError("Failed to set hostname with esp32") + raise OSError("Failed to set hostname with esp32") def wifi_set_network(self, ssid): """Tells the ESP32 to set the access point to the given ssid""" resp = self._send_command_get_response(_SET_NET_CMD, [ssid]) if resp[0][0] != 1: - raise RuntimeError("Failed to set network") + raise OSError("Failed to set network") def wifi_set_passphrase(self, ssid, passphrase): """Sets the desired access point ssid and passphrase""" resp = self._send_command_get_response(_SET_PASSPHRASE_CMD, [ssid, passphrase]) if resp[0][0] != 1: - raise RuntimeError("Failed to set passphrase") + raise OSError("Failed to set passphrase") def wifi_set_entidentity(self, ident): """Sets the WPA2 Enterprise anonymous identity""" resp = self._send_command_get_response(_SET_ENT_IDENT_CMD, [ident]) if resp[0][0] != 1: - raise RuntimeError("Failed to set enterprise anonymous identity") + raise OSError("Failed to set enterprise anonymous identity") def wifi_set_entusername(self, username): """Sets the desired WPA2 Enterprise username""" resp = self._send_command_get_response(_SET_ENT_UNAME_CMD, [username]) if resp[0][0] != 1: - raise RuntimeError("Failed to set enterprise username") + raise OSError("Failed to set enterprise username") def wifi_set_entpassword(self, password): """Sets the desired WPA2 Enterprise password""" resp = self._send_command_get_response(_SET_ENT_PASSWD_CMD, [password]) if resp[0][0] != 1: - raise RuntimeError("Failed to set enterprise password") + raise OSError("Failed to set enterprise password") def wifi_set_entenable(self): """Enables WPA2 Enterprise mode""" resp = self._send_command_get_response(_SET_ENT_ENABLE_CMD) if resp[0][0] != 1: - raise RuntimeError("Failed to enable enterprise mode") + raise OSError("Failed to enable enterprise mode") def _wifi_set_ap_network(self, ssid, channel): """Creates an Access point with SSID and Channel""" resp = self._send_command_get_response(_SET_AP_NET_CMD, [ssid, channel]) if resp[0][0] != 1: - raise RuntimeError("Failed to setup AP network") + raise OSError("Failed to setup AP network") def _wifi_set_ap_passphrase(self, ssid, passphrase, channel): """Creates an Access point with SSID, passphrase, and Channel""" @@ -499,7 +499,7 @@ def _wifi_set_ap_passphrase(self, ssid, passphrase, channel): _SET_AP_PASSPHRASE_CMD, [ssid, passphrase, channel] ) if resp[0][0] != 1: - raise RuntimeError("Failed to setup AP password") + raise OSError("Failed to setup AP password") @property def ssid(self): @@ -539,7 +539,7 @@ def is_connected(self): """Whether the ESP32 is connected to an access point""" try: return self.status == WL_CONNECTED - except RuntimeError: + except OSError: self.reset() return False @@ -548,7 +548,7 @@ def ap_listening(self): """Returns if the ESP32 is in access point mode and is listening for connections""" try: return self.status == WL_AP_LISTENING - except RuntimeError: + except OSError: self.reset() return False @@ -556,7 +556,7 @@ def disconnect(self): """Disconnect from the access point""" resp = self._send_command_get_response(_DISCONNECT_CMD) if resp[0][0] != 1: - raise RuntimeError("Failed to disconnect") + raise OSError("Failed to disconnect") def connect(self, secrets): """Connect to an access point using a secrets dictionary @@ -589,10 +589,10 @@ def connect_AP(self, ssid, password, timeout_s=10): # pylint: disable=invalid-n return stat time.sleep(0.05) if stat in (WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED): - raise RuntimeError("Failed to connect to ssid", ssid) + raise ConnectionError("Failed to connect to ssid", ssid) if stat == WL_NO_SSID_AVAIL: - raise RuntimeError("No such ssid", ssid) - raise RuntimeError("Unknown error 0x%02X" % stat) + raise ConnectionError("No such ssid", ssid) + raise OSError("Unknown error 0x%02X" % stat) def create_AP( self, ssid, password, channel=1, timeout=10 @@ -607,11 +607,11 @@ def create_AP( :param int timeout: number of seconds until we time out and fail to create AP """ if len(ssid) > 32: - raise RuntimeError("ssid must be no more than 32 characters") + raise ValueError("ssid must be no more than 32 characters") if password and (len(password) < 8 or len(password) > 64): - raise RuntimeError("password must be 8 - 63 characters") + raise ValueError("password must be 8 - 63 characters") if channel < 1 or channel > 14: - raise RuntimeError("channel must be between 1 and 14") + raise ValueError("channel must be between 1 and 14") if isinstance(channel, int): channel = bytes(channel) @@ -631,8 +631,8 @@ def create_AP( return stat time.sleep(0.05) if stat == WL_AP_FAILED: - raise RuntimeError("Failed to create AP", ssid) - raise RuntimeError("Unknown error 0x%02x" % stat) + raise ConnectionError("Failed to create AP", ssid) + raise OSError("Unknown error 0x%02x" % stat) def pretty_ip(self, ip): # pylint: disable=no-self-use, invalid-name """Converts a bytearray IP address to a dotted-quad string for printing""" @@ -652,7 +652,7 @@ def get_host_by_name(self, hostname): hostname = bytes(hostname, "utf-8") resp = self._send_command_get_response(_REQ_HOST_BY_NAME_CMD, (hostname,)) if resp[0][0] != 1: - raise RuntimeError("Failed to request hostname") + raise ConnectionError("Failed to request hostname") resp = self._send_command_get_response(_GET_HOST_BY_NAME_CMD) return resp[0] @@ -708,7 +708,7 @@ def socket_open(self, socket_num, dest, port, conn_mode=TCP_MODE): (dest, port_param, self._socknum_ll[0], (conn_mode,)), ) if resp[0][0] != 1: - raise RuntimeError("Could not connect to remote server") + raise ConnectionError("Could not connect to remote server") if conn_mode == ESP_SPIcontrol.TLS_MODE: self._tls_socket = socket_num @@ -751,24 +751,24 @@ def socket_write(self, socket_num, buffer, conn_mode=TCP_MODE): if conn_mode == self.UDP_MODE: # UDP verifies chunks on write, not bytes if sent != total_chunks: - raise RuntimeError( + raise ConnectionError( "Failed to write %d chunks (sent %d)" % (total_chunks, sent) ) # UDP needs to finalize with this command, does the actual sending resp = self._send_command_get_response(_SEND_UDP_DATA_CMD, self._socknum_ll) if resp[0][0] != 1: - raise RuntimeError("Failed to send UDP data") + raise ConnectionError("Failed to send UDP data") return if sent != len(buffer): self.socket_close(socket_num) - raise RuntimeError( + raise ConnectionError( "Failed to send %d bytes (sent %d)" % (len(buffer), sent) ) resp = self._send_command_get_response(_DATA_SENT_TCP_CMD, self._socknum_ll) if resp[0][0] != 1: - raise RuntimeError("Failed to verify data sent") + raise ConnectionError("Failed to verify data sent") def socket_available(self, socket_num): """Determine how many bytes are waiting to be read on the socket""" @@ -815,7 +815,7 @@ def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE): if self.socket_connected(socket_num): return True time.sleep(0.01) - raise RuntimeError("Failed to establish connection") + raise TimeoutError("Failed to establish connection") def socket_close(self, socket_num): """Close a socket using the ESP32's internal reference number""" @@ -824,7 +824,7 @@ def socket_close(self, socket_num): self._socknum_ll[0][0] = socket_num try: self._send_command_get_response(_STOP_CLIENT_TCP_CMD, self._socknum_ll) - except RuntimeError: + except OSError: pass if socket_num == self._tls_socket: self._tls_socket = None @@ -842,7 +842,7 @@ def start_server( resp = self._send_command_get_response(_START_SERVER_TCP_CMD, params) if resp[0][0] != 1: - raise RuntimeError("Could not start server") + raise OSError("Could not start server") def server_state(self, socket_num): """Get the state of the ESP32's internal reference server socket number""" @@ -863,7 +863,7 @@ def set_esp_debug(self, enabled): written to the ESP32's UART.""" resp = self._send_command_get_response(_SET_DEBUG_CMD, ((bool(enabled),),)) if resp[0][0] != 1: - raise RuntimeError("Failed to set debug mode") + raise OSError("Failed to set debug mode") def set_pin_mode(self, pin, mode): """Set the io mode for a GPIO pin. @@ -879,7 +879,7 @@ def set_pin_mode(self, pin, mode): pin_mode = mode resp = self._send_command_get_response(_SET_PIN_MODE_CMD, ((pin,), (pin_mode,))) if resp[0][0] != 1: - raise RuntimeError("Failed to set pin mode") + raise OSError("Failed to set pin mode") def set_digital_write(self, pin, value): """Set the digital output value of pin. @@ -891,7 +891,7 @@ def set_digital_write(self, pin, value): _SET_DIGITAL_WRITE_CMD, ((pin,), (value,)) ) if resp[0][0] != 1: - raise RuntimeError("Failed to write to pin") + raise OSError("Failed to write to pin") def set_analog_write(self, pin, analog_value): """Set the analog output value of pin, using PWM. @@ -904,7 +904,7 @@ def set_analog_write(self, pin, analog_value): _SET_ANALOG_WRITE_CMD, ((pin,), (value,)) ) if resp[0][0] != 1: - raise RuntimeError("Failed to write to pin") + raise OSError("Failed to write to pin") def set_digital_read(self, pin): """Get the digital input value of pin. Returns the boolean value of the pin. @@ -920,7 +920,7 @@ def set_digital_read(self, pin): return False if resp[0] == 1: return True - raise ValueError( + raise OSError( "_SET_DIGITAL_READ response error: response is not boolean", resp[0] ) @@ -950,13 +950,13 @@ def get_time(self): resp = self._send_command_get_response(_GET_TIME) resp_time = struct.unpack(" 0 and time.monotonic() - stamp > self._timeout: self.close() # Make sure to close socket so that we don't exhaust sockets. - raise RuntimeError("Didn't receive full response, failing out") + raise OSError("Didn't receive full response, failing out") firstline, self._buffer = self._buffer.split(eol, 1) gc.collect() return firstline diff --git a/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py b/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py index 739f7a9..0070b94 100755 --- a/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py +++ b/adafruit_esp32spi/adafruit_esp32spi_wifimanager.py @@ -144,7 +144,7 @@ def connect_normal(self): self.esp.connect_AP(bytes(ssid, "utf-8"), bytes(password, "utf-8")) failure_count = 0 self.pixel_status((0, 100, 0)) - except (ValueError, RuntimeError) as error: + except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 if failure_count >= self.attempts: @@ -173,7 +173,7 @@ def create_ap(self): self.esp.create_AP(bytes(self.ssid, "utf-8"), None) failure_count = 0 self.pixel_status((0, 100, 0)) - except (ValueError, RuntimeError) as error: + except OSError as error: print("Failed to create access point\n", error) failure_count += 1 if failure_count >= self.attempts: @@ -203,7 +203,7 @@ def connect_enterprise(self): failure_count = 0 self.pixel_status((0, 100, 0)) sleep(1) - except (ValueError, RuntimeError) as error: + except OSError as error: print("Failed to connect, retrying\n", error) failure_count += 1 if failure_count >= self.attempts: diff --git a/adafruit_esp32spi/digitalio.py b/adafruit_esp32spi/digitalio.py index c935794..3e4cc15 100755 --- a/adafruit_esp32spi/digitalio.py +++ b/adafruit_esp32spi/digitalio.py @@ -61,7 +61,7 @@ def init(self, mode=IN): self._mode = self.OUT self._esp.set_pin_mode(self.pin_id, 1) else: - raise RuntimeError("Invalid mode defined") + raise ValueError("Invalid mode defined") def value(self, val=None): """Sets ESP32 Pin GPIO output mode. @@ -76,7 +76,7 @@ def value(self, val=None): self._value = val self._esp.set_digital_write(self.pin_id, 1) else: - raise RuntimeError("Invalid value for pin") + raise ValueError("Invalid value for pin") else: raise NotImplementedError( "digitalRead not currently implemented in esp32spi" diff --git a/examples/esp32spi_aio_post.py b/examples/esp32spi_aio_post.py index b74e4e3..ef24db7 100644 --- a/examples/esp32spi_aio_post.py +++ b/examples/esp32spi_aio_post.py @@ -66,7 +66,7 @@ response.close() counter = counter + 1 print("OK") - except (ValueError, RuntimeError) as e: + except OSError as e: print("Failed to get data, retrying\n", e) wifi.reset() continue diff --git a/examples/esp32spi_cheerlights.py b/examples/esp32spi_cheerlights.py index 46d75af..7ded593 100644 --- a/examples/esp32spi_cheerlights.py +++ b/examples/esp32spi_cheerlights.py @@ -54,7 +54,7 @@ value = value[key] print(value) response.close() - except (ValueError, RuntimeError) as e: + except OSError as e: print("Failed to get data, retrying\n", e) wifi.reset() continue diff --git a/examples/esp32spi_ipconfig.py b/examples/esp32spi_ipconfig.py index 01f01c8..5f67044 100644 --- a/examples/esp32spi_ipconfig.py +++ b/examples/esp32spi_ipconfig.py @@ -49,7 +49,7 @@ while not esp.is_connected: try: esp.connect_AP(secrets["ssid"], secrets["password"]) - except RuntimeError as e: + except OSError as e: print("could not connect to AP, retrying: ", e) continue print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) diff --git a/examples/esp32spi_localtime.py b/examples/esp32spi_localtime.py index 1c27304..01dd93b 100644 --- a/examples/esp32spi_localtime.py +++ b/examples/esp32spi_localtime.py @@ -42,7 +42,7 @@ print("Fetching json from", TIME_API) response = wifi.get(TIME_API) break - except (ValueError, RuntimeError) as e: + except OSError as e: print("Failed to get data, retrying\n", e) continue diff --git a/examples/esp32spi_simpletest.py b/examples/esp32spi_simpletest.py index af8c2f1..7e01ad6 100644 --- a/examples/esp32spi_simpletest.py +++ b/examples/esp32spi_simpletest.py @@ -59,7 +59,7 @@ while not esp.is_connected: try: esp.connect_AP(secrets["ssid"], secrets["password"]) - except RuntimeError as e: + except OSError as e: print("could not connect to AP, retrying: ", e) continue print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) diff --git a/examples/esp32spi_simpletest_rp2040.py b/examples/esp32spi_simpletest_rp2040.py index eb52592..decd13e 100644 --- a/examples/esp32spi_simpletest_rp2040.py +++ b/examples/esp32spi_simpletest_rp2040.py @@ -42,7 +42,7 @@ while not esp.is_connected: try: esp.connect_AP(secrets["ssid"], secrets["password"]) - except RuntimeError as e: + except OSError as e: print("could not connect to AP, retrying: ", e) continue print("Connected to", str(esp.ssid, "utf-8"), "\tRSSI:", esp.rssi) diff --git a/examples/esp32spi_wpa2ent_aio_post.py b/examples/esp32spi_wpa2ent_aio_post.py index b6bdd99..227bf91 100644 --- a/examples/esp32spi_wpa2ent_aio_post.py +++ b/examples/esp32spi_wpa2ent_aio_post.py @@ -63,7 +63,7 @@ response.close() counter = counter + 1 print("OK") - except (ValueError, RuntimeError) as e: + except OSError as e: print("Failed to get data, retrying\n", e) wifi.reset() continue diff --git a/examples/gpio/esp32spi_gpio.py b/examples/gpio/esp32spi_gpio.py index a9c857d..6d7723b 100644 --- a/examples/gpio/esp32spi_gpio.py +++ b/examples/gpio/esp32spi_gpio.py @@ -94,7 +94,7 @@ def esp_init_pin_modes(din, dout): esp_init_pin_modes(ESP_D_R_PIN, ESP_D_W_PIN) esp_d_r_val = esp.set_digital_read(ESP_D_R_PIN) print("--> ESP read:", esp_d_r_val) - except (RuntimeError, AssertionError) as e: + except OSError as e: print("ESP32 Error", e) esp_reset_all() @@ -104,7 +104,7 @@ def esp_init_pin_modes(din, dout): esp_init_pin_modes(ESP_D_R_PIN, ESP_D_W_PIN) esp.set_digital_write(ESP_D_W_PIN, esp_d_w_val) print("ESP wrote:", esp_d_w_val, "--> Red LED") - except (RuntimeError) as e: + except OSError as e: print("ESP32 Error", e) esp_reset_all() @@ -121,7 +121,7 @@ def esp_init_pin_modes(din, dout): "v)", sep="", ) - except (RuntimeError, AssertionError) as e: + except OSError as e: print("ESP32 Error", e) esp_reset_all() @@ -166,7 +166,7 @@ def esp_init_pin_modes(din, dout): sep="", ) - except (RuntimeError) as e: + except OSError as e: print("ESP32 Error", e) esp_reset_all() diff --git a/examples/server/esp32spi_wsgiserver.py b/examples/server/esp32spi_wsgiserver.py index d9f0979..0220d2f 100755 --- a/examples/server/esp32spi_wsgiserver.py +++ b/examples/server/esp32spi_wsgiserver.py @@ -213,7 +213,7 @@ def led_color(environ): # pylint: disable=unused-argument static ) ) -except (OSError) as e: +except OSError as e: raise RuntimeError( """ This example depends on a static asset directory. @@ -240,7 +240,7 @@ def led_color(environ): # pylint: disable=unused-argument try: wsgiServer.update_poll() # Could do any other background tasks here, like reading sensors - except (ValueError, RuntimeError) as e: + except OSError as e: print("Failed to update server, restarting ESP32\n", e) wifi.reset() continue