Skip to content

Commit

Permalink
Polestar auth update (#2076)
Browse files Browse the repository at this point in the history
* auth adapted:code challenge using PKCE, refresh_tooken implemented

* auth adapted:code challenge using PKCE, refresh_tooken implemented

* Satisfy Flake checks

* Satisfy Flake checks
  • Loading branch information
isomacM authored Jan 2, 2025
1 parent ece3cf7 commit 9e8dc40
Showing 1 changed file with 80 additions and 29 deletions.
109 changes: 80 additions & 29 deletions packages/modules/vehicles/polestar/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
import os
import re
from datetime import datetime, timedelta
import base64
import hashlib
from modules.common.store import RAMDISK_PATH

AUTH_CLIENT_ID = 'l3oopkc_10'
BASE_URL = 'https://polestarid.eu.polestar.com'
REDIRECT_URI = 'https://www.polestar.com/sign-in-callback'

log = logging.getLogger(__name__)
AUTH_CLIENT_ID = "l3oopkc_10"


def b64urlencode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode().rstrip("=")


class PolestarAuth:
Expand All @@ -26,6 +35,16 @@ def __init__(self, username: str, password: str, vin: str) -> None:
self.token = None
self.token_file = str(RAMDISK_PATH)+'/polestar2_token_'+vin+'.json'
self.token_store: dict = {}
self.code_verifier = None
self.oidc_configuration = {}
self.update_oidc_configuration()

def update_oidc_configuration(self) -> None:
result = self.client_session.get(
BASE_URL+"/.well-known/openid-configuration"
)
result.raise_for_status()
self.oidc_configuration = result.json()

def delete_token(self) -> None:
self.access_token = None
Expand Down Expand Up @@ -79,42 +98,54 @@ def get_auth_token(self) -> str or None:
log.info("get_auth_token:using token from file %s", self.token_file)
return self.access_token
else:
log.info("get_auth_token:token from file %s expired. New authentication required", self.token_file)
log.info("get_auth_token:token from file %s expired. New token required", self.token_file)

code = self._get_auth_code()
if code is None:
return None
if self.refresh_token is not None:
# try refresh token
params = {
"grant_type": "refresh_token",
"client_id": AUTH_CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"refresh_token": self.refresh_token
}
log.info("get_auth_token:using refresh_token to get new token")
else:
# first get code, then token
code = self._get_auth_code()
if code is None:
return None

# get token
params = {
"query": "query getAuthToken($code: String) { getAuthToken(code: $code) { id_token access_token \
refresh_token expires_in }}",
"operationName": "getAuthToken",
"variables": json.dumps({"code": code})
}
log.info("get_auth_token:attempting to get new token")
params = {
"grant_type": "authorization_code",
"client_id": AUTH_CLIENT_ID,
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": self.code_verifier,
}

headers = {
"Content-Type": "application/json"
}
log.info("get_auth_token:attempting to get new token")
# get token
try:
result = self.client_session.get("https://pc-api.polestar.com/eu-north-1/auth/",
params=params, headers=headers)
result = self.client_session.post(self.oidc_configuration["token_endpoint"],
data=params)

except requests.RequestException as e:
log.error("get_auth_token:http error:%s", e)
self.refresh_token = None
return None
if result.status_code != 200:
log.error("get_auth_token:error:get response:%d", result.status_code)
self.refresh_token = None
return None

result_data = result.json()
log.info(result_data)

if result_data['data']['getAuthToken'] is not None:
self.access_token = result_data['data']['getAuthToken']['access_token']
self.refresh_token = result_data['data']['getAuthToken']['refresh_token']
if result_data['access_token'] is not None:
self.access_token = result_data['access_token']
self.refresh_token = result_data['refresh_token']
self.token_expiry = datetime.now(
) + timedelta(seconds=result_data['data']['getAuthToken']['expires_in'])
) + timedelta(seconds=result_data['expires_in'])
# save tokens to ramdisk
self.token_store['access_token'] = self.access_token
self.token_store['refresh_token'] = self.refresh_token
Expand All @@ -134,8 +165,9 @@ def _get_auth_code(self) -> str or None:
return None

params = {
'client_id': AUTH_CLIENT_ID
}
"grant_type": "authorization_code",
'client_id': AUTH_CLIENT_ID,
}
data = {
'pf.username': self.username,
'pf.pass': self.password
Expand All @@ -144,7 +176,7 @@ def _get_auth_code(self) -> str or None:
log.info("_get_auth_code:attempting to get new code")
try:
result = self.client_session.post(
f"https://polestarid.eu.polestar.com/as/{self.resume_path}/resume/as/authorization.ping",
BASE_URL+f"/as/{self.resume_path}/resume/as/authorization.ping",
params=params,
data=data
)
Expand All @@ -158,7 +190,7 @@ def _get_auth_code(self) -> str or None:
log.error("_get_auth_code:error:check username/password")
return None
# get code
m = re.search(r"code=(.+)", result.request.path_url)
m = re.search(r"code=([^&]+)", result.request.path_url)
if m is not None:
code = m.group(1)
log.info("_get_auth_code:got code %s", code)
Expand All @@ -170,7 +202,7 @@ def _get_auth_code(self) -> str or None:
log.info("_get_auth_code:accept terms and conditions for uid %s", uid)
data = {"pf.submit": True, "subject": uid}
result = self.client_session.post(
f"https://polestarid.eu.polestar.com/as/{self.resume_path}/resume/as/authorization.ping",
BASE_URL+f"/as/{self.resume_path}/resume/as/authorization.ping",
data=data,
)
m = re.search(r"code=(.+)", result.request.path_url)
Expand All @@ -189,11 +221,16 @@ def _get_auth_resumePath(self) -> str or None:
params = {
"response_type": "code",
"client_id": AUTH_CLIENT_ID,
"redirect_uri": "https://www.polestar.com/sign-in-callback"
"redirect_uri": REDIRECT_URI,
"state": self.get_state(),
"code_challenge": self.get_code_challenge(),
"code_challenge_method": "S256",
"scope": "openid profile email customer:attributes"
}

log.info("_get_auth_resumePath:attempting to get resumePath")
try:
result = self.client_session.get("https://polestarid.eu.polestar.com/as/authorization.oauth2",
result = self.client_session.get(self.oidc_configuration["authorization_endpoint"],
params=params)
except requests.RequestException as e:
log.error("_get_auth_resumePath:http error:%s", e)
Expand All @@ -210,3 +247,17 @@ def _get_auth_resumePath(self) -> str or None:
log.info("_get_auth_resumePath:error getting resumePath")

return resume_path

@staticmethod
def get_state() -> str:
return b64urlencode(os.urandom(32))

@staticmethod
def get_code_verifier() -> str:
return b64urlencode(os.urandom(32))

def get_code_challenge(self) -> str:
self.code_verifier = self.get_code_verifier()
m = hashlib.sha256()
m.update(self.code_verifier.encode())
return b64urlencode(m.digest())

0 comments on commit 9e8dc40

Please sign in to comment.