This repository has been archived by the owner on Jul 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathexample_realtime.py
81 lines (64 loc) · 2.46 KB
/
example_realtime.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""Example of realtime power measurement."""
# pylint: skip-file
import asyncio
import getpass
import os
import logging
import aiohttp
import jsonpickle
import sys
from pyduke_energy.client import DukeEnergyClient
from pyduke_energy.errors import DukeEnergyError
from pyduke_energy.realtime import DukeEnergyRealtime
# Names of the environment variables that main() looks up for the account
# email and password.
PYDUKEENERGY_TEST_EMAIL = "PYDUKEENERGY_TEST_EMAIL"
PYDUKEENERGY_TEST_PASS = "PYDUKEENERGY_TEST_PASS"
# Logger named after this module; used by the message callback and the
# script entry point.
_LOGGER = logging.getLogger(__name__)
class MyDukeRT(DukeEnergyRealtime):
    """My instance of DukeEnergyRealtime.

    Defines an ``on_message`` callback that logs every received message.
    """

    def on_message(self, msg):
        """On Message callback.

        Converts the message with ``msg_to_usage_measurement`` and logs the
        result as indented JSON at debug level. If converting or encoding
        raises ``ValueError`` or ``TypeError``, the raw payload is logged as
        a warning instead.

        Parameters
        ----------
        msg : MQTTMessage
            This is a class with members topic, payload, qos, retain
        """
        try:
            measurement = self.msg_to_usage_measurement(msg)
            _LOGGER.debug(
                "Received:\n%s",
                jsonpickle.encode(measurement, indent=2, unpicklable=False),
            )
        except (ValueError, TypeError):
            # Decode leniently: UnicodeDecodeError is a ValueError, so a
            # payload that is not valid UTF-8 would otherwise raise again
            # from inside this handler and the warning would never be logged.
            _LOGGER.warning(
                "unexpected msg: %s",
                msg.payload.decode("utf8", errors="replace"),
            )
async def main() -> None:
    """Duke Energy Realtime data demo.

    Configures debug-level logging, obtains the account credentials, then
    builds a ``DukeEnergyClient`` wrapped in ``MyDukeRT`` and awaits
    ``select_default_meter()`` followed by ``connect_and_subscribe_forever()``.

    Credentials are read from the environment variables named by
    ``PYDUKEENERGY_TEST_EMAIL`` and ``PYDUKEENERGY_TEST_PASS``. If either one
    is unset, both values are requested interactively instead.

    A ``DukeEnergyError`` raised while the HTTP session is in use is printed
    rather than propagated.
    """
    logging.basicConfig(
        format="%(asctime)s.%(msecs)03d %(levelname)s {%(module)s} [%(funcName)s] %(message)s",
        datefmt="%Y-%m-%d,%H:%M:%S",
        level=logging.DEBUG,
    )

    # Pull email/password from environment variables
    email = os.environ.get(PYDUKEENERGY_TEST_EMAIL)
    password = os.environ.get(PYDUKEENERGY_TEST_PASS)
    if email is None or password is None:
        # The last fragment is an f-string so the real variable names are
        # interpolated instead of printing the placeholders with their braces.
        print(
            "Enter your email and password in environment variables. "
            "To avoid typing them in, you can put them into environment "
            f"variables {PYDUKEENERGY_TEST_EMAIL} and {PYDUKEENERGY_TEST_PASS}."
        )
        email = input("Email: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass.getpass("Password: ")

    try:
        async with aiohttp.ClientSession() as client:
            duke_energy = DukeEnergyClient(email, password, client)
            duke_rt = MyDukeRT(duke_energy)
            await duke_rt.select_default_meter()
            await duke_rt.connect_and_subscribe_forever()
    except DukeEnergyError as err:
        print(err)
if __name__ == "__main__":
    # On Windows, install the selector event loop policy before asyncio.run()
    # creates the loop.
    running_on_windows = sys.platform == "win32"
    if running_on_windows:
        selector_policy = asyncio.WindowsSelectorEventLoopPolicy()
        asyncio.set_event_loop_policy(selector_policy)

    # Run the demo with asyncio debug mode on; Ctrl-C is logged, not raised.
    try:
        asyncio.run(main(), debug=True)
    except KeyboardInterrupt:
        _LOGGER.debug("keyboard interrupt")