forked from boris-arzur/bromine
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.py
104 lines (79 loc) · 2.99 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import bromine
import sys
from twisted.names import client, dns
from twisted.internet import reactor
from twisted.internet.task import LoopingCall
from twisted.internet.protocol import Factory, Protocol
from twisted.internet.endpoints import TCP4ServerEndpoint
# https://twistedmatrix.com/documents/13.1.0/core/howto/servers.html
# Flip to True to use the short timings and smaller request budget below.
TESTING = False

# SLOW -- pump period in seconds while idle (don't pump too fast when not busy)
# FAST -- delay in seconds before an on-demand pump
# REQS -- request budget; when busy, we may need help
# In TESTING mode both delays are 1e-2 s and the budget drops to 2.
SLOW, FAST, REQS = (1e-2, 1e-2, 2) if TESTING else (1, 1e-2, 5)
class SocketInDns(Protocol):
    """Relay one accepted TCP connection through DNS CNAME lookups.

    Bytes received on the connection are pushed into a ``bromine.Scoreboard``;
    ``pump`` asks the scoreboard for a host name, queries it as a CNAME and
    feeds the names found in the answers into a ``bromine.Systems`` object,
    whose ``data`` is written back to the connection.

    NOTE(review): ``requested`` is decremented by every ``pump`` call but only
    incremented by ``ok_`` and ``error_``; the periodic loop and
    ``dataReceived`` schedule ``pump`` without incrementing it, so the
    ``requested > REQS`` guard may never trigger -- confirm the intent.
    """

    def __init__(self):
        """Create the resolver and tunnel state, then start the periodic pump."""
        if TESTING:
            self.resolver = client.Resolver(servers=[('127.0.0.1', 5553)])
        else:
            # self.resolver = client.Resolver('/etc/resolv.conf')
            self.resolver = client.Resolver(servers=[('8.8.8.8', 53)])

        # data going out
        self.score_board = bromine.Scoreboard()
        self.last_ack = bromine.INVALID_MID

        # data coming in
        self.systems = bromine.Systems()

        # keep track of callbacks
        self.requested = 0  # looping call not withstanding

        # Must be set before the loop starts: LoopingCall.start() runs pump()
        # immediately by default.
        self._closed = False

        # Keep a handle on the loop so connectionLost() can stop it.
        self._poller = LoopingCall(self.pump)
        self._poller.start(SLOW)

    def dataReceived(self, data):
        """Queue bytes from the TCP peer and schedule a pump after FAST seconds."""
        self.score_board.push_data(data)
        reactor.callLater(FAST, self.pump)

    def connectionLost(self, reason):
        """Handle the TCP connection closing: stop pumping and shut down.

        This is the hook Twisted invokes on a Protocol when its connection
        goes away. It reports the reason, stops the periodic pump so no more
        queries are issued for this connection, and stops the reactor.

        reason -- failure describing why the connection ended.
        """
        # Local import: the exception is only needed here.
        from twisted.internet.error import ReactorNotRunning

        print('connection lost:', reason.getErrorMessage())
        self._closed = True
        if self._poller.running:
            self._poller.stop()
        try:
            # reactor.stop() is the supported way to end the event loop from
            # inside a callback.
            reactor.stop()
        except ReactorNotRunning:
            # Already shutting down (or stopped by another connection).
            pass

    def clientConnectionLost(self, connector, reason):
        """Legacy handler kept for direct callers; prints and exits.

        Twisted calls a method of this name on client factories, not on
        Protocol instances; see connectionLost for the hook used here.
        """
        print('connection lost:', reason.getErrorMessage())
        sys.exit(0)

    def empty(self):
        """Return True when every backlog entry is of type bromine.TYPE_ACK.

        Vacuously True when the backlog holds no entries.
        """
        return all(l.type_ == bromine.TYPE_ACK for l in self.score_board.backlog.values())

    def pump(self):
        """Run one exchange: flush received data, apply acks, send one query."""
        if self._closed:
            # The connection is gone; ignore pumps that were already scheduled.
            return

        self.requested -= 1
        if self.requested > REQS:
            return

        last_seen = self.systems.last_seen_remote_mid
        if last_seen % bromine.CONFIG['ackperiod'] == 0 and self.last_ack != last_seen:
            # Push an explicit ack once per distinct multiple of 'ackperiod'.
            self.last_ack = last_seen
            self.score_board.push_ack(last_seen)
        else:
            self.score_board.last_seen_remote_mid = last_seen

        # Hand everything received so far to the TCP peer.
        for d in self.systems.data:
            self.transport.write(d)

        # The first field of each ack is passed to the scoreboard's retire().
        for ack in self.systems.acks:
            remote_last_seen_remote_mid = ack[0]
            self.score_board.retire(remote_last_seen_remote_mid)

        self.systems.commit()

        host = self.score_board.transmit()
        query = dns.Query(host, dns.CNAME, dns.IN)
        task = self.resolver.queryUDP([query], [20 * SLOW])
        task.addCallback(self.ok_)
        # Added after the callback, so it also sees errors raised by ok_.
        task.addErrback(self.error_)

    def ok_(self, reply):
        """Feed each answer's CNAME target into systems; maybe pump again.

        Another pump is scheduled when the backlog still holds non-ack
        entries or the reply carried more than one answer.
        """
        for a in reply.answers:
            cname = a.payload.name.name
            self.systems.add(cname)

        if not self.empty() or len(reply.answers) > 1:
            # NOTE(review): this reschedules with SLOW while error_ uses
            # FAST -- confirm that is intended.
            reactor.callLater(SLOW, self.pump)
            self.requested += 1

    def error_(self, failure):
        """Schedule another pump after FAST seconds when the query fails."""
        reactor.callLater(FAST, self.pump)
        self.requested += 1
class ClientFactory(Factory):
    """Factory that gives every accepted connection its own SocketInDns."""

    def buildProtocol(self, addr):
        """Announce the peer address and return a new protocol instance.

        addr -- address object Twisted supplies for the new connection.
        """
        print("connection from", addr)
        protocol = SocketInDns()
        return protocol
if __name__ == '__main__':
    # Listen on the configured TCP port and hand control to the reactor.
    listen_port = bromine.CONFIG['port']
    TCP4ServerEndpoint(reactor, listen_port).listen(ClientFactory())
    reactor.run()