Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #7 from mjmeli/asyncio-changes
Browse files Browse the repository at this point in the history
skip unexpected errors and treat as a debug log instead of a warning
  • Loading branch information
mjmeli authored Nov 18, 2021
2 parents d5e1003 + a62524f commit 5f287ed
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions src/pyduke_energy/realtime.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
"""Client for connecting to Duke Energy realtime stream."""

import asyncio
import functools
import json
import logging
import ssl
import time
from typing import Optional

import paho.mqtt.client as mqtt
import paho.mqtt.client as mqtt # type: ignore

from pyduke_energy.client import DukeEnergyClient
from pyduke_energy.const import (
Expand Down Expand Up @@ -155,14 +156,16 @@ def _on_msg(self, _client: mqtt.Client, _userdata, msg):
msg : MQTTMessage
This is a class with members topic, payload, qos, retain
"""
if not self.rx_msg:
if not self.rx_msg or self.rx_msg.done():
msg_if_decoded = None
try:
msg_if_decoded = msg.payload.decode("utf8")
except Exception as ex:
msg_if_decoded = f"Could not decode message: {ex}"
_LOGGER.warning(
"Unexpected message: %s (decoded = %s)", msg, msg_if_decoded
_LOGGER.debug(
"Unexpected message, just skipping for now: %s (decoded = %s)",
msg,
msg_if_decoded,
)
else:
self.rx_msg.set_result((msg.payload.decode("utf8")))
Expand Down Expand Up @@ -209,7 +212,19 @@ async def connect_and_subscribe(self):
self.mqtt_client.tls_set_context(ssl.create_default_context())

mqtt_conn = MqttConnHelper(self.loop, self.mqtt_client)
self.mqtt_client.connect(MQTT_HOST, port=MQTT_PORT, keepalive=MQTT_KEEPALIVE)

# Run connect() within an executor thread, since it blocks on socket
# connection for up to `keepalive` seconds: https://git.io/Jt5Yc
await self.loop.run_in_executor(
None,
functools.partial(
self.mqtt_client.connect,
MQTT_HOST,
port=MQTT_PORT,
keepalive=MQTT_KEEPALIVE,
),
)

try:
while not mqtt_conn.misc.cancelled():
if time.perf_counter() - self.tstart > FASTPOLL_TIMEOUT:
Expand Down Expand Up @@ -302,13 +317,21 @@ def call_bk():
client.loop_read()

self.loop.add_reader(sock, call_bk)
self.misc = self.loop.create_task(self.misc_loop())

# paho-mqtt calls this function from the executor thread on which we've called
# `self._client.connect()`, so we create a callback function to schedule
# `_misc_loop()` and run it on the loop thread-safely.
def create_task_cb() -> None:
self.misc = self.loop.create_task(self.misc_loop())

self.loop.call_soon_threadsafe(create_task_cb)

def on_socket_close(self, _client: mqtt.Client, _userdata, sock):
"""Socket close callback."""
_LOGGER.debug("Socket closed")
self.loop.remove_reader(sock)
self.misc.cancel()
if self.misc is not None:
self.misc.cancel()

def on_socket_register_write(self, client: mqtt.Client, _userdata, sock):
"""Socket write reg callback."""
Expand Down

0 comments on commit 5f287ed

Please sign in to comment.