-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththe_home_gateway.py
118 lines (100 loc) · 3.56 KB
/
the_home_gateway.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/python3
#
# Remote Controlled Home Light
#
# MIT License
# Copyright (c) 2019 SrdjanStankovic
import os
import sys
import time
import logging
from typing import Optional
from typing import Tuple
from typing import Union
# Relative suffix ("/../../" on POSIX) appended to this file's directory;
# the resulting path is added to sys.path so that `wolk` can be imported
# below. This must run before `import wolk`.
module_path = os.sep + ".." + os.sep + ".." + os.sep
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + module_path)
import wolk # noqa
# Configure the root logger at DEBUG level before the remaining imports run.
logging.basicConfig(level=logging.DEBUG)
# Switch actuator references, the SWITCHn_ADD values passed to sonoff_switch
# (presumably the switch addresses - confirm in sonoff_control) and the
# function that performs the actual switching.
from sonoff_control import SWITCH1_REF
from sonoff_control import SWITCH2_REF
from sonoff_control import SWITCH1_ADD
from sonoff_control import SWITCH2_ADD
from sonoff_control import sonoff_switch
# Script version; not referenced anywhere else in this file.
VERSION = 1.0
def main():
    """
    Run the remote controlled home light gateway.

    Create a wolk device exposing two switch actuators, register an
    actuation handler that forwards commands to ``sonoff_switch`` and an
    actuator status provider that reports the last stored switch values,
    connect to the WolkAbout IoT Platform and publish the status of both
    actuators.

    Exits the process with status 1 if connecting raises ``RuntimeError``.
    """
    # List actuator references included on your device
    actuator_references = [SWITCH1_REF, SWITCH2_REF]

    # Insert the device credentials received
    # from WolkAbout IoT Platform when creating the device
    device = wolk.Device(
        key="some-key",
        password="some-password",
        actuator_references=actuator_references,
    )

    class Actuator:
        """Hold the last known value of one actuator."""

        def __init__(
            self, initial_value: Optional[Union[bool, int, float, str]]
        ):
            self.value = initial_value

    switch1 = Actuator(False)
    switch2 = Actuator(False)

    # Single lookup table shared by both callbacks:
    # actuator reference -> (value passed to sonoff_switch, stored state).
    switches = {
        actuator_references[0]: (SWITCH1_ADD, switch1),
        actuator_references[1]: (SWITCH2_ADD, switch2),
    }

    # Provide a way to read actuator status if your device has actuators
    def actuator_status_provider(
        reference: str,
    ) -> Tuple[wolk.State, Optional[Union[bool, int, float, str]]]:
        """Return the state and stored value for *reference*."""
        entry = switches.get(reference)
        if entry is None:
            # Unknown reference: report an error state without a value.
            return wolk.State.ERROR, None
        return wolk.State.READY, entry[1].value

    # Provide an actuation handler if your device has actuators
    def actuation_handler(
        reference: str, value: Union[bool, int, float, str]
    ) -> None:
        """Forward *value* to the switch registered under *reference*."""
        logging.info("Setting actuator '%s' to value: %s", reference, value)
        entry = switches.get(reference)
        if entry is None:
            logging.warning(
                "Ignoring actuation for unknown reference '%s'", reference
            )
            return
        address, switch = entry
        # A truthy result from sonoff_switch is treated as success.
        if sonoff_switch(address, value):
            switch.value = value
        else:
            # Set switch in inactive state. False (not 0) keeps the stored
            # value the same type as the initial switch state.
            switch.value = False

    # Pass device and optionally connection details
    # Provide actuation handler and actuator status provider via with_actuators
    connection = wolk.WolkConnect(
        device=device,
        host="api-demo.wolkabout.com",
        port=8883,
        ca_cert="utility/ca.crt",
    )
    wolk_device = connection.with_actuators(
        actuation_handler=actuation_handler,
        actuator_status_provider=actuator_status_provider,
    )

    # Establish a connection to the WolkAbout IoT Platform
    logging.info("Connecting to WolkAbout IoT Platform")
    try:
        wolk_device.connect()
    except RuntimeError as e:
        logging.error(str(e))
        sys.exit(1)

    # Publish the current status of both switches once connected
    wolk_device.publish_actuator_status(SWITCH1_REF)
    wolk_device.publish_actuator_status(SWITCH2_REF)
    # NOTE(review): main() returns here and nothing in this file keeps the
    # process alive. Confirm the wolk client keeps a non-daemon thread
    # running; otherwise a keep-alive loop is needed to receive actuations.
# Start the gateway only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()