Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polestar auth update #2076

Merged
merged 4 commits into from
Jan 2, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 80 additions & 29 deletions packages/modules/vehicles/polestar/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
import os
import re
from datetime import datetime, timedelta
import base64
import hashlib
from modules.common.store import RAMDISK_PATH

AUTH_CLIENT_ID = 'l3oopkc_10'
BASE_URL = 'https://polestarid.eu.polestar.com'
REDIRECT_URI = 'https://www.polestar.com/sign-in-callback'

log = logging.getLogger(__name__)
AUTH_CLIENT_ID = "l3oopkc_10"


def b64urlencode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode().rstrip("=")


class PolestarAuth:
Expand All @@ -26,6 +35,16 @@ def __init__(self, username: str, password: str, vin: str) -> None:
self.token = None
self.token_file = str(RAMDISK_PATH)+'/polestar2_token_'+vin+'.json'
self.token_store: dict = {}
self.code_verifier = None
self.oidc_configuration = {}
self.update_oidc_configuration()

def update_oidc_configuration(self) -> None:
result = self.client_session.get(
BASE_URL+"/.well-known/openid-configuration"
)
result.raise_for_status()
self.oidc_configuration = result.json()

def delete_token(self) -> None:
self.access_token = None
Expand Down Expand Up @@ -79,42 +98,54 @@ def get_auth_token(self) -> str or None:
log.info("get_auth_token:using token from file %s", self.token_file)
return self.access_token
else:
log.info("get_auth_token:token from file %s expired. New authentication required", self.token_file)
log.info("get_auth_token:token from file %s expired. New token required", self.token_file)

code = self._get_auth_code()
if code is None:
return None
if self.refresh_token is not None:
# try refresh token
params = {
"grant_type": "refresh_token",
"client_id": AUTH_CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"refresh_token": self.refresh_token
}
log.info("get_auth_token:using refresh_token to get new token")
else:
# first get code, then token
code = self._get_auth_code()
if code is None:
return None

# get token
params = {
"query": "query getAuthToken($code: String) { getAuthToken(code: $code) { id_token access_token \
refresh_token expires_in }}",
"operationName": "getAuthToken",
"variables": json.dumps({"code": code})
}
log.info("get_auth_token:attempting to get new token")
params = {
"grant_type": "authorization_code",
"client_id": AUTH_CLIENT_ID,
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": self.code_verifier,
}

headers = {
"Content-Type": "application/json"
}
log.info("get_auth_token:attempting to get new token")
# get token
try:
result = self.client_session.get("https://pc-api.polestar.com/eu-north-1/auth/",
params=params, headers=headers)
result = self.client_session.post(self.oidc_configuration["token_endpoint"],
data=params)

except requests.RequestException as e:
log.error("get_auth_token:http error:%s", e)
self.refresh_token = None
return None
if result.status_code != 200:
log.error("get_auth_token:error:get response:%d", result.status_code)
self.refresh_token = None
return None

result_data = result.json()
log.info(result_data)

if result_data['data']['getAuthToken'] is not None:
self.access_token = result_data['data']['getAuthToken']['access_token']
self.refresh_token = result_data['data']['getAuthToken']['refresh_token']
if result_data['access_token'] is not None:
self.access_token = result_data['access_token']
self.refresh_token = result_data['refresh_token']
self.token_expiry = datetime.now(
) + timedelta(seconds=result_data['data']['getAuthToken']['expires_in'])
) + timedelta(seconds=result_data['expires_in'])
# save tokens to ramdisk
self.token_store['access_token'] = self.access_token
self.token_store['refresh_token'] = self.refresh_token
Expand All @@ -134,8 +165,9 @@ def _get_auth_code(self) -> str or None:
return None

params = {
'client_id': AUTH_CLIENT_ID
}
"grant_type": "authorization_code",
'client_id': AUTH_CLIENT_ID,
}
data = {
'pf.username': self.username,
'pf.pass': self.password
Expand All @@ -144,7 +176,7 @@ def _get_auth_code(self) -> str or None:
log.info("_get_auth_code:attempting to get new code")
try:
result = self.client_session.post(
f"https://polestarid.eu.polestar.com/as/{self.resume_path}/resume/as/authorization.ping",
BASE_URL+f"/as/{self.resume_path}/resume/as/authorization.ping",
params=params,
data=data
)
Expand All @@ -158,7 +190,7 @@ def _get_auth_code(self) -> str or None:
log.error("_get_auth_code:error:check username/password")
return None
# get code
m = re.search(r"code=(.+)", result.request.path_url)
m = re.search(r"code=([^&]+)", result.request.path_url)
if m is not None:
code = m.group(1)
log.info("_get_auth_code:got code %s", code)
Expand All @@ -170,7 +202,7 @@ def _get_auth_code(self) -> str or None:
log.info("_get_auth_code:accept terms and conditions for uid %s", uid)
data = {"pf.submit": True, "subject": uid}
result = self.client_session.post(
f"https://polestarid.eu.polestar.com/as/{self.resume_path}/resume/as/authorization.ping",
BASE_URL+f"/as/{self.resume_path}/resume/as/authorization.ping",
data=data,
)
m = re.search(r"code=(.+)", result.request.path_url)
Expand All @@ -189,11 +221,16 @@ def _get_auth_resumePath(self) -> str or None:
params = {
"response_type": "code",
"client_id": AUTH_CLIENT_ID,
"redirect_uri": "https://www.polestar.com/sign-in-callback"
"redirect_uri": REDIRECT_URI,
"state": self.get_state(),
"code_challenge": self.get_code_challenge(),
"code_challenge_method": "S256",
"scope": "openid profile email customer:attributes"
}

log.info("_get_auth_resumePath:attempting to get resumePath")
try:
result = self.client_session.get("https://polestarid.eu.polestar.com/as/authorization.oauth2",
result = self.client_session.get(self.oidc_configuration["authorization_endpoint"],
params=params)
except requests.RequestException as e:
log.error("_get_auth_resumePath:http error:%s", e)
Expand All @@ -210,3 +247,17 @@ def _get_auth_resumePath(self) -> str or None:
log.info("_get_auth_resumePath:error getting resumePath")

return resume_path

@staticmethod
def get_state() -> str:
return b64urlencode(os.urandom(32))

@staticmethod
def get_code_verifier() -> str:
return b64urlencode(os.urandom(32))

def get_code_challenge(self) -> str:
self.code_verifier = self.get_code_verifier()
m = hashlib.sha256()
m.update(self.code_verifier.encode())
return b64urlencode(m.digest())