diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index b34507f..6b97f56 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -4,7 +4,6 @@ name: Tests on: push: - branches: [main] pull_request: schedule: - cron: "0 0 * * *" diff --git a/examples/example_realtime.py b/examples/example_realtime.py index b42330a..1391608 100644 --- a/examples/example_realtime.py +++ b/examples/example_realtime.py @@ -24,7 +24,7 @@ class MyDukeRT(DukeEnergyRealtime): """My instance of DukeEnergyRealtime.""" - def on_msg(self, msg): + def on_message(self, msg): """On Message callback. Parameters @@ -66,7 +66,7 @@ async def main() -> None: duke_rt = MyDukeRT(duke_energy) await duke_rt.select_default_meter() - await duke_rt.connect_and_subscribe() + await duke_rt.connect_and_subscribe_forever() except DukeEnergyError as err: print(err) diff --git a/examples/example_realtime_kafka.py b/examples/example_realtime_kafka.py index d379afd..0296d98 100644 --- a/examples/example_realtime_kafka.py +++ b/examples/example_realtime_kafka.py @@ -14,7 +14,7 @@ from kafka.errors import KafkaTimeoutError from pyduke_energy.client import DukeEnergyClient -from pyduke_energy.const import FASTPOLL_TIMEOUT +from pyduke_energy.const import FASTPOLL_TIMEOUT_SEC from pyduke_energy.errors import DukeEnergyError from pyduke_energy.realtime import DukeEnergyRealtime @@ -39,7 +39,7 @@ def close_kafka(self): self.kfk.close() _LOGGER.debug("Disconnected from Kafka") - def on_msg(self, msg): + def on_message(self, msg): """On Message callback. Parameters @@ -85,14 +85,14 @@ async def main() -> None: duke_rt = MyDukeRT(duke_energy) duke_rt.connect_kafka("smartmeter") await duke_rt.select_default_meter() - await duke_rt.connect_and_subscribe() + await duke_rt.connect_and_subscribe_forever() except DukeEnergyError as err: # attempt sleep and retry _LOGGER.warning("Error: %s\nAttempt sleep and retry.", err) await duke_rt.mqtt_client.unsubscribe() duke_rt.close_kafka() - time.sleep(FASTPOLL_TIMEOUT) + time.sleep(FASTPOLL_TIMEOUT_SEC) finally: duke_rt.close_kafka() diff --git a/setup.cfg b/setup.cfg index 435a2ca..3edae70 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pyduke-energy -version = 0.0.15 +version = 1.0.0 author = Michael Meli author_email = mjmeli94@gmail.com description = Python Wrapper for unofficial Duke Energy REST API diff --git a/src/pyduke_energy/client.py b/src/pyduke_energy/client.py index 913adbc..cf374fa 100644 --- a/src/pyduke_energy/client.py +++ b/src/pyduke_energy/client.py @@ -75,8 +75,8 @@ def __init__(self): self.id_token: Optional[str] = None self.mqtt_username: Optional[str] = None self.mqtt_password: Optional[str] = None - self.mqtt_clientid: Optional[str] = None - self.mqtt_clientid_error: Optional[str] = None + self.mqtt_client_id: Optional[str] = None + self.mqtt_client_id_error: Optional[str] = None self.gateway: Optional[str] = None @@ -372,8 +372,8 @@ async def _gateway_login(self) -> None: self._gateway_auth_info.id_token = resp.get("id_token") self._gateway_auth_info.mqtt_username = resp.get("mqtt_username") self._gateway_auth_info.mqtt_password = resp.get("mqtt_password") - self._gateway_auth_info.mqtt_clientid = resp.get("mqtt_clientId") - self._gateway_auth_info.mqtt_clientid_error = resp.get("mqtt_clientId_error") + self._gateway_auth_info.mqtt_client_id = resp.get("mqtt_clientId") + self._gateway_auth_info.mqtt_client_id_error = resp.get("mqtt_clientId_error") self._gateway_auth_info.gateway = resp.get("gateway") async def _get_gateway_auth_headers(self) -> dict: @@ -392,7 +392,7 @@ async def _get_mqtt_auth(self) -> dict: if self._gateway_auth_info.needs_new_access_token(): await self._gateway_login() return { - "clientid": self._gateway_auth_info.mqtt_clientid, + "clientid": self._gateway_auth_info.mqtt_client_id, "user": self._gateway_auth_info.mqtt_username, "pass": self._gateway_auth_info.mqtt_password, "gateway": self._gateway_auth_info.gateway, @@ -433,7 +433,7 @@ async def _async_request( ) from timeout_err except ClientError as client_err: raise RequestError( - f"Request failed with unexpected error [{full_url}]: {client_err}" + f"Request failed with unexpected error [{full_url}]: ({client_err.__class__.__name__}) {client_err}" ) from client_err finally: if not use_running_session: diff --git a/src/pyduke_energy/const.py b/src/pyduke_energy/const.py index 37a07f0..faddd8c 100644 --- a/src/pyduke_energy/const.py +++ b/src/pyduke_energy/const.py @@ -12,12 +12,27 @@ GW_STATUS_ENDPOINT = "gw/gateways/status" GW_USAGE_ENDPOINT = "smartmeter/usageByHour" +DEFAULT_TIMEOUT = 10 # seconds + MQTT_HOST = "app-core1.de-iot.io" MQTT_PORT = 443 MQTT_ENDPOINT = "/app-mqtt" MQTT_KEEPALIVE = 50 # Seconds, it appears the server will disconnect after 60s idle -FASTPOLL_TIMEOUT = 900 - 3 # seconds -FASTPOLL_RETRY = 60 # if no messages after this time, retry fastpoll request -FASTPOLL_RETRY_COUNT = 3 -DEFAULT_TIMEOUT = 10 # seconds +# in seconds, how long to until the fastpoll request has timed out and a new one needs to be made +FASTPOLL_TIMEOUT_SEC = 900 - 3 # seconds + +# in seconds, how long to wait for a message before retrying fastpoll or reconnecting +MESSAGE_TIMEOUT_SEC = 60 + +# number of times a message timeout can occur before just reconnecting +MESSAGE_TIMEOUT_RETRY_COUNT = 3 + +# in minutes, minimum amount of time to wait before retrying connection on forever loop +FOREVER_RETRY_MIN_MINUTES = 1 + +# in minutes, maximum amount of time to wait before trying connection on forever loop +FOREVER_RETRY_MAX_MINUTES = 60 + +# in seconds, how long to wait for a connection before timing out +CONNECT_TIMEOUT_SEC = 60 diff --git a/src/pyduke_energy/errors.py b/src/pyduke_energy/errors.py index 73c7d54..3404e3e 100644 --- a/src/pyduke_energy/errors.py +++ b/src/pyduke_energy/errors.py @@ -1,5 +1,9 @@ """Error Types.""" +from typing import Any, Union + +import paho.mqtt.client as mqtt + class DukeEnergyError(Exception): """Base error.""" @@ -11,3 +15,24 @@ class RequestError(DukeEnergyError): class InputError(DukeEnergyError): """Error for input issues.""" + + +class MqttError(DukeEnergyError): + """Error for issues relating to the MQTT connection.""" + + +class MqttCodeError(MqttError): + """Error for MQTT issues associated with an error code.""" + + def __init__(self, operation: str, code: Union[int, mqtt.ReasonCodes], *args: Any): + super().__init__(*args) + self.operation = operation + self.code = code + + def __str__(self) -> str: + """Output a formatted string for the error code.""" + if isinstance(self.code, mqtt.ReasonCodes): + return f"{self.operation}: ({self.code.value}) {str(self.code)}" + if isinstance(self.code, int): + return f"{self.operation}: ({self.code}) {mqtt.error_string(self.code)}" + return f"{self.operation}: ({self.code}) {super().__str__()}" diff --git a/src/pyduke_energy/realtime.py b/src/pyduke_energy/realtime.py index a9e8277..b929018 100644 --- a/src/pyduke_energy/realtime.py +++ b/src/pyduke_energy/realtime.py @@ -4,23 +4,27 @@ import functools import json import logging +import socket import ssl import time -from typing import Optional +from typing import Any, Dict, Optional import paho.mqtt.client as mqtt from pyduke_energy.client import DukeEnergyClient from pyduke_energy.const import ( - FASTPOLL_RETRY, - FASTPOLL_RETRY_COUNT, - FASTPOLL_TIMEOUT, + CONNECT_TIMEOUT_SEC, + FASTPOLL_TIMEOUT_SEC, + FOREVER_RETRY_MAX_MINUTES, + FOREVER_RETRY_MIN_MINUTES, + MESSAGE_TIMEOUT_RETRY_COUNT, + MESSAGE_TIMEOUT_SEC, MQTT_ENDPOINT, MQTT_HOST, MQTT_KEEPALIVE, MQTT_PORT, ) -from pyduke_energy.errors import RequestError +from pyduke_energy.errors import MqttCodeError, MqttError, RequestError from pyduke_energy.types import RealtimeUsageMeasurement _LOGGER = logging.getLogger(__name__) @@ -30,19 +34,23 @@ class DukeEnergyRealtime: """Duke Energy Realtime Client.""" def __init__(self, duke_energy: DukeEnergyClient): - self.duke_energy = duke_energy - self.loop = asyncio.get_event_loop() - self.disconnected = None - self.disconnecting = False - self.rx_msg = None - self.mqtt_client = None - self.topicid = None - self.tstart = -FASTPOLL_TIMEOUT # ensure fastpoll is requested on first run - self.retrycount = 0 - self.mqtt_auth = None - self.headers = None - - def on_conn(self, client: mqtt.Client, _userdata, _flags, conn_res): + self._duke_energy = duke_energy + self._loop = asyncio.get_event_loop() + self._disconnected: Optional["asyncio.Future[int]"] = None + self._disconnecting: bool = False + self._connected: Optional["asyncio.Future[int]"] = None + self._rx_msg: Optional["asyncio.Future[int]"] = None + self._t_start: int = 0 + self._msg_retry_count: int = 0 + self._forever_retry_count: int = 0 + self._mqtt_auth: dict = {} + self._headers: dict = {} + self._topic_id: str = None + self._mqtt_client: Optional[mqtt.Client] = None + + def on_connect( + self, client: mqtt.Client, _userdata: Any, _flags: Dict[str, int], conn_res: int + ): """On Connect callback. Parameters @@ -58,21 +66,37 @@ def on_conn(self, client: mqtt.Client, _userdata, _flags, conn_res): This will call the client.subscribe() method if the connection was successful. """ + # Return early if already connected. Sometimes this will be called multiple times. + if not self._connected or self._connected.done(): + return + if conn_res: _LOGGER.error( "MQTT connection error with result code: %s", mqtt.connack_string(conn_res), ) + self._connected.set_exception(MqttCodeError("Connect", conn_res)) else: _LOGGER.debug( "MQTT connected with result code: %s", mqtt.connack_string(conn_res) ) - res = client.subscribe(self.topicid, qos=0) - if not res: - _LOGGER.warning("Subscribe error: %s", mqtt.error_string(res)) + + # Automatically subscribe to the topic + sub_res, _ = client.subscribe(self._topic_id, qos=0) + if sub_res: + _LOGGER.error("Subscribe error: %s", mqtt.error_string(sub_res)) + self._connected.set_exception( + MqttCodeError( + "Subscribe", + sub_res, + "Could not subscribe to topic after connect", + ) + ) + else: + self._connected.set_result(conn_res) @staticmethod - def on_sub(_client: mqtt.Client, _userdata, mid, granted_qos): + def on_subscribe(_client: mqtt.Client, _userdata: Any, mid: int, granted_qos: int): """On Subscribe callback. Parameters @@ -88,7 +112,7 @@ def on_sub(_client: mqtt.Client, _userdata, mid, granted_qos): """ _LOGGER.debug("MQTT subscribed msg_id: %s qos: %s", str(mid), str(granted_qos)) - def on_unsub(self, client: mqtt.Client, _userdata, mid): + def on_unsubscribe(self, client: mqtt.Client, _userdata: Any, mid: int): """On Unubscribe callback. Parameters @@ -103,10 +127,10 @@ def on_unsub(self, client: mqtt.Client, _userdata, mid): This will call the client.disconnect() method """ _LOGGER.debug("MQTT unsubscribed msg_id: %s", str(mid)) - self.disconnecting = True + self._disconnecting = True client.disconnect() - def on_discon(self, _client: mqtt.Client, _userdata, conn_res): + def on_disconnect(self, _client: mqtt.Client, _userdata: Any, disconn_res: int): """On Disconnect callback. Parameters @@ -115,27 +139,31 @@ def on_discon(self, _client: mqtt.Client, _userdata, conn_res): The paho-mqtt client userdata user data passed by the client - conn_res + disconn_res Disconnect error code """ - if conn_res: + # Return early if already connected. Sometimes this will be called multiple times. + if not self._disconnected or self._disconnected.done(): + return + + if disconn_res: _LOGGER.error( "MQTT disconnect error, result code: %s (This may not be accurate)", - mqtt.error_string(conn_res), + mqtt.error_string(disconn_res), ) + self._disconnected.set_exception(MqttCodeError("Disconnect", disconn_res)) else: _LOGGER.debug( "MQTT disconnected with result code: %s", - mqtt.error_string(conn_res), + mqtt.error_string(disconn_res), ) - if not self.disconnecting: - _LOGGER.warning( - "Unexpected MQTT disconnect. Will attempt reconnect shortly." - ) - self.disconnected.set_result(conn_res) + self._disconnected.set_result(disconn_res) + + if not self._disconnecting: + _LOGGER.debug("Unexpected MQTT disconnect. Will attempt reconnect shortly.") @staticmethod - def on_msg(msg): + def on_message(msg: mqtt.MQTTMessage): """On Message Callback. Parameters @@ -145,7 +173,7 @@ def on_msg(msg): """ _LOGGER.debug("rx msg on %s\n%s", msg.topic, msg.payload.decode("utf8")) - def _on_msg(self, _client: mqtt.Client, _userdata, msg): + def _on_message(self, _client: mqtt.Client, _userdata: Any, msg: mqtt.MQTTMessage): """Private On Message callback. Parameters @@ -157,7 +185,7 @@ def _on_msg(self, _client: mqtt.Client, _userdata, msg): msg : MQTTMessage This is a class with members topic, payload, qos, retain """ - if not self.rx_msg or self.rx_msg.done(): + if not self._rx_msg or self._rx_msg.done(): msg_if_decoded = None try: msg_if_decoded = msg.payload.decode("utf8") @@ -169,11 +197,13 @@ def _on_msg(self, _client: mqtt.Client, _userdata, msg): msg_if_decoded, ) else: - self.rx_msg.set_result((msg.payload.decode("utf8"))) - self.on_msg(msg) + self._rx_msg.set_result((msg.payload.decode("utf8"))) + self.on_message(msg) @staticmethod - def msg_to_usage_measurement(msg) -> Optional[RealtimeUsageMeasurement]: + def msg_to_usage_measurement( + msg: mqtt.MQTTMessage, + ) -> Optional[RealtimeUsageMeasurement]: """Parse a raw message to the realtime usage measurement type.""" raw_json = msg.payload.decode("utf8") try: @@ -185,65 +215,101 @@ def msg_to_usage_measurement(msg) -> Optional[RealtimeUsageMeasurement]: async def select_default_meter(self): """Call select_default_meter method of duke_energy client.""" - await self.duke_energy.select_default_meter() - - async def connect_and_subscribe(self): - """Mqtt client connection.""" - self.disconnected = self.loop.create_future() - self.rx_msg = None + await self._duke_energy.select_default_meter() - self.mqtt_auth, self.headers = await self.duke_energy.get_mqtt_auth() - self.topicid = f'DESH/{self.mqtt_auth["gateway"]}/out/sm/1/live' + async def connect_and_subscribe_forever(self): + """MQTT client connection that runs indefinitely and restarts the connection upon any failure.""" + while True: + try: + await self.connect_and_subscribe() + except (MqttError, RequestError) as retry_err: + # Exponential backoff of retry interval, maxing out at FOREVER_RETRY_BASE_MAX_MINUTES + self._forever_retry_count += 1 + reconnect_interval = min( + FOREVER_RETRY_MIN_MINUTES * 2 ** (self._forever_retry_count - 1), + FOREVER_RETRY_MAX_MINUTES, + ) + _LOGGER.warning( + "Caught retryable error '%s' in forever loop. Will attempt reconnect in %d minute(s). Attempt #%d. Error: %s'", + retry_err.__class__.__name__, + reconnect_interval, + self._forever_retry_count, + retry_err, + ) + await asyncio.sleep(reconnect_interval * 60) # interval is in minutes + except Exception as error: + _LOGGER.error( + "Caught non-retryable error '%s' in forever loop. Will not attempt reconnect. Error: %s", + error.__class__.__name__, + error, + ) + raise - self.mqtt_client = mqtt.Client( - self.mqtt_auth["clientid"], + async def connect_and_subscribe(self): + """MQTT client connection.""" + # Reinitialize everything for a new connection + self._disconnected = self._loop.create_future() + self._disconnecting = False + self._connected = self._loop.create_future() + self._rx_msg = None + self._t_start = ( + -FASTPOLL_TIMEOUT_SEC + ) # ensure fastpoll is requested on first run + self._msg_retry_count = 0 + + self._mqtt_auth, self._headers = await self._duke_energy.get_mqtt_auth() + self._topic_id = f'DESH/{self._mqtt_auth["gateway"]}/out/sm/1/live' + + self._mqtt_client = mqtt.Client( + self._mqtt_auth["clientid"], transport="websockets", reconnect_on_failure=False, ) - self.mqtt_client.on_connect = self.on_conn - self.mqtt_client.on_subscribe = self.on_sub - self.mqtt_client.on_unsubscribe = self.on_unsub - self.mqtt_client.on_disconnect = self.on_discon - self.mqtt_client.on_message = self._on_msg - self.mqtt_client.enable_logger(logger=_LOGGER) - self.mqtt_client.ws_set_options(path=MQTT_ENDPOINT, headers=self.headers) - self.mqtt_client.username_pw_set( - self.mqtt_auth["user"], password=self.mqtt_auth["pass"] + self._mqtt_client.on_connect = self.on_connect + self._mqtt_client.on_subscribe = self.on_subscribe + self._mqtt_client.on_unsubscribe = self.on_unsubscribe + self._mqtt_client.on_disconnect = self.on_disconnect + self._mqtt_client.on_message = self._on_message + self._mqtt_client.enable_logger(logger=_LOGGER) + self._mqtt_client.ws_set_options(path=MQTT_ENDPOINT, headers=self._headers) + self._mqtt_client.username_pw_set( + self._mqtt_auth["user"], password=self._mqtt_auth["pass"] ) # create default ssl context to get SSLKEYLOGFILE env variable - self.mqtt_client.tls_set_context(ssl.create_default_context()) + self._mqtt_client.tls_set_context(ssl.create_default_context()) - mqtt_conn = MqttConnHelper(self.loop, self.mqtt_client) - await self.async_mqtt_client_connect() + mqtt_conn = MqttConnHelper(self._loop, self._mqtt_client) + await self._async_mqtt_client_connect() try: while not mqtt_conn.misc.cancelled(): - if time.perf_counter() - self.tstart > FASTPOLL_TIMEOUT: + if time.perf_counter() - self._t_start > FASTPOLL_TIMEOUT_SEC: # Request fastpoll await self._fastpoll_req() - self.rx_msg = self.loop.create_future() + self._rx_msg = self._loop.create_future() try: - await asyncio.wait_for(self.rx_msg, FASTPOLL_RETRY) - self.retrycount = 0 + await asyncio.wait_for(self._rx_msg, MESSAGE_TIMEOUT_SEC) + self._msg_retry_count = 0 + self._forever_retry_count = 0 except asyncio.TimeoutError: - self.retrycount += 1 - if self.disconnected.done(): + self._msg_retry_count += 1 + if self._disconnected.done(): _LOGGER.debug( "Unexpected disconnect detected, attemping reconnect" ) await self._reconnect() - elif self.retrycount > FASTPOLL_RETRY_COUNT: + elif self._msg_retry_count > MESSAGE_TIMEOUT_RETRY_COUNT: _LOGGER.debug("Multiple msg timeout, attempting reconnect") await self._reconnect() else: _LOGGER.debug("Message timeout, requesting fastpoll") await self._fastpoll_req() - self.rx_msg = None + self._rx_msg = None finally: - res = self.mqtt_client.unsubscribe(self.topicid) + res = self._mqtt_client.unsubscribe(self._topic_id) if not res: _LOGGER.warning("Unsubscribe error: %s", mqtt.error_string(res)) - await self.disconnected + await self._disconnected async def _fastpoll_req(self): """Request fastpoll, with auth check.""" @@ -251,73 +317,90 @@ async def _fastpoll_req(self): ( mqtt_auth_new, headers_new, - ) = await self.duke_energy.get_mqtt_auth() + ) = await self._duke_energy.get_mqtt_auth() except RequestError: _LOGGER.warning( "Error requesting smartmeter auth, will retry after 5 seconds." ) # Attempt clearing auth and try again. - self.duke_energy._gateway_auth_info.clear_access_token() # pylint: disable=W0212 - time.sleep(5) + self._duke_energy._gateway_auth_info.clear_access_token() # pylint: disable=W0212 + await asyncio.sleep(5) ( mqtt_auth_new, headers_new, - ) = await self.duke_energy.get_mqtt_auth() - if mqtt_auth_new != self.mqtt_auth or headers_new != self.headers: + ) = await self._duke_energy.get_mqtt_auth() + if mqtt_auth_new != self._mqtt_auth or headers_new != self._headers: _LOGGER.debug("mqtt auth or headers updated, reconnecting...") - self.mqtt_auth = mqtt_auth_new - self.headers = headers_new + self._mqtt_auth = mqtt_auth_new + self._headers = headers_new await self._reconnect() - self.tstart = await self.duke_energy.start_smartmeter_fastpoll() + self._t_start = await self._duke_energy.start_smartmeter_fastpoll() async def _reconnect(self): """Reconnect in case of updated auth or headers.""" # Unsub and disconnect first - res = self.mqtt_client.unsubscribe(self.topicid) + res = self._mqtt_client.unsubscribe(self._topic_id) if not res: _LOGGER.warning("Unsubscribe error: %s", mqtt.error_string(res)) - await self.disconnected - self.disconnected = self.loop.create_future() # re-create the future + await self._disconnected # Update mqtt_auth and header info - clientid = self.mqtt_auth["clientid"] - if isinstance(clientid, str): - clientid = clientid.encode("utf-8") - self.mqtt_client._client_id = clientid # pylint: disable=W0212 - self.mqtt_client.ws_set_options(path=MQTT_ENDPOINT, headers=self.headers) - self.mqtt_client.username_pw_set( - self.mqtt_auth["user"], password=self.mqtt_auth["pass"] + client_id = self._mqtt_auth["clientid"] + if isinstance(client_id, str): + client_id = client_id.encode("utf-8") + self._mqtt_client._client_id = client_id # pylint: disable=W0212 + self._mqtt_client.ws_set_options(path=MQTT_ENDPOINT, headers=self._headers) + self._mqtt_client.username_pw_set( + self._mqtt_auth["user"], password=self._mqtt_auth["pass"] ) - await self.async_mqtt_client_connect() + await self._async_mqtt_client_connect() - async def async_mqtt_client_connect(self): + async def _async_mqtt_client_connect(self): """Run connect() in an async safe manner to avoid blocking.""" + # Re-create the futures + if self._disconnected.done(): + self._disconnected = self._loop.create_future() + if self._connected.done(): + self._connected = self._loop.create_future() + # Run connect() within an executor thread, since it blocks on socket # connection for up to `keepalive` seconds: https://git.io/Jt5Yc - await self.loop.run_in_executor( - None, - functools.partial( - self.mqtt_client.connect, - MQTT_HOST, - port=MQTT_PORT, - keepalive=MQTT_KEEPALIVE, - ), - ) + try: + await self._loop.run_in_executor( + None, + functools.partial( + self._mqtt_client.connect, + MQTT_HOST, + port=MQTT_PORT, + keepalive=MQTT_KEEPALIVE, + ), + ) + except (socket.error, OSError, mqtt.WebsocketConnectionError) as error: + raise MqttError(f"Failure attempting MQTT connect: {error}") from error + + try: + await asyncio.wait_for(self._connected, CONNECT_TIMEOUT_SEC) + except asyncio.TimeoutError as to_err: + raise MqttError( + f"Connect operation timed out after {CONNECT_TIMEOUT_SEC} seconds" + ) from to_err class MqttConnHelper: """Helper for asyncio mqtt.""" def __init__(self, loop: asyncio.AbstractEventLoop, mqtt_client: mqtt.Client): - self.loop = loop - self.mqtt_client = mqtt_client - self.mqtt_client.on_socket_open = self.on_socket_open - self.mqtt_client.on_socket_close = self.on_socket_close - self.mqtt_client.on_socket_register_write = self.on_socket_register_write - self.mqtt_client.on_socket_unregister_write = self.on_socket_unregister_write + self._loop = loop + self._mqtt_client = mqtt_client + self._mqtt_client.on_socket_open = self.on_socket_open + self._mqtt_client.on_socket_close = self.on_socket_close + self._mqtt_client.on_socket_register_write = self.on_socket_register_write + self._mqtt_client.on_socket_unregister_write = self.on_socket_unregister_write self.misc = None - def on_socket_open(self, client: mqtt.Client, _userdata, sock): + def on_socket_open( + self, client: mqtt.Client, _userdata: Any, sock: mqtt.WebsocketWrapper + ): """Socket open callback.""" _LOGGER.debug("Socket opened") @@ -326,24 +409,28 @@ def call_bk(): _LOGGER.debug("Socket readable, calling loop_read()") client.loop_read() - self.loop.add_reader(sock, call_bk) + self._loop.add_reader(sock, call_bk) # paho-mqtt calls this function from the executor thread on which we've called # `self._client.connect()`, so we create a callback function to schedule # `_misc_loop()` and run it on the loop thread-safely. def create_task_cb() -> None: - self.misc = self.loop.create_task(self.misc_loop()) + self.misc = self._loop.create_task(self.misc_loop()) - self.loop.call_soon_threadsafe(create_task_cb) + self._loop.call_soon_threadsafe(create_task_cb) - def on_socket_close(self, _client: mqtt.Client, _userdata, sock): + def on_socket_close( + self, _client: mqtt.Client, _userdata: Any, sock: mqtt.WebsocketWrapper + ): """Socket close callback.""" _LOGGER.debug("Socket closed") - self.loop.remove_reader(sock) + self._loop.remove_reader(sock) if self.misc is not None: self.misc.cancel() - def on_socket_register_write(self, client: mqtt.Client, _userdata, sock): + def on_socket_register_write( + self, client: mqtt.Client, _userdata: Any, sock: mqtt.WebsocketWrapper + ): """Socket write reg callback.""" _LOGGER.debug("Watching socket for writability.") @@ -352,18 +439,20 @@ def call_bk(): _LOGGER.debug("Socket is writable, calling loop_write") client.loop_write() - self.loop.add_writer(sock, call_bk) + self._loop.add_writer(sock, call_bk) - def on_socket_unregister_write(self, _client: mqtt.Client, _userdata, sock): + def on_socket_unregister_write( + self, _client: mqtt.Client, _userdata: Any, sock: mqtt.WebsocketWrapper + ): """Socket unreg write callback.""" _LOGGER.debug("Stop watching socket for writability.") - self.loop.remove_writer(sock) + self._loop.remove_writer(sock) async def misc_loop(self): """Misc loop call.""" _LOGGER.debug("Misc loop started") - while self.mqtt_client.loop_misc() == mqtt.MQTT_ERR_SUCCESS: + while self._mqtt_client.loop_misc() == mqtt.MQTT_ERR_SUCCESS: try: await asyncio.sleep(1) except asyncio.CancelledError: diff --git a/tox.ini b/tox.ini index 76ceb08..f8bd177 100644 --- a/tox.ini +++ b/tox.ini @@ -18,12 +18,18 @@ commands = [testenv:lint] basepython = python3 +allowlist_externals = + black + flake8 + pylint + pydocstyle + isort commands = black . flake8 src/pyduke_energy tests - pylint src/pyduke_energy tests pydocstyle src/pyduke_energy tests isort src/pyduke_energy tests + pylint src/pyduke_energy tests [flake8] max-line-length = 88