From 28b2be087f561b492537f78dbf94496499e31899 Mon Sep 17 00:00:00 2001
From: James Scott
Date: Sun, 7 Jan 2024 16:07:56 -0500
Subject: [PATCH] Ran the test suites and fixed messages.
---
 starlette/middleware/sessions.py | 25 ++++++++++++-------------
 1 file changed, 12 insertions(+), 13 deletions(-)
diff --git a/starlette/middleware/sessions.py b/starlette/middleware/sessions.py
index c7deef485..9f1a07f1b 100644
--- a/starlette/middleware/sessions.py
+++ b/starlette/middleware/sessions.py
@@ -1,7 +1,7 @@
 import json
 import typing
-from datetime import datetime, timedelta, timezone
 from base64 import b64decode, b64encode
+from datetime import datetime, timedelta, timezone
 import itsdangerous
 from itsdangerous.exc import BadSignature, SignatureExpired
@@ -22,7 +22,7 @@ def __init__(
 same_site: typing.Literal["lax", "strict", "none"] = "lax",
 https_only: bool = False,
 persist_session: bool = False,
- auto_refresh_window: int = 0, # seconds, default 0 to not auto refresh, 240 seconds for 4 minute window to refresh
+ auto_refresh_window: int = 0,
 domain: typing.Optional[str] = None,
 ) -> None:
 self.app = app
@@ -39,12 +39,12 @@ def __init__(
 self.security_flags += f"; domain={domain}"
- def decode_cookie(self,cookie):
- result = {"session": {}}
+ def decode_cookie(self,cookie: bytes) -> typing.Dict[str,typing.Any]:
+ result: typing.Dict[str, typing.Any] = {"session": {}}
 try:
- data = self.signer.unsign(cookie, max_age=self.max_age,return_timestamp=True)
- result["session"] = json.loads(b64decode(data[0])) #first element of the data array is the json
- result["datetime"] = data[1] #second element of the data array returned is a datetime object.
+ data = self.signer.unsign(cookie,max_age=self.max_age,return_timestamp=True)
+ result["session"] = json.loads(b64decode(data[0]))
+ result["datetime"] = data[1] #DateTime obj
 except (BadSignature, SignatureExpired):
 return result
 return result
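Review bot: The `auto_refresh_window` parameter in `__init__` loses its only description in this hunk, so nothing in the signature says the value is in seconds or that 0 disables refreshing. Rather than dropping the over-long trailing comment, keep a short one on its own line above the parameter, e.g. `# seconds before expiry in which the cookie is re-issued; 0 disables auto refresh`, or move it into a class docstring. The interaction with `max_age` and `persist_session` is worth stating in the same place, since the refresh check in `__call__` reads all three. `__init__` starts and ends outside this hunk.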
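Review bot: On `BadSignature`/`SignatureExpired`, `decode_cookie` returns a dict with no `"datetime"` key, and the new `Dict[str, Any]` annotation hides that from the type checker. The caller shown in the `@@ -58` hunk reads `data["datetime"]` unconditionally, so as written a request carrying a tampered or expired cookie would raise `KeyError` instead of being treated as an empty session. Make the failure value explicit, `result: typing.Dict[str, typing.Any] = {"session": {}, "datetime": None}`, and have `__call__` skip the `scope["exp"]` computation when it is `None`. Only signature errors are caught here; whether `b64decode`/`json.loads` can fail on a validly signed value depends on code outside this hunk.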
@@ -58,14 +58,14 @@ async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: update_session = True if self.session_cookie in connection.cookies: - data = self.decode_cookie(connection.cookies[self.session_cookie].encode("utf-8")) + data = self.decode_cookie(connection.cookies[self.session_cookie].encode("utf-8")) # noqa E501 scope["session"] = data["session"] - scope["exp"] = data["datetime"] + timedelta(seconds=self.max_age) + scope["exp"] = data["datetime"] + timedelta(seconds=self.max_age) # type: ignore[arg-type] if self.auto_refresh_window: now = datetime.now(timezone.utc) #if the expiry date not inside of the expiry window, do not update. - if not (now >= (scope["exp"] - timedelta(seconds=self.auto_refresh_window)) and now <= scope["exp"]): + if not (now >= (scope["exp"] - timedelta(seconds=self.auto_refresh_window)) and now <= scope["exp"]): # noqa E501 update_session = False elif self.persist_session: update_session = False @@ -77,8 +77,8 @@ async def send_wrapper(message: Message) -> None: session_changed = False if message["type"] == "http.response.start": if self.session_cookie in connection.cookies: - previous_session_data = self.decode_cookie(connection.cookies[self.session_cookie].encode("utf-8")) - if (previous_session_data["session"] and scope["session"]) and previous_session_data["session"] != scope["session"]: + previous_session_data = self.decode_cookie(connection.cookies[self.session_cookie].encode("utf-8")) # noqa E501 + if (previous_session_data["session"] and scope["session"]) and previous_session_data["session"] != scope["session"]: # noqa E501 session_changed = True if scope["session"] and (update_session or session_changed): 
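Review bot: The `# type: ignore[arg-type]` added on the `scope["exp"]` line in `__call__` silences the checker instead of handling the case it reports; it suggests `self.max_age` can be `None`, although its declaration is outside this hunk. If so, `timedelta(seconds=self.max_age)` raises `TypeError` at request time for every request that carries the session cookie. Guard the computation with `if self.max_age is not None:` and make the `auto_refresh_window` branch below depend on the same condition, because it reads `scope["exp"]`. `__call__` continues outside the hunk.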
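Review bot: `send_wrapper` runs `decode_cookie` on the request cookie a second time at `http.response.start`, so the signature is verified twice per request and the comparison depends on the cookie still being within `max_age` when the response starts. If it expired in between, `previous_session_data["session"]` comes back empty and `session_changed` stays `False` even though the handler modified the session. Snapshot the decoded session once in `__call__` (a deep copy, because `scope["session"]` is the same dict object the handler mutates) and compare against that, e.g. `session_changed = bool(initial_session) and bool(scope["session"]) and initial_session != scope["session"]`. That also removes the long expressions that needed `# noqa E501`. `send_wrapper` continues outside the hunk.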
@@ -95,7 +95,6 @@ async def send_wrapper(message: Message) -> None:
 )
 headers.append("Set-Cookie", header_value)
 elif update_session and not scope["session"]:
- # The session is cleared. BadSignature/SignatureExpired or the initial scope session was empty
 headers = MutableHeaders(scope=message)
 header_value = "{session_cookie}={data}; path={path}; {expires}{security_flags}".format( # noqa E501
 session_cookie=self.session_cookie,