Skip to content

Commit

Permalink
feat: add Amazon OTP password sensor (#1037)
Browse files Browse the repository at this point in the history
  • Loading branch information
firstof9 authored Dec 19, 2024
1 parent d618f42 commit 9972855
Show file tree
Hide file tree
Showing 5 changed files with 718 additions and 0 deletions.
8 changes: 8 additions & 0 deletions custom_components/mail_and_packages/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@
"fr_CA.UTF-8",
"",
]
AMAZON_OTP = "amazon_otp"
AMAZON_OTP_REGEX = "(\n)(\\d{6})(\n)"
AMAZON_OTP_SUBJECT = "A one-time password is required for your Amazon delivery"

# Sensor Data
SENSOR_DATA = {
Expand Down Expand Up @@ -797,6 +800,11 @@
icon="mdi:package",
key="amazon_hub",
),
"amazon_otp": SensorEntityDescription(
name="Mail Amazon OTP Code",
icon="mdi:counter",
key="amazon_otp",
),
# Canada Post
"capost_delivered": SensorEntityDescription(
name="Mail Canada Post Delivered",
Expand Down
60 changes: 60 additions & 0 deletions custom_components/mail_and_packages/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
AMAZON_HUB_SUBJECT_SEARCH,
AMAZON_IMG_PATTERN,
AMAZON_ORDER,
AMAZON_OTP,
AMAZON_OTP_REGEX,
AMAZON_OTP_SUBJECT,
AMAZON_PACKAGES,
AMAZON_PATTERN,
AMAZON_SHIPMENT_TRACKING,
Expand Down Expand Up @@ -415,6 +418,8 @@ def fetch(
info = amazon_exception(account, amazon_fwds, amazon_domain)
count[sensor] = info[ATTR_COUNT]
count[AMAZON_EXCEPTION_ORDER] = info[ATTR_ORDER]
elif sensor == AMAZON_OTP:
count[sensor] = amazon_otp(account, amazon_fwds)
elif "_packages" in sensor:
prefix = sensor.replace("_packages", "")
delivering = fetch(hass, config, account, data, f"{prefix}_delivering")
Expand Down Expand Up @@ -1310,6 +1315,61 @@ def amazon_hub(account: Type[imaplib.IMAP4_SSL], fwds: Optional[str] = None) ->
return info


def amazon_otp(account: Type[imaplib.IMAP4_SSL], fwds: Optional[list] = None) -> dict:
"""Find Amazon exception emails.
Returns dict of sensor data
"""
tfmt = get_formatted_date()
info = {}
body_regex = AMAZON_OTP_REGEX
email_addresses = []
email_addresses.extend(_process_amazon_forwards(fwds))

for address in email_addresses:

(server_response, sdata) = email_search(
account, address, tfmt, AMAZON_OTP_SUBJECT
)

if server_response == "OK":
id_list = sdata[0].split()
_LOGGER.debug("Found Amazon OTP email(s): %s", str(len(id_list)))
found = []
for i in id_list:
data = email_fetch(account, i, "(RFC822)")[1]
for response_part in data:
if isinstance(response_part, tuple):
msg = email.message_from_bytes(response_part[1])

_LOGGER.debug("Email Multipart: %s", str(msg.is_multipart()))
_LOGGER.debug("Content Type: %s", str(msg.get_content_type()))

# Get code from message body
try:
_LOGGER.debug("Decoding OTP email...")
email_msg = quopri.decodestring(
str(msg.get_payload(0))
) # msg.get_payload(0).encode('utf-8')
except Exception as err:
_LOGGER.debug(
"Problem decoding email message: %s", str(err)
)
continue
email_msg = email_msg.decode("utf-8", "ignore")
pattern = re.compile(rf"{body_regex}")
search = pattern.search(email_msg)
if search is not None:
if len(search.groups()) > 1:
_LOGGER.debug(
"Amazon OTP search results: %s", search.group(2)
)
found.append(search.group(2))

info[ATTR_CODE] = found
return info


def amazon_exception(
account: Type[imaplib.IMAP4_SSL],
fwds: Optional[list] = None,
Expand Down
24 changes: 24 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1708,3 +1708,27 @@ def mock_update_amazon_image():
# value = mock.Mock()
mock_update.return_value = FAKE_UPDATE_DATA_BIN
yield mock_update


@pytest.fixture()
def mock_imap_amazon_otp():
"""Mock imap class values."""
with patch("custom_components.mail_and_packages.helpers.imaplib") as mock_imap:
mock_conn = mock.Mock(autospec=imaplib.IMAP4_SSL)
mock_imap.IMAP4_SSL.return_value = mock_conn

mock_conn.login.return_value = (
"OK",
[b"[email protected] authenticated (Success)"],
)
mock_conn.list.return_value = (
"OK",
[b'(\\HasNoChildren) "/" "INBOX"'],
)
mock_conn.search.return_value = ("OK", [b"1"])
mock_conn.uid.return_value = ("OK", [b"1"])
f = open("tests/test_emails/amazon_otp.eml", "r")
email_file = f.read()
mock_conn.fetch.return_value = ("OK", [(b"", email_file.encode("utf-8"))])
mock_conn.select.return_value = ("OK", [])
yield mock_conn
Loading

0 comments on commit 9972855

Please sign in to comment.