-
Notifications
You must be signed in to change notification settings - Fork 52
/
Copy pathutils.py
83 lines (62 loc) · 2.88 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import time
import asyncio
import gmqtt
import logging
class Callbacks:
    """Record the events reported to the client callbacks it installs.

    Call :meth:`register_for_client` to attach the handlers to a client;
    afterwards the recorded state can be read from the instance attributes
    and reset with :meth:`clear`.
    """

    def __init__(self):
        # (topic, payload, qos, properties) tuples, one per on_message call.
        self.messages = []
        # Kept for callers that inspect it; no handler in this class fills it.
        self.publisheds = []
        # ``mid`` values passed to on_subscribe.
        self.subscribeds = []
        # (flags, rc, properties) from the most recent on_connect, else None.
        self.connack = None
        self.disconnected = False
        self.connected = False

    def __str__(self):
        """Return the recorded message/publish/subscribe lists as one string."""
        # Only attributes created in __init__ are referenced here; reading
        # undefined ones (messagedicts, unsubscribeds, disconnects) made
        # every str() call raise AttributeError.
        return str(self.messages) + str(self.publisheds) + str(self.subscribeds)

    def clear(self):
        """Reset all recorded state to the freshly-constructed values."""
        self.__init__()

    def on_disconnect(self, client, packet):
        """Log the disconnect and set ``disconnected``."""
        logging.info('[DISCONNECTED {}]'.format(client._client_id))
        self.disconnected = True

    def on_message(self, client, topic, payload, qos, properties):
        """Log a received message and append it to ``messages``."""
        logging.info('[RECV MSG {}] TOPIC: {} PAYLOAD: {} QOS: {} PROPERTIES: {}'
                     .format(client._client_id, topic, payload, qos, properties))
        self.messages.append((topic, payload, qos, properties))

    def on_subscribe(self, client, mid, qos, properties):
        """Log a subscription acknowledgement and append ``mid`` to ``subscribeds``."""
        logging.info('[SUBSCRIBED {}] QOS: {}, properties: {}'.format(client._client_id, qos, properties))
        self.subscribeds.append(mid)

    def on_connect(self, client, flags, rc, properties):
        """Log the connection, set ``connected`` and store the connack details."""
        logging.info('[CONNECTED {}]'.format(client._client_id))
        self.connected = True
        self.connack = (flags, rc, properties)

    def register_for_client(self, client):
        """Install this instance's handlers as the callbacks of ``client``."""
        client.on_disconnect = self.on_disconnect
        client.on_message = self.on_message
        client.on_connect = self.on_connect
        client.on_subscribe = self.on_subscribe
async def clean_retained(host, port, username, password=None, prefix=None, wait_seconds=10):
    """Clear retained messages under ``prefix`` by overwriting them with empty payloads.

    Connects a temporary client, subscribes to ``prefix + '#'`` (just ``'#'``
    when no prefix is given) and, for every message that arrives during the
    wait window, publishes a zero-length retained message to the same topic.

    Args:
        host: Broker host passed to ``connect``.
        port: Broker port passed to ``connect``.
        username: Username passed to ``set_auth_credentials``.
        password: Password passed to ``set_auth_credentials``.
        prefix: Optional string prepended to the client id and the topic filter.
        wait_seconds: How long to keep the subscription open so that retained
            messages can arrive (defaults to the previous fixed 10 seconds).
    """
    # ``None`` is the documented default, so it must behave like "no prefix"
    # instead of failing on string concatenation.
    prefix = prefix or ""

    def on_message(client, topic, payload, qos, properties):
        # NOTE(review): per MQTT 3.1.1 (3.3.1.3) a broker forwards an empty
        # retained publish to matching subscribers, so our own "clear"
        # publishes can come back here; skipping empty payloads avoids
        # re-publishing them -- confirm against the broker in use.
        if not payload:
            return
        curclient.publish(topic, b"", qos=0, retain=True)

    curclient = gmqtt.Client(prefix + "cleanretained", clean_session=True)
    curclient.set_auth_credentials(username, password)
    curclient.on_message = on_message
    await curclient.connect(host=host, port=port)
    try:
        curclient.subscribe(prefix + '#')
        await asyncio.sleep(wait_seconds)  # wait for all retained messages to arrive
    finally:
        # Always drop the connection, even if subscribing or waiting failed.
        await curclient.disconnect()
    # Yield to the event loop instead of blocking it with time.sleep().
    await asyncio.sleep(.1)
async def cleanup(host, port=1883, username=None, password=None, client_ids=None, prefix=None):
    """Reset broker-side state for the test clients, then clear retained messages.

    Each client id is connected once with ``clean_session=True`` and
    disconnected again; afterwards ``clean_retained`` is awaited with the same
    connection parameters.

    Args:
        host: Broker host passed to ``connect``.
        port: Broker port passed to ``connect``.
        username: Username passed to ``set_auth_credentials``.
        password: Password passed to ``set_auth_credentials``.
        client_ids: Iterable of client id strings; when empty or ``None`` the
            ids ``myclientid``, ``myclientid2`` and ``myclientid3`` (each
            prepended with ``prefix``) are used.
        prefix: Optional string prepended to the default client ids and
            forwarded to ``clean_retained``.
    """
    # clean all client state
    print("clean up starting")
    # ``None`` is the default, so treat it as an empty prefix rather than
    # failing on string concatenation below.
    prefix = prefix or ""
    client_ids = client_ids or (prefix + "myclientid", prefix + "myclientid2", prefix + "myclientid3")

    for clientid in client_ids:
        curclient = gmqtt.Client(clientid.encode("utf-8"), clean_session=True)
        curclient.set_auth_credentials(username=username, password=password)
        await curclient.connect(host=host, port=port)
        # Non-blocking pauses: time.sleep() here would stall the event loop
        # that the client connection itself runs on.
        await asyncio.sleep(.1)
        await curclient.disconnect()
        await asyncio.sleep(.1)

    # clean retained messages
    await clean_retained(host, port, username, password=password, prefix=prefix)
    print("clean up finished")