diff --git a/src/pyduke_energy/realtime.py b/src/pyduke_energy/realtime.py index f0e72f7..a16af51 100644 --- a/src/pyduke_energy/realtime.py +++ b/src/pyduke_energy/realtime.py @@ -132,7 +132,7 @@ def on_discon(self, _client: mqtt.Client, _userdata, conn_res): self.disconnected.set_result(conn_res) else: _LOGGER.warning("Unexpected MQTT disconnect. Attempting reconnect.") - self.async_mqtt_client_reconnect() + self.mqtt_client.reconnect() @staticmethod def on_msg(msg): @@ -213,7 +213,7 @@ async def connect_and_subscribe(self): self.mqtt_client.tls_set_context(ssl.create_default_context()) mqtt_conn = MqttConnHelper(self.loop, self.mqtt_client) - self.async_mqtt_client_connect() + await self.async_mqtt_client_connect() try: while not mqtt_conn.misc.cancelled(): @@ -282,7 +282,7 @@ async def _reconnect(self): self.mqtt_client.username_pw_set( self.mqtt_auth["user"], password=self.mqtt_auth["pass"] ) - self.async_mqtt_client_connect() + await self.async_mqtt_client_connect() async def async_mqtt_client_connect(self): """Run connect() in an async safe manner to avoid blocking.""" @@ -298,12 +298,6 @@ async def async_mqtt_client_connect(self): ), ) - async def async_mqtt_client_reconnect(self): - """Run reconnect() in an async safe manner to avoid blocking.""" - # Run reconnect() within an executor thread, since it blocks on socket - # connection for up to `keepalive` seconds: https://git.io/Jt5Yc - await self.loop.run_in_executor(None, self.mqtt_client.reconnect) - class MqttConnHelper: """Helper for asyncio mqtt."""