-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathflask_htpasswd.py
207 lines (183 loc) · 6.4 KB
/
flask_htpasswd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
#!/usr/bin/env python
"""
Decorator, configuration, and error handler for basic and token
authentication using htpasswd files
"""
from __future__ import absolute_import, unicode_literals
from functools import wraps
import hashlib
import jwt
import logging
from flask import request, Response, current_app, g
from passlib.apache import HtpasswdFile
# Module-level logger named after this module; the authentication code below
# uses it for its critical, warning and debug messages.
log = logging.getLogger(__name__) # pylint: disable=invalid-name
class HtPasswdAuth:
    """Configure htpasswd based basic and token authentication.

    The extension keeps an ``HtpasswdFile`` user database in ``self.users``
    and offers two ways to authenticate a request:

    * HTTP basic auth, checked against the htpasswd entries.
    * A signed token (HS512 JWT) carrying the username and a digest of the
      user's htpasswd hash, sent in the ``Authorization`` header or in the
      ``access_token`` query parameter.

    Configuration keys read from the Flask app: ``FLASK_AUTH_ALL``,
    ``FLASK_AUTH_REALM``, ``FLASK_HTPASSWD_PATH`` and ``FLASK_SECRET``.
    """

    def __init__(self, app=None):
        """Create the extension and optionally bind it to an application.

        Args:
            app (flask.Flask): Optional application. When given it is
                stored on ``self.app`` and :meth:`init_app` is called.
        """
        self.users = HtpasswdFile()
        self.app = app
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """
        Find and configure the user database from specified file

        Sets configuration defaults, tries to load the htpasswd file and
        registers a ``before_request`` hook that enforces authentication
        on every request when ``FLASK_AUTH_ALL`` is truthy.

        Args:
            app (flask.Flask): Application to configure.
        Returns:
            None
        """
        app.config.setdefault('FLASK_AUTH_ALL', False)
        app.config.setdefault('FLASK_AUTH_REALM', 'Login Required')
        # Default set to bad file to trigger IOError
        app.config.setdefault('FLASK_HTPASSWD_PATH', '/^^^/^^^')

        # Load up user database; a missing file is logged rather than
        # raised so that the application can still start.
        try:
            self.load_users(app)
        except IOError:
            log.critical(
                'No htpasswd file loaded, please set `FLASK_HTPASSWD` '
                'or `FLASK_HTPASSWD_PATH` environment variable to a '
                'valid apache htpasswd file.'
            )

        # Allow requiring auth for entire app, with pre request method
        @app.before_request
        def require_auth():
            """Pre request processing for enabling full app authentication."""
            # pylint: disable=unused-variable
            if not current_app.config['FLASK_AUTH_ALL']:
                return None
            is_valid, user = self.authenticate()
            if not is_valid:
                return self.auth_failed()
            g.user = user
            # Returning None lets Flask continue to the view.
            return None

    def load_users(self, app):
        """
        Load users from configured file.

        Args:
            app (flask.Flask): Flask application to load users from.
        Raises:
            IOError: If the configured htpasswd file does not exist.
        Returns:
            None
        """
        self.users = HtpasswdFile(
            app.config['FLASK_HTPASSWD_PATH']
        )

    def check_basic_auth(self, username, password):
        """
        This function is called to check if a username /
        password combination is valid via the htpasswd file.

        Args:
            username: User name taken from the request.
            password: Password taken from the request.
        Returns:
            tuple (is_valid, username): ``is_valid`` is ``True`` only when
            the password check succeeded; the username is returned as given.
        """
        valid = self.users.check_password(
            username, password
        )
        if not valid:
            log.warning('Invalid login from %s', username)
            # Normalise any falsy result to a plain False.
            valid = False
        return (
            valid,
            username
        )

    def get_signature(self):
        """
        Setup crypto sig.

        Returns:
            The ``FLASK_SECRET`` configuration value used as signing key.
        Raises:
            KeyError: If ``FLASK_SECRET`` is not configured.
        """
        if self.app is None:
            # The extension was bound with init_app() only, so no app is
            # stored on the instance; rely on the active application.
            return current_app.config['FLASK_SECRET']
        with self.app.app_context():
            return current_app.config['FLASK_SECRET']

    def get_hashhash(self, username):
        """
        Generate a digest of the htpasswd hash

        Args:
            username: User whose stored hash is digested.
        Returns:
            str: SHA-256 hex digest of the stored hash, or ``''`` when the
            user has no stored hash.
        """
        userhash = self.users.get_hash(username)
        if not userhash:
            return ''
        if not isinstance(userhash, bytes):
            # hashlib only accepts bytes; tolerate a text hash.
            userhash = userhash.encode('utf-8')
        return hashlib.sha256(userhash).hexdigest()

    def generate_token(self, username):
        """
        assumes user exists in htpasswd file.

        Return the token for the given user by signing a token of
        the username and a hash of the htpasswd string.

        Args:
            username: User the token is issued for.
        Returns:
            The encoded HS512 token as produced by ``jwt.encode``.
        """
        key = self.get_signature()
        return jwt.encode({
            'username': username,
            'hashhash': self.get_hashhash(username)
        }, key, algorithm="HS512")

    def check_token_auth(self, token):
        """
        Check to see who this is and if their token gets
        them into the system.

        Args:
            token: Encoded token received from the client.
        Returns:
            tuple (is_valid, username): ``(True, username)`` when the token
            verifies, names a known user and matches that user's current
            htpasswd hash; ``(False, None)`` otherwise.
        """
        key = self.get_signature()
        try:
            data = jwt.decode(token, key, algorithms=["HS512"])
        except Exception:  # pylint: disable=broad-except
            # Any decoding/verification failure means "reject"; unlike a
            # bare except this does not swallow KeyboardInterrupt/SystemExit.
            log.warning('Received bad token signature')
            return False, None
        # A correctly signed token may still lack the expected claims.
        username = data.get('username')
        if username not in self.users.users():
            log.warning(
                'Token auth signed message, but invalid user %s',
                username
            )
            return False, None
        # The digest ties the token to the htpasswd hash it was issued for.
        if data.get('hashhash') != self.get_hashhash(username):
            log.warning(
                'Token and password do not match, %s '
                'needs to regenerate token',
                username
            )
            return False, None
        return True, username

    @staticmethod
    def auth_failed():
        """
        Sends a 401 response that enables basic auth

        Returns:
            flask.Response: 401 response whose ``WWW-Authenticate`` header
            carries the configured ``FLASK_AUTH_REALM``.
        """
        return Response(
            'Could not verify your access level for that URL.\n'
            'You have to login with proper credentials',
            401,
            {'WWW-Authenticate': 'Basic realm="{0}"'.format(
                current_app.config['FLASK_AUTH_REALM']
            )}
        )

    def authenticate(self):
        """Authenticate user by any means and return either true or false.

        Basic auth is used when the request carries basic credentials.
        Otherwise a token is taken from the ``Authorization`` header
        (``token <jwt>`` or ``Bearer <jwt>``) or, when there is no such
        header, from the ``access_token`` query parameter.

        Args:
        Returns:
            tuple (is_valid, username): True is valid user, False if not
        """
        auth = request.authorization
        # Only treat the parsed header as basic auth when it really is:
        # other schemes carry no username/password to check.
        if auth and auth.type == 'basic':
            return self.check_basic_auth(auth.username, auth.password)

        # Try token auth
        header = request.headers.get('Authorization', None)
        param_token = request.args.get('access_token')
        if not (header or param_token):
            return (False, None)

        if header:
            scheme, _, credential = header.partition(' ')
            if scheme.lower() in ('token', 'bearer') and credential:
                # github style 'token <jwt>' or standard 'Bearer <jwt>'
                token = credential.strip()
            else:
                # Unrecognised prefix: keep the historical behaviour of
                # dropping the first six characters of the header.
                token = header[6:]
        else:
            # Grab it from query dict instead
            token = param_token
        log.debug('Received token: %s', token)
        return self.check_token_auth(token)

    def required(self, func):
        """
        Decorator function with basic and token authentication handler

        Args:
            func: View function to protect. It is called with an extra
                ``user`` keyword argument holding the authenticated name.
        Returns:
            The wrapped view; it returns :meth:`auth_failed`'s response
            instead of calling ``func`` when authentication fails.
        """
        @wraps(func)
        def decorated(*args, **kwargs):
            """
            Actual wrapper to run the auth checks.
            """
            is_valid, user = self.authenticate()
            if not is_valid:
                return self.auth_failed()
            kwargs['user'] = user
            return func(*args, **kwargs)
        return decorated