Skip to content

Commit

Permalink
Code optimization + ISO date format added
Browse files Browse the repository at this point in the history
  • Loading branch information
Bram van Dartel committed Jan 12, 2024
1 parent 47ed83d commit 6b1bd91
Show file tree
Hide file tree
Showing 15 changed files with 345 additions and 434 deletions.
188 changes: 104 additions & 84 deletions custom_components/afvalwijzer/collector/circulus.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,103 +2,123 @@
from ..common.main_functions import _waste_type_rename
from datetime import datetime, timedelta
import re

import requests
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def get_waste_data_raw(provider, postal_code, street_number, suffix):
    """Fetch raw waste-collection data for an address from a Circulus provider.

    Args:
        provider: Key into SENSOR_COLLECTORS_CIRCULUS selecting the base URL.
        postal_code: Dutch postal code of the address.
        street_number: House number of the address.
        suffix: Optional house-number suffix (e.g. "a"); may be empty.

    Returns:
        A list of {"type": ..., "date": ...} dicts, or [] when no data is found.

    Raises:
        ValueError: For an unknown provider or any underlying request failure.
    """
    try:
        suffix = suffix.strip().upper()
        url = SENSOR_COLLECTORS_CIRCULUS.get(provider)
        if not url:
            raise ValueError(f"Invalid provider: {provider}, please verify")

        # get_session_cookie can fail (no CB_SESSION cookie); guard before
        # unpacking so a failure does not raise TypeError here.
        session = get_session_cookie(url, postal_code, street_number, suffix)
        if not session:
            return []
        response, logged_in_cookies = session

        if not response:
            _LOGGER.error("No waste data found!")
            return []

        if response["flashMessage"]:
            # Zipcode lookup returned multiple candidate addresses; pick the
            # one matching the house number (and suffix when given) and hit
            # its authenticationUrl to bind the session to that address.
            addresses = response["customData"]["addresses"]
            authentication_url = ""
            if suffix:
                # NOTE(review): match is case-sensitive against a lowercased
                # suffix — presumably the API returns lowercase suffixes; verify.
                search_pattern = f' {street_number} {suffix.lower()}'
                for address in addresses:
                    if re.search(search_pattern, address["address"]):
                        authentication_url = address["authenticationUrl"]
                        break
            else:
                authentication_url = addresses[0]["authenticationUrl"]
            if authentication_url:
                # Response body is not needed; the call is only for its
                # session side effect on the server.
                requests.get(url + authentication_url, cookies=logged_in_cookies)

        waste_data_raw = get_waste_data(logged_in_cookies, url)
    except requests.exceptions.RequestException as err:
        raise ValueError(err) from err

    return waste_data_raw


def get_session_cookie(url, postal_code, street_number, suffix):
    """Open a Circulus session and register the address by postal code.

    Args:
        url: Base URL of the Circulus provider.
        postal_code: Dutch postal code used for the zipcode lookup.
        street_number: House number used for the zipcode lookup.
        suffix: Unused here; kept for a uniform collector signature.

    Returns:
        (response_json, logged_in_cookies) on success, or (None, None) when no
        CB_SESSION cookie is obtained, so callers can always tuple-unpack.

    Raises:
        requests.exceptions.HTTPError: For bad HTTP responses.
    """
    raw_response = requests.get(url, timeout=60, verify=False)
    raw_response.raise_for_status()  # Raise an HTTPError for bad responses

    cookies = raw_response.cookies
    session_cookie = cookies.get("CB_SESSION", "")
    if not session_cookie:
        _LOGGER.error("Unable to get Session Cookie")
        # Return a 2-tuple: callers unpack `response, cookies = ...`, and a
        # bare None would raise TypeError there.
        return None, None

    # The anti-CSRF authenticity token is embedded in the CB_SESSION value
    # as "...__AT=<token>&___TS=...".
    token_match = re.search('__AT=(.*)&___TS=', session_cookie)
    authenticity_token = token_match[1] if token_match else ""
    data = {
        'authenticityToken': authenticity_token,
        'zipCode': postal_code,
        'number': street_number,
    }

    raw_response = requests.post(
        f'{url}/register/zipcode.json', data=data, cookies=cookies
    )
    return raw_response.json(), raw_response.cookies


def find_authentication_url(addresses, street_number, suffix):
    """Return the authenticationUrl of the first address matching the house number.

    Args:
        addresses: List of address dicts from the Circulus zipcode lookup,
            each with "address" and "authenticationUrl" keys.
        street_number: House number to search for.
        suffix: Optional house-number suffix; lowercased before matching.

    Returns:
        The matching address's authenticationUrl, or "" when nothing matches.
    """
    if suffix:
        search_pattern = f' {street_number} {suffix.lower()}'
    else:
        search_pattern = f' {street_number}'

    return next(
        (
            # Key fixed: the API payload uses camelCase "authenticationUrl"
            # (as everywhere else in this module), not "authentication_url".
            address["authenticationUrl"]
            for address in addresses
            if re.search(search_pattern, address["address"])
        ),
        "",
    )


