Skip to content

Commit

Permalink
feat: add amazon otp sensor
Browse files Browse the repository at this point in the history
  • Loading branch information
firstof9 committed Jun 10, 2024
1 parent 50b5433 commit 732188e
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
8 changes: 8 additions & 0 deletions custom_components/mail_and_packages/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@
"pt_BR.UTF-8",
"",
]
AMAZON_OTP = "amazon_otp"
AMAZON_OTP_REGEX = "[0-9]{6}"
AMAZON_OTP_SUBJECT = "A one-time password is required for your Amazon delivery"

# Sensor Data
SENSOR_DATA = {
Expand Down Expand Up @@ -721,6 +724,11 @@
icon="mdi:package",
key="amazon_hub",
),
"amazon_otp": SensorEntityDescription(
name="Mail Amazon OTP Code",
icon="mdi:counter",
key="amazon_otp",
),
# Canada Post
"capost_delivered": SensorEntityDescription(
name="Mail Canada Post Delivered",
Expand Down
66 changes: 66 additions & 0 deletions custom_components/mail_and_packages/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
AMAZON_HUB_SUBJECT_SEARCH,
AMAZON_IMG_PATTERN,
AMAZON_ORDER,
AMAZON_OTP,
AMAZON_OTP_REGEX,
AMAZON_OTP_SUBJECT,
AMAZON_PACKAGES,
AMAZON_PATTERN,
AMAZON_SHIPMENT_TRACKING,
Expand Down Expand Up @@ -389,6 +392,8 @@ async def fetch(
info = amazon_exception(account, amazon_fwds)
count[sensor] = info[ATTR_COUNT]
count[AMAZON_EXCEPTION_ORDER] = info[ATTR_ORDER]
elif sensor == AMAZON_OTP:
count[sensor] = amazon_otp(account, amazon_fwds)
elif "_packages" in sensor:
prefix = sensor.replace("_packages", "")
delivering = await fetch(hass, config, account, data, f"{prefix}_delivering")
Expand Down Expand Up @@ -1238,6 +1243,67 @@ def amazon_hub(account: Type[imaplib.IMAP4_SSL], fwds: Optional[str] = None) ->
return info


def amazon_otp(account: Type[imaplib.IMAP4_SSL], fwds: Optional[list] = None) -> dict:
"""Find Amazon exception emails.
Returns dict of sensor data
"""
tfmt = get_formatted_date()
info = {}
body_regex = AMAZON_OTP_REGEX
domains = AMAZON_DOMAINS
if isinstance(fwds, list):
for fwd in fwds:
if fwd and fwd != '""' and fwd not in domains:
domains.append(fwd)
_LOGGER.debug("Amazon email adding %s to list", str(fwd))

_LOGGER.debug("Amazon domains to be checked: %s", str(domains))

for domain in domains:
if "@" in domain:
email_address = domain.strip('"')
_LOGGER.debug("Amazon email search address: %s", str(email_address))
else:
email_address = []
email_address.append(f"{AMAZON_EMAIL}{domain}")
_LOGGER.debug("Amazon email search address: %s", str(email_address))

(server_response, sdata) = email_search(
account, email_address, tfmt, AMAZON_OTP_SUBJECT
)

if server_response == "OK":
id_list = sdata[0].split()
_LOGGER.debug("Found Amazon OTP email(s): %s", str(len(id_list)))
found = []
for i in id_list:
data = email_fetch(account, i, "(RFC822)")[1]
for response_part in data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])

# Get code from message body
try:
email_msg = quopri.decodestring(
str(msg.get_payload(0))
) # msg.get_payload(0).encode('utf-8')
except Exception as err:
_LOGGER.debug(
"Problem decoding email message: %s", str(err)
)
continue
email_msg = email_msg.decode("utf-8", "ignore")
pattern = re.compile(rf"{body_regex}")
search = pattern.search(email_msg)
if search is not None:
if len(search.groups()) > 1:
found.append(search.group(2))

info[ATTR_CODE] = found
return info


def amazon_exception(
account: Type[imaplib.IMAP4_SSL], fwds: Optional[list] = None
) -> dict:
Expand Down

0 comments on commit 732188e

Please sign in to comment.