forked from MihaiBalint/Sanic-HTTPAuth
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsanic_httpauth_compat.py
304 lines (262 loc) · 9.57 KB
/
sanic_httpauth_compat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# Borrowed code from werkzeug: https://github.com/pallets/werkzeug
import base64
import hmac
import logging
import sanic.response
import sys
from sanic.request import Request
from urllib.request import parse_http_list as _parse_list_header
log = logging.getLogger(__name__)
_builtin_safe_str_cmp = getattr(hmac, "compare_digest", None)
def safe_str_cmp(a, b):
"""This function compares strings in somewhat constant time. This
requires that the length of at least one string is known in advance.
Returns `True` if the two strings are equal, or `False` if they are not.
.. versionadded:: 0.7
"""
if isinstance(a, str):
a = a.encode("utf-8")
if isinstance(b, str):
b = b.encode("utf-8")
if _builtin_safe_str_cmp is not None:
return _builtin_safe_str_cmp(a, b)
if len(a) != len(b):
return False
rv = 0
if PY2:
for x, y in izip(a, b):
rv |= ord(x) ^ ord(y)
else:
for x, y in izip(a, b):
rv |= x ^ y
return rv == 0
class ImmutableDictMixin(object):
"""Makes a :class:`dict` immutable.
.. versionadded:: 0.5
:private:
"""
_hash_cache = None
@classmethod
def fromkeys(cls, keys, value=None):
instance = super(cls, cls).__new__(cls)
instance.__init__(zip(keys, repeat(value)))
return instance
def __reduce_ex__(self, protocol):
return type(self), (dict(self),)
def _iter_hashitems(self):
return iteritems(self)
def __hash__(self):
if self._hash_cache is not None:
return self._hash_cache
rv = self._hash_cache = hash(frozenset(self._iter_hashitems()))
return rv
def setdefault(self, key, default=None):
is_immutable(self)
def update(self, *args, **kwargs):
is_immutable(self)
def pop(self, key, default=None):
is_immutable(self)
def popitem(self):
is_immutable(self)
def __setitem__(self, key, value):
is_immutable(self)
def __delitem__(self, key):
is_immutable(self)
def clear(self):
is_immutable(self)
class Authorization(ImmutableDictMixin, dict):
"""Represents an `Authorization` header sent by the client. You should
not create this kind of object yourself but use it when it's returned by
the `parse_authorization_header` function.
This object is a dict subclass and can be altered by setting dict items
but it should be considered immutable as it's returned by the client and
not meant for modifications.
.. versionchanged:: 0.5
This object became immutable.
"""
def __init__(self, auth_type, data=None):
dict.__init__(self, data or {})
self.type = auth_type
username = property(
lambda self: self.get("username"),
doc="""
The username transmitted. This is set for both basic and digest
auth all the time.""",
)
password = property(
lambda self: self.get("password"),
doc="""
When the authentication type is basic this is the password
transmitted by the client, else `None`.""",
)
realm = property(
lambda self: self.get("realm"),
doc="""
This is the server realm sent back for HTTP digest auth.""",
)
nonce = property(
lambda self: self.get("nonce"),
doc="""
The nonce the server sent for digest auth, sent back by the client.
A nonce should be unique for every 401 response for HTTP digest
auth.""",
)
uri = property(
lambda self: self.get("uri"),
doc="""
The URI from Request-URI of the Request-Line; duplicated because
proxies are allowed to change the Request-Line in transit. HTTP
digest auth only.""",
)
nc = property(
lambda self: self.get("nc"),
doc="""
The nonce count value transmitted by clients if a qop-header is
also transmitted. HTTP digest auth only.""",
)
cnonce = property(
lambda self: self.get("cnonce"),
doc="""
If the server sent a qop-header in the ``WWW-Authenticate``
header, the client has to provide this value for HTTP digest auth.
See the RFC for more details.""",
)
response = property(
lambda self: self.get("response"),
doc="""
A string of 32 hex digits computed as defined in RFC 2617, which
proves that the user knows a password. Digest auth only.""",
)
opaque = property(
lambda self: self.get("opaque"),
doc="""
The opaque header from the server returned unchanged by the client.
It is recommended that this string be base64 or hexadecimal data.
Digest auth only.""",
)
qop = property(
lambda self: self.get("qop"),
doc="""
Indicates what "quality of protection" the client has applied to
the message for HTTP digest auth. Note that this is a single token,
not a quoted list of alternatives as in WWW-Authenticate.""",
)
def to_unicode(
x, charset=sys.getdefaultencoding(), errors="strict", allow_none_charset=False
):
if x is None:
return None
if not isinstance(x, bytes):
return str(x)
if charset is None and allow_none_charset:
return x
return x.decode(charset, errors)
def bytes_to_wsgi(data):
assert isinstance(data, bytes), "data must be bytes"
if isinstance(data, str):
return data
else:
return data.decode("latin1")
def wsgi_to_bytes(data):
"""coerce wsgi unicode represented bytes to real ones"""
if isinstance(data, bytes):
return data
return data.encode("latin1") # XXX: utf8 fallback?
def unquote_header_value(value, is_filename=False):
r"""Unquotes a header value. (Reversal of :func:`quote_header_value`).
This does not use the real unquoting but what browsers are actually
using for quoting.
.. versionadded:: 0.5
:param value: the header value to unquote.
"""
if value and value[0] == value[-1] == '"':
# this is not the real unquoting, but fixing this so that the
# RFC is met will result in bugs with internet explorer and
# probably some other browsers as well. IE for example is
# uploading files with "C:\foo\bar.txt" as filename
value = value[1:-1]
# if this is a filename and the starting characters look like
# a UNC path, then just return the value without quotes. Using the
# replace sequence below on a UNC path has the effect of turning
# the leading double slash into a single slash and then
# _fix_ie_filename() doesn't work correctly. See #458.
if not is_filename or value[:2] != "\\\\":
return value.replace("\\\\", "\\").replace('\\"', '"')
return value
def parse_dict_header(value, cls=dict):
"""Parse lists of key, value pairs as described by RFC 2068 Section 2 and
convert them into a python dict (or any other mapping object created from
the type with a dict like interface provided by the `cls` argument):
>>> d = parse_dict_header('foo="is a fish", bar="as well"')
>>> type(d) is dict
True
>>> sorted(d.items())
[('bar', 'as well'), ('foo', 'is a fish')]
If there is no value for a key it will be `None`:
>>> parse_dict_header('key_without_value')
{'key_without_value': None}
To create a header from the :class:`dict` again, use the
:func:`dump_header` function.
.. versionchanged:: 0.9
Added support for `cls` argument.
:param value: a string with a dict header.
:param cls: callable to use for storage of parsed results.
:return: an instance of `cls`
"""
result = cls()
if not isinstance(value, str):
# XXX: validate
value = bytes_to_wsgi(value)
for item in _parse_list_header(value):
if "=" not in item:
result[item] = None
continue
name, value = item.split("=", 1)
if value[:1] == value[-1:] == '"':
value = unquote_header_value(value[1:-1])
result[name] = value
return result
def parse_authorization_header(value):
"""Parse an HTTP basic/digest authorization header transmitted by the web
browser. The return value is either `None` if the header was invalid or
not given, otherwise an :class:`~werkzeug.datastructures.Authorization`
object.
:param value: the authorization header to parse.
:return: a :class:`~werkzeug.datastructures.Authorization` object or `None`.
"""
if not value:
return
value = wsgi_to_bytes(value)
try:
auth_type, auth_info = value.split(None, 1)
auth_type = auth_type.lower()
except ValueError:
return
if auth_type == b"basic":
try:
username, password = base64.b64decode(auth_info).split(b":", 1)
except Exception:
return
return Authorization(
"basic",
{
"username": to_unicode(username, "utf-8"),
"password": to_unicode(password, "utf-8"),
},
)
elif auth_type == b"digest":
auth_map = parse_dict_header(auth_info)
for key in "username", "realm", "nonce", "uri", "response":
if key not in auth_map:
return
if "qop" in auth_map:
if not auth_map.get("nc") or not auth_map.get("cnonce"):
return
return Authorization("digest", auth_map)
def get_request(*args, **kwargs):
for a in args:
if isinstance(a, Request):
return a
for k, v in kwargs.items():
if isinstance(v, Request):
return v