From d0dc2e86b152d6d2d268371f8ac04461e2b6fed9 Mon Sep 17 00:00:00 2001 From: Michael Meli Date: Fri, 3 Dec 2021 11:05:08 -0500 Subject: [PATCH] remove paho auto-reconnect and replace with our custom reconnect logic --- setup.cfg | 2 +- src/pyduke_energy/realtime.py | 28 +++++++++++++++++----------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/setup.cfg b/setup.cfg index 2be4232..8f2d07a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pyduke-energy -version = 0.0.13 +version = 0.0.14 author = Michael Meli author_email = mjmeli94@gmail.com description = Python Wrapper for unofficial Duke Energy REST API diff --git a/src/pyduke_energy/realtime.py b/src/pyduke_energy/realtime.py index a16af51..cb59f62 100644 --- a/src/pyduke_energy/realtime.py +++ b/src/pyduke_energy/realtime.py @@ -128,11 +128,11 @@ def on_discon(self, _client: mqtt.Client, _userdata, conn_res): "MQTT disconnected with result code: %s", mqtt.error_string(conn_res), ) - if self.disconnecting: - self.disconnected.set_result(conn_res) - else: - _LOGGER.warning("Unexpected MQTT disconnect. Attempting reconnect.") - self.mqtt_client.reconnect() + if not self.disconnecting: + _LOGGER.warning( + "Unexpected MQTT disconnect. Will attempt reconnect shortly." + ) + self.disconnected.set_result(conn_res) @staticmethod def on_msg(msg): @@ -196,7 +196,9 @@ async def connect_and_subscribe(self): self.topicid = f'DESH/{self.mqtt_auth["gateway"]}/out/sm/1/live' self.mqtt_client = mqtt.Client( - self.mqtt_auth["clientid"], transport="websockets" + self.mqtt_auth["clientid"], + transport="websockets", + reconnect_on_failure=False, ) self.mqtt_client.on_connect = self.on_conn self.mqtt_client.on_subscribe = self.on_sub @@ -208,7 +210,6 @@ async def connect_and_subscribe(self): self.mqtt_client.username_pw_set( self.mqtt_auth["user"], password=self.mqtt_auth["pass"] ) - self.mqtt_client.reconnect_delay_set(3, 60) # create default ssl context to get SSLKEYLOGFILE env variable self.mqtt_client.tls_set_context(ssl.create_default_context()) @@ -226,12 +227,17 @@ async def connect_and_subscribe(self): self.retrycount = 0 except asyncio.TimeoutError: self.retrycount += 1 - if self.retrycount <= FASTPOLL_RETRY_COUNT: - _LOGGER.debug("Message timeout, requesting fastpoll") - await self._fastpoll_req() - else: + if self.disconnected.done(): + _LOGGER.debug( + "Unexpected disconnect detected, attemping reconnect" + ) + await self._reconnect() + elif self.retrycount > FASTPOLL_RETRY_COUNT: _LOGGER.debug("Multiple msg timeout, attempting reconnect") await self._reconnect() + else: + _LOGGER.debug("Message timeout, requesting fastpoll") + await self._fastpoll_req() self.rx_msg = None finally: res = self.mqtt_client.unsubscribe(self.topicid)