diff --git a/.github/workflows/python-lint-test.yml b/.github/workflows/python-lint-test.yml index a0a6f6a..75228f5 100644 --- a/.github/workflows/python-lint-test.yml +++ b/.github/workflows/python-lint-test.yml @@ -10,7 +10,7 @@ on: branches: [master] jobs: - build: + test: runs-on: ubuntu-latest strategy: max-parallel: 5 @@ -26,7 +26,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install flake8 pytest pylint tox tox-gh-actions + pip install pytest tox tox-gh-actions if [ -f requirements.txt ]; then pip install -r requirements.txt; fi - name: Test with tox run: tox @@ -39,3 +39,24 @@ jobs: file: .tox/coverage.xml fail_ci_if_error: true verbose: true + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up Python 3.8 + uses: actions/setup-python@v2 + with: + python-version: 3.8 + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install flake8 pylint black + if [ -f requirements.txt ]; then pip install -r requirements.txt; fi + - name: Lint with flake8 + run: | + # stop the build if there are Python syntax errors or undefined names + flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics + # exit-zero treats all errors as warnings. + flake8 . --count --exit-zero --statistics + - name: Check formatting with black + run: black --check **/*.py diff --git a/example/listener_example.py b/example/listener_example.py index ff7e08c..0b465cc 100755 --- a/example/listener_example.py +++ b/example/listener_example.py @@ -1,35 +1,30 @@ #!/usr/bin/env python -__author__ = 'Igor Maculan ' +__author__ = "Igor Maculan " import logging -from pushbullet import Listener -from pushbullet import Pushbullet - +from pushbullet import Listener, Pushbullet logging.basicConfig(level=logging.DEBUG) -API_KEY = '' # YOUR API KEY +API_KEY = "" # YOUR API KEY HTTP_PROXY_HOST = None HTTP_PROXY_PORT = None def on_push(data): - print('Received data:\n{}'.format(data)) + print("Received data:\n{}".format(data)) def main(): pb = Pushbullet(API_KEY) - s = Listener(account=pb, - on_push=on_push, - http_proxy_host=HTTP_PROXY_HOST, - http_proxy_port=HTTP_PROXY_PORT) + s = Listener(account=pb, on_push=on_push, http_proxy_host=HTTP_PROXY_HOST, http_proxy_port=HTTP_PROXY_PORT) try: s.run_forever() except KeyboardInterrupt: s.close() -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/example/mirror_example.py b/example/mirror_example.py index 198912e..eeaea44 100644 --- a/example/mirror_example.py +++ b/example/mirror_example.py @@ -2,18 +2,19 @@ A simple example showing how to mirror notifications. """ -import json -import hashlib import base64 +import hashlib +import json +import os import subprocess -import os, sys +import sys import time -from pushbullet import PushBullet, Listener +from pushbullet import Listener, PushBullet -class Mirrorer(object): - def __init__(self, auth_key, temp_folder, device_name, last_push = time.time(), device_iden=None): +class Mirrorer(object): + def __init__(self, auth_key, temp_folder, device_name, last_push=time.time(), device_iden=None): self.temp_folder = temp_folder if not os.path.exists(self.temp_folder): os.makedirs(temp_folder) @@ -32,13 +33,12 @@ def __init__(self, auth_key, temp_folder, device_name, last_push = time.time(), if not self.device: try: device = self.pb.new_device(device_name) - print("Created new device:",device_name,"iden:",device.device_iden) + print("Created new device:", device_name, "iden:", device.device_iden) self.device = device - except: + except Exception: print("Error: Unable to create device") raise - self.check_pushes() def save_icon(self, b64_asset): @@ -55,10 +55,12 @@ def save_icon(self, b64_asset): def check_pushes(self): pushes = self.pb.get_pushes(self.last_push) for push in pushes: - if not isinstance(push,dict): + if not isinstance(push, dict): # not a push object continue - if ((push.get("target_device_iden", self.device.device_iden) == self.device.device_iden) and not (push.get("dismissed", True))): + if (push.get("target_device_iden", self.device.device_iden) == self.device.device_iden) and not ( + push.get("dismissed", True) + ): self.notify(push.get("title", ""), push.get("body", "")) self.pb.dismiss_push(push.get("iden")) self.last_push = max(self.last_push, push.get("created")) @@ -67,23 +69,23 @@ def watcher(self, push): if push["type"] == "push" and push["push"]["type"] == "mirror": print("MIRROR") image_path = self.save_icon(push["push"]["icon"]) - self.notify(push["push"]["title"], - push["push"]["body"], image_path) + self.notify(push["push"]["title"], push["push"]["body"], image_path) elif push["type"] == "tickle": print("TICKLE") self.check_pushes() - def notify(self, title, body, image=None): subprocess.Popen(["notify-send", title, body, "-i", image or ""]) print(title) print(body) def dump_config(self, path): - config = {"temp_folder": self.temp_folder, - "auth_key": self._auth_key, - "device_name": self.device.nickname, - "device_iden": self.device.device_iden} + config = { + "temp_folder": self.temp_folder, + "auth_key": self._auth_key, + "device_name": self.device.nickname, + "device_iden": self.device.device_iden, + } with open(path, "w") as conf: json.dump(config, conf) @@ -92,7 +94,7 @@ def run(self): self.listener.run_forever() except KeyboardInterrupt: self.listener.close() - + def main(): config_file = sys.argv[1] @@ -104,5 +106,5 @@ def main(): m.dump_config(config_file) -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/pushbullet/__init__.py b/pushbullet/__init__.py index 2ac8bdd..9b61c4b 100644 --- a/pushbullet/__init__.py +++ b/pushbullet/__init__.py @@ -1,7 +1,18 @@ from .__version__ import __version__ -from .pushbullet import Pushbullet from .device import Device +from .errors import InvalidKeyError, PushbulletError, PushError from .listener import Listener -from .errors import PushbulletError, InvalidKeyError, PushError +from .pushbullet import Pushbullet PushBullet = Pushbullet + +__all__ = [ + "__version__", + "Device", + "InvalidKeyError", + "PushbulletError", + "PushError", + "Listener", + "Pushbullet", + "PushBullet", +] diff --git a/pushbullet/_compat.py b/pushbullet/_compat.py index 90bdc7f..cad3252 100644 --- a/pushbullet/_compat.py +++ b/pushbullet/_compat.py @@ -1,13 +1,13 @@ import sys -PY2 = sys.version_info[0] == 2 -if PY2: +def _py2_b64encode(x): + return x.encode("base64") - standard_b64encode = lambda x: x.encode("base64") +if sys.version_info[0] == 2: + standard_b64encode = _py2_b64encode else: - from base64 import standard_b64encode -__all__ = ['standard_b64encode'] +__all__ = ["standard_b64encode"] diff --git a/pushbullet/channel.py b/pushbullet/channel.py index 7485c6a..1cbfd85 100644 --- a/pushbullet/channel.py +++ b/pushbullet/channel.py @@ -1,12 +1,9 @@ from __future__ import unicode_literals -import warnings - from .helpers import use_appropriate_encoding class Channel(object): - def __init__(self, account, channel_info): self._account = account self.channel_tag = channel_info.get("tag") diff --git a/pushbullet/chat.py b/pushbullet/chat.py index 4e23013..dcb5b4b 100644 --- a/pushbullet/chat.py +++ b/pushbullet/chat.py @@ -1,16 +1,15 @@ from __future__ import unicode_literals -from .helpers import use_appropriate_encoding from .device import Device +from .helpers import use_appropriate_encoding class Chat(Device): - def __init__(self, account, chat_info): self._account = account self.iden = chat_info.get("iden") - contact_info = chat_info['with'] + contact_info = chat_info["with"] for attr in ("created", "modified", "muted"): setattr(self, attr, chat_info.get(attr)) for attr in ("name", "email", "email_normalized", "image_url"): diff --git a/pushbullet/device.py b/pushbullet/device.py index c951a99..e568926 100644 --- a/pushbullet/device.py +++ b/pushbullet/device.py @@ -1,20 +1,29 @@ from __future__ import unicode_literals -import warnings - from .helpers import use_appropriate_encoding class Device(object): - def __init__(self, account, device_info): self._account = account self.device_iden = device_info.get("iden") if not device_info.get("icon", None): device_info["icon"] = "system" - for attr in ("push_token", "app_version", "fingerprint", "created", "modified", - "active", "nickname", "generated_nickname", "manufacturer", "icon", - "model", "has_sms", "key_fingerprint"): + for attr in ( + "push_token", + "app_version", + "fingerprint", + "created", + "modified", + "active", + "nickname", + "generated_nickname", + "manufacturer", + "icon", + "model", + "has_sms", + "key_fingerprint", + ): setattr(self, attr, device_info.get(attr)) def push_note(self, title, body): diff --git a/pushbullet/errors.py b/pushbullet/errors.py index e3f0405..dfb0b4d 100644 --- a/pushbullet/errors.py +++ b/pushbullet/errors.py @@ -1,9 +1,11 @@ class PushbulletError(Exception): pass + class InvalidKeyError(PushbulletError): pass + class PushError(PushbulletError): pass @@ -11,4 +13,7 @@ class PushError(PushbulletError): class NoEncryptionModuleError(Exception): def __init__(self, msg): super(NoEncryptionModuleError, self).__init__( - "cryptography is required for end-to-end encryption support and could not be imported: " + msg + "\nYou can install it by running 'pip install cryptography'") + "cryptography is required for end-to-end encryption support and could not be imported: " + + msg + + "\nYou can install it by running 'pip install cryptography'" + ) diff --git a/pushbullet/filetype.py b/pushbullet/filetype.py index fc423a5..dfbe2b1 100644 --- a/pushbullet/filetype.py +++ b/pushbullet/filetype.py @@ -1,4 +1,3 @@ - try: from magic import from_buffer as magic_from_buffer except ImportError: @@ -18,8 +17,7 @@ def get_file_type(file, filename): # decode because that results in unicode on python2 def maybe_decode(s): try: - decoded = s.decode('utf-8') - except AttributeError as e: + decoded = s.decode("utf-8") + except AttributeError: decoded = s return decoded - diff --git a/pushbullet/helpers.py b/pushbullet/helpers.py index 2f20c80..2cc568a 100644 --- a/pushbullet/helpers.py +++ b/pushbullet/helpers.py @@ -6,8 +6,10 @@ def use_appropriate_encoding(fn): if sys.version_info[0] < 3: + def _fn(*args, **kwargs): - return fn(*args, **kwargs).encode(sys.stdout.encoding or 'utf-8') + return fn(*args, **kwargs).encode(sys.stdout.encoding or "utf-8") + return _fn else: return fn diff --git a/pushbullet/listener.py b/pushbullet/listener.py index 958bb63..adb61a2 100644 --- a/pushbullet/listener.py +++ b/pushbullet/listener.py @@ -1,25 +1,19 @@ -__author__ = 'Igor Maculan ' +__author__ = "Igor Maculan " +import json import logging import time -import json from threading import Thread -import requests import websocket +log = logging.getLogger("pushbullet.Listener") -log = logging.getLogger('pushbullet.Listener') - -WEBSOCKET_URL = 'wss://stream.pushbullet.com/websocket/' +WEBSOCKET_URL = "wss://stream.pushbullet.com/websocket/" class Listener(Thread, websocket.WebSocketApp): - def __init__(self, account, - on_push=None, - on_error=None, - http_proxy_host=None, - http_proxy_port=None): + def __init__(self, account, on_push=None, on_error=None, http_proxy_host=None, http_proxy_port=None): """ :param api_key: pushbullet Key :param on_push: function that get's called on all pushes @@ -31,11 +25,14 @@ def __init__(self, account, self.on_error = on_error Thread.__init__(self) - websocket.WebSocketApp.__init__(self, WEBSOCKET_URL + self._api_key, - on_open=self.on_open, - on_error=self.on_error, - on_message=self.on_message, - on_close=self.on_close) + websocket.WebSocketApp.__init__( + self, + WEBSOCKET_URL + self._api_key, + on_open=self.on_open, + on_error=self.on_error, + on_message=self.on_message, + on_close=self.on_close, + ) self.connected = False self.last_update = time.time() @@ -64,11 +61,11 @@ def on_open(self): self.last_update = time.time() def on_close(self): - log.debug('Listener closed') + log.debug("Listener closed") self.connected = False def on_message(self, message): - log.debug('Message received:' + message) + log.debug("Message received:" + message) try: json_message = json.loads(message) if json_message["type"] != "nop": @@ -77,10 +74,15 @@ def on_message(self, message): logging.exception(e) def run_forever(self, sockopt=None, sslopt=None, ping_interval=0, ping_timeout=None): - websocket.WebSocketApp.run_forever(self, sockopt=sockopt, sslopt=sslopt, ping_interval=ping_interval, - ping_timeout=ping_timeout, - http_proxy_host=self.http_proxy_host, - http_proxy_port=self.http_proxy_port) + websocket.WebSocketApp.run_forever( + self, + sockopt=sockopt, + sslopt=sslopt, + ping_interval=ping_interval, + ping_timeout=ping_timeout, + http_proxy_host=self.http_proxy_host, + http_proxy_port=self.http_proxy_port, + ) def run(self): self.run_forever() diff --git a/pushbullet/pushbullet.py b/pushbullet/pushbullet.py index 94c861c..99af0d9 100644 --- a/pushbullet/pushbullet.py +++ b/pushbullet/pushbullet.py @@ -1,15 +1,15 @@ -import os import json +import os + import requests -import warnings from requests import ConnectionError -from .device import Device +from ._compat import standard_b64encode from .channel import Channel from .chat import Chat -from .errors import PushbulletError, InvalidKeyError, PushError, NoEncryptionModuleError +from .device import Device +from .errors import InvalidKeyError, NoEncryptionModuleError, PushbulletError, PushError from .filetype import get_file_type -from ._compat import standard_b64encode class Pushbullet(object): @@ -24,7 +24,7 @@ class Pushbullet(object): def __init__(self, api_key, encryption_password=None, proxy=None): self.api_key = api_key - self._json_header = {'Content-Type': 'application/json'} + self._json_header = {"Content-Type": "application/json"} self._session = requests.Session() self._session.auth = (self.api_key, "") @@ -40,9 +40,9 @@ def __init__(self, api_key, encryption_password=None, proxy=None): self._encryption_key = None if encryption_password: try: - from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes + from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC except ImportError as e: raise NoEncryptionModuleError(str(e)) @@ -51,7 +51,7 @@ def __init__(self, api_key, encryption_password=None, proxy=None): length=32, salt=self.user_info["iden"].encode("ASCII"), iterations=30000, - backend=default_backend() + backend=default_backend(), ) self._encryption_key = kdf.derive(encryption_password.encode("UTF-8")) @@ -115,8 +115,7 @@ def _recipient(device=None, chat=None, email=None, channel=None): def new_device(self, nickname, manufacturer=None, model=None, icon="system"): data = {"nickname": nickname, "icon": icon} - data.update({k: v for k, v in - (("model", model), ("manufacturer", manufacturer)) if v is not None}) + data.update({k: v for k, v in (("model", model), ("manufacturer", manufacturer)) if v is not None}) r = self._session.post(self.DEVICES_URL, data=json.dumps(data)) if r.status_code == requests.codes.ok: new_device = Device(self, r.json()) @@ -136,9 +135,16 @@ def new_chat(self, name, email): raise PushbulletError(r.text) def edit_device(self, device, nickname=None, model=None, manufacturer=None, icon=None): - data = {k: v for k, v in - (("nickname", nickname or device.nickname), ("model", model), - ("manufacturer", manufacturer), ("icon", icon)) if v is not None} + data = { + k: v + for k, v in ( + ("nickname", nickname or device.nickname), + ("model", model), + ("manufacturer", manufacturer), + ("icon", icon), + ) + if v is not None + } iden = device.device_iden r = self._session.post("{}/{}".format(self.DEVICES_URL, iden), data=json.dumps(data)) if r.status_code == requests.codes.ok: @@ -195,7 +201,7 @@ def get_channel(self, channel_tag): def get_pushes(self, modified_after=None, limit=None, filter_inactive=True): data = {"modified_after": modified_after, "limit": limit} if filter_inactive: - data['active'] = "true" + data["active"] = "true" pushes_list = [] get_more_pushes = True @@ -205,8 +211,8 @@ def get_pushes(self, modified_after=None, limit=None, filter_inactive=True): raise PushbulletError(r.text) pushes_list += r.json().get("pushes") - if 'cursor' in r.json() and (not limit or len(pushes_list) < limit): - data['cursor'] = r.json()['cursor'] + if "cursor" in r.json() and (not limit or len(pushes_list) < limit): + data["cursor"] = r.json()["cursor"] else: get_more_pushes = False @@ -242,11 +248,13 @@ def upload_file(self, f, file_name, file_type=None): file_url = r.json().get("file_url") upload_url = r.json().get("upload_url") - upload = requests.post(upload_url, data=upload_data, files={"file": f}) + requests.post(upload_url, data=upload_data, files={"file": f}) return {"file_type": file_type, "file_url": file_url, "file_name": file_name} - def push_file(self, file_name, file_url, file_type, body=None, title=None, device=None, chat=None, email=None, channel=None): + def push_file( + self, file_name, file_url, file_type, body=None, title=None, device=None, chat=None, email=None, channel=None + ): data = {"type": "file", "file_type": file_type, "file_url": file_url, "file_name": file_name} if body: data["body"] = body @@ -272,9 +280,9 @@ def _push(self, data): if r.status_code == requests.codes.ok: js = r.json() rate_limit = {} - rate_limit['reset'] = r.headers.get('X-Ratelimit-Reset') - rate_limit['limit'] = r.headers.get('X-Ratelimit-Limit') - rate_limit['remaining'] = r.headers.get('X-Ratelimit-Remaining') + rate_limit["reset"] = r.headers.get("X-Ratelimit-Reset") + rate_limit["limit"] = r.headers.get("X-Ratelimit-Limit") + rate_limit["remaining"] = r.headers.get("X-Ratelimit-Remaining") js["rate_limit"] = rate_limit return js @@ -287,18 +295,15 @@ def push_sms(self, device, number, message): "push": { "type": "messaging_extension_reply", "package_name": "com.pushbullet.android", - "source_user_iden": self.user_info['iden'], + "source_user_iden": self.user_info["iden"], "target_device_iden": device.device_iden, "conversation_iden": number, - "message": message - } + "message": message, + }, } if self._encryption_key: - data["push"] = { - "ciphertext": self._encrypt_data(data["push"]), - "encrypted": True - } + data["push"] = {"ciphertext": self._encrypt_data(data["push"]), "encrypted": True} r = self._session.post(self.EPHEMERALS_URL, data=json.dumps(data)) if r.status_code == requests.codes.ok: @@ -308,15 +313,11 @@ def push_sms(self, device, number, message): def _encrypt_data(self, data): assert self._encryption_key - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes iv = os.urandom(12) - encryptor = Cipher( - algorithms.AES(self._encryption_key), - modes.GCM(iv), - backend=default_backend() - ).encryptor() + encryptor = Cipher(algorithms.AES(self._encryption_key), modes.GCM(iv), backend=default_backend()).encryptor() ciphertext = encryptor.update(json.dumps(data).encode("UTF-8")) + encryptor.finalize() ciphertext = b"1" + encryptor.tag + iv + ciphertext @@ -325,10 +326,11 @@ def _encrypt_data(self, data): def _decrypt_data(self, data): assert self._encryption_key - from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - from cryptography.hazmat.backends import default_backend from binascii import a2b_base64 + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes + key = self._encryption_key encoded_message = a2b_base64(data) @@ -340,15 +342,13 @@ def _decrypt_data(self, data): if version != b"1": raise Exception("Invalid Version") - cipher = Cipher(algorithms.AES(key), - modes.GCM(initialization_vector, tag), - backend=default_backend()) + cipher = Cipher(algorithms.AES(key), modes.GCM(initialization_vector, tag), backend=default_backend()) decryptor = cipher.decryptor() decrypted = decryptor.update(encrypted_message) + decryptor.finalize() decrypted = decrypted.decode() - return(decrypted) + return decrypted def refresh(self): self._load_devices() diff --git a/pyproject.toml b/pyproject.toml index beb7f5a..19b465e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,12 @@ +[tool.black] +line-length = 120 +target-version = ["py38"] + +[tool.isort] +profile = "black" +src_paths = ["src", "tests", "exa,ple"] +line_length = 120 +known_first_party = "pushbullet" [tool.coverage.run] branch = true diff --git a/setup.cfg b/setup.cfg index 8951ea8..58b390b 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,8 @@ [wheel] universal = 1 +[flake8] +ignore = E501, W503 +max-line-length = 120 +max-complexity = 18 +select = B,C,E,F,W,T4,B9 diff --git a/setup.py b/setup.py index 52a4c58..dc17980 100755 --- a/setup.py +++ b/setup.py @@ -5,17 +5,13 @@ from setuptools import setup with open("./pushbullet/__version__.py") as version_file: - version = version_file.read().split("\"")[1] + version = version_file.read().split('"')[1] -if sys.argv[-1] == 'publish': - os.system('python setup.py sdist upload') +if sys.argv[-1] == "publish": + os.system("python setup.py sdist upload") sys.exit() -install_reqs = [ - "requests>=1.0.0", - "python-magic", - "websocket-client>=0.53.0" -] +install_reqs = ["requests>=1.0.0", "python-magic", "websocket-client>=0.53.0"] def read(fname): @@ -25,18 +21,19 @@ def read(fname): except IOError: return "" + setup( - name = "pushbullet.py", - version = version, - author = "Richard B", - author_email = "pypi@richardb.me", - description = ("A simple python client for pushbullet.com"), - license = "MIT", - keywords = "push android pushbullet notification", - url = "https://github.com/rbrcsk/pushbullet.py", + name="pushbullet.py", + version=version, + author="Richard B", + author_email="pypi@richardb.me", + description=("A simple python client for pushbullet.com"), + license="MIT", + keywords="push android pushbullet notification", + url="https://github.com/rbrcsk/pushbullet.py", download_url="https://github.com/rbrcsk/pushbullet.py/tarball/" + version, - packages=['pushbullet'], - long_description=read('readme.rst'), + packages=["pushbullet"], + long_description=read("readme.rst"), classifiers=[ "Development Status :: 4 - Beta", "Intended Audience :: Developers", @@ -47,7 +44,7 @@ def read(fname): "Programming Language :: Python :: 3", "Programming Language :: Python :: 3.2", "Topic :: Software Development :: Libraries :: Python Modules", - "Topic :: Utilities" + "Topic :: Utilities", ], - install_requires=install_reqs + install_requires=install_reqs, ) diff --git a/tests/test_auth.py b/tests/test_auth.py index 40a84b4..d86b0b7 100644 --- a/tests/test_auth.py +++ b/tests/test_auth.py @@ -1,18 +1,17 @@ import os import pytest + import pushbullet API_KEY = os.environ["PUSHBULLET_API_KEY"] def test_auth_fail(): - with pytest.raises(pushbullet.InvalidKeyError) as exinfo: - pb = pushbullet.Pushbullet("faultykey") + with pytest.raises(pushbullet.InvalidKeyError): + pushbullet.Pushbullet("faultykey") def test_auth_success(): pb = pushbullet.Pushbullet(API_KEY) - assert pb.user_info["name"] == os.environ.get( - "PUSHBULLET_NAME", "Pushbullet Tester" - ) + assert pb.user_info["name"] == os.environ.get("PUSHBULLET_NAME", "Pushbullet Tester") diff --git a/tests/test_channels.py b/tests/test_channels.py index 66188d2..48e678e 100644 --- a/tests/test_channels.py +++ b/tests/test_channels.py @@ -31,10 +31,7 @@ def test_encoding_support(self): print(self.channel) def test_repr(self): - assert ( - repr(self.channel) - == "Channel(name: 'test channel' tag: '%s')" % self.channel_tag - ) + assert repr(self.channel) == "Channel(name: 'test channel' tag: '%s')" % self.channel_tag def test_push_note(self): title = "test title" diff --git a/tests/test_client.py b/tests/test_client.py index 8b3cacc..39026a3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,15 +1,16 @@ -from pushbullet import PushBullet import pytest +from pushbullet import PushBullet + try: - from unittest.mock import patch, Mock + from unittest.mock import Mock, patch except ImportError: from mock import patch, Mock -from .helpers import mock_refresh - from pushbullet.errors import InvalidKeyError, PushbulletError +from .helpers import mock_refresh + @patch.object(PushBullet, "refresh", mock_refresh) def test_get_data_ok(): diff --git a/tests/test_client_init.py b/tests/test_client_init.py index 100cb5f..005a27b 100644 --- a/tests/test_client_init.py +++ b/tests/test_client_init.py @@ -1,20 +1,20 @@ import sys +from binascii import a2b_base64 + import pytest from requests import ConnectionError -from binascii import a2b_base64 from pushbullet import PushBullet from pushbullet.errors import NoEncryptionModuleError try: - from unittest.mock import patch, Mock + from unittest.mock import Mock, patch except ImportError: from mock import patch, Mock +from .fixtures import channels_list_response, chats_list_response, devices_list_response from .helpers import mock_refresh -from .fixtures import devices_list_response, chats_list_response, channels_list_response - @patch.object(PushBullet, "refresh") def test_proxy_is_applied(pb_refresh): @@ -49,9 +49,7 @@ def test_crypto_found(): pb = PushBullet("apikey", encryption_password="hunter2") - assert pb._encryption_key == a2b_base64( - "ZFKZG50hJs5DrGKWf8fBQ6CSLB1LtTNw+xwwT2ZBl9g=" - ) + assert pb._encryption_key == a2b_base64("ZFKZG50hJs5DrGKWf8fBQ6CSLB1LtTNw+xwwT2ZBl9g=") @patch.object(PushBullet, "refresh") diff --git a/tests/test_client_operations.py b/tests/test_client_operations.py index 43e245c..c19bbab 100644 --- a/tests/test_client_operations.py +++ b/tests/test_client_operations.py @@ -1,18 +1,20 @@ import pytest + from pushbullet import PushBullet try: - from unittest.mock import patch, Mock + from unittest.mock import Mock, patch except ImportError: from mock import patch, Mock -from .helpers import mock_refresh - -from pushbullet.errors import PushbulletError import json -from pushbullet.device import Device -from pushbullet.chat import Chat + from pushbullet.channel import Channel +from pushbullet.chat import Chat +from pushbullet.device import Device +from pushbullet.errors import PushbulletError + +from .helpers import mock_refresh @patch.object(PushBullet, "refresh", mock_refresh) @@ -393,9 +395,7 @@ def test_dismiss_push(): pb.dismiss_push("123") - session.post.assert_called_once_with( - pb.PUSH_URL + "/123", data=json.dumps({"dismissed": True}) - ) + session.post.assert_called_once_with(pb.PUSH_URL + "/123", data=json.dumps({"dismissed": True})) @patch.object(PushBullet, "refresh", mock_refresh) diff --git a/tests/test_client_push.py b/tests/test_client_push.py index 60b7ffa..0040593 100644 --- a/tests/test_client_push.py +++ b/tests/test_client_push.py @@ -1,14 +1,17 @@ -from pushbullet import PushBullet import pytest +from pushbullet import PushBullet + try: - from unittest.mock import patch, Mock + from unittest.mock import Mock, patch except ImportError: from mock import patch, Mock -from .helpers import mock_refresh import json -from pushbullet.errors import PushError, PushbulletError + +from pushbullet.errors import PushbulletError, PushError + +from .helpers import mock_refresh @patch.object(PushBullet, "refresh") @@ -349,13 +352,11 @@ def test_get_pushes_no_cursor(): pushes = pb.get_pushes(filter_inactive=False) assert len(pushes) == 0 - session.get.assert_called_once_with( - pb.PUSH_URL, params={"modified_after": None, "limit": None, "active": False} - ) + session.get.assert_called_once_with(pb.PUSH_URL, params={"modified_after": None, "limit": None}) @patch.object(PushBullet, "refresh", mock_refresh) -def test_get_pushes_no_cursor(): +def test_get_pushes_with_cursor(): response1 = Mock() response1.status_code = 200 response1.json.return_value = {"pushes": ["push1", "push2"], "cursor": "cursor1"} diff --git a/tests/test_e2e.py b/tests/test_e2e.py index ee22d9d..36cb15e 100644 --- a/tests/test_e2e.py +++ b/tests/test_e2e.py @@ -1,9 +1,10 @@ -import os import json +import os +from binascii import a2b_base64 import pytest + from pushbullet import PushBullet -from binascii import a2b_base64 try: from unittest.mock import patch diff --git a/tests/test_listener.py b/tests/test_listener.py index 2872241..eaafcbb 100644 --- a/tests/test_listener.py +++ b/tests/test_listener.py @@ -1,4 +1,4 @@ -from pushbullet import PushBullet, Listener +from pushbullet import Listener, PushBullet try: from unittest.mock import patch