Skip to content
This repository has been archived by the owner on Jul 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #14 from mjmeli/refactoring-and-indefinite-reconnect
Browse files Browse the repository at this point in the history
Refactoring and indefinite reconnect (v1.0.0)
  • Loading branch information
mjmeli authored Dec 18, 2021
2 parents f10bbbe + 1d65758 commit b83a4f9
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 137 deletions.
1 change: 0 additions & 1 deletion .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ name: Tests

on:
push:
branches: [main]
pull_request:
schedule:
- cron: "0 0 * * *"
Expand Down
4 changes: 2 additions & 2 deletions examples/example_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
class MyDukeRT(DukeEnergyRealtime):
"""My instance of DukeEnergyRealtime."""

def on_msg(self, msg):
def on_message(self, msg):
"""On Message callback.
Parameters
Expand Down Expand Up @@ -66,7 +66,7 @@ async def main() -> None:

duke_rt = MyDukeRT(duke_energy)
await duke_rt.select_default_meter()
await duke_rt.connect_and_subscribe()
await duke_rt.connect_and_subscribe_forever()
except DukeEnergyError as err:
print(err)

Expand Down
8 changes: 4 additions & 4 deletions examples/example_realtime_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from kafka.errors import KafkaTimeoutError

from pyduke_energy.client import DukeEnergyClient
from pyduke_energy.const import FASTPOLL_TIMEOUT
from pyduke_energy.const import FASTPOLL_TIMEOUT_SEC
from pyduke_energy.errors import DukeEnergyError
from pyduke_energy.realtime import DukeEnergyRealtime

Expand All @@ -39,7 +39,7 @@ def close_kafka(self):
self.kfk.close()
_LOGGER.debug("Disconnected from Kafka")

def on_msg(self, msg):
def on_message(self, msg):
"""On Message callback.
Parameters
Expand Down Expand Up @@ -85,14 +85,14 @@ async def main() -> None:
duke_rt = MyDukeRT(duke_energy)
duke_rt.connect_kafka("smartmeter")
await duke_rt.select_default_meter()
await duke_rt.connect_and_subscribe()
await duke_rt.connect_and_subscribe_forever()
except DukeEnergyError as err:
# attempt sleep and retry
_LOGGER.warning("Error: %s\nAttempt sleep and retry.", err)
await duke_rt.mqtt_client.unsubscribe()
duke_rt.close_kafka()

time.sleep(FASTPOLL_TIMEOUT)
time.sleep(FASTPOLL_TIMEOUT_SEC)
finally:
duke_rt.close_kafka()

Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = pyduke-energy
version = 0.0.15
version = 1.0.0
author = Michael Meli
author_email = [email protected]
description = Python Wrapper for unofficial Duke Energy REST API
Expand Down
12 changes: 6 additions & 6 deletions src/pyduke_energy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ def __init__(self):
self.id_token: Optional[str] = None
self.mqtt_username: Optional[str] = None
self.mqtt_password: Optional[str] = None
self.mqtt_clientid: Optional[str] = None
self.mqtt_clientid_error: Optional[str] = None
self.mqtt_client_id: Optional[str] = None
self.mqtt_client_id_error: Optional[str] = None
self.gateway: Optional[str] = None


Expand Down Expand Up @@ -372,8 +372,8 @@ async def _gateway_login(self) -> None:
self._gateway_auth_info.id_token = resp.get("id_token")
self._gateway_auth_info.mqtt_username = resp.get("mqtt_username")
self._gateway_auth_info.mqtt_password = resp.get("mqtt_password")
self._gateway_auth_info.mqtt_clientid = resp.get("mqtt_clientId")
self._gateway_auth_info.mqtt_clientid_error = resp.get("mqtt_clientId_error")
self._gateway_auth_info.mqtt_client_id = resp.get("mqtt_clientId")
self._gateway_auth_info.mqtt_client_id_error = resp.get("mqtt_clientId_error")
self._gateway_auth_info.gateway = resp.get("gateway")

async def _get_gateway_auth_headers(self) -> dict:
Expand All @@ -392,7 +392,7 @@ async def _get_mqtt_auth(self) -> dict:
if self._gateway_auth_info.needs_new_access_token():
await self._gateway_login()
return {
"clientid": self._gateway_auth_info.mqtt_clientid,
"clientid": self._gateway_auth_info.mqtt_client_id,
"user": self._gateway_auth_info.mqtt_username,
"pass": self._gateway_auth_info.mqtt_password,
"gateway": self._gateway_auth_info.gateway,
Expand Down Expand Up @@ -433,7 +433,7 @@ async def _async_request(
) from timeout_err
except ClientError as client_err:
raise RequestError(
f"Request failed with unexpected error [{full_url}]: {client_err}"
f"Request failed with unexpected error [{full_url}]: ({client_err.__class__.__name__}) {client_err}"
) from client_err
finally:
if not use_running_session:
Expand Down
23 changes: 19 additions & 4 deletions src/pyduke_energy/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,27 @@
GW_STATUS_ENDPOINT = "gw/gateways/status"
GW_USAGE_ENDPOINT = "smartmeter/usageByHour"

DEFAULT_TIMEOUT = 10 # seconds

MQTT_HOST = "app-core1.de-iot.io"
MQTT_PORT = 443
MQTT_ENDPOINT = "/app-mqtt"
MQTT_KEEPALIVE = 50 # Seconds, it appears the server will disconnect after 60s idle
FASTPOLL_TIMEOUT = 900 - 3 # seconds
FASTPOLL_RETRY = 60 # if no messages after this time, retry fastpoll request
FASTPOLL_RETRY_COUNT = 3

DEFAULT_TIMEOUT = 10 # seconds
# in seconds, how long to until the fastpoll request has timed out and a new one needs to be made
FASTPOLL_TIMEOUT_SEC = 900 - 3 # seconds

# in seconds, how long to wait for a message before retrying fastpoll or reconnecting
MESSAGE_TIMEOUT_SEC = 60

# number of times a message timeout can occur before just reconnecting
MESSAGE_TIMEOUT_RETRY_COUNT = 3

# in minutes, minimum amount of time to wait before retrying connection on forever loop
FOREVER_RETRY_MIN_MINUTES = 1

# in minutes, maximum amount of time to wait before trying connection on forever loop
FOREVER_RETRY_MAX_MINUTES = 60

# in seconds, how long to wait for a connection before timing out
CONNECT_TIMEOUT_SEC = 60
25 changes: 25 additions & 0 deletions src/pyduke_energy/errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
"""Error Types."""

from typing import Any, Union

import paho.mqtt.client as mqtt


class DukeEnergyError(Exception):
"""Base error."""
Expand All @@ -11,3 +15,24 @@ class RequestError(DukeEnergyError):

class InputError(DukeEnergyError):
"""Error for input issues."""


class MqttError(DukeEnergyError):
"""Error for issues relating to the MQTT connection."""


class MqttCodeError(MqttError):
"""Error for MQTT issues associated with an error code."""

def __init__(self, operation: str, code: Union[int, mqtt.ReasonCodes], *args: Any):
super().__init__(*args)
self.operation = operation
self.code = code

def __str__(self) -> str:
"""Output a formatted string for the error code."""
if isinstance(self.code, mqtt.ReasonCodes):
return f"{self.operation}: ({self.code.value}) {str(self.code)}"
if isinstance(self.code, int):
return f"{self.operation}: ({self.code}) {mqtt.error_string(self.code)}"
return f"{self.operation}: ({self.code}) {super().__str__()}"
Loading

0 comments on commit b83a4f9

Please sign in to comment.