-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgoodwee_provider.py
52 lines (39 loc) · 1.5 KB
/
goodwee_provider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import asyncio
import json
import threading
import time
from dataclasses import asdict
import goodwe
from db.db_prov import InverterLog
async def __get_runtime_data(ip_address: str = '192.168.1.18') -> dict:
    """Connect to the inverter and read one snapshot of its runtime data.

    Args:
        ip_address: Address handed to ``goodwe.connect``. Defaults to the
            address that used to be hard-coded in this function, so existing
            zero-argument calls behave exactly as before.

    Returns:
        The value produced by ``inverter.read_runtime_data()``, unfiltered.
        Presumably a mapping of sensor id to reading -- confirm against the
        goodwe library documentation.
    """
    # Sensor ids of interest, with the original author's annotations. They
    # used to live in a local list that was never read; they are kept here
    # purely as a reference -- the returned data is NOT filtered by them.
    #   ppv                 PV power (W)
    #   pbattery1           battery power (W): + = charging, - = discharging
    #   battery_mode        1=standby, 2=discharge, 3=charge
    #   battery_soc         battery state of charge (%)
    #   active_power        grid power (W): - = buy, + = sell
    #   grid_in_out         1=sell or export, 2=buy or import
    #   house_consumption   own consumption (W)
    #   e_day               today's PV energy production (kWh)
    #   e_total             total PV energy production (kWh)
    #   meter_e_total_exp   total sold (exported) energy (kWh)
    #   meter_e_total_imp   total bought or imported energy (kWh)
    inverter = await goodwe.connect(ip_address)
    return await inverter.read_runtime_data()
def get_runtime_data() -> dict:
    """Synchronously fetch one runtime-data snapshot from the inverter.

    A dedicated ``asyncio.SelectorEventLoop`` is created for the call, used to
    drive the ``__get_runtime_data`` coroutine to completion, and then closed.
    Closing matters because this function is polled repeatedly: without it
    every call would leave behind an open loop together with its selector and
    internal wake-up sockets.

    NOTE(review): the selector-based loop is requested explicitly, as in the
    original code; presumably to avoid a different platform default --
    confirm before changing the loop type.

    Returns:
        The value returned by the ``__get_runtime_data`` coroutine.

    Raises:
        Any exception raised by the coroutine propagates unchanged; the loop
        is still closed in that case.
    """
    loop = asyncio.SelectorEventLoop()
    asyncio.set_event_loop(loop)
    try:
        return loop.run_until_complete(__get_runtime_data())
    finally:
        # Mirror asyncio.run(): never leave a closed loop installed as the
        # current one, then release the loop's resources.
        asyncio.set_event_loop(None)
        loop.close()
def get_runtime_data_as_dataclass() -> InverterLog:
    """Fetch the current runtime data and wrap it in an ``InverterLog``.

    Every key of the dictionary returned by ``get_runtime_data`` is passed to
    the ``InverterLog`` constructor as a keyword argument.

    NOTE(review): this presumably requires the dictionary keys to match the
    constructor's parameters exactly -- verify against ``db.db_prov``.
    """
    return InverterLog(**get_runtime_data())
if __name__ == '__main__':
    # Script mode: poll the inverter forever, printing each reading as
    # indented JSON and pausing one second between polls.
    while True:
        result = get_runtime_data_as_dataclass()
        reading = asdict(result)
        print(json.dumps(reading, indent=4))
        time.sleep(1)