def get_waste_data(logged_in_cookies, url):
    """Download the Circulus waste calendar and flatten it to pickup entries.

    Args:
        logged_in_cookies: Session cookies from a successful address login.
        url: Base URL of the Circulus provider.

    Returns:
        A list of {"type": <renamed waste type>, "date": <ISO date>} dicts,
        or [] when the login cookie is missing or no calendar data is found.
    """
    if not logged_in_cookies:
        _LOGGER.error("Unable to get Logged-in Cookie")
        return []

    # Query window: two weeks back through ninety days ahead.
    start_date = (datetime.now() - timedelta(days=14)).strftime("%Y-%m-%d")
    end_date = (datetime.now() + timedelta(days=90)).strftime("%Y-%m-%d")

    headers = {
        'Content-Type': 'application/json'
    }

    response = requests.get(
        f'{url}/afvalkalender.json?from={start_date}&till={end_date}',
        headers=headers,
        cookies=logged_in_cookies,
    ).json()

    if not response or 'customData' not in response or not response['customData']['response']['garbage']:
        _LOGGER.error('No Waste data found!')
        return []

    waste_data_raw = []
    for item in response['customData']['response']['garbage']:
        # The waste type depends only on the item, not on the individual
        # pickup date — compute it once per item, outside the date loop.
        waste_type = _waste_type_rename(item["code"].strip().lower())
        if not waste_type:
            continue
        for date in item['dates']:
            waste_data_raw.append({"type": waste_type, "date": date})

    return waste_data_raw


if __name__ == "__main__":
Expand Down
32 changes: 10 additions & 22 deletions custom_components/afvalwijzer/collector/deafvalapp.py
Original file line number Diff line number Diff line change
@@ -1,34 +1,24 @@
from ..const.const import _LOGGER, SENSOR_COLLECTORS_DEAFVALAPP
from ..common.main_functions import _waste_type_rename
from datetime import datetime
import re

import requests
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def get_waste_data_raw(
provider,
postal_code,
street_number,
suffix,
):
if provider not in SENSOR_COLLECTORS_DEAFVALAPP.keys():
def get_waste_data_raw(provider, postal_code, street_number, suffix):
if provider not in SENSOR_COLLECTORS_DEAFVALAPP:
raise ValueError(f"Invalid provider: {provider}, please verify")

corrected_postal_code_parts = re.search(r"(\d\d\d\d) ?([A-z][A-z])", postal_code)
corrected_postal_code = (
corrected_postal_code_parts[1] + corrected_postal_code_parts[2].upper()
)

try:
url = SENSOR_COLLECTORS_DEAFVALAPP[provider].format(
corrected_postal_code,
postal_code,
street_number,
suffix,
)
raw_response = requests.get(url, timeout=60, verify=False)
raw_response.raise_for_status() # Raise an HTTPError for bad responses
except requests.exceptions.RequestException as err:
raise ValueError(err) from err

Expand All @@ -39,17 +29,15 @@ def get_waste_data_raw(

if not response:
_LOGGER.error("No waste data found!")
return
return []

waste_data_raw = []

for rows in response.strip().split("\n"):
for ophaaldatum in rows.split(";")[1:-1]:
temp = {"type": _waste_type_rename(rows.split(";")[0].strip().lower())}
temp["date"] = datetime.strptime(ophaaldatum, "%d-%m-%Y").strftime(
"%Y-%m-%d"
)
waste_data_raw.append(temp)
for date in rows.split(";")[1:-1]:
waste_type = _waste_type_rename(rows.split(";")[0].strip().lower())
waste_date = datetime.strptime(date, "%d-%m-%Y").strftime("%Y-%m-%d")
waste_data_raw.append({"type": waste_type, "date": waste_date})

return waste_data_raw

Expand Down
6 changes: 1 addition & 5 deletions custom_components/afvalwijzer/collector/icalendar.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
from datetime import datetime
import re
import requests
import logging

from ..common.main_functions import _waste_type_rename
from ..const.const import SENSOR_COLLECTORS_ICALENDAR

_LOGGER = logging.getLogger(__name__)
from ..const.const import _LOGGER, SENSOR_COLLECTORS_ICALENDAR


def get_waste_data_raw(provider, postal_code, street_number, suffix):
Expand Down Expand Up @@ -69,5 +66,4 @@ def get_waste_data_raw(provider, postal_code, street_number, suffix):


if __name__ == "__main__":
logging.basicConfig(level=logging.DEBUG)
print("Yell something at a mountain!")
29 changes: 8 additions & 21 deletions custom_components/afvalwijzer/collector/mijnafvalwijzer.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
from ..const.const import (
_LOGGER,
SENSOR_COLLECTOR_TO_URL,
SENSOR_COLLECTORS_AFVALWIJZER,
)
from ..const.const import _LOGGER, SENSOR_COLLECTOR_TO_URL, SENSOR_COLLECTORS_AFVALWIJZER
from ..common.main_functions import _waste_type_rename
from datetime import datetime

import requests
from urllib3.exceptions import InsecureRequestWarning

requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def get_waste_data_raw(
provider,
postal_code,
street_number,
suffix,
):
def get_waste_data_raw(provider, postal_code, street_number, suffix):
if provider not in SENSOR_COLLECTORS_AFVALWIJZER:
raise ValueError(f"Invalid provider: {provider}, please verify")

Expand All @@ -32,26 +23,22 @@ def get_waste_data_raw(
datetime.now().strftime("%Y-%m-%d"),
)
raw_response = requests.get(url, timeout=60, verify=False)
raw_response.raise_for_status() # Raise an HTTPError for bad responses
except requests.exceptions.RequestException as err:
raise ValueError(err) from err

try:
response = raw_response.json()
ophaaldagen_data = response.get("ophaaldagen", {}).get("data", [])
ophaaldagen_next_data = response.get("ophaaldagenNext", {}).get("data", [])

if not response["ophaaldagen"]:
if not ophaaldagen_data and not ophaaldagen_next_data:
_LOGGER.error("Address not found or no data available!")
raise KeyError
except KeyError as err:
raise KeyError(f"Invalid and/or no data received from {url}") from err

try:
waste_data_raw = (
response["ophaaldagen"]["data"] + response["ophaaldagenNext"]["data"]
)
except KeyError as err:
raise KeyError(f"Invalid and/or no data received from {url}") from err

return waste_data_raw
return ophaaldagen_data + ophaaldagen_next_data


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 6b1bd91

Please sign in to comment.