From e8b2d4221d2ac6a43188c3577a4974fe851d3df0 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 16 Sep 2024 21:50:20 -0400 Subject: [PATCH 1/5] Support for PGP Email Support --- all-plugin-requirements.txt | 3 + apprise/plugins/email.py | 275 ++++++++++++++++++++++++++++++++++-- test/test_plugin_email.py | 76 +++++++++- 3 files changed, 340 insertions(+), 14 deletions(-) diff --git a/all-plugin-requirements.txt b/all-plugin-requirements.txt index b2d0fc560..5d70bb406 100644 --- a/all-plugin-requirements.txt +++ b/all-plugin-requirements.txt @@ -11,3 +11,6 @@ gntp # Provides mqtt:// support # use v1.x due to https://github.com/eclipse/paho.mqtt.python/issues/814 paho-mqtt < 2.0.0 + +# Pretty Good Privacy (PGP) Provides mailto:// and deltachat:// support +PGPy diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 2e423916a..89eb18115 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -27,6 +27,7 @@ # POSSIBILITY OF SUCH DAMAGE. import dataclasses +import os import re import smtplib import typing as t @@ -36,19 +37,30 @@ from email.utils import formataddr, make_msgid from email.header import Header from email import charset +import hashlib from socket import error as SocketError from datetime import datetime +from datetime import timedelta from datetime import timezone from .base import NotifyBase from ..url import PrivacyMode -from ..common import NotifyFormat, NotifyType +from ..common import NotifyFormat, NotifyType, PersistentStoreMode from ..conversion import convert_between -from ..utils import is_ipaddr, is_email, parse_emails, is_hostname +from ..utils import is_ipaddr, is_email, parse_emails, is_hostname, parse_bool from ..locale import gettext_lazy as _ from ..logger import logger +try: + import pgpy + # Pretty Good Privacy (PGP) Support enabled + PGP_SUPPORT = True + +except ImportError: + # Pretty Good Privacy (PGP) Support disabled + PGP_SUPPORT = False + # Globally Default encoding mode set to Quoted Printable. charset.add_charset('utf-8', charset.QP, charset.QP, 'utf-8') @@ -439,6 +451,12 @@ class NotifyEmail(NotifyBase): 'type': 'string', 'map_to': 'smtp_host', }, + 'pgp': { + 'name': _('PGP Encryption'), + 'type': 'bool', + 'map_to': 'use_pgp', + 'default': False, + }, 'mode': { 'name': _('Secure Mode'), 'type': 'choice:string', @@ -463,7 +481,7 @@ class NotifyEmail(NotifyBase): def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, targets=None, cc=None, bcc=None, reply_to=None, headers=None, - **kwargs): + use_pgp=None, **kwargs): """ Initialize Email Object @@ -500,6 +518,17 @@ def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, self.smtp_host = \ smtp_host if isinstance(smtp_host, str) else '' + # pgp hash + self.pgp_public_keys = {} + + self.use_pgp = use_pgp if not None \ + else self.template_args['pgp']['default'] + + if self.use_pgp and not PGP_SUPPORT: + self.logger.warning( + 'PGP Support is not available on this installation; ' + 'ask admin to install PGPy') + # Now detect secure mode if secure_mode: self.secure_mode = None \ @@ -831,6 +860,12 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, mixed.attach(app) base = mixed + if self.use_pgp: + # Apply our encryption + encrypted_content = self.pgp_encrypt_message(base.as_string()) + if encrypted_content: + base = MIMEText(encrypted_content, "plain") + # Apply any provided custom headers for k, v in self.headers.items(): base[k] = Header(v, self._get_charset(v)) @@ -901,20 +936,21 @@ def submit(self, messages: t.List[EmailMessage]): message.to_addrs, message.body) - self.logger.info( - f'Sent Email notification to "{message.recipient}".') + self.logger.info('Sent Email to %s', message.recipient) + except (SocketError, smtplib.SMTPException, RuntimeError) as e: self.logger.warning( - f'Sending email to "{message.recipient}" failed. ' - f'Reason: {e}') + 'Sending email to "%s" failed.', message.recipient) + self.logger.debug(f'Socket Exception: {e}') # Mark as failure has_error = True except (SocketError, smtplib.SMTPException, RuntimeError) as e: self.logger.warning( - f'Connection error while submitting email to {self.smtp_host}.' - f' Reason: {e}') + 'Connection error while submitting email to "%s"', + self.smtp_host) + self.logger.debug(f'Socket Exception: {e}') # Mark as failure has_error = True @@ -924,15 +960,224 @@ def submit(self, messages: t.List[EmailMessage]): if socket is not None: # pragma: no branch socket.quit() + # Reduce our dictionary (eliminate expired keys if any) + self.pgp_public_keys = { + key: value for key, value in self.pgp_public_keys.items() + if value['expires'] > datetime.now(timezone.utc)} + return not has_error + def pgp_generate_keys(self, path=None): + """ + Generates a set of keys based on email configured + """ + if path is None: + if self.store.mode == PersistentStoreMode.MEMORY: + # Not possible - no write permissions + return False + + # Set our path + path = self.store.path + + try: + # Create a new RSA key pair with 2048-bit strength + key = pgpy.PGPKey.new( + pgpy.constants.PubKeyAlgorithm.RSAEncryptOrSign, 2048) + + except NameError: + # PGPy not installed + self.logger.debug('PGPy not installed; ignoring PGP file: %s') + return False + + # Prepare our uid + name, email = self.names[self.from_addr[1]], self.from_addr[1] + uid = pgpy.PGPUID.new(name, email=email) + + # Filenames + file_prefix = email.split('@')[0].lower() + pub_path = os.path.join(path, f'{file_prefix}-pub.asc') + prv_path = os.path.join(path, f'{file_prefix}-prv.asc') + + # Add the user ID to the key + key.add_uid(uid, usage={ + pgpy.constants.KeyFlags.Sign, + pgpy.constants.KeyFlags.EncryptCommunications}, + hashes=[pgpy.constants.HashAlgorithm.SHA256], + ciphers=[pgpy.constants.SymmetricKeyAlgorithm.AES256], + compression=[pgpy.constants.CompressionAlgorithm.ZLIB]) + + try: + # Write our keys to disk + with open(pub_path, 'w') as f: + f.write(str(key.pubkey)) + + except OSError as e: + self.logger.warning('Error writing PGP file %s', pub_path) + self.logger.debug(f'I/O Exception: {e}') + + # Cleanup + try: + os.unlink(pub_path) + self.logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + + with open(prv_path, 'w') as f: + f.write(str(key)) + + except OSError as e: + self.logger.warning('Error writing PGP file %s', prv_path) + self.logger.debug(f'I/O Exception: {e}') + try: + os.unlink(pub_path) + self.logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + os.unlink(prv_path) + self.logger.trace('Removed %s', prv_path) + + except OSError: + pass + + return False + + self.logger.info( + 'Wrote PGP Keys for %s/%s', + os.path.dirname(pub_path), + os.path.basename(pub_path)) + return True + + @property + def pgp_fnames(self): + """ + Returns a list of filenames worth scanning for + """ + fnames = [ + 'pgp-public.asc', + 'pgp-pub.asc', + 'public.asc', + 'pub.asc', + ] + + # Prepare our key files: + email = self.from_addr[1] + + _entry = email.split('@')[0].lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') + + # Lowercase email (Highest Priority) + _entry = email.lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') + + return fnames + + def pgp_public_key(self, path=None): + """ + Opens a spcified pgp public file and returns the key from it which + is used to encrypt the message + """ + if path is None: + path = next( + (os.path.join(self.store.path, fname) + for fname in self.pgp_fnames + if os.path.isfile(os.path.join(self.store.path, fname))), + None) + + if not path: + if self.pgp_generate_keys(path=self.store.path): + path = next( + (os.path.join(self.store.path, fname) + for fname in self.pgp_fnames + if os.path.isfile( + os.path.join(self.store.path, fname))), None) + + if path: + # We should get a hit now + return self.pgp_public_key(path=path) + + self.logger.warning('No PGP Public Key could be loaded') + return None + + if not isinstance(path, str): + raise AttributeError( + 'Invalid path to PGP Public Key specified: %s: %s', + type(path), str(path)) + + # Persistent storage key: + ps_key = hashlib.sha1( + os.path.abspath(path).encode('utf-8')).hexdigest() + if ps_key in self.pgp_public_keys: + # Take an early exit + return self.pgp_public_keys[ps_key]['public_key'] + + try: + with open(path, 'r') as key_file: + public_key, _ = pgpy.PGPKey.from_blob(key_file.read()) + + except NameError: + # PGPy not installed + self.logger.debug( + 'PGPy not installed; skipping PGP support: %s', path) + return None + + except FileNotFoundError: + # Generate keys + self.logger.debug('PGP Public Key file not found: %s', path) + return None + + except OSError as e: + self.logger.warning('Error accessing PGP Public Key file %s', path) + self.logger.debug(f'I/O Exception: {e}') + return None + + self.store.set(ps_key, public_key, expires=86400) + self.pgp_public_keys[ps_key] = { + 'public_key': public_key, + 'expires': + datetime.now(timezone.utc) + timedelta(seconds=86400) + } + return public_key + + # Encrypt message using the recipient's public key + def pgp_encrypt_message(self, message, path=None): + """ + If provided a path to a pgp-key, content is encrypted + """ + + # Acquire our key + public_key = self.pgp_public_key(path=path) + if not public_key: + # Encryption not possible + return False + + try: + message_object = pgpy.PGPMessage.new(message) + encrypted_message = public_key.encrypt(message_object) + return str(encrypted_message) + + except NameError: + # PGPy not installed + self.logger.debug('PGPy not installed; Skipping PGP encryption') + + return None + def url(self, privacy=False, *args, **kwargs): """ Returns the URL built dynamically based on specified arguments. """ # Define an URL parameters - params = {} + params = { + 'pgp': 'yes' if self.use_pgp else 'no', + } # Append our headers into our parameters params.update({'+{}'.format(k): v for k, v in self.headers.items()}) @@ -1044,7 +1289,7 @@ def url_identifier(self): """ return ( self.secure_protocol if self.secure else self.protocol, - self.user, self.password, self.host, + self.user, self.password, self.host, self.smtp_host, self.port if self.port else SECURE_MODES[self.secure_mode]['default_port'], ) @@ -1053,8 +1298,7 @@ def __len__(self): """ Returns the number of targets associated with this notification """ - targets = len(self.targets) - return targets if targets > 0 else 1 + return len(self.targets) if self.targets else 1 @staticmethod def parse_url(url): @@ -1086,6 +1330,11 @@ def parse_url(url): # value if invalid; we'll attempt to figure this out later on results['host'] = '' + # Get PGP Flag + results['use_pgp'] = \ + parse_bool(results['qsd'].get( + 'pgp', NotifyEmail.template_args['pgp']['default'])) + # The From address is a must; either through the use of templates # from= entry and/or merging the user and hostname together, this # must be calculated or parse_url will fail. diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index 59b4d3842..6142e9d89 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -29,6 +29,7 @@ import logging import pytest import os +import sys import re from unittest import mock from inspect import cleandoc @@ -40,6 +41,7 @@ from apprise import Apprise from apprise import AttachBase from apprise import AppriseAsset +from apprise import PersistentStoreMode from apprise.config import ConfigBase from apprise import AppriseAttachment from apprise.plugins.email import NotifyEmail @@ -48,7 +50,6 @@ # Disable logging for a cleaner testing output logging.disable(logging.CRITICAL) - # Attachment Directory TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') @@ -130,6 +131,10 @@ ('mailtos://%20@domain.com?user=admin@mail-domain.com', { 'instance': NotifyEmail, }), + ('mailtos://%20@domain.com?user=admin@mail-domain.com?pgp=yes', { + # Test pgp flag + 'instance': NotifyEmail, + }), ('mailtos://user:pass@nuxref.com:567/l2g@nuxref.com', { 'instance': NotifyEmail, }), @@ -2049,3 +2054,72 @@ def test_plugin_email_by_ipaddr_1113(mock_smtp, mock_smtp_ssl): assert email.smtp_host == '10.0.0.195' assert email.port == 25 assert email.targets == [(False, 'alerts@example.com')] + + +@pytest.mark.skipif('pgpy' not in sys.modules, reason="Requires PGPy") +@mock.patch('smtplib.SMTP_SSL') +@mock.patch('smtplib.SMTP') +def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): + """ + NotifyEmail() PGP Tests + + """ + + # Initialize our email (no from name) + obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') + + # Test our names + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # login is pgp + obj = Apprise.instantiate('mailto://pgp:pass@nuxref.com?pgp=yes') + + # Test our names + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # login is pgp + obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') + fnames = obj.pgp_fnames + assert isinstance(fnames, list) + + # Attempt to generate keys + obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') + # We're in memory mode + assert obj.store.mode == PersistentStoreMode.MEMORY + assert obj.pgp_generate_keys() is False + tmpdir1 = tmpdir.mkdir('tmp01') + # However explicitly setting a path works + assert obj.pgp_generate_keys(str(tmpdir1)) is True + + tmpdir2 = tmpdir.mkdir('tmp02') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir2), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset) + + assert obj.store.mode == PersistentStoreMode.FLUSH + assert obj.pgp_generate_keys() is True + + # We do this again but even when we do a requisition for a public key + # it will generate a new pair or keys for us once it detects we don't + # have any + tmpdir3 = tmpdir.mkdir('tmp03') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir3), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset) + + assert obj.store.mode == PersistentStoreMode.FLUSH + + # We'll have a public key object to encrypt with + assert obj.pgp_public_key() is not None + + encrypted = obj.pgp_encrypt_message("hello world") + assert encrypted.startswith('-----BEGIN PGP MESSAGE-----') + assert encrypted.rstrip().endswith('-----END PGP MESSAGE-----') From 1a9d1dddd5cc64b8ebcadd952ddac9945d263e44 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Tue, 17 Sep 2024 21:32:10 -0400 Subject: [PATCH 2/5] some refactoring --- apprise/plugins/email.py | 76 +++++++++++++++++++++++++++++------- test/test_plugin_email.py | 49 +++++++++++++++-------- test/var/pgp/corrupt-pub.asc | 18 +++++++++ test/var/pgp/valid-prv.asc | 31 +++++++++++++++ test/var/pgp/valid-pub.asc | 18 +++++++++ 5 files changed, 160 insertions(+), 32 deletions(-) create mode 100644 test/var/pgp/corrupt-pub.asc create mode 100644 test/var/pgp/valid-prv.asc create mode 100644 test/var/pgp/valid-pub.asc diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 89eb18115..7b8af50ca 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -44,6 +44,7 @@ from datetime import timedelta from datetime import timezone +from ..apprise_attachment import AppriseAttachment from .base import NotifyBase from ..url import PrivacyMode from ..common import NotifyFormat, NotifyType, PersistentStoreMode @@ -367,6 +368,10 @@ class NotifyEmail(NotifyBase): # Support attachments attachment_support = True + # There is no reason a PGP Public Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pgp_public_key_size = 8000 + # Default Notify Format notify_format = NotifyFormat.HTML @@ -469,6 +474,14 @@ class NotifyEmail(NotifyBase): 'type': 'list:string', 'map_to': 'reply_to', }, + 'pgpkey': { + 'name': _('PGP Public Key Path'), + 'type': 'string', + 'private': True, + # By default persistent storage is referenced + 'default': '', + 'map_to': 'pgp_key', + }, }) # Define any kwargs we're using @@ -481,7 +494,7 @@ class NotifyEmail(NotifyBase): def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, targets=None, cc=None, bcc=None, reply_to=None, headers=None, - use_pgp=None, **kwargs): + use_pgp=None, pgp_key=None, **kwargs): """ Initialize Email Object @@ -529,6 +542,18 @@ def __init__(self, smtp_host=None, from_addr=None, secure_mode=None, 'PGP Support is not available on this installation; ' 'ask admin to install PGPy') + # Our template object is just an AppriseAttachment object + if pgp_key: + self.pgp_key = AppriseAttachment(asset=self.asset) + # Add our definition to our pgp_key reference + self.pgp_key.add(pgp_key) + # Enforce maximum file size + self.pgp_key[0].max_file_size = self.max_pgp_public_key_size + + else: + # No key; use auto-generation + self.pgp_key = None + # Now detect secure mode if secure_mode: self.secure_mode = None \ @@ -861,6 +886,7 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, base = mixed if self.use_pgp: + self.logger.debug("Securing email with PGP Encryption") # Apply our encryption encrypted_content = self.pgp_encrypt_message(base.as_string()) if encrypted_content: @@ -1054,10 +1080,26 @@ def pgp_generate_keys(self, path=None): return True @property - def pgp_fnames(self): + def pgp_pubkey(self): """ Returns a list of filenames worth scanning for """ + if self.pgp_key is not None: + # If our code reaches here, then we fetch our public key + pgp_key = self.pgp_key[0] + if not pgp_key: + # We could not access the attachment + self.logger.error( + 'Could not access PGP Public Key {}.'.format( + pgp_key.url(privacy=True))) + return False + + return pgp_key.path + + elif not self.store.path: + # No path + return None + fnames = [ 'pgp-public.asc', 'pgp-pub.asc', @@ -1077,7 +1119,11 @@ def pgp_fnames(self): if _entry not in fnames: fnames.insert(0, f'{_entry}-pub.asc') - return fnames + return next( + (os.path.join(self.store.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.store.path, fname))), + None) def pgp_public_key(self, path=None): """ @@ -1085,20 +1131,11 @@ def pgp_public_key(self, path=None): is used to encrypt the message """ if path is None: - path = next( - (os.path.join(self.store.path, fname) - for fname in self.pgp_fnames - if os.path.isfile(os.path.join(self.store.path, fname))), - None) + path = self.pgp_pubkey if not path: if self.pgp_generate_keys(path=self.store.path): - path = next( - (os.path.join(self.store.path, fname) - for fname in self.pgp_fnames - if os.path.isfile( - os.path.join(self.store.path, fname))), None) - + path = self.pgp_pubkey if path: # We should get a hit now return self.pgp_public_key(path=path) @@ -1138,7 +1175,6 @@ def pgp_public_key(self, path=None): self.logger.debug(f'I/O Exception: {e}') return None - self.store.set(ps_key, public_key, expires=86400) self.pgp_public_keys[ps_key] = { 'public_key': public_key, 'expires': @@ -1179,6 +1215,11 @@ def url(self, privacy=False, *args, **kwargs): 'pgp': 'yes' if self.use_pgp else 'no', } + # Store oure public key back into your URL + if self.pgp_key is not None: + params['pgp_key'] = NotifyEmail.quote( + self.pgp_key[0].url(privacy=privacy), safe=':') + # Append our headers into our parameters params.update({'+{}'.format(k): v for k, v in self.headers.items()}) @@ -1335,6 +1376,11 @@ def parse_url(url): parse_bool(results['qsd'].get( 'pgp', NotifyEmail.template_args['pgp']['default'])) + # Get PGP Public Key Override + if 'pgpkey' in results['qsd'] and results['qsd']['pgpkey']: + results['pgp_key'] = \ + NotifyEmail.unquote(results['qsd']['pgpkey']) + # The From address is a must; either through the use of templates # from= entry and/or merging the user and hostname together, this # must be calculated or parse_url will fail. diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index 6142e9d89..e520614b7 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -2068,27 +2068,42 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): # Initialize our email (no from name) obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') - # Test our names - fnames = obj.pgp_fnames - assert isinstance(fnames, list) + # Nothing to lookup + assert obj.pgp_pubkey is None + assert obj.pgp_public_key() is None + assert obj.pgp_encrypt_message("message") is False + # Keys can not be generated in memory mode + assert obj.pgp_generate_keys() is False - # login is pgp - obj = Apprise.instantiate('mailto://pgp:pass@nuxref.com?pgp=yes') + # The reason... no location to store data + assert obj.store.mode == PersistentStoreMode.MEMORY - # Test our names - fnames = obj.pgp_fnames - assert isinstance(fnames, list) + tmpdir0 = tmpdir.mkdir('tmp00') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + ) - # login is pgp - obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') - fnames = obj.pgp_fnames - assert isinstance(fnames, list) + # Prepare PGP + obj = Apprise.instantiate( + 'mailto://pgp:pass@nuxref.com?pgp=yes', asset=asset) + assert obj.store.mode == PersistentStoreMode.FLUSH + + # Still no public key + assert obj.pgp_pubkey is None + + assert obj.pgp_generate_keys() is True + # Now we'll have a public key + assert isinstance(obj.pgp_pubkey, str) + + # Prepare PGP + obj = Apprise.instantiate( + f'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey={obj.pgp_pubkey}', + asset=asset) + + # We will find our key + assert obj.pgp_public_key() is not None - # Attempt to generate keys - obj = Apprise.instantiate('mailto://chris:pass@nuxref.com?pgp=yes') - # We're in memory mode - assert obj.store.mode == PersistentStoreMode.MEMORY - assert obj.pgp_generate_keys() is False tmpdir1 = tmpdir.mkdir('tmp01') # However explicitly setting a path works assert obj.pgp_generate_keys(str(tmpdir1)) is True diff --git a/test/var/pgp/corrupt-pub.asc b/test/var/pgp/corrupt-pub.asc new file mode 100644 index 000000000..26a219021 --- /dev/null +++ b/test/var/pgp/corrupt-pub.asc @@ -0,0 +1,18 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7 +XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0 +rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0 +p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq +ooooooooooooooooooooooooooooo6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r +uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs +ZWFkMmdvbGRAZ21haWwuooooooooooooooooooooooooooooogILCQIVCAIWAgIe +ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8 +Sibwo7gL4ooooooooooooooooooooooooooooofjiEEW8gVQ4W2KDs74aCGkQtQJ +irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd +0knhsmqdGTsjKuYdZ3Cooooooooooooooooooooooooooooo2GWBnvOQje+lQGIf +rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF +T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso +yiZsjyu9xY0= +=ZY2q +-----END PGP PUBLIC KEY BLOCK----- diff --git a/test/var/pgp/valid-prv.asc b/test/var/pgp/valid-prv.asc new file mode 100644 index 000000000..c98a8e311 --- /dev/null +++ b/test/var/pgp/valid-prv.asc @@ -0,0 +1,31 @@ +-----BEGIN PGP PRIVATE KEY BLOCK----- + +xcLYBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7 +XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0 +rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0 +p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq +N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r +uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAEAB/4tOpPqjqyo9I9Px6pn +Et+GRQqRTvxTIjuIc0MRkRaGxLdeahaI9bm8M2Y9158ed43Uy6zTsaXCDc+W9khr +iL9uInG5mFOqT/iUcf65b49wQVf9HJdT3Ncll+7uBoCW+KqMjHGA7z71TkN2/r8u +QQjzamY4gHInYE+BGgeZSfQ3t1MJMSdjQopPGDwSsco5hQJYtVH8K/0/Ig5S5E9Q +KD9ku3W9bCliERkljIwEbbyDv/vmbxPKdWW83T+UQK6CQhkH6h69EoMGQ76a6y/H +UuppNSpxuR4BiX4ZlcUyARrLluaRS0K1/OZCoScA0LLjY8pCEBoWT0uhhvy3t2xD +/bipBADS3bM3yGGZKtNgLPQx0BAyWk07OD3AlObykz4yTIc9DZj1bzhHKgAhwUAN +k7StwA22HoxMCKSoxhherZaXAQaJJOJKNXw3DphHCexrBq77nxBu3yo9UStj04Lx +tCEibclsQcwgh7TjjjDQdRYiirZvu9IGQBf27xKvTepibn7NnwQAxfcePfXsHza3 +7CuJbxOGFaPf4ENSpFRYSZbH3dErtSlGDzz8e8jI0Ck9LQgp9MfjksWUwaMQbXdV +zNbQe1lAWQxtN9amVvEWvrAJhbhEU6RLsSjpZ9W5r3xAbfkoDg/icjbdoOqwI6LE +aTEhwaz+XZMLYJiT22AMJyC7TcL6fLcD/0nBRheQBqTsuYKimKI5yZ3ZdlGHfaLN +OqMGfuaEQCUAhSaXNliuP3XAWfiVXCaRw9De+Eod6DfGMGTTx8EVy7N4y4w3TORp +fFKaMGD3oiw1Eh63K1jV2yWPPpOnyc+YtXCPuGS+n/3CITc5cKxaPapQtCJA9Gw0 +OaZ7ikUNs0Q9NbfNIUNocmlzIENhcm9uIDxsZWFkMmdvbGRAZ21haWwuY29tPsLA +ggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIeARYhBEHHWtq4Kh8dGraFnkmZAD9B +29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8Sibwo7gL4ayF4S3KhaKCYORcMM1o +e4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJirvNA7WnuyMyXZyvhYa63U7GTk5R +dVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd0knhsmqdGTsjKuYdZ3CHED85pv/M +Owe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIfrE6TIwsf4QoKXSkTakzggbpZZl2h +g2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiFT9H/EmsNqlSKTTv1Aw4raCFZ+T/O +csw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFsoyiZsjyu9xY0= +=dBp6 +-----END PGP PRIVATE KEY BLOCK----- diff --git a/test/var/pgp/valid-pub.asc b/test/var/pgp/valid-pub.asc new file mode 100644 index 000000000..dd604e843 --- /dev/null +++ b/test/var/pgp/valid-pub.asc @@ -0,0 +1,18 @@ +-----BEGIN PGP PUBLIC KEY BLOCK----- + +xsBNBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7 +XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0 +rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0 +p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq +N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r +uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAHNIUNocmlzIENhcm9uIDxs +ZWFkMmdvbGRAZ21haWwuY29tPsLAggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIe +ARYhBEHHWtq4Kh8dGraFnkmZAD9B29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8 +Sibwo7gL4ayF4S3KhaKCYORcMM1oe4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJ +irvNA7WnuyMyXZyvhYa63U7GTk5RdVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd +0knhsmqdGTsjKuYdZ3CHED85pv/MOwe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIf +rE6TIwsf4QoKXSkTakzggbpZZl2hg2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiF +T9H/EmsNqlSKTTv1Aw4raCFZ+T/Ocsw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFso +yiZsjyu9xY0= +=ZY2q +-----END PGP PUBLIC KEY BLOCK----- From a9ed27ee029d69ca4877be0114645704f2e0997a Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 30 Sep 2024 22:51:53 -0400 Subject: [PATCH 3/5] PGP Encryption working --- apprise/plugins/email.py | 68 ++++++++++++++++++++++++++++++--------- test/test_plugin_email.py | 8 ++--- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 7b8af50ca..b1fbbd5c0 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -34,6 +34,7 @@ from email.mime.text import MIMEText from email.mime.application import MIMEApplication from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase from email.utils import formataddr, make_msgid from email.header import Header from email import charset @@ -787,6 +788,10 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, 'There are no Email recipients to notify') return False + elif self.use_pgp and not PGP_SUPPORT: + self.logger.warning('PGP Support unavailable') + return False + messages: t.List[EmailMessage] = [] # Create a copy of the targets list @@ -887,10 +892,33 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, if self.use_pgp: self.logger.debug("Securing email with PGP Encryption") + # Set our header information to include in the encryption + base['From'] = formataddr( + (None, self.from_addr[1]), charset='utf-8') + base['To'] = formataddr((None, to_addr), charset='utf-8') + base['Subject'] = Header(title, self._get_charset(title)) + # Apply our encryption encrypted_content = self.pgp_encrypt_message(base.as_string()) if encrypted_content: - base = MIMEText(encrypted_content, "plain") + # prepare our messsage + base = MIMEMultipart( + "encrypted", protocol="application/pgp-encrypted") + + # Store Autocrypt header (DeltaChat Support) + base.add_header( + "Autocrypt", + "addr=%s; prefer-encrypt=mutual" % formataddr( + (False, to_addr), charset='utf-8')) + + # Set Encryption Info Part + enc_payload = MIMEText("Version: 1", "plain") + enc_payload.set_type("application/pgp-encrypted") + base.attach(enc_payload) + + enc_payload = MIMEBase("application", "octet-stream") + enc_payload.set_payload(encrypted_content) + base.attach(enc_payload) # Apply any provided custom headers for k, v in self.headers.items(): @@ -984,7 +1012,13 @@ def submit(self, messages: t.List[EmailMessage]): finally: # Gracefully terminate the connection with the server if socket is not None: # pragma: no branch - socket.quit() + try: + socket.quit() + + except (SocketError, smtplib.SMTPException): + # No need to make this a bigger issue as we were exiting + # anyway + pass # Reduce our dictionary (eliminate expired keys if any) self.pgp_public_keys = { @@ -1079,8 +1113,7 @@ def pgp_generate_keys(self, path=None): os.path.basename(pub_path)) return True - @property - def pgp_pubkey(self): + def pgp_pubkey(self, email=None): """ Returns a list of filenames worth scanning for """ @@ -1107,17 +1140,22 @@ def pgp_pubkey(self): 'pub.asc', ] + emails = [] + if email: + emails.append(email) + # Prepare our key files: - email = self.from_addr[1] + emails.append(self.from_addr[1]) - _entry = email.split('@')[0].lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + for email in emails: + _entry = email.split('@')[0].lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') - # Lowercase email (Highest Priority) - _entry = email.lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + # Lowercase email (Highest Priority) + _entry = email.lower() + if _entry not in fnames: + fnames.insert(0, f'{_entry}-pub.asc') return next( (os.path.join(self.store.path, fname) @@ -1125,17 +1163,17 @@ def pgp_pubkey(self): if os.path.isfile(os.path.join(self.store.path, fname))), None) - def pgp_public_key(self, path=None): + def pgp_public_key(self, path=None, email=None): """ Opens a spcified pgp public file and returns the key from it which is used to encrypt the message """ if path is None: - path = self.pgp_pubkey + path = self.pgp_pubkey(email=email) if not path: if self.pgp_generate_keys(path=self.store.path): - path = self.pgp_pubkey + path = self.pgp_pubkey(email=email) if path: # We should get a hit now return self.pgp_public_key(path=path) diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index e520614b7..fce5eb985 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -2069,7 +2069,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') # Nothing to lookup - assert obj.pgp_pubkey is None + assert obj.pgp_pubkey() is None assert obj.pgp_public_key() is None assert obj.pgp_encrypt_message("message") is False # Keys can not be generated in memory mode @@ -2090,15 +2090,15 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): assert obj.store.mode == PersistentStoreMode.FLUSH # Still no public key - assert obj.pgp_pubkey is None + assert obj.pgp_pubkey() is None assert obj.pgp_generate_keys() is True # Now we'll have a public key - assert isinstance(obj.pgp_pubkey, str) + assert isinstance(obj.pgp_pubkey(), str) # Prepare PGP obj = Apprise.instantiate( - f'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey={obj.pgp_pubkey}', + 'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=%s' % obj.pgp_pubkey(), asset=asset) # We will find our key From b8a3b6edfe4d41a40247a6102f8d0ae1783c5829 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Wed, 2 Oct 2024 22:21:49 -0400 Subject: [PATCH 4/5] more test coverage --- apprise/plugins/email.py | 27 +-- test/test_plugin_email.py | 392 ++++++++++++++++++++++++++------------ 2 files changed, 278 insertions(+), 141 deletions(-) diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index b1fbbd5c0..09e4c2a81 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -1011,14 +1011,8 @@ def submit(self, messages: t.List[EmailMessage]): finally: # Gracefully terminate the connection with the server - if socket is not None: # pragma: no branch - try: - socket.quit() - - except (SocketError, smtplib.SMTPException): - # No need to make this a bigger issue as we were exiting - # anyway - pass + if socket is not None: + socket.quit() # Reduce our dictionary (eliminate expired keys if any) self.pgp_public_keys = { @@ -1058,6 +1052,12 @@ def pgp_generate_keys(self, path=None): pub_path = os.path.join(path, f'{file_prefix}-pub.asc') prv_path = os.path.join(path, f'{file_prefix}-prv.asc') + if os.path.isfile(pub_path): + self.logger.warning( + 'PGP generation aborted; Public key already exists: %s', + pub_path) + return True + # Add the user ID to the key key.add_uid(uid, usage={ pgpy.constants.KeyFlags.Sign, @@ -1140,13 +1140,11 @@ def pgp_pubkey(self, email=None): 'pub.asc', ] - emails = [] + # Prepare our key files + emails = [self.from_addr[1]] if email: emails.append(email) - # Prepare our key files: - emails.append(self.from_addr[1]) - for email in emails: _entry = email.split('@')[0].lower() if _entry not in fnames: @@ -1181,11 +1179,6 @@ def pgp_public_key(self, path=None, email=None): self.logger.warning('No PGP Public Key could be loaded') return None - if not isinstance(path, str): - raise AttributeError( - 'Invalid path to PGP Public Key specified: %s: %s', - type(path), str(path)) - # Persistent storage key: ps_key = hashlib.sha1( os.path.abspath(path).encode('utf-8')).hexdigest() diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index fce5eb985..3c32f2e6b 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -29,6 +29,7 @@ import logging import pytest import os +import shutil import sys import re from unittest import mock @@ -44,8 +45,7 @@ from apprise import PersistentStoreMode from apprise.config import ConfigBase from apprise import AppriseAttachment -from apprise.plugins.email import NotifyEmail -from apprise.plugins import email as NotifyEmailModule +from apprise.plugins import email # Disable logging for a cleaner testing output logging.disable(logging.CRITICAL) @@ -74,137 +74,137 @@ # Pre-Configured Email Services ('mailto://user:pass@gmail.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@hotmail.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@live.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@prontomail.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@yahoo.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@yahoo.ca', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@fastmail.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@sendgrid.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # Yandex ('mailto://user:pass@yandex.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@yandex.ru', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@yandex.fr', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # Custom Emails ('mailtos://user:pass@nuxref.com:567', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@nuxref.com?mode=ssl', { # mailto:// with mode=ssl causes us to convert to ssl - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, # Our expected url(privacy=True) startswith() response: 'privacy_url': 'mailtos://user:****@nuxref.com', }), ('mailto://user:pass@nuxref.com:567?format=html', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailtos://user:pass@nuxref.com:567?to=l2g@nuxref.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailtos://user:pass@domain.com?user=admin@mail-domain.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailtos://%20@domain.com?user=admin@mail-domain.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailtos://%20@domain.com?user=admin@mail-domain.com?pgp=yes', { # Test pgp flag - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailtos://user:pass@nuxref.com:567/l2g@nuxref.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ( 'mailto://user:pass@example.com:2525?user=l2g@example.com' '&pass=l2g@apprise!is!Awesome', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( 'mailto://user:pass@example.com:2525?user=l2g@example.com' '&pass=l2g@apprise!is!Awesome&format=text', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Carbon Copy 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&cc=noreply@example.com,test@example.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Blind Carbon Copy 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&bcc=noreply@example.com,test@example.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Carbon Copy with bad email 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&cc=noreply@example.com,@', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Blind Carbon Copy with bad email 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&bcc=noreply@example.com,@', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Reply To 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&reply=test@example.com,test2@example.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), ( # Test Reply To with bad email 'mailtos://user:pass@example.com?smtp=smtp.example.com' '&name=l2g&reply=test@example.com,@', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }, ), # headers ('mailto://user:pass@localhost.localdomain' '?+X-Customer-Campaign-ID=Apprise', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # No Password ('mailtos://user:@nuxref.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # Invalid From Address; but just gets put as the from name instead # Hence the below generats From: "@ " ('mailtos://user:pass@nuxref.com?from=@', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # Invalid From Address ('mailtos://nuxref.com?user=&pass=.', { @@ -213,7 +213,7 @@ # Invalid To Address is accepted, but we won't be able to properly email # using the notify() call ('mailtos://user:pass@nuxref.com?to=@', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, 'response': False, }), # Valid URL, but can't structure a proper email @@ -230,22 +230,22 @@ }), # STARTTLS flag checking ('mailtos://user:pass@gmail.com?mode=starttls', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, # Our expected url(privacy=True) startswith() response: 'privacy_url': 'mailtos://user:****@gmail.com', }), # SSL flag checking ('mailtos://user:pass@gmail.com?mode=ssl', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), # Can make a To address using what we have (l2g@nuxref.com) ('mailtos://nuxref.com?user=l2g&pass=.', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, # Our expected url(privacy=True) startswith() response: 'privacy_url': 'mailtos://l2g:****@nuxref.com', }), ('mailto://user:pass@localhost:2525', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, # Throws a series of connection and transfer exceptions when this flag # is set and tests that we gracfully handle them 'test_smtplib_exceptions': True, @@ -253,25 +253,25 @@ # Use of both 'name' and 'from' together; these are synonymous ('mailtos://user:pass@nuxref.com?' 'from=jack@gmail.com&name=Jason', { - 'instance': NotifyEmail}), + 'instance': email.NotifyEmail}), # Test no auth at all ('mailto://localhost?from=test@example.com&to=test@example.com', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, 'privacy_url': 'mailto://localhost', }), # Test multi-emails where some are bad ('mailto://user:pass@localhost/test@example.com/test2@/$@!/', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, 'privacy_url': 'mailto://user:****@localhost/' }), ('mailto://user:pass@localhost/?bcc=test2@,$@!/', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@localhost/?cc=test2@,$@!/', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ('mailto://user:pass@localhost/?reply=test2@,$@!/', { - 'instance': NotifyEmail, + 'instance': email.NotifyEmail, }), ) @@ -466,7 +466,7 @@ def test_plugin_email_webbase_lookup(mock_smtp, mock_smtpssl): """ # Insert a test email at the head of our table - NotifyEmailModule.EMAIL_TEMPLATES = ( + email.EMAIL_TEMPLATES = ( ( # Testing URL 'Testing Lookup', @@ -475,15 +475,15 @@ def test_plugin_email_webbase_lookup(mock_smtp, mock_smtpssl): 'port': 123, 'smtp_host': 'smtp.l2g.com', 'secure': True, - 'login_type': (NotifyEmailModule.WebBaseLogin.USERID, ) + 'login_type': (email.WebBaseLogin.USERID, ) }, ), - ) + NotifyEmailModule.EMAIL_TEMPLATES + ) + email.EMAIL_TEMPLATES obj = Apprise.instantiate( 'mailto://user:pass@l2g.com', suppress_exceptions=True) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert len(obj.targets) == 1 assert (False, 'user@l2g.com') in obj.targets assert obj.from_addr[0] == obj.app_id @@ -510,7 +510,7 @@ def test_plugin_email_smtplib_init_fail(mock_smtplib): obj = Apprise.instantiate( 'mailto://user:pass@gmail.com', suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Support Exception handling of smtplib.SMTP mock_smtplib.side_effect = RuntimeError('Test') @@ -534,7 +534,7 @@ def test_plugin_email_smtplib_send_okay(mock_smtplib): # Defaults to HTML obj = Apprise.instantiate( 'mailto://user:pass@gmail.com', suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Support an email simulation where we can correctly quit mock_smtplib.starttls.return_value = True @@ -548,7 +548,7 @@ def test_plugin_email_smtplib_send_okay(mock_smtplib): # Set Text obj = Apprise.instantiate( 'mailto://user:pass@gmail.com?format=text', suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.notify( body='body', title='test', notify_type=NotifyType.INFO) is True @@ -602,7 +602,7 @@ def test_plugin_email_smtplib_send_multiple_recipients(mock_smtplib): 'mailto://user:pass@mail.example.org?' 'to=foo@example.net,bar@example.com&' 'cc=baz@example.org&bcc=qux@example.org', suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.notify( body='body', title='test', notify_type=NotifyType.INFO) is True @@ -650,7 +650,7 @@ def test_plugin_email_smtplib_internationalization(mock_smtp): obj = Apprise.instantiate( 'mailto://user:pass@gmail.com?name=Например%20так', suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) class SMTPMock: def sendmail(self, *args, **kwargs): @@ -714,7 +714,7 @@ def test_plugin_email_url_escaping(): # So the above translates to ' %20' (a space in front of %20). We want # to verify the handling of the password escaping and when it happens. # a very bad response would be ' ' (double space) - obj = NotifyEmail.parse_url( + obj = email.NotifyEmail.parse_url( 'mailto://user:{}@gmail.com?format=text'.format(passwd)) assert isinstance(obj, dict) @@ -727,7 +727,7 @@ def test_plugin_email_url_escaping(): obj = Apprise.instantiate( 'mailto://user:{}@gmail.com?format=text'.format(passwd), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # The password is escaped only 'once' assert obj.password == ' %20' @@ -745,7 +745,7 @@ def test_plugin_email_url_variations(): user='apprise%40example21.ca', passwd='abcd123'), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'abcd123' assert obj.user == 'apprise@example21.ca' @@ -768,7 +768,7 @@ def test_plugin_email_url_variations(): user='apprise%40example21.ca', passwd='abcd123'), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'abcd123' assert obj.user == 'apprise@example21.ca' @@ -790,7 +790,7 @@ def test_plugin_email_url_variations(): user='apprise%40example21.ca', passwd='abcd123'), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'abcd123' assert obj.user == 'apprise@example21.ca' @@ -820,7 +820,7 @@ def test_plugin_email_url_variations(): user='apprise%40example21.ca', passwd='abcd123'), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'abcd123' assert obj.user == 'apprise@example21.ca' @@ -854,7 +854,7 @@ def test_plugin_email_url_variations(): that='to@example.jp', smtp_host='smtp.example.edu'), suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'abcd123' assert obj.user == 'apprise@example21.ca' @@ -876,7 +876,7 @@ def test_plugin_email_url_variations(): obj = Apprise.instantiate( 'mailto://user:pass@domain.com{}'.format(toaddr)) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.password == 'pass' assert obj.user == 'user' assert obj.host == 'domain.com' @@ -899,7 +899,7 @@ def test_plugin_email_dict_variations(): 'user': 'apprise@example.com', 'password': 'abd123', 'host': 'example.com'}, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) @mock.patch('smtplib.SMTP_SSL') @@ -917,7 +917,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # Test variations of username required to be an email address # user@example.com; we also test an over-ride port on a template driven # mailto:// entry - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@hotmail.com:444' '?to=user2@yahoo.com&name=test%20name') assert isinstance(results, dict) @@ -929,7 +929,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'user2@yahoo.com' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert mock_smtp.call_count == 0 assert mock_smtp_ssl.call_count == 0 @@ -966,7 +966,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # The below switches the `name` with the `to` to verify the results # are the same; it also verfies that the mode gets changed to SSL # instead of STARTTLS - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@hotmail.com?smtp=override.com' '&name=test%20name&to=user2@yahoo.com&mode=ssl') assert isinstance(results, dict) @@ -977,7 +977,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'user2@yahoo.com' in results['targets'] assert 'ssl' == results['secure_mode'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert mock_smtp.call_count == 0 assert mock_smtp_ssl.call_count == 0 @@ -1020,10 +1020,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # # Test outlook/hotmail lookups # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@hotmail.com') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.smtp_host == 'smtp-mail.outlook.com' # No entries in the reply_to assert not obj.reply_to @@ -1046,10 +1046,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@outlook.com') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.smtp_host == 'smtp.outlook.com' # No entries in the reply_to assert not obj.reply_to @@ -1072,10 +1072,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@outlook.com.au') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.smtp_host == 'smtp.outlook.com' # No entries in the reply_to assert not obj.reply_to @@ -1099,11 +1099,11 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): response.reset_mock() # Consisitency Checks - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://outlook.com?smtp=smtp.outlook.com' '&user=user@outlook.com&pass=app.pw') obj1 = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj1, NotifyEmail) + assert isinstance(obj1, email.NotifyEmail) assert obj1.smtp_host == 'smtp.outlook.com' assert obj1.user == 'user@outlook.com' assert obj1.password == 'app.pw' @@ -1128,10 +1128,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:app.pw@outlook.com') obj2 = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj2, NotifyEmail) + assert isinstance(obj2, email.NotifyEmail) assert obj2.smtp_host == obj1.smtp_host assert obj2.user == obj1.user assert obj2.password == obj1.password @@ -1156,10 +1156,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailto://user:pass@comcast.net') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert obj.smtp_host == 'smtp.comcast.net' assert obj.user == 'user@comcast.net' assert obj.password == 'pass' @@ -1184,10 +1184,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@live.com') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # No entries in the reply_to assert not obj.reply_to @@ -1209,10 +1209,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@hotmail.com') obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # No entries in the reply_to assert not obj.reply_to @@ -1237,11 +1237,11 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # # Test Port Over-Riding # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( "mailtos://abc:password@xyz.cn:465?" "smtp=smtp.exmail.qq.com&mode=ssl") obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Verify our over-rides are in place assert obj.smtp_host == 'smtp.exmail.qq.com' @@ -1281,11 +1281,11 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( "mailtos://abc:password@xyz.cn?" "smtp=smtp.exmail.qq.com&mode=ssl&port=465") obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Verify our over-rides are in place assert obj.smtp_host == 'smtp.exmail.qq.com' @@ -1317,10 +1317,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # # Test Reply-To Email # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( "mailtos://user:pass@example.com?reply=noreply@example.com") obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Verify our over-rides are in place assert obj.smtp_host == 'example.com' assert obj.from_addr[0] == obj.app_id @@ -1352,10 +1352,10 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # # Test Reply-To Email with Name Inline # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( "mailtos://user:pass@example.com?reply=Chris") obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Verify our over-rides are in place assert obj.smtp_host == 'example.com' assert obj.from_addr[0] == obj.app_id @@ -1389,7 +1389,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # Test variations of username required to be an email address # user@example.com; we also test an over-ride port on a template driven # mailto:// entry - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailto://fastmail.com/?to=hello@concordium-explorer.nl' '&user=joe@mydomain.nl&pass=abc123' '&from=Concordium Explorer Bot') @@ -1403,7 +1403,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'hello@concordium-explorer.nl' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert mock_smtp.call_count == 0 assert mock_smtp_ssl.call_count == 0 @@ -1441,7 +1441,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # should just have to be written like (to= omitted) # mailto://fastmail.com?user=username@customdomain.com&pass=password123 # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailto://fastmail.com?user=username@customdomain.com' '&pass=password123') assert isinstance(results, dict) @@ -1453,7 +1453,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert results['smtp_host'] == '' obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # During instantiation, our variables get detected assert obj.smtp_host == 'smtp.fastmail.com' assert obj.from_addr == ['Apprise', 'username@customdomain.com'] @@ -1492,7 +1492,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # Similar test as above, just showing that we can over-ride the From= # with these custom URLs as well and not require a full email - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailto://fastmail.com?user=username@customdomain.com' '&pass=password123&from=Custom') assert isinstance(results, dict) @@ -1504,7 +1504,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert results['smtp_host'] == '' obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # During instantiation, our variables get detected assert obj.smtp_host == 'smtp.fastmail.com' assert obj.from_addr == ['Custom', 'username@customdomain.com'] @@ -1547,7 +1547,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): # host domain = domain.subdomain.com # PASSWORD needs to be fetched since a user= was provided # - this is an edge case that is tested here - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://PASSWORD@domain.subdomain.com:587?' 'user=admin@mail-domain.com&to=mail@mail-domain.com') assert isinstance(results, dict) @@ -1561,7 +1561,7 @@ def test_plugin_email_url_parsing(mock_smtp, mock_smtp_ssl): assert 'mail@mail-domain.com' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) # Not that our from_address takes on 'admin@domain.subdomain.com' assert obj.from_addr == ['Apprise', 'admin@domain.subdomain.com'] @@ -1607,7 +1607,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): # We want to test the case where a + is found in the To address; we want to # ensure that it is supported - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@gmail.com' '?to=Plus Support') assert isinstance(results, dict) @@ -1618,7 +1618,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): assert 'Plus Support' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert len(obj.targets) == 1 assert ('Plus Support', 'test+notification@gmail.com') in obj.targets @@ -1655,7 +1655,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): # Perform the same test where the To field jsut contains the + in the # address # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@gmail.com' '?to=test+notification@gmail.com') assert isinstance(results, dict) @@ -1666,7 +1666,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): assert 'test+notification@gmail.com' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert len(obj.targets) == 1 assert (False, 'test+notification@gmail.com') in obj.targets @@ -1699,7 +1699,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): # # Perform the same test where the To field is in the URL itself # - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://user:pass123@gmail.com' '/test+notification@gmail.com') assert isinstance(results, dict) @@ -1710,7 +1710,7 @@ def test_plugin_email_plus_in_toemail(mock_smtp, mock_smtp_ssl): assert 'test+notification@gmail.com' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert len(obj.targets) == 1 assert (False, 'test+notification@gmail.com') in obj.targets @@ -1751,7 +1751,7 @@ def test_plugin_email_formatting_990(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.return_value = response mock_smtp.return_value = response - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://mydomain.com?smtp=mail.local.mydomain.com' '&user=noreply@mydomain.com&pass=mypassword' '&from=noreply@mydomain.com&to=me@mydomain.com&mode=ssl&port=465') @@ -1766,7 +1766,7 @@ def test_plugin_email_formatting_990(mock_smtp, mock_smtp_ssl): assert 'me@mydomain.com' in results['targets'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) + assert isinstance(obj, email.NotifyEmail) assert len(obj.targets) == 1 assert (False, 'me@mydomain.com') in obj.targets @@ -1840,7 +1840,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.return_value = response mock_smtp.return_value = response - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://spectrum.net?smtp=mobile.charter.net' '&pass=password&user=name@spectrum.net') @@ -1851,7 +1851,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): assert 'password' == results['password'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) is True + assert isinstance(obj, email.NotifyEmail) is True assert len(obj.targets) == 1 assert (False, 'name@spectrum.net') in obj.targets @@ -1893,7 +1893,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://?smtp=mobile.charter.net' '&pass=password&user=name@spectrum.net') @@ -1904,7 +1904,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): assert 'password' == results['password'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) is True + assert isinstance(obj, email.NotifyEmail) is True assert len(obj.targets) == 1 assert (False, 'name@spectrum.net') in obj.targets @@ -1946,7 +1946,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://?smtp=mobile.charter.net' '&pass=password&user=userid-without-domain') @@ -1968,7 +1968,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.reset_mock() response.reset_mock() - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailtos://John Doe?smtp=mobile.charter.net' '&pass=password&user=name@spectrum.net') @@ -1979,7 +1979,7 @@ def test_plugin_host_detection_from_source_email(mock_smtp, mock_smtp_ssl): assert 'password' == results['password'] obj = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(obj, NotifyEmail) is True + assert isinstance(obj, email.NotifyEmail) is True assert len(obj.targets) == 1 assert ('John Doe', 'john@yahoo.ca') in obj.targets @@ -2028,7 +2028,7 @@ def test_plugin_email_by_ipaddr_1113(mock_smtp, mock_smtp_ssl): mock_smtp_ssl.return_value = response mock_smtp.return_value = response - results = NotifyEmail.parse_url( + results = email.NotifyEmail.parse_url( 'mailto://10.0.0.195:25/?to=alerts@example.com&' 'from=sender@example.com') @@ -2042,18 +2042,18 @@ def test_plugin_email_by_ipaddr_1113(mock_smtp, mock_smtp_ssl): assert results['targets'][0] == 'alerts@example.com' assert results['port'] == 25 - email = Apprise.instantiate(results, suppress_exceptions=False) - assert isinstance(email, NotifyEmail) is True + _email = Apprise.instantiate(results, suppress_exceptions=False) + assert isinstance(_email, email.NotifyEmail) is True - assert len(email.targets) == 1 - assert (False, 'alerts@example.com') in email.targets + assert len(_email.targets) == 1 + assert (False, 'alerts@example.com') in _email.targets - assert email.from_addr == (False, 'sender@example.com') - assert email.user is None - assert email.password is None - assert email.smtp_host == '10.0.0.195' - assert email.port == 25 - assert email.targets == [(False, 'alerts@example.com')] + assert _email.from_addr == (False, 'sender@example.com') + assert _email.user is None + assert _email.password is None + assert _email.smtp_host == '10.0.0.195' + assert _email.port == 25 + assert _email.targets == [(False, 'alerts@example.com')] @pytest.mark.skipif('pgpy' not in sys.modules, reason="Requires PGPy") @@ -2064,6 +2064,25 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): NotifyEmail() PGP Tests """ + # Our mock of our socket action + mock_socket = mock.Mock() + mock_socket.starttls.return_value = True + mock_socket.login.return_value = True + + # Create a mock SMTP Object + mock_smtp.return_value = mock_socket + mock_smtpssl.return_value = mock_socket + + assert email.PGP_SUPPORT is True + email.PGP_SUPPORT = False + # Forces to run through section of code that produces a warning there is + # no PGP + obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') + # No PGP Support and set enabled + assert obj.notify('test body') is False + + # Return the PGP status for remaining checks + email.PGP_SUPPORT = True # Initialize our email (no from name) obj = Apprise.instantiate('mailto://user:pass@nuxref.com?pgp=yes') @@ -2096,11 +2115,31 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): # Now we'll have a public key assert isinstance(obj.pgp_pubkey(), str) + # Generate warning by second call + assert obj.pgp_generate_keys() is True + + # Remove newly generated files + os.unlink(os.path.join(obj.store.path, 'pgp-pub.asc')) + os.unlink(os.path.join(obj.store.path, 'pgp-prv.asc')) + obj = Apprise.instantiate( + 'mailto://pgp:pass@nuxref.com?pgp=yes', asset=asset) + assert obj.store.mode == PersistentStoreMode.FLUSH + assert obj.pgp_generate_keys() is True + # Prepare PGP obj = Apprise.instantiate( 'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=%s' % obj.pgp_pubkey(), asset=asset) + # Regenerate our keys even with a pgpkey provided + assert obj.pgp_generate_keys() is True + + # Second call uses cache + assert obj.pgp_generate_keys() is True + + # Utilize path parameter + assert obj.pgp_generate_keys(path=obj.store.path) is True + # We will find our key assert obj.pgp_public_key() is not None @@ -2119,6 +2158,17 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): assert obj.store.mode == PersistentStoreMode.FLUSH assert obj.pgp_generate_keys() is True + # Second call uses cache + assert obj.pgp_generate_keys() is True + # Utilize path parameter + assert obj.pgp_generate_keys(path=obj.store.path) is True + + # We will find our key + assert obj.pgp_public_key() is not None + + # However explicitly setting a path works + assert obj.pgp_generate_keys(str(tmpdir1)) is True + # We do this again but even when we do a requisition for a public key # it will generate a new pair or keys for us once it detects we don't # have any @@ -2128,7 +2178,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): storage_path=str(tmpdir3), ) obj = Apprise.instantiate( - 'mailto://chris:pass@nuxref.com?pgp=yes', asset=asset) + 'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset) assert obj.store.mode == PersistentStoreMode.FLUSH @@ -2138,3 +2188,97 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): encrypted = obj.pgp_encrypt_message("hello world") assert encrypted.startswith('-----BEGIN PGP MESSAGE-----') assert encrypted.rstrip().endswith('-----END PGP MESSAGE-----') + + dir_content = os.listdir(obj.store.path) + assert 'chris-pub.asc' in dir_content + assert 'chris-prv.asc' in dir_content + + assert obj.pgp_pubkey().endswith('chris-pub.asc') + + assert obj.notify('test body') is True + + # The private key is not needed for sending the encrypted messages + os.unlink(os.path.join(obj.store.path, 'chris-prv.asc')) + os.rename( + os.path.join(obj.store.path, 'chris-pub.asc'), + os.path.join(obj.store.path, 'user@example.com-pub.asc')) + + assert obj.pgp_pubkey() is None + assert obj.pgp_pubkey(email="user@example.com")\ + .endswith('user@example.com-pub.asc') + assert obj.pgp_pubkey(email="User@Example.com")\ + .endswith('user@example.com-pub.asc') + assert obj.pgp_pubkey(email="unknown") is None + + shutil.copyfile( + os.path.join(obj.store.path, 'user@example.com-pub.asc'), + os.path.join(obj.store.path, 'user-pub.asc'), + ) + assert obj.pgp_pubkey(email="user@example.com")\ + .endswith('user@example.com-pub.asc') + assert obj.pgp_pubkey(email="User@Example.com")\ + .endswith('user@example.com-pub.asc') + + # Remove file + os.unlink(os.path.join(obj.store.path, 'user@example.com-pub.asc')) + assert obj.pgp_pubkey(email="user@example.com").endswith('user-pub.asc') + shutil.copyfile( + os.path.join(obj.store.path, 'user-pub.asc'), + os.path.join(obj.store.path, 'chris-pub.asc'), + ) + # user-pub.asc still trumps still trumps + assert obj.pgp_pubkey(email="user@example.com").endswith('user-pub.asc') + shutil.copyfile( + os.path.join(obj.store.path, 'chris-pub.asc'), + os.path.join(obj.store.path, 'chris@nuxref.com-pub.asc'), + ) + # user-pub still trumps + assert obj.pgp_pubkey(email="user@example.com").endswith('user-pub.asc') + assert obj.pgp_pubkey(email="invalid@example.com")\ + .endswith('chris@nuxref.com-pub.asc') + + # remove this file + os.unlink(os.path.join(obj.store.path, 'user-pub.asc')) + + # now we fall back to basic/default configuration + assert obj.pgp_pubkey(email="user@example.com")\ + .endswith('chris@nuxref.com-pub.asc') + os.unlink(os.path.join(obj.store.path, 'chris@nuxref.com-pub.asc')) + assert obj.pgp_pubkey(email="user@example.com").endswith('chris-pub.asc') + + # Testing again + tmpdir4 = tmpdir.mkdir('tmp04') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir4), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset) + + with mock.patch('builtins.open', side_effect=OSError()): + # can't open key + assert obj.pgp_public_key(path=obj.store.path) is None + # Test unlink + with mock.patch('os.unlink', side_effect=OSError()): + assert obj.pgp_public_key(path=obj.store.path) is None + + with mock.patch('pgpy.PGPKey.new', side_effect=NameError): + # Can't Generate keys + assert obj.pgp_generate_keys() is False + # can't open key + assert obj.pgp_public_key(path=obj.store.path) is None + + with mock.patch('pgpy.PGPKey.new', side_effect=FileNotFoundError): + # can't open key + assert obj.pgp_public_key(path=obj.store.path) is None + + with mock.patch('pgpy.PGPKey.new', side_effect=OSError): + # can't open key + assert obj.pgp_public_key(path=obj.store.path) is None + + # Can't encrypt key + with mock.patch('pgpy.PGPKey.from_blob', side_effect=NameError): + assert obj.pgp_public_key() is None + + with mock.patch('pgpy.PGPMessage.new', side_effect=NameError): + assert obj.pgp_encrypt_message("message") is None From 800aad776577256d67b7a3d30d36b2e8c8a217af Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Thu, 3 Oct 2024 20:49:18 -0400 Subject: [PATCH 5/5] test coverage complete --- apprise/plugins/email.py | 55 +++++++++++++----------- test/test_plugin_email.py | 86 +++++++++++++++++++++++++++++++++++++- test/var/pgp/valid-prv.asc | 31 -------------- 3 files changed, 115 insertions(+), 57 deletions(-) delete mode 100644 test/var/pgp/valid-prv.asc diff --git a/apprise/plugins/email.py b/apprise/plugins/email.py index 09e4c2a81..776873018 100644 --- a/apprise/plugins/email.py +++ b/apprise/plugins/email.py @@ -900,25 +900,29 @@ def send(self, body, title='', notify_type=NotifyType.INFO, attach=None, # Apply our encryption encrypted_content = self.pgp_encrypt_message(base.as_string()) - if encrypted_content: - # prepare our messsage - base = MIMEMultipart( - "encrypted", protocol="application/pgp-encrypted") - - # Store Autocrypt header (DeltaChat Support) - base.add_header( - "Autocrypt", - "addr=%s; prefer-encrypt=mutual" % formataddr( - (False, to_addr), charset='utf-8')) - - # Set Encryption Info Part - enc_payload = MIMEText("Version: 1", "plain") - enc_payload.set_type("application/pgp-encrypted") - base.attach(enc_payload) - - enc_payload = MIMEBase("application", "octet-stream") - enc_payload.set_payload(encrypted_content) - base.attach(enc_payload) + if not encrypted_content: + self.logger.warning('Unable to PGP encrypt email') + # Unable to send notification + return False + + # prepare our messsage + base = MIMEMultipart( + "encrypted", protocol="application/pgp-encrypted") + + # Store Autocrypt header (DeltaChat Support) + base.add_header( + "Autocrypt", + "addr=%s; prefer-encrypt=mutual" % formataddr( + (False, to_addr), charset='utf-8')) + + # Set Encryption Info Part + enc_payload = MIMEText("Version: 1", "plain") + enc_payload.set_type("application/pgp-encrypted") + base.attach(enc_payload) + + enc_payload = MIMEBase("application", "octet-stream") + enc_payload.set_payload(encrypted_content) + base.attach(enc_payload) # Apply any provided custom headers for k, v in self.headers.items(): @@ -1147,13 +1151,11 @@ def pgp_pubkey(self, email=None): for email in emails: _entry = email.split('@')[0].lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + fnames.insert(0, f'{_entry}-pub.asc') # Lowercase email (Highest Priority) _entry = email.lower() - if _entry not in fnames: - fnames.insert(0, f'{_entry}-pub.asc') + fnames.insert(0, f'{_entry}-pub.asc') return next( (os.path.join(self.store.path, fname) @@ -1179,7 +1181,7 @@ def pgp_public_key(self, path=None, email=None): self.logger.warning('No PGP Public Key could be loaded') return None - # Persistent storage key: + # Persistent storage key ps_key = hashlib.sha1( os.path.abspath(path).encode('utf-8')).hexdigest() if ps_key in self.pgp_public_keys: @@ -1230,6 +1232,11 @@ def pgp_encrypt_message(self, message, path=None): encrypted_message = public_key.encrypt(message_object) return str(encrypted_message) + except pgpy.errors.PGPError: + # Encryption not Possible + self.logger.debug( + 'PGP Public Key Corruption; encryption not possible') + except NameError: # PGPy not installed self.logger.debug('PGPy not installed; Skipping PGP encryption') diff --git a/test/test_plugin_email.py b/test/test_plugin_email.py index 3c32f2e6b..8a365c9f5 100644 --- a/test/test_plugin_email.py +++ b/test/test_plugin_email.py @@ -2147,6 +2147,12 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): # However explicitly setting a path works assert obj.pgp_generate_keys(str(tmpdir1)) is True + # Prepare Invalid PGP Key + obj = Apprise.instantiate( + 'mailto://pgp:pass@nuxref.com?pgp=yes&pgpkey=invalid', + asset=asset) + assert obj.pgp_pubkey() is False + tmpdir2 = tmpdir.mkdir('tmp02') asset = AppriseAsset( storage_mode=PersistentStoreMode.FLUSH, @@ -2204,6 +2210,10 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): os.path.join(obj.store.path, 'user@example.com-pub.asc')) assert obj.pgp_pubkey() is None + assert obj.pgp_pubkey(email="not-reference@example.com") is None + assert obj.pgp_pubkey(email="user@example.com")\ + .endswith('user@example.com-pub.asc') + assert obj.pgp_pubkey(email="user@example.com")\ .endswith('user@example.com-pub.asc') assert obj.pgp_pubkey(email="User@Example.com")\ @@ -2214,6 +2224,7 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): os.path.join(obj.store.path, 'user@example.com-pub.asc'), os.path.join(obj.store.path, 'user-pub.asc'), ) + assert obj.pgp_pubkey(email="user@example.com")\ .endswith('user@example.com-pub.asc') assert obj.pgp_pubkey(email="User@Example.com")\ @@ -2255,13 +2266,20 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): obj = Apprise.instantiate( 'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset) - with mock.patch('builtins.open', side_effect=OSError()): + with mock.patch('builtins.open', side_effect=FileNotFoundError): + # can't open key + assert obj.pgp_public_key(path=obj.store.path) is None + + with mock.patch('builtins.open', side_effect=OSError): # can't open key assert obj.pgp_public_key(path=obj.store.path) is None # Test unlink - with mock.patch('os.unlink', side_effect=OSError()): + with mock.patch('os.unlink', side_effect=OSError): assert obj.pgp_public_key(path=obj.store.path) is None + # Key Generation will fail + assert obj.pgp_generate_keys() is False + with mock.patch('pgpy.PGPKey.new', side_effect=NameError): # Can't Generate keys assert obj.pgp_generate_keys() is False @@ -2282,3 +2300,67 @@ def test_plugin_email_pgp(mock_smtp, mock_smtpssl, tmpdir): with mock.patch('pgpy.PGPMessage.new', side_effect=NameError): assert obj.pgp_encrypt_message("message") is None + # Attempts to encrypt a message + assert obj.notify('test') is False + + # Create new keys + assert obj.pgp_generate_keys() is True + with mock.patch('os.path.isfile', return_value=False): + with mock.patch('builtins.open', side_effect=OSError): + with mock.patch('os.unlink', return_value=None): + assert obj.pgp_generate_keys() is False + + # Testing again + tmpdir5 = tmpdir.mkdir('tmp05') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir5), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset) + + # Catch edge case where we just can't generate the the key + with mock.patch('os.path.isfile', side_effect=( + # 5x False to skip through pgp_pubkey() + False, False, False, False, False, False, + # 1x True to pass pgp_generate_keys() + True, + # 5x False to skip through pgp_pubkey() second call + False, False, False, False, False, False)): + with mock.patch('pgpy.PGPKey.from_blob', + side_effect=FileNotFoundError): + assert obj.pgp_public_key() is None + + # Corrupt Data + tmpdir6 = tmpdir.mkdir('tmp06') + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir6), + ) + obj = Apprise.instantiate( + 'mailto://chris:pass@nuxref.com/user@example.com?pgp=yes', asset=asset) + + shutil.copyfile( + os.path.join(TEST_VAR_DIR, 'pgp', 'corrupt-pub.asc'), + os.path.join(obj.store.path, 'chris-pub.asc'), + ) + + # Key is corrupted + obj.notify('test') is False + + shutil.copyfile( + os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'), + os.path.join(obj.store.path, 'chris-pub.asc'), + ) + + # Key is a binary image; definitely not a valid key + obj.notify('test') is False + + # Using a public key + shutil.copyfile( + os.path.join(TEST_VAR_DIR, 'pgp', 'valid-pub.asc'), + os.path.join(obj.store.path, 'chris-pub.asc'), + ) + + # Notification goes through + obj.notify('test') is True diff --git a/test/var/pgp/valid-prv.asc b/test/var/pgp/valid-prv.asc deleted file mode 100644 index c98a8e311..000000000 --- a/test/var/pgp/valid-prv.asc +++ /dev/null @@ -1,31 +0,0 @@ ------BEGIN PGP PRIVATE KEY BLOCK----- - -xcLYBGbo3ycBCACjECe63GpZanFYLE678OIhzpvY0J6pCBCZblGxXOuwuv7VPIq7 -XQN91UdOL8huDb/JYRxarDoS6+JvyempbzZt0S+veXDl4pRFb+Q4a5sp1l/Mduw0 -rDFErwfF8SpMPBI2WJUhN9n72UEqXVDvTPdcAka8v8p7dS6/RKZzch8P+EjgJME0 -p9+N/2Lrbi2nDDXD+xD4Odw83J5V8Xn/jie3GCxtXda5XIX3EzTSB30elLLNBMcq -N5xJBTrjhciDzU85Gb+bUecnoj9Oj6bRS9TT34rZZk7qyB2iroq9SdIBCGyn1q4r -uIskVNCsqgP9cMa+S1XePUT77VNNN1yzhACpABEBAAEAB/4tOpPqjqyo9I9Px6pn -Et+GRQqRTvxTIjuIc0MRkRaGxLdeahaI9bm8M2Y9158ed43Uy6zTsaXCDc+W9khr -iL9uInG5mFOqT/iUcf65b49wQVf9HJdT3Ncll+7uBoCW+KqMjHGA7z71TkN2/r8u -QQjzamY4gHInYE+BGgeZSfQ3t1MJMSdjQopPGDwSsco5hQJYtVH8K/0/Ig5S5E9Q -KD9ku3W9bCliERkljIwEbbyDv/vmbxPKdWW83T+UQK6CQhkH6h69EoMGQ76a6y/H -UuppNSpxuR4BiX4ZlcUyARrLluaRS0K1/OZCoScA0LLjY8pCEBoWT0uhhvy3t2xD -/bipBADS3bM3yGGZKtNgLPQx0BAyWk07OD3AlObykz4yTIc9DZj1bzhHKgAhwUAN -k7StwA22HoxMCKSoxhherZaXAQaJJOJKNXw3DphHCexrBq77nxBu3yo9UStj04Lx -tCEibclsQcwgh7TjjjDQdRYiirZvu9IGQBf27xKvTepibn7NnwQAxfcePfXsHza3 -7CuJbxOGFaPf4ENSpFRYSZbH3dErtSlGDzz8e8jI0Ck9LQgp9MfjksWUwaMQbXdV -zNbQe1lAWQxtN9amVvEWvrAJhbhEU6RLsSjpZ9W5r3xAbfkoDg/icjbdoOqwI6LE -aTEhwaz+XZMLYJiT22AMJyC7TcL6fLcD/0nBRheQBqTsuYKimKI5yZ3ZdlGHfaLN -OqMGfuaEQCUAhSaXNliuP3XAWfiVXCaRw9De+Eod6DfGMGTTx8EVy7N4y4w3TORp -fFKaMGD3oiw1Eh63K1jV2yWPPpOnyc+YtXCPuGS+n/3CITc5cKxaPapQtCJA9Gw0 -OaZ7ikUNs0Q9NbfNIUNocmlzIENhcm9uIDxsZWFkMmdvbGRAZ21haWwuY29tPsLA -ggQTAQgALAUCZujfJwIbBgILCQIVCAIWAgIeARYhBEHHWtq4Kh8dGraFnkmZAD9B -29oPAAoJEEmZAD9B29oPAawIAImCijTdvDl8Sibwo7gL4ayF4S3KhaKCYORcMM1o -e4pesy5ME6fjiEEW8gVQ4W2KDs74aCGkQtQJirvNA7WnuyMyXZyvhYa63U7GTk5R -dVkMygT0a5n8/8HVAenZrBL6VNaZYw/LlgWd0knhsmqdGTsjKuYdZ3CHED85pv/M -Owe0pyGOQKtJ1t9qwc6l2GWBnvOQje+lQGIfrE6TIwsf4QoKXSkTakzggbpZZl2h -g2O6dJiij1cH+DYFVTaVXw4rVmo8ckTJ9DiFT9H/EmsNqlSKTTv1Aw4raCFZ+T/O -csw/vIOoEtVhiT/mfDcIbi0VB3EhYvI3eFsoyiZsjyu9xY0= -=dBp6 ------END PGP PRIVATE KEY BLOCK-----