From 732188e8aacb4c60ed85dd329062c4da2118eb00 Mon Sep 17 00:00:00 2001 From: "firstof9@gmail.com" Date: Sun, 9 Jun 2024 17:08:39 -0700 Subject: [PATCH] feat: add amazon otp sensor --- custom_components/mail_and_packages/const.py | 8 +++ .../mail_and_packages/helpers.py | 66 +++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/custom_components/mail_and_packages/const.py b/custom_components/mail_and_packages/const.py index 0e2ccc30..c92d1146 100644 --- a/custom_components/mail_and_packages/const.py +++ b/custom_components/mail_and_packages/const.py @@ -164,6 +164,9 @@ "pt_BR.UTF-8", "", ] +AMAZON_OTP = "amazon_otp" +AMAZON_OTP_REGEX = "[0-9]{6}" +AMAZON_OTP_SUBJECT = "A one-time password is required for your Amazon delivery" # Sensor Data SENSOR_DATA = { @@ -721,6 +724,11 @@ icon="mdi:package", key="amazon_hub", ), + "amazon_otp": SensorEntityDescription( + name="Mail Amazon OTP Code", + icon="mdi:counter", + key="amazon_otp", + ), # Canada Post "capost_delivered": SensorEntityDescription( name="Mail Canada Post Delivered", diff --git a/custom_components/mail_and_packages/helpers.py b/custom_components/mail_and_packages/helpers.py index e80925ec..d27f8ae9 100644 --- a/custom_components/mail_and_packages/helpers.py +++ b/custom_components/mail_and_packages/helpers.py @@ -49,6 +49,9 @@ AMAZON_HUB_SUBJECT_SEARCH, AMAZON_IMG_PATTERN, AMAZON_ORDER, + AMAZON_OTP, + AMAZON_OTP_REGEX, + AMAZON_OTP_SUBJECT, AMAZON_PACKAGES, AMAZON_PATTERN, AMAZON_SHIPMENT_TRACKING, @@ -389,6 +392,8 @@ async def fetch( info = amazon_exception(account, amazon_fwds) count[sensor] = info[ATTR_COUNT] count[AMAZON_EXCEPTION_ORDER] = info[ATTR_ORDER] + elif sensor == AMAZON_OTP: + count[sensor] = amazon_otp(account, amazon_fwds) elif "_packages" in sensor: prefix = sensor.replace("_packages", "") delivering = await fetch(hass, config, account, data, f"{prefix}_delivering") @@ -1238,6 +1243,67 @@ def amazon_hub(account: Type[imaplib.IMAP4_SSL], fwds: Optional[str] = None) -> return info +def amazon_otp(account: Type[imaplib.IMAP4_SSL], fwds: Optional[list] = None) -> dict: + """Find Amazon exception emails. + + Returns dict of sensor data + """ + tfmt = get_formatted_date() + info = {} + body_regex = AMAZON_OTP_REGEX + domains = AMAZON_DOMAINS + if isinstance(fwds, list): + for fwd in fwds: + if fwd and fwd != '""' and fwd not in domains: + domains.append(fwd) + _LOGGER.debug("Amazon email adding %s to list", str(fwd)) + + _LOGGER.debug("Amazon domains to be checked: %s", str(domains)) + + for domain in domains: + if "@" in domain: + email_address = domain.strip('"') + _LOGGER.debug("Amazon email search address: %s", str(email_address)) + else: + email_address = [] + email_address.append(f"{AMAZON_EMAIL}{domain}") + _LOGGER.debug("Amazon email search address: %s", str(email_address)) + + (server_response, sdata) = email_search( + account, email_address, tfmt, AMAZON_OTP_SUBJECT + ) + + if server_response == "OK": + id_list = sdata[0].split() + _LOGGER.debug("Found Amazon OTP email(s): %s", str(len(id_list))) + found = [] + for i in id_list: + data = email_fetch(account, i, "(RFC822)")[1] + for response_part in data: + if isinstance(response_part, tuple): + msg = email.message_from_bytes(response_part[1]) + + # Get code from message body + try: + email_msg = quopri.decodestring( + str(msg.get_payload(0)) + ) # msg.get_payload(0).encode('utf-8') + except Exception as err: + _LOGGER.debug( + "Problem decoding email message: %s", str(err) + ) + continue + email_msg = email_msg.decode("utf-8", "ignore") + pattern = re.compile(rf"{body_regex}") + search = pattern.search(email_msg) + if search is not None: + if len(search.groups()) > 1: + found.append(search.group(2)) + + info[ATTR_CODE] = found + return info + + def amazon_exception( account: Type[imaplib.IMAP4_SSL], fwds: Optional[list] = None ) -> dict: