-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathPeer.py
467 lines (370 loc) · 12 KB
/
Peer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
"""
CSCI 334: Distributed Systems
Williams College
Spring 2014
Final Project: BLiP: Peer-to-Peer Chat
Authors:
Jeremy Boissevain
Nile Livingston
Nehemiah Paramore
This is going to be the main executable for the chat program.
Each Peer is composed of various threaded processes:
-A ServerThread that manages incoming chat requests.
-Some number of ChatThreads, each of which individually
manages an end-to-end chat session.
"""
# Libraries
import sys
import threading
import socket
import rsa
import os
import os.path
import random
import json
import entangled.kademlia.node
import twisted
import ast
import inspect
# Local files
import ServerThread as sv
import ChatThread as ch
import BLiPGUI as gui
import LoginGUI as lgui
class Peer():
def __init__(self, gui_enabled=True):
# A list to hold ChatThreads.
self.chats = []
# To hold (username, public user key) pairs.
self.friends = dict()
self.friendObjects = dict()
# The file path for the persistent friends list.
self.friendsJson = 'friends.json'
# The filename storing the user keys
self.userKeyFilePath = 'user_keys.pem'
# Loading data from Json file.
if os.path.isfile(self.friendsJson):
f = open(self.friendsJson, "r+")
try:
data = json.load(f)
for info in data.itervalues():
for key, val in info.iteritems():
if isinstance(val, unicode):
info[key] = val.encode('utf-8')
self.friendObjects = data
for friend, info in self.friendObjects.iteritems():
if info["key"] != None:
info["key"] = rsa.PublicKey.load_pkcs1(info["key"])
self.friends[friend] = info["key"]
except Exception, e:
print e
d = {}
json.dump(d, f)
f.close()
else:
data = open(self.friendsJson, "w")
data.close()
# The port for the listener server.
# TODO: Currently a command line argument for testing. Set to constant later.
self.port = int(sys.argv[1])
#the port for the kademlia node
self.udp_port = int(sys.argv[2])
# To hold the listener server.
self.server = None
# Is the user logged into the chat system?
self.authenticated = False
# Is the GUI enabled? Used for stress testing.
self.gui_enabled = gui_enabled
# Randomly assign port: used for stress testing
if not self.gui_enabled:
self.port = random.randint(40000, 50000)
# The TKinter GUI used for BLiP.
self.gui = None
self.username = None
# The key size for the public and private user keys.
self.key_size = 1024
# To hold the user's public and private user keys.
self.public_user_key = None
self.private_user_key = None
# DHT node thread
self.node = None
self.result = None
#########################################
# ACCESSOR METHODS
#########################################
# Return the list of active ChatThreads
def getActiveChats(self):
return self.chats
# Returns address of our own peer.
def getOwnAddress(self):
return (str(socket.gethostbyname(socket.getfqdn())), self.port)
# Returns the (IP, port) pair associated with a username.
# TODO: Currently hardcoded for Nile's machine.
# Do address resolution stuff with a real DHT.
def getAddress(self, username):
return ("137.165.169.58", 50007)
# key = username
# def gotValue(result):
# sender.result = result
# df = self.node.searchForKeywords([key])
# df.addCallback(gotValue)
# Returns the chat thread associated with a particular username.
# Return None if no such chat session exists.
def getChatSession(self, username):
for chat in self.chats:
if chat.getReceiver() == username: return chat
return None
# Return the list of friend usernames.
def getFriends(self):
output = self.friends.keys()
output.sort()
return output
# Return the private user key.
def getPrivateKey(self):
return self.private_user_key
# Return the public user key.
def getPublicKey(self):
return self.public_user_key
# Return the username.
def getUsername(self):
return self.username
# Get the public user key for a particular username.
def getUserKeyFor(self, username):
return self.friends[username]
# Is the user logged into the chat system?
def isAuthenticated(self):
return self.authenticated
#########################################
# MUTATOR METHODS
#########################################
# Add a username to the friends list without knowing public key.
def addFriend(self, username):
if username in self.friends:
if self.gui_enabled:
self.gui.showMessage("This username is already in your friends list.")
return
# Forever alone
if username == self.username:
if self.gui_enabled:
self.gui.showMessage("You cannot be friends with yourself.")
return
self.addUserAndKey(username, None, None, None)
# Add a username and associated key to friends list.
def addUserAndKey(self, username, key, ip, port):
#change friend dictionary
friend = {"key":key, "ip": ip, "port":port}
self.friendObjects[username] = friend
self.friends[username] = key
#remove existing file
os.remove(self.friendsJson)
#copy the friendObjects, and then change the value of key so that it can be saved in a file
tmp = self.friendObjects.copy()
if tmp[username]["key"] != None:
tmp[username]["key"] = tmp[username]["key"].save_pkcs1()
#write to file
f = open(self.friendsJson, "w")
json.dump(tmp, f)
# Update the dict and the GUI.
self.friends[username] = key
if self.gui_enabled:
self.gui.updateFriends()
if self.gui_enabled:
self.gui.updateFriends()
# Add a ChatThread to the list and start it.
def addNewChatThread(self, socket, auth_stance):
chat = ch.ChatThread(self, socket, self.gui, auth_stance)
self.chats.append(chat)
chat.start()
# End the chat session associated with a particular username.
# stance describes whether we're leaving or being left.
# values are "ACTIVE" and "PASSIVE", respectively.
def endChat(self, username, stance):
for chat in self.chats:
if chat.getReceiver() == username:
chat.exit(stance)
self.chats.remove(chat)
if self.gui_enabled:
self.gui.updateChatSessions()
self.gui.updateChatLog()
return
# Make a TCP connection and start a new chat thread.
def initiateChat(self, username):
# Get the address and make the connection.
(IP, port) = self.getAddress(username) # we need to ensure that this calls
# DHT stuff.
#self.getAddress(self, username)
#(IP, port) = self.getValue(username)
#IP, port = None, None
#if self.result != None:
#print self.result
#(IP, port) = self.result
sock = self.makeConnection(IP, port)
# If socket fails, friend is not online.
if sock == None:
if self.gui_enabled:
self.gui.showMessage("Friend is not online.")
return
# Create chat thread.
self.addNewChatThread(sock, "ACTIVE")
# Authenticate the user.
def login(self, username):
self.username = username
# .pem file exists, read from it.
if os.path.isfile(self.userKeyFilePath):
keyFile = open(self.userKeyFilePath, 'r').read()
self.public_user_key = rsa.PublicKey.load_pkcs1(keyFile)
self.private_user_key = rsa.PrivateKey.load_pkcs1(keyFile)
# .pem file doesn't exist, generate new keys and write to file.
else:
(self.public_user_key, self.private_user_key) = rsa.newkeys(self.key_size)
keyFile = open(self.userKeyFilePath, 'w')
public_Pem = self.public_user_key.save_pkcs1()
private_Pem = self.private_user_key.save_pkcs1()
keyFile.write(public_Pem)
keyFile.write(private_Pem)
self.authenticated = True
# Start up the listener server.
self.server = sv.ServerThread(self, self.port)
# the server listening may be blocking the twisted port
self.server.start()
#Create entangled node and start node thread
self.node = entangled.node.EntangledNode( udpPort = self.udp_port )
#Generate list of known friend IPs
ipList = []
portList = []
for info in self.friendObjects.itervalues():
if info["ip"] != None:
ipList.append( info["ip"])
portList.append( int(info["port"]))
# Attempt to join network using list of known IPs
result = zip(ipList, portList)
self.node.joinNetwork(result)
self.node.printContacts()
#TODO: Add notification if none of the IPs found were online and able to be joined.
# nothing is being published.
self.publishData()
############################
#### Callback functions ####
############################
def genericErrorCallback(self,failure):
print 'Error occured:', failure.getErrorMessage()
twisted.internet.reactor.callLater(0, self.stop)
def stop():
twisted.internet.reactor.stop()
def getValue(self, key):
print "getValue called."
print key
key = key.encode('utf-8')
df = self.node.searchForKeywords(key)
df.addCallback(self.getValueCallback)
df.addErrback(self.genericErrorCallback)
def getValueCallback(self, result):
if type(result) == dict:
print result
return result
else:
print 'Value not found'
print 'Scheduling key removal'
twisted.internet.reactor.callLater(1, self.deleteValue)
def publishDataCallback(self, *args, **kwargs):
print "Data published in the DHT"
print "Scheduling retrieval of published data"
twisted.internet.reactor.callLater(1, self.getValue)
def publishData(self):
df = self.node.publishData(self.username, self.getOwnAddress())
print self.username
df.addCallback(self.publishDataCallback)
df.addErrback(self.genericErrorCallback)
# Exit out of everything.
def logout(self):
for chat in self.chats:
chat.exit("ACTIVE")
self.server.exit()
self.authenticated = False
# Make a TCP connection with (IP, port) and return socket.
# Return None if socket fails.
def makeConnection(self, IP, port):
# DEBUGGING tool
print "Connecting to " + IP + ":" + str(port) + "..."
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
except:
print "CLIENT: Socket error: socket()"
return None
try:
sock.connect((IP, port))
return sock
except:
print "CLIENT: Socket error: connect()"
return None
# Remove a username from the friends list.
def removeFriend(self, username):
if username in self.friends:
# Update dicts
self.friends.pop(username)
self.friendObjects.pop(username)
#remove the file
os.remove(self.friendsJson)
#recreate the file
f = open(self.friendsJson, 'w+')
#copy data, modify to allow json serialization
tmp = self.friendObjects.copy()
for info in tmp.itervalues():
if info["key"] != None:
info["key"] = info["key"].save_pkcs1()
#dump updated friend dict to file
json.dump(tmp, f)
# Update dict and GUI.
if self.gui_enabled:
self.gui.updateFriends()
else:
if self.gui_enabled:
self.gui.showMessage("This username is not in your friends list.")
return
# Start up the BLiP GUI.
def startGUI(self):
self.gui = gui.BLiPGUI(self)
self.gui.start()
# Send a message to the receiving end of an active chat.
# If no such user is active, return None.
def sendMessage(self, username, message):
for thread in self.chats:
if thread.getReceiver() == username:
thread.sendMessage(message)
return
return None
def stressSendMessage(self, username, message):
while True:
for thread in self.chats:
if thread.getReceiver() == username:
while thread.isAuthenticating():
pass
for i in range(1, 11):
thread.sendMessage("[" + str(i) + "]" + message)
return
################################################################
# Main method stuff below.
################################################################
# Login/logout loop for BLiP.
def run(peer):
loginwindow = lgui.LoginGUI(peer)
loginwindow.run()
while not peer.isAuthenticated():
pass
peer.startGUI()
while peer.isAuthenticated():
pass
run(peer)
# Used for stress testing number of concurrent chats.
def stressRun(the_peer, receiver):
# Use random username to avoid conflicts
the_peer.login(str(random.getrandbits(20)))
the_peer.addFriend(receiver)
the_peer.initiateChat(receiver)
the_peer.stressSendMessage(receiver, "This is a stress message. It is pretty short.")
the_peer.endChat(receiver, "ACTIVE")
the_peer.logout()
if __name__ == '__main__':
peer = Peer()
run(peer)
twisted.internet.reactor.run()