From 06f2e5d5800755d9066bd1ac4ae1d7237c17e89b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Reni=C3=A9?= Date: Mon, 16 Dec 2024 16:15:30 +0100 Subject: [PATCH] client: add wait() method to poll for operation result (#66) Poll interval is handled internally and adapts to job run time. Optionally, the user can set a max wait time to stop polling even if the operation is still pending. Supersedes #64 --- docs/changes.rst | 2 + docs/v2.rst | 9 +- exoscale/api/v2.py | 211 ++++++++++++++++++++++++++++++++----------- tests/test_client.py | 142 ++++++++++++++++++++++++++++- 4 files changed, 304 insertions(+), 60 deletions(-) diff --git a/docs/changes.rst b/docs/changes.rst index 414f4ee..23760ea 100644 --- a/docs/changes.rst +++ b/docs/changes.rst @@ -8,6 +8,8 @@ Changelog or 5xx HTTP statuses. * Bump python version requirement to 3.8 due to the use of the walrus operator. +* Add ``Client.wait(operation_id)`` to poll for the result of an asynchronous + API operation. 0.9.1 (2024-04-29) ~~~~~~~~~~~~~~~~~~ diff --git a/docs/v2.rst b/docs/v2.rst index 4b3be47..cd34e8e 100644 --- a/docs/v2.rst +++ b/docs/v2.rst @@ -12,5 +12,10 @@ V2 API bindings build/html/v2.html. .. automodule:: exoscale.api.v2 - :members: Client - :undoc-members: + :members: + :exclude-members: Client + + +.. autoclass:: exoscale.api.v2.Client + :members: + :inherited-members: wait diff --git a/exoscale/api/v2.py b/exoscale/api/v2.py index 0f2eac7..1b933e1 100644 --- a/exoscale/api/v2.py +++ b/exoscale/api/v2.py @@ -19,24 +19,43 @@ >>> c.list_instances() {'instances': []} + Waiting for an asynchronous operation to complete: + + >>> from exoscale.api.v2 import Client + >>> c = Client("api-key", "api-secret", zone="ch-gva-2") + >>> version = c.list_sks_cluster_versions()["sks-cluster-versions"][0] + >>> operation = c.create_sks_cluster( + ... cni="cilium", + ... name="my-cluster", + ... level="starter", + ... version=version, + ... ) + >>> c.wait(operation["id"]) + {'id': 'e2047130-b86e-11ef-83b3-0d8312b2c2d7', + 'state': 'success', + 'reference': { + 'id': '8561ee34-09f0-42da-a765-abde807f944b', + 'link': '/v2/sks-cluster/8561ee34-09f0-42da-a765-abde807f944b', + 'command': 'get-sks-cluster'}} """ import copy import json +import time from itertools import chain from pathlib import Path from exoscale_auth import ExoscaleV2Auth + +import requests + from .exceptions import ( + ExoscaleAPIAuthException, ExoscaleAPIClientException, ExoscaleAPIServerException, - ExoscaleAPIAuthException, ) -import requests - - with open(Path(__file__).parent.parent / "openapi.json", "r") as f: API_SPEC = json.load(f) @@ -96,6 +115,91 @@ def _get_ref(path): return payload +_type_translations = { + "string": "str", + "integer": "int", + "object": "dict", + "array": "list", + "boolean": "bool", + "number": "float", +} + + +def _return_docstring(operation): + [status_code] = operation["responses"].keys() + [ctype] = operation["responses"][status_code]["content"].keys() + return_schema = operation["responses"][status_code]["content"][ctype][ + "schema" + ] + if "$ref" in return_schema: + ref = _get_ref(return_schema["$ref"]) + if ( + "properties" in ref + and ref["type"] == "object" + and "description" in ref + ): + body = {} + for name, prop in ref["properties"].items(): + if "$ref" in prop: + _ref = _get_ref(prop["$ref"]) + item = _ref + else: + item = prop + typ = _type_translations[item["type"]] + desc = prop.get("description") + if "enum" in item: + choices = "``, ``".join(map(repr, item["enum"])) + desc += f". Values are ``{choices}``" + suffix = f": {desc}" if desc else "" + normalized_name = name.replace("-", "_") + body[normalized_name] = ( + f"**{normalized_name}** ({typ}){suffix}." + ) + + doc = ( + f"dict: {ref['description']}. A dictionnary with the following keys:" + + "\n\n * ".join([""] + list(body.values())) + ) + elif "description" in ref: + doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.' + else: + doc = _type_translations[ref["type"]] + else: + doc = _type_translations[return_schema["type"]] + return doc + + +def _poll_interval(run_time): + """ + Returns the wait interval before next poll, given the current run time of a job. + We poll + - every 3 seconds for the first 30 seconds + - then increase linearly to reach 1 minute at 15 minutes of run time + - then every minute + """ + # y = a * x + b. Solve a and b for: + # 60 = a * 900 + b + # 3 = a * 30 + b + a = 57 / 870 + b = 3 - 30 * a + min_wait = 3 + max_wait = 60 + interval = a * run_time + b + interval = max(min_wait, interval) + interval = min(max_wait, interval) + return interval + + +def _time(): + return time.time() + + +def _sleep(start_time): + run_time = _time() - start_time + interval = _poll_interval(run_time) + return time.sleep(interval) + + class BaseClient: def __init__(self, key, secret, url=None, **kwargs): if url is None: @@ -123,6 +227,7 @@ def __init__(self, key, secret, url=None, **kwargs): session.auth = ExoscaleV2Auth(key, secret) self.session = session self.key = key + self.WAIT_ABORT_ERRORS_COUNT = 5 def __repr__(self): return ( @@ -178,59 +283,55 @@ def _call_operation(self, operation_id, parameters=None, body=None): return response.json() - -_type_translations = { - "string": "str", - "integer": "int", - "object": "dict", - "array": "list", - "boolean": "bool", - "number": "float", -} - - -def _return_docstring(operation): - [status_code] = operation["responses"].keys() - [ctype] = operation["responses"][status_code]["content"].keys() - return_schema = operation["responses"][status_code]["content"][ctype][ - "schema" - ] - if "$ref" in return_schema: - ref = _get_ref(return_schema["$ref"]) - if ( - "properties" in ref - and ref["type"] == "object" - and "description" in ref - ): - body = {} - for name, prop in ref["properties"].items(): - if "$ref" in prop: - _ref = _get_ref(prop["$ref"]) - item = _ref - else: - item = prop - typ = _type_translations[item["type"]] - desc = prop.get("description") - if "enum" in item: - choices = "``, ``".join(map(repr, item["enum"])) - desc += f". Values are ``{choices}``" - suffix = f": {desc}" if desc else "" - normalized_name = name.replace("-", "_") - body[normalized_name] = ( - f"**{normalized_name}** ({typ}){suffix}." + def wait(self, operation_id: str, max_wait_time: int = None): + """ + Wait for completion of an asynchronous operation. + + Args: + operation_id (str) + max_wait_time (int): When set, stop waiting after this time in + seconds. Defaults to ``None``, which waits until operation + completion. + + Returns: + {ret} + """ + start_time = _time() + subsequent_errors = 0 + while True: + try: + result = self.get_operation(id=operation_id) + subsequent_errors = 0 + except ExoscaleAPIServerException as e: + subsequent_errors += 1 + if subsequent_errors >= self.WAIT_ABORT_ERRORS_COUNT: + raise ExoscaleAPIServerException( + "Server error while polling operation" + ) from e + _sleep(start_time) + continue + state = result["state"] + if state == "success": + return result + elif state in {"failure", "timeout"}: + raise ExoscaleAPIServerException( + f"Operation error: {state}, {result['reason']}" + ) + elif state == "pending": + run_time = _time() - start_time + if max_wait_time is not None and run_time > max_wait_time: + raise ExoscaleAPIClientException( + "Operation max wait time reached" + ) + _sleep(start_time) + else: + raise ExoscaleAPIServerException( + f"Invalid operation state: {state}" ) - doc = ( - f"dict: {ref['description']}. A dictionnary with the following keys:" - + "\n\n * ".join([""] + list(body.values())) - ) - elif "description" in ref: - doc = f'{_type_translations[ref["type"]]}: {ref["description"]}.' - else: - doc = _type_translations[ref["type"]] - else: - doc = _type_translations[return_schema["type"]] - return doc + wait.__doc__ = wait.__doc__.format( + ret=_return_docstring(BY_OPERATION["get-operation"]["operation"]) + ) def _args_docstring(parameters, body): diff --git a/tests/test_client.py b/tests/test_client.py index cc4490b..04a8517 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,10 +1,15 @@ -import pytest -from exoscale.api.v2 import Client +import json + +from unittest.mock import patch + from exoscale.api.exceptions import ( + ExoscaleAPIAuthException, ExoscaleAPIClientException, ExoscaleAPIServerException, - ExoscaleAPIAuthException, ) +from exoscale.api.v2 import Client, _poll_interval + +import pytest def test_client_creation(): @@ -71,5 +76,136 @@ def test_client_error_handling(requests_mock): ) +def test_wait_interval(): + assert _poll_interval(25) == 3 + assert 3 < _poll_interval(33) < 4 + assert 8 < _poll_interval(120) < 9 + assert 40 < _poll_interval(600) < 41 + assert _poll_interval(1000) == 60 + assert _poll_interval(999999) == 60 + + +def test_operation_poll_failure(requests_mock): + requests_mock.get( + "https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa + status_code=200, + text='{"id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", "state": "failure", "reason": "unknown", "reference": {"id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", "command": "get-sks-cluster"}}', # noqa + ) + + client = Client(key="EXOtest", secret="sdsd") + try: + client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7") + except ExoscaleAPIServerException as e: + assert "Operation error: failure" in str(e) + else: + raise AssertionError("exception not raised") + + +def test_operation_invalid_state(requests_mock): + requests_mock.get( + "https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa + status_code=200, + text='{"id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", "state": "weird", "reference": {"id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", "command": "get-sks-cluster"}}', # noqa + ) + + client = Client(key="EXOtest", secret="sdsd") + try: + client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7") + except ExoscaleAPIServerException as e: + assert "Invalid operation state: weird" in str(e) + else: + raise AssertionError("exception not raised") + + +def _mock_poll_response(poll_counts, status_code=200, result="success"): + return [ + { + "status_code": status_code, + "text": '{"id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", "state": "pending", "reference": {"id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", "command": "get-sks-cluster"}}', # noqa + } + ] * (poll_counts - 1) + [ + { + "status_code": status_code, + "text": json.dumps( + { + "id": "4c5547c0-b870-11ef-83b3-0d8312b2c2d7", + "state": result, + "reason": "some reason", + "reference": { + "id": "97d7426f-8b25-4591-91d5-4a19e9a1d61a", + "link": "/v2/sks-cluster/97d7426f-8b25-4591-91d5-4a19e9a1d61a", # noqa + "command": "get-sks-cluster", + }, + } + ), + } + ] + + +def test_wait_time_success(requests_mock): + requests_mock.get( + "https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa + _mock_poll_response(3), + ) + with patch( + "exoscale.api.v2._time", + side_effect=[ + 0, # start of poll + 1, # duration of first loop: 1s + 5, # duration of second loop: 4s + ], + ) as time, patch("exoscale.api.v2._sleep") as sleep: + client = Client(key="EXOtest", secret="sdsd") + client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7") + assert len(time.call_args_list) == 3 + assert len(sleep.call_args_list) == 2 + + +def test_wait_time_poll_errors(requests_mock): + requests_mock.get( + "https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa + _mock_poll_response(6, status_code=500), + ) + with patch( + "exoscale.api.v2._time", + side_effect=[ + 0, # start of poll + ], + ) as time, patch("exoscale.api.v2._sleep") as sleep: + client = Client(key="EXOtest", secret="sdsd") + try: + client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7") + except ExoscaleAPIServerException: + pass + else: + raise AssertionError("Exception not raised") + assert len(time.call_args_list) == 1 + assert len(sleep.call_args_list) == 4 + + +def test_wait_time_failure(requests_mock): + requests_mock.get( + "https://api-ch-gva-2.exoscale.com/v2/operation/e2047130-b86e-11ef-83b3-0d8312b2c2d7", # noqa + _mock_poll_response(3, result="failure"), + ) + with patch( + "exoscale.api.v2._time", + side_effect=[ + 0, # start of poll + 1, # duration of first loop: 1s + 5, # duration of second loop: 4s + ], + ) as time, patch("exoscale.api.v2._sleep") as sleep: + client = Client(key="EXOtest", secret="sdsd") + try: + client.wait(operation_id="e2047130-b86e-11ef-83b3-0d8312b2c2d7") + except ExoscaleAPIServerException as e: + assert "Operation error" in str(e) + else: + raise AssertionError("Exception not raised") + assert len(time.call_args_list) == 3 + assert len(sleep.call_args_list) == 2 + + if __name__ == "__main__": pytest.main()