From 1630ea28fc4c0fc47c6759f0e3bf0bcdb130ca0c Mon Sep 17 00:00:00 2001 From: Michael Meli Date: Fri, 3 Dec 2021 09:22:06 -0500 Subject: [PATCH] ensure connects and reconnects are done async safe since they block --- setup.cfg | 2 +- src/pyduke_energy/realtime.py | 38 ++++++++++++++++++++++------------- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/setup.cfg b/setup.cfg index 5709a10..2be4232 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = pyduke-energy -version = 0.0.12 +version = 0.0.13 author = Michael Meli author_email = mjmeli94@gmail.com description = Python Wrapper for unofficial Duke Energy REST API diff --git a/src/pyduke_energy/realtime.py b/src/pyduke_energy/realtime.py index 52aca04..f0e72f7 100644 --- a/src/pyduke_energy/realtime.py +++ b/src/pyduke_energy/realtime.py @@ -131,7 +131,8 @@ def on_discon(self, _client: mqtt.Client, _userdata, conn_res): if self.disconnecting: self.disconnected.set_result(conn_res) else: - self.mqtt_client.reconnect() + _LOGGER.warning("Unexpected MQTT disconnect. Attempting reconnect.") + self.async_mqtt_client_reconnect() @staticmethod def on_msg(msg): @@ -212,18 +213,7 @@ async def connect_and_subscribe(self): self.mqtt_client.tls_set_context(ssl.create_default_context()) mqtt_conn = MqttConnHelper(self.loop, self.mqtt_client) - - # Run connect() within an executor thread, since it blocks on socket - # connection for up to `keepalive` seconds: https://git.io/Jt5Yc - await self.loop.run_in_executor( - None, - functools.partial( - self.mqtt_client.connect, - MQTT_HOST, - port=MQTT_PORT, - keepalive=MQTT_KEEPALIVE, - ), - ) + self.async_mqtt_client_connect() try: while not mqtt_conn.misc.cancelled(): @@ -292,7 +282,27 @@ async def _reconnect(self): self.mqtt_client.username_pw_set( self.mqtt_auth["user"], password=self.mqtt_auth["pass"] ) - self.mqtt_client.connect(MQTT_HOST, port=MQTT_PORT, keepalive=MQTT_KEEPALIVE) + self.async_mqtt_client_connect() + + async def async_mqtt_client_connect(self): + """Run connect() in an async safe manner to avoid blocking.""" + # Run connect() within an executor thread, since it blocks on socket + # connection for up to `keepalive` seconds: https://git.io/Jt5Yc + await self.loop.run_in_executor( + None, + functools.partial( + self.mqtt_client.connect, + MQTT_HOST, + port=MQTT_PORT, + keepalive=MQTT_KEEPALIVE, + ), + ) + + async def async_mqtt_client_reconnect(self): + """Run reconnect() in an async safe manner to avoid blocking.""" + # Run reconnect() within an executor thread, since it blocks on socket + # connection for up to `keepalive` seconds: https://git.io/Jt5Yc + await self.loop.run_in_executor(None, self.mqtt_client.reconnect) class MqttConnHelper: