-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathuserdb.py
117 lines (94 loc) · 2.92 KB
/
userdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#!/usr/bin/env python
import sys, sqlite3, scrypt, random
def randstr(length):
return ''.join(chr(random.randint(0,255)) for i in range(length))
def create_hash(pin):
return scrypt.encrypt(randstr(64), pin, maxtime=1.).encode('hex')
def verify_hash(pin_hash, pin):
try:
scrypt.decrypt(pin_hash.decode('hex'), pin)
return True
except scrypt.error:
return False
USER_FIELDS = [
'rfid',
'hash',
'authorised',
]
USER_TYPES = {
'rfid': str,
'hash': str,
'authorised': int
}
def user_dict(row):
return dict( zip( USER_FIELDS, row) )
def init_db(conn):
c = conn.cursor()
c.execute('''CREATE TABLE users '''+\
'''(rfid text unique, hash text, authorised integer)''')
conn.commit()
def verify_login(conn, rfid, pin):
c = conn.cursor()
c.execute('''SELECT rfid, hash, authorised FROM users WHERE rfid=?''', (rfid,) )
result = c.fetchall()
# prevent infoleak :-)
c.execute('''SELECT rfid, hash, authorised FROM users LIMIT 1''' )
result_fake = c.fetchall()
if len(result) > 0:
pin_hash = result[0][1]
authorised = result[0][2]
elif len(result_fake) > 0:
pin_hash = result_fake[0][1]
authorised = 0
else:
return None
if verify_hash(pin_hash, pin) and authorised:
return user_dict(result[0])
else:
return None
def import_user(conn, rfid, pinhash, authorised):
c = conn.cursor()
try:
c.execute('''INSERT INTO users (rfid, hash, authorised) VALUES (?,?,?)''',
(rfid, pinhash, int(authorised)) )
conn.commit()
except sqlite3.IntegrityError:
return False
return c.rowcount != 0
def add_user(conn, rfid, pin, authorised):
return import_user(conn, rfid, create_hash(pin), authorised)
def update_pin(conn, rfid, pin):
c = conn.cursor()
c.execute('''UPDATE users SET hash=? WHERE rfid=?''',
(create_hash(pin), rfid) )
conn.commit()
return c.rowcount != 0
def enable(conn, rfid):
c = conn.cursor()
c.execute('''UPDATE users SET authorised=1 WHERE rfid=?''', (rfid,))
conn.commit()
return c.rowcount != 0
def disable(conn, rfid):
c = conn.cursor()
c.execute('''UPDATE users SET authorised=0 WHERE rfid=?''', (rfid,))
conn.commit()
return c.rowcount != 0
def find_user(conn, rfid):
c = conn.cursor()
c.execute('''SELECT rfid, hash, authorised FROM users WHERE rfid=?''', (rfid,) )
result = c.fetchall()
if len(result) > 0:
return user_dict(result[0])
else:
return None
def get_users(conn):
c = conn.cursor()
c.execute('''SELECT rfid, hash, authorised FROM users''')
return [ user_dict(row) for row in c ]
def user_exists(conn, rfid):
return find_user(conn, rfid) != None
def del_user(conn, rfid):
c = conn.cursor()
c.execute('''DELETE FROM users WHERE rfid=?''', (rfid,) )
conn.commit()
return c.rowcount