From b87fd409922f29b0bbd31259b77f1d0608cea497 Mon Sep 17 00:00:00 2001 From: Thomas Hahn Date: Tue, 25 Jun 2024 23:50:25 +0200 Subject: [PATCH] Add RateLimitedRestConnection with token bucket Add AbstanceType Enum --- src/homematicip/configuration/config.py | 7 +++ src/homematicip/connection/buckets.py | 58 +++++++++++++++++++ .../rate_limited_rest_connection.py | 21 +++++++ src/homematicip/model/enums.py | 8 +++ src/homematicip/model/hmip_base.py | 1 + src/homematicip/runner.py | 15 ++++- tests/connection/test_buckets.py | 46 +++++++++++++++ .../test_rate_limited_rest_connection.py | 37 ++++++++++++ 8 files changed, 191 insertions(+), 2 deletions(-) create mode 100644 src/homematicip/connection/buckets.py create mode 100644 src/homematicip/connection/rate_limited_rest_connection.py create mode 100644 tests/connection/test_buckets.py create mode 100644 tests/connection/test_rate_limited_rest_connection.py diff --git a/src/homematicip/configuration/config.py b/src/homematicip/configuration/config.py index 9f9c6416..465de5cb 100644 --- a/src/homematicip/configuration/config.py +++ b/src/homematicip/configuration/config.py @@ -11,6 +11,13 @@ class PersistentConfig: auth_token: str = None log_file: str = "" + # Use the normal RestConnection without limiting instead of the RateLimitedRestConnection + disable_rate_limit: bool = False + # Max number of tokens in the bucket + rate_limit_tokens: int = 10 + # Fill rate of the bucket. Every x second a new token + rate_limit_fill_rate: int = 3 + @dataclass class Config(PersistentConfig): diff --git a/src/homematicip/connection/buckets.py b/src/homematicip/connection/buckets.py new file mode 100644 index 00000000..2aaf5d92 --- /dev/null +++ b/src/homematicip/connection/buckets.py @@ -0,0 +1,58 @@ +import asyncio +import time + + +class Buckets: + """Class to manage the rate limiting of the HomematicIP Cloud API. + The implementation is based on the token bucket algorithm.""" + + def __init__(self, tokens, fill_rate): + """Initialize the Buckets with a token bucket algorithm. + + :param tokens: The number of tokens in the bucket. + :param fill_rate: The fill rate of the bucket in tokens every x seconds.""" + self.capacity = tokens + self._tokens = tokens + self.fill_rate = fill_rate + self.timestamp = time.time() + self.lock = asyncio.Lock() + + async def take(self, tokens=1) -> bool: + """Get a single token from the bucket. Return True if successful, False otherwise. + + :param tokens: The number of tokens to take from the bucket. Default is 1. + :return: True if successful, False otherwise. + """ + async with self.lock: + if tokens <= await self.tokens(): + self._tokens -= tokens + return True + return False + + async def wait_and_take(self, timeout=120, tokens=1) -> bool: + """Wait until a token is available and then take it. Return True if successful, False otherwise. + + :param timeout: The maximum time to wait for a token in seconds. Default is 120 seconds. + :param tokens: The number of tokens to take from the bucket. Default is 1. + :return: True if successful, False otherwise. + """ + start_time = time.time() + while True: + if tokens <= await self.tokens(): + self._tokens -= tokens + return True + + if time.time() - start_time > timeout: + raise asyncio.TimeoutError("Timeout while waiting for token.") + + await asyncio.sleep(1) # Wait for a second before checking again + + async def tokens(self): + """Get the number of tokens in the bucket. Refill the bucket if necessary.""" + if self._tokens < self.capacity: + now = time.time() + delta = int((now - self.timestamp) / self.fill_rate) + if delta > 0: + self._tokens = min(self.capacity, self._tokens + delta) + self.timestamp = now + return self._tokens diff --git a/src/homematicip/connection/rate_limited_rest_connection.py b/src/homematicip/connection/rate_limited_rest_connection.py new file mode 100644 index 00000000..eb249225 --- /dev/null +++ b/src/homematicip/connection/rate_limited_rest_connection.py @@ -0,0 +1,21 @@ +import json + +from homematicip.connection.buckets import Buckets +from homematicip.connection.rest_connection import RestConnection, ConnectionContext, RestResult + + +class RateLimitedRestConnection(RestConnection): + + def __init__(self, context: ConnectionContext, tokens: int = 10, fill_rate: int = 8): + """Initialize the RateLimitedRestConnection with a token bucket algorithm. + + :param context: The connection context. + :param tokens: The number of tokens in the bucket. Default is 10. + :param fill_rate: The fill rate of the bucket in tokens per second. Default is 8.""" + super().__init__(context) + self._buckets = Buckets(tokens=tokens, fill_rate=fill_rate) + + async def async_post(self, url: str, data: json = None, custom_header: dict = None) -> RestResult: + """Post data to the HomematicIP Cloud API.""" + await self._buckets.wait_and_take() + return await super().async_post(url, data, custom_header) \ No newline at end of file diff --git a/src/homematicip/model/enums.py b/src/homematicip/model/enums.py index 6b24ceab..c1cc13bc 100644 --- a/src/homematicip/model/enums.py +++ b/src/homematicip/model/enums.py @@ -223,6 +223,14 @@ class HeatingValveType(Enum): NORMALLY_OPEN = "NORMALLY_OPEN" +class AbsenceType(Enum): + NOT_ABSENT = "NOT_ABSENT" + PERIOD = "PERIOD" + PERMANENT = "PERMANENT" + VACATION = "VACATION" + PARTY = "PARTY" + + class ValveState(Enum): STATE_NOT_AVAILABLE = "STATE_NOT_AVAILABLE" RUN_TO_START = "RUN_TO_START" diff --git a/src/homematicip/model/hmip_base.py b/src/homematicip/model/hmip_base.py index 1c3784ff..1df6a2e9 100644 --- a/src/homematicip/model/hmip_base.py +++ b/src/homematicip/model/hmip_base.py @@ -73,3 +73,4 @@ def __eq__(self, other): if isinstance(other, self.__class__): return self.__dict__ == other.__dict__ return False + diff --git a/src/homematicip/runner.py b/src/homematicip/runner.py index 36c4a214..4994ccda 100644 --- a/src/homematicip/runner.py +++ b/src/homematicip/runner.py @@ -6,6 +6,7 @@ from homematicip.configuration.config import Config from homematicip.configuration.config_io import ConfigIO +from homematicip.connection.rate_limited_rest_connection import RateLimitedRestConnection from homematicip.connection.rest_connection import ( ClientCharacteristicsBuilder, ConnectionContext, @@ -94,7 +95,7 @@ def _initialize_connection_context(self) -> ConnectionContext: async def async_initialize_runner(self): self._connection_context = self._initialize_connection_context() - self._rest_connection = RestConnection(self._connection_context) + self._rest_connection = self._initialize_rest_connection() # download current configuration and build model current_configuration = await self.async_get_current_state() @@ -103,7 +104,17 @@ async def async_initialize_runner(self): async def async_initialize_runner_without_init_model(self): """Initialize just the context and connection. Use async_get_current_state to get the current state.""" self._connection_context = self._initialize_connection_context() - self._rest_connection = RestConnection(self._connection_context) + self._rest_connection = self._initialize_rest_connection() + + def _initialize_rest_connection(self) -> RestConnection: + """Initialize the rest connection based on configuration. + + :return: The rest connection instance""" + if self.config.disable_rate_limit: + return RestConnection(self._connection_context) + + return RateLimitedRestConnection(self._connection_context, self.config.rate_limit_tokens, + self.config.rate_limit_fill_rate) async def async_listening_for_updates(self): """Start listening for updates from HomematicIP Cloud. This method will not return.""" diff --git a/tests/connection/test_buckets.py b/tests/connection/test_buckets.py new file mode 100644 index 00000000..f94cee45 --- /dev/null +++ b/tests/connection/test_buckets.py @@ -0,0 +1,46 @@ +import asyncio + +import pytest + +from homematicip.connection.buckets import Buckets + + +@pytest.mark.asyncio +async def test_get_bucket(): + """Testing the get bucket method.""" + bucket = Buckets(2, 10) + + got_1st_token = await bucket.take() + got_2nd_token = await bucket.take() + got_3rd_token = await bucket.take() + + assert got_1st_token is True + assert got_2nd_token is True + assert got_3rd_token is False + + +async def test_get_bucket_with_timeout(): + """Testing the get bucket method with timeout.""" + bucket = Buckets(1, 100) + + got_1st_token = await bucket.take() + with pytest.raises(asyncio.TimeoutError): + await bucket.wait_and_take(timeout=1) + + +async def test_get_bucket_and_wait_for_new(): + """Testing the get bucket method and waiting for new tokens.""" + bucket = Buckets(1, 1) + + got_1st_token = await bucket.take() + got_2nd_token = await bucket.wait_and_take() + + assert got_1st_token is True + assert got_2nd_token is True + +def test_initial_tokens(): + """Testing the initial tokens of the bucket.""" + bucket = Buckets(2, 10) + assert bucket._tokens == 2 + assert bucket.capacity == 2 + assert bucket.fill_rate == 10 diff --git a/tests/connection/test_rate_limited_rest_connection.py b/tests/connection/test_rate_limited_rest_connection.py new file mode 100644 index 00000000..f1034b7f --- /dev/null +++ b/tests/connection/test_rate_limited_rest_connection.py @@ -0,0 +1,37 @@ +import httpx + +from homematicip.connection.rate_limited_rest_connection import RateLimitedRestConnection +from homematicip.connection.rest_connection import ConnectionContext + + +async def test_send_single_request(mocker): + response = mocker.Mock(spec=httpx.Response) + response.status_code = 200 + patched = mocker.patch("homematicip.connection.rest_connection.httpx.AsyncClient.post") + patched.return_value = response + + context = ConnectionContext(rest_url="http://asdf") + conn = RateLimitedRestConnection(context, 1, 1) + + result = await conn.async_post("url", {"a": "b"}, {"c": "d"}) + + assert patched.called + assert patched.call_args[0][0] == "http://asdf/hmip/url" + assert patched.call_args[1] == {"json": {"a": "b"}, "headers": {"c": "d"}} + assert result.status == 200 + + +async def test_send_and_wait_requests(mocker): + response = mocker.Mock(spec=httpx.Response) + response.status_code = 200 + patched = mocker.patch("homematicip.connection.rest_connection.httpx.AsyncClient.post") + patched.return_value = response + + context = ConnectionContext(rest_url="http://asdf") + conn = RateLimitedRestConnection(context, 1, 1) + + await conn.async_post("url", {"a": "b"}, {"c": "d"}) + await conn.async_post("url", {"a": "b"}, {"c": "d"}) + await conn.async_post("url", {"a": "b"}, {"c": "d"}) + + assert patched.call_count == 3 \ No newline at end of file